issues with udp buffers pool on heavy load

0.9.1.0-post-fixes
UbitUmarov 2019-02-26 15:02:57 +00:00
parent e24adb9ea1
commit 4de5e14e54
4 changed files with 53 additions and 42 deletions

View File

@ -650,6 +650,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// leaving a dequeued packet still waiting to be sent out. Try to // leaving a dequeued packet still waiting to be sent out. Try to
// send it again // send it again
OutgoingPacket nextPacket = m_nextPackets[i]; OutgoingPacket nextPacket = m_nextPackets[i];
if(nextPacket.Buffer == null)
{
if (m_packetOutboxes[i].Count < 5)
emptyCategories |= CategoryToFlag(i);
continue;
}
if (bucket.RemoveTokens(nextPacket.Buffer.DataLength)) if (bucket.RemoveTokens(nextPacket.Buffer.DataLength))
{ {
// Send the packet // Send the packet
@ -681,21 +687,29 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
// A packet was pulled off the queue. See if we have // A packet was pulled off the queue. See if we have
// enough tokens in the bucket to send it out // enough tokens in the bucket to send it out
if (bucket.RemoveTokens(packet.Buffer.DataLength)) if(packet.Buffer == null)
{ {
// Send the packet // packet canceled elsewhere (by a ack for example)
m_udpServer.SendPacketFinal(packet);
packetSent = true;
if (queue.Count < 5) if (queue.Count < 5)
emptyCategories |= CategoryToFlag(i); emptyCategories |= CategoryToFlag(i);
} }
else else
{ {
// Save the dequeued packet for the next iteration if (bucket.RemoveTokens(packet.Buffer.DataLength))
m_nextPackets[i] = packet; {
} // Send the packet
m_udpServer.SendPacketFinal(packet);
packetSent = true;
if (queue.Count < 5)
emptyCategories |= CategoryToFlag(i);
}
else
{
// Save the dequeued packet for the next iteration
m_nextPackets[i] = packet;
}
}
} }
else else
{ {

View File

@ -983,7 +983,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
LLUDPClient udpClient = client.UDPClient; LLUDPClient udpClient = client.UDPClient;
if (!udpClient.IsConnected) if (!client.IsActive || !udpClient.IsConnected)
return; return;
// Disconnect an agent if no packets are received for some time // Disconnect an agent if no packets are received for some time
@ -1053,14 +1053,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
internal void SendPacketFinal(OutgoingPacket outgoingPacket) internal void SendPacketFinal(OutgoingPacket outgoingPacket)
{ {
UDPPacketBuffer buffer = outgoingPacket.Buffer; UDPPacketBuffer buffer = outgoingPacket.Buffer;
if(buffer == null) // canceled packet
return;
LLUDPClient udpClient = outgoingPacket.Client;
if (!udpClient.IsConnected)
return;
byte flags = buffer.Data[0]; byte flags = buffer.Data[0];
bool isResend = (flags & Helpers.MSG_RESENT) != 0; bool isResend = (flags & Helpers.MSG_RESENT) != 0;
bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0; bool isReliable = (flags & Helpers.MSG_RELIABLE) != 0;
bool isZerocoded = (flags & Helpers.MSG_ZEROCODED) != 0; bool isZerocoded = (flags & Helpers.MSG_ZEROCODED) != 0;
LLUDPClient udpClient = outgoingPacket.Client;
if (!udpClient.IsConnected)
return;
int dataLength = buffer.DataLength; int dataLength = buffer.DataLength;
@ -1916,7 +1918,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
Scene.ThreadAlive(2); Scene.ThreadAlive(2);
try try
{ {
m_packetSent = false; m_packetSent = false;
@ -1971,7 +1972,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
else if (!m_packetSent) else if (!m_packetSent)
// Thread.Sleep((int)TickCountResolution); outch this is bad on linux // Thread.Sleep((int)TickCountResolution); outch this is bad on linux
Thread.Sleep(15); // match the 16ms of windows7, dont ask 16 or win may decide to do 32ms. Thread.Sleep(15); // match the 16ms of windows, dont ask 16 or win may decide to do 32ms.
Watchdog.UpdateThread(); Watchdog.UpdateThread();
} }
@ -1995,14 +1996,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (udpClient.IsConnected) if (udpClient.IsConnected)
{ {
if (m_resendUnacked) if (client.IsActive && m_resendUnacked)
HandleUnacked(llClient); HandleUnacked(llClient);
if (m_sendAcks) if (client.IsActive)
SendAcks(udpClient); {
if (m_sendAcks)
SendAcks(udpClient);
if (m_sendPing) if (m_sendPing)
SendPing(udpClient); SendPing(udpClient);
}
// Dequeue any outgoing packets that are within the throttle limits // Dequeue any outgoing packets that are within the throttle limits
if (udpClient.DequeueOutgoing()) if (udpClient.DequeueOutgoing())
@ -2015,7 +2019,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_log.Error( m_log.Error(
string.Format("[LLUDPSERVER]: OutgoingPacketHandler iteration for {0} threw ", client.Name), ex); string.Format("[LLUDPSERVER]: OutgoingPacketHandler iteration for {0} threw ", client.Name), ex);
} }
client = null;
} }
#region Emergency Monitoring #region Emergency Monitoring

View File

@ -343,14 +343,12 @@ namespace OpenMetaverse
{ {
// kick off an async read // kick off an async read
m_udpSocket.BeginReceiveFrom( m_udpSocket.BeginReceiveFrom(
//wrappedBuffer.Instance.Data,
buf.Data, buf.Data,
0, 0,
UDPPacketBuffer.BUFFER_SIZE, buf.Data.Length,
SocketFlags.None, SocketFlags.None,
ref buf.RemoteEndPoint, ref buf.RemoteEndPoint,
AsyncEndReceive, AsyncEndReceive,
//wrappedBuffer);
buf); buf);
} }
catch (SocketException e) catch (SocketException e)
@ -364,14 +362,12 @@ namespace OpenMetaverse
try try
{ {
m_udpSocket.BeginReceiveFrom( m_udpSocket.BeginReceiveFrom(
//wrappedBuffer.Instance.Data,
buf.Data, buf.Data,
0, 0,
UDPPacketBuffer.BUFFER_SIZE, buf.Data.Length,
SocketFlags.None, SocketFlags.None,
ref buf.RemoteEndPoint, ref buf.RemoteEndPoint,
AsyncEndReceive, AsyncEndReceive,
//wrappedBuffer);
buf); buf);
salvaged = true; salvaged = true;
} }
@ -382,11 +378,6 @@ namespace OpenMetaverse
m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort); m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort);
} }
} }
catch (ObjectDisposedException e)
{
m_log.Error(
string.Format("[UDPBASE]: Error processing UDP begin receive {0}. Exception ", UdpReceives), e);
}
catch (Exception e) catch (Exception e)
{ {
m_log.Error( m_log.Error(
@ -443,11 +434,6 @@ namespace OpenMetaverse
UdpReceives, se.ErrorCode), UdpReceives, se.ErrorCode),
se); se);
} }
catch (ObjectDisposedException e)
{
m_log.Error(
string.Format("[UDPBASE]: Error processing UDP end receive {0}. Exception ", UdpReceives), e);
}
catch (Exception e) catch (Exception e)
{ {
m_log.Error( m_log.Error(
@ -502,6 +488,8 @@ namespace OpenMetaverse
*/ */
public void SyncSend(UDPPacketBuffer buf) public void SyncSend(UDPPacketBuffer buf)
{ {
if(buf.RemoteEndPoint == null)
return; // was already expired
try try
{ {
m_udpSocket.SendTo( m_udpSocket.SendTo(
@ -515,7 +503,7 @@ namespace OpenMetaverse
} }
catch (SocketException e) catch (SocketException e)
{ {
m_log.Warn("[UDPBASE]: sync send SocketException {0} " + e.Message); m_log.WarnFormat("[UDPBASE]: sync send SocketException {0} {1}", buf.RemoteEndPoint, e.Message);
} }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
} }

View File

@ -189,8 +189,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Process all the pending adds // Process all the pending adds
OutgoingPacket pendingAdd; OutgoingPacket pendingAdd;
while (m_pendingAdds.TryDequeue(out pendingAdd)) while (m_pendingAdds.TryDequeue(out pendingAdd))
{
if (pendingAdd != null) if (pendingAdd != null)
m_packets[pendingAdd.SequenceNumber] = pendingAdd; m_packets[pendingAdd.SequenceNumber] = pendingAdd;
}
// Process all the pending removes, including updating statistics and round-trip times // Process all the pending removes, including updating statistics and round-trip times
PendingAck pendingAcknowledgement; PendingAck pendingAcknowledgement;
@ -203,15 +205,17 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (ackedPacket != null) if (ackedPacket != null)
{ {
m_packets.Remove(pendingAcknowledgement.SequenceNumber); m_packets.Remove(pendingAcknowledgement.SequenceNumber);
// Update stats
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
ackedPacket.Client.FreeUDPBuffer(ackedPacket.Buffer); ackedPacket.Client.FreeUDPBuffer(ackedPacket.Buffer);
ackedPacket.Buffer = null;
// As with other network applications, assume that an acknowledged packet is an // As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission // indication that the network can handle a little more load, speed up the transmission
ackedPacket.Client.FlowThrottle.AcknowledgePackets(1); ackedPacket.Client.FlowThrottle.AcknowledgePackets(1);
// Update stats
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
if (!pendingAcknowledgement.FromResend) if (!pendingAcknowledgement.FromResend)
{ {
// Calculate the round-trip time for this packet and its ACK // Calculate the round-trip time for this packet and its ACK
@ -242,10 +246,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (removedPacket != null) if (removedPacket != null)
{ {
m_packets.Remove(pendingRemove); m_packets.Remove(pendingRemove);
removedPacket.Client.FreeUDPBuffer(removedPacket.Buffer);
// Update stats // Update stats
Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength); Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength);
removedPacket.Client.FreeUDPBuffer(removedPacket.Buffer);
removedPacket.Buffer = null;
} }
} }
} }