Committing Jim's optimization to replace the 20ms sleep in outgoing packet handling with an interruptible wait handle

prioritization
John Hurliman 2009-10-21 00:18:35 -07:00
parent 45dc4e0a54
commit cde47c2b3d
4 changed files with 120 additions and 49 deletions

View File

@ -3320,6 +3320,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// If we received an update about our own avatar, process the avatar update priority queue immediately // If we received an update about our own avatar, process the avatar update priority queue immediately
if (data.AgentID == m_agentId) if (data.AgentID == m_agentId)
ProcessAvatarTerseUpdates(); ProcessAvatarTerseUpdates();
else
m_udpServer.SignalOutgoingPacketHandler();
} }
private void ProcessAvatarTerseUpdates() private void ProcessAvatarTerseUpdates()
@ -3407,6 +3409,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
lock (m_primFullUpdates.SyncRoot) lock (m_primFullUpdates.SyncRoot)
m_primFullUpdates.Enqueue(data.priority, objectData, data.localID); m_primFullUpdates.Enqueue(data.priority, objectData, data.localID);
m_udpServer.SignalOutgoingPacketHandler();
} }
void ProcessPrimFullUpdates() void ProcessPrimFullUpdates()
@ -3450,6 +3454,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
lock (m_primTerseUpdates.SyncRoot) lock (m_primTerseUpdates.SyncRoot)
m_primTerseUpdates.Enqueue(data.Priority, objectData, data.LocalID); m_primTerseUpdates.Enqueue(data.Priority, objectData, data.LocalID);
m_udpServer.SignalOutgoingPacketHandler();
} }
void ProcessPrimTerseUpdates() void ProcessPrimTerseUpdates()

View File

