assets uploads to grid. Change the retry code. Stop using asset.UploadAttempts field that should be removed

LSLKeyTest
UbitUmarov 2016-09-01 12:45:09 +01:00
parent 2a0df34087
commit 924aaedfce
2 changed files with 127 additions and 95 deletions

View File

@ -125,20 +125,14 @@ namespace OpenSim.Data.Tests
m_db.StoreAsset(a1); m_db.StoreAsset(a1);
m_db.StoreAsset(a2); m_db.StoreAsset(a2);
m_db.StoreAsset(a3); m_db.StoreAsset(a3);
a1.UploadAttempts = 0;
a2.UploadAttempts = 0;
a3.UploadAttempts = 0;
AssetBase a1a = m_db.GetAsset(uuid1); AssetBase a1a = m_db.GetAsset(uuid1);
a1a.UploadAttempts = 0;
Assert.That(a1a, Constraints.PropertyCompareConstraint(a1)); Assert.That(a1a, Constraints.PropertyCompareConstraint(a1));
AssetBase a2a = m_db.GetAsset(uuid2); AssetBase a2a = m_db.GetAsset(uuid2);
a2a.UploadAttempts = 0;
Assert.That(a2a, Constraints.PropertyCompareConstraint(a2)); Assert.That(a2a, Constraints.PropertyCompareConstraint(a2));
AssetBase a3a = m_db.GetAsset(uuid3); AssetBase a3a = m_db.GetAsset(uuid3);
a3a.UploadAttempts = 0;
Assert.That(a3a, Constraints.PropertyCompareConstraint(a3)); Assert.That(a3a, Constraints.PropertyCompareConstraint(a3));
scrambler.Scramble(a1a); scrambler.Scramble(a1a);
@ -148,20 +142,14 @@ namespace OpenSim.Data.Tests
m_db.StoreAsset(a1a); m_db.StoreAsset(a1a);
m_db.StoreAsset(a2a); m_db.StoreAsset(a2a);
m_db.StoreAsset(a3a); m_db.StoreAsset(a3a);
a1a.UploadAttempts = 0;
a2a.UploadAttempts = 0;
a3a.UploadAttempts = 0;
AssetBase a1b = m_db.GetAsset(uuid1); AssetBase a1b = m_db.GetAsset(uuid1);
a1b.UploadAttempts = 0;
Assert.That(a1b, Constraints.PropertyCompareConstraint(a1a)); Assert.That(a1b, Constraints.PropertyCompareConstraint(a1a));
AssetBase a2b = m_db.GetAsset(uuid2); AssetBase a2b = m_db.GetAsset(uuid2);
a2b.UploadAttempts = 0;
Assert.That(a2b, Constraints.PropertyCompareConstraint(a2a)); Assert.That(a2b, Constraints.PropertyCompareConstraint(a2a));
AssetBase a3b = m_db.GetAsset(uuid3); AssetBase a3b = m_db.GetAsset(uuid3);
a3b.UploadAttempts = 0;
Assert.That(a3b, Constraints.PropertyCompareConstraint(a3a)); Assert.That(a3b, Constraints.PropertyCompareConstraint(a3a));
bool[] exist = m_db.AssetsExist(new[] { uuid1, uuid2, uuid3 }); bool[] exist = m_db.AssetsExist(new[] { uuid1, uuid2, uuid3 });
@ -202,22 +190,16 @@ namespace OpenSim.Data.Tests
a3.Data = data1; a3.Data = data1;
m_db.StoreAsset(a1); m_db.StoreAsset(a1);
a1.UploadAttempts = 0;
m_db.StoreAsset(a2); m_db.StoreAsset(a2);
a2.UploadAttempts = 0;
m_db.StoreAsset(a3); m_db.StoreAsset(a3);
a3.UploadAttempts = 0;
AssetBase a1a = m_db.GetAsset(uuid1); AssetBase a1a = m_db.GetAsset(uuid1);
a1a.UploadAttempts = 0;
Assert.That(a1a, Constraints.PropertyCompareConstraint(a1)); Assert.That(a1a, Constraints.PropertyCompareConstraint(a1));
AssetBase a2a = m_db.GetAsset(uuid2); AssetBase a2a = m_db.GetAsset(uuid2);
a2a.UploadAttempts = 0;
Assert.That(a2a, Constraints.PropertyCompareConstraint(a2)); Assert.That(a2a, Constraints.PropertyCompareConstraint(a2));
AssetBase a3a = m_db.GetAsset(uuid3); AssetBase a3a = m_db.GetAsset(uuid3);
a3a.UploadAttempts = 0;
Assert.That(a3a, Constraints.PropertyCompareConstraint(a3)); Assert.That(a3a, Constraints.PropertyCompareConstraint(a3));
} }
} }

