change UDPPacketBuffer pools (does waste a bit of memory)

0.9.1.0-post-fixes
UbitUmarov 2019-02-25 21:46:23 +00:00
parent 4dd89ce094
commit d01165818d
6 changed files with 157 additions and 324 deletions

View File

@ -12555,14 +12555,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// provide your own method.</param> /// provide your own method.</param>
protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting, UnackedPacketMethod method) protected void OutPacket(Packet packet, ThrottleOutPacketType throttlePacketType, bool doAutomaticSplitting, UnackedPacketMethod method)
{ {
/* this is causing packet loss for some reason
if(!m_udpClient.IsConnected)
{
PacketPool.Instance.ReturnPacket(packet);
return;
}
*/
if (m_outPacketsToDrop != null) if (m_outPacketsToDrop != null)
{ {
if (m_outPacketsToDrop.Contains(packet.Type.ToString())) if (m_outPacketsToDrop.Contains(packet.Type.ToString()))

View File

@ -120,7 +120,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Circuit code that this client is connected on</summary> /// <summary>Circuit code that this client is connected on</summary>
public readonly uint CircuitCode; public readonly uint CircuitCode;
/// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary> /// <summary>Sequence numbers of packets we've received (for duplicate checking)</summary>
public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(200); public IncomingPacketHistoryCollection PacketArchive = new IncomingPacketHistoryCollection(256);
/// <summary>Packets we have sent that need to be ACKed by the client</summary> /// <summary>Packets we have sent that need to be ACKed by the client</summary>
public UnackedPacketCollection NeedAcks = new UnackedPacketCollection(); public UnackedPacketCollection NeedAcks = new UnackedPacketCollection();
@ -803,8 +803,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
} }
/// <summary> /// <summary>
/// Fires the OnQueueEmpty callback and sets the minimum time that it /// Fires the OnQueueEmpty callback and sets the minimum time that it
/// can be called again /// can be called again
@ -843,6 +841,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
return 0; return 0;
} }
public void FreeUDPBuffer(UDPPacketBuffer buf)
{
m_udpServer.FreeUDPBuffer(buf);
}
/// <summary> /// <summary>
/// Converts a <seealso cref="ThrottleOutPacketType"/> integer to a /// Converts a <seealso cref="ThrottleOutPacketType"/> integer to a
/// flag value /// flag value
@ -853,20 +856,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
ThrottleOutPacketType category = (ThrottleOutPacketType)i; ThrottleOutPacketType category = (ThrottleOutPacketType)i;
/*
* Land = 1,
/// <summary>Wind data</summary>
Wind = 2,
/// <summary>Cloud data</summary>
Cloud = 3,
/// <summary>Any packets that do not fit into the other throttles</summary>
Task = 4,
/// <summary>Texture assets</summary>
Texture = 5,
/// <summary>Non-texture assets</summary>
Asset = 6,
*/
switch (category) switch (category)
{ {
case ThrottleOutPacketType.Land: case ThrottleOutPacketType.Land:

View File

@ -344,18 +344,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
protected ExpiringCache<IPEndPoint, Queue<UDPPacketBuffer>> m_pendingCache = new ExpiringCache<IPEndPoint, Queue<UDPPacketBuffer>>(); protected ExpiringCache<IPEndPoint, Queue<UDPPacketBuffer>> m_pendingCache = new ExpiringCache<IPEndPoint, Queue<UDPPacketBuffer>>();
protected Pool<IncomingPacket> m_incomingPacketPool;
/// <summary>
/// Stat for number of packets in the main pool awaiting use.
/// </summary>
protected Stat m_poolCountStat;
/// <summary>
/// Stat for number of packets in the inbound packet pool awaiting use.
/// </summary>
protected Stat m_incomingPacketPoolStat;
protected int m_defaultRTO = 0; protected int m_defaultRTO = 0;
protected int m_maxRTO = 0; protected int m_maxRTO = 0;
protected int m_ackTimeout = 0; protected int m_ackTimeout = 0;
@ -498,7 +486,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// if (usePools) // if (usePools)
// EnablePools(); // EnablePools();
base.DisablePools();
} }
public void Start() public void Start()
@ -554,83 +541,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
OqrEngine.Stop(); OqrEngine.Stop();
} }
public override bool EnablePools()
{
if (!UsePools)
{
base.EnablePools();
m_incomingPacketPool = new Pool<IncomingPacket>(() => new IncomingPacket(), 500);
return true;
}
return false;
}
public override bool DisablePools()
{
if (UsePools)
{
base.DisablePools();
StatsManager.DeregisterStat(m_incomingPacketPoolStat);
// We won't null out the pool to avoid a race condition with code that may be in the middle of using it.
return true;
}
return false;
}
/// <summary>
/// This is a seperate method so that it can be called once we have an m_scene to distinguish different scene
/// stats.
/// </summary>
protected internal void EnablePoolStats()
{
m_poolCountStat
= new Stat(
"UDPPacketBufferPoolCount",
"Objects within the UDPPacketBuffer pool",
"The number of objects currently stored within the UDPPacketBuffer pool",
"",
"clientstack",
Scene.Name,
StatType.Pull,
stat => stat.Value = Pool.Count,
StatVerbosity.Debug);
StatsManager.RegisterStat(m_poolCountStat);
m_incomingPacketPoolStat
= new Stat(
"IncomingPacketPoolCount",
"Objects within incoming packet pool",
"The number of objects currently stored within the incoming packet pool",
"",
"clientstack",
Scene.Name,
StatType.Pull,
stat => stat.Value = m_incomingPacketPool.Count,
StatVerbosity.Debug);
StatsManager.RegisterStat(m_incomingPacketPoolStat);
}
/// <summary>
/// Disables pool stats.
/// </summary>
protected internal void DisablePoolStats()
{
StatsManager.DeregisterStat(m_poolCountStat);
m_poolCountStat = null;
StatsManager.DeregisterStat(m_incomingPacketPoolStat);
m_incomingPacketPoolStat = null;
}
/// <summary> /// <summary>
/// If the outgoing UDP thread times out, then return client that was being processed to help with debugging. /// If the outgoing UDP thread times out, then return client that was being processed to help with debugging.
/// </summary> /// </summary>
@ -658,8 +568,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
string.Format("Incoming Packet Async Handling Engine ({0})", Scene.Name), string.Format("Incoming Packet Async Handling Engine ({0})", Scene.Name),
"INCOMING PACKET ASYNC HANDLING ENGINE"); "INCOMING PACKET ASYNC HANDLING ENGINE");
*/ */
OqrEngine OqrEngine = new JobEngine(
= new JobEngine(
string.Format("Outgoing Queue Refill Engine ({0})", Scene.Name), string.Format("Outgoing Queue Refill Engine ({0})", Scene.Name),
"OUTGOING QUEUE REFILL ENGINE"); "OUTGOING QUEUE REFILL ENGINE");
@ -769,15 +678,20 @@ namespace OpenSim.Region.ClientStack.LindenUDP
stat => stat.Value = OqrEngine.JobsWaiting, stat => stat.Value = OqrEngine.JobsWaiting,
StatVerbosity.Debug)); StatVerbosity.Debug));
// We delay enabling pool stats to AddScene() instead of Initialize() so that we can distinguish pool stats by StatsManager.RegisterStat(
// scene name new Stat(
if (UsePools) "UDPBuffersPoolCount",
EnablePoolStats(); "Buffers in the UDP buffers pool",
"The number of buffers currently stored within the UDP buffers pool",
"",
"clientstack",
Scene.Name,
StatType.Pull,
stat => stat.Value = m_udpBuffersPoolPtr,
StatVerbosity.Debug));
LLUDPServerCommands commands = new LLUDPServerCommands(MainConsole.Instance, this); LLUDPServerCommands commands = new LLUDPServerCommands(MainConsole.Instance, this);
commands.Register(); commands.Register();
} }
public bool HandlesRegion(Location x) public bool HandlesRegion(Location x)
@ -939,9 +853,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// The vast majority of packets are less than 200 bytes, although due to asset transfers and packet splitting // The vast majority of packets are less than 200 bytes, although due to asset transfers and packet splitting
// there are a decent number of packets in the 1000-1140 byte range. We allocate one of two sizes of data here // there are a decent number of packets in the 1000-1140 byte range. We allocate one of two sizes of data here
// to accomodate for both common scenarios and provide ample room for ACK appending in both // to accomodate for both common scenarios and provide ample room for ACK appending in both
int bufferSize = (dataLength > 180) ? LLUDPServer.MTU : 200; //int bufferSize = (dataLength > 180) ? LLUDPServer.MTU : 200;
UDPPacketBuffer buffer = new UDPPacketBuffer(udpClient.RemoteEndPoint, bufferSize); //UDPPacketBuffer buffer = new UDPPacketBuffer(udpClient.RemoteEndPoint, bufferSize);
UDPPacketBuffer buffer = GetNewUDPBuffer(udpClient.RemoteEndPoint);
// Zerocode if needed // Zerocode if needed
if (doZerocode) if (doZerocode)
@ -971,7 +886,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// If the packet data wasn't already copied during zerocoding, copy it now // If the packet data wasn't already copied during zerocoding, copy it now
if (doCopy) if (doCopy)
{ {
if (dataLength <= buffer.Data.Length) //if (dataLength <= buffer.Data.Length)
if (dataLength <= LLUDPServer.MTU)
{ {
Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength); Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength);
} }
@ -979,7 +895,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
{ {
m_log.Error("[LLUDPSERVER]: Packet exceeded buffer size! This could be an indication of packet assembly not obeying the MTU. Type=" + m_log.Error("[LLUDPSERVER]: Packet exceeded buffer size! This could be an indication of packet assembly not obeying the MTU. Type=" +
type + ", DataLength=" + dataLength + ", BufferLength=" + buffer.Data.Length); type + ", DataLength=" + dataLength + ", BufferLength=" + buffer.Data.Length);
buffer = new UDPPacketBuffer(udpClient.RemoteEndPoint, dataLength); // buffer = new UDPPacketBuffer(udpClient.RemoteEndPoint, dataLength);
buffer = GetNewUDPBuffer(udpClient.RemoteEndPoint);
Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength); Buffer.BlockCopy(data, 0, buffer.Data, 0, dataLength);
} }
} }
@ -1168,9 +1085,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// Set the appended ACKs flag on this packet // Set the appended ACKs flag on this packet
buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS); buffer.Data[0] = (byte)(buffer.Data[0] | Helpers.MSG_APPENDED_ACKS);
} }
}
buffer.DataLength = dataLength; buffer.DataLength = dataLength;
}
if (!isResend) if (!isResend)
{ {
@ -1178,12 +1094,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
uint sequenceNumber = (uint)Interlocked.Increment(ref udpClient.CurrentSequence); uint sequenceNumber = (uint)Interlocked.Increment(ref udpClient.CurrentSequence);
Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1); Utils.UIntToBytesBig(sequenceNumber, buffer.Data, 1);
outgoingPacket.SequenceNumber = sequenceNumber; outgoingPacket.SequenceNumber = sequenceNumber;
if (isReliable)
{
// Add this packet to the list of ACK responses we are waiting on from the server
udpClient.NeedAcks.Add(outgoingPacket);
}
} }
else else
{ {
@ -1196,9 +1106,18 @@ namespace OpenSim.Region.ClientStack.LindenUDP
PacketsSentCount++; PacketsSentCount++;
SyncSend(buffer); SyncSend(buffer);
// Keep track of when this packet was sent out (right now) // Keep track of when this packet was sent out (right now)
outgoingPacket.TickCount = Environment.TickCount & Int32.MaxValue; outgoingPacket.TickCount = Environment.TickCount & Int32.MaxValue;
if (outgoingPacket.UnackedMethod == null)
FreeUDPBuffer(buffer);
else if(!isResend)
{
// Add this packet to the list of ACK responses we are waiting on from the server
udpClient.NeedAcks.Add(outgoingPacket);
}
if (udpClient.DebugDataOutLevel > 0) if (udpClient.DebugDataOutLevel > 0)
m_log.DebugFormat( m_log.DebugFormat(
"[LLUDPSERVER]: Sending packet #{0} (rel: {1}, res: {2}) to {3} from {4}", "[LLUDPSERVER]: Sending packet #{0} (rel: {1}, res: {2}) to {3} from {4}",
@ -1240,7 +1159,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// buffer.DataLength, buffer.RemoteEndPoint, m_scene.RegionInfo.RegionName); // buffer.DataLength, buffer.RemoteEndPoint, m_scene.RegionInfo.RegionName);
RecordMalformedInboundPacket(endPoint); RecordMalformedInboundPacket(endPoint);
FreeUDPBuffer(buffer);
return; // Drop undersized packet return; // Drop undersized packet
} }
@ -1260,21 +1179,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// buffer.RemoteEndPoint, m_scene.RegionInfo.RegionName); // buffer.RemoteEndPoint, m_scene.RegionInfo.RegionName);
RecordMalformedInboundPacket(endPoint); RecordMalformedInboundPacket(endPoint);
FreeUDPBuffer(buffer);
return; // Malformed header return; // Malformed header
} }
try try
{ {
// packet = Packet.BuildPacket(buffer.Data, ref packetEnd, packet = Packet.BuildPacket(buffer.Data, ref packetEnd,
// // Only allocate a buffer for zerodecoding if the packet is zerocoded // Only allocate a buffer for zerodecoding if the packet is zerocoded
// ((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null); ((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null);
// If OpenSimUDPBase.UsePool == true (which is currently separate from the PacketPool) then we // If OpenSimUDPBase.UsePool == true (which is currently separate from the PacketPool) then we
// assume that packet construction does not retain a reference to byte[] buffer.Data (instead, all // assume that packet construction does not retain a reference to byte[] buffer.Data (instead, all
// bytes are copied out). // bytes are copied out).
packet = PacketPool.Instance.GetPacket(buffer.Data, ref packetEnd, // packet = PacketPool.Instance.GetPacket(buffer.Data, ref packetEnd,
// Only allocate a buffer for zerodecoding if the packet is zerocoded // Only allocate a buffer for zerodecoding if the packet is zerocoded
((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null); // ((buffer.Data[0] & Helpers.MSG_ZEROCODED) != 0) ? new byte[4096] : null);
} }
catch (Exception e) catch (Exception e)
{ {
@ -1292,7 +1211,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
} }
RecordMalformedInboundPacket(endPoint); RecordMalformedInboundPacket(endPoint);
FreeUDPBuffer(buffer);
return; return;
} }
@ -1311,17 +1230,16 @@ namespace OpenSim.Region.ClientStack.LindenUDP
lock (m_pendingCache) lock (m_pendingCache)
{ {
if (m_pendingCache.Contains(endPoint)) if (m_pendingCache.Contains(endPoint))
{
FreeUDPBuffer(buffer);
return; return;
}
m_pendingCache.AddOrUpdate(endPoint, new Queue<UDPPacketBuffer>(), 60); m_pendingCache.AddOrUpdate(endPoint, new Queue<UDPPacketBuffer>(), 60);
} }
// We need to copy the endpoint so that it doesn't get changed when another thread reuses the Util.FireAndForget(HandleUseCircuitCode, new object[] { endPoint, packet });
// buffer. FreeUDPBuffer(buffer);
object[] array = new object[] { new IPEndPoint(endPoint.Address, endPoint.Port), packet };
Util.FireAndForget(HandleUseCircuitCode, array);
return; return;
} }
} }
@ -1336,24 +1254,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP
queue.Enqueue(buffer); queue.Enqueue(buffer);
return; return;
} }
/*
else if (packet.Type == PacketType.CompleteAgentMovement)
{
// Send ack straight away to let the viewer know that we got it.
SendAckImmediate(endPoint, packet.Header.Sequence);
// We need to copy the endpoint so that it doesn't get changed when another thread reuses the
// buffer.
object[] array = new object[] { new IPEndPoint(endPoint.Address, endPoint.Port), packet };
Util.FireAndForget(HandleCompleteMovementIntoRegion, array);
return;
}
*/
} }
FreeUDPBuffer(buffer);
// Determine which agent this packet came from // Determine which agent this packet came from
if (client == null || !(client is LLClientView)) if (client == null || !(client is LLClientView))
{ {
@ -1471,10 +1375,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
LogPacketHeader(true, udpClient.CircuitCode, 0, packet.Type, (ushort)packet.Length); LogPacketHeader(true, udpClient.CircuitCode, 0, packet.Type, (ushort)packet.Length);
#endregion BinaryStats #endregion BinaryStats
//AgentUpdate removed from here
#region Ping Check Handling #region Ping Check Handling
if (packet.Type == PacketType.StartPingCheck) if (packet.Type == PacketType.StartPingCheck)
@ -1506,17 +1406,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
IncomingPacket incomingPacket; IncomingPacket incomingPacket;
// Inbox insertion
if (UsePools)
{
incomingPacket = m_incomingPacketPool.GetObject();
incomingPacket.Client = (LLClientView)client;
incomingPacket.Packet = packet;
}
else
{
incomingPacket = new IncomingPacket((LLClientView)client, packet); incomingPacket = new IncomingPacket((LLClientView)client, packet);
}
// if (incomingPacket.Packet.Type == PacketType.AgentUpdate || // if (incomingPacket.Packet.Type == PacketType.AgentUpdate ||
// incomingPacket.Packet.Type == PacketType.ChatFromViewer) // incomingPacket.Packet.Type == PacketType.ChatFromViewer)
@ -1525,7 +1415,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// else // else
// packetInbox.Enqueue(incomingPacket); // packetInbox.Enqueue(incomingPacket);
packetInbox.Add(incomingPacket); packetInbox.Add(incomingPacket);
} }
#region BinaryStats #region BinaryStats
@ -1881,13 +1770,14 @@ namespace OpenSim.Region.ClientStack.LindenUDP
byte[] packetData = ack.ToBytes(); byte[] packetData = ack.ToBytes();
int length = packetData.Length; int length = packetData.Length;
UDPPacketBuffer buffer = new UDPPacketBuffer(remoteEndpoint, length); UDPPacketBuffer buffer = GetNewUDPBuffer(remoteEndpoint);
buffer.DataLength = length; buffer.DataLength = length;
Buffer.BlockCopy(packetData, 0, buffer.Data, 0, length); Buffer.BlockCopy(packetData, 0, buffer.Data, 0, length);
// AsyncBeginSend(buffer); // AsyncBeginSend(buffer);
SyncSend(buffer); SyncSend(buffer);
FreeUDPBuffer(buffer);
} }
protected bool IsClientAuthorized(UseCircuitCodePacket useCircuitCode, out AuthenticateResponse sessionInfo) protected bool IsClientAuthorized(UseCircuitCodePacket useCircuitCode, out AuthenticateResponse sessionInfo)
@ -1982,17 +1872,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Scene.ThreadAlive(1); Scene.ThreadAlive(1);
try try
{ {
packetInbox.TryTake(out incomingPacket, 250); packetInbox.TryTake(out incomingPacket, 4500);
if (incomingPacket != null && IsRunningInbound) if (incomingPacket != null && IsRunningInbound)
{ {
ProcessInPacket(incomingPacket); ProcessInPacket(incomingPacket);
if (UsePools)
{
incomingPacket.Client = null;
m_incomingPacketPool.ReturnObject(incomingPacket);
}
incomingPacket = null; incomingPacket = null;
} }
} }