@ -105,9 +105,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public int TickLastPacketReceived; public int TickLastPacketReceived;
/// <summary>Environment.TickCount of the last time the outgoing packet handler executed for this client</summary> /// <summary>Environment.TickCount of the last time the outgoing packet handler executed for this client</summary>
public int TickLastOutgoingPacketHandler; public int TickLastOutgoingPacketHandler;
/// <summary>Keeps track of the number of elapsed milliseconds since the last time the outgoing packet handler executed for this client</summary>
public int ElapsedMSOutgoingPacketHandler;
/// <summary>Keeps track of the number of 100 millisecond periods elapsed in the outgoing packet handler executed for this client</summary>
public int Elapsed100MSOutgoingPacketHandler;
/// <summary>Keeps track of the number of 500 millisecond periods elapsed in the outgoing packet handler executed for this client</summary>
public int Elapsed500MSOutgoingPacketHandler;
/// <summary>Timer granularity. This is set to the measured resolution of Environment.TickCount</summary>
public readonly float G;
/// <summary>Smoothed round-trip time. A smoothed average of the round-trip time for sending a /// <summary>Smoothed round-trip time. A smoothed average of the round-trip time for sending a
/// reliable packet to the client and receiving an ACK</summary> /// reliable packet to the client and receiving an ACK</summary>
public float SRTT; public float SRTT;
@ -182,15 +186,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_throttleCategories[i] = new TokenBucket(m_throttle, rates.GetLimit(type), rates.GetRate(type)); m_throttleCategories[i] = new TokenBucket(m_throttle, rates.GetLimit(type), rates.GetRate(type));
} }
// Set the granularity variable used for retransmission calculations to
// the measured resolution of Environment.TickCount
G = server.TickCountResolution;
// Default the retransmission timeout to three seconds // Default the retransmission timeout to three seconds
RTO = 3000; RTO = 3000;
// Initialize this to a sane value to prevent early disconnects // Initialize this to a sane value to prevent early disconnects
TickLastPacketReceived = Environment.TickCount; TickLastPacketReceived = Environment.TickCount & Int32.MaxValue;
ElapsedMSOutgoingPacketHandler = 0;
Elapsed100MSOutgoingPacketHandler = 0;
Elapsed500MSOutgoingPacketHandler = 0;
} }
/// <summary> /// <summary>
@ -391,6 +394,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
// Not enough tokens in the bucket, queue this packet // Not enough tokens in the bucket, queue this packet
queue.Enqueue(packet); queue.Enqueue(packet);
m_udpServer.SignalOutgoingPacketHandler();
return true; return true;
} }
} }
@ -407,13 +411,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// </summary> /// </summary>
/// <remarks>This function is only called from a synchronous loop in the /// <remarks>This function is only called from a synchronous loop in the
/// UDPServer so we don't need to bother making this thread safe</remarks> /// UDPServer so we don't need to bother making this thread safe</remarks>
/// <returns>True if any packets were sent, otherwise false</returns> /// <returns>The minimum amount of time before the next packet
public bool DequeueOutgoing() /// can be sent to this client</returns>
public int DequeueOutgoing()
{ {
OutgoingPacket packet; OutgoingPacket packet;
OpenSim.Framework.LocklessQueue<OutgoingPacket> queue; OpenSim.Framework.LocklessQueue<OutgoingPacket> queue;
TokenBucket bucket; TokenBucket bucket;
bool packetSent = false; int dataLength;
int minTimeout = Int32.MaxValue;
//string queueDebugOutput = String.Empty; // Serious debug business //string queueDebugOutput = String.Empty; // Serious debug business
@ -428,12 +434,18 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// leaving a dequeued packet still waiting to be sent out. Try to // leaving a dequeued packet still waiting to be sent out. Try to
// send it again // send it again
OutgoingPacket nextPacket = m_nextPackets[i]; OutgoingPacket nextPacket = m_nextPackets[i];
if (bucket.RemoveTokens(nextPacket.Buffer.DataLength)) dataLength = nextPacket.Buffer.DataLength;
if (bucket.RemoveTokens(dataLength))
{ {
// Send the packet // Send the packet
m_udpServer.SendPacketFinal(nextPacket); m_udpServer.SendPacketFinal(nextPacket);
m_nextPackets[i] = null; m_nextPackets[i] = null;
packetSent = true; minTimeout = 0;
}
else if (minTimeout != 0)
{
// Check the minimum amount of time we would have to wait before this packet can be sent out
minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1);
} }
} }
else else
@ -445,16 +457,23 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
// A packet was pulled off the queue. See if we have // A packet was pulled off the queue. See if we have
// enough tokens in the bucket to send it out // enough tokens in the bucket to send it out
if (bucket.RemoveTokens(packet.Buffer.DataLength)) dataLength = packet.Buffer.DataLength;
if (bucket.RemoveTokens(dataLength))
{ {
// Send the packet // Send the packet
m_udpServer.SendPacketFinal(packet); m_udpServer.SendPacketFinal(packet);
packetSent = true; minTimeout = 0;
} }
else else
{ {
// Save the dequeued packet for the next iteration // Save the dequeued packet for the next iteration
m_nextPackets[i] = packet; m_nextPackets[i] = packet;
if (minTimeout != 0)
{
// Check the minimum amount of time we would have to wait before this packet can be sent out
minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1);
}
} }
// If the queue is empty after this dequeue, fire the queue // If the queue is empty after this dequeue, fire the queue
@ -473,7 +492,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
//m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business //m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business
return packetSent; return minTimeout;
} }
/// <summary> /// <summary>
@ -504,7 +523,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
// Always round retransmission timeout up to two seconds // Always round retransmission timeout up to two seconds
RTO = Math.Max(2000, (int)(SRTT + Math.Max(G, K * RTTVAR))); RTO = Math.Max(2000, (int)(SRTT + Math.Max(m_udpServer.TickCountResolution, K * RTTVAR)));
//m_log.Debug("[LLUDPCLIENT]: Setting agent " + this.Agent.FullName + "'s RTO to " + RTO + "ms with an RTTVAR of " + //m_log.Debug("[LLUDPCLIENT]: Setting agent " + this.Agent.FullName + "'s RTO to " + RTO + "ms with an RTTVAR of " +
// RTTVAR + " based on new RTT of " + r + "ms"); // RTTVAR + " based on new RTT of " + r + "ms");
} }

View File

