diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs
index a6a059dcc6..115871e4a7 100644
--- a/OpenSim/Framework/Monitoring/JobEngine.cs
+++ b/OpenSim/Framework/Monitoring/JobEngine.cs
@@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
///
/// Will be null if no job is currently running.
///
- public Job CurrentJob { get; private set; }
+ private Job m_currentJob;
+ public Job CurrentJob { get { return m_currentJob;} }
///
/// Number of jobs waiting to be processed.
@@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring
private CancellationTokenSource m_cancelSource;
- ///
- /// Used to signal that we are ready to complete stop.
- ///
- private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
+ private int m_timeout = -1;
- public JobEngine(string name, string loggingName)
+ private bool m_threadRunnig = false;
+
+ public JobEngine(string name, string loggingName, int timeout = -1)
{
Name = name;
LoggingName = loggingName;
-
+ m_timeout = timeout;
RequestProcessTimeoutOnStop = 5000;
}
@@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring
IsRunning = true;
- m_finishedProcessingAfterStop.Reset();
-
m_cancelSource = new CancellationTokenSource();
-
- WorkManager.StartThread(
- ProcessRequests,
- Name,
- ThreadPriority.Normal,
- false,
- true,
- null,
- int.MaxValue);
+ WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
+ m_threadRunnig = true;
}
}
@@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring
m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
IsRunning = false;
-
- m_finishedProcessingAfterStop.Reset();
- if(m_jobQueue.Count <= 0)
+ if(m_threadRunnig)
+ {
m_cancelSource.Cancel();
-
- m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
- m_finishedProcessingAfterStop.Close();
+ m_threadRunnig = false;
+ }
}
finally
{
if(m_cancelSource != null)
m_cancelSource.Dispose();
- if(m_finishedProcessingAfterStop != null)
- m_finishedProcessingAfterStop.Dispose();
}
}
}
@@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
///
public bool QueueJob(Job job)
{
+ lock(JobLock)
+ {
+ if(!IsRunning)
+ return false;
+
+ if(!m_threadRunnig)
+ {
+ WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
+ m_threadRunnig = true;
+ }
+ }
+
if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
{
m_jobQueue.Add(job);
@@ -222,31 +221,28 @@ namespace OpenSim.Framework.Monitoring
m_warnOverMaxQueue = false;
}
-
return false;
}
}
- private void ProcessRequests()
+ private void ProcessRequests(Object o)
{
- while(IsRunning || m_jobQueue.Count > 0)
+ while(IsRunning)
{
try
{
- CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
+ if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
+ {
+ lock(JobLock)
+ m_threadRunnig = false;
+ break;
+ }
}
catch(ObjectDisposedException e)
{
- // If we see this whilst not running then it may be due to a race where this thread checks
- // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
- if(IsRunning)
- throw e;
- else
- {
- m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
- Name,m_jobQueue.Count);
- break;
- }
+ m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
+ Name,m_jobQueue.Count);
+ break;
}
catch(OperationCanceledException)
{
@@ -254,27 +250,24 @@ namespace OpenSim.Framework.Monitoring
}
if(LogLevel >= 1)
- m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name);
+ m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
try
{
- CurrentJob.Action();
+ m_currentJob.Action();
}
catch(Exception e)
{
m_log.Error(
string.Format(
- "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e);
+ "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
}
if(LogLevel >= 1)
- m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name);
+ m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
- CurrentJob = null;
+ m_currentJob = null;
}
-
- Watchdog.RemoveThread(false);
- m_finishedProcessingAfterStop.Set();
}
public class Job