using System; using System.Collections.Generic; using System.Linq; using System.Threading; using HeuristicLab.Clients.Hive; using HeuristicLab.Core; using HeuristicLab.Optimization; namespace HeuristicLab.Problems.Modifiers { public static class HiveApi { private static readonly object locker = new object(); public class Options { public Guid ProjectId { get; set; } public IEnumerable ResourceIds { get; set; } public string JobName { get; set; } public bool Distribute { get; set; } } public static IEnumerable ExecuteInHive(IEnumerable executables, Options options, CancellationToken cancellationToken) where T : IExecutable { if (options == null) { options = new Options { ProjectId = Guid.Empty, ResourceIds = new Guid[0], JobName = string.Empty, Distribute = true }; } var safeExecutables = executables as T[] ?? executables.ToArray(); if (!safeExecutables.Any()) throw new ArgumentException("At least one executable must be specified."); Exception exception = null; bool jobFinished; var tries = 0; const int maxRetries = 5; do { exception = null; Project[] projects; lock (locker) { HiveClient.Instance.Refresh(); projects = HiveClient.Instance.Projects.ToArray(); } var projectId = GuardProjectId(options.ProjectId, projects); var resourceIds = GuardResourceIds(options.ResourceIds, projectId); var jobName = GuardJobName(safeExecutables, options.JobName); var distribute = options.Distribute; using (var refreshableJob = PackJob(safeExecutables, projectId, resourceIds, jobName, distribute)) { HiveClient.Store(refreshableJob, cancellationToken); var taskIds = refreshableJob.HiveTasks.Select(x => x.Task.Id).ToArray(); using (var signal = SetupWaitHandle(refreshableJob, e => exception = e)) { refreshableJob.StartResultPolling(); try { signal.Wait(cancellationToken); } catch (OperationCanceledException) { } finally { HiveClient.LoadJob(refreshableJob); executables = UnpackJob(refreshableJob, taskIds); HiveClient.Delete(refreshableJob); // keep problematic jobs for debugging } } jobFinished = refreshableJob.IsFinished(); } } while (tries++ < maxRetries && (exception != null || !jobFinished)); return executables; } public static void RefreshHiveClient(out Project[] projects, out Resource[] resources) { lock (locker) { HiveClient.Instance.Refresh(); projects = HiveClient.Instance.Projects.ToArray(); resources = HiveClient.Instance.Resources.ToArray(); } } public static Guid GuardProjectId(Guid projectId, IEnumerable projects) { Project selectedProject; if (projectId == Guid.Empty) { selectedProject = projects.FirstOrDefault(); if (selectedProject == null) throw new ArgumentException("A default project is not available."); } else { selectedProject = projects.SingleOrDefault(x => x.Id == projectId); if (selectedProject == null) throw new ArgumentException("The specified project is not available."); } return selectedProject.Id; } public static IEnumerable GuardResourceIds(IEnumerable resourceIds, Guid projectId) { Resource[] availableResources; lock (locker) availableResources = HiveClient.Instance.GetAvailableResourcesForProject(projectId).ToArray(); var availableResourceIds = availableResources.Select(x => x.Id).ToArray(); var guardResourceIds = resourceIds as Guid[] ?? resourceIds.ToArray(); var unavailableResources = guardResourceIds.Except(availableResourceIds); if (unavailableResources.Any()) throw new ArgumentException("Some of the specified resources are not available for the specified project."); return guardResourceIds; } public static string GuardJobName(IEnumerable executables, string jobName) where T : IExecutable { if (string.IsNullOrEmpty(jobName)) jobName = string.Join(" + ", executables); return jobName; } public static RefreshableJob PackJob(IEnumerable executables, Guid projectId, IEnumerable resourceIds, string jobName, bool distribute) where T : IExecutable { var refreshableJob = new RefreshableJob() { Job = { Name = jobName, ProjectId = projectId, ResourceIds = resourceIds.ToList() } }; foreach (var executable in executables) { var itemTask = ItemTask.GetItemTaskForItem(executable); itemTask.ComputeInParallel = distribute && (executable is Experiment || executable is BatchRun); var hiveTask = itemTask.CreateHiveTask(); refreshableJob.HiveTasks.Add(hiveTask); } return refreshableJob; } public static IEnumerable UnpackJob(RefreshableJob refreshableJob, IList taskIds) where T : IExecutable { var hiveTasks = refreshableJob.HiveTasks.OrderBy(x => taskIds.IndexOf(x.Task.Id)); foreach (var hiveTask in hiveTasks) { var executable = (T)hiveTask.ItemTask.Item; yield return executable; } } public static ManualResetEventSlim SetupWaitHandle(RefreshableJob refreshableJob, Action exceptionCallback) { var signal = new ManualResetEventSlim(false); refreshableJob.StateLogListChanged += (sender, args) => { if (refreshableJob.IsFinished()) signal.Set(); }; refreshableJob.ExceptionOccured += (sender, args) => { exceptionCallback(args.Value); signal.Set(); }; return signal; } } }