Add OnQueueEmpty event to the packet layers. No user functinality yet

prioritization
Melanie 2009-10-01 21:08:17 +01:00
parent 44776fea72
commit 5e9da4daab
3 changed files with 66 additions and 2 deletions

View File

@ -34,6 +34,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes); public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes);
public delegate void PacketDrop(Packet pack, Object id); public delegate void PacketDrop(Packet pack, Object id);
public delegate void QueueEmpty(ThrottleOutPacketType queue);
public delegate bool SynchronizeClientHandler(IScene scene, Packet packet, UUID agentID, ThrottleOutPacketType throttlePacketType); public delegate bool SynchronizeClientHandler(IScene scene, Packet packet, UUID agentID, ThrottleOutPacketType throttlePacketType);
/// <summary> /// <summary>
@ -44,6 +45,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
event PacketStats OnPacketStats; event PacketStats OnPacketStats;
event PacketDrop OnPacketDrop; event PacketDrop OnPacketDrop;
event QueueEmpty OnQueueEmpty;
SynchronizeClientHandler SynchronizeClient { set; } SynchronizeClientHandler SynchronizeClient { set; }
int PacketsReceived { get; } int PacketsReceived { get; }

View File

@ -129,6 +129,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// //
public event PacketStats OnPacketStats; public event PacketStats OnPacketStats;
public event PacketDrop OnPacketDrop; public event PacketDrop OnPacketDrop;
public event QueueEmpty OnQueueEmpty;
//private SynchronizeClientHandler m_SynchronizeClient = null; //private SynchronizeClientHandler m_SynchronizeClient = null;
@ -172,6 +173,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_PacketQueue = new LLPacketQueue(client.AgentId, userSettings); m_PacketQueue = new LLPacketQueue(client.AgentId, userSettings);
m_PacketQueue.OnQueueEmpty += TriggerOnQueueEmpty;
m_AckTimer.Elapsed += AckTimerElapsed; m_AckTimer.Elapsed += AckTimerElapsed;
m_AckTimer.Start(); m_AckTimer.Start();
} }
@ -769,6 +772,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
handlerPacketDrop(packet, id); handlerPacketDrop(packet, id);
} }
private void TriggerOnQueueEmpty(ThrottleOutPacketType queue)
{
QueueEmpty handlerQueueEmpty = OnQueueEmpty;
if (handlerQueueEmpty == null)
return;
handlerQueueEmpty(queue);
}
// Convert the packet to bytes and stuff it onto the send queue // Convert the packet to bytes and stuff it onto the send queue
// //
public void ProcessOutPacket(LLQueItem item) public void ProcessOutPacket(LLQueItem item)

View File

