From d38161f83d08e8f36905faaec30fcb9bbce9a319 Mon Sep 17 00:00:00 2001 From: UbitUmarov Date: Mon, 22 Jan 2018 17:09:38 +0000 Subject: [PATCH] retire our BlockingQueue replaced by BlockingCollection and cross fingers --- OpenSim/Framework/BlockingQueue.cs | 148 ------------------ .../ClientStack/Linden/UDP/LLUDPServer.cs | 22 +-- .../World/Estate/EstateManagementModule.cs | 12 +- .../World/WorldMap/WorldMapModule.cs | 13 +- .../PhysicsModules/ubOde/ODEMeshWorker.cs | 12 +- 5 files changed, 33 insertions(+), 174 deletions(-) delete mode 100644 OpenSim/Framework/BlockingQueue.cs diff --git a/OpenSim/Framework/BlockingQueue.cs b/OpenSim/Framework/BlockingQueue.cs deleted file mode 100644 index 2461049107..0000000000 --- a/OpenSim/Framework/BlockingQueue.cs +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright (c) Contributors, http://opensimulator.org/ - * See CONTRIBUTORS.TXT for a full list of copyright holders. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * * Neither the name of the OpenSimulator Project nor the - * names of its contributors may be used to endorse or promote products - * derived from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY - * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED - * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE - * DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY - * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES - * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND - * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS - * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -using System.Collections.Generic; -using System.Threading; - -namespace OpenSim.Framework -{ - public class BlockingQueue - { - private readonly Queue m_pqueue = new Queue(); - private readonly Queue m_queue = new Queue(); - private readonly object m_queueSync = new object(); - - public void PriorityEnqueue(T value) - { - lock (m_queueSync) - { - m_pqueue.Enqueue(value); - Monitor.Pulse(m_queueSync); - } - } - - public void Enqueue(T value) - { - lock (m_queueSync) - { - m_queue.Enqueue(value); - Monitor.Pulse(m_queueSync); - } - } - - public T Dequeue() - { - lock (m_queueSync) - { - while (m_queue.Count < 1 && m_pqueue.Count < 1) - { - Monitor.Wait(m_queueSync); - } - - if (m_pqueue.Count > 0) - return m_pqueue.Dequeue(); - - if (m_queue.Count > 0) - return m_queue.Dequeue(); - return default(T); - } - } - - public T Dequeue(int msTimeout) - { - lock (m_queueSync) - { - if (m_queue.Count < 1 && m_pqueue.Count < 1) - { - if(!Monitor.Wait(m_queueSync, msTimeout)) - return default(T); - } - - if (m_pqueue.Count > 0) - return m_pqueue.Dequeue(); - if (m_queue.Count > 0) - return m_queue.Dequeue(); - return default(T); - } - } - - /// - /// Indicate whether this queue contains the given item. - /// - /// - /// This method is not thread-safe. Do not rely on the result without consistent external locking. - /// - public bool Contains(T item) - { - lock (m_queueSync) - { - if (m_queue.Count < 1 && m_pqueue.Count < 1) - return false; - - if (m_pqueue.Contains(item)) - return true; - return m_queue.Contains(item); - } - } - - /// - /// Return a count of the number of requests on this queue. - /// - public int Count() - { - lock (m_queueSync) - return m_queue.Count + m_pqueue.Count; - } - - /// - /// Return the array of items on this queue. - /// - /// - /// This method is not thread-safe. Do not rely on the result without consistent external locking. - /// - public T[] GetQueueArray() - { - lock (m_queueSync) - { - if (m_queue.Count < 1 && m_pqueue.Count < 1) - return new T[0]; - - return m_queue.ToArray(); - } - } - - public void Clear() - { - lock (m_queueSync) - { - m_pqueue.Clear(); - m_queue.Clear(); - Monitor.Pulse(m_queueSync); - } - } - } -} diff --git a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs index 72b611695a..58094d316d 100644 --- a/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs +++ b/OpenSim/Region/ClientStack/Linden/UDP/LLUDPServer.cs @@ -27,6 +27,7 @@ using System; using System.Collections.Generic; +using System.Collections.Concurrent; using System.Diagnostics; using System.IO; using System.Net; @@ -285,7 +286,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP /// Incoming packets that are awaiting handling //protected OpenMetaverse.BlockingQueue packetInbox = new OpenMetaverse.BlockingQueue(); - protected OpenSim.Framework.BlockingQueue packetInbox = new OpenSim.Framework.BlockingQueue(); + protected BlockingCollection packetInbox = new BlockingCollection(); /// Bandwidth throttle for this UDP server public TokenBucket Throttle { get; protected set; } @@ -712,7 +713,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP scene.Name, StatType.Pull, MeasuresOfInterest.AverageChangeOverTime, - stat => stat.Value = packetInbox.Count(), + stat => stat.Value = packetInbox.Count, StatVerbosity.Debug)); // XXX: These stats are also pool stats but we register them separately since they are currently not @@ -1546,10 +1547,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP // if (incomingPacket.Packet.Type == PacketType.AgentUpdate || // incomingPacket.Packet.Type == PacketType.ChatFromViewer) - if (incomingPacket.Packet.Type == PacketType.ChatFromViewer) - packetInbox.PriorityEnqueue(incomingPacket); - else - packetInbox.Enqueue(incomingPacket); +// if (incomingPacket.Packet.Type == PacketType.ChatFromViewer) +// packetInbox.PriorityEnqueue(incomingPacket); +// else +// packetInbox.Enqueue(incomingPacket); + packetInbox.Add(incomingPacket); } @@ -2018,7 +2020,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP Scene.ThreadAlive(1); try { - incomingPacket = packetInbox.Dequeue(250); + packetInbox.TryTake(out incomingPacket, 250); if (incomingPacket != null && IsRunningInbound) { @@ -2040,9 +2042,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP Watchdog.UpdateThread(); } - if (packetInbox.Count() > 0) - m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count() + " packets"); - packetInbox.Clear(); + if (packetInbox.Count > 0) + m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count + " packets"); + packetInbox.Dispose(); Watchdog.RemoveThread(); } diff --git a/OpenSim/Region/CoreModules/World/Estate/EstateManagementModule.cs b/OpenSim/Region/CoreModules/World/Estate/EstateManagementModule.cs index 3c45b68717..0ca76e4cfb 100644 --- a/OpenSim/Region/CoreModules/World/Estate/EstateManagementModule.cs +++ b/OpenSim/Region/CoreModules/World/Estate/EstateManagementModule.cs @@ -26,7 +26,7 @@ */ using System; -using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -668,7 +668,7 @@ namespace OpenSim.Region.CoreModules.World.Estate public UUID user; } - private OpenSim.Framework.BlockingQueue deltaRequests = new OpenSim.Framework.BlockingQueue(); + private BlockingCollection deltaRequests = new BlockingCollection(); private void handleEstateAccessDeltaRequest(IClientAPI _remote_client, UUID _invoice, int _estateAccessType, UUID _user) { @@ -683,7 +683,7 @@ namespace OpenSim.Region.CoreModules.World.Estate newreq.estateAccessType = _estateAccessType; newreq.user = _user; - deltaRequests.Enqueue(newreq); + deltaRequests.Add(newreq); lock(deltareqLock) { @@ -713,9 +713,11 @@ namespace OpenSim.Region.CoreModules.World.Estate bool sentGroupsFull = false; bool sentManagersFull = false; + EstateAccessDeltaRequest req; while(Scene.IsRunning) { - EstateAccessDeltaRequest req = deltaRequests.Dequeue(500); + req = null; + deltaRequests.TryTake(out req, 500); if(!Scene.IsRunning) break; @@ -757,7 +759,7 @@ namespace OpenSim.Region.CoreModules.World.Estate changed.Clear(); lock(deltareqLock) { - if(deltaRequests.Count() != 0) + if(deltaRequests.Count != 0) continue; runnigDeltaExec = false; return; diff --git a/OpenSim/Region/CoreModules/World/WorldMap/WorldMapModule.cs b/OpenSim/Region/CoreModules/World/WorldMap/WorldMapModule.cs index 03a4d3475e..b5a6912d3b 100644 --- a/OpenSim/Region/CoreModules/World/WorldMap/WorldMapModule.cs +++ b/OpenSim/Region/CoreModules/World/WorldMap/WorldMapModule.cs @@ -27,6 +27,7 @@ using System; using System.Collections; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Drawing; using System.Drawing.Imaging; @@ -67,7 +68,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap private static readonly string DEFAULT_WORLD_MAP_EXPORT_PATH = "exportmap.jpg"; private static readonly UUID STOP_UUID = UUID.Random(); - private OpenSim.Framework.BlockingQueue requests = new OpenSim.Framework.BlockingQueue(); + private BlockingCollection requests = new BlockingCollection(); private ManualResetEvent m_mapBlockRequestEvent = new ManualResetEvent(false); private Dictionary> m_mapBlockRequests = new Dictionary>(); @@ -422,7 +423,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap st.itemtype = 0; st.regionhandle = 0; - requests.Enqueue(st); + requests.Add(st); MapBlockRequestData req = new MapBlockRequestData(); @@ -719,7 +720,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap av = null; st = null; - st = requests.Dequeue(4500); + requests.TryTake(out st, 4500); Watchdog.UpdateThread(); if (st == null || st.agentID == UUID.Zero) @@ -795,8 +796,8 @@ namespace OpenSim.Region.CoreModules.World.WorldMap else { // request still beeing processed, enqueue it back - requests.Enqueue(st); - if (requests.Count() < 3) + requests.Add(st); + if (requests.Count < 3) Thread.Sleep(100); } } @@ -839,7 +840,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap st.itemtype = itemtype; st.regionhandle = regionhandle; - requests.Enqueue(st); + requests.Add(st); } uint[] itemTypesForcedSend = new uint[] { 6, 1, 7, 10 }; // green dots, infohub, land sells diff --git a/OpenSim/Region/PhysicsModules/ubOde/ODEMeshWorker.cs b/OpenSim/Region/PhysicsModules/ubOde/ODEMeshWorker.cs index f4e2b1feb5..bdfbe3d5b7 100644 --- a/OpenSim/Region/PhysicsModules/ubOde/ODEMeshWorker.cs +++ b/OpenSim/Region/PhysicsModules/ubOde/ODEMeshWorker.cs @@ -3,6 +3,7 @@ */ using System; +using System.Collections.Concurrent; using System.Threading; using OpenSim.Framework; using OpenSim.Region.PhysicsModules.SharedBase; @@ -73,7 +74,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde public float MeshSculptphysicalLOD = 32; public float MinSizeToMeshmerize = 0.1f; - private OpenSim.Framework.BlockingQueue workQueue = new OpenSim.Framework.BlockingQueue(); + private BlockingCollection workQueue = new BlockingCollection(); private bool m_running; private Thread m_thread; @@ -100,10 +101,11 @@ namespace OpenSim.Region.PhysicsModule.ubOde private void DoWork() { m_mesher.ExpireFileCache(); + ODEPhysRepData nextRep; while(m_running) { - ODEPhysRepData nextRep = workQueue.Dequeue(); + workQueue.TryTake(out nextRep, -1); if(!m_running) return; if (nextRep == null) @@ -132,7 +134,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde try { m_thread.Abort(); - workQueue.Clear(); + // workQueue.Dispose(); } catch { @@ -189,7 +191,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde repData.meshState = MeshState.loadingAsset; repData.comand = meshWorkerCmnds.getmesh; - workQueue.Enqueue(repData); + workQueue.Add(repData); } } @@ -235,7 +237,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde if (needsMeshing(repData)) // no need for pbs now? { repData.comand = meshWorkerCmnds.changefull; - workQueue.Enqueue(repData); + workQueue.Add(repData); } } else