View File

@ -777,41 +777,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
m_udpServer.StopOutbound(); m_udpServer.StopOutbound();
} }
private void HandlePoolCommand(string module, string[] args)
{
if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
return;
if (args.Length != 4)
{
MainConsole.Instance.Output("Usage: debug lludp pool <on|off>");
return;
}
string enabled = args[3];
if (enabled == "on")
{
if (m_udpServer.EnablePools())
{
m_udpServer.EnablePoolStats();
MainConsole.Instance.OutputFormat("Packet pools enabled on {0}", m_udpServer.Scene.Name);
}
}
else if (enabled == "off")
{
if (m_udpServer.DisablePools())
{
m_udpServer.DisablePoolStats();
MainConsole.Instance.OutputFormat("Packet pools disabled on {0}", m_udpServer.Scene.Name);
}
}
else
{
MainConsole.Instance.Output("Usage: debug lludp pool <on|off>");
}
}
private void HandleAgentUpdateCommand(string module, string[] args) private void HandleAgentUpdateCommand(string module, string[] args)
{ {
if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene)
@ -834,8 +799,6 @@ namespace OpenSim.Region.ClientStack.LindenUDP
MainConsole.Instance.OutputFormat( MainConsole.Instance.OutputFormat(
"OUT LLUDP packet processing for {0} is {1}", m_udpServer.Scene.Name, m_udpServer.IsRunningOutbound ? "enabled" : "disabled"); "OUT LLUDP packet processing for {0} is {1}", m_udpServer.Scene.Name, m_udpServer.IsRunningOutbound ? "enabled" : "disabled");
MainConsole.Instance.OutputFormat("LLUDP pools in {0} are {1}", m_udpServer.Scene.Name, m_udpServer.UsePools ? "on" : "off");
MainConsole.Instance.OutputFormat( MainConsole.Instance.OutputFormat(
"Packet debug level for new clients is {0}", m_udpServer.DefaultClientPacketDebugLevel); "Packet debug level for new clients is {0}", m_udpServer.DefaultClientPacketDebugLevel);
} }

