#region License Information
/* HeuristicLab
 * Copyright (C) 2002-2008 Heuristic and Evolutionary Algorithms Laboratory (HEAL)
 *
 * This file is part of HeuristicLab.
 *
 * HeuristicLab is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * HeuristicLab is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with HeuristicLab. If not, see <http://www.gnu.org/licenses/>.
 */
#endregion

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.ServiceModel;
using HeuristicLab.Grid;
using System.Threading;
using HeuristicLab.Core;
using System.IO;
using System.Windows.Forms;
using System.Diagnostics;

namespace HeuristicLab.Grid {
  /// <summary>
  /// Thrown by <see cref="JobManager.EndExecuteEngine"/> when a job has been flagged as aborted.
  /// </summary>
  public class JobExecutionException : ApplicationException {
    /// <summary>Creates the exception with the given message.</summary>
    /// <param name="msg">Human-readable description of the failure.</param>
    public JobExecutionException(string msg) : base(msg) { }
  }

  /// <summary>
  /// Submits engines to an <see cref="IGridServer"/> and collects their results.
  /// Two threads are started by the constructor: one runs <see cref="StartEngines"/> and
  /// moves jobs from the waiting queue to the server, the other runs <see cref="GetResults"/>
  /// and polls the server for the results of jobs in the running queue.
  /// </summary>
  public class JobManager {
    // Upper bound checked in StartEngines() before a job that could not be started is re-queued.
    private const int MAX_RESTARTS = 5;
    // Seconds GetResults() sleeps after finding a job that is still active on the server.
    private const int RESULT_POLLING_TIMEOUT = 5;

    private readonly IGridServer server;

    // Jobs that have not been handed to the server yet; guarded by waitingQueueLock.
    private readonly object waitingQueueLock = new object();
    private readonly Queue<AsyncGridResult> waitingJobs = new Queue<AsyncGridResult>();

    // Jobs the server accepted and whose result is still outstanding; guarded by runningQueueLock.
    private readonly object runningQueueLock = new object();
    private readonly Queue<AsyncGridResult> runningJobs = new Queue<AsyncGridResult>();

    // Signalled whenever a job is added to the corresponding queue so the worker thread wakes up.
    private readonly AutoResetEvent runningWaitHandle = new AutoResetEvent(false);
    private readonly AutoResetEvent waitingWaitHandle = new AutoResetEvent(false);

    /// <summary>
    /// Stores the server reference and immediately starts the start-engine thread and the
    /// results-gathering thread.
    /// </summary>
    /// <param name="server">Grid server that executes the engines.</param>
    public JobManager(IGridServer server) {
      this.server = server;
      Thread starterThread = new Thread(StartEngines);
      Thread resultsGatheringThread = new Thread(GetResults);
      starterThread.Start();
      resultsGatheringThread.Start();
    }

    /// <summary>
    /// Closes the wait handle of every job currently held in the waiting and running queues
    /// and empties both queues. The worker threads keep running.
    /// </summary>
    public void Reset() {
      // NOTE(review): a job that a worker thread has already dequeued is not in either queue
      // and is therefore not touched here.
      lock (waitingQueueLock) {
        foreach (AsyncGridResult r in waitingJobs) {
          r.WaitHandle.Close();
        }
        waitingJobs.Clear();
      }
      lock (runningQueueLock) {
        foreach (AsyncGridResult r in runningJobs) {
          r.WaitHandle.Close();
        }
        runningJobs.Clear();
      }
    }

    /// <summary>
    /// Endless loop of the start-engine thread: takes the next waiting job, sends its zipped
    /// engine to the server and, on success, moves the job to the running queue. If the server
    /// returns <see cref="Guid.Empty"/> the job is re-queued until MAX_RESTARTS is reached,
    /// after which it is flagged as aborted and its consumer is signalled.
    /// Any exception ends the loop and is written to the error log.
    /// </summary>
    public void StartEngines() {
      try {
        while (true) {
          AsyncGridResult job = null;
          lock (waitingQueueLock) {
            if (waitingJobs.Count > 0) job = waitingJobs.Dequeue();
          }
          if (job == null) {
            // no jobs waiting -> block until somebody enqueues one
            waitingWaitHandle.WaitOne();
          } else {
            Guid currentEngineGuid = server.BeginExecuteEngine(ZipEngine(job.Engine));
            if (currentEngineGuid == Guid.Empty) {
              // couldn't start the job -> requeue
              if (job.Restarts < MAX_RESTARTS) {
                job.Restarts++;
                lock (waitingQueueLock) {
                  waitingJobs.Enqueue(job);
                }
                waitingWaitHandle.Set();
              } else {
                // max restart count reached -> give up on this job and flag error
                job.Aborted = true;
                job.SignalFinished();
              }
            } else {
              // job started successfully -> hand it over to the results-gathering thread
              job.Guid = currentEngineGuid;
              lock (runningQueueLock) {
                runningJobs.Enqueue(job);
                runningWaitHandle.Set();
              }
            }
          }
        }
      }
      catch (Exception e) {
        HeuristicLab.Tracing.Logger.Error("Exception " + e + " in JobManager.StartEngines() killed the start-engine thread\n" + e.StackTrace);
      }
    }

