make oar/iar assets writer be done by caller thread like the rest of the

oar/iar save. This may look more painfull but should reduce errors and
threads fights. Fill lucky i just don't stop the entire simulation during
this
0.9.0-post-fixes
UbitUmarov 2017-06-21 18:39:58 +01:00
parent 70da902732
commit 637d35631c
3 changed files with 55 additions and 181 deletions

View File

@ -423,14 +423,12 @@ namespace OpenSim.Region.CoreModules.Avatar.Inventory.Archiver
m_log.DebugFormat(
"[INVENTORY ARCHIVER]: Saving {0} assets for items", m_assetGatherer.GatheredUuids.Count);
AssetsRequest ar
= new AssetsRequest(
AssetsRequest ar = new AssetsRequest(
new AssetsArchiver(m_archiveWriter),
m_assetGatherer.GatheredUuids, m_scene.AssetService,
m_scene.UserAccountService, m_scene.RegionInfo.ScopeID,
options, ReceivedAllAssets);
WorkManager.RunInThread(o => ar.Execute(), null, string.Format("AssetsRequest ({0})", m_scene.Name));
ar.Execute();
}
else
{

View File

@ -195,14 +195,13 @@ namespace OpenSim.Region.CoreModules.World.Archiver
m_log.DebugFormat("[ARCHIVER]: Saving {0} assets", assetUuids.Count);
// Asynchronously request all the assets required to perform this archive operation
AssetsRequest ar
= new AssetsRequest(
AssetsRequest ar = new AssetsRequest(
new AssetsArchiver(m_archiveWriter), assetUuids,
m_rootScene.AssetService, m_rootScene.UserAccountService,
m_rootScene.RegionInfo.ScopeID, options, ReceivedAllAssets);
WorkManager.RunInThread(o => ar.Execute(), null, "Archive Assets Request");
// WorkManager.RunInThread(o => ar.Execute(), null, "Archive Assets Request");
ar.Execute();
// CloseArchive() will be called from ReceivedAllAssets()
}
else

View File

@ -61,24 +61,6 @@ namespace OpenSim.Region.CoreModules.World.Archiver
Aborted
};
/// <value>
/// Timeout threshold if we still need assets or missing asset notifications but have stopped receiving them
/// from the asset service
/// </value>
protected const int TIMEOUT = 60 * 1000;
/// <value>
/// If a timeout does occur, limit the amount of UUID information put to the console.
/// </value>
protected const int MAX_UUID_DISPLAY_ON_TIMEOUT = 3;
protected System.Timers.Timer m_requestCallbackTimer;
/// <value>
/// State of this request
/// </value>
private RequestState m_requestState = RequestState.Initial;
/// <value>
/// uuids to request
/// </value>
@ -104,6 +86,9 @@ namespace OpenSim.Region.CoreModules.World.Archiver
/// </value>
private int m_repliesRequired;
private System.Timers.Timer m_timeOutTimer;
private bool m_timeout;
/// <value>
/// Asset service used to request the assets
/// </value>
@ -129,186 +114,76 @@ namespace OpenSim.Region.CoreModules.World.Archiver
m_scopeID = scope;
m_options = options;
m_repliesRequired = uuids.Count;
// FIXME: This is a really poor way of handling the timeout since it will always leave the original requesting thread
// hanging. Need to restructure so an original request thread waits for a ManualResetEvent on asset received
// so we can properly abort that thread. Or request all assets synchronously, though that would be a more
// radical change
m_requestCallbackTimer = new System.Timers.Timer(TIMEOUT);
m_requestCallbackTimer.AutoReset = false;
m_requestCallbackTimer.Elapsed += new ElapsedEventHandler(OnRequestCallbackTimeout);
}
protected internal void Execute()
{
m_requestState = RequestState.Running;
Culture.SetCurrentCulture();
m_log.DebugFormat("[ARCHIVER]: AssetsRequest executed looking for {0} possible assets", m_repliesRequired);
// We can stop here if there are no assets to fetch
if (m_repliesRequired == 0)
{
m_requestState = RequestState.Completed;
PerformAssetsRequestCallback(false);
return;
}
m_requestCallbackTimer.Enabled = true;
m_timeOutTimer = new System.Timers.Timer(60000);
m_timeOutTimer .AutoReset = false;
m_timeOutTimer.Elapsed += OnTimeout;
m_timeout = false;
foreach (KeyValuePair<UUID, sbyte> kvp in m_uuids)
{
// m_log.DebugFormat("[ARCHIVER]: Requesting asset {0}", kvp.Key);
// m_assetService.Get(kvp.Key.ToString(), kvp.Value, PreAssetRequestCallback);
AssetBase asset = m_assetService.Get(kvp.Key.ToString());
PreAssetRequestCallback(kvp.Key.ToString(), kvp.Value, asset);
}
}
protected void OnRequestCallbackTimeout(object source, ElapsedEventArgs args)
{
bool timedOut = true;
try
{
lock (this)
string thiskey = kvp.Key.ToString();
try
{
// Take care of the possibilty that this thread started but was paused just outside the lock before
// the final request came in (assuming that such a thing is possible)
if (m_requestState == RequestState.Completed)
{
timedOut = false;
return;
}
m_requestState = RequestState.Aborted;
}
// Calculate which uuids were not found. This is an expensive way of doing it, but this is a failure
// case anyway.
List<UUID> uuids = new List<UUID>();
foreach (UUID uuid in m_uuids.Keys)
{
uuids.Add(uuid);
}
foreach (UUID uuid in m_foundAssetUuids)
{
uuids.Remove(uuid);
}
foreach (UUID uuid in m_notFoundAssetUuids)
{
uuids.Remove(uuid);
}
m_log.ErrorFormat(
"[ARCHIVER]: Asset service failed to return information about {0} requested assets", uuids.Count);
int i = 0;
foreach (UUID uuid in uuids)
{
m_log.ErrorFormat("[ARCHIVER]: No information about asset {0} received", uuid);
if (++i >= MAX_UUID_DISPLAY_ON_TIMEOUT)
m_timeOutTimer.Enabled = true;
AssetBase asset = m_assetService.Get(thiskey);
if(m_timeout)
break;
m_timeOutTimer.Enabled = false;
if(asset == null)
{
m_notFoundAssetUuids.Add(new UUID(thiskey));
continue;
}
sbyte assetType = kvp.Value;
if (asset != null && assetType == (sbyte)AssetType.Unknown)
{
m_log.InfoFormat("[ARCHIVER]: Rewriting broken asset type for {0} to {1}", thiskey, SLUtil.AssetTypeFromCode(assetType));
asset.Type = assetType;
}
m_foundAssetUuids.Add(asset.FullID);
m_assetsArchiver.WriteAsset(PostProcess(asset));
}
if (uuids.Count > MAX_UUID_DISPLAY_ON_TIMEOUT)
m_log.ErrorFormat(
"[ARCHIVER]: (... {0} more not shown)", uuids.Count - MAX_UUID_DISPLAY_ON_TIMEOUT);
m_log.Error("[ARCHIVER]: Archive save aborted. PLEASE DO NOT USE THIS ARCHIVE, IT WILL BE INCOMPLETE.");
}
catch (Exception e)
{
m_log.ErrorFormat("[ARCHIVER]: Timeout handler exception {0}{1}", e.Message, e.StackTrace);
}
finally
{
if (timedOut)
WorkManager.RunInThread(PerformAssetsRequestCallback, true, "Archive Assets Request Callback");
}
}
protected void PreAssetRequestCallback(string fetchedAssetID, object assetType, AssetBase fetchedAsset)
{
// Check for broken asset types and fix them with the AssetType gleaned by UuidGatherer
if (fetchedAsset != null && fetchedAsset.Type == (sbyte)AssetType.Unknown)
{
m_log.InfoFormat("[ARCHIVER]: Rewriting broken asset type for {0} to {1}", fetchedAsset.ID, SLUtil.AssetTypeFromCode((sbyte)assetType));
fetchedAsset.Type = (sbyte)assetType;
}
AssetRequestCallback(fetchedAssetID, this, fetchedAsset);
}
/// <summary>
/// Called back by the asset cache when it has the asset
/// </summary>
/// <param name="assetID"></param>
/// <param name="asset"></param>
public void AssetRequestCallback(string id, object sender, AssetBase asset)
{
Culture.SetCurrentCulture();
try
{
lock (this)
catch (Exception e)
{
//m_log.DebugFormat("[ARCHIVER]: Received callback for asset {0}", id);
m_log.ErrorFormat("[ARCHIVER]: Execute failed with {0}", e);
}
}
m_requestCallbackTimer.Stop();
m_timeOutTimer.Dispose();
if ((m_requestState == RequestState.Aborted) || (m_requestState == RequestState.Completed))
{
m_log.WarnFormat(
"[ARCHIVER]: Received information about asset {0} while in state {1}. Ignoring.",
id, m_requestState);
return;
}
if (asset != null)
{
// m_log.DebugFormat("[ARCHIVER]: Writing asset {0}", id);
m_foundAssetUuids.Add(asset.FullID);
m_assetsArchiver.WriteAsset(PostProcess(asset));
}
else
{
// m_log.DebugFormat("[ARCHIVER]: Recording asset {0} as not found", id);
m_notFoundAssetUuids.Add(new UUID(id));
}
if (m_foundAssetUuids.Count + m_notFoundAssetUuids.Count >= m_repliesRequired)
{
m_requestState = RequestState.Completed;
if(m_notFoundAssetUuids.Count == 0)
m_log.DebugFormat(
"[ARCHIVER]: Successfully added {0} assets",
m_foundAssetUuids.Count);
else
m_log.DebugFormat(
"[ARCHIVER]: Successfully added {0} assets ({1} assets not found but these may be expected invalid references)",
if(m_timeout)
m_log.DebugFormat("[ARCHIVER]: Aborted because AssetService request timeout. Successfully added {0} assets", m_foundAssetUuids.Count);
else if(m_notFoundAssetUuids.Count == 0)
m_log.DebugFormat("[ARCHIVER]: Successfully added all {0} assets", m_foundAssetUuids.Count);
else
m_log.DebugFormat("[ARCHIVER]: Successfully added {0} assets ({1} assets not found)",
m_foundAssetUuids.Count, m_notFoundAssetUuids.Count);
// We want to stop using the asset cache thread asap
// as we now need to do the work of producing the rest of the archive
WorkManager.RunInThread(PerformAssetsRequestCallback, false, "Archive Assets Request Callback");
}
else
{
m_requestCallbackTimer.Start();
}
}
}
catch (Exception e)
{
m_log.ErrorFormat("[ARCHIVER]: AssetRequestCallback failed with {0}", e);
}
PerformAssetsRequestCallback(m_timeout);
}
void OnTimeout(object source, ElapsedEventArgs args)
{
m_timeout = true;
}
/// <summary>
@ -316,6 +191,8 @@ namespace OpenSim.Region.CoreModules.World.Archiver
/// </summary>
protected void PerformAssetsRequestCallback(object o)
{
if(m_assetsRequestCallback == null)
return;
Culture.SetCurrentCulture();
Boolean timedOut = (Boolean)o;