The ThreadPool ClientView Branch.

ThreadPoolClientBranch
Teravus Ovares 2008-02-22 05:58:55 +00:00
parent f75e418211
commit 14a8700be0
11 changed files with 395 additions and 262 deletions

View File

@ -34,6 +34,10 @@ namespace OpenSim.Framework
{ {
public Dictionary<uint, AgentCircuitData> AgentCircuits = new Dictionary<uint, AgentCircuitData>(); public Dictionary<uint, AgentCircuitData> AgentCircuits = new Dictionary<uint, AgentCircuitData>();
// here be dragons
public delegate void CircuitAddedCallback(uint circuitCode, AgentCircuitData agentData);
public event CircuitAddedCallback onCircuitAdded;
public AgentCircuitManager() public AgentCircuitManager()
{ {
} }
@ -83,6 +87,8 @@ namespace OpenSim.Framework
else else
{ {
AgentCircuits.Add(circuitCode, agentData); AgentCircuits.Add(circuitCode, agentData);
if (null != onCircuitAdded)
onCircuitAdded(circuitCode, agentData);
} }
} }
@ -124,4 +130,4 @@ namespace OpenSim.Framework
return false; return false;
} }
} }
} }

View File

@ -34,6 +34,7 @@ namespace OpenSim.Framework
{ {
private readonly Queue<T> m_queue = new Queue<T>(); private readonly Queue<T> m_queue = new Queue<T>();
private readonly object m_queueSync = new object(); private readonly object m_queueSync = new object();
private readonly object m_dequeueSync = new object();
public void Enqueue(T value) public void Enqueue(T value)
{ {
@ -50,9 +51,11 @@ namespace OpenSim.Framework
{ {
if (m_queue.Count < 1) if (m_queue.Count < 1)
{ {
Monitor.Wait(m_queueSync); lock (m_dequeueSync)
{
Monitor.Wait(m_queueSync);
}
} }
return m_queue.Dequeue(); return m_queue.Dequeue();
} }
} }
@ -64,10 +67,13 @@ namespace OpenSim.Framework
return m_queue.Contains(item); return m_queue.Contains(item);
} }
} }
public int Count() public int Count()
{ {
return m_queue.Count; lock (m_queueSync)
{
return m_queue.Count;
}
} }
} }
} }

View File

@ -93,10 +93,15 @@ namespace OpenSim.Framework
bool tryGetRet = false; bool tryGetRet = false;
lock (m_clients) lock (m_clients)
tryGetRet = m_clients.TryGetValue(circuitCode, out client); tryGetRet = m_clients.TryGetValue(circuitCode, out client);
if(tryGetRet) if (tryGetRet)
{ {
//m_log.Debug("[ClientManager]: Processing IN packet " + packet.Type.ToString());
client.InPacket(packet); client.InPacket(packet);
} }
else
{
m_log.Debug("[ClientManager]: Failed to find client for " + circuitCode.ToString());
}
} }
public void CloseAllAgents(uint circuitCode) public void CloseAllAgents(uint circuitCode)

View File

@ -528,7 +528,12 @@ namespace OpenSim.Framework
event MoneyBalanceRequest OnMoneyBalanceRequest; event MoneyBalanceRequest OnMoneyBalanceRequest;
EndPoint EndPoint
{
set;
get;
}
LLVector3 StartPos { get; set; } LLVector3 StartPos { get; set; }
LLUUID AgentId { get; } LLUUID AgentId { get; }
@ -645,8 +650,8 @@ namespace OpenSim.Framework
uint flags, LLUUID flImageID, LLUUID imageID, string profileURL, LLUUID partnerID); uint flags, LLUUID flImageID, LLUUID imageID, string profileURL, LLUUID partnerID);
byte[] GetThrottlesPacked(float multiplier); byte[] GetThrottlesPacked(float multiplier);
bool Claim(EndPoint ep, UseCircuitCodePacket usePacket);
void SetDebug(int newDebug); void SetDebug(int newDebug);
void InPacket(Packet NewPack); void InPacket(Packet NewPack);
void Close(bool ShutdownCircuit); void Close(bool ShutdownCircuit);

View File

