give BlockingCollection more chances

httptests
UbitUmarov 2018-01-22 00:24:29 +00:00
parent 855dcda9c3
commit 73b587989c
3 changed files with 98 additions and 82 deletions

View File

@ -225,13 +225,16 @@ namespace OpenSim.Framework.Servers.HttpServer
private void PoolWorkerJob() private void PoolWorkerJob()
{ {
PollServiceHttpRequest req;
while (m_running) while (m_running)
{ {
PollServiceHttpRequest req; if(!m_requests.TryTake(out req, 4500) || req == null)
m_requests.TryTake(out req, 4500); {
Watchdog.UpdateThread(); Watchdog.UpdateThread();
if(req == null)
continue; continue;
}
Watchdog.UpdateThread();
try try
{ {
@ -256,17 +259,18 @@ namespace OpenSim.Framework.Servers.HttpServer
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id)) if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
{ {
PollServiceHttpRequest nreq = req;
m_threadPool.QueueWorkItem(x => m_threadPool.QueueWorkItem(x =>
{ {
try try
{ {
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id); Hashtable responsedata = nreq.PollServiceArgs.GetEvents(nreq.RequestID, nreq.PollServiceArgs.Id);
req.DoHTTPGruntWork(m_server, responsedata); nreq.DoHTTPGruntWork(m_server, responsedata);
} }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
finally finally
{ {
byContextDequeue(req); byContextDequeue(nreq);
} }
return null; return null;
}, null); }, null);
@ -275,17 +279,18 @@ namespace OpenSim.Framework.Servers.HttpServer
{ {
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{ {
PollServiceHttpRequest nreq = req;
m_threadPool.QueueWorkItem(x => m_threadPool.QueueWorkItem(x =>
{ {
try try
{ {
req.DoHTTPGruntWork(m_server, nreq.DoHTTPGruntWork(m_server,
req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id)); nreq.PollServiceArgs.NoEvents(nreq.RequestID, nreq.PollServiceArgs.Id));
} }
catch (ObjectDisposedException) {} catch (ObjectDisposedException) {}
finally finally
{ {
byContextDequeue(req); byContextDequeue(nreq);
} }
return null; return null;
}, null); }, null);

View File

@ -28,17 +28,14 @@
using System; using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Specialized; using System.Collections.Concurrent;
using System.Reflection; using System.Reflection;
using System.IO;
using System.Threading; using System.Threading;
using System.Web;
using Mono.Addins; using Mono.Addins;
using OpenSim.Framework.Monitoring; using OpenSim.Framework.Monitoring;
using log4net; using log4net;
using Nini.Config; using Nini.Config;
using OpenMetaverse; using OpenMetaverse;
using OpenMetaverse.StructuredData;
using OpenSim.Capabilities.Handlers; using OpenSim.Capabilities.Handlers;
using OpenSim.Framework; using OpenSim.Framework;
using OpenSim.Framework.Servers; using OpenSim.Framework.Servers;
@ -57,7 +54,6 @@ namespace OpenSim.Region.ClientStack.Linden
// LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); // LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private Scene m_scene; private Scene m_scene;
private IAssetService m_AssetService;
private bool m_Enabled = true; private bool m_Enabled = true;
private string m_URL; private string m_URL;
@ -65,20 +61,19 @@ namespace OpenSim.Region.ClientStack.Linden
private string m_RedirectURL = null; private string m_RedirectURL = null;
private string m_RedirectURL2 = null; private string m_RedirectURL2 = null;
struct aPollRequest class APollRequest
{ {
public PollServiceMeshEventArgs thepoll; public PollServiceMeshEventArgs thepoll;
public UUID reqID; public UUID reqID;
public Hashtable request; public Hashtable request;
} }
public class aPollResponse public class APollResponse
{ {
public Hashtable response; public Hashtable response;
public int bytes; public int bytes;
} }
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private static GetMeshHandler m_getMeshHandler; private static GetMeshHandler m_getMeshHandler;
@ -88,8 +83,7 @@ namespace OpenSim.Region.ClientStack.Linden
private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>(); private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>();
private static Thread[] m_workerThreads = null; private static Thread[] m_workerThreads = null;
private static int m_NumberScenes = 0; private static int m_NumberScenes = 0;
private static OpenSim.Framework.BlockingQueue<aPollRequest> m_queue = private static BlockingCollection<APollRequest> m_queue = new BlockingCollection<APollRequest>();
new OpenSim.Framework.BlockingQueue<aPollRequest>();
private Dictionary<UUID, PollServiceMeshEventArgs> m_pollservices = new Dictionary<UUID, PollServiceMeshEventArgs>(); private Dictionary<UUID, PollServiceMeshEventArgs> m_pollservices = new Dictionary<UUID, PollServiceMeshEventArgs>();
@ -131,33 +125,35 @@ namespace OpenSim.Region.ClientStack.Linden
return; return;
m_scene = pScene; m_scene = pScene;
m_assetService = pScene.AssetService;
} }
public void RemoveRegion(Scene scene) public void RemoveRegion(Scene s)
{ {
if (!m_Enabled) if (!m_Enabled)
return; return;
m_scene.EventManager.OnRegisterCaps -= RegisterCaps; s.EventManager.OnRegisterCaps -= RegisterCaps;
m_scene.EventManager.OnDeregisterCaps -= DeregisterCaps; s.EventManager.OnDeregisterCaps -= DeregisterCaps;
m_scene.EventManager.OnThrottleUpdate -= ThrottleUpdate; s.EventManager.OnThrottleUpdate -= ThrottleUpdate;
m_NumberScenes--; m_NumberScenes--;
m_scene = null; m_scene = null;
} }
public void RegionLoaded(Scene scene) public void RegionLoaded(Scene s)
{ {
if (!m_Enabled) if (!m_Enabled)
return; return;
m_AssetService = m_scene.RequestModuleInterface<IAssetService>(); if(m_assetService == null)
m_scene.EventManager.OnRegisterCaps += RegisterCaps; {
// We'll reuse the same handler for all requests. m_assetService = m_scene.RequestModuleInterface<IAssetService>();
m_getMeshHandler = new GetMeshHandler(m_assetService); // We'll reuse the same handler for all requests.
m_scene.EventManager.OnDeregisterCaps += DeregisterCaps; m_getMeshHandler = new GetMeshHandler(m_assetService);
m_scene.EventManager.OnThrottleUpdate += ThrottleUpdate; }
s.EventManager.OnRegisterCaps += RegisterCaps;
s.EventManager.OnDeregisterCaps += DeregisterCaps;
s.EventManager.OnThrottleUpdate += ThrottleUpdate;
m_NumberScenes++; m_NumberScenes++;
@ -189,7 +185,7 @@ namespace OpenSim.Region.ClientStack.Linden
// Prevent red ink. // Prevent red ink.
try try
{ {
m_queue.Clear(); m_queue.Dispose();
} }
catch {} catch {}
} }
@ -201,14 +197,18 @@ namespace OpenSim.Region.ClientStack.Linden
private static void DoMeshRequests() private static void DoMeshRequests()
{ {
while(true) while (m_NumberScenes > 0)
{ {
aPollRequest poolreq = m_queue.Dequeue(4500); APollRequest poolreq;
Watchdog.UpdateThread(); if(m_queue.TryTake(out poolreq, 4500))
if(m_NumberScenes <= 0) {
return; if(m_NumberScenes <= 0)
if(poolreq.reqID != UUID.Zero) break;
poolreq.thepoll.Process(poolreq);
if(poolreq.reqID != UUID.Zero)
poolreq.thepoll.Process(poolreq);
}
Watchdog.UpdateThread();
} }
} }
@ -228,8 +228,8 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
private List<Hashtable> requests = private List<Hashtable> requests =
new List<Hashtable>(); new List<Hashtable>();
private Dictionary<UUID, aPollResponse> responses = private Dictionary<UUID, APollResponse> responses =
new Dictionary<UUID, aPollResponse>(); new Dictionary<UUID, APollResponse>();
private HashSet<UUID> dropedResponses = new HashSet<UUID>(); private HashSet<UUID> dropedResponses = new HashSet<UUID>();
private Scene m_scene; private Scene m_scene;
@ -278,12 +278,12 @@ namespace OpenSim.Region.ClientStack.Linden
// x is request id, y is request data hashtable // x is request id, y is request data hashtable
Request = (x, y) => Request = (x, y) =>
{ {
aPollRequest reqinfo = new aPollRequest(); APollRequest reqinfo = new APollRequest();
reqinfo.thepoll = this; reqinfo.thepoll = this;
reqinfo.reqID = x; reqinfo.reqID = x;
reqinfo.request = y; reqinfo.request = y;
m_queue.Enqueue(reqinfo); m_queue.Add(reqinfo);
m_throttler.PassTime(); m_throttler.PassTime();
}; };
@ -309,7 +309,7 @@ namespace OpenSim.Region.ClientStack.Linden
}; };
} }
public void Process(aPollRequest requestinfo) public void Process(APollRequest requestinfo)
{ {
Hashtable response; Hashtable response;
@ -338,7 +338,7 @@ namespace OpenSim.Region.ClientStack.Linden
response["str_response_string"] = "Script timeout"; response["str_response_string"] = "Script timeout";
response["content_type"] = "text/plain"; response["content_type"] = "text/plain";
response["keepalive"] = false; response["keepalive"] = false;
responses[requestID] = new aPollResponse() { bytes = 0, response = response}; responses[requestID] = new APollResponse() { bytes = 0, response = response};
return; return;
} }
@ -357,7 +357,7 @@ namespace OpenSim.Region.ClientStack.Linden
} }
} }
responses[requestID] = new aPollResponse() responses[requestID] = new APollResponse()
{ {
bytes = (int)response["int_bytes"], bytes = (int)response["int_bytes"],
response = response response = response
@ -437,7 +437,7 @@ namespace OpenSim.Region.ClientStack.Linden
lastTimeElapsed = Util.GetTimeStampMS(); lastTimeElapsed = Util.GetTimeStampMS();
} }
public bool hasEvents(UUID key, Dictionary<UUID, aPollResponse> responses) public bool hasEvents(UUID key, Dictionary<UUID, APollResponse> responses)
{ {
PassTime(); PassTime();
// Note, this is called IN LOCK // Note, this is called IN LOCK
@ -447,7 +447,7 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
return false; return false;
} }
aPollResponse response; APollResponse response;
if (responses.TryGetValue(key, out response)) if (responses.TryGetValue(key, out response))
{ {
// Normal // Normal
@ -472,7 +472,7 @@ namespace OpenSim.Region.ClientStack.Linden
return; return;
int add = (int)(ThrottleBytes * timeElapsed * 0.001); int add = (int)(ThrottleBytes * timeElapsed * 0.001);
if (add >= 1000) if (add >= 1000)
{ {
lastTimeElapsed = currenttime; lastTimeElapsed = currenttime;
BytesSent -= add; BytesSent -= add;
if (BytesSent < 0) BytesSent = 0; if (BytesSent < 0) BytesSent = 0;
@ -480,6 +480,6 @@ namespace OpenSim.Region.ClientStack.Linden
} }
public int ThrottleBytes; public int ThrottleBytes;
} }
} }
} }

