using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using HeuristicLab.Clients.Hive;
using HeuristicLab.Core;

namespace HeuristicLab.Problems.Modifiers {
  /// <summary>
  /// Non-storable base for pollers that download finished Hive jobs.
  /// Holds state shared by all poller instances: the log-file lock and the upload throttle.
  /// </summary>
  public class HiveEvaluationResultPoller {
    // Serializes writes to the (possibly shared) log file across all poller instances.
    protected static readonly object LogLocker = new object();
    // The Hive server throttles connections down to 100; stay at 90 to keep head room.
    protected static readonly Semaphore Throttle = new Semaphore(90, 90);
  }

  /// <summary>
  /// Uploads <typeparamref name="T"/> instances as Hive jobs and polls their state on a
  /// single background thread, completing one <see cref="TaskCompletionSource{T}"/> per job.
  /// The polling thread is started lazily with the first job and stops once no jobs remain.
  /// </summary>
  /// <typeparam name="T">The executable type that is shipped to and retrieved from Hive.</typeparam>
  public class HiveEvaluationResultPoller<T> : HiveEvaluationResultPoller where T : IExecutable {
    private Thread thread;                          // polling thread; non-null while polling is active (guarded by locker)
    private readonly object locker = new object();  // guards 'jobs' and 'thread'
    private readonly Dictionary<Guid, TaskCompletionSource<T>> jobs = new Dictionary<Guid, TaskCompletionSource<T>>();

    /// <summary>Delay between two polling rounds.</summary>
    public TimeSpan Interval { get; set; }

    /// <summary>Optional path of a log file for upload/poll errors; null disables logging.</summary>
    public string LogFile;

    public HiveEvaluationResultPoller(TimeSpan interval) {
      Interval = interval;
    }

    /// <summary>
    /// Uploads <paramref name="evaluation"/> as a new Hive job and returns a completion source
    /// that is resolved (or faulted) by the polling thread once the job finishes, fails,
    /// or is aborted.
    /// </summary>
    /// <param name="evaluation">The executable to run on Hive.</param>
    /// <param name="options">Project, resources, job name, and distribution options.</param>
    /// <returns>A completion source whose Task yields the downloaded result.</returns>
    /// <exception cref="Exception">Rethrows any upload failure after logging it.</exception>
    public TaskCompletionSource<T> StartEvaluation(T evaluation, HiveApi.Options options) {
      HiveApi.RefreshHiveClient(out var projects, out _);
      var projectId = HiveApi.GuardProjectId(options.ProjectId, projects);
      var resourceIds = options.ResourceIds;
      var jobName = HiveApi.GuardJobName(new[] { evaluation }, options.JobName);
      var distribute = options.Distribute;
      using (var refreshableJob = HiveApi.PackJob(new[] { evaluation }, projectId, resourceIds, jobName, distribute)) {
        Throttle.WaitOne();  // respect the server-side connection limit during upload
        try {
          HiveClient.Store(refreshableJob, CancellationToken.None);
        } catch (Exception e) {
          WriteToLog(new[] { "Exception occured when uploading Job : " + DateTime.Now, e.ToString(), "========" });
          throw;
        } finally {
          Throttle.Release();
        }
        // RunContinuationsAsynchronously keeps caller continuations off the polling thread,
        // which would otherwise run arbitrary user code while results are being dispatched.
        var comp = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        lock (locker) {
          var startNew = jobs.Count == 0;  // first pending job => polling thread must be (re)started
          jobs.Add(refreshableJob.Id, comp);
          if (startNew) Start();
        }
        return comp;
      }
    }

    #region Helpers
    // Starts the polling thread. Must be called under 'locker'.
    private void Start() {
      thread = new Thread(RunPolling) {
        IsBackground = true,  // never keep the process alive just because of a poller
        Name = "HiveEvaluationResultPoller"
      };
      thread.Start();
    }

    // Polling loop: sleep, fetch job states, refresh the client session; repeat until
    // FetchJobResults reports that no jobs remain. Exceptions are logged and swallowed
    // so one failed round does not kill the poller.
    private void RunPolling() {
      var stopRequested = false;
      do {
        try {
          Thread.Sleep(Interval);
          stopRequested = FetchJobResults();
          HiveApi.RefreshHiveClient(out _, out _);
        } catch (Exception e) {
          WriteToLog(new[] { "Poller Exception occurred at : " + DateTime.Now, e.ToString(), "========" });
        }
      } while (!stopRequested);
    }

    /// <summary>
    /// Polls the state of every pending job once, completing or faulting the corresponding
    /// completion sources, and returns true when no jobs remain (polling may stop).
    /// </summary>
    private bool FetchJobResults() {
      var stopping = false;
      HiveServiceLocator.Instance.CallHiveService(service => {
        Guid[] pending;
        lock (locker) { pending = jobs.Keys.ToArray(); }  // snapshot so we can poll without holding the lock
        var toRemove = new HashSet<Guid>();
        foreach (var jobId in pending) {
          LightweightTask task;
          try {
            task = service.GetLightweightJobTasksWithoutStateLog(jobId).Single();
          } catch (Exception e) {
            // NOTE(review): any query failure (even transient network errors) permanently
            // faults the evaluation — confirm this is the intended policy.
            toRemove.Add(jobId);
            lock (locker) {
              if (jobs.TryGetValue(jobId, out var tcs)) tcs.TrySetException(e);
            }
            continue;
          }
          switch (task.State) {
            case TaskState.Offline:
            case TaskState.Waiting:
            case TaskState.Transferring:
            case TaskState.Calculating:
            case TaskState.Paused:
              break;  // still in progress; check again next round
            case TaskState.Finished:
              // NOTE(review): the generic argument of Deserialize was lost in transit;
              // ItemTask matches the standard HiveClient pattern — verify against the original.
              var evaluation = (T)PersistenceUtil.Deserialize<ItemTask>(service.GetTaskData(task.Id).Data).Item;
              HiveClient.Delete(service.GetJob(jobId));  // clean up the finished job on the server
              toRemove.Add(jobId);
              lock (locker) {
                if (jobs.TryGetValue(jobId, out var tcs)) tcs.TrySetResult(evaluation);
              }
              break;
            case TaskState.Aborted:
            case TaskState.Failed:
              toRemove.Add(jobId);
              lock (locker) {
                if (jobs.TryGetValue(jobId, out var tcs)) tcs.TrySetException(new HiveException($"The evaluation with job id {jobId} and task id {task.Id} failed. Please consult the Hive Job-Manager for further information"));
              }
              break;
            default:
              throw new ArgumentOutOfRangeException();
          }
        }
        lock (locker) {
          foreach (var jobId in toRemove) jobs.Remove(jobId);  // Remove is a no-op for already-removed keys
          if (jobs.Count == 0) {
            stopping = true;  // last job done: let the polling loop exit
            thread = null;    // so StartEvaluation starts a fresh thread for the next job
          }
        }
      });
      return stopping;
    }

    // Best-effort append to LogFile; never throws, because logging must not kill the poller.
    private void WriteToLog(IEnumerable<string> lines) {
      if (LogFile == null) return;
      lock (LogLocker) {
        try {
          File.AppendAllLines(LogFile, lines);
        } catch {
          // ignored: logging is deliberately best-effort
        }
      }
    }
    #endregion
  }
}