make JobEngine be a workitem of mail pool (smartThread), with the option to release thread after a idle time, so is free to do other service elsewhere

0.9.0-post-fixes
UbitUmarov 2017-06-13 18:50:34 +01:00
parent 5ce15566ac
commit 482ff06e13
1 changed files with 41 additions and 48 deletions

View File

@ -57,7 +57,8 @@ namespace OpenSim.Framework.Monitoring
/// <remarks> /// <remarks>
/// Will be null if no job is currently running. /// Will be null if no job is currently running.
/// </remarks> /// </remarks>
public Job CurrentJob { get; private set; } private Job m_currentJob;
public Job CurrentJob { get { return m_currentJob;} }
/// <summary> /// <summary>
/// Number of jobs waiting to be processed. /// Number of jobs waiting to be processed.
@ -82,16 +83,15 @@ namespace OpenSim.Framework.Monitoring
private CancellationTokenSource m_cancelSource; private CancellationTokenSource m_cancelSource;
/// <summary> private int m_timeout = -1;
/// Used to signal that we are ready to complete stop.
/// </summary>
private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false);
public JobEngine(string name, string loggingName) private bool m_threadRunnig = false;
public JobEngine(string name, string loggingName, int timeout = -1)
{ {
Name = name; Name = name;
LoggingName = loggingName; LoggingName = loggingName;
m_timeout = timeout;
RequestProcessTimeoutOnStop = 5000; RequestProcessTimeoutOnStop = 5000;
} }
@ -104,18 +104,9 @@ namespace OpenSim.Framework.Monitoring
IsRunning = true; IsRunning = true;
m_finishedProcessingAfterStop.Reset();
m_cancelSource = new CancellationTokenSource(); m_cancelSource = new CancellationTokenSource();
WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
WorkManager.StartThread( m_threadRunnig = true;
ProcessRequests,
Name,
ThreadPriority.Normal,
false,
true,
null,
int.MaxValue);
} }
} }
@ -131,20 +122,16 @@ namespace OpenSim.Framework.Monitoring
m_log.DebugFormat("[JobEngine] Stopping {0}", Name); m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
IsRunning = false; IsRunning = false;
if(m_threadRunnig)
m_finishedProcessingAfterStop.Reset(); {
if(m_jobQueue.Count <= 0)
m_cancelSource.Cancel(); m_cancelSource.Cancel();
m_threadRunnig = false;
m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); }
m_finishedProcessingAfterStop.Close();
} }
finally finally
{ {
if(m_cancelSource != null) if(m_cancelSource != null)
m_cancelSource.Dispose(); m_cancelSource.Dispose();
if(m_finishedProcessingAfterStop != null)
m_finishedProcessingAfterStop.Dispose();
} }
} }
} }
@ -203,6 +190,18 @@ namespace OpenSim.Framework.Monitoring
/// </param> /// </param>
public bool QueueJob(Job job) public bool QueueJob(Job job)
{ {
lock(JobLock)
{
if(!IsRunning)
return false;
if(!m_threadRunnig)
{
WorkManager.RunInThreadPool(ProcessRequests, null, Name, false);
m_threadRunnig = true;
}
}
if (m_jobQueue.Count < m_jobQueue.BoundedCapacity) if (m_jobQueue.Count < m_jobQueue.BoundedCapacity)
{ {
m_jobQueue.Add(job); m_jobQueue.Add(job);
@ -222,31 +221,28 @@ namespace OpenSim.Framework.Monitoring
m_warnOverMaxQueue = false; m_warnOverMaxQueue = false;
} }
return false; return false;
} }
} }
private void ProcessRequests() private void ProcessRequests(Object o)
{ {
while(IsRunning || m_jobQueue.Count > 0) while(IsRunning)
{ {
try try
{ {
CurrentJob = m_jobQueue.Take(m_cancelSource.Token); if(!m_jobQueue.TryTake(out m_currentJob, m_timeout, m_cancelSource.Token))
{
lock(JobLock)
m_threadRunnig = false;
break;
}
} }
catch(ObjectDisposedException e) catch(ObjectDisposedException e)
{ {
// If we see this whilst not running then it may be due to a race where this thread checks m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
// IsRunning after the stopping thread sets it to false and disposes of the cancellation source. Name,m_jobQueue.Count);
if(IsRunning) break;
throw e;
else
{
m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
Name,m_jobQueue.Count);
break;
}
} }
catch(OperationCanceledException) catch(OperationCanceledException)
{ {
@ -254,27 +250,24 @@ namespace OpenSim.Framework.Monitoring
} }
if(LogLevel >= 1) if(LogLevel >= 1)
m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,m_currentJob.Name);
try try
{ {
CurrentJob.Action(); m_currentJob.Action();
} }
catch(Exception e) catch(Exception e)
{ {
m_log.Error( m_log.Error(
string.Format( string.Format(
"[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,m_currentJob.Name),e);
} }
if(LogLevel >= 1) if(LogLevel >= 1)
m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,m_currentJob.Name);
CurrentJob = null; m_currentJob = null;
} }
Watchdog.RemoveThread(false);
m_finishedProcessingAfterStop.Set();
} }
public class Job public class Job