Add a queue with two priority levels. This is a drop in replacement for

the BlockingQueue from OMV, but allows two priorities.
avinationmerge
Melanie 2012-08-28 22:17:17 +02:00
parent 5284e514d5
commit 378a79e7cc
1 changed files with 111 additions and 2 deletions

View File

@ -110,7 +110,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Handlers for incoming packets</summary> /// <summary>Handlers for incoming packets</summary>
//PacketEventDictionary packetEvents = new PacketEventDictionary(); //PacketEventDictionary packetEvents = new PacketEventDictionary();
/// <summary>Incoming packets that are awaiting handling</summary> /// <summary>Incoming packets that are awaiting handling</summary>
private OpenMetaverse.BlockingQueue<IncomingPacket> packetInbox = new OpenMetaverse.BlockingQueue<IncomingPacket>(); //private OpenMetaverse.BlockingQueue<IncomingPacket> packetInbox = new OpenMetaverse.BlockingQueue<IncomingPacket>();
private DoubleQueue<IncomingPacket> packetInbox = new DoubleQueue<IncomingPacket>();
/// <summary></summary> /// <summary></summary>
//private UDPClientCollection m_clients = new UDPClientCollection(); //private UDPClientCollection m_clients = new UDPClientCollection();
/// <summary>Bandwidth throttle for this UDP server</summary> /// <summary>Bandwidth throttle for this UDP server</summary>
@ -919,7 +922,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#endregion Ping Check Handling #endregion Ping Check Handling
// Inbox insertion // Inbox insertion
packetInbox.Enqueue(new IncomingPacket((LLClientView)client, packet)); if (packet.Type == PacketType.AgentUpdate ||
packet.Type == PacketType.ChatFromViewer)
packetInbox.EnqueueHigh(new IncomingPacket((LLClientView)client, packet));
else
packetInbox.EnqueueLow(new IncomingPacket((LLClientView)client, packet));
} }
#region BinaryStats #region BinaryStats
@ -1519,4 +1526,106 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
} }
internal class DoubleQueue<T> where T:class
{
private Queue<T> m_lowQueue = new Queue<T>();
private Queue<T> m_highQueue = new Queue<T>();
private object m_syncRoot = new object();
private Semaphore m_s = new Semaphore(0, 1);
public DoubleQueue()
{
}
public virtual int Count
{
get { return m_highQueue.Count + m_lowQueue.Count; }
}
public virtual void Enqueue(T data)
{
Enqueue(m_lowQueue, data);
}
public virtual void EnqueueLow(T data)
{
Enqueue(m_lowQueue, data);
}
public virtual void EnqueueHigh(T data)
{
Enqueue(m_highQueue, data);
}
private void Enqueue(Queue<T> q, T data)
{
lock (m_syncRoot)
{
m_lowQueue.Enqueue(data);
m_s.WaitOne(0);
m_s.Release();
}
}
public virtual T Dequeue()
{
return Dequeue(Timeout.Infinite);
}
public virtual T Dequeue(int tmo)
{
return Dequeue(TimeSpan.FromMilliseconds(tmo));
}
public virtual T Dequeue(TimeSpan wait)
{
T res = null;
if (!Dequeue(wait, ref res))
return null;
return res;
}
public bool Dequeue(int timeout, ref T res)
{
return Dequeue(TimeSpan.FromMilliseconds(timeout), ref res);
}
public bool Dequeue(TimeSpan wait, ref T res)
{
if (!m_s.WaitOne(wait))
return false;
lock (m_syncRoot)
{
if (m_highQueue.Count > 0)
res = m_highQueue.Dequeue();
else
res = m_lowQueue.Dequeue();
if (m_highQueue.Count == 0 || m_lowQueue.Count == 0)
return true;
m_s.Release();
return true;
}
}
public virtual void Clear()
{
lock (m_syncRoot)
{
// Make sure sem count is 0
m_s.WaitOne(0);
m_lowQueue.Clear();
m_highQueue.Clear();
}
}
}
} }