Reverting the reverts I did yesterday. cpu-branch has now been

successfully tested, and I'm merging back those changes, which proved to
be good.
Revert "Revert "Cleared up much confusion in PollServiceRequestManager. Here's the history:""

This reverts commit fa2370b32e.
TeleportWork
Diva Canto 2013-07-18 12:23:27 -07:00
parent ae997fffee
commit 9e35b069a4
4 changed files with 41 additions and 52 deletions

View File

@ -76,7 +76,7 @@ namespace OpenSim.Framework
{ {
lock (m_queueSync) lock (m_queueSync)
{ {
if (m_queue.Count < 1 && m_pqueue.Count < 1) while (m_queue.Count < 1 && m_pqueue.Count < 1)
{ {
Monitor.Wait(m_queueSync, msTimeout); Monitor.Wait(m_queueSync, msTimeout);
} }

View File

@ -50,7 +50,7 @@ namespace OpenSim.Framework.Servers.HttpServer
public enum EventType : int public enum EventType : int
{ {
Normal = 0, LongPoll = 0,
LslHttp = 1, LslHttp = 1,
Inventory = 2 Inventory = 2
} }
@ -80,7 +80,7 @@ namespace OpenSim.Framework.Servers.HttpServer
NoEvents = pNoEvents; NoEvents = pNoEvents;
Id = pId; Id = pId;
TimeOutms = pTimeOutms; TimeOutms = pTimeOutms;
Type = EventType.Normal; Type = EventType.LongPoll;
} }
} }
} }

View File

@ -47,12 +47,11 @@ namespace OpenSim.Framework.Servers.HttpServer
private readonly BaseHttpServer m_server; private readonly BaseHttpServer m_server;
private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>(); private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>(); private static Queue<PollServiceHttpRequest> m_longPollRequests = new Queue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
private uint m_WorkerThreadCount = 0; private uint m_WorkerThreadCount = 0;
private Thread[] m_workerThreads; private Thread[] m_workerThreads;
private Thread m_retrysThread; private Thread m_longPollThread;
private bool m_running = true; private bool m_running = true;
private int slowCount = 0; private int slowCount = 0;
@ -84,9 +83,9 @@ namespace OpenSim.Framework.Servers.HttpServer
int.MaxValue); int.MaxValue);
} }
m_retrysThread = Watchdog.StartThread( m_longPollThread = Watchdog.StartThread(
this.CheckRetries, this.CheckLongPollThreads,
string.Format("PollServiceWatcherThread:{0}", m_server.Port), string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
ThreadPriority.Normal, ThreadPriority.Normal,
false, false,
true, true,
@ -97,48 +96,47 @@ namespace OpenSim.Framework.Servers.HttpServer
private void ReQueueEvent(PollServiceHttpRequest req) private void ReQueueEvent(PollServiceHttpRequest req)
{ {
if (m_running) if (m_running)
{ m_requests.Enqueue(req);
lock (m_retryRequests)
m_retryRequests.Enqueue(req);
}
} }
public void Enqueue(PollServiceHttpRequest req) public void Enqueue(PollServiceHttpRequest req)
{ {
if (m_running) if (m_running)
{ {
if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal) if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll)
{ {
m_requests.Enqueue(req); lock (m_longPollRequests)
m_longPollRequests.Enqueue(req);
} }
else else
{ m_requests.Enqueue(req);
lock (m_slowRequests)
m_slowRequests.Enqueue(req);
}
} }
} }
private void CheckRetries() private void CheckLongPollThreads()
{ {
// The only purpose of this thread is to check the EQs for events.
// If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
// If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
// All other types of tasks (Inventory handlers) don't have the long-poll nature,
// so if they aren't ready to be served by a worker thread (no events), they are placed
// directly back in the "ready-to-serve" queue by the worker thread.
while (m_running) while (m_running)
{ {
Thread.Sleep(100); // let the world move .. back to faster rate Thread.Sleep(1000);
Watchdog.UpdateThread(); Watchdog.UpdateThread();
lock (m_retryRequests)
{
while (m_retryRequests.Count > 0 && m_running)
m_requests.Enqueue(m_retryRequests.Dequeue());
}
slowCount++;
if (slowCount >= 10)
{
slowCount = 0;
lock (m_slowRequests) PollServiceHttpRequest req;
lock (m_longPollRequests)
{
while (m_longPollRequests.Count > 0 && m_running)
{ {
while (m_slowRequests.Count > 0 && m_running) req = m_longPollRequests.Dequeue();
m_requests.Enqueue(m_slowRequests.Dequeue()); if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ
(Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout
m_requests.Enqueue(req);
else
m_longPollRequests.Enqueue(req);
} }
} }
} }
@ -153,24 +151,12 @@ namespace OpenSim.Framework.Servers.HttpServer
foreach (Thread t in m_workerThreads) foreach (Thread t in m_workerThreads)
Watchdog.AbortThread(t.ManagedThreadId); Watchdog.AbortThread(t.ManagedThreadId);
try
{
foreach (PollServiceHttpRequest req in m_retryRequests)
{
req.DoHTTPGruntWork(m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
}
catch
{
}
PollServiceHttpRequest wreq; PollServiceHttpRequest wreq;
m_retryRequests.Clear();
lock (m_slowRequests) lock (m_longPollRequests)
{ {
while (m_slowRequests.Count > 0 && m_running) while (m_longPollRequests.Count > 0 && m_running)
m_requests.Enqueue(m_slowRequests.Dequeue()); m_requests.Enqueue(m_longPollRequests.Dequeue());
} }
while (m_requests.Count() > 0) while (m_requests.Count() > 0)
@ -196,6 +182,7 @@ namespace OpenSim.Framework.Servers.HttpServer
while (m_running) while (m_running)
{ {
PollServiceHttpRequest req = m_requests.Dequeue(5000); PollServiceHttpRequest req = m_requests.Dequeue(5000);
//m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
Watchdog.UpdateThread(); Watchdog.UpdateThread();
if (req != null) if (req != null)
@ -209,7 +196,7 @@ namespace OpenSim.Framework.Servers.HttpServer
if (responsedata == null) if (responsedata == null)
continue; continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
{ {
try try
{ {

View File

@ -364,8 +364,7 @@ namespace OpenSim.Region.ClientStack.Linden
caps.RegisterPollHandler( caps.RegisterPollHandler(
"EventQueueGet", "EventQueueGet",
new PollServiceEventArgs( new PollServiceEventArgs(null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000));
null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000));
Random rnd = new Random(Environment.TickCount); Random rnd = new Random(Environment.TickCount);
lock (m_ids) lock (m_ids)
@ -383,7 +382,10 @@ namespace OpenSim.Region.ClientStack.Linden
Queue<OSD> queue = GetQueue(agentID); Queue<OSD> queue = GetQueue(agentID);
if (queue != null) if (queue != null)
lock (queue) lock (queue)
{
//m_log.WarnFormat("POLLED FOR EVENTS BY {0} in {1} -- {2}", agentID, m_scene.RegionInfo.RegionName, queue.Count);
return queue.Count > 0; return queue.Count > 0;
}
return false; return false;
} }
@ -406,7 +408,7 @@ namespace OpenSim.Region.ClientStack.Linden
public Hashtable GetEvents(UUID requestID, UUID pAgentId) public Hashtable GetEvents(UUID requestID, UUID pAgentId)
{ {
if (DebugLevel >= 2) if (DebugLevel >= 2)
m_log.DebugFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName); m_log.WarnFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName);
Queue<OSD> queue = TryGetQueue(pAgentId); Queue<OSD> queue = TryGetQueue(pAgentId);
OSD element; OSD element;