using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Core;
namespace HeuristicLab.Problems.Modifiers {
/// <summary>
/// Non-storable base class for pollers that download Hive jobs. Holds the
/// process-wide state shared by all generic poller instances: the log-file
/// lock and the connection throttle.
/// </summary>
public class HiveEvaluationResultPoller {
// Serializes writes to the (possibly shared) log file across all poller instances.
protected static readonly object LogLocker = new object();
// The Hive server throttles connections down to 100; stay safely below that limit.
protected static readonly Semaphore Throttle = new Semaphore(90, 90); //The Hive server throttles connections down to 100
}
/// <summary>
/// Generic poller that uploads <typeparamref name="T"/> evaluations as Hive jobs and
/// periodically polls the Hive service until each job has finished, failed, or been
/// aborted, completing the corresponding <see cref="TaskCompletionSource{TResult}"/>
/// with the downloaded result (or an exception).
/// </summary>
/// <typeparam name="T">Executable item type that is sent to and downloaded from Hive.</typeparam>
public class HiveEvaluationResultPoller<T> : HiveEvaluationResultPoller where T : IExecutable {
  private Thread thread; // background polling thread; null whenever no jobs are pending
  private readonly object locker = new object();
  // Pending Hive job ids mapped to the completion sources handed out by StartEvaluation.
  private readonly Dictionary<Guid, TaskCompletionSource<T>> jobs = new Dictionary<Guid, TaskCompletionSource<T>>();

  /// <summary>Delay between two consecutive polling rounds.</summary>
  public TimeSpan Interval { get; set; }

  /// <summary>Optional path of a log file; when null, no log is written.</summary>
  public string LogFile;

  /// <param name="interval">Delay between two consecutive polling rounds.</param>
  public HiveEvaluationResultPoller(TimeSpan interval) {
    Interval = interval;
  }

  /// <summary>
  /// Packs <paramref name="evaluation"/> into a Hive job, uploads it, and registers it
  /// for polling. The returned completion source finishes once the job result has been
  /// downloaded, or faults if the job failed, was aborted, or could not be queried.
  /// </summary>
  /// <param name="evaluation">The executable to run on Hive.</param>
  /// <param name="options">Project, resource, job-name, and distribution options for the upload.</param>
  /// <returns>A completion source that eventually yields the downloaded evaluation.</returns>
  public TaskCompletionSource<T> StartEvaluation(T evaluation, HiveApi.Options options) {
    HiveApi.RefreshHiveClient(out var projects, out _);
    var projectId = HiveApi.GuardProjectId(options.ProjectId, projects);
    var resourceIds = options.ResourceIds;
    var jobName = HiveApi.GuardJobName(new[] { evaluation }, options.JobName);
    var distribute = options.Distribute;
    using (var refreshableJob = HiveApi.PackJob(new[] { evaluation }, projectId, resourceIds, jobName, distribute)) {
      Throttle.WaitOne(); // the Hive server only accepts a limited number of concurrent connections
      try {
        HiveClient.Store(refreshableJob, CancellationToken.None);
      } catch (Exception e) {
        WriteToLog(new[] { "Exception occurred when uploading Job : " + DateTime.Now, e.ToString(), "========" });
        throw;
      } finally {
        Throttle.Release();
      }
      // RunContinuationsAsynchronously keeps caller continuations off the polling thread.
      var comp = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
      lock (locker) {
        var startNew = jobs.Count == 0; // first pending job -> the polling thread must be (re)started
        jobs.Add(refreshableJob.Id, comp);
        if (startNew) Start();
      }
      return comp;
    }
  }

  #region Helpers
  // Spawns the background polling thread. Must be called while holding `locker`.
  private void Start() {
    thread = new Thread(RunPolling);
    thread.Start();
  }

  // Polling loop: sleep, fetch finished results, refresh the client session. Exceptions
  // are logged and the loop keeps running until every pending job has been resolved.
  private void RunPolling() {
    var stopRequested = false;
    do {
      try {
        Thread.Sleep(Interval);
        stopRequested = FetchJobResults();
        HiveApi.RefreshHiveClient(out _, out _); // keep the session alive between polls
      } catch (Exception e) {
        WriteToLog(new[] { "Poller Exception occurred at : " + DateTime.Now, e.ToString(), "========" });
      }
    } while (!stopRequested);
  }

  // Checks the state of every pending job, completes/faults the associated completion
  // sources, and removes resolved jobs. Returns true when no jobs remain, which stops
  // the polling loop.
  private bool FetchJobResults() {
    var stopping = false;
    HiveServiceLocator.Instance.CallHiveService(service => {
      Guid[] pending;
      lock (locker) {
        pending = jobs.Keys.ToArray();
      }
      var resolved = new HashSet<Guid>();
      foreach (var jobId in pending) {
        LightweightTask task;
        try {
          task = service.GetLightweightJobTasksWithoutStateLog(jobId).Single();
        } catch (Exception e) {
          // Job can no longer be queried -> fault the waiter instead of polling forever.
          resolved.Add(jobId);
          lock (locker) {
            if (jobs.TryGetValue(jobId, out var queryFailedTcs)) queryFailedTcs.TrySetException(e);
          }
          continue;
        }
        switch (task.State) {
          case TaskState.Offline:
          case TaskState.Waiting:
          case TaskState.Transferring:
          case TaskState.Calculating:
          case TaskState.Paused:
            break; // still in progress; check again next round
          case TaskState.Finished:
            // NOTE(review): generic argument reconstructed as ItemTask (the downloaded
            // task data carries the result in its Item property) - confirm against
            // PersistenceUtil.Deserialize's signature.
            var evaluation = (T)PersistenceUtil.Deserialize<ItemTask>(service.GetTaskData(task.Id).Data).Item;
            HiveClient.Delete(service.GetJob(jobId)); // result downloaded; clean up on the server
            resolved.Add(jobId);
            lock (locker) {
              if (jobs.TryGetValue(jobId, out var finishedTcs)) finishedTcs.TrySetResult(evaluation);
            }
            break;
          case TaskState.Aborted:
          case TaskState.Failed:
            resolved.Add(jobId);
            lock (locker) {
              if (jobs.TryGetValue(jobId, out var failedTcs))
                failedTcs.TrySetException(new HiveException($"The evaluation with job id {jobId} and task id {task.Id} failed. Please consult the Hive Job-Manager for further information"));
            }
            break;
          default:
            throw new ArgumentOutOfRangeException();
        }
      }
      lock (locker) {
        foreach (var jobId in resolved) jobs.Remove(jobId);
        if (jobs.Count == 0) {
          stopping = true; // nothing left to poll; let RunPolling terminate
          thread = null;
        }
      }
    });
    return stopping;
  }

  // Appends the given lines to LogFile (if configured). Best effort: concurrent writers
  // are serialized via the shared LogLocker and I/O failures are deliberately ignored.
  private void WriteToLog(IEnumerable<string> lines) {
    if (LogFile == null) return;
    lock (LogLocker) {
      try {
        File.AppendAllLines(LogFile, lines);
      } catch {
        // best-effort logging: never let a log failure take down the poller
      }
    }
  }
  #endregion
}
}