Add a way to put things at the front of the queue for any throttle group.

Adds a DoubleLocklessQueue and uses it for the outgoing buckets. Added
a flag value to the Throttle Type (again) because although it's hacky, it's
the best of a bad bunch to get the message through the UDP stack to where it's
needed.
avinationmerge
Melanie 2013-01-16 19:29:27 +01:00
parent 5c8cced0a1
commit 582cb89beb
5 changed files with 63 additions and 19 deletions

View File

@ -29,7 +29,7 @@ using System.Threading;
namespace OpenSim.Framework namespace OpenSim.Framework
{ {
public sealed class LocklessQueue<T> public class LocklessQueue<T>
{ {
private sealed class SingleLinkNode private sealed class SingleLinkNode
{ {
@ -41,7 +41,7 @@ namespace OpenSim.Framework
SingleLinkNode tail; SingleLinkNode tail;
int count; int count;
public int Count { get { return count; } } public virtual int Count { get { return count; } }
public LocklessQueue() public LocklessQueue()
{ {
@ -76,7 +76,7 @@ namespace OpenSim.Framework
Interlocked.Increment(ref count); Interlocked.Increment(ref count);
} }
public bool Dequeue(out T item) public virtual bool Dequeue(out T item)
{ {
item = default(T); item = default(T);
SingleLinkNode oldHead = null; SingleLinkNode oldHead = null;
@ -136,4 +136,4 @@ namespace OpenSim.Framework
(object)Interlocked.CompareExchange<SingleLinkNode>(ref location, newValue, comparand); (object)Interlocked.CompareExchange<SingleLinkNode>(ref location, newValue, comparand);
} }
} }
} }

View File

@ -47,6 +47,8 @@ namespace OpenSim.Framework
Texture = 5, Texture = 5,
/// <summary>Non-texture assets</summary> /// <summary>Non-texture assets</summary>
Asset = 6, Asset = 6,
HighPriority = 128,
} }
[Flags] [Flags]

View File

@ -2788,7 +2788,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Transfer.TransferInfo.Size = req.AssetInf.Data.Length; Transfer.TransferInfo.Size = req.AssetInf.Data.Length;
Transfer.TransferInfo.TransferID = req.TransferRequestID; Transfer.TransferInfo.TransferID = req.TransferRequestID;
Transfer.Header.Zerocoded = true; Transfer.Header.Zerocoded = true;
OutPacket(Transfer, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); OutPacket(Transfer, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset);
if (req.NumPackets == 1) if (req.NumPackets == 1)
{ {
@ -2799,7 +2799,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TransferPacket.TransferData.Data = req.AssetInf.Data; TransferPacket.TransferData.Data = req.AssetInf.Data;
TransferPacket.TransferData.Status = 1; TransferPacket.TransferData.Status = 1;
TransferPacket.Header.Zerocoded = true; TransferPacket.Header.Zerocoded = true;
OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset);
} }
else else
{ {
@ -2832,7 +2832,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TransferPacket.TransferData.Status = 1; TransferPacket.TransferData.Status = 1;
} }
TransferPacket.Header.Zerocoded = true; TransferPacket.Header.Zerocoded = true;
OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task : ThrottleOutPacketType.Asset); OutPacket(TransferPacket, isWearable ? ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority : ThrottleOutPacketType.Asset);
processedLength += chunkSize; processedLength += chunkSize;
packetNumber++; packetNumber++;
@ -3605,7 +3605,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
OutPacket(aw, ThrottleOutPacketType.Task); OutPacket(aw, ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority);
} }
public void SendAppearance(UUID agentID, byte[] visualParams, byte[] textureEntry) public void SendAppearance(UUID agentID, byte[] visualParams, byte[] textureEntry)

View File

