Refactor UnackedPacketCollection so ProcessQueues will handle Adds, Acks, and Removes in that order.

bulletsim
Dan Lake 2011-04-21 15:40:32 -07:00
parent 371576d1dd
commit 7f28dd4b31
2 changed files with 34 additions and 26 deletions

View File

@ -672,7 +672,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (packet.Header.AppendedAcks && packet.Header.AckList != null) if (packet.Header.AppendedAcks && packet.Header.AckList != null)
{ {
for (int i = 0; i < packet.Header.AckList.Length; i++) for (int i = 0; i < packet.Header.AckList.Length; i++)
udpClient.NeedAcks.Remove(packet.Header.AckList[i], now, packet.Header.Resent); udpClient.NeedAcks.Acknowledge(packet.Header.AckList[i], now, packet.Header.Resent);
} }
// Handle PacketAck packets // Handle PacketAck packets
@ -681,7 +681,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
PacketAckPacket ackPacket = (PacketAckPacket)packet; PacketAckPacket ackPacket = (PacketAckPacket)packet;
for (int i = 0; i < ackPacket.Packets.Length; i++) for (int i = 0; i < ackPacket.Packets.Length; i++)
udpClient.NeedAcks.Remove(ackPacket.Packets[i].ID, now, packet.Header.Resent); udpClient.NeedAcks.Acknowledge(ackPacket.Packets[i].ID, now, packet.Header.Resent);
// We don't need to do anything else with PacketAck packets // We don't need to do anything else with PacketAck packets
return; return;

View File

@ -65,7 +65,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Holds packets that need to be added to the unacknowledged list</summary> /// <summary>Holds packets that need to be added to the unacknowledged list</summary>
private LocklessQueue<OutgoingPacket> m_pendingAdds = new LocklessQueue<OutgoingPacket>(); private LocklessQueue<OutgoingPacket> m_pendingAdds = new LocklessQueue<OutgoingPacket>();
/// <summary>Holds information about pending acknowledgements</summary> /// <summary>Holds information about pending acknowledgements</summary>
private LocklessQueue<PendingAck> m_pendingRemoves = new LocklessQueue<PendingAck>(); private LocklessQueue<PendingAck> m_pendingAcknowledgements = new LocklessQueue<PendingAck>();
/// <summary>Holds information about pending removals</summary>
private LocklessQueue<uint> m_pendingRemoves = new LocklessQueue<uint>();
/// <summary> /// <summary>
/// Add an unacked packet to the collection /// Add an unacked packet to the collection
@ -92,9 +94,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <param name="currentTime">Current value of Environment.TickCount</param> /// <param name="currentTime">Current value of Environment.TickCount</param>
/// <remarks>This does not immediately acknowledge the packet, it only /// <remarks>This does not immediately acknowledge the packet, it only
/// queues the ack so it can be handled in a thread-safe way later</remarks> /// queues the ack so it can be handled in a thread-safe way later</remarks>
public void Remove(uint sequenceNumber, int currentTime, bool fromResend) public void Acknowledge(uint sequenceNumber, int currentTime, bool fromResend)
{ {
m_pendingRemoves.Enqueue(new PendingAck(sequenceNumber, currentTime, fromResend)); m_pendingAcknowledgements.Enqueue(new PendingAck(sequenceNumber, currentTime, fromResend));
} }
/// <summary> /// <summary>
@ -105,21 +107,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// </summary> /// </summary>
/// <param name="sequenceNumber">Sequence number of the packet to /// <param name="sequenceNumber">Sequence number of the packet to
/// acknowledge</param> /// acknowledge</param>
/// <remarks>The packet is removed from the collection immediately. /// <remarks>The does not immediately remove the packet, it only queues the removal
/// This function is not threadsafe. It must be called by the thread calling GetExpiredPackets.</remarks> /// so it can be handled in a thread safe way later</remarks>
public void Remove(uint sequenceNumber) public void Remove(uint sequenceNumber)
{ {
OutgoingPacket removedPacket; m_pendingRemoves.Enqueue(sequenceNumber);
if (m_packets.TryGetValue(sequenceNumber, out removedPacket))
{
if (removedPacket != null)
{
m_packets.Remove(sequenceNumber);
// Update stats
Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength);
}
}
} }
/// <summary> /// <summary>
@ -179,15 +171,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_packets[pendingAdd.SequenceNumber] = pendingAdd; m_packets[pendingAdd.SequenceNumber] = pendingAdd;
// Process all the pending removes, including updating statistics and round-trip times // Process all the pending removes, including updating statistics and round-trip times
PendingAck pendingRemove; PendingAck pendingAcknowledgement;
OutgoingPacket ackedPacket; while (m_pendingAcknowledgements.TryDequeue(out pendingAcknowledgement))
while (m_pendingRemoves.TryDequeue(out pendingRemove))
{ {
if (m_packets.TryGetValue(pendingRemove.SequenceNumber, out ackedPacket)) OutgoingPacket ackedPacket;
if (m_packets.TryGetValue(pendingAcknowledgement.SequenceNumber, out ackedPacket))
{ {
if (ackedPacket != null) if (ackedPacket != null)
{ {
m_packets.Remove(pendingRemove.SequenceNumber); m_packets.Remove(pendingAcknowledgement.SequenceNumber);
// As with other network applications, assume that an acknowledged packet is an // As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission // indication that the network can handle a little more load, speed up the transmission
@ -196,16 +188,32 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Update stats // Update stats
Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength); Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength);
if (!pendingRemove.FromResend) if (!pendingAcknowledgement.FromResend)
{ {
// Calculate the round-trip time for this packet and its ACK // Calculate the round-trip time for this packet and its ACK
int rtt = pendingRemove.RemoveTime - ackedPacket.TickCount; int rtt = pendingAcknowledgement.RemoveTime - ackedPacket.TickCount;
if (rtt > 0) if (rtt > 0)
ackedPacket.Client.UpdateRoundTrip(rtt); ackedPacket.Client.UpdateRoundTrip(rtt);
} }
} }
} }
} }
uint pendingRemove;
while(m_pendingRemoves.TryDequeue(out pendingRemove))
{
OutgoingPacket removedPacket;
if (m_packets.TryGetValue(pendingRemove, out removedPacket))
{
if (removedPacket != null)
{
m_packets.Remove(pendingRemove);
// Update stats
Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength);
}
}
}
} }
} }
} }