@ -105,6 +105,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private UUID m_agentId; private UUID m_agentId;
public event QueueEmpty OnQueueEmpty;
public LLPacketQueue(UUID agentId, ClientStackUserSettings userSettings) public LLPacketQueue(UUID agentId, ClientStackUserSettings userSettings)
{ {
// While working on this, the BlockingQueue had me fooled for a bit. // While working on this, the BlockingQueue had me fooled for a bit.
@ -293,30 +295,42 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (LandOutgoingPacketQueue.Count > 0) if (LandOutgoingPacketQueue.Count > 0)
{ {
SendQueue.Enqueue(LandOutgoingPacketQueue.Dequeue()); SendQueue.Enqueue(LandOutgoingPacketQueue.Dequeue());
TriggerOnQueueEmpty(ThrottleOutPacketType.Land);
} }
if (WindOutgoingPacketQueue.Count > 0) if (WindOutgoingPacketQueue.Count > 0)
{ {
SendQueue.Enqueue(WindOutgoingPacketQueue.Dequeue()); SendQueue.Enqueue(WindOutgoingPacketQueue.Dequeue());
TriggerOnQueueEmpty(ThrottleOutPacketType.Wind);
} }
if (CloudOutgoingPacketQueue.Count > 0) if (CloudOutgoingPacketQueue.Count > 0)
{ {
SendQueue.Enqueue(CloudOutgoingPacketQueue.Dequeue()); SendQueue.Enqueue(CloudOutgoingPacketQueue.Dequeue());
TriggerOnQueueEmpty(ThrottleOutPacketType.Cloud);
} }
bool tasksSent = false;
if (TaskOutgoingPacketQueue.Count > 0) if (TaskOutgoingPacketQueue.Count > 0)
{ {
tasksSent = true;
SendQueue.PriorityEnqueue(TaskOutgoingPacketQueue.Dequeue()); SendQueue.PriorityEnqueue(TaskOutgoingPacketQueue.Dequeue());
} }
if (TaskLowpriorityPacketQueue.Count > 0) if (TaskLowpriorityPacketQueue.Count > 0)
{ {
tasksSent = true;
SendQueue.Enqueue(TaskLowpriorityPacketQueue.Dequeue()); SendQueue.Enqueue(TaskLowpriorityPacketQueue.Dequeue());
} }
if (tasksSent)
{
TriggerOnQueueEmpty(ThrottleOutPacketType.Task);
}
if (TextureOutgoingPacketQueue.Count > 0) if (TextureOutgoingPacketQueue.Count > 0)
{ {
SendQueue.Enqueue(TextureOutgoingPacketQueue.Dequeue()); SendQueue.Enqueue(TextureOutgoingPacketQueue.Dequeue());
TriggerOnQueueEmpty(ThrottleOutPacketType.Texture);
} }
if (AssetOutgoingPacketQueue.Count > 0) if (AssetOutgoingPacketQueue.Count > 0)
{ {
SendQueue.Enqueue(AssetOutgoingPacketQueue.Dequeue()); SendQueue.Enqueue(AssetOutgoingPacketQueue.Dequeue());
TriggerOnQueueEmpty(ThrottleOutPacketType.Asset);
} }
} }
// m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets"); // m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets");
@ -405,6 +419,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
bool qchanged = true; bool qchanged = true;
ResetCounters(); ResetCounters();
List<ThrottleOutPacketType> Empty = new List<ThrottleOutPacketType>();
// m_log.Info("[THROTTLE]: Entering Throttle"); // m_log.Info("[THROTTLE]: Entering Throttle");
while (TotalThrottle.UnderLimit() && qchanged && throttleLoops <= MaxThrottleLoops) while (TotalThrottle.UnderLimit() && qchanged && throttleLoops <= MaxThrottleLoops)
{ {
@ -431,6 +447,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
LandThrottle.AddBytes(qpack.Length); LandThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (LandOutgoingPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Land);
} }
if ((WindOutgoingPacketQueue.Count > 0) && WindThrottle.UnderLimit()) if ((WindOutgoingPacketQueue.Count > 0) && WindThrottle.UnderLimit())
@ -441,6 +460,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
WindThrottle.AddBytes(qpack.Length); WindThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (WindOutgoingPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Wind);
} }
if ((CloudOutgoingPacketQueue.Count > 0) && CloudThrottle.UnderLimit()) if ((CloudOutgoingPacketQueue.Count > 0) && CloudThrottle.UnderLimit())
@ -451,6 +473,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
CloudThrottle.AddBytes(qpack.Length); CloudThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (CloudOutgoingPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Cloud);
} }
if ((TaskOutgoingPacketQueue.Count > 0 || TaskLowpriorityPacketQueue.Count > 0) && TaskThrottle.UnderLimit()) if ((TaskOutgoingPacketQueue.Count > 0 || TaskLowpriorityPacketQueue.Count > 0) && TaskThrottle.UnderLimit())
@ -470,6 +495,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
TaskThrottle.AddBytes(qpack.Length); TaskThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (TaskOutgoingPacketQueue.Count == 0 && TaskLowpriorityPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Task);
} }
if ((TextureOutgoingPacketQueue.Count > 0) && TextureThrottle.UnderLimit()) if ((TextureOutgoingPacketQueue.Count > 0) && TextureThrottle.UnderLimit())
@ -480,6 +508,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
TextureThrottle.AddBytes(qpack.Length); TextureThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (TextureOutgoingPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Texture);
} }
if ((AssetOutgoingPacketQueue.Count > 0) && AssetThrottle.UnderLimit()) if ((AssetOutgoingPacketQueue.Count > 0) && AssetThrottle.UnderLimit())
@ -490,12 +521,30 @@ namespace OpenSim.Region.ClientStack.LindenUDP
TotalThrottle.AddBytes(qpack.Length); TotalThrottle.AddBytes(qpack.Length);
AssetThrottle.AddBytes(qpack.Length); AssetThrottle.AddBytes(qpack.Length);
qchanged = true; qchanged = true;
if (AssetOutgoingPacketQueue.Count == 0)
Empty.Add(ThrottleOutPacketType.Asset);
} }
} }
// m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets"); // m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets");
foreach (ThrottleOutPacketType t in Empty)
{
TriggerOnQueueEmpty(t);
}
} }
} }
private void TriggerOnQueueEmpty(ThrottleOutPacketType queue)
{
QueueEmpty handlerQueueEmpty = OnQueueEmpty;
if (handlerQueueEmpty == null)
return;
handlerQueueEmpty(queue);
}
private void ThrottleTimerElapsed(object sender, ElapsedEventArgs e) private void ThrottleTimerElapsed(object sender, ElapsedEventArgs e)
{ {
// just to change the signature, and that ProcessThrottle // just to change the signature, and that ProcessThrottle