Revamp the v3 inventory sending. Uses threads and some nifty mechanics to

leverage the Poll Service without blocking it's workers.
avinationmerge
Melanie 2012-09-11 10:39:43 +02:00
parent 657428a439
commit a6753c14a5
2 changed files with 119 additions and 82 deletions

View File

@ -42,6 +42,7 @@ using OpenSim.Region.Framework.Scenes;
using OpenSim.Services.Interfaces; using OpenSim.Services.Interfaces;
using Caps = OpenSim.Framework.Capabilities.Caps; using Caps = OpenSim.Framework.Capabilities.Caps;
using OpenSim.Capabilities.Handlers; using OpenSim.Capabilities.Handlers;
using OpenSim.Framework.Monitoring;
namespace OpenSim.Region.ClientStack.Linden namespace OpenSim.Region.ClientStack.Linden
{ {
@ -58,13 +59,13 @@ namespace OpenSim.Region.ClientStack.Linden
private IInventoryService m_InventoryService; private IInventoryService m_InventoryService;
private ILibraryService m_LibraryService; private ILibraryService m_LibraryService;
private WebFetchInvDescHandler m_webFetchHandler; private static WebFetchInvDescHandler m_webFetchHandler;
private object m_lock = new object();
private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>(); private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>();
private Dictionary<UUID, Hashtable> m_requests = new Dictionary<UUID, Hashtable>(); private static Thread[] m_workerThreads = null;
bool m_busy = false;
private static OpenMetaverse.BlockingQueue<PollServiceInventoryEventArgs> m_queue =
new OpenMetaverse.BlockingQueue<PollServiceInventoryEventArgs>();
#region ISharedRegionModule Members #region ISharedRegionModule Members
@ -94,6 +95,22 @@ namespace OpenSim.Region.ClientStack.Linden
m_scene.EventManager.OnRegisterCaps += RegisterCaps; m_scene.EventManager.OnRegisterCaps += RegisterCaps;
m_scene.EventManager.OnDeregisterCaps += DeregisterCaps; m_scene.EventManager.OnDeregisterCaps += DeregisterCaps;
if (m_workerThreads == null)
{
m_workerThreads = new Thread[2];
for (uint i = 0; i < 2; i++)
{
m_workerThreads[i] = Watchdog.StartThread(DoInventoryRequests,
String.Format("InventoryWorkerThread{0}", i),
ThreadPriority.Normal,
false,
true,
null,
int.MaxValue);
}
}
} }
public void PostInitialise() public void PostInitialise()
@ -111,13 +128,103 @@ namespace OpenSim.Region.ClientStack.Linden
#endregion #endregion
~WebFetchInvDescModule()
{
foreach (Thread t in m_workerThreads)
t.Abort();
}
private class PollServiceInventoryEventArgs : PollServiceEventArgs
{
private List<Hashtable> requests =
new List<Hashtable>();
private Dictionary<UUID, Hashtable> responses =
new Dictionary<UUID, Hashtable>();
public PollServiceInventoryEventArgs(UUID pId) :
base(null, null, null, null, pId, 30000)
{
HasEvents = (x, y) => { return this.responses.ContainsKey(x); };
GetEvents = (x, y, s) =>
{
try
{
return this.responses[x];
}
finally
{
responses.Remove(x);
}
};
Request = (x, y) =>
{
y["RequestID"] = x.ToString();
lock (this.requests)
this.requests.Add(y);
m_queue.Enqueue(this);
};
NoEvents = (x, y) =>
{
lock (this.requests)
{
Hashtable request = requests.Find(id => id["RequestID"].ToString() == x.ToString());
requests.Remove(request);
}
Hashtable response = new Hashtable();
response["int_response_code"] = 500;
response["str_response_string"] = "Script timeout";
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
return response;
};
}
public void Process()
{
Hashtable request = null;
try
{
lock (this.requests)
{
request = requests[0];
requests.RemoveAt(0);
}
}
catch
{
return;
}
UUID requestID = new UUID(request["RequestID"].ToString());
Hashtable response = new Hashtable();
response["int_response_code"] = 200;
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
response["str_response_string"] = m_webFetchHandler.FetchInventoryDescendentsRequest(request["body"].ToString(), String.Empty, String.Empty, null, null);
responses[requestID] = response;
}
}
private void RegisterCaps(UUID agentID, Caps caps) private void RegisterCaps(UUID agentID, Caps caps)
{ {
string capUrl = "/CAPS/" + UUID.Random() + "/"; string capUrl = "/CAPS/" + UUID.Random() + "/";
// Register this as a poll service // Register this as a poll service
// absurd large timeout to tune later to make a bit less than viewer // absurd large timeout to tune later to make a bit less than viewer
PollServiceEventArgs args = new PollServiceEventArgs(HttpRequestHandler, HasEvents, GetEvents, NoEvents, agentID, 300000); PollServiceInventoryEventArgs args = new PollServiceInventoryEventArgs(agentID);
args.Type = PollServiceEventArgs.EventType.Inventory; args.Type = PollServiceEventArgs.EventType.Inventory;
MainServer.Instance.AddPollServiceHTTPHandler(capUrl, args); MainServer.Instance.AddPollServiceHTTPHandler(capUrl, args);
@ -135,8 +242,6 @@ namespace OpenSim.Region.ClientStack.Linden
caps.RegisterHandler("FetchInventoryDescendents2", String.Format("{0}://{1}:{2}{3}", protocol, hostName, port, capUrl)); caps.RegisterHandler("FetchInventoryDescendents2", String.Format("{0}://{1}:{2}{3}", protocol, hostName, port, capUrl));
m_capsDict[agentID] = capUrl; m_capsDict[agentID] = capUrl;
m_busy = false;
} }
private void DeregisterCaps(UUID agentID, Caps caps) private void DeregisterCaps(UUID agentID, Caps caps)
@ -150,83 +255,14 @@ namespace OpenSim.Region.ClientStack.Linden
} }
} }
public void HttpRequestHandler(UUID requestID, Hashtable request) private void DoInventoryRequests()
{ {
// m_log.DebugFormat("[FETCH2]: Received request {0}", requestID); while (true)
lock(m_lock) {
m_requests[requestID] = request; PollServiceInventoryEventArgs args = m_queue.Dequeue();
args.Process();
} }
private bool HasEvents(UUID requestID, UUID sessionID)
{
lock (m_lock)
{
return !m_busy;
}
}
private Hashtable NoEvents(UUID requestID, UUID sessionID)
{
lock(m_lock)
m_requests.Remove(requestID);
Hashtable response = new Hashtable();
response["int_response_code"] = 500;
response["str_response_string"] = "Script timeout";
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
lock (m_lock)
m_busy = false;
return response;
}
private Hashtable GetEvents(UUID requestID, UUID sessionID, string request)
{
lock (m_lock)
m_busy = true;
Hashtable response = new Hashtable();
response["int_response_code"] = 500;
response["str_response_string"] = "Internal error";
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
try
{
Hashtable requestHash;
lock (m_lock)
{
if (!m_requests.TryGetValue(requestID, out requestHash))
{
m_busy = false;
response["str_response_string"] = "Invalid request";
return response;
}
m_requests.Remove(requestID);
}
// m_log.DebugFormat("[FETCH2]: Processed request {0}", requestID);
string reply = m_webFetchHandler.FetchInventoryDescendentsRequest(requestHash["body"].ToString(), String.Empty, String.Empty, null, null);
response["int_response_code"] = 200;
response["str_response_string"] = reply;
}
finally
{
lock (m_lock)
m_busy = false;
}
return response;
} }
} }
} }

View File

@ -1706,6 +1706,7 @@
<Reference name="OpenSim.Framework.Servers"/> <Reference name="OpenSim.Framework.Servers"/>
<Reference name="OpenSim.Framework.Servers.HttpServer"/> <Reference name="OpenSim.Framework.Servers.HttpServer"/>
<Reference name="OpenSim.Framework.Console"/> <Reference name="OpenSim.Framework.Console"/>
<Reference name="OpenSim.Framework.Monitoring"/>
<Reference name="OpenSim.Region.Physics.Manager"/> <Reference name="OpenSim.Region.Physics.Manager"/>
<Reference name="OpenSim.Services.Interfaces"/> <Reference name="OpenSim.Services.Interfaces"/>
<Reference name="Mono.Addins" path="../../../../../bin/"/> <Reference name="Mono.Addins" path="../../../../../bin/"/>