* Try/catch around EndInvoke() when Util.FireAndForget() returns to catch exceptions thrown in the async method

* Added packet stats handling to the new LLUDP implementation
* Attempting to avoid a race condition when creating a new LLUDPClient
prioritization
John Hurliman 2009-10-06 10:12:59 -07:00
parent 2519f071f2
commit fb19d1ca0a
4 changed files with 115 additions and 79 deletions

View File

@ -1283,7 +1283,9 @@ namespace OpenSim.Framework
{ {
System.Threading.WaitCallback callback = (System.Threading.WaitCallback)ar.AsyncState; System.Threading.WaitCallback callback = (System.Threading.WaitCallback)ar.AsyncState;
callback.EndInvoke(ar); try { callback.EndInvoke(ar); }
catch (Exception ex) { m_log.Error("[UTIL]: Asynchronous method threw an exception: " + ex.Message, ex); }
ar.AsyncWaitHandle.Close(); ar.AsyncWaitHandle.Close();
} }

View File

@ -186,8 +186,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_udpServer = udpServer; m_udpServer = udpServer;
m_udpClient = udpClient; m_udpClient = udpClient;
m_udpClient.OnQueueEmpty += HandleQueueEmpty; m_udpClient.OnQueueEmpty += HandleQueueEmpty;
// FIXME: Implement this m_udpClient.OnPacketStats += PopulateStats;
//m_udpClient.OnPacketStats += PopulateStats;
RegisterLocalPacketHandlers(); RegisterLocalPacketHandlers();
} }

View File

