When processing incoming attachments via HG, if a request for uuid gathering or final asset import takes too long remove remaining requests from same user to prevent hold up of other user's incoming attachments.

This improves upon the earlier naive simply queueing immplementation.
Threshold is 30 seconds.  If this happens to a user they can relog and fetch will be reattempted.
mb-throttle-test
Justin Clark-Casey (justincc) 2014-11-06 01:15:22 +00:00
parent 06a5d6e9ef
commit aeae34505f
4 changed files with 121 additions and 29 deletions

View File

@ -546,6 +546,31 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
}
}
private void RemoveIncomingSceneObjectJobs(string commonIdToRemove)
{
List<Job> jobsToReinsert = new List<Job>();
int jobsRemoved = 0;
Job job;
while ((job = m_incomingSceneObjectEngine.RemoveNextRequest()) != null)
{
if (job.CommonId != commonIdToRemove)
jobsToReinsert.Add(job);
else
jobsRemoved++;
}
m_log.DebugFormat(
"[HG ENTITY TRANSFER]: Removing {0} jobs with common ID {1} and reinserting {2} other jobs",
jobsRemoved, commonIdToRemove, jobsToReinsert.Count);
if (jobsToReinsert.Count > 0)
{
foreach (Job jobToReinsert in jobsToReinsert)
m_incomingSceneObjectEngine.QueueRequest(jobToReinsert);
}
}
public override bool HandleIncomingSceneObject(SceneObjectGroup so, Vector3 newPosition)
{
// FIXME: We must make it so that we can use SOG.IsAttachment here. At the moment it is always null!
@ -564,38 +589,73 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
{
m_incomingSceneObjectEngine.QueueRequest(
string.Format("HG UUID Gather for attachment {0} for {1}", so.Name, aCircuit.Name),
so.OwnerID.ToString(),
o =>
{
string url = aCircuit.ServiceURLs["AssetServerURI"].ToString();
m_log.DebugFormat(
"[HG ENTITY TRANSFER MODULE]: Incoming attachment {0} for HG user {1} with asset server {2}",
so.Name, so.AttachedAvatar, url);
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER MODULE]: Incoming attachment {0} for HG user {1} with asset service {2}",
// so.Name, so.AttachedAvatar, url);
IteratingHGUuidGatherer uuidGatherer = new IteratingHGUuidGatherer(Scene.AssetService, url);
uuidGatherer.RecordAssetUuids(so);
// XXX: We will shortly use this iterating mechanism to check if a fetch is taking too long
// but just for now we will simply fetch everything. If this was permanent could use
// GatherAll()
while (uuidGatherer.GatherNext())
m_log.DebugFormat(
"[HG ENTITY TRANSFER]: Gathered attachment {0} for HG user {1} with asset server {2}",
so.Name, so.OwnerID, url);
while (!uuidGatherer.Complete)
{
int tickStart = Util.EnvironmentTickCount();
UUID? nextUuid = uuidGatherer.NextUuidToInspect;
uuidGatherer.GatherNext();
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER]: Gathered attachment asset uuid {0} for object {1} for HG user {2} took {3} ms with asset service {4}",
// nextUuid, so.Name, so.OwnerID, Util.EnvironmentTickCountSubtract(tickStart), url);
int ticksElapsed = Util.EnvironmentTickCountSubtract(tickStart);
if (ticksElapsed > 30000)
{
m_log.WarnFormat(
"[HG ENTITY TRANSFER]: Removing incoming scene object jobs for HG user {0} as gather of {1} from {2} took {3} ms to respond (> {4} ms)",
so.OwnerID, so.Name, url, ticksElapsed, 30000);
RemoveIncomingSceneObjectJobs(so.OwnerID.ToString());
return;
}
}
IDictionary<UUID, sbyte> ids = uuidGatherer.GetGatheredUuids();
m_log.DebugFormat(
"[HG ENTITY TRANSFER]: Fetching {0} assets for attachment {1} for HG user {2} with asset server {3}",
ids.Count, so.Name, so.OwnerID, url);
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER]: Fetching {0} assets for attachment {1} for HG user {2} with asset service {3}",
// ids.Count, so.Name, so.OwnerID, url);
foreach (KeyValuePair<UUID, sbyte> kvp in ids)
{
int tickStart = Util.EnvironmentTickCount();
uuidGatherer.FetchAsset(kvp.Key);
int ticksElapsed = Util.EnvironmentTickCountSubtract(tickStart);
if (ticksElapsed > 30000)
{
m_log.WarnFormat(
"[HG ENTITY TRANSFER]: Removing incoming scene object jobs for HG user {0} as fetch of {1} from {2} took {3} ms to respond (> {4} ms)",
so.OwnerID, kvp.Key, url, ticksElapsed, 30000);
RemoveIncomingSceneObjectJobs(so.OwnerID.ToString());
return;
}
}
base.HandleIncomingSceneObject(so, newPosition);
m_log.DebugFormat(
"[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}",
so.Name, so.OwnerID, url);
// m_log.DebugFormat(
// "[HG ENTITY TRANSFER MODULE]: Completed incoming attachment {0} for HG user {1} with asset server {2}",
// so.Name, so.OwnerID, url);
},
null);
}

