From 14a8700be0d8312b5c1d2060da61fb53b77f89b7 Mon Sep 17 00:00:00 2001 From: Teravus Ovares Date: Fri, 22 Feb 2008 05:58:55 +0000 Subject: [PATCH] The ThreadPool ClientView Branch. --- OpenSim/Framework/AgentCircuitManager.cs | 8 +- OpenSim/Framework/BlockingQueue.cs | 14 +- OpenSim/Framework/ClientManager.cs | 7 +- OpenSim/Framework/IClientAPI.cs | 9 +- OpenSim/Region/ClientStack/ClientView.cs | 271 ++++++++++-------- OpenSim/Region/ClientStack/PacketQueue.cs | 179 +++++++----- OpenSim/Region/ClientStack/PacketServer.cs | 97 +++++-- OpenSim/Region/ClientStack/UDPServer.cs | 53 ++-- .../Region/Environment/Scenes/InnerScene.cs | 2 +- OpenSim/Region/Environment/Scenes/Scene.cs | 7 +- .../Examples/SimpleApp/MyNpcCharacter.cs | 10 + 11 files changed, 395 insertions(+), 262 deletions(-) diff --git a/OpenSim/Framework/AgentCircuitManager.cs b/OpenSim/Framework/AgentCircuitManager.cs index 938dce8db9..7a3ee48af3 100644 --- a/OpenSim/Framework/AgentCircuitManager.cs +++ b/OpenSim/Framework/AgentCircuitManager.cs @@ -34,6 +34,10 @@ namespace OpenSim.Framework { public Dictionary AgentCircuits = new Dictionary(); + // here be dragons + public delegate void CircuitAddedCallback(uint circuitCode, AgentCircuitData agentData); + public event CircuitAddedCallback onCircuitAdded; + public AgentCircuitManager() { } @@ -83,6 +87,8 @@ namespace OpenSim.Framework else { AgentCircuits.Add(circuitCode, agentData); + if (null != onCircuitAdded) + onCircuitAdded(circuitCode, agentData); } } @@ -124,4 +130,4 @@ namespace OpenSim.Framework return false; } } -} \ No newline at end of file +} diff --git a/OpenSim/Framework/BlockingQueue.cs b/OpenSim/Framework/BlockingQueue.cs index e72884c013..1ea4d38c66 100644 --- a/OpenSim/Framework/BlockingQueue.cs +++ b/OpenSim/Framework/BlockingQueue.cs @@ -34,6 +34,7 @@ namespace OpenSim.Framework { private readonly Queue m_queue = new Queue(); private readonly object m_queueSync = new object(); + private readonly object m_dequeueSync = new object(); public void Enqueue(T value) { @@ -50,9 +51,11 @@ namespace OpenSim.Framework { if (m_queue.Count < 1) { - Monitor.Wait(m_queueSync); + lock (m_dequeueSync) + { + Monitor.Wait(m_queueSync); + } } - return m_queue.Dequeue(); } } @@ -64,10 +67,13 @@ namespace OpenSim.Framework return m_queue.Contains(item); } } - + public int Count() { - return m_queue.Count; + lock (m_queueSync) + { + return m_queue.Count; + } } } } diff --git a/OpenSim/Framework/ClientManager.cs b/OpenSim/Framework/ClientManager.cs index cfdcbf05a6..0da7743274 100644 --- a/OpenSim/Framework/ClientManager.cs +++ b/OpenSim/Framework/ClientManager.cs @@ -93,10 +93,15 @@ namespace OpenSim.Framework bool tryGetRet = false; lock (m_clients) tryGetRet = m_clients.TryGetValue(circuitCode, out client); - if(tryGetRet) + if (tryGetRet) { + //m_log.Debug("[ClientManager]: Processing IN packet " + packet.Type.ToString()); client.InPacket(packet); } + else + { + m_log.Debug("[ClientManager]: Failed to find client for " + circuitCode.ToString()); + } } public void CloseAllAgents(uint circuitCode) diff --git a/OpenSim/Framework/IClientAPI.cs b/OpenSim/Framework/IClientAPI.cs index 2635a239e9..e25a8e6f38 100644 --- a/OpenSim/Framework/IClientAPI.cs +++ b/OpenSim/Framework/IClientAPI.cs @@ -528,7 +528,12 @@ namespace OpenSim.Framework event MoneyBalanceRequest OnMoneyBalanceRequest; - + EndPoint EndPoint + { + set; + get; + } + LLVector3 StartPos { get; set; } LLUUID AgentId { get; } @@ -645,8 +650,8 @@ namespace OpenSim.Framework uint flags, LLUUID flImageID, LLUUID imageID, string profileURL, LLUUID partnerID); byte[] GetThrottlesPacked(float multiplier); - + bool Claim(EndPoint ep, UseCircuitCodePacket usePacket); void SetDebug(int newDebug); void InPacket(Packet NewPack); void Close(bool ShutdownCircuit); diff --git a/OpenSim/Region/ClientStack/ClientView.cs b/OpenSim/Region/ClientStack/ClientView.cs index 1440d6c2aa..a7be56bd1b 100644 --- a/OpenSim/Region/ClientStack/ClientView.cs +++ b/OpenSim/Region/ClientStack/ClientView.cs @@ -115,10 +115,20 @@ namespace OpenSim.Region.ClientStack /* public variables */ protected string m_firstName; protected string m_lastName; - protected Thread m_clientThread; + // protected Thread m_clientThread; protected LLVector3 m_startpos; protected EndPoint m_userEndPoint; + /// + /// do we process or wait? + /// + protected bool ReadyForPackets = false; + + /// + /// holds packets that were received while paused. + /// + protected Queue PausedPackets = new Queue(); + /* Properties */ public LLUUID SecureSessionId @@ -189,46 +199,62 @@ namespace OpenSim.Region.ClientStack get { return m_moneyBalance; } } + + public EndPoint EndPoint + { + set { m_userEndPoint = value; } + get { return m_userEndPoint; } + } + /* METHODS */ - public ClientView(EndPoint remoteEP, IScene scene, AssetCache assetCache, PacketServer packServer, - AgentCircuitManager authenSessions, LLUUID agentId, LLUUID sessionId, uint circuitCode) + public ClientView(IScene scene, AssetCache assetCache, PacketServer packServer, + AgentCircuitManager authenSessions, AgentCircuitData agentData) { + m_log.Info("[CLIENT]: Creating new client for SessionID " + agentData.SessionID.ToString()); m_moneyBalance = 1000; - m_channelVersion = Helpers.StringToField(scene.GetSimulatorVersion()); - m_scene = scene; m_assetCache = assetCache; - m_networkServer = packServer; - // m_inventoryCache = inventoryCache; m_authenticateSessionsHandler = authenSessions; - m_log.Info("[CLIENT]: Started up new client thread to handle incoming request"); + m_agentId = agentData.AgentID; + m_sessionId = agentData.SessionID; + m_circuitCode = agentData.circuitcode; - m_agentId = agentId; - m_sessionId = sessionId; - m_circuitCode = circuitCode; - - m_userEndPoint = remoteEP; - - m_startpos = m_authenticateSessionsHandler.GetPosition(circuitCode); - - // While working on this, the BlockingQueue had me fooled for a bit. - // The Blocking queue causes the thread to stop until there's something - // in it to process. It's an on-purpose threadlock though because - // without it, the clientloop will suck up all sim resources. - - m_packetQueue = new PacketQueue(); + m_startpos = m_authenticateSessionsHandler.GetPosition(m_circuitCode); + m_packetQueue = new PacketQueue(this); RegisterLocalPacketHandlers(); + } - m_clientThread = new Thread(new ThreadStart(AuthUser)); - m_clientThread.Name = "ClientThread"; - m_clientThread.IsBackground = true; - m_clientThread.Start(); - OpenSim.Framework.ThreadTracker.Add(m_clientThread); + public bool Claim(EndPoint ep, UseCircuitCodePacket usePacket) + { + if (usePacket.CircuitCode.Code != m_circuitCode) + { + m_log.Debug("[ClientView]: Circuit code " + usePacket.CircuitCode.Code.ToString() + + " does NOT match." + m_circuitCode.ToString()); + return false; + } + if (0 != usePacket.CircuitCode.SessionID.CompareTo(m_sessionId)) + { + m_log.Debug("[ClientView]: SessionID " + usePacket.CircuitCode.SessionID.ToString() + + " does not match " + m_sessionId.ToString()); + return false; + } + // this isnt' really the right name anymore. + EndPoint = ep; + AuthUser(); + ack_pack(usePacket); + m_log.Debug("[ClientView]: Dealing with " + PausedPackets.Count.ToString() + " packets backlogged."); + ReadyForPackets = true; + // stuff the paused packets BACK into the thread queue. :) + while (PausedPackets.Count > 0) + { + m_networkServer.Enqueue(m_circuitCode, PausedPackets.Dequeue()); + } + return true; } public void SetDebug(int newDebug) @@ -254,29 +280,10 @@ namespace OpenSim.Region.ClientStack m_packetQueue.Close(); m_packetQueue.Flush(); - Thread.Sleep(2000); - - - // Shut down timers m_ackTimer.Stop(); m_clientPingTimer.Stop(); - // This is just to give the client a reasonable chance of - // flushing out all it's packets. There should probably - // be a better mechanism here - - // We can't reach into other scenes and close the connection - // We need to do this over grid communications - //m_scene.CloseAllAgents(CircuitCode); - - // If we're not shutting down the circuit, then this is the last time we'll go here. - // If we are shutting down the circuit, the UDP Server will come back here with - // ShutDownCircuit = false - if (!(ShutdownCircult)) - { - GC.Collect(); - m_clientThread.Abort(); - } + GC.Collect(); } /// @@ -287,13 +294,14 @@ namespace OpenSim.Region.ClientStack public void Close(bool ShutdownCircult) { // Pull Client out of Region - m_log.Info("[CLIENT]: Close has been called"); + m_log.Info("[CLIENT]: Close has been called with " + ShutdownCircult.ToString()); //raiseevent on the packet server to Shutdown the circuit if (ShutdownCircult) - OnConnectionClosed(this); - - CloseCleanup(ShutdownCircult); + m_networkServer.CloseClient(this); + else + CloseCleanup(ShutdownCircult); + //OnConnectionClosed(this); } public void Kick(string message) @@ -402,29 +410,6 @@ namespace OpenSim.Region.ClientStack } } - protected virtual void ClientLoop() - { - m_log.Info("[CLIENT]: Entered loop"); - while (true) - { - QueItem nextPacket = m_packetQueue.Dequeue(); - if (nextPacket.Incoming) - { - if (nextPacket.Packet.Type != PacketType.AgentUpdate) - { - m_packetsReceived++; - } - DebugPacket("IN", nextPacket.Packet); - ProcessInPacket(nextPacket.Packet); - } - else - { - DebugPacket("OUT", nextPacket.Packet); - ProcessOutPacket(nextPacket.Packet); - } - } - } - # endregion protected void CheckClientConnectivity(object sender, ElapsedEventArgs e) @@ -460,6 +445,7 @@ namespace OpenSim.Region.ClientStack protected virtual void InitNewClient() { + m_log.Debug("[ClientView]: Setting up timers"); //this.UploadAssets = new AgentAssetUpload(this, m_assetCache, m_inventoryCache); // Establish our two timers. We could probably get this down to one @@ -471,7 +457,7 @@ namespace OpenSim.Region.ClientStack m_clientPingTimer.Elapsed += new ElapsedEventHandler(CheckClientConnectivity); m_clientPingTimer.Enabled = true; - m_log.Info("[CLIENT]: Adding viewer agent to scene"); + m_log.Info("[ClientView]: Adding viewer agent to scene"); m_scene.AddNewClient(this, true); } @@ -487,11 +473,11 @@ namespace OpenSim.Region.ClientStack m_log.Info("[CLIENT]: New user request denied to " + m_userEndPoint.ToString()); m_packetQueue.Flush(); m_packetQueue.Close(); - m_clientThread.Abort(); + // m_clientThread.Abort(); } else { - m_log.Info("[CLIENT]: Got authenticated connection from " + m_userEndPoint.ToString()); + m_log.Info("[CLIENT]: Received authenticated connection from " + m_userEndPoint.ToString()); //session is authorised m_firstName = sessionInfo.LoginInfo.First; m_lastName = sessionInfo.LoginInfo.Last; @@ -503,7 +489,7 @@ namespace OpenSim.Region.ClientStack // This sets up all the timers InitNewClient(); - ClientLoop(); + //ClientLoop(); } } @@ -2458,11 +2444,15 @@ namespace OpenSim.Region.ClientStack } } - protected virtual void ProcessOutPacket(Packet Pack) + /// + /// processes an outbound packet and sends the data. + /// Needs to be public so that throttle can use it. + /// + /// + public virtual void ProcessOutPacket(Packet Pack) { // Keep track of when this packet was sent out Pack.TickCount = System.Environment.TickCount; - if (!Pack.Header.Resent) { Pack.Header.Sequence = NextSeqNum(); @@ -2482,6 +2472,8 @@ namespace OpenSim.Region.ClientStack // Actually make the byte array and send it try { + //m_log.Debug("[ClientView]: Sending packet " + Pack.Type.ToString() + " to " + // + m_circuitCode.ToString()); byte[] sendbuffer = Pack.ToBytes(); PacketPool.Instance.ReturnPacket(Pack); @@ -2489,10 +2481,12 @@ namespace OpenSim.Region.ClientStack { int packetsize = Helpers.ZeroEncode(sendbuffer, sendbuffer.Length, ZeroOutBuffer); m_networkServer.SendPacketTo(ZeroOutBuffer, packetsize, SocketFlags.None, m_circuitCode); + // m_log.Debug("[ClientView]: Sent to " + m_circuitCode.ToString() + " " + Pack.Type.ToString()); } else { m_networkServer.SendPacketTo(sendbuffer, sendbuffer.Length, SocketFlags.None, m_circuitCode); + // m_log.Debug("[ClientView]: sent to " + m_circuitCode.ToString() + " " + Pack.Type.ToString()); } } catch (Exception e) @@ -2507,72 +2501,96 @@ namespace OpenSim.Region.ClientStack public virtual void InPacket(Packet NewPack) { - // Handle appended ACKs - if (NewPack != null) + // deals with the lags involved on gettign the client setup. + // we can't accept packets for the user until after Auth() + if (!ReadyForPackets) { - if (NewPack.Header.AppendedAcks) - { - lock (m_needAck) - { - foreach (uint ackedPacketId in NewPack.Header.AckList) - { - Packet ackedPacket; + PausedPackets.Enqueue(NewPack); + return; + } - if (m_needAck.TryGetValue(ackedPacketId, out ackedPacket)) + // lock (m_packetQueue) + { + // Handle appended ACKs + if (NewPack != null) + { + if (NewPack.Header.AppendedAcks) + { + lock (m_needAck) + { + foreach (uint ackedPacketId in NewPack.Header.AckList) { - m_unAckedBytes -= ackedPacket.ToBytes().Length; - m_needAck.Remove(ackedPacketId); + Packet ackedPacket; + + if (m_needAck.TryGetValue(ackedPacketId, out ackedPacket)) + { + m_unAckedBytes -= ackedPacket.ToBytes().Length; + m_needAck.Remove(ackedPacketId); + } } } } - } - // Handle PacketAck packets - if (NewPack.Type == PacketType.PacketAck) - { - PacketAckPacket ackPacket = (PacketAckPacket)NewPack; - - lock (m_needAck) + // Handle PacketAck packets + if (NewPack.Type == PacketType.PacketAck) { - foreach (PacketAckPacket.PacketsBlock block in ackPacket.Packets) + PacketAckPacket ackPacket = (PacketAckPacket)NewPack; + + lock (m_needAck) { - uint ackedPackId = block.ID; - Packet ackedPacket; - if (m_needAck.TryGetValue(ackedPackId, out ackedPacket)) + foreach (PacketAckPacket.PacketsBlock block in ackPacket.Packets) { - m_unAckedBytes -= ackedPacket.ToBytes().Length; - m_needAck.Remove(ackedPackId); + uint ackedPackId = block.ID; + Packet ackedPacket; + if (m_needAck.TryGetValue(ackedPackId, out ackedPacket)) + { + m_unAckedBytes -= ackedPacket.ToBytes().Length; + m_needAck.Remove(ackedPackId); + } } } } - } - else if ((NewPack.Type == PacketType.StartPingCheck)) - { - //reply to pingcheck - StartPingCheckPacket startPing = (StartPingCheckPacket)NewPack; - CompletePingCheckPacket endPing = (CompletePingCheckPacket)PacketPool.Instance.GetPacket(PacketType.CompletePingCheck); - endPing.PingID.PingID = startPing.PingID.PingID; - OutPacket(endPing, ThrottleOutPacketType.Task); - } - else - { - QueItem item = new QueItem(); - item.Packet = NewPack; - item.Incoming = true; - m_packetQueue.Enqueue(item); + else if ((NewPack.Type == PacketType.StartPingCheck)) + { + //reply to pingcheck + StartPingCheckPacket startPing = (StartPingCheckPacket)NewPack; + CompletePingCheckPacket endPing = (CompletePingCheckPacket)PacketPool.Instance.GetPacket(PacketType.CompletePingCheck); + endPing.PingID.PingID = startPing.PingID.PingID; + OutPacket(endPing, ThrottleOutPacketType.Task); + } + else + { + QueItem item = new QueItem(); + item.Packet = NewPack; + item.Incoming = true; + //m_packetQueue.Enqueue(item); + ProcessInPacket(NewPack); + } } } } public virtual void OutPacket(Packet NewPack, ThrottleOutPacketType throttlePacketType) { - QueItem item = new QueItem(); - item.Packet = NewPack; - item.Incoming = false; - item.throttleType = throttlePacketType; // Packet throttle type - m_packetQueue.Enqueue(item); - m_packetsSent++; + //lock (m_packetQueue) + { + QueItem item = new QueItem(); + item.Packet = NewPack; + item.Incoming = false; + item.throttleType = throttlePacketType; // Packet throttle type + + // if it's unknown, just punt it out. probably an ack. + if (throttlePacketType == ThrottleOutPacketType.Unknown) + { + ProcessOutPacket(NewPack); + } + else + { + m_packetQueue.Enqueue(item); + } + m_packetsSent++; + } } # region Low Level Packet Methods @@ -2581,6 +2599,7 @@ namespace OpenSim.Region.ClientStack { if (Pack.Header.Reliable) { + //m_log.Debug("[ClientView]: Acking " + Pack.Type.ToString()); PacketAckPacket ack_it = (PacketAckPacket)PacketPool.Instance.GetPacket(PacketType.PacketAck); // TODO: don't create new blocks if recycling an old packet ack_it.Packets = new PacketAckPacket.PacketsBlock[1]; diff --git a/OpenSim/Region/ClientStack/PacketQueue.cs b/OpenSim/Region/ClientStack/PacketQueue.cs index 06ed32e7d9..2ee259bfc0 100644 --- a/OpenSim/Region/ClientStack/PacketQueue.cs +++ b/OpenSim/Region/ClientStack/PacketQueue.cs @@ -37,9 +37,15 @@ namespace OpenSim.Region.ClientStack { public class PacketQueue { - //private static readonly log4net.ILog m_log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + private static readonly log4net.ILog m_log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); - private bool m_enabled = true; + private bool m_enabled = false; + + public bool Enable + { + get { return m_enabled; } + set { m_enabled = value; m_log.Debug("[PacketQueue]: Enabled == " + value.ToString()); } + } private BlockingQueue SendQueue; @@ -77,7 +83,12 @@ namespace OpenSim.Region.ClientStack // private long ThrottleInterval; private Timer throttleTimer; - public PacketQueue() + /// + /// backreference so we can push packets out the client. May be temporary. + /// + private ClientView Client; + + public PacketQueue(ClientView client) { // While working on this, the BlockingQueue had me fooled for a bit. // The Blocking queue causes the thread to stop until there's something @@ -86,6 +97,8 @@ namespace OpenSim.Region.ClientStack SendQueue = new BlockingQueue(); + Client = client; + IncomingPacketQueue = new Queue(); OutgoingPacketQueue = new Queue(); ResendOutgoingPacketQueue = new Queue(); @@ -123,44 +136,49 @@ namespace OpenSim.Region.ClientStack public void Enqueue(QueItem item) { - if (!m_enabled) - { - return; - } + // We could micro lock, but that will tend to actually // probably be worse than just synchronizing on SendQueue - lock (this) + // enqueue inbound right away. forget throttle checking inbound! + if (item.Incoming) { - switch (item.throttleType) + SendQueue.Enqueue(item); + } + else + { + lock (this) { - case ThrottleOutPacketType.Resend: - ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Texture: - ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Task: - ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Land: - ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Asset: - ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Cloud: - ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item); - break; - case ThrottleOutPacketType.Wind: - ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item); - break; + switch (item.throttleType) + { + case ThrottleOutPacketType.Resend: + ThrottleCheck(ref ResendThrottle, ref ResendOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Texture: + ThrottleCheck(ref TextureThrottle, ref TextureOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Task: + ThrottleCheck(ref TaskThrottle, ref TaskOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Land: + ThrottleCheck(ref LandThrottle, ref LandOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Asset: + ThrottleCheck(ref AssetThrottle, ref AssetOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Cloud: + ThrottleCheck(ref CloudThrottle, ref CloudOutgoingPacketQueue, item); + break; + case ThrottleOutPacketType.Wind: + ThrottleCheck(ref WindThrottle, ref WindOutgoingPacketQueue, item); + break; - default: - // Acknowledgements and other such stuff should go directly to the blocking Queue - // Throttling them may and likely 'will' be problematic - SendQueue.Enqueue(item); - break; + default: + // Acknowledgements and other such stuff should go directly to the blocking Queue + // Throttling them may and likely 'will' be problematic + Client.ProcessOutPacket(item.Packet); + break; + } } } } @@ -174,39 +192,34 @@ namespace OpenSim.Region.ClientStack { lock (this) { - while (PacketsWaiting()) + while (ResendOutgoingPacketQueue.Count > 0) { - //Now comes the fun part.. we dump all our elements into m_packetQueue that we've saved up. - if (ResendOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(ResendOutgoingPacketQueue.Dequeue()); - } - if (LandOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(LandOutgoingPacketQueue.Dequeue()); - } - if (WindOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(WindOutgoingPacketQueue.Dequeue()); - } - if (CloudOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(CloudOutgoingPacketQueue.Dequeue()); - } - if (TaskOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(TaskOutgoingPacketQueue.Dequeue()); - } - if (TextureOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(TextureOutgoingPacketQueue.Dequeue()); - } - if (AssetOutgoingPacketQueue.Count > 0) - { - SendQueue.Enqueue(AssetOutgoingPacketQueue.Dequeue()); - } + Client.ProcessOutPacket(ResendOutgoingPacketQueue.Dequeue().Packet); + } + while (LandOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(LandOutgoingPacketQueue.Dequeue().Packet); + } + while (WindOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(WindOutgoingPacketQueue.Dequeue().Packet); + } + while (CloudOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(CloudOutgoingPacketQueue.Dequeue().Packet); + } + while (TaskOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(TaskOutgoingPacketQueue.Dequeue().Packet); + } + while (TextureOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(TextureOutgoingPacketQueue.Dequeue().Packet); + } + while (AssetOutgoingPacketQueue.Count > 0) + { + Client.ProcessOutPacket(AssetOutgoingPacketQueue.Dequeue().Packet); } - // m_log.Info("[THROTTLE]: Processed " + throttleLoops + " packets"); } } @@ -265,7 +278,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = ResendOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); ResendThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -273,7 +288,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = LandOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); LandThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -281,7 +298,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = WindOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + // SendQueue.Enqueue(qpack); + //m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); WindThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -289,7 +308,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = CloudOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); CloudThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -297,7 +318,10 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = TaskOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); + TotalThrottle.Add(qpack.Packet.ToBytes().Length); TaskThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -305,7 +329,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = TextureOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); TextureThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -313,7 +339,9 @@ namespace OpenSim.Region.ClientStack { QueItem qpack = AssetOutgoingPacketQueue.Dequeue(); - SendQueue.Enqueue(qpack); + //SendQueue.Enqueue(qpack); + // m_log.Debug("[PacketQueue]: ThrottleCheck Sending " + qpack.Incoming.ToString() + " packet " + qpack.Packet.Type.ToString()); + Client.ProcessOutPacket(qpack.Packet); TotalThrottle.Add(qpack.Packet.ToBytes().Length); AssetThrottle.Add(qpack.Packet.ToBytes().Length); } @@ -337,17 +365,18 @@ namespace OpenSim.Region.ClientStack // wait for the timer to fire to put things into the // output queue - if ((q.Count == 0) && (throttle.UnderLimit())) + if (m_enabled && (q.Count == 0) && (throttle.UnderLimit())) { Monitor.Enter(this); throttle.Add(item.Packet.ToBytes().Length); TotalThrottle.Add(item.Packet.ToBytes().Length); - SendQueue.Enqueue(item); + Client.ProcessOutPacket(item.Packet); Monitor.Pulse(this); Monitor.Exit(this); } else { + // m_log.Debug("[PacketQueue]: ThrottleCheck Queueing " + item.Incoming.ToString() + " packet " + item.Packet.Type.ToString()); q.Enqueue(item); } } diff --git a/OpenSim/Region/ClientStack/PacketServer.cs b/OpenSim/Region/ClientStack/PacketServer.cs index 250b90afcb..1d1f4a747e 100644 --- a/OpenSim/Region/ClientStack/PacketServer.cs +++ b/OpenSim/Region/ClientStack/PacketServer.cs @@ -29,6 +29,9 @@ using System; using System.Net; using System.Net.Sockets; +using System.Threading; +using System.Collections; +using System.Collections.Generic; using libsecondlife; using libsecondlife.Packets; using OpenSim.Framework; @@ -38,12 +41,27 @@ namespace OpenSim.Region.ClientStack { public class PacketServer { - //private static readonly log4net.ILog m_log - // = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); + private static readonly log4net.ILog m_log + = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType); private ClientStackNetworkHandler m_networkHandler; private IScene m_scene; + public struct QueuePacket + { + public Packet Packet; + public uint CircuitCode; + + public QueuePacket(Packet p, uint c) + { + Packet = p; + CircuitCode = c; + } + } + + private List Threads = new List(); + private BlockingQueue PacketQueue; + //private readonly ClientManager m_clientManager = new ClientManager(); //public ClientManager ClientManager //{ @@ -54,6 +72,25 @@ namespace OpenSim.Region.ClientStack { m_networkHandler = networkHandler; m_networkHandler.RegisterPacketServer(this); + PacketQueue = new BlockingQueue(); + + int ThreadCount = 4; + m_log.Debug("[PacketServer]: launching " + ThreadCount.ToString() + " threads."); + for (int x = 0; x < ThreadCount; x++) + { + Thread thread = new Thread(PacketRunner); + thread.IsBackground = true; + thread.Name = "Packet Runner"; + thread.Start(); + Threads.Add(thread); + } + } + + public void Enqueue(uint CircuitCode, Packet packet) + { + //m_log.Debug("[PacketServer]: Enquing " + packet.Type.ToString() + " from " + CircuitCode.ToString()); + //lock(PacketQueue) + PacketQueue.Enqueue(new QueuePacket(packet, CircuitCode)); } public IScene LocalScene @@ -61,41 +98,57 @@ namespace OpenSim.Region.ClientStack set { m_scene = value; } } - /// - /// - /// - /// - /// - public virtual void InPacket(uint circuitCode, Packet packet) + private void PacketRunner() { - m_scene.ClientManager.InPacket(circuitCode, packet); + while (true) + { + QueuePacket p; + // Mantis 641 + lock(PacketQueue) + p = PacketQueue.Dequeue(); + + if (p.Packet != null) + { + m_scene.ClientManager.InPacket(p.CircuitCode, p.Packet); + } + else + { + m_log.Debug("[PacketServer]: Empty packet from queue!"); + } + } } - protected virtual IClientAPI CreateNewClient(EndPoint remoteEP, UseCircuitCodePacket initialcirpack, - ClientManager clientManager, IScene scene, AssetCache assetCache, - PacketServer packServer, AgentCircuitManager authenSessions, - LLUUID agentId, LLUUID sessionId, uint circuitCode) + public bool ClaimEndPoint(UseCircuitCodePacket usePacket, EndPoint ep) { - return - new ClientView(remoteEP, scene, assetCache, packServer, authenSessions, agentId, sessionId, circuitCode); + IClientAPI client; + if (m_scene.ClientManager.TryGetClient(usePacket.CircuitCode.Code, out client)) + { + if (client.Claim(ep, usePacket)) + { + m_log.Debug("[PacketServer]: Claimed client."); + return true; + } + } + m_log.Debug("[PacketServer]: Failed to claim client."); + return false; } - public virtual bool AddNewClient(EndPoint epSender, UseCircuitCodePacket useCircuit, AssetCache assetCache, - AgentCircuitManager authenticateSessionsClass) + public virtual bool AddNewClient(uint circuitCode, AgentCircuitData agentData + , AssetCache assetCache, AgentCircuitManager authenticateSessionsClass) { + m_log.Debug("[PacketServer]: Creating new client for " + circuitCode.ToString()); IClientAPI newuser; - if (m_scene.ClientManager.TryGetClient(useCircuit.CircuitCode.Code, out newuser)) + if (m_scene.ClientManager.TryGetClient(circuitCode, out newuser)) { + m_log.Debug("[PacketServer]: Already have client for code " + circuitCode.ToString()); return false; } else { - newuser = CreateNewClient(epSender, useCircuit, m_scene.ClientManager, m_scene, assetCache, this, - authenticateSessionsClass, useCircuit.CircuitCode.ID, - useCircuit.CircuitCode.SessionID, useCircuit.CircuitCode.Code); + newuser = new ClientView(m_scene, assetCache, this, authenticateSessionsClass, agentData); - m_scene.ClientManager.Add(useCircuit.CircuitCode.Code, newuser); + m_scene.ClientManager.Add(circuitCode, newuser); newuser.OnViewerEffect += m_scene.ClientManager.ViewerEffectHandler; newuser.OnLogout += LogoutHandler; diff --git a/OpenSim/Region/ClientStack/UDPServer.cs b/OpenSim/Region/ClientStack/UDPServer.cs index 19974531b1..0a0cfd4b99 100644 --- a/OpenSim/Region/ClientStack/UDPServer.cs +++ b/OpenSim/Region/ClientStack/UDPServer.cs @@ -30,6 +30,7 @@ using System.Collections; using System.Collections.Generic; using System.Net; using System.Net.Sockets; +using System.Threading; using libsecondlife.Packets; using OpenSim.Framework; using OpenSim.Framework.Communications.Cache; @@ -60,6 +61,9 @@ namespace OpenSim.Region.ClientStack protected AssetCache m_assetCache; protected AgentCircuitManager m_authenticateSessionsClass; + // temporary queue until I can merge this with userthread rework of packet processing. + protected Queue CreateUserPacket = new Queue(); + public PacketServer PacketServer { get { return m_packetServer; } @@ -92,6 +96,7 @@ namespace OpenSim.Region.ClientStack Allow_Alternate_Port = allow_alternate_port; m_assetCache = assetCache; m_authenticateSessionsClass = authenticateClass; + m_authenticateSessionsClass.onCircuitAdded += new AgentCircuitManager.CircuitAddedCallback(m_authenticateSessionsClass_onCircuitAdded); CreatePacketServer(); // Return new port @@ -100,6 +105,16 @@ namespace OpenSim.Region.ClientStack port = listenPort; } + void m_authenticateSessionsClass_onCircuitAdded(uint circuitCode, AgentCircuitData agentData) + { + m_log.Debug("Got informed of circuit " + circuitCode.ToString() + " for " + agentData.firstname + " " + agentData.lastname + + " using session ID " + agentData.SessionID.ToString()); + + // create with afacke endpoint of 0.0.0.0 so that we can tell if it's active and sending packets. + EndPoint ep = new IPEndPoint(IPAddress.Any, 0); + PacketServer.AddNewClient(circuitCode, agentData, m_assetCache, m_authenticateSessionsClass); + } + protected virtual void CreatePacketServer() { PacketServer packetServer = new PacketServer(this); @@ -271,38 +286,19 @@ namespace OpenSim.Region.ClientStack } if (ret) { - //if so then send packet to the packetserver - //m_log.Warn("[UDPSERVER]: ALREADY HAVE Circuit!"); - m_packetServer.InPacket(circuit, packet); + m_packetServer.Enqueue(circuit, packet); } else if (packet.Type == PacketType.UseCircuitCode) { // new client m_log.Debug("[UDPSERVER]: Adding New Client"); - AddNewClient(packet); - } - else - { - // invalid client - //CFK: This message seems to have served its usefullness as of 12-15 so I am commenting it out for now - //m_log.Warn("[UDPSERVER]: Got a packet from an invalid client - " + packet.ToString()); - + AddNewClient(packet, epSender); } } catch (Exception ex) { m_log.Error("[UDPSERVER]: Exception in processing packet."); - m_log.Debug("[UDPSERVER]: Adding New Client"); - try - { - AddNewClient(packet); - } - catch (Exception e3) - { - m_log.Error("[UDPSERVER]: Adding New Client threw exception " + e3.ToString()); - Server.BeginReceiveFrom(RecvBuffer, 0, RecvBuffer.Length, SocketFlags.None, ref epSender, - ReceivedData, null); - } + m_log.Error("[UDPSERVER]: " + ex.ToString()); } } @@ -320,7 +316,7 @@ namespace OpenSim.Region.ClientStack } } - protected virtual void AddNewClient(Packet packet) + protected virtual void AddNewClient(Packet packet, EndPoint epSender) { UseCircuitCodePacket useCircuit = (UseCircuitCodePacket) packet; lock (clientCircuits) @@ -337,8 +333,7 @@ namespace OpenSim.Region.ClientStack else m_log.Error("[UDPSERVER]: clientCurcuits_reverse already contains entry for user " + useCircuit.CircuitCode.Code.ToString() + ". NOT adding."); } - - PacketServer.AddNewClient(epSender, useCircuit, m_assetCache, m_authenticateSessionsClass); + PacketServer.ClaimEndPoint(useCircuit, epSender); } public void ServerListener() @@ -384,17 +379,19 @@ namespace OpenSim.Region.ClientStack } public virtual void SendPacketTo(byte[] buffer, int size, SocketFlags flags, uint circuitcode) - //EndPoint packetSender) { - // find the endpoint for this circuit EndPoint sendto = null; lock (clientCircuits_reverse) { if (clientCircuits_reverse.TryGetValue(circuitcode, out sendto)) { - //we found the endpoint so send the packet to it + Server.SendTo(buffer, size, flags, sendto); } + else + { + m_log.Debug("[UDPServer]: Failed to find person to send packet to!"); + } } } diff --git a/OpenSim/Region/Environment/Scenes/InnerScene.cs b/OpenSim/Region/Environment/Scenes/InnerScene.cs index 38bcb0388f..50db9ca1b8 100644 --- a/OpenSim/Region/Environment/Scenes/InnerScene.cs +++ b/OpenSim/Region/Environment/Scenes/InnerScene.cs @@ -410,7 +410,7 @@ namespace OpenSim.Region.Environment.Scenes public ScenePresence CreateAndAddScenePresence(IClientAPI client, bool child, AvatarAppearance appearance) { ScenePresence newAvatar = null; - + m_log.Debug("[InnerScene]: Creating avatar"); newAvatar = new ScenePresence(client, m_parentScene, m_regInfo, appearance); newAvatar.IsChildAgent = child; diff --git a/OpenSim/Region/Environment/Scenes/Scene.cs b/OpenSim/Region/Environment/Scenes/Scene.cs index 4f44d3607b..42fb411455 100644 --- a/OpenSim/Region/Environment/Scenes/Scene.cs +++ b/OpenSim/Region/Environment/Scenes/Scene.cs @@ -1342,10 +1342,13 @@ namespace OpenSim.Region.Environment.Scenes m_estateManager.sendRegionHandshake(client); + m_log.Debug("[SCENE]: Calling CreateScenePresence"); CreateAndAddScenePresence(client, child); - + m_log.Debug("[SCENE]: Sending parcel overlay"); m_LandManager.sendParcelOverlay(client); + m_log.Debug("[SCENE]: Adding user to cache."); CommsManager.UserProfileCacheService.AddNewUser(client.AgentId); + m_log.Debug("[SCENE]: Done with adding new user."); } protected virtual void SubscribeToClientEvents(IClientAPI client) @@ -1434,7 +1437,7 @@ namespace OpenSim.Region.Environment.Scenes AvatarAppearance appearance; GetAvatarAppearance(client, out appearance); - + m_log.Debug("[SCENE]: Calling plugin CreateAndAddScenePresence"); avatar = m_innerScene.CreateAndAddScenePresence(client, child, appearance); if (avatar.IsChildAgent) diff --git a/OpenSim/Region/Examples/SimpleApp/MyNpcCharacter.cs b/OpenSim/Region/Examples/SimpleApp/MyNpcCharacter.cs index e8218001b3..2982ac9c63 100644 --- a/OpenSim/Region/Examples/SimpleApp/MyNpcCharacter.cs +++ b/OpenSim/Region/Examples/SimpleApp/MyNpcCharacter.cs @@ -158,6 +158,11 @@ namespace SimpleApp private LLUUID myID = LLUUID.Random(); + public bool Claim(EndPoint ep, UseCircuitCodePacket packet) + { + throw new Exception("Unimplemented!"); + } + public MyNpcCharacter(EventManager eventManager) { // startPos = new LLVector3(128, (float)(Util.RandomClass.NextDouble()*100), 2); @@ -204,6 +209,11 @@ namespace SimpleApp get { return FirstName + LastName; } } + public EndPoint EndPoint + { + get { throw new Exception("Unimplemented.");} + set { throw new Exception("Unimplemented.");} + } public virtual void OutPacket(Packet newPack, ThrottleOutPacketType packType) {