diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs index 2f363f4d2e..c34bafa9b2 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPClient.cs @@ -650,8 +650,15 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (HasUpdates(m_categories)) { - // Asynchronously run the callback - Util.FireAndForget(FireQueueEmpty, categories); + if (!m_udpServer.OqrEngine.IsRunning) + { + // Asynchronously run the callback + Util.FireAndForget(FireQueueEmpty, categories); + } + else + { + m_udpServer.OqrEngine.QueueRequest(this, categories); + } } else { @@ -670,7 +677,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Throttle categories to fire the callback for, /// stored as an object to match the WaitCallback delegate /// signature - private void FireQueueEmpty(object o) + public void FireQueueEmpty(object o) { // m_log.DebugFormat("[LLUDPCLIENT]: FireQueueEmpty for {0} in {1}", AgentID, m_udpServer.Scene.Name); diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 465c86bef6..3c9bb1bd82 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -355,6 +355,13 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// private IClientAPI m_currentIncomingClient; + /// + /// Experimental facility to run queue empty processing within a controlled number of threads rather than + /// requiring massive numbers of short-lived threads from the threadpool when there are a high number of + /// connections. + /// + public OutgoingQueueRefillEngine OqrEngine { get; private set; } + public LLUDPServer( IPAddress listenIP, ref uint port, int proxyPortOffsetParm, bool allow_alternate_port, IConfigSource configSource, AgentCircuitManager circuitManager) @@ -432,6 +439,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP if (usePools) EnablePools(); + + OqrEngine = new OutgoingQueueRefillEngine(this); } public void Start() @@ -469,7 +478,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP Watchdog.StartThread( OutgoingPacketHandler, - string.Format("Outgoing Packets ({0})", Scene.RegionInfo.RegionName), + string.Format("Outgoing Packets ({0})", Scene.Name), ThreadPriority.Normal, false, true, @@ -479,7 +488,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP public void Stop() { - m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + Scene.RegionInfo.RegionName); + m_log.Info("[LLUDPSERVER]: Shutting down the LLUDP server for " + Scene.Name); base.StopOutbound(); base.StopInbound(); } diff --git a/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs new file mode 100644 index 0000000000..8777402fc4 --- /dev/null +++ b/OpenSim/Region/ClientStack/Linden/UDP/OutgoingQueueRefillEngine.cs @@ -0,0 +1,267 @@ +/* + * Copyright (c) Contributors, http://opensimulator.org/ + * See CONTRIBUTORS.TXT for a full list of copyright holders. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the OpenSimulator Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY + * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +using System; +using System.Collections.Concurrent; +using System.Reflection; +using System.Threading; +using log4net; +using OpenSim.Framework; +using OpenSim.Framework.Monitoring; +using OpenSim.Region.Framework.Scenes; + +namespace OpenSim.Region.ClientStack.LindenUDP +{ + public struct RefillRequest + { + public LLUDPClient Client; + public ThrottleOutPacketTypeFlags Categories; + + public RefillRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories) + { + Client = client; + Categories = categories; + } + } + + public class OutgoingQueueRefillEngine + { + private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); + + public bool IsRunning { get; private set; } + + /// + /// The timeout in milliseconds to wait for at least one event to be written when the recorder is stopping. + /// + public int RequestProcessTimeoutOnStop { get; set; } + + /// + /// Controls whether we need to warn in the log about exceeding the max queue size. + /// + /// + /// This is flipped to false once queue max has been exceeded and back to true when it falls below max, in + /// order to avoid spamming the log with lots of warnings. + /// + private bool m_warnOverMaxQueue = true; + + private BlockingCollection m_requestQueue; + + private CancellationTokenSource m_cancelSource = new CancellationTokenSource(); + + private LLUDPServer m_udpServer; + + /// + /// Used to signal that we are ready to complete stop. + /// + private ManualResetEvent m_finishedProcessingAfterStop = new ManualResetEvent(false); + + public OutgoingQueueRefillEngine(LLUDPServer server) + { + RequestProcessTimeoutOnStop = 5000; + m_udpServer = server; + + MainConsole.Instance.Commands.AddCommand( + "Debug", + false, + "debug lludp oqre", + "debug lludp oqre ", + "Start, stop or get status of OutgoingQueueRefillEngine.", + "Experimental.", + HandleOqreCommand); + } + + public void Start() + { + lock (this) + { + if (IsRunning) + return; + + IsRunning = true; + + m_finishedProcessingAfterStop.Reset(); + + m_requestQueue = new BlockingCollection(new ConcurrentQueue(), 5000); + + Watchdog.StartThread( + ProcessRequests, + String.Format("OutgoingQueueRefillEngineThread ({0})", m_udpServer.Scene.Name), + ThreadPriority.Normal, + false, + true, + null, + int.MaxValue); + } + } + + public void Stop() + { + lock (this) + { + try + { + if (!IsRunning) + return; + + IsRunning = false; + + int requestsLeft = m_requestQueue.Count; + + if (requestsLeft <= 0) + { + m_cancelSource.Cancel(); + } + else + { + m_log.InfoFormat("[OUTGOING QUEUE REFILL ENGINE]: Waiting to write {0} events after stop.", requestsLeft); + + while (requestsLeft > 0) + { + if (!m_finishedProcessingAfterStop.WaitOne(RequestProcessTimeoutOnStop)) + { + // After timeout no events have been written + if (requestsLeft == m_requestQueue.Count) + { + m_log.WarnFormat( + "[OUTGOING QUEUE REFILL ENGINE]: No requests processed after {0} ms wait. Discarding remaining {1} requests", + RequestProcessTimeoutOnStop, requestsLeft); + + break; + } + } + + requestsLeft = m_requestQueue.Count; + } + } + } + finally + { + m_cancelSource.Dispose(); + m_requestQueue = null; + } + } + } + + public bool QueueRequest(LLUDPClient client, ThrottleOutPacketTypeFlags categories) + { + if (m_requestQueue.Count < m_requestQueue.BoundedCapacity) + { +// m_log.DebugFormat( +// "[OUTGOING QUEUE REFILL ENGINE]: Adding request for categories {0} for {1} in {2}", +// categories, client.AgentID, m_udpServer.Scene.Name); + + m_requestQueue.Add(new RefillRequest(client, categories)); + + if (!m_warnOverMaxQueue) + m_warnOverMaxQueue = true; + + return true; + } + else + { + if (m_warnOverMaxQueue) + { + m_log.WarnFormat( + "[OUTGOING QUEUE REFILL ENGINE]: Request queue at maximum capacity, not recording request from {0} in {1}", + client.AgentID, m_udpServer.Scene.Name); + + m_warnOverMaxQueue = false; + } + + return false; + } + } + + private void ProcessRequests() + { + try + { + while (IsRunning || m_requestQueue.Count > 0) + { + RefillRequest req = m_requestQueue.Take(m_cancelSource.Token); + + // QueueEmpty callback = req.Client.OnQueueEmpty; + // + // if (callback != null) + // { + // try + // { + // callback(req.Categories); + // } + // catch (Exception e) + // { + // m_log.Error("[OUTGOING QUEUE REFILL ENGINE]: ProcessRequests(" + req.Categories + ") threw an exception: " + e.Message, e); + // } + // } + + req.Client.FireQueueEmpty(req.Categories); + } + } + catch (OperationCanceledException) + { + } + + m_finishedProcessingAfterStop.Set(); + } + + private void HandleOqreCommand(string module, string[] args) + { + if (SceneManager.Instance.CurrentScene != null && SceneManager.Instance.CurrentScene != m_udpServer.Scene) + return; + + if (args.Length != 4) + { + MainConsole.Instance.Output("Usage: debug lludp oqre "); + return; + } + + string subCommand = args[3]; + + if (subCommand == "stop") + { + Stop(); + MainConsole.Instance.OutputFormat("Stopped OQRE for {0}", m_udpServer.Scene.Name); + } + else if (subCommand == "start") + { + Start(); + MainConsole.Instance.OutputFormat("Started OQRE for {0}", m_udpServer.Scene.Name); + } + else if (subCommand == "status") + { + MainConsole.Instance.OutputFormat("OQRE in {0}", m_udpServer.Scene.Name); + MainConsole.Instance.OutputFormat("Running: {0}", IsRunning); + MainConsole.Instance.OutputFormat( + "Requests waiting: {0}", IsRunning ? m_requestQueue.Count.ToString() : "n/a"); + } + else + { + MainConsole.Instance.OutputFormat("Unrecognized OQRE subcommand {0}", subCommand); + } + } + } +} \ No newline at end of file