changes on lludp acks and resends

0.9.1.1
UbitUmarov 2019-12-18 23:26:13 +00:00
parent 7516288634
commit 066a6fbaa1
5 changed files with 196 additions and 181 deletions

View File

@ -411,7 +411,7 @@ namespace OpenSim.Region.ClientStack.Linden
{ {
try try
{ {
// m_log.Debug("[CAPS]: ScriptTaskInventory Request in region: " + m_regionName); //m_log.Debug("[CAPS]: ScriptTaskInventory Request in region: " + m_regionName);
//m_log.DebugFormat("[CAPS]: request: {0}, path: {1}, param: {2}", request, path, param); //m_log.DebugFormat("[CAPS]: request: {0}, path: {1}, param: {2}", request, path, param);
Hashtable hash = (Hashtable)LLSD.LLSDDeserialize(Utils.StringToBytes(request)); Hashtable hash = (Hashtable)LLSD.LLSDDeserialize(Utils.StringToBytes(request));

View File

@ -4482,7 +4482,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Utils.FloatToBytesSafepos(hover, data, pos); pos += 4; Utils.FloatToBytesSafepos(hover, data, pos); pos += 4;
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority, null, false, true); m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task | ThrottleOutPacketType.HighPriority, null, true);
} }
static private readonly byte[] AvatarAnimationHeader = new byte[] { static private readonly byte[] AvatarAnimationHeader = new byte[] {
@ -4638,7 +4638,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
CreateImprovedTerseBlock(ent, buf.Data, ref pos, false); CreateImprovedTerseBlock(ent, buf.Data, ref pos, false);
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false, true); m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, true);
} }
//UUID m_courseLocationPrey = UUID.Zero; //UUID m_courseLocationPrey = UUID.Zero;
@ -5197,7 +5197,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false); //delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false);
null, false, false); null, false);
buf = newbuf; buf = newbuf;
zc.Data = buf.Data; zc.Data = buf.Data;
@ -5221,7 +5221,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
buf.DataLength = zc.Finish(); buf.DataLength = zc.Finish();
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false); //delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false);
null, false, false); null, false);
} }
} }
@ -5359,7 +5359,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false); //delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false);
null, false, false); null, false);
buf = newbuf; buf = newbuf;
zc.Data = buf.Data; zc.Data = buf.Data;
@ -5383,7 +5383,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
buf.DataLength = zc.Finish(); buf.DataLength = zc.Finish();
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false); //delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, false);
null, false, false); null, false);
} }
} }
@ -5432,7 +5432,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
buf.Data[countposition] = (byte)count; buf.Data[countposition] = (byte)count;
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false, false); m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false);
buf = newbuf; buf = newbuf;
data = buf.Data; data = buf.Data;
@ -5446,7 +5446,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
buf.Data[countposition] = (byte)count; buf.Data[countposition] = (byte)count;
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false, false); m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false);
} }
} }
@ -5493,8 +5493,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
buf.DataLength = lastpos; buf.DataLength = lastpos;
// zero encode is not as spec // zero encode is not as spec
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, true); delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, true);
null, false, true); //null, false, true);
tau = new List<EntityUpdate>(30); tau = new List<EntityUpdate>(30);
tau.Add(eu); tau.Add(eu);
@ -5509,8 +5509,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
buf.Data[17] = (byte)count; buf.Data[17] = (byte)count;
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task,
//delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, false, true); //delegate (OutgoingPacket oPacket) { ResendPrimUpdates(tau, oPacket); }, true);
null, false, true); null, true);
} }
} }
@ -14541,7 +14541,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
LLUDPServer.LogPacketHeader(false, m_circuitCode, 0, packet.Type, (ushort)packet.Length); LLUDPServer.LogPacketHeader(false, m_circuitCode, 0, packet.Type, (ushort)packet.Length);
#endregion BinaryStats #endregion BinaryStats
OutPacket(packet, throttlePacketType, true); OutPacket(packet, throttlePacketType, true, null);
} }
/// <summary> /// <summary>
@ -15174,7 +15174,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
int pos = 18; int pos = 18;
CreateImprovedTerseBlock(p, buf.Data, ref pos, false); CreateImprovedTerseBlock(p, buf.Data, ref pos, false);
buf.DataLength = pos; buf.DataLength = pos;
m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, false, true); m_udpServer.SendUDPPacket(m_udpClient, buf, ThrottleOutPacketType.Task, null, true);
} }
} }

View File

