Adding Avination's PollService to round out the HTTP inventory changes

cpu-performance
Melanie 2013-06-07 23:43:45 +01:00
parent 346eda27cc
commit 7c0bfca7a0
9 changed files with 228 additions and 244 deletions

View File

@ -234,7 +234,7 @@ namespace OpenSim.Framework.Console
string uri = "/ReadResponses/" + sessionID.ToString() + "/"; string uri = "/ReadResponses/" + sessionID.ToString() + "/";
m_Server.AddPollServiceHTTPHandler( m_Server.AddPollServiceHTTPHandler(
uri, new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, sessionID)); uri, new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, sessionID,25000)); // 25 secs timeout
XmlDocument xmldoc = new XmlDocument(); XmlDocument xmldoc = new XmlDocument();
XmlNode xmlnode = xmldoc.CreateNode(XmlNodeType.XmlDeclaration, XmlNode xmlnode = xmldoc.CreateNode(XmlNodeType.XmlDeclaration,
@ -425,7 +425,7 @@ namespace OpenSim.Framework.Console
return false; return false;
} }
private Hashtable GetEvents(UUID RequestID, UUID sessionID, string request) private Hashtable GetEvents(UUID RequestID, UUID sessionID)
{ {
ConsoleConnection c = null; ConsoleConnection c = null;

View File

@ -1805,7 +1805,6 @@ namespace OpenSim.Framework.Servers.HttpServer
// Long Poll Service Manager with 3 worker threads a 25 second timeout for no events // Long Poll Service Manager with 3 worker threads a 25 second timeout for no events
m_PollServiceManager = new PollServiceRequestManager(this, 3, 25000); m_PollServiceManager = new PollServiceRequestManager(this, 3, 25000);
m_PollServiceManager.Start();
HTTPDRunning = true; HTTPDRunning = true;
//HttpListenerContext context; //HttpListenerContext context;
@ -1856,7 +1855,7 @@ namespace OpenSim.Framework.Servers.HttpServer
HTTPDRunning = false; HTTPDRunning = false;
try try
{ {
m_PollServiceManager.Stop(); // m_PollServiceManager.Stop();
m_httpListener2.ExceptionThrown -= httpServerException; m_httpListener2.ExceptionThrown -= httpServerException;
//m_httpListener2.DisconnectHandler = null; //m_httpListener2.DisconnectHandler = null;

View File

@ -34,7 +34,7 @@ namespace OpenSim.Framework.Servers.HttpServer
public delegate void RequestMethod(UUID requestID, Hashtable request); public delegate void RequestMethod(UUID requestID, Hashtable request);
public delegate bool HasEventsMethod(UUID requestID, UUID pId); public delegate bool HasEventsMethod(UUID requestID, UUID pId);
public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId, string request); public delegate Hashtable GetEventsMethod(UUID requestID, UUID pId);
public delegate Hashtable NoEventsMethod(UUID requestID, UUID pId); public delegate Hashtable NoEventsMethod(UUID requestID, UUID pId);
@ -45,17 +45,30 @@ namespace OpenSim.Framework.Servers.HttpServer
public NoEventsMethod NoEvents; public NoEventsMethod NoEvents;
public RequestMethod Request; public RequestMethod Request;
public UUID Id; public UUID Id;
public int TimeOutms;
public EventType Type;
public enum EventType : int
{
Normal = 0,
LslHttp = 1,
Inventory = 2,
Texture = 3,
Mesh = 4
}
public PollServiceEventArgs( public PollServiceEventArgs(
RequestMethod pRequest, RequestMethod pRequest,
HasEventsMethod pHasEvents, GetEventsMethod pGetEvents, NoEventsMethod pNoEvents, HasEventsMethod pHasEvents, GetEventsMethod pGetEvents, NoEventsMethod pNoEvents,
UUID pId) UUID pId, int pTimeOutms)
{ {
Request = pRequest; Request = pRequest;
HasEvents = pHasEvents; HasEvents = pHasEvents;
GetEvents = pGetEvents; GetEvents = pGetEvents;
NoEvents = pNoEvents; NoEvents = pNoEvents;
Id = pId; Id = pId;
TimeOutms = pTimeOutms;
Type = EventType.Normal;
} }
} }
} }

