Change JobEngine stop code and add an extra check for removed threads in watchdog timeout checks

LSLKeyTest
UbitUmarov 2015-11-27 13:28:10 +00:00
parent 3029080d9b
commit baf8e762a6
2 changed files with 78 additions and 88 deletions

View File

@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring
public int LogLevel { get; set; } public int LogLevel { get; set; }
private object JobLock = new object();
public string Name { get; private set; } public string Name { get; private set; }
public string LoggingName { get; private set; } public string LoggingName { get; private set; }
@ -95,7 +97,7 @@ namespace OpenSim.Framework.Monitoring
public void Start() public void Start()
{ {
lock (this) lock (JobLock)
{ {
if (IsRunning) if (IsRunning)
return; return;
@ -119,43 +121,22 @@ namespace OpenSim.Framework.Monitoring
public void Stop() public void Stop()
{ {
lock (this) lock (JobLock)
{ {
try try
{ {
if (!IsRunning) if (!IsRunning)
return; return;
m_log.DebugFormat("[JobEngine] Stopping {0}", Name);
IsRunning = false; IsRunning = false;
int requestsLeft = m_jobQueue.Count; m_finishedProcessingAfterStop.Reset();
if(m_jobQueue.Count <= 0)
if (requestsLeft <= 0)
{
m_cancelSource.Cancel(); m_cancelSource.Cancel();
}
else
{
m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft);
while (requestsLeft > 0) m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop);
{
if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop))
{
// After timeout no events have been written
if (requestsLeft == m_jobQueue.Count)
{
m_log.WarnFormat(
"[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests",
LoggingName, RequestProcessTimeoutOnStop, requestsLeft);
break;
}
}
requestsLeft = m_jobQueue.Count;
}
}
} }
finally finally
{ {
@ -244,48 +225,51 @@ namespace OpenSim.Framework.Monitoring
private void ProcessRequests() private void ProcessRequests()
{ {
try while(IsRunning || m_jobQueue.Count > 0)
{
while (IsRunning || m_jobQueue.Count > 0)
{ {
try try
{ {
CurrentJob = m_jobQueue.Take(m_cancelSource.Token); CurrentJob = m_jobQueue.Take(m_cancelSource.Token);
} }
catch (ObjectDisposedException e) catch(ObjectDisposedException e)
{ {
// If we see this whilst not running then it may be due to a race where this thread checks // If we see this whilst not running then it may be due to a race where this thread checks
// IsRunning after the stopping thread sets it to false and disposes of the cancellation source. // IsRunning after the stopping thread sets it to false and disposes of the cancellation source.
if (IsRunning) if(IsRunning)
throw e; throw e;
else else
{
m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue",
Name,m_jobQueue.Count);
break;
}
}
catch(OperationCanceledException)
{
break; break;
} }
if (LogLevel >= 1) if(LogLevel >= 1)
m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name);
try try
{ {
CurrentJob.Action(); CurrentJob.Action();
} }
catch (Exception e) catch(Exception e)
{ {
m_log.Error( m_log.Error(
string.Format( string.Format(
"[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e);
} }
if (LogLevel >= 1) if(LogLevel >= 1)
m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name);
CurrentJob = null; CurrentJob = null;
} }
}
catch (OperationCanceledException)
{
}
Watchdog.RemoveThread(false);
m_finishedProcessingAfterStop.Set(); m_finishedProcessingAfterStop.Set();
} }

View File

@ -333,28 +333,35 @@ namespace OpenSim.Framework.Monitoring
{ {
List<ThreadWatchdogInfo> callbackInfos = null; List<ThreadWatchdogInfo> callbackInfos = null;
lock (m_threads)
{
// get a copy since we may change m_threads // get a copy since we may change m_threads
List<ThreadWatchdogInfo> threadsInfo = m_threads.Values.ToList(); List<ThreadWatchdogInfo> threadsInfo;
lock (m_threads)
threadsInfo = m_threads.Values.ToList();
foreach (ThreadWatchdogInfo threadInfo in threadsInfo) foreach (ThreadWatchdogInfo threadInfo in threadsInfo)
{ {
if (threadInfo.Thread.ThreadState == ThreadState.Stopped) lock (m_threads)
{
if(!m_threads.ContainsValue(threadInfo))
continue;
}
if(threadInfo.Thread.ThreadState == ThreadState.Stopped)
{ {
RemoveThread(threadInfo.Thread.ManagedThreadId); RemoveThread(threadInfo.Thread.ManagedThreadId);
if (callbackInfos == null) if(callbackInfos == null)
callbackInfos = new List<ThreadWatchdogInfo>(); callbackInfos = new List<ThreadWatchdogInfo>();
callbackInfos.Add(threadInfo); callbackInfos.Add(threadInfo);
} }
else if (!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout) else if(!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout)
{ {
threadInfo.IsTimedOut = true; threadInfo.IsTimedOut = true;
if (threadInfo.AlarmIfTimeout) if(threadInfo.AlarmIfTimeout)
{ {
if (callbackInfos == null) if(callbackInfos == null)
callbackInfos = new List<ThreadWatchdogInfo>(); callbackInfos = new List<ThreadWatchdogInfo>();
// Send a copy of the watchdog info to prevent race conditions where the watchdog // Send a copy of the watchdog info to prevent race conditions where the watchdog
@ -363,9 +370,8 @@ namespace OpenSim.Framework.Monitoring
} }
} }
} }
}
if (callbackInfos != null) if(callbackInfos != null)
foreach (ThreadWatchdogInfo callbackInfo in callbackInfos) foreach (ThreadWatchdogInfo callbackInfo in callbackInfos)
callback(callbackInfo); callback(callbackInfo);
} }