@ -115,7 +115,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Circuit code that this client is connected on</summary> /// <summary>Circuit code that this client is connected on</summary>
public readonly uint CircuitCode; public readonly uint CircuitCode;
/// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary> /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary>
public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(256); public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(1024);
/// <summary>Packets we have sent that need to be ACKed by the client</summary> /// <summary>Packets we have sent that need to be ACKed by the client</summary>
public UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); public UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
@ -123,6 +123,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>ACKs that are queued up, waiting to be sent to the client</summary> /// <summary>ACKs that are queued up, waiting to be sent to the client</summary>
public DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>(); public DoubleLocklessQueue<uint> PendingAcks = new DoubleLocklessQueue<uint>();
public int AckStalls;
/// <summary>Current packet sequence number</summary> /// <summary>Current packet sequence number</summary>
public int CurrentSequence; public int CurrentSequence;
/// <summary>Current ping sequence number</summary> /// <summary>Current ping sequence number</summary>
@ -185,7 +187,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private byte[] m_packedThrottles; private byte[] m_packedThrottles;
private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC private int m_defaultRTO = 1000; // 1sec is the recommendation in the RFC
private int m_maxRTO = 10000; private int m_maxRTO = 3000;
private int m_minRTO = 250;
public bool m_deliverPackets = true; public bool m_deliverPackets = true;
private float m_burstTime; private float m_burstTime;
@ -538,46 +541,20 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// true if the packet has been queued, /// true if the packet has been queued,
/// false if the packet has not been queued and should be sent immediately. /// false if the packet has not been queued and should be sent immediately.
/// </returns> /// </returns>
public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue) public bool EnqueueOutgoing(OutgoingPacket packet)
{ {
return EnqueueOutgoing(packet, forceQueue, false); return EnqueueOutgoing(packet, false);
} }
public bool EnqueueOutgoing(OutgoingPacket packet, bool forceQueue, bool highPriority) public bool EnqueueOutgoing(OutgoingPacket packet, bool highPriority)
{ {
int category = (int)packet.Category; int category = (int)packet.Category;
if (category >= 0 && category < m_packetOutboxes.Length) if (category >= 0 && category < m_packetOutboxes.Length)
{ {
DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category]; DoubleLocklessQueue<OutgoingPacket> queue = m_packetOutboxes[category];
queue.Enqueue(packet, highPriority);
if (forceQueue || m_deliverPackets == false) return true;
{
queue.Enqueue(packet, highPriority);
return true;
}
// need to enqueue if queue is not empty
if (queue.Count > 0 || m_nextPackets[category] != null)
{
queue.Enqueue(packet, highPriority);
return true;
}
// check bandwidth
TokenBucket bucket = m_throttleCategories[category];
if (bucket.CheckTokens(packet.Buffer.DataLength))
{
// enough tokens so it can be sent imediatly by caller
bucket.RemoveTokens(packet.Buffer.DataLength);
return false;
}
else
{
// Force queue specified or not enough tokens in the bucket, queue this packet
queue.Enqueue(packet, highPriority);
return true;
}
} }
else else
{ {
@ -608,33 +585,84 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OutgoingPacket packet = null; OutgoingPacket packet = null;
DoubleLocklessQueue<OutgoingPacket> queue; DoubleLocklessQueue<OutgoingPacket> queue;
TokenBucket bucket;
bool packetSent = false; bool packetSent = false;
ThrottleOutPacketTypeFlags emptyCategories = 0; ThrottleOutPacketTypeFlags emptyCategories = 0;
//string queueDebugOutput = String.Empty; // Serious debug business //string queueDebugOutput = String.Empty; // Serious debug business
// do resends
for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) packet = m_nextPackets[0];
if (packet != null)
{
if (packet.Buffer != null)
{
if (m_throttleCategories[0].RemoveTokens(packet.Buffer.DataLength))
{
// Send the packet
m_udpServer.SendPacketFinal(packet);
packetSent = true;
m_nextPackets[0] = null;
}
}
else
m_nextPackets[0] = null;
}
else
{
queue = m_packetOutboxes[0];
if (queue != null)
{
if(queue.Dequeue(out packet))
{
// A packet was pulled off the queue. See if we have
// enough tokens in the bucket to send it out
if (packet.Buffer != null)
{
if (m_throttleCategories[0].RemoveTokens(packet.Buffer.DataLength))
{
// Send the packet
m_udpServer.SendPacketFinal(packet);
packetSent = true;
}
else
{
// Save the dequeued packet for the next iteration
m_nextPackets[0] = packet;
}
}
}
}
else
{
m_packetOutboxes[0] = new DoubleLocklessQueue<OutgoingPacket>();
}
}
if(NeedAcks.Count() > 50)
{
Interlocked.Increment(ref AckStalls);
return true;
}
for (int i = 1; i < THROTTLE_CATEGORY_COUNT; i++)
{ {
bucket = m_throttleCategories[i];
//queueDebugOutput += m_packetOutboxes[i].Count + " "; // Serious debug business //queueDebugOutput += m_packetOutboxes[i].Count + " "; // Serious debug business
if (m_nextPackets[i] != null) packet = m_nextPackets[i];
if (packet != null)
{ {
// This bucket was empty the last time we tried to send a packet, if(packet.Buffer == null)
// leaving a dequeued packet still waiting to be sent out. Try to
// send it again
OutgoingPacket nextPacket = m_nextPackets[i];
if(nextPacket.Buffer == null)
{ {
if (m_packetOutboxes[i].Count < 5) if (m_packetOutboxes[i].Count < 5)
emptyCategories |= CategoryToFlag(i); emptyCategories |= CategoryToFlag(i);
m_nextPackets[i] = null;
continue; continue;
} }
if (bucket.RemoveTokens(nextPacket.Buffer.DataLength))
if (m_throttleCategories[i].RemoveTokens(packet.Buffer.DataLength))
{ {
// Send the packet // Send the packet
m_udpServer.SendPacketFinal(nextPacket); m_udpServer.SendPacketFinal(packet);
m_nextPackets[i] = null; m_nextPackets[i] = null;
packetSent = true; packetSent = true;
@ -647,55 +675,34 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// No dequeued packet waiting to be sent, try to pull one off // No dequeued packet waiting to be sent, try to pull one off
// this queue // this queue
queue = m_packetOutboxes[i]; queue = m_packetOutboxes[i];
if (queue != null) if(queue.Dequeue(out packet))
{ {
bool success = false; if (packet.Buffer == null)
try
{ {
success = queue.Dequeue(out packet); // packet canceled elsewhere (by a ack for example)
if (queue.Count < 5)
emptyCategories |= CategoryToFlag(i);
continue;
} }
catch
{
m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>();
}
if (success)
{
// A packet was pulled off the queue. See if we have
// enough tokens in the bucket to send it out
if(packet.Buffer == null)
{
// packet canceled elsewhere (by a ack for example)
if (queue.Count < 5)
emptyCategories |= CategoryToFlag(i);
}
else
{
if (bucket.RemoveTokens(packet.Buffer.DataLength))
{
// Send the packet
m_udpServer.SendPacketFinal(packet);
packetSent = true;
if (queue.Count < 5) if (m_throttleCategories[i].RemoveTokens(packet.Buffer.DataLength))
emptyCategories |= CategoryToFlag(i); {
} // Send the packet
else m_udpServer.SendPacketFinal(packet);
{ packetSent = true;
// Save the dequeued packet for the next iteration if (queue.Count < 5)
m_nextPackets[i] = packet; emptyCategories |= CategoryToFlag(i);
}
}
} }
else else
{ {
// No packets in this queue. Fire the queue empty callback // Save the dequeued packet for the next iteration
// if it has not been called recently m_nextPackets[i] = packet;
emptyCategories |= CategoryToFlag(i);
} }
} }
else else
{ {
m_packetOutboxes[i] = new DoubleLocklessQueue<OutgoingPacket>(); // No packets in this queue. Fire the queue empty callback
// if it has not been called recently
emptyCategories |= CategoryToFlag(i); emptyCategories |= CategoryToFlag(i);
} }
} }
@ -718,8 +725,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
p *= 5; p *= 5;
if( p> m_maxRTO) if( p> m_maxRTO)
p = m_maxRTO; p = m_maxRTO;
else if(p < m_defaultRTO) else if(p < m_minRTO)
p = m_defaultRTO; p = m_minRTO;
m_RTO = p; m_RTO = p;
} }

View File

@ -880,8 +880,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// If a Linden Lab 1.23.5 client receives an update packet after a kill packet for an object, it will // If a Linden Lab 1.23.5 client receives an update packet after a kill packet for an object, it will
// continue to display the deleted object until relog. Therefore, we need to always queue a kill object // continue to display the deleted object until relog. Therefore, we need to always queue a kill object
// packet so that it isn't sent before a queued update packet. // packet so that it isn't sent before a queued update packet.
bool requestQueue = type == PacketType.KillObject; if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, highPriority))
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, requestQueue, highPriority))
{ {
SendPacketFinal(outgoingPacket); SendPacketFinal(outgoingPacket);
return true; return true;
@ -952,7 +951,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
public void SendUDPPacket( public void SendUDPPacket(
LLUDPClient udpClient, UDPPacketBuffer buffer, ThrottleOutPacketType category, UnackedPacketMethod method, bool forcequeue, bool zerocode) LLUDPClient udpClient, UDPPacketBuffer buffer, ThrottleOutPacketType category, UnackedPacketMethod method, bool zerocode)
{ {
bool highPriority = false; bool highPriority = false;
@ -971,7 +970,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if ((outgoingPacket.Buffer.Data[0] & Helpers.MSG_RELIABLE) != 0) if ((outgoingPacket.Buffer.Data[0] & Helpers.MSG_RELIABLE) != 0)
outgoingPacket.UnackedMethod = ((method == null) ? delegate (OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method); outgoingPacket.UnackedMethod = ((method == null) ? delegate (OutgoingPacket oPacket) { ResendUnacked(oPacket); } : method);
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, forcequeue, highPriority)) if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, highPriority))
SendPacketFinal(outgoingPacket); SendPacketFinal(outgoingPacket);
} }
@ -991,7 +990,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if ((outgoingPacket.Buffer.Data[0] & Helpers.MSG_RELIABLE) != 0) if ((outgoingPacket.Buffer.Data[0] & Helpers.MSG_RELIABLE) != 0)
outgoingPacket.UnackedMethod = delegate (OutgoingPacket oPacket) { ResendUnacked(oPacket); }; outgoingPacket.UnackedMethod = delegate (OutgoingPacket oPacket) { ResendUnacked(oPacket); };
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false, highPriority)) if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, highPriority))
SendPacketFinal(outgoingPacket); SendPacketFinal(outgoingPacket);
} }
@ -1056,11 +1055,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
data[7] = udpClient.CurrentPingSequence++; data[7] = udpClient.CurrentPingSequence++;
// older seq number of our un ack packets, so viewers could clean deduplication lists TODO // older seq number of our un ack packets, so viewers could clean deduplication lists TODO
//Utils.UIntToBytes(0, data, 8); Utils.UIntToBytes(udpClient.NeedAcks.Oldest(), data, 8);
data[8] = 0;
data[9] = 0;
data[10] = 0;
data[11] = 0;
buf.DataLength = 12; buf.DataLength = 12;
SendUDPPacket(udpClient, buf, ThrottleOutPacketType.Unknown); SendUDPPacket(udpClient, buf, ThrottleOutPacketType.Unknown);
@ -1140,7 +1135,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Bump up the resend count on this packet // Bump up the resend count on this packet
Interlocked.Increment(ref outgoingPacket.ResendCount); Interlocked.Increment(ref outgoingPacket.ResendCount);
// Requeue or resend the packet // Requeue or resend the packet
if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false)) if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket, false))
SendPacketFinal(outgoingPacket); SendPacketFinal(outgoingPacket);
@ -1162,7 +1156,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
return; return;
LLUDPClient udpClient = outgoingPacket.Client; LLUDPClient udpClient = outgoingPacket.Client;
if (!udpClient.IsConnected) if (!udpClient.IsConnected)
{
FreeUDPBuffer(buffer);
return; return;
}
byte flags = buffer.Data[0]; byte flags = buffer.Data[0];
bool isResend = (flags & Helpers.MSG_RESENT) != 0; bool isResend = (flags & Helpers.MSG_RESENT) != 0;
@ -1172,7 +1169,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
int dataLength = buffer.DataLength; int dataLength = buffer.DataLength;
// only append acks on plain reliable messages // only append acks on plain reliable messages
if (flags == Helpers.MSG_RELIABLE) if (flags == Helpers.MSG_RELIABLE && outgoingPacket.UnackedMethod == null)
{ {
// Keep appending ACKs until there is no room left in the buffer or there are // Keep appending ACKs until there is no room left in the buffer or there are
// no more ACKs to append // no more ACKs to append
@ -1211,10 +1208,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Stats tracking // Stats tracking
Interlocked.Increment(ref udpClient.PacketsSent); Interlocked.Increment(ref udpClient.PacketsSent);
PacketsSentCount++; PacketsSentCount++;
SyncSend(buffer); SyncSend(buffer);
// Keep track of when this packet was sent out (right now) // Keep track of when this packet was sent out (right now)
outgoingPacket.TickCount = Environment.TickCount & Int32.MaxValue; int enow = Environment.TickCount & Int32.MaxValue;
Interlocked.Exchange(ref outgoingPacket.TickCount, enow);
if (outgoingPacket.UnackedMethod == null) if (outgoingPacket.UnackedMethod == null)
FreeUDPBuffer(buffer); FreeUDPBuffer(buffer);
@ -1935,7 +1934,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
m_resendUnacked = true; m_resendUnacked = true;
m_elapsedMSOutgoingPacketHandler = 0.0; m_elapsedMSOutgoingPacketHandler = 0.0;
m_elapsed100MSOutgoingPacketHandler += 1; ++m_elapsed100MSOutgoingPacketHandler;
} }
// Check for pending outgoing ACKs every 500ms // Check for pending outgoing ACKs every 500ms
@ -1943,7 +1942,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
m_sendAcks = true; m_sendAcks = true;
m_elapsed100MSOutgoingPacketHandler = 0; m_elapsed100MSOutgoingPacketHandler = 0;
m_elapsed500MSOutgoingPacketHandler += 1; ++m_elapsed500MSOutgoingPacketHandler;
} }
// Send pings to clients every 5000ms // Send pings to clients every 5000ms

