diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs index 38bbce04b0..997f38cea6 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLClientView.cs @@ -1228,10 +1228,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck); pc.Header.Reliable = false; - OutgoingPacket oldestPacket = m_udpClient.NeedAcks.GetOldest(); - pc.PingID.PingID = seq; - pc.PingID.OldestUnacked = (oldestPacket != null) ? oldestPacket.SequenceNumber : 0; + // We *could* get OldestUnacked, but it would hurt performance and not provide any benefit + pc.PingID.OldestUnacked = 0; OutPacket(pc, ThrottleOutPacketType.Unknown); } @@ -3320,8 +3319,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP // If we received an update about our own avatar, process the avatar update priority queue immediately if (data.AgentID == m_agentId) ProcessAvatarTerseUpdates(); - else - m_udpServer.SignalOutgoingPacketHandler(); } private void ProcessAvatarTerseUpdates() @@ -3409,8 +3406,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP lock (m_primFullUpdates.SyncRoot) m_primFullUpdates.Enqueue(data.priority, objectData, data.localID); - - m_udpServer.SignalOutgoingPacketHandler(); } void ProcessPrimFullUpdates() @@ -3454,8 +3449,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP lock (m_primTerseUpdates.SyncRoot) m_primTerseUpdates.Enqueue(data.Priority, objectData, data.LocalID); - - m_udpServer.SignalOutgoingPacketHandler(); } void ProcessPrimTerseUpdates() diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs index ca9925c4ae..458e78daa0 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPClient.cs @@ -202,7 +202,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void Shutdown() { IsConnected = false; - NeedAcks.Clear(); for (int i = 0; i < THROTTLE_CATEGORY_COUNT; i++) { m_packetOutboxes[i].Clear(); @@ -394,7 +393,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP { // Not enough tokens in the bucket, queue this packet queue.Enqueue(packet); - m_udpServer.SignalOutgoingPacketHandler(); return true; } } @@ -411,15 +409,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// /// This function is only called from a synchronous loop in the /// UDPServer so we don't need to bother making this thread safe - /// The minimum amount of time before the next packet - /// can be sent to this client - public int DequeueOutgoing() + /// True if any packets were sent, otherwise false + public bool DequeueOutgoing() { OutgoingPacket packet; OpenSim.Framework.LocklessQueue queue; TokenBucket bucket; - int dataLength; - int minTimeout = Int32.MaxValue; + bool packetSent = false; //string queueDebugOutput = String.Empty; // Serious debug business @@ -434,18 +430,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP // leaving a dequeued packet still waiting to be sent out. Try to // send it again OutgoingPacket nextPacket = m_nextPackets[i]; - dataLength = nextPacket.Buffer.DataLength; - if (bucket.RemoveTokens(dataLength)) + if (bucket.RemoveTokens(nextPacket.Buffer.DataLength)) { // Send the packet m_udpServer.SendPacketFinal(nextPacket); m_nextPackets[i] = null; - minTimeout = 0; - } - else if (minTimeout != 0) - { - // Check the minimum amount of time we would have to wait before this packet can be sent out - minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1); + packetSent = true; } } else @@ -457,23 +447,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP { // A packet was pulled off the queue. See if we have // enough tokens in the bucket to send it out - dataLength = packet.Buffer.DataLength; - if (bucket.RemoveTokens(dataLength)) + if (bucket.RemoveTokens(packet.Buffer.DataLength)) { // Send the packet m_udpServer.SendPacketFinal(packet); - minTimeout = 0; + packetSent = true; } else { // Save the dequeued packet for the next iteration m_nextPackets[i] = packet; - - if (minTimeout != 0) - { - // Check the minimum amount of time we would have to wait before this packet can be sent out - minTimeout = Math.Min(minTimeout, ((dataLength - bucket.Content) / bucket.DripPerMS) + 1); - } } // If the queue is empty after this dequeue, fire the queue @@ -492,7 +475,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP } //m_log.Info("[LLUDPCLIENT]: Queues: " + queueDebugOutput); // Serious debug business - return minTimeout; + return packetSent; } /// diff --git a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs index 7d5c11e421..a8ce102305 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/LLUDPServer.cs @@ -121,12 +121,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP private int m_recvBufferSize; /// Flag to process packets asynchronously or synchronously private bool m_asyncPacketHandling; - /// Track the minimum amount of time needed to send the next packet in the - /// OutgoingPacketHandler loop so we know when to sleep - private int m_minTimeout = Int32.MaxValue; - /// EventWaitHandle to signify the outgoing packet handler thread that - /// there is more work to do - private EventWaitHandle m_outgoingWaitHandle; + /// Tracks whether or not a packet was sent each round so we know + /// whether or not to sleep + private bool m_packetSent; public Socket Server { get { return null; } } @@ -174,8 +171,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP base.Start(m_recvBufferSize, m_asyncPacketHandling); - m_outgoingWaitHandle = new EventWaitHandle(false, EventResetMode.AutoReset); - // Start the incoming packet processing thread Thread incomingThread = new Thread(IncomingPacketHandler); incomingThread.Name = "Incoming Packets (" + m_scene.RegionInfo.RegionName + ")"; @@ -190,8 +185,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP { m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + m_scene.RegionInfo.RegionName); base.Stop(); - - m_outgoingWaitHandle.Close(); } public void AddScene(IScene scene) @@ -374,10 +367,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP StartPingCheckPacket pc = (StartPingCheckPacket)PacketPool.Instance.GetPacket(PacketType.StartPingCheck); pc.Header.Reliable = false; - OutgoingPacket oldestPacket = udpClient.NeedAcks.GetOldest(); - pc.PingID.PingID = (byte)udpClient.CurrentPingSequence++; - pc.PingID.OldestUnacked = (oldestPacket != null) ? oldestPacket.SequenceNumber : 0; + // We *could* get OldestUnacked, but it would hurt performance and not provide any benefit + pc.PingID.OldestUnacked = 0; SendPacket(udpClient, pc, ThrottleOutPacketType.Unknown, false); } @@ -397,39 +389,36 @@ namespace OpenSim.Region.ClientStack.LindenUDP return; } - if (udpClient.NeedAcks.Count > 0) + // Get a list of all of the packets that have been sitting unacked longer than udpClient.RTO + List expiredPackets = udpClient.NeedAcks.GetExpiredPackets(udpClient.RTO); + + if (expiredPackets != null) { - // Get a list of all of the packets that have been sitting unacked longer than udpClient.RTO - List expiredPackets = udpClient.NeedAcks.GetExpiredPackets(udpClient.RTO); + m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID + ", RTO=" + udpClient.RTO); - if (expiredPackets != null) + // Resend packets + for (int i = 0; i < expiredPackets.Count; i++) { - m_log.Debug("[LLUDPSERVER]: Resending " + expiredPackets.Count + " packets to " + udpClient.AgentID); + OutgoingPacket outgoingPacket = expiredPackets[i]; - // Resend packets - for (int i = 0; i < expiredPackets.Count; i++) - { - OutgoingPacket outgoingPacket = expiredPackets[i]; + //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed", + // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount); - //m_log.DebugFormat("[LLUDPSERVER]: Resending packet #{0} (attempt {1}), {2}ms have passed", - // outgoingPacket.SequenceNumber, outgoingPacket.ResendCount, Environment.TickCount - outgoingPacket.TickCount); + // Set the resent flag + outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); + outgoingPacket.Category = ThrottleOutPacketType.Resend; - // Set the resent flag - outgoingPacket.Buffer.Data[0] = (byte)(outgoingPacket.Buffer.Data[0] | Helpers.MSG_RESENT); - outgoingPacket.Category = ThrottleOutPacketType.Resend; + // The TickCount will be set to the current time when the packet + // is actually sent out again + outgoingPacket.TickCount = 0; - // The TickCount will be set to the current time when the packet - // is actually sent out again - outgoingPacket.TickCount = 0; + // Bump up the resend count on this packet + Interlocked.Increment(ref outgoingPacket.ResendCount); + //Interlocked.Increment(ref Stats.ResentPackets); - // Bump up the resend count on this packet - Interlocked.Increment(ref outgoingPacket.ResendCount); - //Interlocked.Increment(ref Stats.ResentPackets); - - // Requeue or resend the packet - if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket)) - SendPacketFinal(outgoingPacket); - } + // Requeue or resend the packet + if (!outgoingPacket.Client.EnqueueOutgoing(outgoingPacket)) + SendPacketFinal(outgoingPacket); } } } @@ -577,11 +566,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP // Handle appended ACKs if (packet.Header.AppendedAcks && packet.Header.AckList != null) { - lock (udpClient.NeedAcks.SyncRoot) - { - for (int i = 0; i < packet.Header.AckList.Length; i++) - AcknowledgePacket(udpClient, packet.Header.AckList[i], now, packet.Header.Resent); - } + for (int i = 0; i < packet.Header.AckList.Length; i++) + udpClient.NeedAcks.Remove(packet.Header.AckList[i], now, packet.Header.Resent); } // Handle PacketAck packets @@ -589,11 +575,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP { PacketAckPacket ackPacket = (PacketAckPacket)packet; - lock (udpClient.NeedAcks.SyncRoot) - { - for (int i = 0; i < ackPacket.Packets.Length; i++) - AcknowledgePacket(udpClient, ackPacket.Packets[i].ID, now, packet.Header.Resent); - } + for (int i = 0; i < ackPacket.Packets.Length; i++) + udpClient.NeedAcks.Remove(ackPacket.Packets[i].ID, now, packet.Header.Resent); // We don't need to do anything else with PacketAck packets return; @@ -734,21 +717,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP client.Close(); } - private void AcknowledgePacket(LLUDPClient client, uint ack, int currentTime, bool fromResend) - { - OutgoingPacket ackedPacket; - if (client.NeedAcks.RemoveUnsafe(ack, out ackedPacket) && !fromResend) - { - // Update stats - Interlocked.Add(ref client.UnackedBytes, -ackedPacket.Buffer.DataLength); - - // Calculate the round-trip time for this packet and its ACK - int rtt = currentTime - ackedPacket.TickCount; - if (rtt > 0) - client.UpdateRoundTrip(rtt); - } - } - private void IncomingPacketHandler() { // Set this culture for the thread that incoming packets are received @@ -775,11 +743,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP packetInbox.Clear(); } - public bool SignalOutgoingPacketHandler() - { - return m_outgoingWaitHandle.Set(); - } - private void OutgoingPacketHandler() { // Set this culture for the thread that outgoing packets are sent @@ -790,28 +753,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP { try { - m_minTimeout = Int32.MaxValue; + m_packetSent = false; // Handle outgoing packets, resends, acknowledgements, and pings for each - // client. m_minTimeout will be set to 0 if more packets are waiting in the - // queues with bandwidth to spare, or the number of milliseconds we need to - // wait before at least one packet can be sent to a client + // client. m_packetSent will be set to true if a packet is sent m_scene.ClientManager.ForEachSync(ClientOutgoingPacketHandler); - // Can't wait for a negative amount of time, and put a 100ms ceiling on our - // maximum wait time - m_minTimeout = Utils.Clamp(m_minTimeout, 0, 100); - - if (m_minTimeout > 0) - { - // Don't bother waiting for a shorter interval than our TickCountResolution - // since the token buckets wouldn't update anyways - m_minTimeout = Math.Max(m_minTimeout, (int)TickCountResolution); - - // Wait for someone to signal that packets are ready to be sent, or for our - // sleep interval to expire - m_outgoingWaitHandle.WaitOne(m_minTimeout); - } + // If a packet was sent, only do a minimum length context switch to allow + // other parts of the code to do work. If nothing was sent, sleep for the + // minimum amount of time before a token bucket could get more tokens + if (m_packetSent) + Thread.Sleep(0); + else + Thread.Sleep((int)TickCountResolution); } catch (Exception ex) { @@ -841,7 +795,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (udpClient.ElapsedMSOutgoingPacketHandler >= 100) { ResendUnacked(udpClient); - udpClient.ElapsedMSOutgoingPacketHandler -= 100; + udpClient.ElapsedMSOutgoingPacketHandler = 0; udpClient.Elapsed100MSOutgoingPacketHandler += 1; } @@ -849,7 +803,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (udpClient.Elapsed100MSOutgoingPacketHandler >= 5) { SendAcks(udpClient); - udpClient.Elapsed100MSOutgoingPacketHandler -= 5; + udpClient.Elapsed100MSOutgoingPacketHandler = 0; udpClient.Elapsed500MSOutgoingPacketHandler += 1; } @@ -857,19 +811,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (udpClient.Elapsed500MSOutgoingPacketHandler >= 10) { SendPing(udpClient); - udpClient.Elapsed500MSOutgoingPacketHandler -= 10; + udpClient.Elapsed500MSOutgoingPacketHandler = 0; } // Dequeue any outgoing packets that are within the throttle limits - // and get the minimum time we would have to sleep before this client - // could send a packet out - int minTimeoutThisLoop = udpClient.DequeueOutgoing(); - - // Although this is not thread safe, it is cheaper than locking and the - // worst that will happen is we sleep for slightly longer than the - // minimum necessary interval - if (minTimeoutThisLoop < m_minTimeout) - m_minTimeout = minTimeoutThisLoop; + if (udpClient.DequeueOutgoing()) + m_packetSent = true; } udpClient.TickLastOutgoingPacketHandler = thisTick; diff --git a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs index 87c7df46ab..12f0c0a8b5 100644 --- a/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs +++ b/OpenSim/Region/ClientStack/LindenUDP/UnackedPacketCollection.cs @@ -37,22 +37,34 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// public sealed class UnackedPacketCollection { - /// Synchronization primitive. A lock must be acquired on this - /// object before calling any of the unsafe methods - public object SyncRoot = new object(); + /// + /// Holds information about a pending acknowledgement + /// + private struct PendingAck + { + /// Sequence number of the packet to remove + public uint SequenceNumber; + /// Environment.TickCount value when the remove was queued. + /// This is used to update round-trip times for packets + public int RemoveTime; + /// Whether or not this acknowledgement was attached to a + /// resent packet. If so, round-trip time will not be calculated + public bool FromResend; + + public PendingAck(uint sequenceNumber, int currentTime, bool fromResend) + { + SequenceNumber = sequenceNumber; + RemoveTime = currentTime; + FromResend = fromResend; + } + } /// Holds the actual unacked packet data, sorted by sequence number - private SortedDictionary packets = new SortedDictionary(); - - /// Gets the total number of unacked packets - public int Count { get { return packets.Count; } } - - /// - /// Default constructor - /// - public UnackedPacketCollection() - { - } + private SortedDictionary m_packets = new SortedDictionary(); + /// Holds packets that need to be added to the unacknowledged list + private LocklessQueue m_pendingAdds = new LocklessQueue(); + /// Holds information about pending acknowledgements + private LocklessQueue m_pendingRemoves = new LocklessQueue(); /// /// Add an unacked packet to the collection @@ -60,74 +72,22 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Packet that is awaiting acknowledgement /// True if the packet was successfully added, false if the /// packet already existed in the collection - public bool Add(OutgoingPacket packet) + /// This does not immediately add the ACK to the collection, + /// it only queues it so it can be added in a thread-safe way later + public void Add(OutgoingPacket packet) { - lock (SyncRoot) - { - if (!packets.ContainsKey(packet.SequenceNumber)) - { - packets.Add(packet.SequenceNumber, packet); - return true; - } - return false; - } + m_pendingAdds.Enqueue(packet); } /// - /// Removes a packet from the collection without attempting to obtain a - /// lock first + /// Marks a packet as acknowledged /// - /// Sequence number of the packet to remove - /// True if the packet was found and removed, otherwise false - public bool RemoveUnsafe(uint sequenceNumber) + /// Sequence number of the packet to + /// acknowledge + /// Current value of Environment.TickCount + public void Remove(uint sequenceNumber, int currentTime, bool fromResend) { - return packets.Remove(sequenceNumber); - } - - /// - /// Removes a packet from the collection without attempting to obtain a - /// lock first - /// - /// Sequence number of the packet to remove - /// Returns the removed packet - /// True if the packet was found and removed, otherwise false - public bool RemoveUnsafe(uint sequenceNumber, out OutgoingPacket packet) - { - if (packets.TryGetValue(sequenceNumber, out packet)) - { - packets.Remove(sequenceNumber); - return true; - } - - return false; - } - - /// - /// Removes all elements from the collection - /// - public void Clear() - { - lock (SyncRoot) - packets.Clear(); - } - - /// - /// Gets the packet with the lowest sequence number - /// - /// The packet with the lowest sequence number, or null if the - /// collection is empty - public OutgoingPacket GetOldest() - { - lock (SyncRoot) - { - using (SortedDictionary.ValueCollection.Enumerator e = packets.Values.GetEnumerator()) - { - if (e.MoveNext()) - return e.Current; - else - return null; - } - } + m_pendingRemoves.Enqueue(new PendingAck(sequenceNumber, currentTime, fromResend)); } /// @@ -138,14 +98,19 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// packet is considered expired /// A list of all expired packets according to the given /// expiration timeout + /// This function is not thread safe, and cannot be called + /// multiple times concurrently public List GetExpiredPackets(int timeoutMS) { + ProcessQueues(); + List expiredPackets = null; - lock (SyncRoot) + if (m_packets.Count > 0) { int now = Environment.TickCount; - foreach (OutgoingPacket packet in packets.Values) + + foreach (OutgoingPacket packet in m_packets.Values) { // TickCount of zero means a packet is in the resend queue // but hasn't actually been sent over the wire yet @@ -167,5 +132,35 @@ namespace OpenSim.Region.ClientStack.LindenUDP return expiredPackets; } + + private void ProcessQueues() + { + // Process all the pending adds + OutgoingPacket pendingAdd; + while (m_pendingAdds.Dequeue(out pendingAdd)) + m_packets[pendingAdd.SequenceNumber] = pendingAdd; + + // Process all the pending removes, including updating statistics and round-trip times + PendingAck pendingRemove; + OutgoingPacket ackedPacket; + while (m_pendingRemoves.Dequeue(out pendingRemove)) + { + if (m_packets.TryGetValue(pendingRemove.SequenceNumber, out ackedPacket)) + { + m_packets.Remove(pendingRemove.SequenceNumber); + + // Update stats + System.Threading.Interlocked.Add(ref ackedPacket.Client.UnackedBytes, -ackedPacket.Buffer.DataLength); + + if (!pendingRemove.FromResend) + { + // Calculate the round-trip time for this packet and its ACK + int rtt = pendingRemove.RemoveTime - ackedPacket.TickCount; + if (rtt > 0) + ackedPacket.Client.UpdateRoundTrip(rtt); + } + } + } + } } }