diff --git a/OpenSim/Framework/Monitoring/JobEngine.cs b/OpenSim/Framework/Monitoring/JobEngine.cs index 75ad75da0f..7709f629aa 100644 --- a/OpenSim/Framework/Monitoring/JobEngine.cs +++ b/OpenSim/Framework/Monitoring/JobEngine.cs @@ -40,6 +40,8 @@ namespace OpenSim.Framework.Monitoring public int LogLevel { get; set; } + private object JobLock = new object(); + public string Name { get; private set; } public string LoggingName { get; private set; } @@ -95,7 +97,7 @@ namespace OpenSim.Framework.Monitoring public void Start() { - lock (this) + lock (JobLock) { if (IsRunning) return; @@ -119,43 +121,22 @@ namespace OpenSim.Framework.Monitoring public void Stop() { - lock (this) + lock (JobLock) { try { if (!IsRunning) return; + m_log.DebugFormat("[JobEngine] Stopping {0}", Name); + IsRunning = false; - int requestsLeft = m_jobQueue.Count; - - if (requestsLeft <= 0) - { + m_finishedProcessingAfterStop.Reset(); + if(m_jobQueue.Count <= 0) m_cancelSource.Cancel(); - } - else - { - m_log.InfoFormat("[{0}]: Waiting to write {1} events after stop.", LoggingName, requestsLeft); - while (requestsLeft > 0) - { - if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) - { - // After timeout no events have been written - if (requestsLeft == m_jobQueue.Count) - { - m_log.WarnFormat( - "[{0}]: No requests processed after {1} ms wait. Discarding remaining {2} requests", - LoggingName, RequestProcessTimeoutOnStop, requestsLeft); - - break; - } - } - - requestsLeft = m_jobQueue.Count; - } - } + m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop); } finally { @@ -244,48 +225,51 @@ namespace OpenSim.Framework.Monitoring private void ProcessRequests() { - try + while(IsRunning || m_jobQueue.Count > 0) { - while (IsRunning || m_jobQueue.Count > 0) + try { - try - { - CurrentJob = m_jobQueue.Take(m_cancelSource.Token); - } - catch (ObjectDisposedException e) - { - // If we see this whilst not running then it may be due to a race where this thread checks - // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. - if (IsRunning) - throw e; - else - break; - } - - if (LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processing job {1}", LoggingName, CurrentJob.Name); - - try - { - CurrentJob.Action(); - } - catch (Exception e) - { - m_log.Error( - string.Format( - "[{0}]: Job {1} failed, continuing. Exception ", LoggingName, CurrentJob.Name), e); - } - - if (LogLevel >= 1) - m_log.DebugFormat("[{0}]: Processed job {1}", LoggingName, CurrentJob.Name); - - CurrentJob = null; + CurrentJob = m_jobQueue.Take(m_cancelSource.Token); } - } - catch (OperationCanceledException) - { + catch(ObjectDisposedException e) + { + // If we see this whilst not running then it may be due to a race where this thread checks + // IsRunning after the stopping thread sets it to false and disposes of the cancellation source. + if(IsRunning) + throw e; + else + { + m_log.DebugFormat("[JobEngine] {0} stopping ignoring {1} jobs in queue", + Name,m_jobQueue.Count); + break; + } + } + catch(OperationCanceledException) + { + break; + } + + if(LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processing job {1}",LoggingName,CurrentJob.Name); + + try + { + CurrentJob.Action(); + } + catch(Exception e) + { + m_log.Error( + string.Format( + "[{0}]: Job {1} failed, continuing. Exception ",LoggingName,CurrentJob.Name),e); + } + + if(LogLevel >= 1) + m_log.DebugFormat("[{0}]: Processed job {1}",LoggingName,CurrentJob.Name); + + CurrentJob = null; } + Watchdog.RemoveThread(false); m_finishedProcessingAfterStop.Set(); } diff --git a/OpenSim/Framework/Monitoring/Watchdog.cs b/OpenSim/Framework/Monitoring/Watchdog.cs index 4485a9c501..83f8e016bb 100644 --- a/OpenSim/Framework/Monitoring/Watchdog.cs +++ b/OpenSim/Framework/Monitoring/Watchdog.cs @@ -333,39 +333,45 @@ namespace OpenSim.Framework.Monitoring { List callbackInfos = null; + // get a copy since we may change m_threads + List threadsInfo; lock (m_threads) + threadsInfo = m_threads.Values.ToList(); + + foreach (ThreadWatchdogInfo threadInfo in threadsInfo) { - // get a copy since we may change m_threads - List threadsInfo = m_threads.Values.ToList(); - foreach (ThreadWatchdogInfo threadInfo in threadsInfo) + lock (m_threads) { - if (threadInfo.Thread.ThreadState == ThreadState.Stopped) - { - RemoveThread(threadInfo.Thread.ManagedThreadId); + if(!m_threads.ContainsValue(threadInfo)) + continue; + } - if (callbackInfos == null) + if(threadInfo.Thread.ThreadState == ThreadState.Stopped) + { + RemoveThread(threadInfo.Thread.ManagedThreadId); + + if(callbackInfos == null) + callbackInfos = new List(); + + callbackInfos.Add(threadInfo); + } + else if(!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout) + { + threadInfo.IsTimedOut = true; + + if(threadInfo.AlarmIfTimeout) + { + if(callbackInfos == null) callbackInfos = new List(); - callbackInfos.Add(threadInfo); - } - else if (!threadInfo.IsTimedOut && now - threadInfo.LastTick >= threadInfo.Timeout) - { - threadInfo.IsTimedOut = true; - - if (threadInfo.AlarmIfTimeout) - { - if (callbackInfos == null) - callbackInfos = new List(); - - // Send a copy of the watchdog info to prevent race conditions where the watchdog - // thread updates the monitoring info after an alarm has been sent out. - callbackInfos.Add(new ThreadWatchdogInfo(threadInfo)); - } + // Send a copy of the watchdog info to prevent race conditions where the watchdog + // thread updates the monitoring info after an alarm has been sent out. + callbackInfos.Add(new ThreadWatchdogInfo(threadInfo)); } } } - if (callbackInfos != null) + if(callbackInfos != null) foreach (ThreadWatchdogInfo callbackInfo in callbackInfos) callback(callbackInfo); }