Merge branch 'queuetest' into careminster-presence-refactor

avinationmerge
Melanie 2011-04-21 16:51:39 +01:00
commit 204b8b7b7e
10 changed files with 340 additions and 106 deletions

View File

@ -576,34 +576,69 @@ namespace OpenSim.Framework
public class IEntityUpdate
{
public ISceneEntity Entity;
public uint Flags;
private ISceneEntity m_entity;
private uint m_flags;
private int m_updateTime;
public ISceneEntity Entity
{
get { return m_entity; }
}
public uint Flags
{
get { return m_flags; }
}
public int UpdateTime
{
get { return m_updateTime; }
}
public virtual void Update(IEntityUpdate update)
{
this.Flags |= update.Flags;
m_flags |= update.Flags;
// Use the older of the updates as the updateTime
if (Util.EnvironmentTickCountCompare(UpdateTime, update.UpdateTime) > 0)
m_updateTime = update.UpdateTime;
}
public IEntityUpdate(ISceneEntity entity, uint flags)
{
Entity = entity;
Flags = flags;
m_entity = entity;
m_flags = flags;
m_updateTime = Util.EnvironmentTickCount();
}
public IEntityUpdate(ISceneEntity entity, uint flags, Int32 updateTime)
{
m_entity = entity;
m_flags = flags;
m_updateTime = updateTime;
}
}
public class EntityUpdate : IEntityUpdate
{
// public ISceneEntity Entity;
// public PrimUpdateFlags Flags;
public float TimeDilation;
private float m_timeDilation;
public float TimeDilation
{
get { return m_timeDilation; }
}
public EntityUpdate(ISceneEntity entity, PrimUpdateFlags flags, float timedilation)
: base(entity,(uint)flags)
: base(entity, (uint)flags)
{
//Entity = entity;
// Flags = flags;
TimeDilation = timedilation;
m_timeDilation = timedilation;
}
public EntityUpdate(ISceneEntity entity, PrimUpdateFlags flags, float timedilation, Int32 updateTime)
: base(entity,(uint)flags,updateTime)
{
m_timeDilation = timedilation;
}
}

View File