View File

@ -33,53 +33,56 @@ using log4net;
using HttpServer; using HttpServer;
using OpenSim.Framework; using OpenSim.Framework;
using OpenSim.Framework.Monitoring; using OpenSim.Framework.Monitoring;
using Amib.Threading;
using System.IO;
using System.Text;
using System.Collections.Generic;
namespace OpenSim.Framework.Servers.HttpServer namespace OpenSim.Framework.Servers.HttpServer
{ {
public class PollServiceRequestManager public class PollServiceRequestManager
{ {
// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly BaseHttpServer m_server; private readonly BaseHttpServer m_server;
private static Queue m_requests = Queue.Synchronized(new Queue());
private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
private uint m_WorkerThreadCount = 0; private uint m_WorkerThreadCount = 0;
private Thread[] m_workerThreads; private Thread[] m_workerThreads;
private PollServiceWorkerThread[] m_PollServiceWorkerThreads; private Thread m_retrysThread;
private volatile bool m_running = true;
private int m_pollTimeout; private bool m_running = true;
private int slowCount = 0;
private SmartThreadPool m_threadPool = new SmartThreadPool(20000, 12, 2);
// private int m_timeout = 1000; // increase timeout 250; now use the event one
public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout) public PollServiceRequestManager(BaseHttpServer pSrv, uint pWorkerThreadCount, int pTimeout)
{ {
m_server = pSrv; m_server = pSrv;
m_WorkerThreadCount = pWorkerThreadCount; m_WorkerThreadCount = pWorkerThreadCount;
m_pollTimeout = pTimeout;
}
public void Start()
{
m_running = true;
m_workerThreads = new Thread[m_WorkerThreadCount]; m_workerThreads = new Thread[m_WorkerThreadCount];
m_PollServiceWorkerThreads = new PollServiceWorkerThread[m_WorkerThreadCount];
//startup worker threads //startup worker threads
for (uint i = 0; i < m_WorkerThreadCount; i++) for (uint i = 0; i < m_WorkerThreadCount; i++)
{ {
m_PollServiceWorkerThreads[i] = new PollServiceWorkerThread(m_server, m_pollTimeout);
m_PollServiceWorkerThreads[i].ReQueue += ReQueueEvent;
m_workerThreads[i] m_workerThreads[i]
= Watchdog.StartThread( = Watchdog.StartThread(
m_PollServiceWorkerThreads[i].ThreadStart, PoolWorkerJob,
String.Format("PollServiceWorkerThread{0}", i), String.Format("PollServiceWorkerThread{0}", i),
ThreadPriority.Normal, ThreadPriority.Normal,
false, false,
true, false,
null, null,
int.MaxValue); int.MaxValue);
} }
Watchdog.StartThread( m_retrysThread = Watchdog.StartThread(
this.ThreadStart, this.CheckRetries,
"PollServiceWatcherThread", "PollServiceWatcherThread",
ThreadPriority.Normal, ThreadPriority.Normal,
false, false,
@ -88,78 +91,211 @@ namespace OpenSim.Framework.Servers.HttpServer
1000 * 60 * 10); 1000 * 60 * 10);
} }
internal void ReQueueEvent(PollServiceHttpRequest req)
private void ReQueueEvent(PollServiceHttpRequest req)
{ {
// Do accounting stuff here if (m_running)
Enqueue(req); {
lock (m_retryRequests)
m_retryRequests.Enqueue(req);
}
} }
public void Enqueue(PollServiceHttpRequest req) public void Enqueue(PollServiceHttpRequest req)
{ {
lock (m_requests) if (m_running)
{
if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
{
m_requests.Enqueue(req); m_requests.Enqueue(req);
} }
else
{
lock (m_slowRequests)
m_slowRequests.Enqueue(req);
}
}
}
public void ThreadStart() private void CheckRetries()
{ {
while (m_running) while (m_running)
{ {
Thread.Sleep(100); // let the world move .. back to faster rate
Watchdog.UpdateThread(); Watchdog.UpdateThread();
ProcessQueuedRequests(); lock (m_retryRequests)
Thread.Sleep(1000); {
while (m_retryRequests.Count > 0 && m_running)
m_requests.Enqueue(m_retryRequests.Dequeue());
}
slowCount++;
if (slowCount >= 10)
{
slowCount = 0;
lock (m_slowRequests)
{
while (m_slowRequests.Count > 0 && m_running)
m_requests.Enqueue(m_slowRequests.Dequeue());
}
}
} }
} }
private void ProcessQueuedRequests() ~PollServiceRequestManager()
{ {
lock (m_requests) m_running = false;
// m_timeout = -10000; // cause all to expire
Thread.Sleep(1000); // let the world move
foreach (Thread t in m_workerThreads)
Watchdog.AbortThread(t.ManagedThreadId);
try
{ {
if (m_requests.Count == 0) foreach (PollServiceHttpRequest req in m_retryRequests)
return;
// m_log.DebugFormat("[POLL SERVICE REQUEST MANAGER]: Processing {0} requests", m_requests.Count);
int reqperthread = (int) (m_requests.Count/m_WorkerThreadCount) + 1;
// For Each WorkerThread
for (int tc = 0; tc < m_WorkerThreadCount && m_requests.Count > 0; tc++)
{ {
//Loop over number of requests each thread handles. DoHTTPGruntWork(m_server,req,
for (int i = 0; i < reqperthread && m_requests.Count > 0; i++) req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
}
catch
{
}
PollServiceHttpRequest wreq;
m_retryRequests.Clear();
lock (m_slowRequests)
{
while (m_slowRequests.Count > 0 && m_running)
m_requests.Enqueue(m_slowRequests.Dequeue());
}
while (m_requests.Count() > 0)
{ {
try try
{ {
m_PollServiceWorkerThreads[tc].Enqueue((PollServiceHttpRequest)m_requests.Dequeue()); wreq = m_requests.Dequeue(0);
DoHTTPGruntWork(m_server,wreq,
wreq.PollServiceArgs.NoEvents(wreq.RequestID, wreq.PollServiceArgs.Id));
} }
catch (InvalidOperationException) catch
{ {
// The queue is empty, we did our calculations wrong!
return;
} }
}
}
}
}
public void Stop()
{
m_running = false;
foreach (object o in m_requests)
{
PollServiceHttpRequest req = (PollServiceHttpRequest) o;
PollServiceWorkerThread.DoHTTPGruntWork(
m_server, req, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
} }
m_requests.Clear(); m_requests.Clear();
}
foreach (Thread t in m_workerThreads) // work threads
private void PoolWorkerJob()
{ {
t.Abort(); while (m_running)
{
PollServiceHttpRequest req = m_requests.Dequeue(5000);
Watchdog.UpdateThread();
if (req != null)
{
try
{
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
{
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
if (responsedata == null)
continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue
{
try
{
DoHTTPGruntWork(m_server, req, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
}
else
{
m_threadPool.QueueWorkItem(x =>
{
try
{
DoHTTPGruntWork(m_server, req, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
return null;
}, null);
}
}
else
{
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{
DoHTTPGruntWork(m_server, req,
req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
else
{
ReQueueEvent(req);
}
}
}
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
} }
} }
} }
} }
// DoHTTPGruntWork changed, not sending response
// do the same work around as core
internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
{
OSHttpResponse response
= new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
response.SendChunked = false;
response.ContentLength64 = buffer.Length;
response.ContentEncoding = Encoding.UTF8;
try
{
response.OutputStream.Write(buffer, 0, buffer.Length);
}
catch (Exception ex)
{
m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
}
finally
{
//response.OutputStream.Close();
try
{
response.OutputStream.Flush();
response.Send();
//if (!response.KeepAlive && response.ReuseContext)
// response.FreeContext();
}
catch (Exception e)
{
m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
}
}
}
}
}

View File

@ -1,165 +0,0 @@
/*
* Copyright (c) Contributors, http://opensimulator.org/
* See CONTRIBUTORS.TXT for a full list of copyright holders.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the OpenSimulator Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using HttpServer;
using OpenMetaverse;
using System.Reflection;
using log4net;
using OpenSim.Framework.Monitoring;
namespace OpenSim.Framework.Servers.HttpServer
{
public delegate void ReQueuePollServiceItem(PollServiceHttpRequest req);
public class PollServiceWorkerThread
{
private static readonly ILog m_log =
LogManager.GetLogger(
MethodBase.GetCurrentMethod().DeclaringType);
public event ReQueuePollServiceItem ReQueue;
private readonly BaseHttpServer m_server;
private BlockingQueue<PollServiceHttpRequest> m_request;
private bool m_running = true;
private int m_timeout = 250;
public PollServiceWorkerThread(BaseHttpServer pSrv, int pTimeout)
{
m_request = new BlockingQueue<PollServiceHttpRequest>();
m_server = pSrv;
m_timeout = pTimeout;
}
public void ThreadStart()
{
Run();
}
public void Run()
{
while (m_running)
{
PollServiceHttpRequest req = m_request.Dequeue();
Watchdog.UpdateThread();
try
{
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
{
StreamReader str;
try
{
str = new StreamReader(req.Request.Body);
}
catch (System.ArgumentException)
{
// Stream was not readable means a child agent
// was closed due to logout, leaving the
// Event Queue request orphaned.
continue;
}
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id, str.ReadToEnd());
DoHTTPGruntWork(m_server, req, responsedata);
}
else
{
if ((Environment.TickCount - req.RequestTime) > m_timeout)
{
DoHTTPGruntWork(
m_server,
req,
req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
else
{
ReQueuePollServiceItem reQueueItem = ReQueue;
if (reQueueItem != null)
reQueueItem(req);
}
}
}
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
}
}
}
internal void Enqueue(PollServiceHttpRequest pPollServiceHttpRequest)
{
m_request.Enqueue(pPollServiceHttpRequest);
}
/// <summary>
/// FIXME: This should be part of BaseHttpServer
/// </summary>
internal static void DoHTTPGruntWork(BaseHttpServer server, PollServiceHttpRequest req, Hashtable responsedata)
{
OSHttpResponse response
= new OSHttpResponse(new HttpResponse(req.HttpContext, req.Request), req.HttpContext);
byte[] buffer = server.DoHTTPGruntWork(responsedata, response);
response.SendChunked = false;
response.ContentLength64 = buffer.Length;
response.ContentEncoding = Encoding.UTF8;
try
{
response.OutputStream.Write(buffer, 0, buffer.Length);
}
catch (Exception ex)
{
m_log.Warn(string.Format("[POLL SERVICE WORKER THREAD]: Error ", ex));
}
finally
{
//response.OutputStream.Close();
try
{
response.OutputStream.Flush();
response.Send();
//if (!response.KeepAlive && response.ReuseContext)
// response.FreeContext();
}
catch (Exception e)
{
m_log.Warn(String.Format("[POLL SERVICE WORKER THREAD]: Error ", e));
}
}
}
}
}

View File

@ -376,7 +376,7 @@ namespace OpenSim.Region.ClientStack.Linden
// TODO: Add EventQueueGet name/description for diagnostics // TODO: Add EventQueueGet name/description for diagnostics
MainServer.Instance.AddPollServiceHTTPHandler( MainServer.Instance.AddPollServiceHTTPHandler(
eventQueueGetPath, eventQueueGetPath,
new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, agentID)); new PollServiceEventArgs(null, HasEvents, GetEvents, NoEvents, agentID, 40000));
// m_log.DebugFormat( // m_log.DebugFormat(
// "[EVENT QUEUE GET MODULE]: Registered EQG handler {0} for {1} in {2}", // "[EVENT QUEUE GET MODULE]: Registered EQG handler {0} for {1} in {2}",
@ -418,7 +418,7 @@ namespace OpenSim.Region.ClientStack.Linden
} }
} }
public Hashtable GetEvents(UUID requestID, UUID pAgentId, string request) public Hashtable GetEvents(UUID requestID, UUID pAgentId)
{ {
if (DebugLevel >= 2) if (DebugLevel >= 2)
m_log.DebugFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName); m_log.DebugFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName);

View File

@ -156,12 +156,12 @@ namespace OpenSim.Region.ClientStack.Linden
private Scene m_scene; private Scene m_scene;
public PollServiceInventoryEventArgs(Scene scene, UUID pId) : public PollServiceInventoryEventArgs(Scene scene, UUID pId) :
base(null, null, null, null, pId) base(null, null, null, null, pId, int.MaxValue)
{ {
m_scene = scene; m_scene = scene;
HasEvents = (x, y) => { lock (responses) return responses.ContainsKey(x); }; HasEvents = (x, y) => { lock (responses) return responses.ContainsKey(x); };
GetEvents = (x, y, z) => GetEvents = (x, y) =>
{ {
lock (responses) lock (responses)
{ {

View File

@ -237,7 +237,7 @@ namespace OpenSim.Region.CoreModules.Scripting.LSLHttp
m_HttpServer.AddPollServiceHTTPHandler( m_HttpServer.AddPollServiceHTTPHandler(
uri, uri,
new PollServiceEventArgs(HttpRequestHandler, HasEvents, GetEvents, NoEvents, urlcode)); new PollServiceEventArgs(HttpRequestHandler, HasEvents, GetEvents, NoEvents, urlcode, 25000));
m_log.DebugFormat( m_log.DebugFormat(
"[URL MODULE]: Set up incoming request url {0} for {1} in {2} {3}", "[URL MODULE]: Set up incoming request url {0} for {1} in {2} {3}",
@ -282,7 +282,7 @@ namespace OpenSim.Region.CoreModules.Scripting.LSLHttp
m_HttpsServer.AddPollServiceHTTPHandler( m_HttpsServer.AddPollServiceHTTPHandler(
uri, uri,
new PollServiceEventArgs(HttpRequestHandler, HasEvents, GetEvents, NoEvents, urlcode)); new PollServiceEventArgs(HttpRequestHandler, HasEvents, GetEvents, NoEvents, urlcode, 25000));
m_log.DebugFormat( m_log.DebugFormat(
"[URL MODULE]: Set up incoming secure request url {0} for {1} in {2} {3}", "[URL MODULE]: Set up incoming secure request url {0} for {1} in {2} {3}",
@ -516,7 +516,7 @@ namespace OpenSim.Region.CoreModules.Scripting.LSLHttp
} }
} }
private Hashtable GetEvents(UUID requestID, UUID sessionID, string request) private Hashtable GetEvents(UUID requestID, UUID sessionID)
{ {
Hashtable response; Hashtable response;

View File

@ -191,6 +191,7 @@
<Reference name="XMLRPC" path="../../../../bin/"/> <Reference name="XMLRPC" path="../../../../bin/"/>
<Reference name="log4net" path="../../../../bin/"/> <Reference name="log4net" path="../../../../bin/"/>
<Reference name="HttpServer_OpenSim" path="../../../../bin/"/> <Reference name="HttpServer_OpenSim" path="../../../../bin/"/>
<Reference name="SmartThreadPool"/>
<Files> <Files>
<Match pattern="*.cs" recurse="true"> <Match pattern="*.cs" recurse="true">