View File

@ -66,21 +66,31 @@ namespace OpenSim.Region.ClientStack.LindenUDP
//private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); //private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>Holds the actual unacked packet data, sorted by sequence number</summary> /// <summary>Holds the actual unacked packet data, sorted by sequence number</summary>
private Dictionary<uint, OutgoingPacket> m_packets = new Dictionary<uint, OutgoingPacket>(); private SortedDictionary<uint, OutgoingPacket> m_packets = new SortedDictionary<uint, OutgoingPacket>();
/// <summary>Holds packets that need to be added to the unacknowledged list</summary> /// <summary>Holds packets that need to be added to the unacknowledged list</summary>
private LocklessQueue<OutgoingPacket> m_pendingAdds = new LocklessQueue<OutgoingPacket>(); private LocklessQueue<OutgoingPacket> m_pendingAdds = new LocklessQueue<OutgoingPacket>();
/// <summary>Holds information about pending acknowledgements</summary> /// <summary>Holds information about pending acknowledgements</summary>
private LocklessQueue<PendingAck> m_pendingAcknowledgements = new LocklessQueue<PendingAck>(); private LocklessQueue<PendingAck> m_pendingAcknowledgements = new LocklessQueue<PendingAck>();
/// <summary>Holds information about pending removals</summary> /// <summary>Holds information about pending removals</summary>
private LocklessQueue<uint> m_pendingRemoves = new LocklessQueue<uint>(); private LocklessQueue<uint> m_pendingRemoves = new LocklessQueue<uint>();
private uint m_older;
public void Clear() public void Clear()
{ {
m_packets.Clear(); m_packets.Clear();
m_pendingAdds = null; m_pendingAdds = null;
m_pendingAcknowledgements = null; m_pendingAcknowledgements = null;
m_pendingRemoves = null; m_pendingRemoves = null;
m_older = 0;
}
public int Count()
{
return m_packets.Count + m_pendingAdds.Count - m_pendingAcknowledgements.Count - m_pendingRemoves.Count;
}
public uint Oldest()
{
return m_older;
} }
/// <summary> /// <summary>
@ -148,33 +158,47 @@ namespace OpenSim.Region.ClientStack.LindenUDP
ProcessQueues(); ProcessQueues();
List<OutgoingPacket> expiredPackets = null; List<OutgoingPacket> expiredPackets = null;
bool doolder = true;
if (m_packets.Count > 0) if (m_packets.Count > 0)
{ {
int now = Environment.TickCount & Int32.MaxValue; int now = Environment.TickCount & Int32.MaxValue;
foreach (OutgoingPacket packet in m_packets.Values) foreach (OutgoingPacket packet in m_packets.Values)
{ {
if(packet.Buffer == null)
{
Remove(packet.SequenceNumber);
continue;
}
if(doolder)
{
m_older = packet.SequenceNumber;
doolder = false;
}
// TickCount of zero means a packet is in the resend queue // TickCount of zero means a packet is in the resend queue
// but hasn't actually been sent over the wire yet // but hasn't actually been sent over the wire yet
if (packet.TickCount == 0) int ptime = Interlocked.Exchange(ref packet.TickCount, 0);
if (ptime == 0)
continue; continue;
if (now - packet.TickCount >= timeoutMS) if(now - ptime < timeoutMS)
{ {
if (expiredPackets == null) int t = Interlocked.Exchange(ref packet.TickCount, ptime);
expiredPackets = new List<OutgoingPacket>(); if (t > ptime)
Interlocked.Exchange(ref packet.TickCount, t);
// The TickCount will be set to the current time when the packet continue;
// is actually sent out again
packet.TickCount = 0;
// As with other network applications, assume that an expired packet is
// an indication of some network problem, slow transmission
packet.Client.FlowThrottle.ExpirePackets(1);
expiredPackets.Add(packet);
} }
if (expiredPackets == null)
expiredPackets = new List<OutgoingPacket>();
// As with other network applications, assume that an expired packet is
// an indication of some network problem, slow transmission
packet.Client.FlowThrottle.ExpirePackets(1);
expiredPackets.Add(packet);
} }
} }
@ -186,59 +210,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP
private void ProcessQueues() private void ProcessQueues()
{ {
// Process all the pending adds while (m_pendingRemoves.TryDequeue(out uint pendingRemove))
OutgoingPacket pendingAdd;
while (m_pendingAdds.TryDequeue(out pendingAdd))
{ {
if (pendingAdd != null) if (m_packets.TryGetValue(pendingRemove, out OutgoingPacket removedPacket))
m_packets[pendingAdd.SequenceNumber] = pendingAdd;
}
// Process all the pending removes, including updating statistics and round-trip times
PendingAck pendingAcknowledgement;
while (m_pendingAcknowledgements.TryDequeue(out pendingAcknowledgement))
{
//m_log.DebugFormat("[UNACKED PACKET COLLECTION]: Processing ack {0}", pendingAcknowledgement.SequenceNumber);
OutgoingPacket ackedPacket;
if (m_packets.TryGetValue(pendingAcknowledgement.SequenceNumber, out ackedPacket))
{
if (ackedPacket != null)
{
m_packets.Remove(pendingAcknowledgement.SequenceNumber);
// Update stats
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
ackedPacket.Client.FreeUDPBuffer(ackedPacket.Buffer);
ackedPacket.Buffer = null;
// As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission
ackedPacket.Client.FlowThrottle.AcknowledgePackets(1);
}
else
{
// m_log.WarnFormat("[UNACKED PACKET COLLECTION]: found null packet for sequence number {0} to ack",
// pendingAcknowledgement.SequenceNumber);
}
}
else
{
// m_log.WarnFormat("[UNACKED PACKET COLLECTION]: Could not find packet with sequence number {0} to ack",
// pendingAcknowledgement.SequenceNumber);
}
}
uint pendingRemove;
while(m_pendingRemoves.TryDequeue(out pendingRemove))
{
OutgoingPacket removedPacket;
if (m_packets.TryGetValue(pendingRemove, out removedPacket))
{ {
m_packets.Remove(pendingRemove);
if (removedPacket != null) if (removedPacket != null)
{ {
m_packets.Remove(pendingRemove);
// Update stats // Update stats
Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength); Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength);
@ -247,6 +225,37 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
} }
// Process all the pending adds
while (m_pendingAdds.TryDequeue(out OutgoingPacket pendingAdd))
{
if (pendingAdd != null)
m_packets[pendingAdd.SequenceNumber] = pendingAdd;
}
// Process all the pending removes, including updating statistics and round-trip times
while (m_pendingAcknowledgements.TryDequeue(out PendingAck pendingAcknowledgement))
{
//m_log.DebugFormat("[UNACKED PACKET COLLECTION]: Processing ack {0}", pendingAcknowledgement.SequenceNumber);
if (m_packets.TryGetValue(pendingAcknowledgement.SequenceNumber, out OutgoingPacket ackedPacket))
{
m_packets.Remove(pendingAcknowledgement.SequenceNumber);
if (ackedPacket != null)
{
// Update stats
if(ackedPacket.Buffer != null)
{
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
ackedPacket.Client.FreeUDPBuffer(ackedPacket.Buffer);
ackedPacket.Buffer = null;
}
// As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission
ackedPacket.Client.FlowThrottle.AcknowledgePackets(1);
}
}
}
} }
} }
} }