further screwing around with the PacketQueue data structure.

Nearly time to replace a chunk of ClientView with this.
afrisby
Sean Dague 2007-12-07 19:13:35 +00:00
parent 25ec01311d
commit 4221ec23f9
1 changed files with 149 additions and 39 deletions

View File

@ -44,7 +44,7 @@ namespace OpenSim.Region.ClientStack
{ {
public class PacketQueue public class PacketQueue
{ {
private BlockingQueue<QueItem> SendQueue; private Queue<QueItem> SendQueue;
private Queue<QueItem> IncomingPacketQueue; private Queue<QueItem> IncomingPacketQueue;
private Queue<QueItem> OutgoingPacketQueue; private Queue<QueItem> OutgoingPacketQueue;
@ -76,7 +76,8 @@ namespace OpenSim.Region.ClientStack
private PacketThrottle TextureThrottle; private PacketThrottle TextureThrottle;
private PacketThrottle TotalThrottle; private PacketThrottle TotalThrottle;
private Timer throttleTimer; private long LastThrottle;
private long ThrottleInterval;
public PacketQueue() public PacketQueue()
{ {
@ -85,7 +86,7 @@ namespace OpenSim.Region.ClientStack
// in it to process. it's an on-purpose threadlock though because // in it to process. it's an on-purpose threadlock though because
// without it, the clientloop will suck up all sim resources. // without it, the clientloop will suck up all sim resources.
SendQueue = new BlockingQueue<QueItem>(); SendQueue = new Queue<QueItem>();
IncomingPacketQueue = new Queue<QueItem>(); IncomingPacketQueue = new Queue<QueItem>();
OutgoingPacketQueue = new Queue<QueItem>(); OutgoingPacketQueue = new Queue<QueItem>();
@ -111,9 +112,59 @@ namespace OpenSim.Region.ClientStack
TotalThrottle = new PacketThrottle(0, 162144, 1536000); TotalThrottle = new PacketThrottle(0, 162144, 1536000);
// TIMERS needed for this // TIMERS needed for this
throttleTimer = new Timer((int)(throttletimems/throttleTimeDivisor)); LastThrottle = DateTime.Now.Ticks;
throttleTimer.Elapsed += new ElapsedEventHandler(throttleTimer_Elapsed); ThrottleInterval = (long)(throttletimems/throttleTimeDivisor);
throttleTimer.Start(); }
/* STANDARD QUEUE MANIPULATION INTERFACES */
public void Enqueue(QueItem item)
{
// We could micro lock, but that will tend to actually
// probably be worse than just synchronizing on SendQueue
lock (SendQueue) {
switch (item.throttleType)
{
case ThrottleOutPacketType.Resend:
ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Texture:
ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Task:
ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Land:
ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Asset:
ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Cloud:
ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Wind:
ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
break;
default:
// Acknowledgements and other such stuff should go directly to the blocking Queue
// Throttling them may and likely 'will' be problematic
SendQueue.Enqueue(item);
break;
}
}
}
public QueItem Dequeue()
{
if (ThrottlingTime()) {
ProcessThrottle();
}
lock (SendQueue) {
return SendQueue.Dequeue();
}
} }
private void ResetCounters() private void ResetCounters()
@ -139,6 +190,98 @@ namespace OpenSim.Region.ClientStack
TextureOutgoingPacketQueue.Count > 0); TextureOutgoingPacketQueue.Count > 0);
} }
// Run through our wait queues and flush out allotted numbers of bytes into the process queue
private bool ThrottlingTime()
{
if(DateTime.Now.Ticks < (LastThrottle + ThrottleInterval)) {
LastThrottle = DateTime.Now.Ticks;
return true;
} else {
return false;
}
}
public void ProcessThrottle()
{
// I was considering this.. Will an event fire if the thread it's on is blocked?
// Then I figured out.. it doesn't really matter.. because this thread won't be blocked for long
// The General overhead of the UDP protocol gets sent to the queue un-throttled by this
// so This'll pick up about around the right time.
int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once.
int throttleLoops = 0;
// We're going to dequeue all of the saved up packets until
// we've hit the throttle limit or there's no more packets to send
lock (SendQueue) {
ResetCounters();
while (TotalThrottle.UnderLimit() && PacketsWaiting() &&
(throttleLoops <= MaxThrottleLoops))
{
throttleLoops++;
//Now comes the fun part.. we dump all our elements into PacketQueue that we've saved up.
if (ResendThrottle.UnderLimit() && ResendOutgoingPacketQueue.Count > 0)
{
QueItem qpack = ResendOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
ResendThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (LandThrottle.UnderLimit() && LandOutgoingPacketQueue.Count > 0)
{
QueItem qpack = LandOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
LandThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (WindThrottle.UnderLimit() && WindOutgoingPacketQueue.Count > 0)
{
QueItem qpack = WindOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
WindThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (CloudThrottle.UnderLimit() && CloudOutgoingPacketQueue.Count > 0)
{
QueItem qpack = CloudOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
CloudThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (TaskThrottle.UnderLimit() && TaskOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TaskOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TaskThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (TextureThrottle.UnderLimit() && TextureOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TextureOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TextureThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (AssetThrottle.UnderLimit() && AssetOutgoingPacketQueue.Count > 0)
{
QueItem qpack = AssetOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
AssetThrottle.Add(qpack.Packet.ToBytes().Length);
}
}
}
}
private void throttleTimer_Elapsed(object sender, ElapsedEventArgs e) private void throttleTimer_Elapsed(object sender, ElapsedEventArgs e)
{ {
@ -239,39 +382,6 @@ namespace OpenSim.Region.ClientStack
} }
} }
public void Add(QueItem item)
{
switch (item.throttleType)
{
case ThrottleOutPacketType.Resend:
ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Texture:
ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Task:
ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Land:
ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Asset:
ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Cloud:
ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Wind:
ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
break;
default:
// Acknowledgements and other such stuff should go directly to the blocking Queue
// Throttling them may and likely 'will' be problematic
SendQueue.Enqueue(item);
break;
}
}
private int ScaleThrottle(int value, int curmax, int newmax) private int ScaleThrottle(int value, int curmax, int newmax)
{ {