@ -96,6 +96,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>The measured resolution of Environment.TickCount</summary>
public readonly float TickCountResolution;
/// <summary>Handlers for incoming packets</summary> /// <summary>Handlers for incoming packets</summary>
//PacketEventDictionary packetEvents = new PacketEventDictionary(); //PacketEventDictionary packetEvents = new PacketEventDictionary();
/// <summary>Incoming packets that are awaiting handling</summary> /// <summary>Incoming packets that are awaiting handling</summary>
@ -112,20 +115,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private Scene m_scene; private Scene m_scene;
/// <summary>The X/Y coordinates of the scene this UDP server is attached to</summary> /// <summary>The X/Y coordinates of the scene this UDP server is attached to</summary>
private Location m_location; private Location m_location;
/// <summary>The measured resolution of Environment.TickCount</summary>
private float m_tickCountResolution;
/// <summary>The size of the receive buffer for the UDP socket. This value /// <summary>The size of the receive buffer for the UDP socket. This value
/// is passed up to the operating system and used in the system networking /// is passed up to the operating system and used in the system networking
/// stack. Use zero to leave this value as the default</summary> /// stack. Use zero to leave this value as the default</summary>
private int m_recvBufferSize; private int m_recvBufferSize;
/// <summary>Flag to process packets asynchronously or synchronously</summary> /// <summary>Flag to process packets asynchronously or synchronously</summary>
private bool m_asyncPacketHandling; private bool m_asyncPacketHandling;
/// <summary>Track whether or not a packet was sent in the /// <summary>Track the minimum amount of time needed to send the next packet in the
/// OutgoingPacketHandler loop so we know when to sleep</summary> /// OutgoingPacketHandler loop so we know when to sleep</summary>
private bool m_packetSentLastLoop; private int m_minTimeout = Int32.MaxValue;
/// <summary>EventWaitHandle to signify the outgoing packet handler thread that
/// there is more work to do</summary>
private EventWaitHandle m_outgoingWaitHandle;
/// <summary>The measured resolution of Environment.TickCount</summary>
public float TickCountResolution { get { return m_tickCountResolution; } }
public Socket Server { get { return null; } } public Socket Server { get { return null; } }
public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) public LLUDPServer(IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager)
@ -134,16 +136,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#region Environment.TickCount Measurement #region Environment.TickCount Measurement
// Measure the resolution of Environment.TickCount // Measure the resolution of Environment.TickCount
m_tickCountResolution = 0f; TickCountResolution = 0f;
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
int start = Environment.TickCount; int start = Environment.TickCount;
int now = start; int now = start;
while (now == start) while (now == start)
now = Environment.TickCount; now = Environment.TickCount;
m_tickCountResolution += (float)(now - start) * 0.2f; TickCountResolution += (float)(now - start) * 0.2f;
} }
m_log.Info("[LLUDPSERVER]: Average Environment.TickCount resolution: " + TickCountResolution + "ms"); m_log.Info("[LLUDPSERVER]: Average Environment.TickCount resolution: " + TickCountResolution + "ms");
TickCountResolution = (float)Math.Ceiling(TickCountResolution);
#endregion Environment.TickCount Measurement #endregion Environment.TickCount Measurement
@ -171,6 +174,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
base.Start(m_recvBufferSize, m_asyncPacketHandling); base.Start(m_recvBufferSize, m_asyncPacketHandling);
m_outgoingWaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset);
// Start the incoming packet processing thread // Start the incoming packet processing thread
Thread incomingThread = new Thread(IncomingPacketHandler); Thread incomingThread = new Thread(IncomingPacketHandler);
incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")"; incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")";
@ -185,6 +190,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName); m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName);
base.Stop(); base.Stop();
m_outgoingWaitHandle.Close();
} }
public void AddScene(IScene scene) public void AddScene(IScene scene)
@ -768,6 +775,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
packetInbox.Clear(); packetInbox.Clear();
} }
public bool SignalOutgoingPacketHandler()
{
return m_outgoingWaitHandle.Set();
}
private void OutgoingPacketHandler() private void OutgoingPacketHandler()
{ {
// Set this culture for the thread that outgoing packets are sent // Set this culture for the thread that outgoing packets are sent
@ -778,14 +790,28 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
try try
{ {
m_packetSentLastLoop = false; m_minTimeout = Int32.MaxValue;
// Handle outgoing packets, resends, acknowledgements, and pings for each
// client. m_minTimeout will be set to 0 if more packets are waiting in the
// queues with bandwidth to spare, or the number of milliseconds we need to
// wait before at least one packet can be sent to a client
m_scene.ClientManager.ForEachSync(ClientOutgoingPacketHandler); m_scene.ClientManager.ForEachSync(ClientOutgoingPacketHandler);
// If no packets at all were sent, sleep to avoid chewing up CPU cycles // Can't wait for a negative amount of time, and put a 100ms ceiling on our
// when there is nothing to do // maximum wait time
if (!m_packetSentLastLoop) m_minTimeout = Utils.Clamp(m_minTimeout, 0, 100);
Thread.Sleep(20);
if (m_minTimeout > 0)
{
// Don't bother waiting for a shorter interval than our TickCountResolution
// since the token buckets wouldn't update anyways
m_minTimeout = Math.Max(m_minTimeout, (int)TickCountResolution);
// Wait for someone to signal that packets are ready to be sent, or for our
// sleep interval to expire
m_outgoingWaitHandle.WaitOne(m_minTimeout);
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -802,32 +828,48 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
LLUDPClient udpClient = ((LLClientView)client).UDPClient; LLUDPClient udpClient = ((LLClientView)client).UDPClient;
// Update ElapsedMSOutgoingPacketHandler
int thisTick = Environment.TickCount & Int32.MaxValue; int thisTick = Environment.TickCount & Int32.MaxValue;
int elapsedMS = thisTick - udpClient.TickLastOutgoingPacketHandler; if (udpClient.TickLastOutgoingPacketHandler > thisTick)
udpClient.ElapsedMSOutgoingPacketHandler += ((Int32.MaxValue - udpClient.TickLastOutgoingPacketHandler) + thisTick);
else
udpClient.ElapsedMSOutgoingPacketHandler += (thisTick - udpClient.TickLastOutgoingPacketHandler);
if (udpClient.IsConnected) if (udpClient.IsConnected)
{ {
// Check for pending outgoing resends every 100ms // Check for pending outgoing resends every 100ms
if (elapsedMS >= 100) if (udpClient.ElapsedMSOutgoingPacketHandler >= 100)
{ {
ResendUnacked(udpClient); ResendUnacked(udpClient);
udpClient.ElapsedMSOutgoingPacketHandler -= 100;
udpClient.Elapsed100MSOutgoingPacketHandler += 1;
}
// Check for pending outgoing ACKs every 500ms // Check for pending outgoing ACKs every 500ms
if (elapsedMS >= 500) if (udpClient.Elapsed100MSOutgoingPacketHandler >= 5)
{ {
SendAcks(udpClient); SendAcks(udpClient);
udpClient.Elapsed100MSOutgoingPacketHandler -= 5;
udpClient.Elapsed500MSOutgoingPacketHandler += 1;
}
// Send pings to clients every 5000ms // Send pings to clients every 5000ms
if (elapsedMS >= 5000) if (udpClient.Elapsed500MSOutgoingPacketHandler >= 10)
{ {
SendPing(udpClient); SendPing(udpClient);
} udpClient.Elapsed500MSOutgoingPacketHandler -= 10;
}
} }
// Dequeue any outgoing packets that are within the throttle limits // Dequeue any outgoing packets that are within the throttle limits
if (udpClient.DequeueOutgoing()) // and get the minimum time we would have to sleep before this client
m_packetSentLastLoop = true; // could send a packet out
int minTimeoutThisLoop = udpClient.DequeueOutgoing();
// Although this is not thread safe, it is cheaper than locking and the
// worst that will happen is we sleep for slightly longer than the
// minimum necessary interval
if (minTimeoutThisLoop < m_minTimeout)
m_minTimeout = minTimeoutThisLoop;
} }
udpClient.TickLastOutgoingPacketHandler = thisTick; udpClient.TickLastOutgoingPacketHandler = thisTick;

View File

@ -97,6 +97,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
/// <summary>
/// The speed limit of this bucket in bytes per millisecond
/// </summary>
public int DripPerMS
{
get { return tokensPerMS; }
}
/// <summary> /// <summary>
/// The number of bytes that can be sent at this moment. This is the /// The number of bytes that can be sent at this moment. This is the
/// current number of tokens in the bucket /// current number of tokens in the bucket
@ -106,11 +114,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// </summary> /// </summary>
public int Content public int Content
{ {
get get { return content; }
{
Drip();
return content;
}
} }
#endregion Properties #endregion Properties
@ -182,7 +186,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// call to Drip /// call to Drip
/// </summary> /// </summary>
/// <returns>True if tokens were added to the bucket, otherwise false</returns> /// <returns>True if tokens were added to the bucket, otherwise false</returns>
private bool Drip() public bool Drip()
{ {
if (tokensPerMS == 0) if (tokensPerMS == 0)
{ {