@ -92,7 +92,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Packets we have sent that need to be ACKed by the client</summary> /// <summary>Packets we have sent that need to be ACKed by the client</summary>
public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); public readonly UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
/// <summary>ACKs that are queued up, waiting to be sent to the client</summary> /// <summary>ACKs that are queued up, waiting to be sent to the client</summary>
public readonly OpenSim.Framework.LocklessQueue<uint> PendingAcks = new OpenSim.Framework.LocklessQueue<uint>(); public readonly DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>();
/// <summary>Current packet sequence number</summary> /// <summary>Current packet sequence number</summary>
public int CurrentSequence; public int CurrentSequence;
@ -146,7 +146,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Throttle buckets for each packet category</summary> /// <summary>Throttle buckets for each packet category</summary>
private readonly TokenBucket[] m_throttleCategories; private readonly TokenBucket[] m_throttleCategories;
/// <summary>Outgoing queues for throttled packets</summary> /// <summary>Outgoing queues for throttled packets</summary>
private readonly OpenSim.Framework.LocklessQueue<OutgoingPacket>[] m_packetOutboxes = new OpenSim.Framework.LocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT]; private readonly DoubleLocklessQueue<OutgoingPacket>[] m_packetOutboxes = new DoubleLocklessQueue<OutgoingPacket>[THROTTLE_CATEGORY_COUNT];
/// <summary>A container that can hold one packet for each outbox, used to store /// <summary>A container that can hold one packet for each outbox, used to store
/// dequeued packets that are being held for throttling</summary> /// dequeued packets that are being held for throttling</summary>
private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT]; private readonly OutgoingPacket[] m_nextPackets = new OutgoingPacket[THROTTLE_CATEGORY_COUNT];
@ -202,7 +202,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
ThrottleOutPacketType type = (ThrottleOutPacketType)i; ThrottleOutPacketType type = (ThrottleOutPacketType)i;
// Initialize the packet outboxes, where packets sit while they are waiting for tokens // Initialize the packet outboxes, where packets sit while they are waiting for tokens
m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue<OutgoingPacket>(); m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
// Initialize the token buckets that control the throttling for each category // Initialize the token buckets that control the throttling for each category
m_throttleCategories[i] = new TokenBucket(m_throttleCategory, rates.GetRate(type)); m_throttleCategories[i] = new TokenBucket(m_throttleCategory, rates.GetRate(type));
} }
@ -429,16 +429,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// false if the packet has not been queued and should be sent immediately. /// false if the packet has not been queued and should be sent immediately.
/// </returns> /// </returns>
public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue) public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue)
{
return EnqueueOutgoing(packet, forceQueue, false);
}
public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority)
{ {
int category = (int)packet.Category; int category = (int)packet.Category;
if (category >= 0 && category < m_packetOutboxes.Length) if (category >= 0 && category < m_packetOutboxes.Length)
{ {
OpenSim.Framework.LocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category]; DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
if (m_deliverPackets == false) if (m_deliverPackets == false)
{ {
queue.Enqueue(packet); queue.Enqueue(packet, highPriority);
return true; return true;
} }
@ -449,7 +454,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// queued packets // queued packets
if (queue.Count > 0) if (queue.Count > 0)
{ {
queue.Enqueue(packet); queue.Enqueue(packet, highPriority);
return true; return true;
} }
@ -462,7 +467,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
else else
{ {
// Force queue specified or not enough tokens in the bucket, queue this packet // Force queue specified or not enough tokens in the bucket, queue this packet
queue.Enqueue(packet); queue.Enqueue(packet, highPriority);
return true; return true;
} }
} }
@ -494,7 +499,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (m_deliverPackets == false) return false; if (m_deliverPackets == false) return false;
OutgoingPacket packet = null; OutgoingPacket packet = null;
OpenSim.Framework.LocklessQueue<OutgoingPacket> queue; DoubleLocklessQueue<OutgoingPacket> queue;
TokenBucket bucket; TokenBucket bucket;
bool packetSent = false; bool packetSent = false;
ThrottleOutPacketTypeFlags emptyCategories = 0; ThrottleOutPacketTypeFlags emptyCategories = 0;
@ -534,7 +539,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
catch catch
{ {
m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue<OutgoingPacket>(); m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
} }
if (success) if (success)
{ {
@ -567,7 +572,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
else else
{ {
m_packetOutboxes[i] = new OpenSim.Framework.LocklessQueue<OutgoingPacket>(); m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
emptyCategories |= CategoryToFlag(i); emptyCategories |= CategoryToFlag(i);
} }
} }
@ -724,4 +729,33 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
} }
public class DoubleLocklessQueue<T> : OpenSim.Framework.LocklessQueue<T>
{
OpenSim.Framework.LocklessQueue<T> highQueue = new OpenSim.Framework.LocklessQueue<T>();
public override int Count
{
get
{
return base.Count + highQueue.Count;
}
}
public override bool Dequeue(out T item)
{
if (highQueue.Dequeue(out item))
return true;
return base.Dequeue(out item);
}
public void Enqueue(T item, bool highPriority)
{
if (highPriority)
highQueue.Enqueue(item);
else
Enqueue(item);
}
}
} }

View File

@ -803,6 +803,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region Queue or Send #region Queue or Send
bool highPriority = false;
if (category != ThrottleOutPacketType.Unknown && (category & ThrottleOutPacketType.HighPriority) != 0)
{
category = (ThrottleOutPacketType)((int)category & 127);
highPriority = true;
}
OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category, null); OutgoingPacket outgoingPacket = new OutgoingPacket(udpClient, buffer, category, null);
// If we were not provided a method for handling unacked, use the UDPServer default method // If we were not provided a method for handling unacked, use the UDPServer default method
outgoingPacket.UnackedMethod = ((method == null) ? delegate(OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method); outgoingPacket.UnackedMethod = ((method == null) ? delegate(OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method);
@ -811,7 +819,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// continue to display the deleted object until relog. Therefore, we need to always queue a kill object // continue to display the deleted object until relog. Therefore, we need to always queue a kill object
// packet so that it isn't sent before a queued update packet. // packet so that it isn't sent before a queued update packet.
bool requestQueue = type == PacketType.KillObject; bool requestQueue = type == PacketType.KillObject;
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, requestQueue)) if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, requestQueue, highPriority))
SendPacketFinal(outgoingPacket); SendPacketFinal(outgoingPacket);
#endregion Queue or Send #endregion Queue or Send