View File

@ -26,6 +26,7 @@
*/ */
using System; using System;
using System.Collections.Concurrent;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading; using System.Threading;
@ -57,15 +58,9 @@ namespace OpenMetaverse
/// <summary>UDP socket, used in either client or server mode</summary> /// <summary>UDP socket, used in either client or server mode</summary>
private Socket m_udpSocket; private Socket m_udpSocket;
/// <summary> public static Object m_udpBuffersPoolLock = new Object();
/// Are we to use object pool(s) to reduce memory churn when receiving data? public static UDPPacketBuffer[] m_udpBuffersPool = new UDPPacketBuffer[1000];
/// </summary> public static int m_udpBuffersPoolPtr = -1;
public bool UsePools { get; protected set; }
/// <summary>
/// Pool to use for handling data. May be null if UsePools = false;
/// </summary>
protected OpenSim.Framework.Pool<UDPPacketBuffer> Pool { get; private set; }
/// <summary>Returns true if the server is currently listening for inbound packets, otherwise false</summary> /// <summary>Returns true if the server is currently listening for inbound packets, otherwise false</summary>
public bool IsRunningInbound { get; private set; } public bool IsRunningInbound { get; private set; }
@ -186,6 +181,52 @@ namespace OpenMetaverse
if(m_udpSocket !=null) if(m_udpSocket !=null)
try { m_udpSocket.Close(); } catch { } try { m_udpSocket.Close(); } catch { }
} }
public UDPPacketBuffer GetNewUDPBuffer()
{
lock (m_udpBuffersPoolLock)
{
if (m_udpBuffersPoolPtr >= 0)
{
UDPPacketBuffer buf = m_udpBuffersPool[m_udpBuffersPoolPtr];
m_udpBuffersPool[m_udpBuffersPoolPtr] = null;
m_udpBuffersPoolPtr--;
buf.RemoteEndPoint = new IPEndPoint(IPAddress.Any, 0);
return buf;
}
}
return new UDPPacketBuffer(new IPEndPoint(IPAddress.Any, 0));
}
public UDPPacketBuffer GetNewUDPBuffer(IPEndPoint remoteEndpoint)
{
lock (m_udpBuffersPoolLock)
{
if (m_udpBuffersPoolPtr >= 0)
{
UDPPacketBuffer buf = m_udpBuffersPool[m_udpBuffersPoolPtr];
m_udpBuffersPool[m_udpBuffersPoolPtr] = null;
m_udpBuffersPoolPtr--;
buf.RemoteEndPoint = remoteEndpoint;
return buf;
}
}
return new UDPPacketBuffer(remoteEndpoint);
}
public void FreeUDPBuffer(UDPPacketBuffer buf)
{
lock (m_udpBuffersPoolLock)
{
if (m_udpBuffersPoolPtr < 999)
{
buf.RemoteEndPoint = null;
m_udpBuffersPoolPtr++;
m_udpBuffersPool[m_udpBuffersPoolPtr] = buf;
}
}
}
/// <summary> /// <summary>
/// Start inbound UDP packet handling. /// Start inbound UDP packet handling.
/// </summary> /// </summary>
@ -202,6 +243,7 @@ namespace OpenMetaverse
/// manner (not throwing an exception when the remote side resets the /// manner (not throwing an exception when the remote side resets the
/// connection). This call is ignored on Mono where the flag is not /// connection). This call is ignored on Mono where the flag is not
/// necessary</remarks> /// necessary</remarks>
public virtual void StartInbound(int recvBufferSize) public virtual void StartInbound(int recvBufferSize)
{ {
if (!IsRunningInbound) if (!IsRunningInbound)
@ -306,48 +348,12 @@ namespace OpenMetaverse
IsRunningOutbound = false; IsRunningOutbound = false;
} }
public virtual bool EnablePools()
{
if (!UsePools)
{
Pool = new Pool<UDPPacketBuffer>(() => new UDPPacketBuffer(), 500);
UsePools = true;
return true;
}
return false;
}
public virtual bool DisablePools()
{
if (UsePools)
{
UsePools = false;
// We won't null out the pool to avoid a race condition with code that may be in the middle of using it.
return true;
}
return false;
}
private void AsyncBeginReceive() private void AsyncBeginReceive()
{ {
UDPPacketBuffer buf; if (!IsRunningInbound)
return;
// FIXME: Disabled for now as this causes issues with reused packet objects interfering with each other UDPPacketBuffer buf = GetNewUDPBuffer();
// on Windows with m_asyncPacketHandling = true, though this has not been seen on Linux.
// Possibly some unexpected issue with fetching UDP data concurrently with multiple threads. Requires more investigation.
// if (UsePools)
// buf = Pool.GetObject();
// else
buf = new UDPPacketBuffer();
if (IsRunningInbound)
{
try try
{ {
// kick off an async read // kick off an async read
@ -402,7 +408,6 @@ namespace OpenMetaverse
string.Format("[UDPBASE]: Error processing UDP begin receive {0}. Exception ", UdpReceives), e); string.Format("[UDPBASE]: Error processing UDP begin receive {0}. Exception ", UdpReceives), e);
} }
} }
}
private void AsyncEndReceive(IAsyncResult iar) private void AsyncEndReceive(IAsyncResult iar)
{ {
@ -465,14 +470,12 @@ namespace OpenMetaverse
} }
finally finally
{ {
// if (UsePools)
// Pool.ReturnObject(buffer);
AsyncBeginReceive(); AsyncBeginReceive();
} }
} }
} }
/* not in use
public void AsyncBeginSend(UDPPacketBuffer buf) public void AsyncBeginSend(UDPPacketBuffer buf)
{ {
// if (IsRunningOutbound) // if (IsRunningOutbound)
@ -511,7 +514,7 @@ namespace OpenMetaverse
catch (SocketException) { } catch (SocketException) { }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
} }
*/
public void SyncSend(UDPPacketBuffer buf) public void SyncSend(UDPPacketBuffer buf)
{ {
try try

View File

@ -203,6 +203,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (ackedPacket != null) if (ackedPacket != null)
{ {
m_packets.Remove(pendingAcknowledgement.SequenceNumber); m_packets.Remove(pendingAcknowledgement.SequenceNumber);
ackedPacket.Client.FreeUDPBuffer(ackedPacket.Buffer);
// As with other network applications, assume that an acknowledged packet is an // As with other network applications, assume that an acknowledged packet is an
// indication that the network can handle a little more load, speed up the transmission // indication that the network can handle a little more load, speed up the transmission
@ -241,6 +242,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
if (removedPacket != null) if (removedPacket != null)
{ {
m_packets.Remove(pendingRemove); m_packets.Remove(pendingRemove);
removedPacket.Client.FreeUDPBuffer(removedPacket.Buffer);
// Update stats // Update stats
Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength); Interlocked.Add(ref removedPacket.Client.UnackedBytes, -removedPacket.Buffer.DataLength);