Sequence/throttle asset retrievals.

avinationmerge
Melanie 2012-09-14 23:09:07 +02:00
parent 387e59ff7f
commit 2aa7a22129
2 changed files with 79 additions and 47 deletions

View File

@ -1471,6 +1471,8 @@ namespace OpenSim.Framework.Servers.HttpServer
else else
responseString = (string)responsedata["str_response_string"]; responseString = (string)responsedata["str_response_string"];
contentType = (string)responsedata["content_type"]; contentType = (string)responsedata["content_type"];
if (responseString == null)
responseString = String.Empty;
} }
catch catch
{ {

View File

@ -27,6 +27,7 @@
using log4net; using log4net;
using System; using System;
using System.Threading;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Reflection; using System.Reflection;
@ -50,7 +51,7 @@ namespace OpenSim.Services.Connectors
private IImprovedAssetCache m_Cache = null; private IImprovedAssetCache m_Cache = null;
private int m_retryCounter; private int m_retryCounter;
private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>();
private Timer m_retryTimer; private System.Timers.Timer m_retryTimer;
private delegate void AssetRetrievedEx(AssetBase asset); private delegate void AssetRetrievedEx(AssetBase asset);
// Keeps track of concurrent requests for the same asset, so that it's only loaded once. // Keeps track of concurrent requests for the same asset, so that it's only loaded once.
@ -61,6 +62,8 @@ namespace OpenSim.Services.Connectors
private Dictionary<string, string> m_UriMap = new Dictionary<string, string>(); private Dictionary<string, string> m_UriMap = new Dictionary<string, string>();
private Thread[] m_fetchThreads;
public AssetServicesConnector() public AssetServicesConnector()
{ {
} }
@ -96,7 +99,7 @@ namespace OpenSim.Services.Connectors
} }
m_retryTimer = new Timer(); m_retryTimer = new System.Timers.Timer();
m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
m_retryTimer.Interval = 60000; m_retryTimer.Interval = 60000;
@ -112,6 +115,14 @@ namespace OpenSim.Services.Connectors
m_UriMap[prefix] = groupHost; m_UriMap[prefix] = groupHost;
//m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix); //m_log.DebugFormat("[ASSET]: Using {0} for prefix {1}", groupHost, prefix);
} }
m_fetchThreads = new Thread[2];
for (int i = 0 ; i < 2 ; i++)
{
m_fetchThreads[i] = new Thread(AssetRequestProcessor);
m_fetchThreads[i].Start();
}
} }
private string MapServer(string id) private string MapServer(string id)
@ -261,6 +272,66 @@ namespace OpenSim.Services.Connectors
return null; return null;
} }
private class QueuedAssetRequest
{
public string uri;
public string id;
}
private OpenMetaverse.BlockingQueue<QueuedAssetRequest> m_requestQueue =
new OpenMetaverse.BlockingQueue<QueuedAssetRequest>();
private void AssetRequestProcessor()
{
QueuedAssetRequest r;
while (true)
{
r = m_requestQueue.Dequeue();
string uri = r.uri;
string id = r.id;
bool success = false;
try
{
AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0,
delegate(AssetBase a)
{
if (m_Cache != null)
m_Cache.Cache(a);
List<AssetRetrievedEx> handlers;
lock (m_AssetHandlers)
{
handlers = m_AssetHandlers[id];
m_AssetHandlers.Remove(id);
}
foreach (AssetRetrievedEx h in handlers)
h.Invoke(a);
if (handlers != null)
handlers.Clear();
}, 30);
success = true;
}
finally
{
if (!success)
{
List<AssetRetrievedEx> handlers;
lock (m_AssetHandlers)
{
handlers = m_AssetHandlers[id];
m_AssetHandlers.Remove(id);
}
if (handlers != null)
handlers.Clear();
}
}
}
}
public bool Get(string id, Object sender, AssetRetrieved handler) public bool Get(string id, Object sender, AssetRetrieved handler)
{ {
string uri = MapServer(id) + "/assets/" + id; string uri = MapServer(id) + "/assets/" + id;
@ -293,52 +364,11 @@ namespace OpenSim.Services.Connectors
m_AssetHandlers.Add(id, handlers); m_AssetHandlers.Add(id, handlers);
} }
bool success = false; QueuedAssetRequest request = new QueuedAssetRequest();
try request.id = id;
{ request.uri = uri;
AsynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0,
delegate(AssetBase a)
{
if (m_Cache != null)
m_Cache.Cache(a);
/*
AssetRetrievedEx handlers;
lock (m_AssetHandlers)
{
handlers = m_AssetHandlers[id];
m_AssetHandlers.Remove(id);
}
handlers.Invoke(a); m_requestQueue.Enqueue(request);
*/
List<AssetRetrievedEx> handlers;
lock (m_AssetHandlers)
{
handlers = m_AssetHandlers[id];
m_AssetHandlers.Remove(id);
}
foreach (AssetRetrievedEx h in handlers)
h.Invoke(a);
if (handlers != null)
handlers.Clear();
}, 30);
success = true;
}
finally
{
if (!success)
{
List<AssetRetrievedEx> handlers;
lock (m_AssetHandlers)
{
handlers = m_AssetHandlers[id];
m_AssetHandlers.Remove(id);
}
if (handlers != null)
handlers.Clear();
}
}
} }
else else
{ {