diff --git a/OpenSim/Framework/IClientAPI.cs b/OpenSim/Framework/IClientAPI.cs index eea9107e74..f1874689f2 100644 --- a/OpenSim/Framework/IClientAPI.cs +++ b/OpenSim/Framework/IClientAPI.cs @@ -576,34 +576,69 @@ namespace OpenSim.Framework public class IEntityUpdate { - public ISceneEntity Entity; - public uint Flags; + private ISceneEntity m_entity; + private uint m_flags; + private int m_updateTime; + + public ISceneEntity Entity + { + get { return m_entity; } + } + + public uint Flags + { + get { return m_flags; } + } + + public int UpdateTime + { + get { return m_updateTime; } + } public virtual void Update(IEntityUpdate update) { - this.Flags |= update.Flags; + m_flags |= update.Flags; + + // Use the older of the updates as the updateTime + if (Util.EnvironmentTickCountCompare(UpdateTime, update.UpdateTime) > 0) + m_updateTime = update.UpdateTime; } public IEntityUpdate(ISceneEntity entity, uint flags) { - Entity = entity; - Flags = flags; + m_entity = entity; + m_flags = flags; + m_updateTime = Util.EnvironmentTickCount(); + } + + public IEntityUpdate(ISceneEntity entity, uint flags, Int32 updateTime) + { + m_entity = entity; + m_flags = flags; + m_updateTime = updateTime; } } - public class EntityUpdate : IEntityUpdate { - // public ISceneEntity Entity; - // public PrimUpdateFlags Flags; - public float TimeDilation; + private float m_timeDilation; + + public float TimeDilation + { + get { return m_timeDilation; } + } public EntityUpdate(ISceneEntity entity, PrimUpdateFlags flags, float timedilation) - : base(entity,(uint)flags) + : base(entity, (uint)flags) { - //Entity = entity; // Flags = flags; - TimeDilation = timedilation; + m_timeDilation = timedilation; + } + + public EntityUpdate(ISceneEntity entity, PrimUpdateFlags flags, float timedilation, Int32 updateTime) + : base(entity,(uint)flags,updateTime) + { + m_timeDilation = timedilation; } } diff --git a/OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs b/OpenSim/Framework/PriorityQueue.cs similarity index 84% rename from OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs rename to OpenSim/Framework/PriorityQueue.cs index b62ec07ed9..eec2a9268e 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/PriorityQueue.cs +++ b/OpenSim/Framework/PriorityQueue.cs @@ -34,20 +34,21 @@ using OpenSim.Framework; using OpenSim.Framework.Client; using log4net; -namespace OpenSim.Region.ClientStack.LindenUDP +namespace OpenSim.Framework { public class PriorityQueue { private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); - internal delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); + public delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity); // Heap[0] for self updates // Heap[1..12] for entity updates - internal const uint m_numberOfQueues = 12; + public const uint NumberOfQueues = 12; + public const uint ImmediateQueue = 0; - private MinHeap[] m_heaps = new MinHeap[m_numberOfQueues]; + private MinHeap[] m_heaps = new MinHeap[NumberOfQueues]; private Dictionary m_lookupTable; private uint m_nextQueue = 0; private UInt64 m_nextRequest = 0; @@ -57,9 +58,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP get { return this.m_syncRoot; } } - internal PriorityQueue() : this(MinHeap.DEFAULT_CAPACITY) { } + public PriorityQueue() : this(MinHeap.DEFAULT_CAPACITY) { } - internal PriorityQueue(int capacity) + public PriorityQueue(int capacity) { m_lookupTable = new Dictionary(capacity); @@ -67,7 +68,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_heaps[i] = new MinHeap(capacity); } - internal int Count + public int Count { get { @@ -91,7 +92,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP lookup.Heap.Remove(lookup.Handle); } - pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + pqueue = Util.Clamp(pqueue, 0, NumberOfQueues - 1); lookup.Heap = m_heaps[pqueue]; lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle); m_lookupTable[localid] = lookup; @@ -99,18 +100,30 @@ namespace OpenSim.Region.ClientStack.LindenUDP return true; } - internal bool TryDequeue(out IEntityUpdate value, out Int32 timeinqueue) + public bool TryDequeue(out IEntityUpdate value, out Int32 timeinqueue) { - for (int i = 0; i < m_numberOfQueues; ++i) + // If there is anything in priority queue 0, return it first no + // matter what else. Breaks fairness. But very useful. + if (m_heaps[ImmediateQueue].Count > 0) + { + MinHeapItem item = m_heaps[ImmediateQueue].RemoveMin(); + m_lookupTable.Remove(item.Value.Entity.LocalId); + timeinqueue = Util.EnvironmentTickCountSubtract(item.EntryTime); + value = item.Value; + + return true; + } + + for (int i = 0; i < NumberOfQueues; ++i) { // To get the fair queing, we cycle through each of the // queues when finding an element to dequeue, this code // assumes that the distribution of updates in the queues // is polynomial, probably quadractic (eg distance of PI * R^2) - uint h = (uint)((m_nextQueue + i) % m_numberOfQueues); + uint h = (uint)((m_nextQueue + i) % NumberOfQueues); if (m_heaps[h].Count > 0) { - m_nextQueue = (uint)((h + 1) % m_numberOfQueues); + m_nextQueue = (uint)((h + 1) % NumberOfQueues); MinHeapItem item = m_heaps[h].RemoveMin(); m_lookupTable.Remove(item.Value.Entity.LocalId); @@ -126,7 +139,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP return false; } - internal void Reprioritize(UpdatePriorityHandler handler) + public void Reprioritize(UpdatePriorityHandler handler) { MinHeapItem item; foreach (LookupItem lookup in new List(this.m_lookupTable.Values)) @@ -140,7 +153,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP { // unless the priority queue has changed, there is no need to modify // the entry - pqueue = Util.Clamp(pqueue, 0, m_numberOfQueues - 1); + pqueue = Util.Clamp(pqueue, 0, NumberOfQueues - 1); if (pqueue != item.PriorityQueue) { lookup.Heap.Remove(lookup.Handle); @@ -164,7 +177,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP public override string ToString() { string s = ""; - for (int i = 0; i < m_numberOfQueues; i++) + for (int i = 0; i < NumberOfQueues; i++) { if (s != "") s += ","; s += m_heaps[i].Count.ToString(); diff --git a/OpenSim/Framework/Util.cs b/OpenSim/Framework/Util.cs index 3f676f9275..366a38f1ec 100644 --- a/OpenSim/Framework/Util.cs +++ b/OpenSim/Framework/Util.cs @@ -1549,6 +1549,23 @@ namespace OpenSim.Framework return (diff >= 0) ? diff : (diff + EnvironmentTickCountMask + 1); } + // Returns value of Tick Count A - TickCount B accounting for wrapping of TickCount + // Assumes both tcA and tcB came from previous calls to Util.EnvironmentTickCount(). + // A positive return value indicates A occured later than B + public static Int32 EnvironmentTickCountCompare(Int32 tcA, Int32 tcB) + { + // A, B and TC are all between 0 and 0x3fffffff + int tc = EnvironmentTickCount(); + + if (tc - tcA >= 0) + tcA += EnvironmentTickCountMask + 1; + + if (tc - tcB >= 0) + tcB += EnvironmentTickCountMask + 1; + + return tcA - tcB; + } + /// /// Prints the call stack at any given point. Useful for debugging. /// diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 803114fa8f..cd438d67d1 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -3596,6 +3596,40 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_entityUpdates.Enqueue(priority, new EntityUpdate(entity, updateFlags, m_scene.TimeDilation)); } + /// + /// Requeue an EntityUpdate when it was not acknowledged by the client. + /// We will update the priority and put it in the correct queue, merging update flags + /// with any other updates that may be queued for the same entity. + /// The original update time is used for the merged update. + /// + private void ResendPrimUpdate(EntityUpdate update) + { + // If the update exists in priority queue, it will be updated. + // If it does not exist then it will be added with the current (rather than its original) priority + uint priority = m_prioritizer.GetUpdatePriority(this, update.Entity); + + lock (m_entityUpdates.SyncRoot) + m_entityUpdates.Enqueue(priority, update); + } + + /// + /// Requeue a list of EntityUpdates when they were not acknowledged by the client. + /// We will update the priority and put it in the correct queue, merging update flags + /// with any other updates that may be queued for the same entity. + /// The original update time is used for the merged update. + /// + private void ResendPrimUpdates(List updates, OutgoingPacket oPacket) + { + // m_log.WarnFormat("[CLIENT] resending prim update {0}",updates[0].UpdateTime); + + // Remove the update packet from the list of packets waiting for acknowledgement + // because we are requeuing the list of updates. They will be resent in new packets + // with the most recent state and priority. + m_udpClient.NeedAcks.Remove(oPacket.SequenceNumber, 0, true); + foreach (EntityUpdate update in updates) + ResendPrimUpdate(update); + } + private void ProcessEntityUpdates(int maxUpdates) { OpenSim.Framework.Lazy> objectUpdateBlocks = new OpenSim.Framework.Lazy>(); @@ -3603,6 +3637,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP OpenSim.Framework.Lazy> terseUpdateBlocks = new OpenSim.Framework.Lazy>(); OpenSim.Framework.Lazy> terseAgentUpdateBlocks = new OpenSim.Framework.Lazy>(); + OpenSim.Framework.Lazy> objectUpdates = new OpenSim.Framework.Lazy>(); + OpenSim.Framework.Lazy> compressedUpdates = new OpenSim.Framework.Lazy>(); + OpenSim.Framework.Lazy> terseUpdates = new OpenSim.Framework.Lazy>(); + OpenSim.Framework.Lazy> terseAgentUpdates = new OpenSim.Framework.Lazy>(); + // Check to see if this is a flush if (maxUpdates <= 0) { @@ -4027,7 +4066,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP { SendFamilyProps = SendFamilyProps || update.SendFamilyProps; SendObjectProps = SendObjectProps || update.SendObjectProps; - Flags |= update.Flags; + // other properties may need to be updated by base class + base.Update(update); } } @@ -4038,6 +4078,25 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_entityProps.Enqueue(priority, new ObjectPropertyUpdate(entity,requestFlags,true,false)); } + private void ResendPropertyUpdate(ObjectPropertyUpdate update) + { + uint priority = 0; + lock (m_entityProps.SyncRoot) + m_entityProps.Enqueue(priority, update); + } + + private void ResendPropertyUpdates(List updates, OutgoingPacket oPacket) + { + // m_log.WarnFormat("[CLIENT] resending object property {0}",updates[0].UpdateTime); + + // Remove the update packet from the list of packets waiting for acknowledgement + // because we are requeuing the list of updates. They will be resent in new packets + // with the most recent state. + m_udpClient.NeedAcks.Remove(oPacket.SequenceNumber, 0, true); + foreach (ObjectPropertyUpdate update in updates) + ResendPropertyUpdate(update); + } + public void SendObjectPropertiesReply(ISceneEntity entity) { uint priority = 0; // time based ordering only @@ -4053,6 +4112,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP OpenSim.Framework.Lazy> objectPropertiesBlocks = new OpenSim.Framework.Lazy>(); + OpenSim.Framework.Lazy> familyUpdates = + new OpenSim.Framework.Lazy>(); + + OpenSim.Framework.Lazy> propertyUpdates = + new OpenSim.Framework.Lazy>(); + IEntityUpdate iupdate; Int32 timeinqueue; // this is just debugging code & can be dropped later @@ -4071,6 +4136,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP SceneObjectPart sop = (SceneObjectPart)update.Entity; ObjectPropertiesFamilyPacket.ObjectDataBlock objPropDB = CreateObjectPropertiesFamilyBlock(sop,update.Flags); objectFamilyBlocks.Value.Add(objPropDB); + familyUpdates.Value.Add(update); } } @@ -4081,6 +4147,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP SceneObjectPart sop = (SceneObjectPart)update.Entity; ObjectPropertiesPacket.ObjectDataBlock objPropDB = CreateObjectPropertiesBlock(sop); objectPropertiesBlocks.Value.Add(objPropDB); + propertyUpdates.Value.Add(update); } } @@ -4088,12 +4155,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP } - Int32 ppcnt = 0; - Int32 pbcnt = 0; + // Int32 ppcnt = 0; + // Int32 pbcnt = 0; if (objectPropertiesBlocks.IsValueCreated) { List blocks = objectPropertiesBlocks.Value; + List updates = propertyUpdates.Value; ObjectPropertiesPacket packet = (ObjectPropertiesPacket)PacketPool.Instance.GetPacket(PacketType.ObjectProperties); packet.ObjectData = new ObjectPropertiesPacket.ObjectDataBlock[blocks.Count]; @@ -4101,28 +4169,26 @@ namespace OpenSim.Region.ClientStack.LindenUDP packet.ObjectData[i] = blocks[i]; packet.Header.Zerocoded = true; - OutPacket(packet, ThrottleOutPacketType.Task, true); - pbcnt += blocks.Count; - ppcnt++; + // Pass in the delegate so that if this packet needs to be resent, we send the current properties + // of the object rather than the properties when the packet was created + OutPacket(packet, ThrottleOutPacketType.Task, true, + delegate(OutgoingPacket oPacket) + { + ResendPropertyUpdates(updates, oPacket); + }); + + // pbcnt += blocks.Count; + // ppcnt++; } - Int32 fpcnt = 0; - Int32 fbcnt = 0; + // Int32 fpcnt = 0; + // Int32 fbcnt = 0; if (objectFamilyBlocks.IsValueCreated) { List blocks = objectFamilyBlocks.Value; - - // ObjectPropertiesFamilyPacket objPropFamilyPack = - // (ObjectPropertiesFamilyPacket)PacketPool.Instance.GetPacket(PacketType.ObjectPropertiesFamily); - // - // objPropFamilyPack.ObjectData = new ObjectPropertiesFamilyPacket.ObjectDataBlock[blocks.Count]; - // for (int i = 0; i < blocks.Count; i++) - // objPropFamilyPack.ObjectData[i] = blocks[i]; - // - // OutPacket(objPropFamilyPack, ThrottleOutPacketType.Task, true); - + // one packet per object block... uggh... for (int i = 0; i < blocks.Count; i++) { @@ -4131,10 +4197,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP packet.ObjectData = blocks[i]; packet.Header.Zerocoded = true; - OutPacket(packet, ThrottleOutPacketType.Task); - fpcnt++; - fbcnt++; + // Pass in the delegate so that if this packet needs to be resent, we send the current properties + // of the object rather than the properties when the packet was created + List updates = new List(); + updates.Add(familyUpdates.Value[i]); + OutPacket(packet, ThrottleOutPacketType.Task, true, + delegate(OutgoingPacket oPacket) + { + ResendPropertyUpdates(updates, oPacket); + }); + + // fpcnt++; + // fbcnt++; } } @@ -4171,7 +4246,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP return block; } - + private ObjectPropertiesPacket.ObjectDataBlock CreateObjectPropertiesBlock(SceneObjectPart sop) { //ObjectPropertiesPacket proper = (ObjectPropertiesPacket)PacketPool.Instance.GetPacket(PacketType.ObjectProperties); @@ -11472,6 +11547,22 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// packets (the default), or false to disable splitting if the calling code /// handles splitting manually protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting) + { + OutPacket(packet, throttlePacketType, doAutomaticSplitting, null); + } + + /// + /// This is the starting point for sending a simulator packet out to the client + /// + /// Packet to send + /// Throttling category for the packet + /// True to automatically split oversized + /// packets (the default), or false to disable splitting if the calling code + /// handles splitting manually + /// The method to be called in the event this packet is reliable + /// and unacknowledged. The server will provide normal resend capability if you do not + /// provide your own method. + protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting, UnackedPacketMethod method) { if (m_debugPacketLevel > 0) { @@ -11498,7 +11589,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_log.DebugFormat("[CLIENT]: Packet OUT {0}", packet.Type); } - m_udpServer.SendPacket(m_udpClient, packet, throttlePacketType, doAutomaticSplitting); + m_udpServer.SendPacket(m_udpClient, packet, throttlePacketType, doAutomaticSplitting, method); } public bool AddMoney(int debit) diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs index 01d7122d3f..e54d32629c 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs @@ -135,7 +135,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP private int m_nextOnQueueEmpty = 1; /// Throttle bucket for this agent's connection - private readonly TokenBucket m_throttleClient; + private readonly AdaptiveTokenBucket m_throttleClient; + public AdaptiveTokenBucket FlowThrottle + { + get { return m_throttleClient; } + } + /// Throttle bucket for this agent's connection private readonly TokenBucket m_throttleCategory; /// Throttle buckets for each packet category @@ -177,7 +182,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_maxRTO = maxRTO; // Create a token bucket throttle for this client that has the scene token bucket as a parent - m_throttleClient = new TokenBucket(parentThrottle, rates.TotalLimit); + m_throttleClient = new AdaptiveTokenBucket(parentThrottle, rates.TotalLimit); // Create a token bucket throttle for the total categary with the client bucket as a throttle m_throttleCategory = new TokenBucket(m_throttleClient, rates.TotalLimit); // Create an array of token buckets for this clients different throttle categories diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs index 6decc7b1c1..a1a58e5a68 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs @@ -297,7 +297,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP delegate(IClientAPI client) { if (client is LLClientView) - SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category); + SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category, null); } ); } @@ -309,7 +309,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP delegate(IClientAPI client) { if (client is LLClientView) - SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category); + SendPacketData(((LLClientView)client).UDPClient, data, packet.Type, category, null); } ); } @@ -322,7 +322,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// /// /// - public void SendPacket(LLUDPClient udpClient, Packet packet, ThrottleOutPacketType category, bool allowSplitting) + public void SendPacket(LLUDPClient udpClient, Packet packet, ThrottleOutPacketType category, bool allowSplitting, UnackedPacketMethod method) { // CoarseLocationUpdate packets cannot be split in an automated way if (packet.Type == PacketType.CoarseLocationUpdate && allowSplitting) @@ -339,13 +339,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP for (int i = 0; i < packetCount; i++) { byte[] data = datas[i]; - SendPacketData(udpClient, data, packet.Type, category); + SendPacketData(udpClient, data, packet.Type, category, method); } } else { byte[] data = packet.ToBytes(); - SendPacketData(udpClient, data, packet.Type, category); + SendPacketData(udpClient, data, packet.Type, category, method); } } @@ -356,7 +356,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// /// /// - public void SendPacketData(LLUDPClient udpClient, byte[] data, PacketType type, ThrottleOutPacketType category) + public void SendPacketData(LLUDPClient udpClient, byte[] data, PacketType type, ThrottleOutPacketType category, UnackedPacketMethod method) { int dataLength = data.Length; bool doZerocode = (data[0] & Helpers.MSG_ZEROCODED) != 0; @@ -411,7 +411,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP #region Queue or Send - OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category); + OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category, null); + // If we were not provided a method for handling unacked, use the UDPServer default method + outgoingPacket.UnackedMethod = ((method == null) ? delegate(OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method); // If a Linden Lab 1.23.5 client receives an update packet after a kill packet for an object, it will // continue to display the deleted object until relog. Therefore, we need to always queue a kill object @@ -445,7 +447,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP packet.Header.Reliable = false; packet.Packets = blocks.ToArray(); - SendPacket(udpClient, packet, ThrottleOutPacketType.Unknown, true); + SendPacket(udpClient, packet, ThrottleOutPacketType.Unknown, true, null); } } @@ -458,17 +460,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP // We *could* get OldestUnacked, but it would hurt performance and not provide any benefit pc.PingID.OldestUnacked = 0; - SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false); + SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false, null); } public void CompletePing(LLUDPClient udpClient, byte pingID) { CompletePingCheckPacket completePing = new CompletePingCheckPacket(); completePing.PingID.PingID = pingID; - SendPacket(udpClient, completePing, ThrottleOutPacketType.Unknown, false); + SendPacket(udpClient, completePing, ThrottleOutPacketType.Unknown, false, null); } - public void ResendUnacked(LLUDPClient udpClient) + public void HandleUnacked(LLUDPClient udpClient) { if (!udpClient.IsConnected) return; @@ -488,33 +490,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (expiredPackets != null) { - //m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO); - + //m_log.Debug("[LLUDPSERVER]: Handling " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO); // Exponential backoff of the retransmission timeout udpClient.BackoffRTO(); - - // Resend packets - for (int i = 0; i < expiredPackets.Count; i++) - { - OutgoingPacket outgoingPacket = expiredPackets[i]; - - //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed", - // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount); - - // Set the resent flag - outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); - outgoingPacket.Category = ThrottleOutPacketType.Resend; - - // Bump up the resend count on this packet - Interlocked.Increment(ref outgoingPacket.ResendCount); - - // Requeue or resend the packet - if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false)) - SendPacketFinal(outgoingPacket); - } + for (int i = 0; i < expiredPackets.Count; ++i) + expiredPackets[i].UnackedMethod(expiredPackets[i]); } } + public void ResendUnacked(OutgoingPacket outgoingPacket) + { + //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed", + // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount); + + // Set the resent flag + outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); + outgoingPacket.Category = ThrottleOutPacketType.Resend; + + // Bump up the resend count on this packet + Interlocked.Increment(ref outgoingPacket.ResendCount); + + // Requeue or resend the packet + if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false)) + SendPacketFinal(outgoingPacket); + } + public void Flush(LLUDPClient udpClient) { // FIXME: Implement? @@ -1098,7 +1098,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (udpClient.IsConnected) { if (m_resendUnacked) - ResendUnacked(udpClient); + HandleUnacked(udpClient); if (m_sendAcks) SendAcks(udpClient); @@ -1154,7 +1154,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP nticksUnack++; watch2.Start(); - ResendUnacked(udpClient); + HandleUnacked(udpClient); watch2.Stop(); avgResendUnackedTicks = (nticksUnack - 1)/(float)nticksUnack * avgResendUnackedTicks + (watch2.ElapsedTicks / (float)nticksUnack); diff --git a/OpenSim/Region/ClientStack/LindenUDP/OutgoingPacket.cs b/OpenSim/Region/ClientStack/LindenUDP/OutgoingPacket.cs index 1a1a1cb4e3..76c6c145c8 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/OutgoingPacket.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/OutgoingPacket.cs @@ -31,6 +31,8 @@ using OpenMetaverse; namespace OpenSim.Region.ClientStack.LindenUDP { + + public delegate void UnackedPacketMethod(OutgoingPacket oPacket); /// /// Holds a reference to the this packet is /// destined for, along with the serialized packet data, sequence number @@ -52,6 +54,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP public int TickCount; /// Category this packet belongs to public ThrottleOutPacketType Category; + /// The delegate to be called if this packet is determined to be unacknowledged + public UnackedPacketMethod UnackedMethod; /// /// Default constructor @@ -60,11 +64,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Serialized packet data. If the flags or sequence number /// need to be updated, they will be injected directly into this binary buffer /// Throttling category for this packet - public OutgoingPacket(LLUDPClient client, UDPPacketBuffer buffer, ThrottleOutPacketType category) + public OutgoingPacket(LLUDPClient client, UDPPacketBuffer buffer, ThrottleOutPacketType category, UnackedPacketMethod method) { Client = client; Buffer = buffer; Category = category; + UnackedMethod = method; } } } diff --git a/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs b/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs index 07b0a1df7a..4ee6d3a30a 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/TokenBucket.cs @@ -48,31 +48,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Number of ticks (ms) per quantum, drip rate and max burst /// are defined over this interval. /// - private const Int32 m_ticksPerQuantum = 1000; + protected const Int32 m_ticksPerQuantum = 1000; /// /// This is the number of quantums worth of packets that can /// be accommodated during a burst /// - private const Double m_quantumsPerBurst = 1.5; + protected const Double m_quantumsPerBurst = 1.5; /// /// - private const Int32 m_minimumDripRate = 1400; + protected const Int32 m_minimumDripRate = 1400; /// Time of the last drip, in system ticks - private Int32 m_lastDrip; + protected Int32 m_lastDrip; /// /// The number of bytes that can be sent at this moment. This is the /// current number of tokens in the bucket /// - private Int64 m_tokenCount; + protected Int64 m_tokenCount; /// /// Map of children buckets and their requested maximum burst rate /// - private Dictionary m_children = new Dictionary(); + protected Dictionary m_children = new Dictionary(); #region Properties @@ -81,7 +81,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// parent. The parent bucket will limit the aggregate bandwidth of all /// of its children buckets /// - private TokenBucket m_parent; + protected TokenBucket m_parent; public TokenBucket Parent { get { return m_parent; } @@ -93,7 +93,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// of tokens that can accumulate in the bucket at any one time. This /// also sets the total request for leaf nodes /// - private Int64 m_burstRate; + protected Int64 m_burstRate; public Int64 RequestedBurstRate { get { return m_burstRate; } @@ -118,8 +118,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Tokens are added to the bucket any time /// is called, at the granularity of /// the system tick interval (typically around 15-22ms) - private Int64 m_dripRate; - public Int64 RequestedDripRate + protected Int64 m_dripRate; + public virtual Int64 RequestedDripRate { get { return (m_dripRate == 0 ? m_totalDripRequest : m_dripRate); } set { @@ -131,7 +131,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } } - public Int64 DripRate + public virtual Int64 DripRate { get { if (m_parent == null) @@ -149,7 +149,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// The current total of the requested maximum burst rates of /// this bucket's children buckets. /// - private Int64 m_totalDripRequest; + protected Int64 m_totalDripRequest; public Int64 TotalDripRequest { get { return m_totalDripRequest; } @@ -189,7 +189,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// hierarchy. However, if any of the parents is over-booked, then /// the modifier will be less than 1. /// - private double DripRateModifier() + protected double DripRateModifier() { Int64 driprate = DripRate; return driprate >= TotalDripRequest ? 1.0 : (double)driprate / (double)TotalDripRequest; @@ -197,7 +197,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// /// - private double BurstRateModifier() + protected double BurstRateModifier() { // for now... burst rate is always m_quantumsPerBurst (constant) // larger than drip rate so the ratio of burst requests is the @@ -268,7 +268,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Deposit tokens into the bucket from a child bucket that did /// not use all of its available tokens /// - private void Deposit(Int64 count) + protected void Deposit(Int64 count) { m_tokenCount += count; @@ -285,7 +285,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// call to Drip /// /// True if tokens were added to the bucket, otherwise false - private void Drip() + protected void Drip() { // This should never happen... means we are a leaf node and were created // with no drip rate... @@ -310,4 +310,64 @@ namespace OpenSim.Region.ClientStack.LindenUDP Deposit(deltaMS * DripRate / m_ticksPerQuantum); } } + + public class AdaptiveTokenBucket : TokenBucket + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + // + // The minimum rate for flow control. + // + protected const Int64 m_minimumFlow = m_minimumDripRate * 10; + + // + // The maximum rate for flow control. Drip rate can never be + // greater than this. + // + protected Int64 m_maxDripRate = 0; + protected Int64 MaxDripRate + { + get { return (m_maxDripRate == 0 ? m_totalDripRequest : m_maxDripRate); } + set { m_maxDripRate = (value == 0 ? 0 : Math.Max(value,m_minimumFlow)); } + } + + // + // + // + public virtual Int64 AdjustedDripRate + { + get { return m_dripRate; } + set { + m_dripRate = OpenSim.Framework.Util.Clamp(value,m_minimumFlow,MaxDripRate); + m_burstRate = (Int64)((double)m_dripRate * m_quantumsPerBurst); + if (m_parent != null) + m_parent.RegisterRequest(this,m_dripRate); + } + } + + // + // + // + public AdaptiveTokenBucket(TokenBucket parent, Int64 maxDripRate) : base(parent,m_minimumFlow) + { + MaxDripRate = maxDripRate; + } + + // + // + // + public void ExpirePackets(Int32 count) + { + // m_log.WarnFormat("[ADAPTIVEBUCKET] drop {0} by {1} expired packets",AdjustedDripRate,count); + AdjustedDripRate = (Int64) (AdjustedDripRate / Math.Pow(2,count)); + } + + // + // + // + public void AcknowledgePackets(Int32 count) + { + AdjustedDripRate = AdjustedDripRate + count; + } + } } diff --git a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs index d195110a23..b1709649f5 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs @@ -130,6 +130,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP // is actually sent out again packet.TickCount = 0; + // As with other network applications, assume that an expired packet is + // an indication of some network problem, slow transmission + packet.Client.FlowThrottle.ExpirePackets(1); + expiredPackets.Add(packet); } } @@ -157,6 +161,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP { m_packets.Remove(pendingRemove.SequenceNumber); + // As with other network applications, assume that an acknowledged packet is an + // indication that the network can handle a little more load, speed up the transmission + ackedPacket.Client.FlowThrottle.AcknowledgePackets(ackedPacket.Buffer.DataLength); + // Update stats Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength); diff --git a/OpenSim/Region/Framework/Scenes/Prioritizer.cs b/OpenSim/Region/Framework/Scenes/Prioritizer.cs index e4b0323f83..17b2da13dd 100644 --- a/OpenSim/Region/Framework/Scenes/Prioritizer.cs +++ b/OpenSim/Region/Framework/Scenes/Prioritizer.cs @@ -88,7 +88,7 @@ namespace OpenSim.Region.Framework.Scenes // If this is an update for our own avatar give it the highest priority if (client.AgentId == entity.UUID) - return 0; + return PriorityQueue.ImmediateQueue; uint priority; @@ -180,7 +180,7 @@ namespace OpenSim.Region.Framework.Scenes // m_log.WarnFormat("[PRIORITIZER] attempt to use agent {0} not in the scene",client.AgentId); // throw new InvalidOperationException("Prioritization agent not defined"); - return Int32.MaxValue; + return PriorityQueue.NumberOfQueues - 1; } // Use group position for child prims, since we are putting child prims in