View File

@ -46,10 +46,12 @@ namespace OpenSim.Services.Connectors
LogManager.GetLogger( LogManager.GetLogger(
MethodBase.GetCurrentMethod().DeclaringType); MethodBase.GetCurrentMethod().DeclaringType);
// const int MAXSENDRETRIESLEN = 30;
const int MAXSENDRETRIESLEN = 2;
private string m_ServerURI = String.Empty; private string m_ServerURI = String.Empty;
private IImprovedAssetCache m_Cache = null; private IImprovedAssetCache m_Cache = null;
private int m_retryCounter; private int m_retryCounter;
private Dictionary<int, List<AssetBase>> m_retryQueue = new Dictionary<int, List<AssetBase>>(); private List<AssetBase>[] m_sendRetries = new List<AssetBase>[MAXSENDRETRIESLEN];
private System.Timers.Timer m_retryTimer; private System.Timers.Timer m_retryTimer;
private int m_maxAssetRequestConcurrency = 30; private int m_maxAssetRequestConcurrency = 30;
@ -110,9 +112,9 @@ namespace OpenSim.Services.Connectors
throw new Exception("Asset connector init error"); throw new Exception("Asset connector init error");
} }
m_retryTimer = new System.Timers.Timer(); m_retryTimer = new System.Timers.Timer();
m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck); m_retryTimer.Elapsed += new ElapsedEventHandler(retryCheck);
m_retryTimer.AutoReset = false;
m_retryTimer.Interval = 60000; m_retryTimer.Interval = 60000;
Uri serverUri = new Uri(m_ServerURI); Uri serverUri = new Uri(m_ServerURI);
@ -167,47 +169,57 @@ namespace OpenSim.Services.Connectors
protected void retryCheck(object source, ElapsedEventArgs e) protected void retryCheck(object source, ElapsedEventArgs e)
{ {
m_retryCounter++; m_retryCounter++;
if (m_retryCounter > 60) if(m_retryCounter >= 61 ) // avoid overflow 60 is max in use below
m_retryCounter -= 60; m_retryCounter = 1;
List<int> keys = new List<int>(); int inUse = 0;
foreach (int a in m_retryQueue.Keys) int nextlevel;
{ int timefactor;
keys.Add(a); List<AssetBase> retrylist;
} // we need to go down
foreach (int a in keys) for(int i = MAXSENDRETRIESLEN - 1; i >= 0; i--)
{ {
lock(m_sendRetries)
retrylist = m_sendRetries[i];
if(retrylist == null)
continue;
inUse++;
nextlevel = i + 1;
//We exponentially fall back on frequency until we reach one attempt per hour //We exponentially fall back on frequency until we reach one attempt per hour
//The net result is that we end up in the queue for roughly 24 hours.. //The net result is that we end up in the queue for roughly 24 hours..
//24 hours worth of assets could be a lot, so the hope is that the region admin //24 hours worth of assets could be a lot, so the hope is that the region admin
//will have gotten the asset connector back online quickly! //will have gotten the asset connector back online quickly!
if(i == 0)
int timefactor = a ^ 2; timefactor = 1;
if (timefactor > 60) else
{ {
timefactor = 1 << nextlevel;
if (timefactor > 60)
timefactor = 60; timefactor = 60;
} }
//First, find out if we care about this timefactor if(m_retryCounter < timefactor)
if (timefactor % a == 0) continue; // to update inUse;
{
//Yes, we do!
List<AssetBase> retrylist = m_retryQueue[a];
m_retryQueue.Remove(a);
if (m_retryCounter % timefactor != 0)
continue;
// a list to retry
lock(m_sendRetries)
m_sendRetries[i] = null;
// we are the only ones with a copy of this retrylist now
foreach(AssetBase ass in retrylist) foreach(AssetBase ass in retrylist)
{ retryStore(ass, nextlevel);
Store(ass); //Store my ass. This function will put it back in the dictionary if it fails
}
}
} }
if (m_retryQueue.Count == 0) lock(m_sendRetries)
{ {
//It might only be one tick per minute, but I have if(inUse > 0 && !m_retryTimer.Enabled)
//repented and abandoned my wasteful ways m_retryTimer.Start();
m_retryCounter = 0;
m_retryTimer.Stop();
} }
} }
@ -237,7 +249,8 @@ namespace OpenSim.Services.Connectors
asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth); asset = SynchronousRestObjectRequester.MakeRequest<int, AssetBase>("GET", uri, 0, m_Auth);
if (m_Cache != null)
if (asset != null && m_Cache != null)
m_Cache.Cache(asset); m_Cache.Cache(asset);
} }
return asset; return asset;
@ -340,25 +353,18 @@ namespace OpenSim.Services.Connectors
m_AssetHandlers.Remove(id); m_AssetHandlers.Remove(id);
} }
if(handlers != null)
{
Util.FireAndForget(x => Util.FireAndForget(x =>
{ {
foreach (AssetRetrievedEx h in handlers) foreach (AssetRetrievedEx h in handlers)
{ {
// Util.FireAndForget(x =>
// {
try { h.Invoke(a); } try { h.Invoke(a); }
catch { } catch { }
// });
} }
if (handlers != null)
handlers.Clear(); handlers.Clear();
}); });
}
// if (handlers != null)
// handlers.Clear();
success = true; success = true;
} }
} }
@ -393,30 +399,25 @@ namespace OpenSim.Services.Connectors
{ {
AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); }); AssetRetrievedEx handlerEx = new AssetRetrievedEx(delegate(AssetBase _asset) { handler(id, sender, _asset); });
// AssetRetrievedEx handlers;
List<AssetRetrievedEx> handlers; List<AssetRetrievedEx> handlers;
if (m_AssetHandlers.TryGetValue(id, out handlers)) if (m_AssetHandlers.TryGetValue(id, out handlers))
{ {
// Someone else is already loading this asset. It will notify our handler when done. // Someone else is already loading this asset. It will notify our handler when done.
// handlers += handlerEx;
handlers.Add(handlerEx); handlers.Add(handlerEx);
return true; return true;
} }
// Load the asset ourselves
// handlers += handlerEx;
handlers = new List<AssetRetrievedEx>(); handlers = new List<AssetRetrievedEx>();
handlers.Add(handlerEx); handlers.Add(handlerEx);
m_AssetHandlers.Add(id, handlers); m_AssetHandlers.Add(id, handlers);
}
QueuedAssetRequest request = new QueuedAssetRequest(); QueuedAssetRequest request = new QueuedAssetRequest();
request.id = id; request.id = id;
request.uri = uri; request.uri = uri;
m_requestQueue.Enqueue(request); m_requestQueue.Enqueue(request);
} }
}
else else
{ {
handler(id, sender, asset); handler(id, sender, asset);
@ -495,43 +496,35 @@ namespace OpenSim.Services.Connectors
newID = SynchronousRestObjectRequester. newID = SynchronousRestObjectRequester.
MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth); MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
} }
catch {} catch
{
newID = null;
}
if (newID == null || newID == String.Empty || newID == stringUUIDZero) if (newID == null || newID == String.Empty || newID == stringUUIDZero)
{ {
//The asset upload failed, put it in a queue for later //The asset upload failed, try later
asset.UploadAttempts++; lock(m_sendRetries)
if (asset.UploadAttempts > 30)
{ {
//By this stage we've been in the queue for a good few hours; if (m_sendRetries[0] == null)
//We're going to drop the asset. m_sendRetries[0] = new List<AssetBase>();
m_log.ErrorFormat("[Assets] Dropping asset {0} - Upload has been in the queue for too long.", asset.ID.ToString()); List<AssetBase> m_queue = m_sendRetries[0];
}
else
{
if (!m_retryQueue.ContainsKey(asset.UploadAttempts))
{
m_retryQueue.Add(asset.UploadAttempts, new List<AssetBase>());
}
List<AssetBase> m_queue = m_retryQueue[asset.UploadAttempts];
m_queue.Add(asset); m_queue.Add(asset);
m_log.WarnFormat("[Assets] Upload failed: {0} - Requeuing asset for another run.", asset.ID.ToString()); m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
asset.ID.ToString(), asset.Type.ToString());
if(!m_retryTimer.Enabled)
m_retryTimer.Start(); m_retryTimer.Start();
} }
} }
else else
{ {
if (asset.UploadAttempts > 0)
{
m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), asset.UploadAttempts.ToString());
}
if (newID != asset.ID) if (newID != asset.ID)
{ {
// Placing this here, so that this work with old asset servers that don't send any reply back // Placing this here, so that this work with old asset servers that don't send any reply back
// SynchronousRestObjectRequester returns somethins that is not an empty string // SynchronousRestObjectRequester returns somethins that is not an empty string
asset.ID = newID; asset.ID = newID;
// what about FullID ????
if (m_Cache != null) if (m_Cache != null)
m_Cache.Cache(asset); m_Cache.Cache(asset);
} }
@ -539,6 +532,62 @@ namespace OpenSim.Services.Connectors
return asset.ID; return asset.ID;
} }
public void retryStore(AssetBase asset, int nextRetryLevel)
{
/* this may be bad, so excluding
if (m_Cache != null && !m_Cache.Check(asset.ID))
{
m_log.WarnFormat("[Assets] Upload giveup asset bc no longer in local cache: {0}",
asset.ID.ToString();
return; // if no longer in cache, it was deleted or expired
}
*/
string uri = MapServer(asset.FullID.ToString()) + "/assets/";
string newID = null;
try
{
newID = SynchronousRestObjectRequester.
MakeRequest<AssetBase, string>("POST", uri, asset, 100000, m_Auth);
}
catch
{
newID = null;
}
if (newID == null || newID == String.Empty || newID == stringUUIDZero)
{
if(nextRetryLevel >= MAXSENDRETRIESLEN)
m_log.WarnFormat("[Assets] Upload giveup after several retries id: {0} type {1}",
asset.ID.ToString(), asset.Type.ToString());
else
{
lock(m_sendRetries)
{
if (m_sendRetries[nextRetryLevel] == null)
{
m_sendRetries[nextRetryLevel] = new List<AssetBase>();
}
List<AssetBase> m_queue = m_sendRetries[nextRetryLevel];
m_queue.Add(asset);
m_log.WarnFormat("[Assets] Upload failed: {0} type {1} will retry later",
asset.ID.ToString(), asset.Type.ToString());
}
}
}
else
{
m_log.InfoFormat("[Assets] Upload of {0} succeeded after {1} failed attempts", asset.ID.ToString(), nextRetryLevel.ToString());
if (newID != asset.ID)
{
asset.ID = newID;
if (m_Cache != null)
m_Cache.Cache(asset);
}
}
}
public bool UpdateContent(string id, byte[] data) public bool UpdateContent(string id, byte[] data)
{ {
AssetBase asset = null; AssetBase asset = null;
@ -569,6 +618,7 @@ namespace OpenSim.Services.Connectors
return false; return false;
} }
public bool Delete(string id) public bool Delete(string id)
{ {
string uri = MapServer(id) + "/assets/" + id; string uri = MapServer(id) + "/assets/" + id;