Merge branch 'master' of git://opensimulator.org/git/opensim

TeleportWork
Dan Lake 2013-07-17 15:06:25 -07:00
commit 49b3b7ee83
11 changed files with 193 additions and 115 deletions

View File

@ -76,9 +76,10 @@ namespace OpenSim.Framework
{
lock (m_queueSync)
{
if (m_queue.Count < 1 && m_pqueue.Count < 1)
bool success = true;
while (m_queue.Count < 1 && m_pqueue.Count < 1 && success)
{
Monitor.Wait(m_queueSync, msTimeout);
success = Monitor.Wait(m_queueSync, msTimeout);
}
if (m_pqueue.Count > 0)

View File

@ -50,7 +50,7 @@ namespace OpenSim.Framework.Servers.HttpServer
public enum EventType : int
{
Normal = 0,
LongPoll = 0,
LslHttp = 1,
Inventory = 2
}
@ -80,7 +80,7 @@ namespace OpenSim.Framework.Servers.HttpServer
NoEvents = pNoEvents;
Id = pId;
TimeOutms = pTimeOutms;
Type = EventType.Normal;
Type = EventType.LongPoll;
}
}
}

View File

@ -47,12 +47,11 @@ namespace OpenSim.Framework.Servers.HttpServer
private readonly BaseHttpServer m_server;
private BlockingQueue<PollServiceHttpRequest> m_requests = new BlockingQueue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_slowRequests = new Queue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_retryRequests = new Queue<PollServiceHttpRequest>();
private static Queue<PollServiceHttpRequest> m_longPollRequests = new Queue<PollServiceHttpRequest>();
private uint m_WorkerThreadCount = 0;
private Thread[] m_workerThreads;
private Thread m_retrysThread;
private Thread m_longPollThread;
private bool m_running = true;
private int slowCount = 0;
@ -84,9 +83,9 @@ namespace OpenSim.Framework.Servers.HttpServer
int.MaxValue);
}
m_retrysThread = Watchdog.StartThread(
this.CheckRetries,
string.Format("PollServiceWatcherThread:{0}", m_server.Port),
m_longPollThread = Watchdog.StartThread(
this.CheckLongPollThreads,
string.Format("LongPollServiceWatcherThread:{0}", m_server.Port),
ThreadPriority.Normal,
false,
true,
@ -97,49 +96,52 @@ namespace OpenSim.Framework.Servers.HttpServer
private void ReQueueEvent(PollServiceHttpRequest req)
{
if (m_running)
{
lock (m_retryRequests)
m_retryRequests.Enqueue(req);
}
m_requests.Enqueue(req);
}
public void Enqueue(PollServiceHttpRequest req)
{
if (m_running)
{
if (req.PollServiceArgs.Type != PollServiceEventArgs.EventType.Normal)
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll)
{
m_requests.Enqueue(req);
lock (m_longPollRequests)
m_longPollRequests.Enqueue(req);
}
else
{
lock (m_slowRequests)
m_slowRequests.Enqueue(req);
}
m_requests.Enqueue(req);
}
}
private void CheckRetries()
private void CheckLongPollThreads()
{
// The only purpose of this thread is to check the EQs for events.
// If there are events, that thread will be placed in the "ready-to-serve" queue, m_requests.
// If there are no events, that thread will be back to its "waiting" queue, m_longPollRequests.
// All other types of tasks (Inventory handlers) don't have the long-poll nature,
// so if they aren't ready to be served by a worker thread (no events), they are placed
// directly back in the "ready-to-serve" queue by the worker thread.
while (m_running)
{
Thread.Sleep(100); // let the world move .. back to faster rate
Thread.Sleep(1000);
Watchdog.UpdateThread();
lock (m_retryRequests)
{
while (m_retryRequests.Count > 0 && m_running)
m_requests.Enqueue(m_retryRequests.Dequeue());
}
slowCount++;
if (slowCount >= 10)
{
slowCount = 0;
lock (m_slowRequests)
List<PollServiceHttpRequest> not_ready = new List<PollServiceHttpRequest>();
lock (m_longPollRequests)
{
while (m_longPollRequests.Count > 0 && m_running)
{
while (m_slowRequests.Count > 0 && m_running)
m_requests.Enqueue(m_slowRequests.Dequeue());
PollServiceHttpRequest req = m_longPollRequests.Dequeue();
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id) || // there are events in this EQ
(Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms) // no events, but timeout
m_requests.Enqueue(req);
else
not_ready.Add(req);
}
foreach (PollServiceHttpRequest req in not_ready)
m_longPollRequests.Enqueue(req);
}
}
}
@ -153,24 +155,12 @@ namespace OpenSim.Framework.Servers.HttpServer
foreach (Thread t in m_workerThreads)
Watchdog.AbortThread(t.ManagedThreadId);
try
{
foreach (PollServiceHttpRequest req in m_retryRequests)
{
req.DoHTTPGruntWork(m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
}
catch
{
}
PollServiceHttpRequest wreq;
m_retryRequests.Clear();
lock (m_slowRequests)
lock (m_longPollRequests)
{
while (m_slowRequests.Count > 0 && m_running)
m_requests.Enqueue(m_slowRequests.Dequeue());
while (m_longPollRequests.Count > 0 && m_running)
m_requests.Enqueue(m_longPollRequests.Dequeue());
}
while (m_requests.Count() > 0)
@ -195,34 +185,33 @@ namespace OpenSim.Framework.Servers.HttpServer
{
while (m_running)
{
PollServiceHttpRequest req = m_requests.Dequeue(5000);
Watchdog.UpdateThread();
if (req != null)
PollServiceHttpRequest req = null;
lock (m_requests)
{
try
if (m_requests.Count() > 0)
req = m_requests.Dequeue();
}
if (req == null)
Thread.Sleep(100);
else
{
//PollServiceHttpRequest req = m_requests.Dequeue(5000);
//m_log.WarnFormat("[YYY]: Dequeued {0}", (req == null ? "null" : req.PollServiceArgs.Type.ToString()));
if (req != null)
{
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
try
{
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
if (responsedata == null)
continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.Normal) // This is the event queue
if (req.PollServiceArgs.HasEvents(req.RequestID, req.PollServiceArgs.Id))
{
try
{
req.DoHTTPGruntWork(m_server, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
}
else
{
m_threadPool.QueueWorkItem(x =>
Hashtable responsedata = req.PollServiceArgs.GetEvents(req.RequestID, req.PollServiceArgs.Id);
if (responsedata == null)
continue;
if (req.PollServiceArgs.Type == PollServiceEventArgs.EventType.LongPoll) // This is the event queue
{
try
{
@ -232,27 +221,41 @@ namespace OpenSim.Framework.Servers.HttpServer
{
// Ignore it, no need to reply
}
}
else
{
m_threadPool.QueueWorkItem(x =>
{
try
{
req.DoHTTPGruntWork(m_server, responsedata);
}
catch (ObjectDisposedException) // Browser aborted before we could read body, server closed the stream
{
// Ignore it, no need to reply
}
return null;
}, null);
}
}
else
{
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{
req.DoHTTPGruntWork(
m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
return null;
}, null);
}
}
else
{
ReQueueEvent(req);
if ((Environment.TickCount - req.RequestTime) > req.PollServiceArgs.TimeOutms)
{
req.DoHTTPGruntWork(
m_server, req.PollServiceArgs.NoEvents(req.RequestID, req.PollServiceArgs.Id));
}
else
{
ReQueueEvent(req);
}
}
}
}
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
catch (Exception e)
{
m_log.ErrorFormat("Exception in poll service thread: " + e.ToString());
}
}
}
}

View File

@ -364,8 +364,7 @@ namespace OpenSim.Region.ClientStack.Linden
caps.RegisterPollHandler(
"EventQueueGet",
new PollServiceEventArgs(
null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000));
new PollServiceEventArgs(null, GenerateEqgCapPath(eventQueueGetUUID), HasEvents, GetEvents, NoEvents, agentID, 40000));
Random rnd = new Random(Environment.TickCount);
lock (m_ids)
@ -383,7 +382,10 @@ namespace OpenSim.Region.ClientStack.Linden
Queue<OSD> queue = GetQueue(agentID);
if (queue != null)
lock (queue)
{
//m_log.WarnFormat("POLLED FOR EVENTS BY {0} in {1} -- {2}", agentID, m_scene.RegionInfo.RegionName, queue.Count);
return queue.Count > 0;
}
return false;
}
@ -406,7 +408,7 @@ namespace OpenSim.Region.ClientStack.Linden
public Hashtable GetEvents(UUID requestID, UUID pAgentId)
{
if (DebugLevel >= 2)
m_log.DebugFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName);
m_log.WarnFormat("POLLED FOR EQ MESSAGES BY {0} in {1}", pAgentId, m_scene.RegionInfo.RegionName);
Queue<OSD> queue = TryGetQueue(pAgentId);
OSD element;

View File

@ -50,15 +50,16 @@ namespace OpenSim.Region.CoreModules.Framework
private readonly List<Scene> m_scenes = new List<Scene>();
private System.Timers.Timer m_timer = new System.Timers.Timer();
//private OpenSim.Framework.BlockingQueue<GridRegionRequest> m_RequestQueue = new OpenSim.Framework.BlockingQueue<GridRegionRequest>();
// private OpenSim.Framework.DoubleQueue<GridRegionRequest> m_RequestQueue = new OpenSim.Framework.DoubleQueue<GridRegionRequest>();
//private Queue<GridRegionRequest> m_RequestQueue = new Queue<GridRegionRequest>();
private Queue<Action> m_RequestQueue = new Queue<Action>();
private Dictionary<string, List<string>> m_Pending = new Dictionary<string, List<string>>();
private int m_Interval;
#region ISharedRegionModule
public void Initialise(IConfigSource config)
{
m_Interval = Util.GetConfigVarFromSections<int>(config, "Interval", new string[] { "ServiceThrottle" }, 5000);
m_timer = new System.Timers.Timer();
m_timer.AutoReset = false;
m_timer.Enabled = true;
@ -131,7 +132,7 @@ namespace OpenSim.Region.CoreModules.Framework
{
if (!m_timer.Enabled)
{
m_timer.Interval = 1000;
m_timer.Interval = m_Interval;
m_timer.Enabled = true;
m_timer.Start();
}
@ -156,18 +157,36 @@ namespace OpenSim.Region.CoreModules.Framework
client.SendRegionHandle(regionID, r.RegionHandle);
};
lock (m_RequestQueue)
m_RequestQueue.Enqueue(action);
Enqueue("region", regionID.ToString(), action);
}
#endregion Events
#region IServiceThrottleModule
public void Enqueue(Action continuation)
public void Enqueue(string category, string itemid, Action continuation)
{
m_RequestQueue.Enqueue(continuation);
lock (m_RequestQueue)
{
if (m_Pending.ContainsKey(category))
{
if (m_Pending[category].Contains(itemid))
// Don't enqueue, it's already pending
return;
}
else
m_Pending.Add(category, new List<string>());
m_Pending[category].Add(itemid);
m_RequestQueue.Enqueue(delegate
{
lock (m_RequestQueue)
m_Pending[category].Remove(itemid);
continuation();
});
}
}
#endregion IServiceThrottleModule

View File

@ -172,11 +172,11 @@ namespace OpenSim.Region.CoreModules.Framework.UserManagement
return;
}
// Not found in cache, get it from services
m_ServiceThrottle.Enqueue(delegate
// Not found in cache, queue continuation
m_ServiceThrottle.Enqueue("name", uuid.ToString(), delegate
{
//m_log.DebugFormat("[YYY]: Name request {0}", uuid);
bool foundRealName = TryGetUserNamesFromServices(uuid, names);
bool foundRealName = TryGetUserNames(uuid, names);
if (names.Length == 2)
{

View File

@ -81,6 +81,8 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
private List<UUID> m_rootAgents = new List<UUID>();
private volatile bool threadrunning = false;
private IServiceThrottleModule m_ServiceThrottle;
//private int CacheRegionsDistance = 256;
#region INonSharedRegionModule Members
@ -131,6 +133,10 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
public virtual void RegionLoaded (Scene scene)
{
if (!m_Enabled)
return;
m_ServiceThrottle = scene.RequestModuleInterface<IServiceThrottleModule>();
}
@ -170,13 +176,13 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
m_scene.EventManager.OnMakeRootAgent += MakeRootAgent;
m_scene.EventManager.OnRegionUp += OnRegionUp;
StartThread(new object());
// StartThread(new object());
}
// this has to be called with a lock on m_scene
protected virtual void RemoveHandlers()
{
StopThread();
// StopThread();
m_scene.EventManager.OnRegionUp -= OnRegionUp;
m_scene.EventManager.OnMakeRootAgent -= MakeRootAgent;
@ -526,7 +532,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
/// </summary>
public void process()
{
const int MAX_ASYNC_REQUESTS = 20;
//const int MAX_ASYNC_REQUESTS = 20;
try
{
while (true)
@ -571,13 +577,44 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
Watchdog.RemoveThread();
}
const int MAX_ASYNC_REQUESTS = 20;
/// <summary>
/// Enqueues the map item request into the processing thread
/// Enqueues the map item request into the services throttle processing thread
/// </summary>
/// <param name="state"></param>
public void EnqueueMapItemRequest(MapRequestState state)
public void EnqueueMapItemRequest(MapRequestState st)
{
requests.Enqueue(state);
m_ServiceThrottle.Enqueue("map-item", st.regionhandle.ToString() + st.agentID.ToString(), delegate
{
if (st.agentID != UUID.Zero)
{
bool dorequest = true;
lock (m_rootAgents)
{
if (!m_rootAgents.Contains(st.agentID))
dorequest = false;
}
if (dorequest && !m_blacklistedregions.ContainsKey(st.regionhandle))
{
if (nAsyncRequests >= MAX_ASYNC_REQUESTS) // hit the break
{
// AH!!! Recursive !
// Put this request back in the queue and return
EnqueueMapItemRequest(st);
return;
}
RequestMapItemsDelegate d = RequestMapItemsAsync;
d.BeginInvoke(st.agentID, st.flags, st.EstateID, st.godlike, st.itemtype, st.regionhandle, RequestMapItemsCompleted, null);
//OSDMap response = RequestMapItemsAsync(st.agentID, st.flags, st.EstateID, st.godlike, st.itemtype, st.regionhandle);
//RequestMapItemsCompleted(response);
Interlocked.Increment(ref nAsyncRequests);
}
}
});
}
/// <summary>

View File

@ -5,7 +5,15 @@ namespace OpenSim.Region.Framework.Interfaces
{
public interface IServiceThrottleModule
{
void Enqueue(Action continuation);
/// <summary>
/// Enqueue a continuation meant to get a resource from elsewhere.
/// As usual with CPS, caller beware: if that continuation is a never-ending computation,
/// the whole thread will be blocked, and no requests are processed
/// </summary>
/// <param name="category">Category of the resource (e.g. name, region)</param>
/// <param name="itemid">The resource identifier</param>
/// <param name="continuation">The continuation to be executed</param>
void Enqueue(string category, string itemid, Action continuation);
}
}

View File

@ -103,9 +103,10 @@ public abstract class BSPhysObject : PhysicsActor
CollisionsLastTickStep = -1;
SubscribedEventsMs = 0;
CollidingStep = 0;
CollidingGroundStep = 0;
CollisionAccumulation = 0;
// Crazy values that will never be true
CollidingStep = BSScene.NotASimulationStep;
CollidingGroundStep = BSScene.NotASimulationStep;
CollisionAccumulation = BSScene.NotASimulationStep;
ColliderIsMoving = false;
CollisionScore = 0;
@ -349,7 +350,7 @@ public abstract class BSPhysObject : PhysicsActor
if (value)
CollidingStep = PhysScene.SimulationStep;
else
CollidingStep = 0;
CollidingStep = BSScene.NotASimulationStep;
}
}
public override bool CollidingGround {
@ -359,7 +360,7 @@ public abstract class BSPhysObject : PhysicsActor
if (value)
CollidingGroundStep = PhysScene.SimulationStep;
else
CollidingGroundStep = 0;
CollidingGroundStep = BSScene.NotASimulationStep;
}
}
public override bool CollidingObj {
@ -368,7 +369,7 @@ public abstract class BSPhysObject : PhysicsActor
if (value)
CollidingObjectStep = PhysScene.SimulationStep;
else
CollidingObjectStep = 0;
CollidingObjectStep = BSScene.NotASimulationStep;
}
}

View File

@ -97,6 +97,9 @@ public sealed class BSScene : PhysicsScene, IPhysicsParameters
internal long m_simulationStep = 0; // The current simulation step.
public long SimulationStep { get { return m_simulationStep; } }
// A number to use for SimulationStep that is probably not any step value
// Used by the collision code (which remembers the step when a collision happens) to remember not any simulation step.
public static long NotASimulationStep = -1234;
internal float LastTimeStep { get; private set; } // The simulation time from the last invocation of Simulate()

View File

@ -1727,5 +1727,9 @@ MaxStringSpace = 0
;; {MaxDistance} {} {Cut-off distance at which sounds will not be sent to users} {100.0}
MaxDistance = 100.0
[ServiceThrottle]
;; Default time interval (in ms) for the throttle service thread to wake up
Interval = 5000
[Modules]
Include-modules = "addon-modules/*/config/*.ini"