@ -115,10 +115,20 @@ namespace OpenSim.Region.ClientStack
/* public variables */ /* public variables */
protected string m_firstName; protected string m_firstName;
protected string m_lastName; protected string m_lastName;
protected Thread m_clientThread; // protected Thread m_clientThread;
protected LLVector3 m_startpos; protected LLVector3 m_startpos;
protected EndPoint m_userEndPoint; protected EndPoint m_userEndPoint;
/// <summary>
/// do we process or wait?
/// </summary>
protected bool ReadyForPackets = false;
/// <summary>
/// holds packets that were received while paused.
/// </summary>
protected Queue<Packet> PausedPackets = new Queue<Packet>();
/* Properties */ /* Properties */
public LLUUID SecureSessionId public LLUUID SecureSessionId
@ -189,46 +199,62 @@ namespace OpenSim.Region.ClientStack
get { return m_moneyBalance; } get { return m_moneyBalance; }
} }
public EndPoint EndPoint
{
set { m_userEndPoint = value; }
get { return m_userEndPoint; }
}
/* METHODS */ /* METHODS */
public ClientView(EndPoint remoteEP, IScene scene, AssetCache assetCache, PacketServer packServer, public ClientView(IScene scene, AssetCache assetCache, PacketServer packServer,
AgentCircuitManager authenSessions, LLUUID agentId, LLUUID sessionId, uint circuitCode) AgentCircuitManager authenSessions, AgentCircuitData agentData)
{ {
m_log.Info("[CLIENT]: Creating new client for SessionID " + agentData.SessionID.ToString());
m_moneyBalance = 1000; m_moneyBalance = 1000;
m_channelVersion = Helpers.StringToField(scene.GetSimulatorVersion()); m_channelVersion = Helpers.StringToField(scene.GetSimulatorVersion());
m_scene = scene; m_scene = scene;
m_assetCache = assetCache; m_assetCache = assetCache;
m_networkServer = packServer; m_networkServer = packServer;
// m_inventoryCache = inventoryCache;
m_authenticateSessionsHandler = authenSessions; m_authenticateSessionsHandler = authenSessions;
m_log.Info("[CLIENT]: Started up new client thread to handle incoming request"); m_agentId = agentData.AgentID;
m_sessionId = agentData.SessionID;
m_circuitCode = agentData.circuitcode;
m_agentId = agentId; m_startpos = m_authenticateSessionsHandler.GetPosition(m_circuitCode);
m_sessionId = sessionId; m_packetQueue = new PacketQueue(this);
m_circuitCode = circuitCode;
m_userEndPoint = remoteEP;
m_startpos = m_authenticateSessionsHandler.GetPosition(circuitCode);
// While working on this, the BlockingQueue had me fooled for a bit.
// The Blocking queue causes the thread to stop until there's something
// in it to process. It's an on-purpose threadlock though because
// without it, the clientloop will suck up all sim resources.
m_packetQueue = new PacketQueue();
RegisterLocalPacketHandlers(); RegisterLocalPacketHandlers();
}
m_clientThread = new Thread(new ThreadStart(AuthUser)); public bool Claim(EndPoint ep, UseCircuitCodePacket usePacket)
m_clientThread.Name = "ClientThread"; {
m_clientThread.IsBackground = true; if (usePacket.CircuitCode.Code != m_circuitCode)
m_clientThread.Start(); {
OpenSim.Framework.ThreadTracker.Add(m_clientThread); m_log.Debug("[ClientView]: Circuit code " + usePacket.CircuitCode.Code.ToString()
+ " does NOT match." + m_circuitCode.ToString());
return false;
}
if (0 != usePacket.CircuitCode.SessionID.CompareTo(m_sessionId))
{
m_log.Debug("[ClientView]: SessionID " + usePacket.CircuitCode.SessionID.ToString()
+ " does not match " + m_sessionId.ToString());
return false;
}
// this isnt' really the right name anymore.
EndPoint = ep;
AuthUser();
ack_pack(usePacket);
m_log.Debug("[ClientView]: Dealing with " + PausedPackets.Count.ToString() + " packets backlogged.");
ReadyForPackets = true;
// stuff the paused packets BACK into the thread queue. :)
while (PausedPackets.Count > 0)
{
m_networkServer.Enqueue(m_circuitCode, PausedPackets.Dequeue());
}
return true;
} }
public void SetDebug(int newDebug) public void SetDebug(int newDebug)
@ -254,29 +280,10 @@ namespace OpenSim.Region.ClientStack
m_packetQueue.Close(); m_packetQueue.Close();
m_packetQueue.Flush(); m_packetQueue.Flush();
Thread.Sleep(2000);
// Shut down timers
m_ackTimer.Stop(); m_ackTimer.Stop();
m_clientPingTimer.Stop(); m_clientPingTimer.Stop();
// This is just to give the client a reasonable chance of GC.Collect();
// flushing out all it's packets. There should probably
// be a better mechanism here
// We can't reach into other scenes and close the connection
// We need to do this over grid communications
//m_scene.CloseAllAgents(CircuitCode);
// If we're not shutting down the circuit, then this is the last time we'll go here.
// If we are shutting down the circuit, the UDP Server will come back here with
// ShutDownCircuit = false
if (!(ShutdownCircult))
{
GC.Collect();
m_clientThread.Abort();
}
} }
/// <summary> /// <summary>
@ -287,13 +294,14 @@ namespace OpenSim.Region.ClientStack
public void Close(bool ShutdownCircult) public void Close(bool ShutdownCircult)
{ {
// Pull Client out of Region // Pull Client out of Region
m_log.Info("[CLIENT]: Close has been called"); m_log.Info("[CLIENT]: Close has been called with " + ShutdownCircult.ToString());
//raiseevent on the packet server to Shutdown the circuit //raiseevent on the packet server to Shutdown the circuit
if (ShutdownCircult) if (ShutdownCircult)
OnConnectionClosed(this); m_networkServer.CloseClient(this);
else
CloseCleanup(ShutdownCircult); CloseCleanup(ShutdownCircult);
//OnConnectionClosed(this);
} }
public void Kick(string message) public void Kick(string message)
@ -402,29 +410,6 @@ namespace OpenSim.Region.ClientStack
} }
} }
protected virtual void ClientLoop()
{
m_log.Info("[CLIENT]: Entered loop");
while (true)
{
QueItem nextPacket = m_packetQueue.Dequeue();
if (nextPacket.Incoming)
{
if (nextPacket.Packet.Type != PacketType.AgentUpdate)
{
m_packetsReceived++;
}
DebugPacket("IN", nextPacket.Packet);
ProcessInPacket(nextPacket.Packet);
}
else
{
DebugPacket("OUT", nextPacket.Packet);
ProcessOutPacket(nextPacket.Packet);
}
}
}
# endregion # endregion
protected void CheckClientConnectivity(object sender, ElapsedEventArgs e) protected void CheckClientConnectivity(object sender, ElapsedEventArgs e)
@ -460,6 +445,7 @@ namespace OpenSim.Region.ClientStack
protected virtual void InitNewClient() protected virtual void InitNewClient()
{ {
m_log.Debug("[ClientView]: Setting up timers");
//this.UploadAssets = new AgentAssetUpload(this, m_assetCache, m_inventoryCache); //this.UploadAssets = new AgentAssetUpload(this, m_assetCache, m_inventoryCache);
// Establish our two timers. We could probably get this down to one // Establish our two timers. We could probably get this down to one
@ -471,7 +457,7 @@ namespace OpenSim.Region.ClientStack
m_clientPingTimer.Elapsed += new ElapsedEventHandler(CheckClientConnectivity); m_clientPingTimer.Elapsed += new ElapsedEventHandler(CheckClientConnectivity);
m_clientPingTimer.Enabled = true; m_clientPingTimer.Enabled = true;
m_log.Info("[CLIENT]: Adding viewer agent to scene"); m_log.Info("[ClientView]: Adding viewer agent to scene");
m_scene.AddNewClient(this, true); m_scene.AddNewClient(this, true);
} }
@ -487,11 +473,11 @@ namespace OpenSim.Region.ClientStack
m_log.Info("[CLIENT]: New user request denied to " + m_userEndPoint.ToString()); m_log.Info("[CLIENT]: New user request denied to " + m_userEndPoint.ToString());
m_packetQueue.Flush(); m_packetQueue.Flush();
m_packetQueue.Close(); m_packetQueue.Close();
m_clientThread.Abort(); // m_clientThread.Abort();
} }
else else
{ {
m_log.Info("[CLIENT]: Got authenticated connection from " + m_userEndPoint.ToString()); m_log.Info("[CLIENT]: Received authenticated connection from " + m_userEndPoint.ToString());
//session is authorised //session is authorised
m_firstName = sessionInfo.LoginInfo.First; m_firstName = sessionInfo.LoginInfo.First;
m_lastName = sessionInfo.LoginInfo.Last; m_lastName = sessionInfo.LoginInfo.Last;
@ -503,7 +489,7 @@ namespace OpenSim.Region.ClientStack
// This sets up all the timers // This sets up all the timers
InitNewClient(); InitNewClient();
ClientLoop(); //ClientLoop();
} }
} }
@ -2458,11 +2444,15 @@ namespace OpenSim.Region.ClientStack
} }
} }
protected virtual void ProcessOutPacket(Packet Pack) /// <summary>
/// processes an outbound packet and sends the data.
/// Needs to be public so that throttle can use it.
/// </summary>
/// <param name="Pack"></param>
public virtual void ProcessOutPacket(Packet Pack)
{ {
// Keep track of when this packet was sent out // Keep track of when this packet was sent out
Pack.TickCount = System.Environment.TickCount; Pack.TickCount = System.Environment.TickCount;
if (!Pack.Header.Resent) if (!Pack.Header.Resent)
{ {
Pack.Header.Sequence = NextSeqNum(); Pack.Header.Sequence = NextSeqNum();
@ -2482,6 +2472,8 @@ namespace OpenSim.Region.ClientStack
// Actually make the byte array and send it // Actually make the byte array and send it
try try
{ {
//m_log.Debug("[ClientView]: Sending packet " + Pack.Type.ToString() + " to "
// + m_circuitCode.ToString());
byte[] sendbuffer = Pack.ToBytes(); byte[] sendbuffer = Pack.ToBytes();
PacketPool.Instance.ReturnPacket(Pack); PacketPool.Instance.ReturnPacket(Pack);
@ -2489,10 +2481,12 @@ namespace OpenSim.Region.ClientStack
{ {
int packetsize = Helpers.ZeroEncode(sendbuffer, sendbuffer.Length, ZeroOutBuffer); int packetsize = Helpers.ZeroEncode(sendbuffer, sendbuffer.Length, ZeroOutBuffer);
m_networkServer.SendPacketTo(ZeroOutBuffer, packetsize, SocketFlags.None, m_circuitCode); m_networkServer.SendPacketTo(ZeroOutBuffer, packetsize, SocketFlags.None, m_circuitCode);
// m_log.Debug("[ClientView]: Sent to " + m_circuitCode.ToString() + " " + Pack.Type.ToString());
} }
else else
{ {
m_networkServer.SendPacketTo(sendbuffer, sendbuffer.Length, SocketFlags.None, m_circuitCode); m_networkServer.SendPacketTo(sendbuffer, sendbuffer.Length, SocketFlags.None, m_circuitCode);
// m_log.Debug("[ClientView]: sent to " + m_circuitCode.ToString() + " " + Pack.Type.ToString());
} }
} }
catch (Exception e) catch (Exception e)
@ -2507,72 +2501,96 @@ namespace OpenSim.Region.ClientStack
public virtual void InPacket(Packet NewPack) public virtual void InPacket(Packet NewPack)
{ {
// Handle appended ACKs // deals with the lags involved on gettign the client setup.
if (NewPack != null) // we can't accept packets for the user until after Auth()
if (!ReadyForPackets)
{ {
if (NewPack.Header.AppendedAcks) PausedPackets.Enqueue(NewPack);
{ return;
lock (m_needAck) }
{
foreach (uint ackedPacketId in NewPack.Header.AckList)
{
Packet ackedPacket;
if (m_needAck.TryGetValue(ackedPacketId, out ackedPacket)) // lock (m_packetQueue)
{
// Handle appended ACKs
if (NewPack != null)
{
if (NewPack.Header.AppendedAcks)
{
lock (m_needAck)
{
foreach (uint ackedPacketId in NewPack.Header.AckList)
{ {
m_unAckedBytes -= ackedPacket.ToBytes().Length; Packet ackedPacket;
m_needAck.Remove(ackedPacketId);
if (m_needAck.TryGetValue(ackedPacketId, out ackedPacket))
{
m_unAckedBytes -= ackedPacket.ToBytes().Length;
m_needAck.Remove(ackedPacketId);
}
} }
} }
} }
}
// Handle PacketAck packets // Handle PacketAck packets
if (NewPack.Type == PacketType.PacketAck) if (NewPack.Type == PacketType.PacketAck)
{
PacketAckPacket ackPacket = (PacketAckPacket)NewPack;
lock (m_needAck)
{ {
foreach (PacketAckPacket.PacketsBlock block in ackPacket.Packets) PacketAckPacket ackPacket = (PacketAckPacket)NewPack;
lock (m_needAck)
{ {
uint ackedPackId = block.ID; foreach (PacketAckPacket.PacketsBlock block in ackPacket.Packets)
Packet ackedPacket;
if (m_needAck.TryGetValue(ackedPackId, out ackedPacket))
{ {
m_unAckedBytes -= ackedPacket.ToBytes().Length; uint ackedPackId = block.ID;
m_needAck.Remove(ackedPackId); Packet ackedPacket;
if (m_needAck.TryGetValue(ackedPackId, out ackedPacket))
{
m_unAckedBytes -= ackedPacket.ToBytes().Length;
m_needAck.Remove(ackedPackId);
}
} }
} }
} }
} else if ((NewPack.Type == PacketType.StartPingCheck))
else if ((NewPack.Type == PacketType.StartPingCheck)) {
{ //reply to pingcheck
//reply to pingcheck StartPingCheckPacket startPing = (StartPingCheckPacket)NewPack;
StartPingCheckPacket startPing = (StartPingCheckPacket)NewPack; CompletePingCheckPacket endPing = (CompletePingCheckPacket)PacketPool.Instance.GetPacket(PacketType.CompletePingCheck);
CompletePingCheckPacket endPing = (CompletePingCheckPacket)PacketPool.Instance.GetPacket(PacketType.CompletePingCheck); endPing.PingID.PingID = startPing.PingID.PingID;
endPing.PingID.PingID = startPing.PingID.PingID; OutPacket(endPing, ThrottleOutPacketType.Task);
OutPacket(endPing, ThrottleOutPacketType.Task); }
} else
else {
{ QueItem item = new QueItem();
QueItem item = new QueItem(); item.Packet = NewPack;
item.Packet = NewPack; item.Incoming = true;
item.Incoming = true; //m_packetQueue.Enqueue(item);
m_packetQueue.Enqueue(item); ProcessInPacket(NewPack);
}
} }
} }
} }
public virtual void OutPacket(Packet NewPack, ThrottleOutPacketType throttlePacketType) public virtual void OutPacket(Packet NewPack, ThrottleOutPacketType throttlePacketType)
{ {
QueItem item = new QueItem(); //lock (m_packetQueue)
item.Packet = NewPack; {
item.Incoming = false; QueItem item = new QueItem();
item.throttleType = throttlePacketType; // Packet throttle type item.Packet = NewPack;
m_packetQueue.Enqueue(item); item.Incoming = false;
m_packetsSent++; item.throttleType = throttlePacketType; // Packet throttle type
// if it's unknown, just punt it out. probably an ack.
if (throttlePacketType == ThrottleOutPacketType.Unknown)
{
ProcessOutPacket(NewPack);
}
else
{
m_packetQueue.Enqueue(item);
}
m_packetsSent++;
}
} }
# region Low Level Packet Methods # region Low Level Packet Methods
@ -2581,6 +2599,7 @@ namespace OpenSim.Region.ClientStack
{ {
if (Pack.Header.Reliable) if (Pack.Header.Reliable)
{ {
//m_log.Debug("[ClientView]: Acking " + Pack.Type.ToString());
PacketAckPacket ack_it = (PacketAckPacket)PacketPool.Instance.GetPacket(PacketType.PacketAck); PacketAckPacket ack_it = (PacketAckPacket)PacketPool.Instance.GetPacket(PacketType.PacketAck);
// TODO: don't create new blocks if recycling an old packet // TODO: don't create new blocks if recycling an old packet
ack_it.Packets = new PacketAckPacket.PacketsBlock[1]; ack_it.Packets = new PacketAckPacket.PacketsBlock[1];

View File

@ -37,9 +37,15 @@ namespace OpenSim.Region.ClientStack
{ {
public class PacketQueue public class PacketQueue
{ {
//private static readonly log4net.ILog m_log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); private static readonly log4net.ILog m_log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private bool m_enabled = true; private bool m_enabled = false;
public bool Enable
{
get { return m_enabled; }
set { m_enabled = value; m_log.Debug("[PacketQueue]: Enabled == " + value.ToString()); }
}
private BlockingQueue<QueItem> SendQueue; private BlockingQueue<QueItem> SendQueue;
@ -77,7 +83,12 @@ namespace OpenSim.Region.ClientStack
// private long ThrottleInterval; // private long ThrottleInterval;
private Timer throttleTimer; private Timer throttleTimer;
public PacketQueue() /// <summary>
/// backreference so we can push packets out the client. May be temporary.
/// </summary>
private ClientView Client;
public PacketQueue(ClientView client)
{ {
// While working on this, the BlockingQueue had me fooled for a bit. // While working on this, the BlockingQueue had me fooled for a bit.
// The Blocking queue causes the thread to stop until there's something // The Blocking queue causes the thread to stop until there's something
@ -86,6 +97,8 @@ namespace OpenSim.Region.ClientStack
SendQueue = new BlockingQueue<QueItem>(); SendQueue = new BlockingQueue<QueItem>();
Client = client;
IncomingPacketQueue = new Queue<QueItem>(); IncomingPacketQueue = new Queue<QueItem>();
OutgoingPacketQueue = new Queue<QueItem>(); OutgoingPacketQueue = new Queue<QueItem>();
ResendOutgoingPacketQueue = new Queue<QueItem>(); ResendOutgoingPacketQueue = new Queue<QueItem>();
@ -123,44 +136,49 @@ namespace OpenSim.Region.ClientStack
public void Enqueue(QueItem item) public void Enqueue(QueItem item)
{ {
if (!m_enabled)
{
return;
}
// We could micro lock, but that will tend to actually // We could micro lock, but that will tend to actually
// probably be worse than just synchronizing on SendQueue // probably be worse than just synchronizing on SendQueue
lock (this) // enqueue inbound right away. forget throttle checking inbound!
if (item.Incoming)
{ {
switch (item.throttleType) SendQueue.Enqueue(item);
}
else
{
lock (this)
{ {
case ThrottleOutPacketType.Resend: switch (item.throttleType)
ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item); {
break; case ThrottleOutPacketType.Resend:
case ThrottleOutPacketType.Texture: ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item);
ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Texture:
case ThrottleOutPacketType.Task: ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item);
ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Task:
case ThrottleOutPacketType.Land: ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item);
ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Land:
case ThrottleOutPacketType.Asset: ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item);
ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Asset:
case ThrottleOutPacketType.Cloud: ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item);
ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Cloud:
case ThrottleOutPacketType.Wind: ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item);
ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item); break;
break; case ThrottleOutPacketType.Wind:
ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item);
break;
default: default:
// Acknowledgements and other such stuff should go directly to the blocking Queue // Acknowledgements and other such stuff should go directly to the blocking Queue
// Throttling them may and likely 'will' be problematic // Throttling them may and likely 'will' be problematic
SendQueue.Enqueue(item); Client.ProcessOutPacket(item.Packet);
break; break;
}
} }
} }
} }
@ -174,39 +192,34 @@ namespace OpenSim.Region.ClientStack
{ {
lock (this) lock (this)
{ {
while (PacketsWaiting()) while (ResendOutgoingPacketQueue.Count > 0)
{ {
//Now comes the fun part.. we dump all our elements into m_packetQueue that we've saved up. Client.ProcessOutPacket(ResendOutgoingPacketQueue.Dequeue().Packet);
if (ResendOutgoingPacketQueue.Count > 0) }
{ while (LandOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(ResendOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(LandOutgoingPacketQueue.Dequeue().Packet);
if (LandOutgoingPacketQueue.Count > 0) }
{ while (WindOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(LandOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(WindOutgoingPacketQueue.Dequeue().Packet);
if (WindOutgoingPacketQueue.Count > 0) }
{ while (CloudOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(WindOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(CloudOutgoingPacketQueue.Dequeue().Packet);
if (CloudOutgoingPacketQueue.Count > 0) }
{ while (TaskOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(CloudOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(TaskOutgoingPacketQueue.Dequeue().Packet);
if (TaskOutgoingPacketQueue.Count > 0) }
{ while (TextureOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(TaskOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(TextureOutgoingPacketQueue.Dequeue().Packet);
if (TextureOutgoingPacketQueue.Count > 0) }
{ while (AssetOutgoingPacketQueue.Count > 0)
SendQueue.Enqueue(TextureOutgoingPacketQueue.Dequeue()); {
} Client.ProcessOutPacket(AssetOutgoingPacketQueue.Dequeue().Packet);
if (AssetOutgoingPacketQueue.Count > 0)
{
SendQueue.Enqueue(AssetOutgoingPacketQueue.Dequeue());
}
} }
// m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets");
} }
} }
@ -265,7 +278,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = ResendOutgoingPacketQueue.Dequeue(); QueItem qpack = ResendOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
ResendThrottle.Add(qpack.Packet.ToBytes().Length); ResendThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -273,7 +288,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = LandOutgoingPacketQueue.Dequeue(); QueItem qpack = LandOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
LandThrottle.Add(qpack.Packet.ToBytes().Length); LandThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -281,7 +298,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = WindOutgoingPacketQueue.Dequeue(); QueItem qpack = WindOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); // SendQueue.Enqueue(qpack);
//m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
WindThrottle.Add(qpack.Packet.ToBytes().Length); WindThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -289,7 +308,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = CloudOutgoingPacketQueue.Dequeue(); QueItem qpack = CloudOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
CloudThrottle.Add(qpack.Packet.ToBytes().Length); CloudThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -297,7 +318,10 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = TaskOutgoingPacketQueue.Dequeue(); QueItem qpack = TaskOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TaskThrottle.Add(qpack.Packet.ToBytes().Length); TaskThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -305,7 +329,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = TextureOutgoingPacketQueue.Dequeue(); QueItem qpack = TextureOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
TextureThrottle.Add(qpack.Packet.ToBytes().Length); TextureThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -313,7 +339,9 @@ namespace OpenSim.Region.ClientStack
{ {
QueItem qpack = AssetOutgoingPacketQueue.Dequeue(); QueItem qpack = AssetOutgoingPacketQueue.Dequeue();
SendQueue.Enqueue(qpack); //SendQueue.Enqueue(qpack);
// m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString());
Client.ProcessOutPacket(qpack.Packet);
TotalThrottle.Add(qpack.Packet.ToBytes().Length); TotalThrottle.Add(qpack.Packet.ToBytes().Length);
AssetThrottle.Add(qpack.Packet.ToBytes().Length); AssetThrottle.Add(qpack.Packet.ToBytes().Length);
} }
@ -337,17 +365,18 @@ namespace OpenSim.Region.ClientStack
// wait for the timer to fire to put things into the // wait for the timer to fire to put things into the
// output queue // output queue
if ((q.Count == 0) && (throttle.UnderLimit())) if (m_enabled && (q.Count == 0) && (throttle.UnderLimit()))
{ {
Monitor.Enter(this); Monitor.Enter(this);
throttle.Add(item.Packet.ToBytes().Length); throttle.Add(item.Packet.ToBytes().Length);
TotalThrottle.Add(item.Packet.ToBytes().Length); TotalThrottle.Add(item.Packet.ToBytes().Length);
SendQueue.Enqueue(item); Client.ProcessOutPacket(item.Packet);
Monitor.Pulse(this); Monitor.Pulse(this);
Monitor.Exit(this); Monitor.Exit(this);
} }
else else
{ {
// m_log.Debug("[PacketQueue]: ThrottleCheck Queueing " + item.Incoming.ToString() + " packet " + item.Packet.Type.ToString());
q.Enqueue(item); q.Enqueue(item);
} }
} }

View File

@ -29,6 +29,9 @@
using System; using System;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using libsecondlife; using libsecondlife;
using libsecondlife.Packets; using libsecondlife.Packets;
using OpenSim.Framework; using OpenSim.Framework;
@ -38,12 +41,27 @@ namespace OpenSim.Region.ClientStack
{ {
public class PacketServer public class PacketServer
{ {
//private static readonly log4net.ILog m_log private static readonly log4net.ILog m_log
// = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private ClientStackNetworkHandler m_networkHandler; private ClientStackNetworkHandler m_networkHandler;
private IScene m_scene; private IScene m_scene;
public struct QueuePacket
{
public Packet Packet;
public uint CircuitCode;
public QueuePacket(Packet p, uint c)
{
Packet = p;
CircuitCode = c;
}
}
private List<Thread> Threads = new List<Thread>();
private BlockingQueue<QueuePacket> PacketQueue;
//private readonly ClientManager m_clientManager = new ClientManager(); //private readonly ClientManager m_clientManager = new ClientManager();
//public ClientManager ClientManager //public ClientManager ClientManager
//{ //{
@ -54,6 +72,25 @@ namespace OpenSim.Region.ClientStack
{ {
m_networkHandler = networkHandler; m_networkHandler = networkHandler;
m_networkHandler.RegisterPacketServer(this); m_networkHandler.RegisterPacketServer(this);
PacketQueue = new BlockingQueue<QueuePacket>();
int ThreadCount = 4;
m_log.Debug("[PacketServer]: launching " + ThreadCount.ToString() + " threads.");
for (int x = 0; x < ThreadCount; x++)
{
Thread thread = new Thread(PacketRunner);
thread.IsBackground = true;
thread.Name = "Packet Runner";
thread.Start();
Threads.Add(thread);
}
}
public void Enqueue(uint CircuitCode, Packet packet)
{
//m_log.Debug("[PacketServer]: Enquing " + packet.Type.ToString() + " from " + CircuitCode.ToString());
//lock(PacketQueue)
PacketQueue.Enqueue(new QueuePacket(packet, CircuitCode));
} }
public IScene LocalScene public IScene LocalScene
@ -61,41 +98,57 @@ namespace OpenSim.Region.ClientStack
set { m_scene = value; } set { m_scene = value; }
} }
/// <summary> private void PacketRunner()
///
/// </summary>
/// <param name="circuitCode"></param>
/// <param name="packet"></param>
public virtual void InPacket(uint circuitCode, Packet packet)
{ {
m_scene.ClientManager.InPacket(circuitCode, packet); while (true)
{
QueuePacket p;
// Mantis 641
lock(PacketQueue)
p = PacketQueue.Dequeue();
if (p.Packet != null)
{
m_scene.ClientManager.InPacket(p.CircuitCode, p.Packet);
}
else
{
m_log.Debug("[PacketServer]: Empty packet from queue!");
}
}
} }
protected virtual IClientAPI CreateNewClient(EndPoint remoteEP, UseCircuitCodePacket initialcirpack, public bool ClaimEndPoint(UseCircuitCodePacket usePacket, EndPoint ep)
ClientManager clientManager, IScene scene, AssetCache assetCache,
PacketServer packServer, AgentCircuitManager authenSessions,
LLUUID agentId, LLUUID sessionId, uint circuitCode)
{ {
return IClientAPI client;
new ClientView(remoteEP, scene, assetCache, packServer, authenSessions, agentId, sessionId, circuitCode); if (m_scene.ClientManager.TryGetClient(usePacket.CircuitCode.Code, out client))
{
if (client.Claim(ep, usePacket))
{
m_log.Debug("[PacketServer]: Claimed client.");
return true;
}
}
m_log.Debug("[PacketServer]: Failed to claim client.");
return false;
} }
public virtual bool AddNewClient(EndPoint epSender, UseCircuitCodePacket useCircuit, AssetCache assetCache, public virtual bool AddNewClient(uint circuitCode, AgentCircuitData agentData
AgentCircuitManager authenticateSessionsClass) , AssetCache assetCache, AgentCircuitManager authenticateSessionsClass)
{ {
m_log.Debug("[PacketServer]: Creating new client for " + circuitCode.ToString());
IClientAPI newuser; IClientAPI newuser;
if (m_scene.ClientManager.TryGetClient(useCircuit.CircuitCode.Code, out newuser)) if (m_scene.ClientManager.TryGetClient(circuitCode, out newuser))
{ {
m_log.Debug("[PacketServer]: Already have client for code " + circuitCode.ToString());
return false; return false;
} }
else else
{ {
newuser = CreateNewClient(epSender, useCircuit, m_scene.ClientManager, m_scene, assetCache, this, newuser = new ClientView(m_scene, assetCache, this, authenticateSessionsClass, agentData);
authenticateSessionsClass, useCircuit.CircuitCode.ID,
useCircuit.CircuitCode.SessionID, useCircuit.CircuitCode.Code);
m_scene.ClientManager.Add(useCircuit.CircuitCode.Code, newuser); m_scene.ClientManager.Add(circuitCode, newuser);
newuser.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler; newuser.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler;
newuser.OnLogout += LogoutHandler; newuser.OnLogout += LogoutHandler;

View File

@ -30,6 +30,7 @@ using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Threading;
using libsecondlife.Packets; using libsecondlife.Packets;
using OpenSim.Framework; using OpenSim.Framework;
using OpenSim.Framework.Communications.Cache; using OpenSim.Framework.Communications.Cache;
@ -60,6 +61,9 @@ namespace OpenSim.Region.ClientStack
protected AssetCache m_assetCache; protected AssetCache m_assetCache;
protected AgentCircuitManager m_authenticateSessionsClass; protected AgentCircuitManager m_authenticateSessionsClass;
// temporary queue until I can merge this with userthread rework of packet processing.
protected Queue<Packet> CreateUserPacket = new Queue<Packet>();
public PacketServer PacketServer public PacketServer PacketServer
{ {
get { return m_packetServer; } get { return m_packetServer; }
@ -92,6 +96,7 @@ namespace OpenSim.Region.ClientStack
Allow_Alternate_Port = allow_alternate_port; Allow_Alternate_Port = allow_alternate_port;
m_assetCache = assetCache; m_assetCache = assetCache;
m_authenticateSessionsClass = authenticateClass; m_authenticateSessionsClass = authenticateClass;
m_authenticateSessionsClass.onCircuitAdded += new AgentCircuitManager.CircuitAddedCallback(m_authenticateSessionsClass_onCircuitAdded);
CreatePacketServer(); CreatePacketServer();
// Return new port // Return new port
@ -100,6 +105,16 @@ namespace OpenSim.Region.ClientStack
port = listenPort; port = listenPort;
} }
void m_authenticateSessionsClass_onCircuitAdded(uint circuitCode, AgentCircuitData agentData)
{
m_log.Debug("Got informed of circuit " + circuitCode.ToString() + " for " + agentData.firstname + " " + agentData.lastname
+ " using session ID " + agentData.SessionID.ToString());
// create with afacke endpoint of 0.0.0.0 so that we can tell if it's active and sending packets.
EndPoint ep = new IPEndPoint(IPAddress.Any, 0);
PacketServer.AddNewClient(circuitCode, agentData, m_assetCache, m_authenticateSessionsClass);
}
protected virtual void CreatePacketServer() protected virtual void CreatePacketServer()
{ {
PacketServer packetServer = new PacketServer(this); PacketServer packetServer = new PacketServer(this);
@ -271,38 +286,19 @@ namespace OpenSim.Region.ClientStack
} }
if (ret) if (ret)
{ {
//if so then send packet to the packetserver m_packetServer.Enqueue(circuit, packet);
//m_log.Warn("[UDPSERVER]: ALREADY HAVE Circuit!");
m_packetServer.InPacket(circuit, packet);
} }
else if (packet.Type == PacketType.UseCircuitCode) else if (packet.Type == PacketType.UseCircuitCode)
{ {
// new client // new client
m_log.Debug("[UDPSERVER]: Adding New Client"); m_log.Debug("[UDPSERVER]: Adding New Client");
AddNewClient(packet); AddNewClient(packet, epSender);
}
else
{
// invalid client
//CFK: This message seems to have served its usefullness as of 12-15 so I am commenting it out for now
//m_log.Warn("[UDPSERVER]: Got a packet from an invalid client - " + packet.ToString());
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
m_log.Error("[UDPSERVER]: Exception in processing packet."); m_log.Error("[UDPSERVER]: Exception in processing packet.");
m_log.Debug("[UDPSERVER]: Adding New Client"); m_log.Error("[UDPSERVER]: " + ex.ToString());
try
{
AddNewClient(packet);
}
catch (Exception e3)
{
m_log.Error("[UDPSERVER]: Adding New Client threw exception " + e3.ToString());
Server.BeginReceiveFrom(RecvBuffer, 0, RecvBuffer.Length, SocketFlags.None, ref epSender,
ReceivedData, null);
}
} }
} }
@ -320,7 +316,7 @@ namespace OpenSim.Region.ClientStack
} }
} }
protected virtual void AddNewClient(Packet packet) protected virtual void AddNewClient(Packet packet, EndPoint epSender)
{ {
UseCircuitCodePacket useCircuit = (UseCircuitCodePacket) packet; UseCircuitCodePacket useCircuit = (UseCircuitCodePacket) packet;
lock (clientCircuits) lock (clientCircuits)
@ -337,8 +333,7 @@ namespace OpenSim.Region.ClientStack
else else
m_log.Error("[UDPSERVER]: clientCurcuits_reverse already contains entry for user " + useCircuit.CircuitCode.Code.ToString() + ". NOT adding."); m_log.Error("[UDPSERVER]: clientCurcuits_reverse already contains entry for user " + useCircuit.CircuitCode.Code.ToString() + ". NOT adding.");
} }
PacketServer.ClaimEndPoint(useCircuit, epSender);
PacketServer.AddNewClient(epSender, useCircuit, m_assetCache, m_authenticateSessionsClass);
} }
public void ServerListener() public void ServerListener()
@ -384,17 +379,19 @@ namespace OpenSim.Region.ClientStack
} }
public virtual void SendPacketTo(byte[] buffer, int size, SocketFlags flags, uint circuitcode) public virtual void SendPacketTo(byte[] buffer, int size, SocketFlags flags, uint circuitcode)
//EndPoint packetSender)
{ {
// find the endpoint for this circuit
EndPoint sendto = null; EndPoint sendto = null;
lock (clientCircuits_reverse) lock (clientCircuits_reverse)
{ {
if (clientCircuits_reverse.TryGetValue(circuitcode, out sendto)) if (clientCircuits_reverse.TryGetValue(circuitcode, out sendto))
{ {
//we found the endpoint so send the packet to it
Server.SendTo(buffer, size, flags, sendto); Server.SendTo(buffer, size, flags, sendto);
} }
else
{
m_log.Debug("[UDPServer]: Failed to find person to send packet to!");
}
} }
} }

View File

@ -410,7 +410,7 @@ namespace OpenSim.Region.Environment.Scenes
public ScenePresence CreateAndAddScenePresence(IClientAPI client, bool child, AvatarAppearance appearance) public ScenePresence CreateAndAddScenePresence(IClientAPI client, bool child, AvatarAppearance appearance)
{ {
ScenePresence newAvatar = null; ScenePresence newAvatar = null;
m_log.Debug("[InnerScene]: Creating avatar");
newAvatar = new ScenePresence(client, m_parentScene, m_regInfo, appearance); newAvatar = new ScenePresence(client, m_parentScene, m_regInfo, appearance);
newAvatar.IsChildAgent = child; newAvatar.IsChildAgent = child;

View File

@ -1342,10 +1342,13 @@ namespace OpenSim.Region.Environment.Scenes
m_estateManager.sendRegionHandshake(client); m_estateManager.sendRegionHandshake(client);
m_log.Debug("[SCENE]: Calling CreateScenePresence");
CreateAndAddScenePresence(client, child); CreateAndAddScenePresence(client, child);
m_log.Debug("[SCENE]: Sending parcel overlay");
m_LandManager.sendParcelOverlay(client); m_LandManager.sendParcelOverlay(client);
m_log.Debug("[SCENE]: Adding user to cache.");
CommsManager.UserProfileCacheService.AddNewUser(client.AgentId); CommsManager.UserProfileCacheService.AddNewUser(client.AgentId);
m_log.Debug("[SCENE]: Done with adding new user.");
} }
protected virtual void SubscribeToClientEvents(IClientAPI client) protected virtual void SubscribeToClientEvents(IClientAPI client)
@ -1434,7 +1437,7 @@ namespace OpenSim.Region.Environment.Scenes
AvatarAppearance appearance; AvatarAppearance appearance;
GetAvatarAppearance(client, out appearance); GetAvatarAppearance(client, out appearance);
m_log.Debug("[SCENE]: Calling plugin CreateAndAddScenePresence");
avatar = m_innerScene.CreateAndAddScenePresence(client, child, appearance); avatar = m_innerScene.CreateAndAddScenePresence(client, child, appearance);
if (avatar.IsChildAgent) if (avatar.IsChildAgent)

View File

@ -158,6 +158,11 @@ namespace SimpleApp
private LLUUID myID = LLUUID.Random(); private LLUUID myID = LLUUID.Random();
public bool Claim(EndPoint ep, UseCircuitCodePacket packet)
{
throw new Exception("Unimplemented!");
}
public MyNpcCharacter(EventManager eventManager) public MyNpcCharacter(EventManager eventManager)
{ {
// startPos = new LLVector3(128, (float)(Util.RandomClass.NextDouble()*100), 2); // startPos = new LLVector3(128, (float)(Util.RandomClass.NextDouble()*100), 2);
@ -204,6 +209,11 @@ namespace SimpleApp
get { return FirstName + LastName; } get { return FirstName + LastName; }
} }
public EndPoint EndPoint
{
get { throw new Exception("Unimplemented.");}
set { throw new Exception("Unimplemented.");}
}
public virtual void OutPacket(Packet newPack, ThrottleOutPacketType packType) public virtual void OutPacket(Packet newPack, ThrottleOutPacketType packType)
{ {