View File

@ -28,6 +28,7 @@
using System; using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Reflection; using System.Reflection;
using System.Threading; using System.Threading;
using log4net; using log4net;
@ -51,7 +52,7 @@ namespace OpenSim.Region.ClientStack.Linden
public class GetTextureModule : INonSharedRegionModule public class GetTextureModule : INonSharedRegionModule
{ {
struct aPollRequest class APollRequest
{ {
public PollServiceTextureEventArgs thepoll; public PollServiceTextureEventArgs thepoll;
public UUID reqID; public UUID reqID;
@ -59,7 +60,7 @@ namespace OpenSim.Region.ClientStack.Linden
public bool send503; public bool send503;
} }
public class aPollResponse public class APollResponse
{ {
public Hashtable response; public Hashtable response;
public int bytes; public int bytes;
@ -77,8 +78,7 @@ namespace OpenSim.Region.ClientStack.Linden
private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>(); private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>();
private static Thread[] m_workerThreads = null; private static Thread[] m_workerThreads = null;
private static int m_NumberScenes = 0; private static int m_NumberScenes = 0;
private static OpenSim.Framework.BlockingQueue<aPollRequest> m_queue = private static BlockingCollection<APollRequest> m_queue = new BlockingCollection<APollRequest>();
new OpenSim.Framework.BlockingQueue<aPollRequest>();
private Dictionary<UUID,PollServiceTextureEventArgs> m_pollservices = new Dictionary<UUID,PollServiceTextureEventArgs>(); private Dictionary<UUID,PollServiceTextureEventArgs> m_pollservices = new Dictionary<UUID,PollServiceTextureEventArgs>();
@ -107,26 +107,29 @@ namespace OpenSim.Region.ClientStack.Linden
public void AddRegion(Scene s) public void AddRegion(Scene s)
{ {
m_scene = s; m_scene = s;
m_assetService = s.AssetService;
} }
public void RemoveRegion(Scene s) public void RemoveRegion(Scene s)
{ {
m_scene.EventManager.OnRegisterCaps -= RegisterCaps; s.EventManager.OnRegisterCaps -= RegisterCaps;
m_scene.EventManager.OnDeregisterCaps -= DeregisterCaps; s.EventManager.OnDeregisterCaps -= DeregisterCaps;
m_scene.EventManager.OnThrottleUpdate -= ThrottleUpdate; s.EventManager.OnThrottleUpdate -= ThrottleUpdate;
m_NumberScenes--; m_NumberScenes--;
m_scene = null; m_scene = null;
} }
public void RegionLoaded(Scene s) public void RegionLoaded(Scene s)
{ {
// We'll reuse the same handler for all requests. if(m_assetService == null)
m_getTextureHandler = new GetTextureHandler(m_assetService); {
m_assetService = s.RequestModuleInterface<IAssetService>();
// We'll reuse the same handler for all requests.
m_getTextureHandler = new GetTextureHandler(m_assetService);
}
m_scene.EventManager.OnRegisterCaps += RegisterCaps; s.EventManager.OnRegisterCaps += RegisterCaps;
m_scene.EventManager.OnDeregisterCaps += DeregisterCaps; s.EventManager.OnDeregisterCaps += DeregisterCaps;
m_scene.EventManager.OnThrottleUpdate += ThrottleUpdate; s.EventManager.OnThrottleUpdate += ThrottleUpdate;
m_NumberScenes++; m_NumberScenes++;
@ -173,7 +176,7 @@ namespace OpenSim.Region.ClientStack.Linden
foreach (Thread t in m_workerThreads) foreach (Thread t in m_workerThreads)
Watchdog.AbortThread(t.ManagedThreadId); Watchdog.AbortThread(t.ManagedThreadId);
m_queue.Clear(); m_queue.Dispose();
} }
} }
@ -190,8 +193,8 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
private List<Hashtable> requests = private List<Hashtable> requests =
new List<Hashtable>(); new List<Hashtable>();
private Dictionary<UUID, aPollResponse> responses = private Dictionary<UUID, APollResponse> responses =
new Dictionary<UUID, aPollResponse>(); new Dictionary<UUID, APollResponse>();
private HashSet<UUID> dropedResponses = new HashSet<UUID>(); private HashSet<UUID> dropedResponses = new HashSet<UUID>();
private Scene m_scene; private Scene m_scene;
@ -239,7 +242,7 @@ namespace OpenSim.Region.ClientStack.Linden
// x is request id, y is request data hashtable // x is request id, y is request data hashtable
Request = (x, y) => Request = (x, y) =>
{ {
aPollRequest reqinfo = new aPollRequest(); APollRequest reqinfo = new APollRequest();
reqinfo.thepoll = this; reqinfo.thepoll = this;
reqinfo.reqID = x; reqinfo.reqID = x;
reqinfo.request = y; reqinfo.request = y;
@ -249,14 +252,14 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
if (responses.Count > 0) if (responses.Count > 0)
{ {
if (m_queue.Count() >= 4) if (m_queue.Count >= 4)
{ {
// Never allow more than 4 fetches to wait // Never allow more than 4 fetches to wait
reqinfo.send503 = true; reqinfo.send503 = true;
} }
} }
} }
m_queue.Enqueue(reqinfo); m_queue.Add(reqinfo);
m_throttler.PassTime(); m_throttler.PassTime();
}; };
@ -282,7 +285,7 @@ namespace OpenSim.Region.ClientStack.Linden
}; };
} }
public void Process(aPollRequest requestinfo) public void Process(APollRequest requestinfo)
{ {
Hashtable response; Hashtable response;
@ -316,7 +319,7 @@ namespace OpenSim.Region.ClientStack.Linden
headers["Retry-After"] = 30; headers["Retry-After"] = 30;
response["headers"] = headers; response["headers"] = headers;
responses[requestID] = new aPollResponse() {bytes = 0, response = response}; responses[requestID] = new APollResponse() {bytes = 0, response = response};
return; return;
} }
@ -332,7 +335,7 @@ namespace OpenSim.Region.ClientStack.Linden
response["keepalive"] = false; response["keepalive"] = false;
response["reusecontext"] = false; response["reusecontext"] = false;
responses[requestID] = new aPollResponse() {bytes = 0, response = response}; responses[requestID] = new APollResponse() {bytes = 0, response = response};
return; return;
} }
@ -351,7 +354,7 @@ namespace OpenSim.Region.ClientStack.Linden
return; return;
} }
} }
responses[requestID] = new aPollResponse() responses[requestID] = new APollResponse()
{ {
bytes = (int) response["int_bytes"], bytes = (int) response["int_bytes"],
response = response response = response
@ -420,12 +423,20 @@ namespace OpenSim.Region.ClientStack.Linden
private static void DoTextureRequests() private static void DoTextureRequests()
{ {
while (true) APollRequest poolreq;
while (m_NumberScenes > 0)
{ {
aPollRequest poolreq = m_queue.Dequeue(4500); poolreq = null;
Watchdog.UpdateThread(); if(!m_queue.TryTake(out poolreq, 4500) || poolreq == null)
{
Watchdog.UpdateThread();
continue;
}
if(m_NumberScenes <= 0) if(m_NumberScenes <= 0)
return; break;
Watchdog.UpdateThread();
if(poolreq.reqID != UUID.Zero) if(poolreq.reqID != UUID.Zero)
poolreq.thepoll.Process(poolreq); poolreq.thepoll.Process(poolreq);
} }
@ -442,7 +453,7 @@ namespace OpenSim.Region.ClientStack.Linden
ThrottleBytes = pBytes; ThrottleBytes = pBytes;
lastTimeElapsed = Util.GetTimeStampMS(); lastTimeElapsed = Util.GetTimeStampMS();
} }
public bool hasEvents(UUID key, Dictionary<UUID, GetTextureModule.aPollResponse> responses) public bool hasEvents(UUID key, Dictionary<UUID, GetTextureModule.APollResponse> responses)
{ {
PassTime(); PassTime();
// Note, this is called IN LOCK // Note, this is called IN LOCK
@ -451,7 +462,7 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
return false; return false;
} }
GetTextureModule.aPollResponse response; GetTextureModule.APollResponse response;
if (responses.TryGetValue(key, out response)) if (responses.TryGetValue(key, out response))
{ {
// This is any error response // This is any error response