@ -33,6 +33,7 @@ using OpenMetaverse;
namespace OpenSim.Region.ClientStack.LindenUDP namespace OpenSim.Region.ClientStack.LindenUDP
{ {
public delegate void PacketStats(int inPackets, int outPackets, int unAckedBytes);
public delegate void QueueEmpty(ThrottleOutPacketType category); public delegate void QueueEmpty(ThrottleOutPacketType category);
public class LLUDPClient public class LLUDPClient
@ -41,6 +42,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// or removed, this number must also change</summary> /// or removed, this number must also change</summary>
const int THROTTLE_CATEGORY_COUNT = 7; const int THROTTLE_CATEGORY_COUNT = 7;
public event PacketStats OnPacketStats;
public event QueueEmpty OnQueueEmpty; public event QueueEmpty OnQueueEmpty;
/// <summary>AgentID for this client</summary> /// <summary>AgentID for this client</summary>
@ -84,6 +86,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Number of bytes received since the last acknowledgement was sent out. This is used /// <summary>Number of bytes received since the last acknowledgement was sent out. This is used
/// to loosely follow the TCP delayed ACK algorithm in RFC 1122 (4.2.3.2)</summary> /// to loosely follow the TCP delayed ACK algorithm in RFC 1122 (4.2.3.2)</summary>
public int BytesSinceLastACK; public int BytesSinceLastACK;
/// <summary>Number of packets received from this client</summary>
public int PacketsReceived;
/// <summary>Number of packets sent to this client</summary>
public int PacketsSent;
/// <summary>Total byte count of unacked packets sent to this client</summary>
public int UnackedBytes;
/// <summary>Total number of received packets that we have reported to the OnPacketStats event(s)</summary>
private int m_packetsReceivedReported;
/// <summary>Total number of sent packets that we have reported to the OnPacketStats event(s)</summary>
private int m_packetsSentReported;
/// <summary>Throttle bucket for this agent's connection</summary> /// <summary>Throttle bucket for this agent's connection</summary>
private readonly TokenBucket throttle; private readonly TokenBucket throttle;
@ -162,17 +175,24 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public string GetStats() public string GetStats()
{ {
// TODO: ???
return string.Format("{0,7} {1,7} {2,7} {3,7} {4,7} {5,7} {6,7} {7,7} {8,7} {9,7}", return string.Format("{0,7} {1,7} {2,7} {3,7} {4,7} {5,7} {6,7} {7,7} {8,7} {9,7}",
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
0, }
0,
0, public void SendPacketStats()
0, {
0, PacketStats callback = OnPacketStats;
0, if (callback != null)
0, {
0, int newPacketsReceived = PacketsReceived - m_packetsReceivedReported;
0); int newPacketsSent = PacketsSent - m_packetsSentReported;
callback(newPacketsReceived, newPacketsSent, UnackedBytes);
m_packetsReceivedReported += newPacketsReceived;
m_packetsSentReported += newPacketsSent;
}
} }
public void SetThrottles(byte[] throttleData) public void SetThrottles(byte[] throttleData)

View File

@ -359,6 +359,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// is actually sent out again // is actually sent out again
outgoingPacket.TickCount = 0; outgoingPacket.TickCount = 0;
// Bump up the resend count on this packet
Interlocked.Increment(ref outgoingPacket.ResendCount); Interlocked.Increment(ref outgoingPacket.ResendCount);
//Interlocked.Increment(ref Stats.ResentPackets); //Interlocked.Increment(ref Stats.ResentPackets);
@ -393,6 +394,68 @@ namespace OpenSim.Region.ClientStack.LindenUDP
public void Flush() public void Flush()
{ {
// FIXME: Implement?
}
internal void SendPacketFinal(OutgoingPacket outgoingPacket)
{
UDPPacketBuffer buffer = outgoingPacket.Buffer;
byte flags = buffer.Data[0];
bool isResend = (flags & Helpers.MSG_RESENT) != 0;
bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
LLUDPClient client = outgoingPacket.Client;
// Keep track of when this packet was sent out (right now)
outgoingPacket.TickCount = Environment.TickCount;
#region ACK Appending
int dataLength = buffer.DataLength;
// Keep appending ACKs until there is no room left in the packet or there are
// no more ACKs to append
uint ackCount = 0;
uint ack;
while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack))
{
Utils.UIntToBytesBig(ack, buffer.Data, dataLength);
dataLength += 4;
++ackCount;
}
if (ackCount > 0)
{
// Set the last byte of the packet equal to the number of appended ACKs
buffer.Data[dataLength++] = (byte)ackCount;
// Set the appended ACKs flag on this packet
buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
}
buffer.DataLength = dataLength;
#endregion ACK Appending
if (!isResend)
{
// Not a resend, assign a new sequence number
uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence);
Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
outgoingPacket.SequenceNumber = sequenceNumber;
if (isReliable)
{
// Add this packet to the list of ACK responses we are waiting on from the server
client.NeedAcks.Add(outgoingPacket);
}
}
// Stats tracking
Interlocked.Increment(ref client.PacketsSent);
if (isReliable)
Interlocked.Add(ref client.UnackedBytes, outgoingPacket.Buffer.DataLength);
// Put the UDP payload on the wire
AsyncBeginSend(buffer);
} }
protected override void PacketReceived(UDPPacketBuffer buffer) protected override void PacketReceived(UDPPacketBuffer buffer)
@ -456,8 +519,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
#endregion UseCircuitCode Handling #endregion UseCircuitCode Handling
//if (packet.Header.Resent) // Stats tracking
// Interlocked.Increment(ref Stats.ReceivedResends); Interlocked.Increment(ref client.PacketsReceived);
#region ACK Receiving #region ACK Receiving
@ -581,7 +644,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
// Create the LLUDPClient // Create the LLUDPClient
LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint); LLUDPClient client = new LLUDPClient(this, m_throttleRates, m_throttle, circuitCode, agentID, remoteEndPoint);
clients.Add(agentID, client.RemoteEndPoint, client);
// Create the LLClientView // Create the LLClientView
LLClientView clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode); LLClientView clientApi = new LLClientView(remoteEndPoint, m_scene, this, client, sessionInfo, agentID, sessionID, circuitCode);
@ -589,12 +651,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
clientApi.OnLogout += LogoutHandler; clientApi.OnLogout += LogoutHandler;
clientApi.OnConnectionClosed += RemoveClient; clientApi.OnConnectionClosed += RemoveClient;
// Give LLUDPClient a reference to IClientAPI
client.ClientAPI = clientApi;
// Start the IClientAPI // Start the IClientAPI
m_scene.ClientManager.Add(circuitCode, clientApi); m_scene.ClientManager.Add(circuitCode, clientApi);
clientApi.Start(); clientApi.Start();
// Give LLUDPClient a reference to IClientAPI
client.ClientAPI = clientApi;
// Add the new client to our list of tracked clients
clients.Add(agentID, client.RemoteEndPoint, client);
} }
private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend) private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend)
@ -602,6 +667,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OutgoingPacket ackedPacket; OutgoingPacket ackedPacket;
if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend) if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend)
{ {
// Update stats
Interlocked.Add(ref client.UnackedBytes, -ackedPacket.Buffer.DataLength);
// Calculate the round-trip time for this packet and its ACK // Calculate the round-trip time for this packet and its ACK
int rtt = currentTime - ackedPacket.TickCount; int rtt = currentTime - ackedPacket.TickCount;
if (rtt > 0) if (rtt > 0)
@ -650,7 +718,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
StatsManager.SimExtraStats.AddAbnormalClientThreadTermination(); StatsManager.SimExtraStats.AddAbnormalClientThreadTermination();
// Don't let a failure in an individual client thread crash the whole sim. // Don't let a failure in an individual client thread crash the whole sim.
m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} {1} crashed. Logging them out", client.ClientAPI.Name, client.AgentID); m_log.ErrorFormat("[LLUDPSERVER]: Client thread for {0} crashed. Logging them out", client.AgentID);
m_log.Error(e.Message, e); m_log.Error(e.Message, e);
try try
@ -674,7 +742,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
catch (Exception e2) catch (Exception e2)
{ {
m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.ClientAPI.Name); m_log.Error("[LLUDPSERVER]: Further exception thrown on forced session logout for " + client.AgentID);
m_log.Error(e2.Message, e2); m_log.Error(e2.Message, e2);
} }
} }
@ -715,8 +783,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
elapsed100MS = 0; elapsed100MS = 0;
++elapsed500MS; ++elapsed500MS;
} }
// Send pings to clients every 2000ms // Send pings to clients every 5000ms
if (elapsed500MS >= 4) if (elapsed500MS >= 10)
{ {
sendPings = true; sendPings = true;
elapsed500MS = 0; elapsed500MS = 0;
@ -730,7 +798,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (resendUnacked) if (resendUnacked)
ResendUnacked(client); ResendUnacked(client);
if (sendAcks) if (sendAcks)
{
SendAcks(client); SendAcks(client);
client.SendPacketStats();
}
if (sendPings) if (sendPings)
SendPing(client); SendPing(client);
} }
@ -746,61 +817,5 @@ namespace OpenSim.Region.ClientStack.LindenUDP
client.SendLogoutPacket(); client.SendLogoutPacket();
RemoveClient(client); RemoveClient(client);
} }
internal void SendPacketFinal(OutgoingPacket outgoingPacket)
{
UDPPacketBuffer buffer = outgoingPacket.Buffer;
byte flags = buffer.Data[0];
bool isResend = (flags & Helpers.MSG_RESENT) != 0;
bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
LLUDPClient client = outgoingPacket.Client;
// Keep track of when this packet was sent out (right now)
outgoingPacket.TickCount = Environment.TickCount;
#region ACK Appending
int dataLength = buffer.DataLength;
// Keep appending ACKs until there is no room left in the packet or there are
// no more ACKs to append
uint ackCount = 0;
uint ack;
while (dataLength + 5 < buffer.Data.Length && client.PendingAcks.Dequeue(out ack))
{
Utils.UIntToBytesBig(ack, buffer.Data, dataLength);
dataLength += 4;
++ackCount;
}
if (ackCount > 0)
{
// Set the last byte of the packet equal to the number of appended ACKs
buffer.Data[dataLength++] = (byte)ackCount;
// Set the appended ACKs flag on this packet
buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
}
buffer.DataLength = dataLength;
#endregion ACK Appending
if (!isResend)
{
// Not a resend, assign a new sequence number
uint sequenceNumber = (uint)Interlocked.Increment(ref client.CurrentSequence);
Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
outgoingPacket.SequenceNumber = sequenceNumber;
if (isReliable)
{
// Add this packet to the list of ACK responses we are waiting on from the server
client.NeedAcks.Add(outgoingPacket);
}
}
// Put the UDP payload on the wire
AsyncBeginSend(buffer);
}
} }
} }