@ -34,20 +34,21 @@ using OpenSim.Framework;
using OpenSim.Framework.Client;
using log4net;
namespace OpenSim.Region.ClientStack.LindenUDP
namespace OpenSim.Framework
{
public class PriorityQueue
{
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
public delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
// Heap[0] for self updates
// Heap[1..12] for entity updates
internal const uint m_numberOfQueues = 12;
public const uint NumberOfQueues = 12;
public const uint ImmediateQueue = 0;
private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[m_numberOfQueues];
private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[NumberOfQueues];
private Dictionary<uint, LookupItem> m_lookupTable;
private uint m_nextQueue = 0;
private UInt64 m_nextRequest = 0;
@ -57,9 +58,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
get { return this.m_syncRoot; }
}
internal PriorityQueue() : this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY) { }
public PriorityQueue() : this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY) { }
internal PriorityQueue(int capacity)
public PriorityQueue(int capacity)
{
m_lookupTable = new Dictionary<uint, LookupItem>(capacity);
@ -67,7 +68,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_heaps[i] = new MinHeap<MinHeapItem>(capacity);
}
internal int Count
public int Count
{
get
{
@ -91,7 +92,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
lookup.Heap.Remove(lookup.Handle);
}
pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
lookup.Heap = m_heaps[pqueue];
lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
m_lookupTable[localid] = lookup;
@ -99,18 +100,30 @@ namespace OpenSim.Region.ClientStack.LindenUDP
return true;
}
internal bool TryDequeue(out IEntityUpdate value, out Int32 timeinqueue)
public bool TryDequeue(out IEntityUpdate value, out Int32 timeinqueue)
{
for (int i = 0; i < m_numberOfQueues; ++i)
// If there is anything in priority queue 0, return it first no
// matter what else. Breaks fairness. But very useful.
if (m_heaps[ImmediateQueue].Count > 0)
{
MinHeapItem item = m_heaps[ImmediateQueue].RemoveMin();
m_lookupTable.Remove(item.Value.Entity.LocalId);
timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime);
value = item.Value;
return true;
}
for (int i = 0; i < NumberOfQueues; ++i)
{
// To get the fair queing, we cycle through each of the
// queues when finding an element to dequeue, this code
// assumes that the distribution of updates in the queues
// is polynomial, probably quadractic (eg distance of PI * R^2)
uint h = (uint)((m_nextQueue + i) % m_numberOfQueues);
uint h = (uint)((m_nextQueue + i) % NumberOfQueues);
if (m_heaps[h].Count > 0)
{
m_nextQueue = (uint)((h + 1) % m_numberOfQueues);
m_nextQueue = (uint)((h + 1) % NumberOfQueues);
MinHeapItem item = m_heaps[h].RemoveMin();
m_lookupTable.Remove(item.Value.Entity.LocalId);
@ -126,7 +139,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
return false;
}
internal void Reprioritize(UpdatePriorityHandler handler)
public void Reprioritize(UpdatePriorityHandler handler)
{
MinHeapItem item;
foreach (LookupItem lookup in new List<LookupItem>(this.m_lookupTable.Values))
@ -140,7 +153,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{
// unless the priority queue has changed, there is no need to modify
// the entry
pqueue = Util.Clamp<uint>(pqueue, 0, m_numberOfQueues - 1);
pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
if (pqueue != item.PriorityQueue)
{
lookup.Heap.Remove(lookup.Handle);
@ -164,7 +177,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public override string ToString()
{
string s = "";
for (int i = 0; i < m_numberOfQueues; i++)
for (int i = 0; i < NumberOfQueues; i++)
{
if (s != "") s += ",";
s += m_heaps[i].Count.ToString();

View File

@ -1549,6 +1549,23 @@ namespace OpenSim.Framework
return (diff >= 0) ? diff : (diff + EnvironmentTickCountMask + 1);
}
// Returns value of Tick Count A - TickCount B accounting for wrapping of TickCount
// Assumes both tcA and tcB came from previous calls to Util.EnvironmentTickCount().
// A positive return value indicates A occured later than B
public static Int32 EnvironmentTickCountCompare(Int32 tcA, Int32 tcB)
{
// A, B and TC are all between 0 and 0x3fffffff
int tc = EnvironmentTickCount();
if (tc - tcA >= 0)
tcA += EnvironmentTickCountMask + 1;
if (tc - tcB >= 0)
tcB += EnvironmentTickCountMask + 1;
return tcA - tcB;
}
/// <summary>
/// Prints the call stack at any given point. Useful for debugging.
/// </summary>

View File

@ -3596,6 +3596,40 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation));
}
/// <summary>
/// Requeue an EntityUpdate when it was not acknowledged by the client.
/// We will update the priority and put it in the correct queue, merging update flags
/// with any other updates that may be queued for the same entity.
/// The original update time is used for the merged update.
/// </summary>
private void ResendPrimUpdate(EntityUpdate update)
{
// If the update exists in priority queue, it will be updated.
// If it does not exist then it will be added with the current (rather than its original) priority
uint priority = m_prioritizer.GetUpdatePriority(this, update.Entity);
lock (m_entityUpdates.SyncRoot)
m_entityUpdates.Enqueue(priority, update);
}
/// <summary>
/// Requeue a list of EntityUpdates when they were not acknowledged by the client.
/// We will update the priority and put it in the correct queue, merging update flags
/// with any other updates that may be queued for the same entity.
/// The original update time is used for the merged update.
/// </summary>
private void ResendPrimUpdates(List<EntityUpdate> updates, OutgoingPacket oPacket)
{
// m_log.WarnFormat("[CLIENT] resending prim update {0}",updates[0].UpdateTime);
// Remove the update packet from the list of packets waiting for acknowledgement
// because we are requeuing the list of updates. They will be resent in new packets
// with the most recent state and priority.
m_udpClient.NeedAcks.Remove(oPacket.SequenceNumber, 0, true);
foreach (EntityUpdate update in updates)
ResendPrimUpdate(update);
}
private void ProcessEntityUpdates(int maxUpdates)
{
OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>> objectUpdateBlocks = new OpenSim.Framework.Lazy<List<ObjectUpdatePacket.ObjectDataBlock>>();
@ -3603,6 +3637,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy<List<ImprovedTerseObjectUpdatePacket.ObjectDataBlock>>();
OpenSim.Framework.Lazy<List<EntityUpdate>> objectUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
OpenSim.Framework.Lazy<List<EntityUpdate>> compressedUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
OpenSim.Framework.Lazy<List<EntityUpdate>> terseUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
OpenSim.Framework.Lazy<List<EntityUpdate>> terseAgentUpdates = new OpenSim.Framework.Lazy<List<EntityUpdate>>();
// Check to see if this is a flush
if (maxUpdates <= 0)
{
@ -4027,7 +4066,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{
SendFamilyProps = SendFamilyProps || update.SendFamilyProps;
SendObjectProps = SendObjectProps || update.SendObjectProps;
Flags |= update.Flags;
// other properties may need to be updated by base class
base.Update(update);
}
}
@ -4038,6 +4078,25 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_entityProps.Enqueue(priority, new ObjectPropertyUpdate(entity,requestFlags,true,false));
}
private void ResendPropertyUpdate(ObjectPropertyUpdate update)
{
uint priority = 0;
lock (m_entityProps.SyncRoot)
m_entityProps.Enqueue(priority, update);
}
private void ResendPropertyUpdates(List<ObjectPropertyUpdate> updates, OutgoingPacket oPacket)
{
// m_log.WarnFormat("[CLIENT] resending object property {0}",updates[0].UpdateTime);
// Remove the update packet from the list of packets waiting for acknowledgement
// because we are requeuing the list of updates. They will be resent in new packets
// with the most recent state.
m_udpClient.NeedAcks.Remove(oPacket.SequenceNumber, 0, true);
foreach (ObjectPropertyUpdate update in updates)
ResendPropertyUpdate(update);
}
public void SendObjectPropertiesReply(ISceneEntity entity)
{
uint priority = 0; // time based ordering only
@ -4053,6 +4112,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OpenSim.Framework.Lazy<List<ObjectPropertiesPacket.ObjectDataBlock>> objectPropertiesBlocks =
new OpenSim.Framework.Lazy<List<ObjectPropertiesPacket.ObjectDataBlock>>();
OpenSim.Framework.Lazy<List<ObjectPropertyUpdate>> familyUpdates =
new OpenSim.Framework.Lazy<List<ObjectPropertyUpdate>>();
OpenSim.Framework.Lazy<List<ObjectPropertyUpdate>> propertyUpdates =
new OpenSim.Framework.Lazy<List<ObjectPropertyUpdate>>();
IEntityUpdate iupdate;
Int32 timeinqueue; // this is just debugging code & can be dropped later
@ -4071,6 +4136,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
SceneObjectPart sop = (SceneObjectPart)update.Entity;
ObjectPropertiesFamilyPacket.ObjectDataBlock objPropDB = CreateObjectPropertiesFamilyBlock(sop,update.Flags);
objectFamilyBlocks.Value.Add(objPropDB);
familyUpdates.Value.Add(update);
}
}
@ -4081,6 +4147,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
SceneObjectPart sop = (SceneObjectPart)update.Entity;
ObjectPropertiesPacket.ObjectDataBlock objPropDB = CreateObjectPropertiesBlock(sop);
objectPropertiesBlocks.Value.Add(objPropDB);
propertyUpdates.Value.Add(update);
}
}
@ -4088,12 +4155,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
Int32 ppcnt = 0;
Int32 pbcnt = 0;
// Int32 ppcnt = 0;
// Int32 pbcnt = 0;
if (objectPropertiesBlocks.IsValueCreated)
{
List<ObjectPropertiesPacket.ObjectDataBlock> blocks = objectPropertiesBlocks.Value;
List<ObjectPropertyUpdate> updates = propertyUpdates.Value;
ObjectPropertiesPacket packet = (ObjectPropertiesPacket)PacketPool.Instance.GetPacket(PacketType.ObjectProperties);
packet.ObjectData = new ObjectPropertiesPacket.ObjectDataBlock[blocks.Count];
@ -4101,28 +4169,26 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packet.ObjectData[i] = blocks[i];
packet.Header.Zerocoded = true;
OutPacket(packet, ThrottleOutPacketType.Task, true);
pbcnt += blocks.Count;
ppcnt++;
// Pass in the delegate so that if this packet needs to be resent, we send the current properties
// of the object rather than the properties when the packet was created
OutPacket(packet, ThrottleOutPacketType.Task, true,
delegate(OutgoingPacket oPacket)
{
ResendPropertyUpdates(updates, oPacket);
});
// pbcnt += blocks.Count;
// ppcnt++;
}
Int32 fpcnt = 0;
Int32 fbcnt = 0;
// Int32 fpcnt = 0;
// Int32 fbcnt = 0;
if (objectFamilyBlocks.IsValueCreated)
{
List<ObjectPropertiesFamilyPacket.ObjectDataBlock> blocks = objectFamilyBlocks.Value;
// ObjectPropertiesFamilyPacket objPropFamilyPack =
// (ObjectPropertiesFamilyPacket)PacketPool.Instance.GetPacket(PacketType.ObjectPropertiesFamily);
//
// objPropFamilyPack.ObjectData = new ObjectPropertiesFamilyPacket.ObjectDataBlock[blocks.Count];
// for (int i = 0; i < blocks.Count; i++)
// objPropFamilyPack.ObjectData[i] = blocks[i];
//
// OutPacket(objPropFamilyPack, ThrottleOutPacketType.Task, true);
// one packet per object block... uggh...
for (int i = 0; i < blocks.Count; i++)
{
@ -4131,10 +4197,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packet.ObjectData = blocks[i];
packet.Header.Zerocoded = true;
OutPacket(packet, ThrottleOutPacketType.Task);
fpcnt++;
fbcnt++;
// Pass in the delegate so that if this packet needs to be resent, we send the current properties
// of the object rather than the properties when the packet was created
List<ObjectPropertyUpdate> updates = new List<ObjectPropertyUpdate>();
updates.Add(familyUpdates.Value[i]);
OutPacket(packet, ThrottleOutPacketType.Task, true,
delegate(OutgoingPacket oPacket)
{
ResendPropertyUpdates(updates, oPacket);
});
// fpcnt++;
// fbcnt++;
}
}
@ -4171,7 +4246,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
return block;
}
private ObjectPropertiesPacket.ObjectDataBlock CreateObjectPropertiesBlock(SceneObjectPart sop)
{
//ObjectPropertiesPacket proper = (ObjectPropertiesPacket)PacketPool.Instance.GetPacket(PacketType.ObjectProperties);
@ -11472,6 +11547,22 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// packets (the default), or false to disable splitting if the calling code
/// handles splitting manually</param>
protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting)
{
OutPacket(packet, throttlePacketType, doAutomaticSplitting, null);
}
/// <summary>
/// This is the starting point for sending a simulator packet out to the client
/// </summary>
/// <param name="packet">Packet to send</param>
/// <param name="throttlePacketType">Throttling category for the packet</param>
/// <param name="doAutomaticSplitting">True to automatically split oversized
/// packets (the default), or false to disable splitting if the calling code
/// handles splitting manually</param>
/// <param name="method">The method to be called in the event this packet is reliable
/// and unacknowledged. The server will provide normal resend capability if you do not
/// provide your own method.</param>
protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting, UnackedPacketMethod method)
{
if (m_debugPacketLevel > 0)
{
@ -11498,7 +11589,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_log.DebugFormat("[CLIENT]: Packet OUT {0}", packet.Type);
}
m_udpServer.SendPacket(m_udpClient, packet, throttlePacketType, doAutomaticSplitting);
m_udpServer.SendPacket(m_udpClient, packet, throttlePacketType, doAutomaticSplitting, method);
}
public bool AddMoney(int debit)

