retire our BlockingQueue replaced by BlockingCollection and cross fingers

httptests
UbitUmarov 2018-01-22 17:09:38 +00:00
parent 437369778d
commit d38161f83d
5 changed files with 33 additions and 174 deletions

View File

@ -1,148 +0,0 @@
/*
* Copyright (c) Contributors, http://opensimulator.org/
* See CONTRIBUTORS.TXT for a full list of copyright holders.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the OpenSimulator Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
using System.Collections.Generic;
using System.Threading;
namespace OpenSim.Framework
{
public class BlockingQueue<T>
{
private readonly Queue<T> m_pqueue = new Queue<T>();
private readonly Queue<T> m_queue = new Queue<T>();
private readonly object m_queueSync = new object();
public void PriorityEnqueue(T value)
{
lock (m_queueSync)
{
m_pqueue.Enqueue(value);
Monitor.Pulse(m_queueSync);
}
}
public void Enqueue(T value)
{
lock (m_queueSync)
{
m_queue.Enqueue(value);
Monitor.Pulse(m_queueSync);
}
}
public T Dequeue()
{
lock (m_queueSync)
{
while (m_queue.Count < 1 && m_pqueue.Count < 1)
{
Monitor.Wait(m_queueSync);
}
if (m_pqueue.Count > 0)
return m_pqueue.Dequeue();
if (m_queue.Count > 0)
return m_queue.Dequeue();
return default(T);
}
}
public T Dequeue(int msTimeout)
{
lock (m_queueSync)
{
if (m_queue.Count < 1 && m_pqueue.Count < 1)
{
if(!Monitor.Wait(m_queueSync, msTimeout))
return default(T);
}
if (m_pqueue.Count > 0)
return m_pqueue.Dequeue();
if (m_queue.Count > 0)
return m_queue.Dequeue();
return default(T);
}
}
/// <summary>
/// Indicate whether this queue contains the given item.
/// </summary>
/// <remarks>
/// This method is not thread-safe. Do not rely on the result without consistent external locking.
/// </remarks>
public bool Contains(T item)
{
lock (m_queueSync)
{
if (m_queue.Count < 1 && m_pqueue.Count < 1)
return false;
if (m_pqueue.Contains(item))
return true;
return m_queue.Contains(item);
}
}
/// <summary>
/// Return a count of the number of requests on this queue.
/// </summary>
public int Count()
{
lock (m_queueSync)
return m_queue.Count + m_pqueue.Count;
}
/// <summary>
/// Return the array of items on this queue.
/// </summary>
/// <remarks>
/// This method is not thread-safe. Do not rely on the result without consistent external locking.
/// </remarks>
public T[] GetQueueArray()
{
lock (m_queueSync)
{
if (m_queue.Count < 1 && m_pqueue.Count < 1)
return new T[0];
return m_queue.ToArray();
}
}
public void Clear()
{
lock (m_queueSync)
{
m_pqueue.Clear();
m_queue.Clear();
Monitor.Pulse(m_queueSync);
}
}
}
}

View File

@ -27,6 +27,7 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Net;
@ -285,7 +286,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
/// <summary>Incoming packets that are awaiting handling</summary>
//protected OpenMetaverse.BlockingQueue<IncomingPacket> packetInbox = new OpenMetaverse.BlockingQueue<IncomingPacket>();
protected OpenSim.Framework.BlockingQueue<IncomingPacket> packetInbox = new OpenSim.Framework.BlockingQueue<IncomingPacket>();
protected BlockingCollection<IncomingPacket> packetInbox = new BlockingCollection<IncomingPacket>();
/// <summary>Bandwidth throttle for this UDP server</summary>
public TokenBucket Throttle { get; protected set; }
@ -712,7 +713,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
scene.Name,
StatType.Pull,
MeasuresOfInterest.AverageChangeOverTime,
stat => stat.Value = packetInbox.Count(),
stat => stat.Value = packetInbox.Count,
StatVerbosity.Debug));
// XXX: These stats are also pool stats but we register them separately since they are currently not
@ -1546,10 +1547,11 @@ namespace OpenSim.Region.ClientStack.LindenUDP
// if (incomingPacket.Packet.Type == PacketType.AgentUpdate ||
// incomingPacket.Packet.Type == PacketType.ChatFromViewer)
if (incomingPacket.Packet.Type == PacketType.ChatFromViewer)
packetInbox.PriorityEnqueue(incomingPacket);
else
packetInbox.Enqueue(incomingPacket);
// if (incomingPacket.Packet.Type == PacketType.ChatFromViewer)
// packetInbox.PriorityEnqueue(incomingPacket);
// else
// packetInbox.Enqueue(incomingPacket);
packetInbox.Add(incomingPacket);
}
@ -2018,7 +2020,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Scene.ThreadAlive(1);
try
{
incomingPacket = packetInbox.Dequeue(250);
packetInbox.TryTake(out incomingPacket, 250);
if (incomingPacket != null && IsRunningInbound)
{
@ -2040,9 +2042,9 @@ namespace OpenSim.Region.ClientStack.LindenUDP
Watchdog.UpdateThread();
}
if (packetInbox.Count() > 0)
m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count() + " packets");
packetInbox.Clear();
if (packetInbox.Count > 0)
m_log.Warn("[LLUDPSERVER]: IncomingPacketHandler is shutting down, dropping " + packetInbox.Count + " packets");
packetInbox.Dispose();
Watchdog.RemoveThread();
}

View File

@ -26,7 +26,7 @@
*/
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@ -668,7 +668,7 @@ namespace OpenSim.Region.CoreModules.World.Estate
public UUID user;
}
private OpenSim.Framework.BlockingQueue<EstateAccessDeltaRequest> deltaRequests = new OpenSim.Framework.BlockingQueue<EstateAccessDeltaRequest>();
private BlockingCollection<EstateAccessDeltaRequest> deltaRequests = new BlockingCollection<EstateAccessDeltaRequest>();
private void handleEstateAccessDeltaRequest(IClientAPI _remote_client, UUID _invoice, int _estateAccessType, UUID _user)
{
@ -683,7 +683,7 @@ namespace OpenSim.Region.CoreModules.World.Estate
newreq.estateAccessType = _estateAccessType;
newreq.user = _user;
deltaRequests.Enqueue(newreq);
deltaRequests.Add(newreq);
lock(deltareqLock)
{
@ -713,9 +713,11 @@ namespace OpenSim.Region.CoreModules.World.Estate
bool sentGroupsFull = false;
bool sentManagersFull = false;
EstateAccessDeltaRequest req;
while(Scene.IsRunning)
{
EstateAccessDeltaRequest req = deltaRequests.Dequeue(500);
req = null;
deltaRequests.TryTake(out req, 500);
if(!Scene.IsRunning)
break;
@ -757,7 +759,7 @@ namespace OpenSim.Region.CoreModules.World.Estate
changed.Clear();
lock(deltareqLock)
{
if(deltaRequests.Count() != 0)
if(deltaRequests.Count != 0)
continue;
runnigDeltaExec = false;
return;

View File

@ -27,6 +27,7 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.Drawing.Imaging;
@ -67,7 +68,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
private static readonly string DEFAULT_WORLD_MAP_EXPORT_PATH = "exportmap.jpg";
private static readonly UUID STOP_UUID = UUID.Random();
private OpenSim.Framework.BlockingQueue<MapRequestState> requests = new OpenSim.Framework.BlockingQueue<MapRequestState>();
private BlockingCollection<MapRequestState> requests = new BlockingCollection<MapRequestState>();
private ManualResetEvent m_mapBlockRequestEvent = new ManualResetEvent(false);
private Dictionary<UUID, Queue<MapBlockRequestData>> m_mapBlockRequests = new Dictionary<UUID, Queue<MapBlockRequestData>>();
@ -422,7 +423,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
st.itemtype = 0;
st.regionhandle = 0;
requests.Enqueue(st);
requests.Add(st);
MapBlockRequestData req = new MapBlockRequestData();
@ -719,7 +720,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
av = null;
st = null;
st = requests.Dequeue(4500);
requests.TryTake(out st, 4500);
Watchdog.UpdateThread();
if (st == null || st.agentID == UUID.Zero)
@ -795,8 +796,8 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
else
{
// request still beeing processed, enqueue it back
requests.Enqueue(st);
if (requests.Count() < 3)
requests.Add(st);
if (requests.Count < 3)
Thread.Sleep(100);
}
}
@ -839,7 +840,7 @@ namespace OpenSim.Region.CoreModules.World.WorldMap
st.itemtype = itemtype;
st.regionhandle = regionhandle;
requests.Enqueue(st);
requests.Add(st);
}
uint[] itemTypesForcedSend = new uint[] { 6, 1, 7, 10 }; // green dots, infohub, land sells

View File

@ -3,6 +3,7 @@
*/
using System;
using System.Collections.Concurrent;
using System.Threading;
using OpenSim.Framework;
using OpenSim.Region.PhysicsModules.SharedBase;
@ -73,7 +74,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde
public float MeshSculptphysicalLOD = 32;
public float MinSizeToMeshmerize = 0.1f;
private OpenSim.Framework.BlockingQueue<ODEPhysRepData> workQueue = new OpenSim.Framework.BlockingQueue<ODEPhysRepData>();
private BlockingCollection<ODEPhysRepData> workQueue = new BlockingCollection<ODEPhysRepData>();
private bool m_running;
private Thread m_thread;
@ -100,10 +101,11 @@ namespace OpenSim.Region.PhysicsModule.ubOde
private void DoWork()
{
m_mesher.ExpireFileCache();
ODEPhysRepData nextRep;
while(m_running)
{
ODEPhysRepData nextRep = workQueue.Dequeue();
workQueue.TryTake(out nextRep, -1);
if(!m_running)
return;
if (nextRep == null)
@ -132,7 +134,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde
try
{
m_thread.Abort();
workQueue.Clear();
// workQueue.Dispose();
}
catch
{
@ -189,7 +191,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde
repData.meshState = MeshState.loadingAsset;
repData.comand = meshWorkerCmnds.getmesh;
workQueue.Enqueue(repData);
workQueue.Add(repData);
}
}
@ -235,7 +237,7 @@ namespace OpenSim.Region.PhysicsModule.ubOde
if (needsMeshing(repData)) // no need for pbs now?
{
repData.comand = meshWorkerCmnds.changefull;
workQueue.Enqueue(repData);
workQueue.Add(repData);
}
}
else