diff --git a/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs new file mode 100644 index 0000000000..874ddaec75 --- /dev/null +++ b/OpenSim/Region/ClientStack/Linden/UDP/IncomingPacketAsyncHandlingEngine.cs @@ -0,0 +1,328 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSimulator Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Threading; +using log4net; +using OpenSim.Framework; +using OpenSim.Framework.Monitoring; +using OpenSim.Region.Framework.Scenes; + +namespace OpenSim.Region.ClientStack.LindenUDP +{ + public class Job + { + public string Name; + public WaitCallback Callback; + public object O; + + public Job(string name, WaitCallback callback, object o) + { + Name = name; + Callback = callback; + O = o; + } + } + + // TODO: These kinds of classes MUST be generalized with JobEngine, etc. + public class IncomingPacketAsyncHandlingEngine + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + public int LogLevel { get; set; } + + public bool IsRunning { get; private set; } + + /// + /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. + /// + public int RequestProcessTimeoutOnStop { get; set; } + + /// + /// Controls whether we need to warn in the log about exceeding the max queue size. + /// + /// + /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in + /// order to avoid spamming the log with lots of warnings. + /// + private bool m_warnOverMaxQueue = true; + + private BlockingCollection m_requestQueue; + + private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); + + private LLUDPServer m_udpServer; + + private Stat m_requestsWaitingStat; + + private Job m_currentJob; + + /// + /// Used to signal that we are ready to complete stop. + /// + private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + + public IncomingPacketAsyncHandlingEngine(LLUDPServer server) + { + //LogLevel = 1; + m_udpServer = server; + RequestProcessTimeoutOnStop = 5000; + + // MainConsole.Instance.Commands.AddCommand( + // "Debug", + // false, + // "debug jobengine", + // "debug jobengine ", + // "Start, stop or get status of the job engine.", + // "If stopped then all jobs are processed immediately.", + // HandleControlCommand); + } + + public void Start() + { + lock (this) + { + if (IsRunning) + return; + + IsRunning = true; + + m_finishedProcessingAfterStop.Reset(); + + m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); + + m_requestsWaitingStat = + new Stat( + "IncomingPacketAsyncRequestsWaiting", + "Number of incoming packets waiting for async processing in engine.", + "", + "", + "clientstack", + m_udpServer.Scene.Name, + StatType.Pull, + MeasuresOfInterest.None, + stat => stat.Value = m_requestQueue.Count, + StatVerbosity.Debug); + + StatsManager.RegisterStat(m_requestsWaitingStat); + + Watchdog.StartThread( + ProcessRequests, + string.Format("Incoming Packet Async Handling Engine Thread ({0})", m_udpServer.Scene.Name), + ThreadPriority.Normal, + false, + true, + null, + int.MaxValue); + } + } + + public void Stop() + { + lock (this) + { + try + { + if (!IsRunning) + return; + + IsRunning = false; + + int requestsLeft = m_requestQueue.Count; + + if (requestsLeft <= 0) + { + m_cancelSource.Cancel(); + } + else + { + m_log.InfoFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Waiting to write {0} events after stop.", requestsLeft); + + while (requestsLeft > 0) + { + if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) + { + // After timeout no events have been written + if (requestsLeft == m_requestQueue.Count) + { + m_log.WarnFormat( + "[INCOMING PACKET ASYNC HANDLING ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", + RequestProcessTimeoutOnStop, requestsLeft); + + break; + } + } + + requestsLeft = m_requestQueue.Count; + } + } + } + finally + { + m_cancelSource.Dispose(); + StatsManager.DeregisterStat(m_requestsWaitingStat); + m_requestsWaitingStat = null; + m_requestQueue = null; + } + } + } + + public bool QueueRequest(string name, WaitCallback req, object o) + { + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Queued job {0}", name); + + if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) + { + // m_log.DebugFormat( + // "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", + // categories, client.AgentID, m_udpServer.Scene.Name); + + m_requestQueue.Add(new Job(name, req, o)); + + if (!m_warnOverMaxQueue) + m_warnOverMaxQueue = true; + + return true; + } + else + { + if (m_warnOverMaxQueue) + { + // m_log.WarnFormat( + // "[JOB ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", + // client.AgentID, m_udpServer.Scene.Name); + + m_log.WarnFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Request queue at maximum capacity, not recording job"); + + m_warnOverMaxQueue = false; + } + + return false; + } + } + + private void ProcessRequests() + { + try + { + while (IsRunning || m_requestQueue.Count > 0) + { + m_currentJob = m_requestQueue.Take(m_cancelSource.Token); + + // QueueEmpty callback = req.Client.OnQueueEmpty; + // + // if (callback != null) + // { + // try + // { + // callback(req.Categories); + // } + // catch (Exception e) + // { + // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); + // } + // } + + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processing job {0}", m_currentJob.Name); + + try + { + m_currentJob.Callback.Invoke(m_currentJob.O); + } + catch (Exception e) + { + m_log.Error( + string.Format( + "[INCOMING PACKET ASYNC HANDLING ENGINE]: Job {0} failed, continuing. Exception ", m_currentJob.Name), e); + } + + if (LogLevel >= 1) + m_log.DebugFormat("[INCOMING PACKET ASYNC HANDLING ENGINE]: Processed job {0}", m_currentJob.Name); + + m_currentJob = null; + } + } + catch (OperationCanceledException) + { + } + + m_finishedProcessingAfterStop.Set(); + } + + // private void HandleControlCommand(string module, string[] args) + // { + // // if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) + // // return; + // + // if (args.Length < 3) + // { + // MainConsole.Instance.Output("Usage: debug jobengine "); + // return; + // } + // + // string subCommand = args[2]; + // + // if (subCommand == "stop") + // { + // Stop(); + // MainConsole.Instance.OutputFormat("Stopped job engine."); + // } + // else if (subCommand == "start") + // { + // Start(); + // MainConsole.Instance.OutputFormat("Started job engine."); + // } + // else if (subCommand == "status") + // { + // MainConsole.Instance.OutputFormat("Job engine running: {0}", IsRunning); + // MainConsole.Instance.OutputFormat("Current job {0}", m_currentJob != null ? m_currentJob.Name : "none"); + // MainConsole.Instance.OutputFormat( + // "Jobs waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); + // MainConsole.Instance.OutputFormat("Log Level: {0}", LogLevel); + // } + // + // else if (subCommand == "loglevel") + // { + // // int logLevel; + // int logLevel = int.Parse(args[3]); + // // if (ConsoleUtil.TryParseConsoleInt(MainConsole.Instance, args[4], out logLevel)) + // // { + // LogLevel = logLevel; + // MainConsole.Instance.OutputFormat("Set log level to {0}", LogLevel); + // // } + // } + // else + // { + // MainConsole.Instance.OutputFormat("Unrecognized job engine subcommand {0}", subCommand); + // } + // } + } +} diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs index 516327ccb7..85f9d68401 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLClientView.cs @@ -647,13 +647,37 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// /// true if the handler was added. This is currently always the case. public bool AddLocalPacketHandler(PacketType packetType, PacketMethod handler, bool doAsync) + { + return AddLocalPacketHandler(packetType, handler, doAsync, false); + } + + /// + /// Add a handler for the given packet type. + /// + /// + /// + /// + /// If true, when the packet is received handle it on a different thread. Whether this is given direct to + /// a threadpool thread or placed in a queue depends on the inEngine parameter. + /// + /// + /// If async is false then this parameter is ignored. + /// If async is true and inEngine is false, then the packet is sent directly to a + /// threadpool thread. + /// If async is true and inEngine is true, then the packet is sent to the IncomingPacketAsyncHandlingEngine. + /// This may result in slower handling but reduces the risk of overloading the simulator when there are many + /// simultaneous async requests. + /// + /// true if the handler was added. This is currently always the case. + public bool AddLocalPacketHandler(PacketType packetType, PacketMethod handler, bool doAsync, bool inEngine) { bool result = false; lock (m_packetHandlers) { if (!m_packetHandlers.ContainsKey(packetType)) { - m_packetHandlers.Add(packetType, new PacketProcessor() { method = handler, Async = doAsync }); + m_packetHandlers.Add( + packetType, new PacketProcessor() { method = handler, Async = doAsync, InEngine = inEngine }); result = true; } } @@ -688,21 +712,29 @@ namespace OpenSim.Region.ClientStack.LindenUDP PacketProcessor pprocessor; if (m_packetHandlers.TryGetValue(packet.Type, out pprocessor)) { + ClientInfo cinfo = UDPClient.GetClientInfo(); + //there is a local handler for this packet type if (pprocessor.Async) { - ClientInfo cinfo = UDPClient.GetClientInfo(); if (!cinfo.AsyncRequests.ContainsKey(packet.Type.ToString())) cinfo.AsyncRequests[packet.Type.ToString()] = 0; cinfo.AsyncRequests[packet.Type.ToString()]++; object obj = new AsyncPacketProcess(this, pprocessor.method, packet); - Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString()); + + if (pprocessor.InEngine) + m_udpServer.IpahEngine.QueueRequest( + packet.Type.ToString(), + ProcessSpecificPacketAsync, + obj); + else + Util.FireAndForget(ProcessSpecificPacketAsync, obj, packet.Type.ToString()); + result = true; } else { - ClientInfo cinfo = UDPClient.GetClientInfo(); if (!cinfo.SyncRequests.ContainsKey(packet.Type.ToString())) cinfo.SyncRequests[packet.Type.ToString()] = 0; cinfo.SyncRequests[packet.Type.ToString()]++; @@ -5554,10 +5586,10 @@ namespace OpenSim.Region.ClientStack.LindenUDP AddLocalPacketHandler(PacketType.ParcelBuy, HandleParcelBuyRequest, false); AddLocalPacketHandler(PacketType.UUIDGroupNameRequest, HandleUUIDGroupNameRequest); AddLocalPacketHandler(PacketType.ObjectGroup, HandleObjectGroupRequest); - AddLocalPacketHandler(PacketType.GenericMessage, HandleGenericMessage); - AddLocalPacketHandler(PacketType.AvatarPropertiesRequest, HandleAvatarPropertiesRequest); + AddLocalPacketHandler(PacketType.GenericMessage, HandleGenericMessage, true, true); + AddLocalPacketHandler(PacketType.AvatarPropertiesRequest, HandleAvatarPropertiesRequest, true, true); AddLocalPacketHandler(PacketType.ChatFromViewer, HandleChatFromViewer); - AddLocalPacketHandler(PacketType.AvatarPropertiesUpdate, HandlerAvatarPropertiesUpdate); + AddLocalPacketHandler(PacketType.AvatarPropertiesUpdate, HandlerAvatarPropertiesUpdate, true, true); AddLocalPacketHandler(PacketType.ScriptDialogReply, HandlerScriptDialogReply); AddLocalPacketHandler(PacketType.ImprovedInstantMessage, HandlerImprovedInstantMessage); AddLocalPacketHandler(PacketType.AcceptFriendship, HandlerAcceptFriendship); @@ -5742,8 +5774,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP AddLocalPacketHandler(PacketType.PickDelete, HandlePickDelete); AddLocalPacketHandler(PacketType.PickGodDelete, HandlePickGodDelete); AddLocalPacketHandler(PacketType.PickInfoUpdate, HandlePickInfoUpdate); - AddLocalPacketHandler(PacketType.AvatarNotesUpdate, HandleAvatarNotesUpdate); - AddLocalPacketHandler(PacketType.AvatarInterestsUpdate, HandleAvatarInterestsUpdate); + AddLocalPacketHandler(PacketType.AvatarNotesUpdate, HandleAvatarNotesUpdate, true, true); + AddLocalPacketHandler(PacketType.AvatarInterestsUpdate, HandleAvatarInterestsUpdate, true, true); AddLocalPacketHandler(PacketType.GrantUserRights, HandleGrantUserRights); AddLocalPacketHandler(PacketType.PlacesQuery, HandlePlacesQuery); AddLocalPacketHandler(PacketType.UpdateMuteListEntry, HandleUpdateMuteListEntry); @@ -12801,8 +12833,21 @@ namespace OpenSim.Region.ClientStack.LindenUDP public struct PacketProcessor { - public PacketMethod method; - public bool Async; + /// + /// Packet handling method. + /// + public PacketMethod method { get; set; } + + /// + /// Should this packet be handled asynchronously? + /// + public bool Async { get; set; } + + /// + /// If async is true, should this packet be handled in the async engine or given directly to a threadpool + /// thread? + /// + public bool InEngine { get; set; } } public class AsyncPacketProcess diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 61e1d6a1b1..d8cf7a5095 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -363,6 +363,12 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// private IClientAPI m_currentIncomingClient; + /// + /// Queue some low priority but potentially high volume async requests so that they don't overwhelm available + /// threadpool threads. + /// + public IncomingPacketAsyncHandlingEngine IpahEngine { get; private set; } + /// /// Experimental facility to run queue empty processing within a controlled number of threads rather than /// requiring massive numbers of short-lived threads from the threadpool when there are a high number of @@ -454,6 +460,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (usePools) EnablePools(); + IpahEngine = new IncomingPacketAsyncHandlingEngine(this); OqrEngine = new OutgoingQueueRefillEngine(this); } @@ -461,6 +468,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP { StartInbound(); StartOutbound(); + IpahEngine.Start(); OqrEngine.Start(); m_elapsedMSSinceLastStatReport = Environment.TickCount; @@ -506,6 +514,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + Scene.Name); base.StopOutbound(); base.StopInbound(); + IpahEngine.Stop(); OqrEngine.Stop(); } diff --git a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs index 5cf2e1f37b..718db3189b 100644 --- a/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs +++ b/OpenSim/Region/CoreModules/Framework/EntityTransfer/HGIncomingSceneObjectEngine.cs @@ -133,7 +133,7 @@ namespace OpenSim.Region.CoreModules.Framework.EntityTransfer Watchdog.StartThread( ProcessRequests, - "HG Incoming Scene Object Engine Thread", + string.Format("HG Incoming Scene Object Engine Thread ({0})", Name), ThreadPriority.Normal, false, true,