Implements adaptive queue management and fair queueing for

improved networking performance.

Reprioritization algorithms need to be ported still. One is
in place.
bulletsim
Mic Bowman 2011-03-28 10:00:53 -07:00
parent 5a61c76455
commit 61371c7c16
2 changed files with 307 additions and 123 deletions

View File

@ -49,6 +49,8 @@ using Timer = System.Timers.Timer;
using AssetLandmark = OpenSim.Framework.AssetLandmark;
using Nini.Config;
using System.IO;
namespace OpenSim.Region.ClientStack.LindenUDP
{
public delegate bool PacketMethod(IClientAPI simClient, Packet packet);
@ -298,6 +300,77 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Used to adjust Sun Orbit values so Linden based viewers properly position sun</summary>
private const float m_sunPainDaHalfOrbitalCutoff = 4.712388980384689858f;
// First log file or time has expired, start writing to a new log file
//<MIC>
// -----------------------------------------------------------------
// -----------------------------------------------------------------
// THIS IS DEBUGGING CODE & SHOULD BE REMOVED
// -----------------------------------------------------------------
// -----------------------------------------------------------------
public class QueueLogger
{
public Int32 start = 0;
public StreamWriter Log = null;
private Dictionary<UUID,int> m_idMap = new Dictionary<UUID,int>();
public QueueLogger()
{
DateTime now = DateTime.Now;
String fname = String.Format("queue-{0}.log", now.ToString("yyyyMMddHHmmss"));
Log = new StreamWriter(fname);
start = Util.EnvironmentTickCount();
}
public int LookupID(UUID uuid)
{
int localid;
if (! m_idMap.TryGetValue(uuid,out localid))
{
localid = m_idMap.Count + 1;
m_idMap[uuid] = localid;
}
return localid;
}
}
public static QueueLogger QueueLog = null;
// -----------------------------------------------------------------
public void LogAvatarUpdateEvent(UUID client, UUID avatar, Int32 timeinqueue)
{
if (QueueLog == null)
QueueLog = new QueueLogger();
Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
lock(QueueLog)
{
int cid = QueueLog.LookupID(client);
int aid = QueueLog.LookupID(avatar);
QueueLog.Log.WriteLine("{0},AU,AV{1:D4},AV{2:D4},{3}",ticks,cid,aid,timeinqueue);
}
}
// -----------------------------------------------------------------
public void LogQueueProcessEvent(UUID client, PriorityQueue queue, uint maxup)
{
if (QueueLog == null)
QueueLog = new QueueLogger();
Int32 ticks = Util.EnvironmentTickCountSubtract(QueueLog.start);
lock(QueueLog)
{
int cid = QueueLog.LookupID(client);
QueueLog.Log.WriteLine("{0},PQ,AV{1:D4},{2},{3}",ticks,cid,maxup,queue.ToString());
}
}
// -----------------------------------------------------------------
// -----------------------------------------------------------------
// -----------------------------------------------------------------
// -----------------------------------------------------------------
//</MIC>
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
protected static Dictionary<PacketType, PacketMethod> PacketHandlers = new Dictionary<PacketType, PacketMethod>(); //Global/static handlers for all clients
@ -3547,18 +3620,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region Primitive Packet/Data Sending Methods
/// <summary>
/// Generate one of the object update packets based on PrimUpdateFlags
/// and broadcast the packet to clients
/// </summary>
public void SendPrimUpdate(ISceneEntity entity, PrimUpdateFlags updateFlags)
{
double priority = m_prioritizer.GetUpdatePriority(this, entity);
//double priority = m_prioritizer.GetUpdatePriority(this, entity);
uint priority = m_prioritizer.GetUpdatePriority(this, entity);
lock (m_entityUpdates.SyncRoot)
m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation), entity.LocalId);
m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation));
}
private Int32 m_LastQueueFill = 0;
private uint m_maxUpdates = 0;
private void ProcessEntityUpdates(int maxUpdates)
{
OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>> objectUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>>();
@ -3566,23 +3644,55 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
if (maxUpdates <= 0) maxUpdates = Int32.MaxValue;
if (maxUpdates <= 0)
{
m_maxUpdates = Int32.MaxValue;
}
else
{
if (m_maxUpdates == 0 || m_LastQueueFill == 0)
{
m_maxUpdates = (uint)maxUpdates;
}
else
{
if (Util.EnvironmentTickCountSubtract(m_LastQueueFill) < 200)
m_maxUpdates += 5;
else
m_maxUpdates = m_maxUpdates >> 1;
}
m_maxUpdates = Util.Clamp<uint>(m_maxUpdates,10,500);
}
m_LastQueueFill = Util.EnvironmentTickCount();
int updatesThisCall = 0;
//<MIC>
// DEBUGGING CODE... REMOVE
// LogQueueProcessEvent(this.m_agentId,m_entityUpdates,m_maxUpdates);
//</MIC>
// We must lock for both manipulating the kill record and sending the packet, in order to avoid a race
// condition where a kill can be processed before an out-of-date update for the same object.
lock (m_killRecord)
{
float avgTimeDilation = 1.0f;
EntityUpdate update;
while (updatesThisCall < maxUpdates)
Int32 timeinqueue; // this is just debugging code & can be dropped later
while (updatesThisCall < m_maxUpdates)
{
lock (m_entityUpdates.SyncRoot)
if (!m_entityUpdates.TryDequeue(out update))
if (!m_entityUpdates.TryDequeue(out update, out timeinqueue))
break;
avgTimeDilation += update.TimeDilation;
avgTimeDilation *= 0.5f;
// <MIC>
// DEBUGGING CODE... REMOVE
// if (update.Entity is ScenePresence)
// LogAvatarUpdateEvent(this.m_agentId,update.Entity.UUID,timeinqueue);
// </MIC>
if (update.Entity is SceneObjectPart)
{
SceneObjectPart part = (SceneObjectPart)update.Entity;
@ -3679,36 +3789,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
else
{
// if (update.Entity is SceneObjectPart && ((SceneObjectPart)update.Entity).IsAttachment)
// {
// SceneObjectPart sop = (SceneObjectPart)update.Entity;
// string text = sop.Text;
// if (text.IndexOf("\n") >= 0)
// text = text.Remove(text.IndexOf("\n"));
//
// if (m_attachmentsSent.Contains(sop.ParentID))
// {
//// m_log.DebugFormat(
//// "[CLIENT]: Sending full info about attached prim {0} text {1}",
//// sop.LocalId, text);
//
// objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock(sop, this.m_agentId));
//
// m_attachmentsSent.Add(sop.LocalId);
// }
// else
// {
// m_log.DebugFormat(
// "[CLIENT]: Requeueing full update of prim {0} text {1} since we haven't sent its parent {2} yet",
// sop.LocalId, text, sop.ParentID);
//
// m_entityUpdates.Enqueue(double.MaxValue, update, sop.LocalId);
// }
// }
// else
// {
objectUpdateBlocks.Value.Add(CreatePrimUpdateBlock((SceneObjectPart)update.Entity, this.m_agentId));
// }
}
}
else if (!canUseImproved)
@ -3802,26 +3883,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public void ReprioritizeUpdates()
{
//m_log.Debug("[CLIENT]: Reprioritizing prim updates for " + m_firstName + " " + m_lastName);
lock (m_entityUpdates.SyncRoot)
m_entityUpdates.Reprioritize(UpdatePriorityHandler);
}
private bool UpdatePriorityHandler(ref double priority, uint localID)
private bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity)
{
EntityBase entity;
if (m_scene.Entities.TryGetValue(localID, out entity))
if (entity != null)
{
priority = m_prioritizer.GetUpdatePriority(this, entity);
return true;
}
return priority != double.NaN;
return false;
}
public void FlushPrimUpdates()
{
m_log.Debug("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
m_log.WarnFormat("[CLIENT]: Flushing prim updates to " + m_firstName + " " + m_lastName);
while (m_entityUpdates.Count > 0)
ProcessEntityUpdates(-1);
@ -11704,86 +11783,85 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region PriorityQueue
public class PriorityQueue
{
internal delegate bool UpdatePriorityHandler(ref double priority, uint local_id);
internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[1];
// Heap[0] for self updates
// Heap[1..12] for entity updates
internal const uint m_numberOfQueues = 12;
private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[m_numberOfQueues];
private Dictionary<uint, LookupItem> m_lookupTable;
private Comparison<double> m_comparison;
private object m_syncRoot = new object();
private uint m_nextQueue = 0;
private UInt64 m_nextRequest = 0;
internal PriorityQueue() :
this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY, Comparer<double>.Default) { }
internal PriorityQueue(int capacity) :
this(capacity, Comparer<double>.Default) { }
internal PriorityQueue(IComparer<double> comparer) :
this(new Comparison<double>(comparer.Compare)) { }
internal PriorityQueue(Comparison<double> comparison) :
this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY, comparison) { }
internal PriorityQueue(int capacity, IComparer<double> comparer) :
this(capacity, new Comparison<double>(comparer.Compare)) { }
internal PriorityQueue(int capacity, Comparison<double> comparison)
this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY) { }
internal PriorityQueue(int capacity)
{
m_lookupTable = new Dictionary<uint, LookupItem>(capacity);
for (int i = 0; i < m_heaps.Length; ++i)
m_heaps[i] = new MinHeap<MinHeapItem>(capacity);
this.m_comparison = comparison;
}
public object SyncRoot { get { return this.m_syncRoot; } }
internal int Count
{
get
{
int count = 0;
for (int i = 0; i < m_heaps.Length; ++i)
count = m_heaps[i].Count;
count += m_heaps[i].Count;
return count;
}
}
public bool Enqueue(double priority, EntityUpdate value, uint local_id)
public bool Enqueue(uint pqueue, EntityUpdate value)
{
LookupItem item;
LookupItem lookup;
if (m_lookupTable.TryGetValue(local_id, out item))
uint localid = value.Entity.LocalId;
UInt64 entry = m_nextRequest++;
if (m_lookupTable.TryGetValue(localid, out lookup))
{
// Combine flags
value.Flags |= item.Heap[item.Handle].Value.Flags;
item.Heap[item.Handle] = new MinHeapItem(priority, value, local_id, this.m_comparison);
return false;
entry = lookup.Heap[lookup.Handle].EntryOrder;
value.Flags |= lookup.Heap[lookup.Handle].Value.Flags;
lookup.Heap.Remove(lookup.Handle);
}
else
{
item.Heap = m_heaps[0];
item.Heap.Add(new MinHeapItem(priority, value, local_id, this.m_comparison), ref item.Handle);
m_lookupTable.Add(local_id, item);
pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
lookup.Heap = m_heaps[pqueue];
lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
m_lookupTable[localid] = lookup;
return true;
}
}
internal EntityUpdate Peek()
internal bool TryDequeue(out EntityUpdate value, out Int32 timeinqueue)
{
for (int i = 0; i < m_heaps.Length; ++i)
if (m_heaps[i].Count > 0)
return m_heaps[i].Min().Value;
throw new InvalidOperationException(string.Format("The {0} is empty", this.GetType().ToString()));
}
for (int i = 0; i < m_numberOfQueues; ++i)
{
// To get the fair queing, we cycle through each of the
// queues when finding an element to dequeue, this code
// assumes that the distribution of updates in the queues
// is polynomial, probably quadractic (eg distance of PI * R^2)
uint h = (uint)((m_nextQueue + i) % m_numberOfQueues);
if (m_heaps[h].Count > 0)
{
m_nextQueue = (uint)((h + 1) % m_numberOfQueues);
internal bool TryDequeue(out EntityUpdate value)
{
for (int i = 0; i < m_heaps.Length; ++i)
{
if (m_heaps[i].Count > 0)
{
MinHeapItem item = m_heaps[i].RemoveMin();
m_lookupTable.Remove(item.LocalID);
MinHeapItem item = m_heaps[h].RemoveMin();
m_lookupTable.Remove(item.Value.Entity.LocalId);
timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
value = item.Value;
return true;
}
}
timeinqueue = 0;
value = default(EntityUpdate);
return false;
}
@ -11791,68 +11869,107 @@ namespace OpenSim.Region.ClientStack.LindenUDP
internal void Reprioritize(UpdatePriorityHandler handler)
{
MinHeapItem item;
double priority;
foreach (LookupItem lookup in new List<LookupItem>(this.m_lookupTable.Values))
{
if (lookup.Heap.TryGetValue(lookup.Handle, out item))
{
priority = item.Priority;
if (handler(ref priority, item.LocalID))
uint pqueue = item.PriorityQueue;
uint localid = item.Value.Entity.LocalId;
if (handler(ref pqueue, item.Value.Entity))
{
if (lookup.Heap.ContainsHandle(lookup.Handle))
lookup.Heap[lookup.Handle] =
new MinHeapItem(priority, item.Value, item.LocalID, this.m_comparison);
// unless the priority queue has changed, there is no need to modify
// the entry
pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
if (pqueue != item.PriorityQueue)
{
lookup.Heap.Remove(lookup.Handle);
LookupItem litem = lookup;
litem.Heap = m_heaps[pqueue];
litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle);
m_lookupTable[localid] = litem;
}
}
else
{
m_log.Warn("[LLCLIENTVIEW]: UpdatePriorityHandler returned false, dropping update");
m_log.WarnFormat("[LLCLIENTVIEW]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID);
lookup.Heap.Remove(lookup.Handle);
this.m_lookupTable.Remove(item.LocalID);
this.m_lookupTable.Remove(localid);
}
}
}
}
public override string ToString()
{
string s = "";
for (int i = 0; i < m_numberOfQueues; i++)
{
if (s != "") s += ",";
s += m_heaps[i].Count.ToString();
}
return s;
}
#region MinHeapItem
private struct MinHeapItem : IComparable<MinHeapItem>
{
private double priority;
private EntityUpdate value;
private uint local_id;
private Comparison<double> comparison;
internal MinHeapItem(double priority, EntityUpdate value, uint local_id) :
this(priority, value, local_id, Comparer<double>.Default) { }
internal MinHeapItem(double priority, EntityUpdate value, uint local_id, IComparer<double> comparer) :
this(priority, value, local_id, new Comparison<double>(comparer.Compare)) { }
internal MinHeapItem(double priority, EntityUpdate value, uint local_id, Comparison<double> comparison)
{
this.priority = priority;
this.value = value;
this.local_id = local_id;
this.comparison = comparison;
internal EntityUpdate Value {
get {
return this.value;
}
}
internal double Priority { get { return this.priority; } }
internal EntityUpdate Value { get { return this.value; } }
internal uint LocalID { get { return this.local_id; } }
private uint pqueue;
internal uint PriorityQueue {
get {
return this.pqueue;
}
}
private Int32 entrytime;
internal Int32 EntryTime {
get {
return this.entrytime;
}
}
private UInt64 entryorder;
internal UInt64 EntryOrder
{
get {
return this.entryorder;
}
}
internal MinHeapItem(uint pqueue, MinHeapItem other)
{
this.entrytime = other.entrytime;
this.entryorder = other.entryorder;
this.value = other.value;
this.pqueue = pqueue;
}
internal MinHeapItem(uint pqueue, UInt64 entryorder, EntityUpdate value)
{
this.entrytime = Util.EnvironmentTickCount();
this.entryorder = entryorder;
this.value = value;
this.pqueue = pqueue;
}
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.Append("[");
sb.Append(this.priority.ToString());
sb.Append(",");
if (this.value != null)
sb.Append(this.value.ToString());
sb.Append("]");
return sb.ToString();
return String.Format("[{0},{1},{2}]",pqueue,entryorder,value.Entity.LocalId);
}
public int CompareTo(MinHeapItem other)
{
return this.comparison(this.priority, other.priority);
// I'm assuming that the root part of an SOG is added to the update queue
// before the component parts
return Comparer<UInt64>.Default.Compare(this.EntryOrder, other.EntryOrder);
}
}
#endregion

View File

@ -58,7 +58,7 @@ namespace OpenSim.Region.Framework.Scenes
public class Prioritizer
{
// private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private static readonly ILog m_log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>
/// This is added to the priority of all child prims, to make sure that the root prim update is sent to the
@ -76,7 +76,74 @@ namespace OpenSim.Region.Framework.Scenes
m_scene = scene;
}
public double GetUpdatePriority(IClientAPI client, ISceneEntity entity)
//<mic>
public uint GetUpdatePriority(IClientAPI client, ISceneEntity entity)
{
if (entity == null)
{
m_log.WarnFormat("[PRIORITIZER] attempt to prioritize null entity");
throw new InvalidOperationException("Prioritization entity not defined");
}
// If this is an update for our own avatar give it the highest priority
if (client.AgentId == entity.UUID)
return 0;
// Get this agent's position
ScenePresence presence = m_scene.GetScenePresence(client.AgentId);
if (presence == null)
{
m_log.WarnFormat("[PRIORITIZER] attempt to prioritize agent no longer in the scene");
throw new InvalidOperationException("Prioritization agent not defined");
}
// Use group position for child prims
Vector3 entityPos = entity.AbsolutePosition;
if (entity is SceneObjectPart)
{
SceneObjectGroup group = (entity as SceneObjectPart).ParentGroup;
if (group != null)
entityPos = group.AbsolutePosition;
}
// Use the camera position for local agents and avatar position for remote agents
Vector3 presencePos = (presence.IsChildAgent) ?
presence.AbsolutePosition :
presence.CameraPosition;
// Compute the distance...
double distance = Vector3.Distance(presencePos, entityPos);
// And convert the distance to a priority queue, this computation gives queues
// at 10, 20, 40, 80, 160, 320, 640, and 1280m
uint pqueue = 1;
for (int i = 0; i < 8; i++)
{
if (distance < 10 * Math.Pow(2.0,i))
break;
pqueue++;
}
// If this is a root agent, then determine front & back
// Bump up the priority queue for any objects behind the avatar
if (! presence.IsChildAgent)
{
// Root agent, decrease priority for objects behind us
Vector3 camPosition = presence.CameraPosition;
Vector3 camAtAxis = presence.CameraAtAxis;
// Plane equation
float d = -Vector3.Dot(camPosition, camAtAxis);
float p = Vector3.Dot(camAtAxis, entityPos) + d;
if (p < 0.0f)
pqueue++;
}
return pqueue;
}
//</mic>
public double bGetUpdatePriority(IClientAPI client, ISceneEntity entity)
{
double priority = 0;