Port Avination's inventory send throttling

cpu-performance
Melanie 2013-06-05 23:42:50 +01:00
parent 43d804b998
commit a7dbafb0e3
3 changed files with 324 additions and 54 deletions

View File

@ -2233,4 +2233,112 @@ namespace OpenSim.Framework
return str.Replace("_", "\\_").Replace("%", "\\%");
}
}
public class DoubleQueue<T> where T:class
{
private Queue<T> m_lowQueue = new Queue<T>();
private Queue<T> m_highQueue = new Queue<T>();
private object m_syncRoot = new object();
private Semaphore m_s = new Semaphore(0, 1);
public DoubleQueue()
{
}
public virtual int Count
{
get { return m_highQueue.Count + m_lowQueue.Count; }
}
public virtual void Enqueue(T data)
{
Enqueue(m_lowQueue, data);
}
public virtual void EnqueueLow(T data)
{
Enqueue(m_lowQueue, data);
}
public virtual void EnqueueHigh(T data)
{
Enqueue(m_highQueue, data);
}
private void Enqueue(Queue<T> q, T data)
{
lock (m_syncRoot)
{
m_lowQueue.Enqueue(data);
m_s.WaitOne(0);
m_s.Release();
}
}
public virtual T Dequeue()
{
return Dequeue(Timeout.Infinite);
}
public virtual T Dequeue(int tmo)
{
return Dequeue(TimeSpan.FromMilliseconds(tmo));
}
public virtual T Dequeue(TimeSpan wait)
{
T res = null;
if (!Dequeue(wait, ref res))
return null;
return res;
}
public bool Dequeue(int timeout, ref T res)
{
return Dequeue(TimeSpan.FromMilliseconds(timeout), ref res);
}
public bool Dequeue(TimeSpan wait, ref T res)
{
if (!m_s.WaitOne(wait))
return false;
lock (m_syncRoot)
{
if (m_highQueue.Count > 0)
res = m_highQueue.Dequeue();
else
res = m_lowQueue.Dequeue();
if (m_highQueue.Count == 0 && m_lowQueue.Count == 0)
return true;
try
{
m_s.Release();
}
catch
{
}
return true;
}
}
public virtual void Clear()
{
lock (m_syncRoot)
{
// Make sure sem count is 0
m_s.WaitOne(0);
m_lowQueue.Clear();
m_highQueue.Clear();
}
}
}
}

View File

