move to PacketQueue for throttling. This has been tested with a couple

of people, but is enough of a change that more should try it out.  This
removes 500 lines from ClientView.cs in the process.
afrisby
Sean Dague 2007-12-07 21:30:01 +00:00
parent 4221ec23f9
commit 0aa982c252
2 changed files with 50 additions and 621 deletions

View File

@ -96,58 +96,6 @@ namespace OpenSim.Region.ClientStack
private int probesWithNoIngressPackets = 0; private int probesWithNoIngressPackets = 0;
private int lastPacketsReceived = 0; private int lastPacketsReceived = 0;
// 1536000
private int throttleOutboundMax = 1536000; // Number of bytes allowed to go out per second. (256kbps per client)
// TODO: Make this variable. Lower throttle on un-ack. Raise over time?
private int bytesSent = 0; // Number of bytes sent this period
private int throttleOutbound = 162144; // Number of bytes allowed to go out per second. (256kbps per client)
// TODO: Make this variable. Lower throttle on un-ack. Raise over time
// All throttle times and number of bytes are calculated by dividing by this value
// This value also determines how many times per throttletimems the timer will run
// If throttleimems is 1000 ms, then the timer will fire every 1000/7 milliseconds
private int throttleTimeDivisor = 7;
private int throttletimems = 1000;
// Maximum -per type- throttle
private int ResendthrottleMAX = 100000;
private int LandthrottleMax = 100000;
private int WindthrottleMax = 100000;
private int CloudthrottleMax = 100000;
private int TaskthrottleMax = 800000;
private int AssetthrottleMax = 800000;
private int TexturethrottleMax = 800000;
// Minimum -per type- throttle
private int ResendthrottleMin = 5000; // setting resendmin to 0 results in mostly dropped packets
private int LandthrottleMin = 1000;
private int WindthrottleMin = 1000;
private int CloudthrottleMin = 1000;
private int TaskthrottleMin = 1000;
private int AssetthrottleMin = 1000;
private int TexturethrottleMin = 1000;
// Sim default per-client settings.
private int ResendthrottleOutbound = 50000;
private int ResendBytesSent = 0;
private int LandthrottleOutbound = 100000;
private int LandBytesSent = 0;
private int WindthrottleOutbound = 10000;
private int WindBytesSent = 0;
private int CloudthrottleOutbound = 5000;
private int CloudBytesSent = 0;
private int TaskthrottleOutbound = 100000;
private int TaskBytesSent = 0;
private int AssetthrottleOutbound = 80000;
private int AssetBytesSent = 0;
private int TexturethrottleOutbound = 100000;
private int TextureBytesSent = 0;
private Timer throttleTimer;
public ClientView(EndPoint remoteEP, UseCircuitCodePacket initialcirpack, ClientManager clientManager, public ClientView(EndPoint remoteEP, UseCircuitCodePacket initialcirpack, ClientManager clientManager,
IScene scene, AssetCache assetCache, PacketServer packServer, IScene scene, AssetCache assetCache, PacketServer packServer,
AgentCircuitManager authenSessions) AgentCircuitManager authenSessions)
@ -168,34 +116,18 @@ namespace OpenSim.Region.ClientStack
startpos = m_authenticateSessionsHandler.GetPosition(initialcirpack.CircuitCode.Code); startpos = m_authenticateSessionsHandler.GetPosition(initialcirpack.CircuitCode.Code);
// While working on this, the BlockingQueue had me fooled for a bit. // While working on this, the BlockingQueue had me fooled for a bit.
// The Blocking queue causes the thread to stop until there's something // The Blocking queue causes the thread to stop until there's something
// in it to process. it's an on-purpose threadlock though because // in it to process. it's an on-purpose threadlock though because
// without it, the clientloop will suck up all sim resources. // without it, the clientloop will suck up all sim resources.
PacketQueue = new BlockingQueue<QueItem>(); PacketQueue = new PacketQueue();
IncomingPacketQueue = new Queue<QueItem>();
OutgoingPacketQueue = new Queue<QueItem>();
ResendOutgoingPacketQueue = new Queue<QueItem>();
LandOutgoingPacketQueue = new Queue<QueItem>();
WindOutgoingPacketQueue = new Queue<QueItem>();
CloudOutgoingPacketQueue = new Queue<QueItem>();
TaskOutgoingPacketQueue = new Queue<QueItem>();
TextureOutgoingPacketQueue = new Queue<QueItem>();
AssetOutgoingPacketQueue = new Queue<QueItem>();
//this.UploadAssets = new AgentAssetUpload(this, m_assetCache, m_inventoryCache); //this.UploadAssets = new AgentAssetUpload(this, m_assetCache, m_inventoryCache);
AckTimer = new Timer(750); AckTimer = new Timer(750);
AckTimer.Elapsed += new ElapsedEventHandler(AckTimer_Elapsed); AckTimer.Elapsed += new ElapsedEventHandler(AckTimer_Elapsed);
AckTimer.Start(); AckTimer.Start();
throttleTimer = new Timer((int)(throttletimems/throttleTimeDivisor));
throttleTimer.Elapsed += new ElapsedEventHandler(throttleTimer_Elapsed);
throttleTimer.Start();
RegisterLocalPacketHandlers(); RegisterLocalPacketHandlers();
ClientThread = new Thread(new ThreadStart(AuthUser)); ClientThread = new Thread(new ThreadStart(AuthUser));
@ -203,100 +135,6 @@ namespace OpenSim.Region.ClientStack
ClientThread.Start(); ClientThread.Start();
} }
void throttleTimer_Elapsed(object sender, ElapsedEventArgs e)
{
bytesSent = 0;
ResendBytesSent = 0;
LandBytesSent = 0;
WindBytesSent = 0;
CloudBytesSent = 0;
TaskBytesSent = 0;
AssetBytesSent = 0;
TextureBytesSent = 0;
// I was considering this.. Will an event fire if the thread it's on is blocked?
// Then I figured out.. it doesn't really matter.. because this thread won't be blocked for long
// The General overhead of the UDP protocol gets sent to the queue un-throttled by this
// so This'll pick up about around the right time.
int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once.
int throttleLoops = 0;
// We're going to dequeue all of the saved up packets until
// we've hit the throttle limit or there's no more packets to send
lock (throttleTimer) {
while ((bytesSent <= ((int)(throttleOutbound/throttleTimeDivisor)) &&
(ResendOutgoingPacketQueue.Count > 0 ||
LandOutgoingPacketQueue.Count > 0 ||
WindOutgoingPacketQueue.Count > 0 ||
CloudOutgoingPacketQueue.Count > 0 ||
TaskOutgoingPacketQueue.Count > 0 ||
AssetOutgoingPacketQueue.Count > 0 ||
TextureOutgoingPacketQueue.Count > 0)) && throttleLoops <= MaxThrottleLoops)
{
throttleLoops++;
//Now comes the fun part.. we dump all our elements into PacketQueue that we've saved up.
if (ResendBytesSent <= ((int)(ResendthrottleOutbound/throttleTimeDivisor)) && ResendOutgoingPacketQueue.Count > 0)
{
QueItem qpack = ResendOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
ResendBytesSent += qpack.Packet.ToBytes().Length;
}
if (LandBytesSent <= ((int)(LandthrottleOutbound/throttleTimeDivisor)) && LandOutgoingPacketQueue.Count > 0)
{
QueItem qpack = LandOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
LandBytesSent += qpack.Packet.ToBytes().Length;
}
if (WindBytesSent <= ((int)(WindthrottleOutbound/throttleTimeDivisor)) && WindOutgoingPacketQueue.Count > 0)
{
QueItem qpack = WindOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
WindBytesSent += qpack.Packet.ToBytes().Length;
}
if (CloudBytesSent <= ((int)(CloudthrottleOutbound/throttleTimeDivisor)) && CloudOutgoingPacketQueue.Count > 0)
{
QueItem qpack = CloudOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
CloudBytesSent += qpack.Packet.ToBytes().Length;
}
if (TaskBytesSent <= ((int)(TaskthrottleOutbound/throttleTimeDivisor)) && TaskOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TaskOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
TaskBytesSent += qpack.Packet.ToBytes().Length;
}
if (TextureBytesSent <= ((int)(TexturethrottleOutbound/throttleTimeDivisor)) && TextureOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TextureOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
TextureBytesSent += qpack.Packet.ToBytes().Length;
}
if (AssetBytesSent <= ((int)(AssetthrottleOutbound/throttleTimeDivisor)) && AssetOutgoingPacketQueue.Count > 0)
{
QueItem qpack = AssetOutgoingPacketQueue.Dequeue();
PacketQueue.Enqueue(qpack);
bytesSent += qpack.Packet.ToBytes().Length;
AssetBytesSent += qpack.Packet.ToBytes().Length;
}
}
}
}
public LLUUID SessionId public LLUUID SessionId
{ {
get { return m_sessionId; } get { return m_sessionId; }
@ -315,6 +153,8 @@ namespace OpenSim.Region.ClientStack
m_scene.RemoveClient(AgentId); m_scene.RemoveClient(AgentId);
PacketQueue.Close();
ClientThread.Abort(); ClientThread.Abort();
} }
public void Kick(string message) public void Kick(string message)
@ -429,16 +269,12 @@ namespace OpenSim.Region.ClientStack
protected virtual void ClientLoop() protected virtual void ClientLoop()
{ {
bool queuedLast = false;
MainLog.Instance.Verbose("CLIENT", "Entered loop"); MainLog.Instance.Verbose("CLIENT", "Entered loop");
while (true) while (true)
{ {
QueItem nextPacket = PacketQueue.Dequeue(); QueItem nextPacket = PacketQueue.Dequeue();
if (nextPacket.Incoming) if (nextPacket.Incoming)
{ {
queuedLast = false;
//is a incoming packet //is a incoming packet
if (nextPacket.Packet.Type != PacketType.AgentUpdate) if (nextPacket.Packet.Type != PacketType.AgentUpdate)
{ {
@ -449,36 +285,8 @@ namespace OpenSim.Region.ClientStack
} }
else else
{ {
// Throw it back on the queue if it's going to cause us to flood the client
if (bytesSent > throttleOutboundMax)
{
PacketQueue.Enqueue(nextPacket);
MainLog.Instance.Verbose("THROTTLE", "Client over throttle limit, requeuing packet");
if (queuedLast)
{
MainLog.Instance.Verbose("THROTTLE", "No more sendable packets, need to sleep now");
Thread.Sleep(100); // Wait a little while if this was the last packet we saw
}
queuedLast = true;
}
else
{
queuedLast = false;
// TODO: May be a bit expensive doing this twice.
//Don't throttle AvatarPickerReplies!, they return a null .ToBytes()!
if (nextPacket.Packet.Type != PacketType.AvatarPickerReply)
bytesSent += nextPacket.Packet.ToBytes().Length;
//is a out going packet
DebugPacket("OUT", nextPacket.Packet); DebugPacket("OUT", nextPacket.Packet);
ProcessOutPacket(nextPacket.Packet); ProcessOutPacket(nextPacket.Packet);
}
} }
} }
} }
@ -2207,7 +2015,7 @@ namespace OpenSim.Region.ClientStack
} }
// Previously ClientView.PacketQueue // Previously ClientView.PacketQueue
protected BlockingQueue<QueItem> PacketQueue; protected PacketQueue PacketQueue;
protected Queue<QueItem> IncomingPacketQueue; protected Queue<QueItem> IncomingPacketQueue;
protected Queue<QueItem> OutgoingPacketQueue; protected Queue<QueItem> OutgoingPacketQueue;
@ -2400,66 +2208,13 @@ namespace OpenSim.Region.ClientStack
} }
} }
private void ThrottleCheck(ref int TypeBytesSent, int Throttle, ref Queue<QueItem> q, QueItem item)
{
// The idea.. is if the packet throttle queues are empty
// and the client is under throttle for the type. Queue
// it up directly. This basically short cuts having to
// wait for the timer to fire to put things into the
// output queue
if((q.Count == 0) && (TypeBytesSent <= ((int)(Throttle / throttleTimeDivisor))))
{
bytesSent += item.Packet.ToBytes().Length;
TypeBytesSent += item.Packet.ToBytes().Length;
PacketQueue.Enqueue(item);
}
else
{
q.Enqueue(item);
}
}
public virtual void OutPacket(Packet NewPack, ThrottleOutPacketType throttlePacketType) public virtual void OutPacket(Packet NewPack, ThrottleOutPacketType throttlePacketType)
{ {
QueItem item = new QueItem(); QueItem item = new QueItem();
item.Packet = NewPack; item.Packet = NewPack;
item.Incoming = false; item.Incoming = false;
item.throttleType = throttlePacketType; // Packet throttle type item.throttleType = throttlePacketType; // Packet throttle type
// The idea.. is if the packet throttle queues are empty and the client is under throttle for the type.
// Queue it up directly.
switch (throttlePacketType)
{
case ThrottleOutPacketType.Resend:
ThrottleCheck(ref ResendBytesSent, ResendthrottleOutbound, ref ResendOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Texture:
ThrottleCheck(ref TextureBytesSent, TexturethrottleOutbound, ref TextureOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Task:
ThrottleCheck(ref TaskBytesSent, TaskthrottleOutbound, ref TaskOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Land:
ThrottleCheck(ref LandBytesSent, LandthrottleOutbound, ref LandOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Asset:
ThrottleCheck(ref AssetBytesSent, AssetthrottleOutbound, ref AssetOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Cloud:
ThrottleCheck(ref CloudBytesSent, CloudthrottleOutbound, ref CloudOutgoingPacketQueue, item);
break;
case ThrottleOutPacketType.Wind:
ThrottleCheck(ref WindBytesSent, WindthrottleOutbound, ref WindOutgoingPacketQueue, item);
break;
default:
// Acknowledgements and other such stuff should go directly to the blocking Queue
// Throttling them may and likely 'will' be problematic
PacketQueue.Enqueue(item); PacketQueue.Enqueue(item);
break;
}
//OutgoingPacketQueue.Enqueue(item);
} }
# region Low Level Packet Methods # region Low Level Packet Methods
@ -3333,275 +3088,7 @@ namespace OpenSim.Region.ClientStack
break; break;
case PacketType.AgentThrottle: case PacketType.AgentThrottle:
AgentThrottlePacket atpack = (AgentThrottlePacket)Pack; PacketQueue.SetThrottleFromClient(Pack);
byte[] throttle = atpack.Throttle.Throttles;
int tResend = -1;
int tLand = -1;
int tWind = -1;
int tCloud = -1;
int tTask = -1;
int tTexture = -1;
int tAsset = -1;
int tall = -1;
int singlefloat = 4;
//Agent Throttle Block contains 7 single floatingpoint values.
int j = 0;
// Some Systems may be big endian...
// it might be smart to do this check more often...
if (!BitConverter.IsLittleEndian)
for (int i = 0; i < 7; i++)
Array.Reverse(throttle, j + i * singlefloat, singlefloat);
// values gotten from libsecondlife.org/wiki/Throttle. Thanks MW_
// bytes
// Convert to integer, since.. the full fp space isn't used.
tResend = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tLand = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tWind = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tCloud = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tTask = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tTexture = (int)BitConverter.ToSingle(throttle, j);
j += singlefloat;
tAsset = (int)BitConverter.ToSingle(throttle, j);
tall = tResend + tLand + tWind + tCloud + tTask + tTexture + tAsset;
/*
MainLog.Instance.Verbose("CLIENT", "Client AgentThrottle - Got throttle:resendbytes=" + tResend +
" landbytes=" + tLand +
" windbytes=" + tWind +
" cloudbytes=" + tCloud +
" taskbytes=" + tTask +
" texturebytes=" + tTexture +
" Assetbytes=" + tAsset +
" Allbytes=" + tall);
*/
// Total Sanity
// Make sure that the client sent sane total values.
// If the client didn't send acceptable values....
// Scale the clients values down until they are acceptable.
if (tall <= throttleOutboundMax)
{
// Sanity
// Making sure the client sends sane values
// This gives us a measure of control of the comms
// Check Max of Type
// Then Check Min of type
// Resend throttle
if (tResend <= ResendthrottleMAX)
ResendthrottleOutbound = tResend;
if (tResend < ResendthrottleMin)
ResendthrottleOutbound = ResendthrottleMin;
// Land throttle
if (tLand <= LandthrottleMax)
LandthrottleOutbound = tLand;
if (tLand < LandthrottleMin)
LandthrottleOutbound = LandthrottleMin;
// Wind throttle
if (tWind <= WindthrottleMax)
WindthrottleOutbound = tWind;
if (tWind < WindthrottleMin)
WindthrottleOutbound = WindthrottleMin;
// Cloud throttle
if (tCloud <= CloudthrottleMax)
CloudthrottleOutbound = tCloud;
if (tCloud < CloudthrottleMin)
CloudthrottleOutbound = CloudthrottleMin;
// Task throttle
if (tTask <= TaskthrottleMax)
TaskthrottleOutbound = tTask;
if (tTask < TaskthrottleMin)
TaskthrottleOutbound = TaskthrottleMin;
// Texture throttle
if (tTexture <= TexturethrottleMax)
TexturethrottleOutbound = tTexture;
if (tTexture < TexturethrottleMin)
TexturethrottleOutbound = TexturethrottleMin;
//Asset throttle
if (tAsset <= AssetthrottleMax)
AssetthrottleOutbound = tAsset;
if (tAsset < AssetthrottleMin)
AssetthrottleOutbound = AssetthrottleMin;
/* MainLog.Instance.Verbose("THROTTLE", "Using:resendbytes=" + ResendthrottleOutbound +
" landbytes=" + LandthrottleOutbound +
" windbytes=" + WindthrottleOutbound +
" cloudbytes=" + CloudthrottleOutbound +
" taskbytes=" + TaskthrottleOutbound +
" texturebytes=" + TexturethrottleOutbound +
" Assetbytes=" + AssetthrottleOutbound +
" Allbytes=" + tall);
*/
}
else
{
// The client didn't send acceptable values..
// so it's our job now to turn them into acceptable values
// We're going to first scale the values down
// After that we're going to check if the scaled values are sane
// We're going to be dividing by a user value.. so make sure
// we don't get a divide by zero error.
if (tall > 0)
{
// Find out the percentage of all communications
// the client requests for each type. We'll keep resend at
// it's client recommended level (won't scale it down)
// unless it's beyond sane values itself.
if (tResend <= ResendthrottleMAX)
{
// This is nexted because we only want to re-set the values
// the packet throttler uses once.
if (tResend >= ResendthrottleMin)
{
ResendthrottleOutbound = tResend;
}
else
{
ResendthrottleOutbound = ResendthrottleMin;
}
}
else
{
ResendthrottleOutbound = ResendthrottleMAX;
}
// Getting Percentages of communication for each type of data
float LandPercent = (float)(tLand / tall);
float WindPercent = (float)(tWind / tall);
float CloudPercent = (float)(tCloud / tall);
float TaskPercent = (float)(tTask / tall);
float TexturePercent = (float)(tTexture / tall);
float AssetPercent = (float)(tAsset / tall);
// Okay.. now we've got the percentages of total communication.
// Apply them to a new max total
int tLandResult = (int)(LandPercent * throttleOutboundMax);
int tWindResult = (int)(WindPercent * throttleOutboundMax);
int tCloudResult = (int)(CloudPercent * throttleOutboundMax);
int tTaskResult = (int)(TaskPercent * throttleOutboundMax);
int tTextureResult = (int)(TexturePercent * throttleOutboundMax);
int tAssetResult = (int)(AssetPercent * throttleOutboundMax);
// Now we have to check our scaled values for sanity
// Check Max of Type
// Then Check Min of type
// Land throttle
if (tLandResult <= LandthrottleMax)
LandthrottleOutbound = tLandResult;
if (tLandResult < LandthrottleMin)
LandthrottleOutbound = LandthrottleMin;
// Wind throttle
if (tWindResult <= WindthrottleMax)
WindthrottleOutbound = tWindResult;
if (tWindResult < WindthrottleMin)
WindthrottleOutbound = WindthrottleMin;
// Cloud throttle
if (tCloudResult <= CloudthrottleMax)
CloudthrottleOutbound = tCloudResult;
if (tCloudResult < CloudthrottleMin)
CloudthrottleOutbound = CloudthrottleMin;
// Task throttle
if (tTaskResult <= TaskthrottleMax)
TaskthrottleOutbound = tTaskResult;
if (tTaskResult < TaskthrottleMin)
TaskthrottleOutbound = TaskthrottleMin;
// Texture throttle
if (tTextureResult <= TexturethrottleMax)
TexturethrottleOutbound = tTextureResult;
if (tTextureResult < TexturethrottleMin)
TexturethrottleOutbound = TexturethrottleMin;
//Asset throttle
if (tAssetResult <= AssetthrottleMax)
AssetthrottleOutbound = tAssetResult;
if (tAssetResult < AssetthrottleMin)
AssetthrottleOutbound = AssetthrottleMin;
/* MainLog.Instance.Verbose("THROTTLE", "Using:resendbytes=" + ResendthrottleOutbound +
" landbytes=" + LandthrottleOutbound +
" windbytes=" + WindthrottleOutbound +
" cloudbytes=" + CloudthrottleOutbound +
" taskbytes=" + TaskthrottleOutbound +
" texturebytes=" + TexturethrottleOutbound +
" Assetbytes=" + AssetthrottleOutbound +
" Allbytes=" + tall);
*/
}
else
{
// The client sent a stupid value..
// We're going to set the throttles to the minimum possible
ResendthrottleOutbound = ResendthrottleMin;
LandthrottleOutbound = LandthrottleMin;
WindthrottleOutbound = WindthrottleMin;
CloudthrottleOutbound = CloudthrottleMin;
TaskthrottleOutbound = TaskthrottleMin;
TexturethrottleOutbound = TexturethrottleMin;
AssetthrottleOutbound = AssetthrottleMin;
MainLog.Instance.Verbose("THROTTLE", "ClientSentBadThrottle Using:resendbytes=" + ResendthrottleOutbound +
" landbytes=" + LandthrottleOutbound +
" windbytes=" + WindthrottleOutbound +
" cloudbytes=" + CloudthrottleOutbound +
" taskbytes=" + TaskthrottleOutbound +
" texturebytes=" + TexturethrottleOutbound +
" Assetbytes=" + AssetthrottleOutbound +
" Allbytes=" + tall);
}
}
// Reset Client Throttles
// This has the effect of 'wiggling the slider
// causes prim and stuck textures that didn't download to download
ResendBytesSent = 0;
LandBytesSent = 0;
WindBytesSent = 0;
CloudBytesSent = 0;
TaskBytesSent = 0;
AssetBytesSent = 0;
TextureBytesSent = 0;
//Yay, we've finally handled the agent Throttle packet!
break; break;
#endregion #endregion

View File

@ -44,7 +44,7 @@ namespace OpenSim.Region.ClientStack
{ {
public class PacketQueue public class PacketQueue
{ {
private Queue<QueItem> SendQueue; private BlockingQueue<QueItem> SendQueue;
private Queue<QueItem> IncomingPacketQueue; private Queue<QueItem> IncomingPacketQueue;
private Queue<QueItem> OutgoingPacketQueue; private Queue<QueItem> OutgoingPacketQueue;
@ -76,8 +76,9 @@ namespace OpenSim.Region.ClientStack
private PacketThrottle TextureThrottle; private PacketThrottle TextureThrottle;
private PacketThrottle TotalThrottle; private PacketThrottle TotalThrottle;
private long LastThrottle; // private long LastThrottle;
private long ThrottleInterval; // private long ThrottleInterval;
private Timer throttleTimer;
public PacketQueue() public PacketQueue()
{ {
@ -86,7 +87,7 @@ namespace OpenSim.Region.ClientStack
// in it to process. it's an on-purpose threadlock though because // in it to process. it's an on-purpose threadlock though because
// without it, the clientloop will suck up all sim resources. // without it, the clientloop will suck up all sim resources.
SendQueue = new Queue<QueItem>(); SendQueue = new BlockingQueue<QueItem>();
IncomingPacketQueue = new Queue<QueItem>(); IncomingPacketQueue = new Queue<QueItem>();
OutgoingPacketQueue = new Queue<QueItem>(); OutgoingPacketQueue = new Queue<QueItem>();
@ -111,9 +112,14 @@ namespace OpenSim.Region.ClientStack
// Number of bytes allowed to go out per second. (256kbps per client) // Number of bytes allowed to go out per second. (256kbps per client)
TotalThrottle = new PacketThrottle(0, 162144, 1536000); TotalThrottle = new PacketThrottle(0, 162144, 1536000);
throttleTimer = new Timer((int)(throttletimems/throttleTimeDivisor));
throttleTimer.Elapsed += new ElapsedEventHandler(ThrottleTimerElapsed);
throttleTimer.Start();
// TIMERS needed for this // TIMERS needed for this
LastThrottle = DateTime.Now.Ticks; // LastThrottle = DateTime.Now.Ticks;
ThrottleInterval = (long)(throttletimems/throttleTimeDivisor); // ThrottleInterval = (long)(throttletimems/throttleTimeDivisor);
} }
/* STANDARD QUEUE MANIPULATION INTERFACES */ /* STANDARD QUEUE MANIPULATION INTERFACES */
@ -123,7 +129,7 @@ namespace OpenSim.Region.ClientStack
{ {
// We could micro lock, but that will tend to actually // We could micro lock, but that will tend to actually
// probably be worse than just synchronizing on SendQueue // probably be worse than just synchronizing on SendQueue
lock (SendQueue) { lock (this) {
switch (item.throttleType) switch (item.throttleType)
{ {
case ThrottleOutPacketType.Resend: case ThrottleOutPacketType.Resend:
@ -159,12 +165,14 @@ namespace OpenSim.Region.ClientStack
public QueItem Dequeue() public QueItem Dequeue()
{ {
if (ThrottlingTime()) {
ProcessThrottle();
}
lock (SendQueue) {
return SendQueue.Dequeue(); return SendQueue.Dequeue();
} }
public void Close()
{
// one last push
ProcessThrottle();
throttleTimer.Stop();
} }
private void ResetCounters() private void ResetCounters()
@ -192,15 +200,15 @@ namespace OpenSim.Region.ClientStack
// Run through our wait queues and flush out allotted numbers of bytes into the process queue // Run through our wait queues and flush out allotted numbers of bytes into the process queue
private bool ThrottlingTime() // private bool ThrottlingTime()
{ // {
if(DateTime.Now.Ticks < (LastThrottle + ThrottleInterval)) { // if(DateTime.Now.Ticks > (LastThrottle + ThrottleInterval)) {
LastThrottle = DateTime.Now.Ticks; // LastThrottle = DateTime.Now.Ticks;
return true; // return true;
} else { // } else {
return false; // return false;
} // }
} // }
public void ProcessThrottle() public void ProcessThrottle()
{ {
@ -216,8 +224,9 @@ namespace OpenSim.Region.ClientStack
// We're going to dequeue all of the saved up packets until // We're going to dequeue all of the saved up packets until
// we've hit the throttle limit or there's no more packets to send // we've hit the throttle limit or there's no more packets to send
lock (SendQueue) { lock (this) {
ResetCounters(); ResetCounters();
// MainLog.Instance.Verbose("THROTTLE", "Entering Throttle");
while (TotalThrottle.UnderLimit() && PacketsWaiting() && while (TotalThrottle.UnderLimit() && PacketsWaiting() &&
(throttleLoops <= MaxThrottleLoops)) (throttleLoops <= MaxThrottleLoops))
{ {
@ -280,86 +289,16 @@ namespace OpenSim.Region.ClientStack
AssetThrottle.Add(qpack.Packet.ToBytes().Length); AssetThrottle.Add(qpack.Packet.ToBytes().Length);
} }
} }
// MainLog.Instance.Verbose("THROTTLE", "Processed " + throttleLoops + " packets");
} }
} }
private void throttleTimer_Elapsed(object sender, ElapsedEventArgs e) private void ThrottleTimerElapsed(object sender, ElapsedEventArgs e)
{ {
ResetCounters(); // just to change the signature, and that ProcessThrottle
// will be used elsewhere possibly
// I was considering this.. Will an event fire if the thread it's on is blocked? ProcessThrottle();
// Then I figured out.. it doesn't really matter.. because this thread won't be blocked for long
// The General overhead of the UDP protocol gets sent to the queue un-throttled by this
// so This'll pick up about around the right time.
int MaxThrottleLoops = 4550; // 50*7 packets can be dequeued at once.
int throttleLoops = 0;
// We're going to dequeue all of the saved up packets until
// we've hit the throttle limit or there's no more packets to send
while (TotalThrottle.UnderLimit() && PacketsWaiting() &&
(throttleLoops <= MaxThrottleLoops))
{
throttleLoops++;
//Now comes the fun part.. we dump all our elements into PacketQueue that we've saved up.
if (ResendThrottle.UnderLimit() && ResendOutgoingPacketQueue.Count > 0)
{
QueItem qpack = ResendOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
ResendThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (LandThrottle.UnderLimit() && LandOutgoingPacketQueue.Count > 0)
{
QueItem qpack = LandOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
LandThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (WindThrottle.UnderLimit() && WindOutgoingPacketQueue.Count > 0)
{
QueItem qpack = WindOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
WindThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (CloudThrottle.UnderLimit() && CloudOutgoingPacketQueue.Count > 0)
{
QueItem qpack = CloudOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
CloudThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (TaskThrottle.UnderLimit() && TaskOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TaskOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TaskThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (TextureThrottle.UnderLimit() && TextureOutgoingPacketQueue.Count > 0)
{
QueItem qpack = TextureOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TextureThrottle.Add(qpack.Packet.ToBytes().Length);
}
if (AssetThrottle.UnderLimit() && AssetOutgoingPacketQueue.Count > 0)
{
QueItem qpack = AssetOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack);
TotalThrottle.Add(qpack.Packet.ToBytes().Length);
AssetThrottle.Add(qpack.Packet.ToBytes().Length);
}
}
} }
private void ThrottleCheck(ref PacketThrottle throttle, ref Queue<QueItem> q, QueItem item) private void ThrottleCheck(ref PacketThrottle throttle, ref Queue<QueItem> q, QueItem item)
@ -372,9 +311,12 @@ namespace OpenSim.Region.ClientStack
if((q.Count == 0) && (throttle.UnderLimit())) if((q.Count == 0) && (throttle.UnderLimit()))
{ {
Monitor.Enter(this);
throttle.Add(item.Packet.ToBytes().Length); throttle.Add(item.Packet.ToBytes().Length);
TotalThrottle.Add(item.Packet.ToBytes().Length); TotalThrottle.Add(item.Packet.ToBytes().Length);
SendQueue.Enqueue(item); SendQueue.Enqueue(item);
Monitor.Pulse(this);
Monitor.Exit(this);
} }
else else
{ {