View File

@ -135,7 +135,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private int m_nextOnQueueEmpty = 1;
/// <summary>Throttle bucket for this agent's connection</summary>
private readonly TokenBucket m_throttleClient;
private readonly AdaptiveTokenBucket m_throttleClient;
public AdaptiveTokenBucket FlowThrottle
{
get { return m_throttleClient; }
}
/// <summary>Throttle bucket for this agent's connection</summary>
private readonly TokenBucket m_throttleCategory;
/// <summary>Throttle buckets for each packet category</summary>
@ -177,7 +182,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_maxRTO = maxRTO;
// Create a token bucket throttle for this client that has the scene token bucket as a parent
m_throttleClient = new TokenBucket(parentThrottle, rates.TotalLimit);
m_throttleClient = new AdaptiveTokenBucket(parentThrottle, rates.TotalLimit);
// Create a token bucket throttle for the total categary with the client bucket as a throttle
m_throttleCategory = new TokenBucket(m_throttleClient, rates.TotalLimit);
// Create an array of token buckets for this clients different throttle categories

View File

@ -297,7 +297,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
delegate(IClientAPI client)
{
if (client is LLClientView)
SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category);
SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category, null);
}
);
}
@ -309,7 +309,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
delegate(IClientAPI client)
{
if (client is LLClientView)
SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category);
SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category, null);
}
);
}
@ -322,7 +322,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <param name="packet"></param>
/// <param name="category"></param>
/// <param name="allowSplitting"></param>
public void SendPacket(LLUDPClient udpClient, Packet packet, ThrottleOutPacketType category, bool allowSplitting)
public void SendPacket(LLUDPClient udpClient, Packet packet, ThrottleOutPacketType category, bool allowSplitting, UnackedPacketMethod method)
{
// CoarseLocationUpdate packets cannot be split in an automated way
if (packet.Type == PacketType.CoarseLocationUpdate && allowSplitting)
@ -339,13 +339,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
for (int i = 0; i < packetCount; i++)
{
byte[] data = datas[i];
SendPacketData(udpClient, data, packet.Type, category);
SendPacketData(udpClient, data, packet.Type, category, method);
}
}
else
{
byte[] data = packet.ToBytes();
SendPacketData(udpClient, data, packet.Type, category);
SendPacketData(udpClient, data, packet.Type, category, method);
}
}
@ -356,7 +356,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <param name="data"></param>
/// <param name="type"></param>
/// <param name="category"></param>
public void SendPacketData(LLUDPClient udpClient, byte[] data, PacketType type, ThrottleOutPacketType category)
public void SendPacketData(LLUDPClient udpClient, byte[] data, PacketType type, ThrottleOutPacketType category, UnackedPacketMethod method)
{
int dataLength = data.Length;
bool doZerocode = (data[0] & Helpers.MSG_ZEROCODED) != 0;
@ -411,7 +411,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region Queue or Send
OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category);
OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category, null);
// If we were not provided a method for handling unacked, use the UDPServer default method
outgoingPacket.UnackedMethod = ((method == null) ? delegate(OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method);
// If a Linden Lab 1.23.5 client receives an update packet after a kill packet for an object, it will
// continue to display the deleted object until relog. Therefore, we need to always queue a kill object
@ -445,7 +447,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packet.Header.Reliable = false;
packet.Packets = blocks.ToArray();
SendPacket(udpClient, packet, ThrottleOutPacketType.Unknown, true);
SendPacket(udpClient, packet, ThrottleOutPacketType.Unknown, true, null);
}
}
@ -458,17 +460,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// We *could* get OldestUnacked, but it would hurt performance and not provide any benefit
pc.PingID.OldestUnacked = 0;
SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false);
SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false, null);
}
public void CompletePing(LLUDPClient udpClient, byte pingID)
{
CompletePingCheckPacket completePing = new CompletePingCheckPacket();
completePing.PingID.PingID = pingID;
SendPacket(udpClient, completePing, ThrottleOutPacketType.Unknown, false);
SendPacket(udpClient, completePing, ThrottleOutPacketType.Unknown, false, null);
}
public void ResendUnacked(LLUDPClient udpClient)
public void HandleUnacked(LLUDPClient udpClient)
{
if (!udpClient.IsConnected)
return;
@ -488,33 +490,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (expiredPackets != null)
{
//m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO);
//m_log.Debug("[LLUDPSERVER]: Handling " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO);
// Exponential backoff of the retransmission timeout
udpClient.BackoffRTO();
// Resend packets
for (int i = 0; i < expiredPackets.Count; i++)
{
OutgoingPacket outgoingPacket = expiredPackets[i];
//m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed",
// outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount);
// Set the resent flag
outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT);
outgoingPacket.Category = ThrottleOutPacketType.Resend;
// Bump up the resend count on this packet
Interlocked.Increment(ref outgoingPacket.ResendCount);
// Requeue or resend the packet
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false))
SendPacketFinal(outgoingPacket);
}
for (int i = 0; i < expiredPackets.Count; ++i)
expiredPackets[i].UnackedMethod(expiredPackets[i]);
}
}
public void ResendUnacked(OutgoingPacket outgoingPacket)
{
//m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed",
// outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount);
// Set the resent flag
outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT);
outgoingPacket.Category = ThrottleOutPacketType.Resend;
// Bump up the resend count on this packet
Interlocked.Increment(ref outgoingPacket.ResendCount);
// Requeue or resend the packet
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false))
SendPacketFinal(outgoingPacket);
}
public void Flush(LLUDPClient udpClient)
{
// FIXME: Implement?
@ -1098,7 +1098,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (udpClient.IsConnected)
{
if (m_resendUnacked)
ResendUnacked(udpClient);
HandleUnacked(udpClient);
if (m_sendAcks)
SendAcks(udpClient);
@ -1154,7 +1154,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
nticksUnack++;
watch2.Start();
ResendUnacked(udpClient);
HandleUnacked(udpClient);
watch2.Stop();
avgResendUnackedTicks = (nticksUnack - 1)/(float)nticksUnack * avgResendUnackedTicks + (watch2.ElapsedTicks / (float)nticksUnack);

View File

@ -31,6 +31,8 @@ using OpenMetaverse;
namespace OpenSim.Region.ClientStack.LindenUDP
{
public delegate void UnackedPacketMethod(OutgoingPacket oPacket);
/// <summary>
/// Holds a reference to the <seealso cref="LLUDPClient"/> this packet is
/// destined for, along with the serialized packet data, sequence number
@ -52,6 +54,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public int TickCount;
/// <summary>Category this packet belongs to</summary>
public ThrottleOutPacketType Category;
/// <summary>The delegate to be called if this packet is determined to be unacknowledged</summary>
public UnackedPacketMethod UnackedMethod;
/// <summary>
/// Default constructor
@ -60,11 +64,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <param name="buffer">Serialized packet data. If the flags or sequence number
/// need to be updated, they will be injected directly into this binary buffer</param>
/// <param name="category">Throttling category for this packet</param>
public OutgoingPacket(LLUDPClient client, UDPPacketBuffer buffer, ThrottleOutPacketType category)
public OutgoingPacket(LLUDPClient client, UDPPacketBuffer buffer, ThrottleOutPacketType category, UnackedPacketMethod method)
{
Client = client;
Buffer = buffer;
Category = category;
UnackedMethod = method;
}
}
}

View File

@ -48,31 +48,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// Number of ticks (ms) per quantum, drip rate and max burst
/// are defined over this interval.
/// </summary>
private const Int32 m_ticksPerQuantum = 1000;
protected const Int32 m_ticksPerQuantum = 1000;
/// <summary>
/// This is the number of quantums worth of packets that can
/// be accommodated during a burst
/// </summary>
private const Double m_quantumsPerBurst = 1.5;
protected const Double m_quantumsPerBurst = 1.5;
/// <summary>
/// </summary>
private const Int32 m_minimumDripRate = 1400;
protected const Int32 m_minimumDripRate = 1400;
/// <summary>Time of the last drip, in system ticks</summary>
private Int32 m_lastDrip;
protected Int32 m_lastDrip;
/// <summary>
/// The number of bytes that can be sent at this moment. This is the
/// current number of tokens in the bucket
/// </summary>
private Int64 m_tokenCount;
protected Int64 m_tokenCount;
/// <summary>
/// Map of children buckets and their requested maximum burst rate
/// </summary>
private Dictionary<TokenBucket,Int64> m_children = new Dictionary<TokenBucket,Int64>();
protected Dictionary<TokenBucket,Int64> m_children = new Dictionary<TokenBucket,Int64>();
#region Properties
@ -81,7 +81,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// parent. The parent bucket will limit the aggregate bandwidth of all
/// of its children buckets
/// </summary>
private TokenBucket m_parent;
protected TokenBucket m_parent;
public TokenBucket Parent
{
get { return m_parent; }
@ -93,7 +93,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// of tokens that can accumulate in the bucket at any one time. This
/// also sets the total request for leaf nodes
/// </summary>
private Int64 m_burstRate;
protected Int64 m_burstRate;
public Int64 RequestedBurstRate
{
get { return m_burstRate; }
@ -118,8 +118,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <remarks>Tokens are added to the bucket any time
/// <seealso cref="RemoveTokens"/> is called, at the granularity of
/// the system tick interval (typically around 15-22ms)</remarks>
private Int64 m_dripRate;
public Int64 RequestedDripRate
protected Int64 m_dripRate;
public virtual Int64 RequestedDripRate
{
get { return (m_dripRate == 0 ? m_totalDripRequest : m_dripRate); }
set {
@ -131,7 +131,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
}
}
public Int64 DripRate
public virtual Int64 DripRate
{
get {
if (m_parent == null)
@ -149,7 +149,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// The current total of the requested maximum burst rates of
/// this bucket's children buckets.
/// </summary>
private Int64 m_totalDripRequest;
protected Int64 m_totalDripRequest;
public Int64 TotalDripRequest
{
get { return m_totalDripRequest; }
@ -189,7 +189,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// hierarchy. However, if any of the parents is over-booked, then
/// the modifier will be less than 1.
/// </summary>
private double DripRateModifier()
protected double DripRateModifier()
{
Int64 driprate = DripRate;
return driprate >= TotalDripRequest ? 1.0 : (double)driprate / (double)TotalDripRequest;
@ -197,7 +197,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>
/// </summary>
private double BurstRateModifier()
protected double BurstRateModifier()
{
// for now... burst rate is always m_quantumsPerBurst (constant)
// larger than drip rate so the ratio of burst requests is the
@ -268,7 +268,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// Deposit tokens into the bucket from a child bucket that did
/// not use all of its available tokens
/// </summary>
private void Deposit(Int64 count)
protected void Deposit(Int64 count)
{
m_tokenCount += count;
@ -285,7 +285,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// call to Drip
/// </summary>
/// <returns>True if tokens were added to the bucket, otherwise false</returns>
private void Drip()
protected void Drip()
{
// This should never happen... means we are a leaf node and were created
// with no drip rate...
@ -310,4 +310,64 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Deposit(deltaMS * DripRate / m_ticksPerQuantum);
}
}
public class AdaptiveTokenBucket : TokenBucket
{
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
// <summary>
// The minimum rate for flow control.
// </summary>
protected const Int64 m_minimumFlow = m_minimumDripRate * 10;
// <summary>
// The maximum rate for flow control. Drip rate can never be
// greater than this.
// </summary>
protected Int64 m_maxDripRate = 0;
protected Int64 MaxDripRate
{
get { return (m_maxDripRate == 0 ? m_totalDripRequest : m_maxDripRate); }
set { m_maxDripRate = (value == 0 ? 0 : Math.Max(value,m_minimumFlow)); }
}
// <summary>
//
// </summary>
public virtual Int64 AdjustedDripRate
{
get { return m_dripRate; }
set {
m_dripRate = OpenSim.Framework.Util.Clamp<Int64>(value,m_minimumFlow,MaxDripRate);
m_burstRate = (Int64)((double)m_dripRate * m_quantumsPerBurst);
if (m_parent != null)
m_parent.RegisterRequest(this,m_dripRate);
}
}
// <summary>
//
// </summary>
public AdaptiveTokenBucket(TokenBucket parent, Int64 maxDripRate) : base(parent,m_minimumFlow)
{
MaxDripRate = maxDripRate;
}
// <summary>
//
// </summary>
public void ExpirePackets(Int32 count)
{
// m_log.WarnFormat("[ADAPTIVEBUCKET] drop {0} by {1} expired packets",AdjustedDripRate,count);
AdjustedDripRate = (Int64) (AdjustedDripRate / Math.Pow(2,count));
}
// <summary>
//
// </summary>
public void AcknowledgePackets(Int32 count)
{
AdjustedDripRate = AdjustedDripRate + count;
}
}
}

View File

@ -130,6 +130,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// is actually sent out again
packet.TickCount = 0;
// As with other network applications, assume that an expired packet is
// an indication of some network problem, slow transmission
packet.Client.FlowThrottle.ExpirePackets(1);
expiredPackets.Add(packet);
}
}
@ -157,6 +161,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{
m_packets.Remove(pendingRemove.SequenceNumber);
// As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission
ackedPacket.Client.FlowThrottle.AcknowledgePackets(ackedPacket.Buffer.DataLength);
// Update stats
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);

View File

@ -88,7 +88,7 @@ namespace OpenSim.Region.Framework.Scenes
// If this is an update for our own avatar give it the highest priority
if (client.AgentId == entity.UUID)
return 0;
return PriorityQueue.ImmediateQueue;
uint priority;
@ -180,7 +180,7 @@ namespace OpenSim.Region.Framework.Scenes
// m_log.WarnFormat("[PRIORITIZER] attempt to use agent {0} not in the scene",client.AgentId);
// throw new InvalidOperationException("Prioritization agent not defined");
return Int32.MaxValue;
return PriorityQueue.NumberOfQueues - 1;
}
// Use group position for child prims, since we are putting child prims in