View File

@ -38,13 +38,15 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
{
public class Job
{
public string Name;
public WaitCallback Callback;
public object O;
public string Name { get; private set; }
public string CommonId { get; private set; }
public WaitCallback Callback { get; private set; }
public object O { get; private set; }
public Job(string name, WaitCallback callback, object o)
public Job(string name, string commonId, WaitCallback callback, object o)
{
Name = name;
CommonId = commonId;
Callback = callback;
O = o;
}
@ -90,6 +92,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
public HGIncomingSceneObjectEngine(string name)
{
// LogLevel = 1;
Name = name;
RequestProcessTimeoutOnStop = 5000;
@ -192,10 +195,24 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
}
}
public bool QueueRequest(string name, WaitCallback req, object o)
public Job RemoveNextRequest()
{
Job nextRequest;
m_requestQueue.TryTake(out nextRequest);
return nextRequest;
}
public bool QueueRequest(string name, string commonId, WaitCallback req, object o)
{
return QueueRequest(new Job(name, commonId, req, o));
}
public bool QueueRequest(Job job)
{
if (LogLevel >= 1)
m_log.DebugFormat("[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}", name);
m_log.DebugFormat(
"[HG INCOMING SCENE OBJECT ENGINE]: Queued job {0}, common ID {1}", job.Name, job.CommonId);
if (m_requestQueue.Count < m_requestQueue.BoundedCapacity)
{
@ -203,7 +220,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer
// "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}",
// categories, client.AgentID, m_udpServer.Scene.Name);
m_requestQueue.Add(new Job(name, req, o));
m_requestQueue.Add(job);
if (!m_warnOverMaxQueue)
m_warnOverMaxQueue = true;

View File

@ -655,7 +655,22 @@ namespace OpenSim.Region.Framework.Scenes
/// <summary>
/// Is gathering complete?
/// </summary>
public bool GatheringComplete { get { return m_assetUuidsToInspect.Count <= 0; } }
public bool Complete { get { return m_assetUuidsToInspect.Count <= 0; } }
/// <summary>
/// Gets the next UUID to inspect.
/// </summary>
/// <value>If there is no next UUID then returns null</value>
public UUID? NextUuidToInspect
{
get
{
if (Complete)
return null;
else
return m_assetUuidsToInspect.Peek();
}
}
protected IAssetService m_assetService;
@ -693,7 +708,7 @@ namespace OpenSim.Region.Framework.Scenes
/// <returns>false if gathering is already complete, true otherwise</returns>
public bool GatherNext()
{
if (GatheringComplete)
if (Complete)
return false;
GetAssetUuids(m_assetUuidsToInspect.Dequeue());
@ -707,7 +722,7 @@ namespace OpenSim.Region.Framework.Scenes
/// <returns>false if gathering is already complete, true otherwise</returns>
public bool GatherAll()
{
if (GatheringComplete)
if (Complete)
return false;
while (GatherNext());

View File

@ -100,7 +100,7 @@ namespace OpenSim.Services.HypergridService
return null;
if (asset.Metadata.Type == (sbyte)AssetType.Object)
asset.Data = AdjustIdentifiers(asset.Data); ;
asset.Data = AdjustIdentifiers(asset.Data);
AdjustIdentifiers(asset.Metadata);