@ -27,18 +27,25 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using log4net;
using Nini.Config;
using Mono.Addins;
using OpenMetaverse;
using OpenSim.Framework;
using OpenSim.Framework.Monitoring;
using OpenSim.Framework.Servers;
using OpenSim.Framework.Servers.HttpServer;
using OpenSim.Region.Framework.Interfaces;
using OpenSim.Region.Framework.Scenes;
using OpenSim.Framework.Capabilities;
using OpenSim.Services.Interfaces;
using Caps = OpenSim.Framework.Capabilities.Caps;
using OpenSim.Capabilities.Handlers;
using OpenMetaverse;
using OpenMetaverse.StructuredData;
namespace OpenSim.Region.ClientStack.Linden
{
@ -48,67 +55,74 @@ namespace OpenSim.Region.ClientStack.Linden
[Extension(Path = "/OpenSim/RegionModules", NodeName = "RegionModule", Id = "WebFetchInvDescModule")]
public class WebFetchInvDescModule : INonSharedRegionModule
{
// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
class aPollRequest
{
public PollServiceInventoryEventArgs thepoll;
public UUID reqID;
public Hashtable request;
public ScenePresence presence;
public List<UUID> folders;
}
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private Scene m_scene;
private IInventoryService m_InventoryService;
private ILibraryService m_LibraryService;
private bool m_Enabled;
private static WebFetchInvDescHandler m_webFetchHandler;
private string m_fetchInventoryDescendents2Url;
private string m_webFetchInventoryDescendentsUrl;
private Dictionary<UUID, string> m_capsDict = new Dictionary<UUID, string>();
private static Thread[] m_workerThreads = null;
private WebFetchInvDescHandler m_webFetchHandler;
private static DoubleQueue<aPollRequest> m_queue =
new DoubleQueue<aPollRequest>();
#region ISharedRegionModule Members
public void Initialise(IConfigSource source)
{
IConfig config = source.Configs["ClientStack.LindenCaps"];
if (config == null)
return;
m_fetchInventoryDescendents2Url = config.GetString("Cap_FetchInventoryDescendents2", string.Empty);
m_webFetchInventoryDescendentsUrl = config.GetString("Cap_WebFetchInventoryDescendents", string.Empty);
if (m_fetchInventoryDescendents2Url != string.Empty || m_webFetchInventoryDescendentsUrl != string.Empty)
{
m_Enabled = true;
}
}
public void AddRegion(Scene s)
{
if (!m_Enabled)
return;
m_scene = s;
}
public void RemoveRegion(Scene s)
{
if (!m_Enabled)
return;
m_scene.EventManager.OnRegisterCaps -= RegisterCaps;
m_scene.EventManager.OnDeregisterCaps -= DeregisterCaps;
m_scene = null;
}
public void RegionLoaded(Scene s)
{
if (!m_Enabled)
return;
m_InventoryService = m_scene.InventoryService;
m_LibraryService = m_scene.LibraryService;
// We'll reuse the same handler for all requests.
if (m_fetchInventoryDescendents2Url == "localhost" || m_webFetchInventoryDescendentsUrl == "localhost")
m_webFetchHandler = new WebFetchInvDescHandler(m_InventoryService, m_LibraryService);
m_webFetchHandler = new WebFetchInvDescHandler(m_InventoryService, m_LibraryService);
m_scene.EventManager.OnRegisterCaps += RegisterCaps;
m_scene.EventManager.OnDeregisterCaps += DeregisterCaps;
if (m_workerThreads == null)
{
m_workerThreads = new Thread[2];
for (uint i = 0; i < 2; i++)
{
m_workerThreads[i] = Watchdog.StartThread(DoInventoryRequests,
String.Format("InventoryWorkerThread{0}", i),
ThreadPriority.Normal,
false,
true,
null,
int.MaxValue);
}
}
}
public void PostInitialise()
@ -126,43 +140,190 @@ namespace OpenSim.Region.ClientStack.Linden
#endregion
private void RegisterCaps(UUID agentID, Caps caps)
~WebFetchInvDescModule()
{
if (m_webFetchInventoryDescendentsUrl != "")
RegisterFetchCap(agentID, caps, "WebFetchInventoryDescendents", m_webFetchInventoryDescendentsUrl);
if (m_fetchInventoryDescendents2Url != "")
RegisterFetchCap(agentID, caps, "FetchInventoryDescendents2", m_fetchInventoryDescendents2Url);
foreach (Thread t in m_workerThreads)
Watchdog.AbortThread(t.ManagedThreadId);
}
private void RegisterFetchCap(UUID agentID, Caps caps, string capName, string url)
private class PollServiceInventoryEventArgs : PollServiceEventArgs
{
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private Dictionary<UUID, Hashtable> responses =
new Dictionary<UUID, Hashtable>();
private Scene m_scene;
public PollServiceInventoryEventArgs(Scene scene, UUID pId) :
base(null, null, null, null, pId)
{
m_scene = scene;
HasEvents = (x, y) => { lock (responses) return responses.ContainsKey(x); };
GetEvents = (x, y, z) =>
{
lock (responses)
{
try
{
return responses[x];
}
finally
{
responses.Remove(x);
}
}
};
Request = (x, y) =>
{
ScenePresence sp = m_scene.GetScenePresence(Id);
if (sp == null)
{
m_log.ErrorFormat("[INVENTORY]: Unable to find ScenePresence for {0}", Id);
return;
}
aPollRequest reqinfo = new aPollRequest();
reqinfo.thepoll = this;
reqinfo.reqID = x;
reqinfo.request = y;
reqinfo.presence = sp;
reqinfo.folders = new List<UUID>();
// Decode the request here
string request = y["body"].ToString();
request = request.Replace("<string>00000000-0000-0000-0000-000000000000</string>", "<uuid>00000000-0000-0000-0000-000000000000</uuid>");
request = request.Replace("<key>fetch_folders</key><integer>0</integer>", "<key>fetch_folders</key><boolean>0</boolean>");
request = request.Replace("<key>fetch_folders</key><integer>1</integer>", "<key>fetch_folders</key><boolean>1</boolean>");
Hashtable hash = new Hashtable();
try
{
hash = (Hashtable)LLSD.LLSDDeserialize(Utils.StringToBytes(request));
}
catch (LLSD.LLSDParseException e)
{
m_log.ErrorFormat("[INVENTORY]: Fetch error: {0}{1}" + e.Message, e.StackTrace);
m_log.Error("Request: " + request);
return;
}
catch (System.Xml.XmlException)
{
m_log.ErrorFormat("[INVENTORY]: XML Format error");
}
ArrayList foldersrequested = (ArrayList)hash["folders"];
bool highPriority = false;
for (int i = 0; i < foldersrequested.Count; i++)
{
Hashtable inventoryhash = (Hashtable)foldersrequested[i];
string folder = inventoryhash["folder_id"].ToString();
UUID folderID;
if (UUID.TryParse(folder, out folderID))
{
if (!reqinfo.folders.Contains(folderID))
{
//TODO: Port COF handling from Avination
reqinfo.folders.Add(folderID);
}
}
}
if (highPriority)
m_queue.EnqueueHigh(reqinfo);
else
m_queue.EnqueueLow(reqinfo);
};
NoEvents = (x, y) =>
{
/*
lock (requests)
{
Hashtable request = requests.Find(id => id["RequestID"].ToString() == x.ToString());
requests.Remove(request);
}
*/
Hashtable response = new Hashtable();
response["int_response_code"] = 500;
response["str_response_string"] = "Script timeout";
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
return response;
};
}
public void Process(aPollRequest requestinfo)
{
UUID requestID = requestinfo.reqID;
Hashtable response = new Hashtable();
response["int_response_code"] = 200;
response["content_type"] = "text/plain";
response["keepalive"] = false;
response["reusecontext"] = false;
response["str_response_string"] = m_webFetchHandler.FetchInventoryDescendentsRequest(
requestinfo.request["body"].ToString(), String.Empty, String.Empty, null, null);
lock (responses)
responses[requestID] = response;
}
}
private void RegisterCaps(UUID agentID, Caps caps)
{
string capUrl = "/CAPS/" + UUID.Random() + "/";
// Register this as a poll service
PollServiceInventoryEventArgs args = new PollServiceInventoryEventArgs(m_scene, agentID);
MainServer.Instance.AddPollServiceHTTPHandler(capUrl, args);
string hostName = m_scene.RegionInfo.ExternalHostName;
uint port = (MainServer.Instance == null) ? 0 : MainServer.Instance.Port;
string protocol = "http";
if (MainServer.Instance.UseSSL)
{
hostName = MainServer.Instance.SSLCommonName;
port = MainServer.Instance.SSLPort;
protocol = "https";
}
caps.RegisterHandler("FetchInventoryDescendents2", String.Format("{0}://{1}:{2}{3}", protocol, hostName, port, capUrl));
m_capsDict[agentID] = capUrl;
}
private void DeregisterCaps(UUID agentID, Caps caps)
{
string capUrl;
if (url == "localhost")
if (m_capsDict.TryGetValue(agentID, out capUrl))
{
capUrl = "/CAPS/" + UUID.Random();
IRequestHandler reqHandler
= new RestStreamHandler(
"POST",
capUrl,
m_webFetchHandler.FetchInventoryDescendentsRequest,
"FetchInventoryDescendents2",
agentID.ToString());
caps.RegisterHandler(capName, reqHandler);
MainServer.Instance.RemoveHTTPHandler("", capUrl);
m_capsDict.Remove(agentID);
}
else
}
private void DoInventoryRequests()
{
while (true)
{
capUrl = url;
aPollRequest poolreq = m_queue.Dequeue();
caps.RegisterHandler(capName, capUrl);
poolreq.thepoll.Process(poolreq);
}
// m_log.DebugFormat(
// "[WEB FETCH INV DESC MODULE]: Registered capability {0} at {1} in region {2} for {3}",
// capName, capUrl, m_scene.RegionInfo.RegionName, agentID);
}
}
}
}

View File

@ -1531,6 +1531,7 @@
<Reference name="OpenSim.Capabilities"/>
<Reference name="OpenSim.Capabilities.Handlers"/>
<Reference name="OpenSim.Framework"/>
<Reference name="OpenSim.Framework.Monitoring"/>
<Reference name="OpenSim.Framework.Servers"/>
<Reference name="OpenSim.Framework.Servers.HttpServer"/>
<Reference name="OpenSim.Framework.Console"/>