From d3f1fc79e50edb9ef851bdedfa83e9f5c13b4519 Mon Sep 17 00:00:00 2001 From: UbitUmarov Date: Sun, 1 Jul 2012 01:33:20 +0100 Subject: [PATCH] *TO CHECK/REVIEW/REVERT/TEST whatever* pollService new requests get enqueued to unified requests queue directly. Retries get into that every 100ms. 3 working threads as before plus another that only does retries timming. --- .../HttpServer/PollServiceRequestManager.cs | 203 ++++++++++++++++++ .../HttpServer/PollServiceWorkerThread.cs | 3 + 2 files changed, 206 insertions(+) diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs index c3e1a79f94..e488b380e7 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceRequestManager.cs @@ -33,8 +33,11 @@ using log4net; using HttpServer; using OpenSim.Framework; + +/* namespace OpenSim.Framework.Servers.HttpServer { + public class PollServiceRequestManager { // private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); @@ -156,3 +159,203 @@ namespace OpenSim.Framework.Servers.HttpServer } } } + */ + +using System.IO; +using System.Text; +using System.Collections.Generic; + +namespace OpenSim.Framework.Servers.HttpServer +{ + public class PollServiceRequestManager + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + private readonly BaseHttpServer m_server; + + private BlockingQueue m_requests = new BlockingQueue(); + private static Queue m_retry_requests = new Queue(); + + private uint m_WorkerThreadCount = 0; + private Thread[] m_workerThreads; + private Thread m_retrysThread; + + private bool m_running = true; + + private int m_timeout = 250; + + public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) + { + m_server = pSrv; + m_WorkerThreadCount = pWorkerThreadCount; + m_workerThreads = new Thread[m_WorkerThreadCount]; + + //startup worker threads + for (uint i = 0; i < m_WorkerThreadCount; i++) + { + m_workerThreads[i] + = Watchdog.StartThread( + poolWorkerJob, + String.Format("PollServiceWorkerThread{0}", i), + ThreadPriority.Normal, + false, + true, + int.MaxValue); + } + + m_retrysThread = Watchdog.StartThread( + this.CheckRetries, + "PollServiceWatcherThread", + ThreadPriority.Normal, + false, + true, + 1000 * 60 * 10); + } + + + private void ReQueueEvent(PollServiceHttpRequest req) + { + if (m_running) + { + lock (m_retry_requests) + m_retry_requests.Enqueue(req); + } + } + + public void Enqueue(PollServiceHttpRequest req) + { + if (m_running) + m_requests.Enqueue(req); + } + + private void CheckRetries() + { + while (m_running) + { + Thread.Sleep(100); // let the world move + Watchdog.UpdateThread(); + lock (m_retry_requests) + { + while (m_retry_requests.Count > 0 && m_running) + Enqueue(m_retry_requests.Dequeue()); + } + } + } + + ~PollServiceRequestManager() + { + m_running = false; + m_timeout = -10000; // cause all to expire + Thread.Sleep(1000); // let the world move + + foreach (Thread t in m_workerThreads) + { + try + { + t.Abort(); + } + catch + { + } + } + + try + { + foreach (PollServiceHttpRequest req in m_retry_requests) + { + m_server.DoHTTPGruntWork( + req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id), + new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext)); + } + } + catch + { + } + + PollServiceHttpRequest wreq; + m_retry_requests.Clear(); + + while (m_requests.Count() > 0) + { + try + { + wreq = m_requests.Dequeue(0); + m_server.DoHTTPGruntWork( + wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id), + new OSHttpResponse(new HttpResponse(wreq.HttpContext, wreq.Request), wreq.HttpContext)); + } + catch + { + } + } + + m_requests.Clear(); + } + + // work threads + + private void poolWorkerJob() + { + PollServiceHttpRequest req; + StreamReader str; + + while (true) + { + req = m_requests.Dequeue(5000); + + Watchdog.UpdateThread(); + if (req != null) + { + try + { + if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) + { + try + { + str = new StreamReader(req.Request.Body); + } + catch (System.ArgumentException) + { + // Stream was not readable means a child agent + // was closed due to logout, leaving the + // Event Queue request orphaned. + continue; + } + + try + { + Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd()); + m_server.DoHTTPGruntWork(responsedata, + new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext)); + } + catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream + { + // Ignore it, no need to reply + } + + str.Close(); + + } + else + { + if ((Environment.TickCount - req.RequestTime) > m_timeout) + { + m_server.DoHTTPGruntWork(req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id), + new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext)); + } + else + { + ReQueueEvent(req); + } + } + } + catch (Exception e) + { + m_log.ErrorFormat("Exception in poll service thread: " + e.ToString()); + } + } + } + } + } +} + diff --git a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs b/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs index b39185fba9..7cd27e5290 100644 --- a/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs +++ b/OpenSim/Framework/Servers/HttpServer/PollServiceWorkerThread.cs @@ -25,6 +25,8 @@ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +/* Ubit work moved to PollServiceRequestManager + using System; using System.Collections; using System.Collections.Generic; @@ -128,3 +130,4 @@ namespace OpenSim.Framework.Servers.HttpServer } } } +*/ \ No newline at end of file