    /// <summary>
    /// Endless loop of the results-gathering thread: takes the next running job and asks the
    /// server for its result. A non-null result is stored on the job and the consumer is
    /// signalled. Otherwise the job state is queried: a job in state
    /// <c>JobState.Unknown</c> is put back into the waiting queue, any other state puts the
    /// job back into the running queue and the thread sleeps for RESULT_POLLING_TIMEOUT seconds.
    /// Any exception ends the loop and is written to the error log.
    /// </summary>
    public void GetResults() {
      try {
        while (true) {
          AsyncGridResult job = null;
          lock (runningQueueLock) {
            if (runningJobs.Count > 0) job = runningJobs.Dequeue();
          }
          if (job == null) {
            // no jobs running -> block until the starter thread enqueues one
            runningWaitHandle.WaitOne();
          } else {
            byte[] zippedResult = server.TryEndExecuteEngine(job.Guid);
            if (zippedResult != null) {
              // successful => store result
              job.ZippedResult = zippedResult;
              // notify consumer that result is ready
              job.SignalFinished();
            } else {
              // there was a problem -> check the state of the job and restart if necessary
              JobState jobState = server.JobState(job.Guid);
              if (jobState == JobState.Unknown) {
                // NOTE(review): Restarts is incremented here without checking MAX_RESTARTS;
                // the limit is only enforced in StartEngines() when a start attempt fails.
                job.Restarts++;
                lock (waitingQueueLock) {
                  waitingJobs.Enqueue(job);
                  waitingWaitHandle.Set();
                }
              } else {
                // job still active at the server
                lock (runningQueueLock) {
                  runningJobs.Enqueue(job);
                  runningWaitHandle.Set();
                }
                // sleep a while before trying to get the next result
                Thread.Sleep(TimeSpan.FromSeconds(RESULT_POLLING_TIMEOUT));
              }
            }
          }
        }
      }
      catch (Exception e) {
        HeuristicLab.Tracing.Logger.Error("Exception " + e + " in JobManager.GetResults() killed the results-gathering thread\n" + e.StackTrace);
      }
    }

    /// <summary>
    /// Wraps the engine in a new <see cref="AsyncGridResult"/>, appends it to the waiting queue
    /// and wakes the start-engine thread.
    /// </summary>
    /// <param name="engine">Engine to execute on the grid.</param>
    /// <returns>The handle object that was queued for this engine.</returns>
    public AsyncGridResult BeginExecuteEngine(ProcessingEngine engine) {
      AsyncGridResult asyncResult = new AsyncGridResult(engine);
      asyncResult.Engine = engine;
      lock (waitingQueueLock) {
        waitingJobs.Enqueue(asyncResult);
      }
      waitingWaitHandle.Set();
      return asyncResult;
    }

    /// <summary>Serializes the engine with <c>PersistenceManager.SaveToGZip</c>.</summary>
    private byte[] ZipEngine(IEngine engine) {
      return PersistenceManager.SaveToGZip(engine);
    }

    /// <summary>
    /// Restores the engine from the zipped result stored on <paramref name="asyncResult"/>.
    /// This method does not wait for the job itself.
    /// </summary>
    /// <param name="asyncResult">Handle previously returned by <see cref="BeginExecuteEngine"/>.</param>
    /// <returns>The engine restored from <c>asyncResult.ZippedResult</c>.</returns>
    /// <exception cref="JobExecutionException">The job was flagged as aborted.</exception>
    public IEngine EndExecuteEngine(AsyncGridResult asyncResult) {
      // NOTE(review): presumably callers wait on asyncResult.WaitHandle before calling this - confirm.
      if (asyncResult.Aborted) {
        throw new JobExecutionException("Maximal number of job restarts reached. There is a problem with the connection to the grid-server.");
      } else {
        // restore the engine
        return (IEngine)PersistenceManager.RestoreFromGZip(asyncResult.ZippedResult);
      }
    }
  }
}