OpenSimMirror/OpenSim/Framework/PriorityQueue.cs

394 lines
14 KiB
C#

/*
* Copyright (c) Contributors, http://opensimulator.org/
* See CONTRIBUTORS.TXT for a full list of copyright holders.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the OpenSimulator Project nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE DEVELOPERS ``AS IS'' AND ANY
* EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE CONTRIBUTORS BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Reflection;
using OpenSim.Framework;
using OpenSim.Framework.Client;
using log4net;
namespace OpenSim.Framework
{
public class PriorityQueue
{
// private static readonly ILog m_log = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>
/// Callback used by <see cref="Reprioritize"/> to recompute the queue of a queued entity.
/// The handler may write a new queue index into <paramref name="priority"/>.
/// Returning false makes Reprioritize remove the entity's pending update.
/// </summary>
public delegate bool UpdatePriorityHandler(ref uint priority, ISceneEntity entity);
/// <summary>
/// Total number of queues (priorities) available
/// </summary>
public const uint NumberOfQueues = 13; // includes the immediate queues; m_queueCounts must be sized accordingly
/// <summary>
/// Number of queues (priorities) that are processed immediately
/// </summary>
public const uint NumberOfImmediateQueues = 2;
// Number of items TryDequeue takes from a queue per visit before it moves on
// to the next queue. The first queues are immediate, so they have no counts.
private static readonly uint[] m_queueCounts = {0, 0, 8, 8, 5, 4, 3, 2, 1, 1, 1, 1, 1 };
// Original author's note on the intended use of the queues (not verifiable
// from this file): ava, ava, attach, <10m, 20, 40, 80, 160m, 320, 640, 1280, +
// One heap per queue, indexed by queue number.
private MinHeap<MinHeapItem>[] m_heaps = new MinHeap<MinHeapItem>[NumberOfQueues];
// Maps an entity's LocalId to the heap and handle that currently hold its update.
private Dictionary<uint, LookupItem> m_lookupTable;
// Internal state used to ensure the dequeues are spread across the priority
// queues "fairly". m_queueCounts is the amount to pull from each queue in
// each pass, weighted towards the higher priority queues.
// m_nextQueue is the queue currently being drained by TryDequeue and
// m_countFromQueue is how many more items may be taken from it in this pass.
private uint m_nextQueue = 0;
private uint m_countFromQueue = 0;
// Initial capacity of the lookup table; together with m_added it decides
// when an emptied lookup table is replaced by a fresh one.
private int m_capacity;
// Number of new entries enqueued since the lookup table was last created.
private int m_added;
// Next request is a counter of the number of updates queued. It provides
// a total ordering on the updates coming through the queue and is more
// lightweight (and more discriminating) than tick count.
private UInt64 m_nextRequest = 0;
/// <summary>
/// Lock for enqueue and dequeue operations on the priority queue.
/// NOTE(review): none of the methods in this class take this lock themselves;
/// presumably callers are expected to lock on <see cref="SyncRoot"/> - confirm.
/// </summary>
private object m_syncRoot = new object();
/// <summary>
/// Object exposed for callers to synchronize access to the queue.
/// </summary>
public object SyncRoot {
get { return this.m_syncRoot; }
}
#region constructor
/// <summary>
/// Creates a queue sized from the default capacity of <c>MinHeap</c>.
/// </summary>
public PriorityQueue()
    : this(MinHeap<MinHeapItem>.DEFAULT_CAPACITY)
{
}

/// <summary>
/// Creates a queue whose individual heaps are each allocated with a quarter
/// of the requested capacity.
/// </summary>
/// <param name="capacity">Requested capacity; divided by four per heap.</param>
public PriorityQueue(int capacity)
{
    // The lookup table always starts at a fixed size, independent of 'capacity'.
    m_capacity = 16;

    int perHeapCapacity = capacity / 4;
    int slot = 0;
    while (slot < m_heaps.Length)
    {
        m_heaps[slot] = new MinHeap<MinHeapItem>(perHeapCapacity);
        ++slot;
    }

    m_lookupTable = new Dictionary<uint, LookupItem>(m_capacity);

    // Fair dequeuing starts at the first non-immediate queue.
    m_nextQueue = NumberOfImmediateQueues;
    m_countFromQueue = m_queueCounts[m_nextQueue];
    m_added = 0;
}
#endregion Constructor
#region PublicMethods
/// <summary>
/// Releases the heaps and the lookup table. The queue must not be used
/// afterwards: both fields are left null.
/// </summary>
public void Close()
{
    // Drop every heap reference before letting go of the array itself.
    Array.Clear(m_heaps, 0, m_heaps.Length);
    m_heaps = null;

    m_lookupTable.Clear();
    m_lookupTable = null;
}
/// <summary>
/// Total number of items currently held, summed over every queue.
/// </summary>
public int Count
{
    get
    {
        int total = 0;
        foreach (MinHeap<MinHeapItem> heap in m_heaps)
        {
            total += heap.Count;
        }
        return total;
    }
}
/// <summary>
/// Places an update into the given priority queue. When the entity already
/// has an update queued, the new update is merged with it and takes its place,
/// keeping the original entry order unless the queued update carried the
/// CancelKill flag.
/// </summary>
/// <param name="pqueue">Target queue; clamped to the valid queue range.</param>
/// <param name="value">The update to queue.</param>
/// <returns>Always true.</returns>
public bool Enqueue(uint pqueue, EntityUpdate value)
{
    uint localid = value.Entity.LocalId;
    UInt64 entry;
    LookupItem lookup;

    if (m_lookupTable.TryGetValue(localid, out lookup))
    {
        // Replace the update that is already waiting for this entity.
        MinHeapItem queued = lookup.Heap[lookup.Handle];
        EntityUpdate previous = queued.Value;

        entry = queued.EntryOrder;
        value.Update(previous);
        lookup.Heap.Remove(lookup.Handle);

        // A queued CancelKill update gives up its place in the ordering.
        if ((previous.Flags & PrimUpdateFlags.CancelKill) != 0)
        {
            entry = m_nextRequest++;
        }
    }
    else
    {
        // First update for this entity.
        value.Update();
        entry = m_nextRequest++;
        ++m_added;
    }

    pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
    lookup.Heap = m_heaps[pqueue];
    lookup.Heap.Add(new MinHeapItem(pqueue, entry, value), ref lookup.Handle);
    m_lookupTable[localid] = lookup;
    return true;
}
/// <summary>
/// Removes the pending updates of the given entities from whichever queue
/// holds them. Ids with no pending update are ignored.
/// </summary>
/// <param name="ids">LocalIds of the entities whose updates are dropped.</param>
public void Remove(List<uint> ids)
{
    foreach (uint localid in ids)
    {
        LookupItem found;
        if (!m_lookupTable.TryGetValue(localid, out found))
        {
            continue;
        }

        found.Heap.Remove(found.Handle);
        m_lookupTable.Remove(localid);
    }

    // Once the table is empty after heavy use, swap it for a fresh one.
    if (m_lookupTable.Count != 0 || m_added <= 8 * m_capacity)
    {
        return;
    }

    m_lookupTable = new Dictionary<uint, LookupItem>(m_capacity);
    m_added = 0;
}
/// <summary>
/// Takes the oldest item from the next queue in the fair rotation.
/// Immediate queues are always served first; the remaining queues are
/// visited in turn, each yielding up to its m_queueCounts quota per visit.
/// </summary>
/// <param name="value">The dequeued update, or the default value when every queue is empty.</param>
/// <returns>True when an update was dequeued, false when all queues are empty.</returns>
public bool TryDequeue(out EntityUpdate value)
{
    MinHeapItem taken;

    // Anything waiting in an immediate queue goes out first, regardless of
    // the rotation. This breaks fairness on purpose.
    int urgentIndex = 0;
    while (urgentIndex < NumberOfImmediateQueues)
    {
        MinHeap<MinHeapItem> urgent = m_heaps[urgentIndex];
        if (urgent.Count > 0)
        {
            taken = urgent.RemoveMin();
            m_lookupTable.Remove(taken.Value.Entity.LocalId);
            value = taken.Value;
            return true;
        }
        urgentIndex++;
    }

    // Keep draining the current queue while it still has quota and items.
    MinHeap<MinHeapItem> active = m_heaps[m_nextQueue];
    if (m_countFromQueue > 0 && active.Count > 0)
    {
        --m_countFromQueue;
        taken = active.RemoveMin();
        m_lookupTable.Remove(taken.Value.Entity.LocalId);
        value = taken.Value;
        return true;
    }

    // Otherwise rotate through the non-immediate queues, wrapping around,
    // until one with pending updates is found. Each is probed at most once.
    for (uint probes = NumberOfQueues - NumberOfImmediateQueues; probes > 0; --probes)
    {
        m_nextQueue = (m_nextQueue + 1 >= NumberOfQueues)
            ? NumberOfImmediateQueues
            : m_nextQueue + 1;

        active = m_heaps[m_nextQueue];
        if (active.Count > 0)
        {
            // Start a new pass on this queue; this dequeue uses one unit of quota.
            m_countFromQueue = m_queueCounts[m_nextQueue] - 1;
            taken = active.RemoveMin();
            m_lookupTable.Remove(taken.Value.Entity.LocalId);
            value = taken.Value;
            return true;
        }
    }

    // Nothing queued anywhere.
    value = default(EntityUpdate);

    // Once the table is empty after heavy use, swap it for a fresh one.
    bool tableIsEmpty = m_lookupTable.Count == 0;
    if (tableIsEmpty && m_added > 8 * m_capacity)
    {
        m_lookupTable = new Dictionary<uint, LookupItem>(m_capacity);
        m_added = 0;
    }
    return false;
}
/// <summary>
/// Takes the oldest item from the lowest-numbered queue that is not empty,
/// ignoring the fair rotation used by <see cref="TryDequeue"/>.
/// </summary>
/// <param name="value">The dequeued update, or the default value when every queue is empty.</param>
/// <returns>True when an update was dequeued, false when all queues are empty.</returns>
public bool TryOrderedDequeue(out EntityUpdate value)
{
    foreach (MinHeap<MinHeapItem> heap in m_heaps)
    {
        if (heap.Count <= 0)
        {
            continue;
        }

        MinHeapItem oldest = heap.RemoveMin();
        m_lookupTable.Remove(oldest.Value.Entity.LocalId);
        value = oldest.Value;
        return true;
    }

    value = default(EntityUpdate);

    // Once the table is empty after heavy use, swap it for a fresh one.
    bool tableIsEmpty = m_lookupTable.Count == 0;
    if (tableIsEmpty && m_added > 8 * m_capacity)
    {
        m_lookupTable = new Dictionary<uint, LookupItem>(m_capacity);
        m_added = 0;
    }
    return false;
}
/// <summary>
/// Reapply the prioritization function to each of the updates currently
/// stored in the priority queues.
/// </summary>
/// <param name="handler">
/// Called once per queued update. Its ref argument starts out as the queue
/// the update currently sits in, so a handler that leaves it untouched keeps
/// the update where it is. The value it leaves behind is clamped to the valid
/// queue range. Returning false removes the update from the queue.
/// </param>
public void Reprioritize(UpdatePriorityHandler handler)
{
    MinHeapItem item;

    // Iterate over a snapshot: the lookup table is modified inside the loop.
    foreach (LookupItem lookup in new List<LookupItem>(m_lookupTable.Values))
    {
        if (!lookup.Heap.TryGetValue(lookup.Handle, out item))
        {
            continue;
        }

        // Seed the handler with this item's own queue. Previously a single
        // variable was shared across iterations, so the handler saw whatever
        // the previous entity ended up with (or 0 for the first one).
        uint pqueue = item.PriorityQueue;

        if (handler(ref pqueue, item.Value.Entity))
        {
            // unless the priority queue has changed, there is no need to modify
            // the entry
            pqueue = Util.Clamp<uint>(pqueue, 0, NumberOfQueues - 1);
            if (pqueue != item.PriorityQueue)
            {
                lookup.Heap.Remove(lookup.Handle);

                // Move the item to its new heap, keeping its entry order.
                LookupItem litem = lookup;
                litem.Heap = m_heaps[pqueue];
                litem.Heap.Add(new MinHeapItem(pqueue, item), ref litem.Handle);
                m_lookupTable[item.Value.Entity.LocalId] = litem;
            }
        }
        else
        {
            // The handler rejected the entity: drop its pending update.
            // m_log.WarnFormat("[PQUEUE]: UpdatePriorityHandler returned false for {0}",item.Value.Entity.UUID);
            lookup.Heap.Remove(lookup.Handle);
            m_lookupTable.Remove(item.Value.Entity.LocalId);
        }
    }
}
/// <summary>
/// Renders the item count of every queue, in queue order, each as a
/// right-aligned 7-character column followed by a space.
/// </summary>
public override string ToString()
{
    string[] columns = new string[NumberOfQueues];
    for (int q = 0; q < columns.Length; q++)
    {
        columns[q] = String.Format("{0,7} ", m_heaps[q].Count);
    }
    return String.Concat(columns);
}
#endregion PublicMethods
#region MinHeapItem
/// <summary>
/// Heap entry: an update together with the queue it was placed in and the
/// sequence number that orders it within that queue.
/// </summary>
private struct MinHeapItem : IComparable<MinHeapItem>
{
    private EntityUpdate m_update;
    private uint m_queue;
    private UInt64 m_order;

    /// <summary>
    /// Copies <paramref name="other"/> (update and entry order) into a new
    /// item tagged with a different queue.
    /// </summary>
    internal MinHeapItem(uint _pqueue, MinHeapItem other)
    {
        m_queue = _pqueue;
        m_order = other.m_order;
        m_update = other.m_update;
    }

    /// <summary>
    /// Builds an item from its queue, entry order and update.
    /// </summary>
    internal MinHeapItem(uint _pqueue, UInt64 _entryorder, EntityUpdate _value)
    {
        m_queue = _pqueue;
        m_order = _entryorder;
        m_update = _value;
    }

    /// <summary>The queued update.</summary>
    internal EntityUpdate Value
    {
        get { return m_update; }
    }

    /// <summary>Index of the queue this item was placed in.</summary>
    internal uint PriorityQueue
    {
        get { return m_queue; }
    }

    /// <summary>Sequence number that orders items within a heap.</summary>
    internal UInt64 EntryOrder
    {
        get { return m_order; }
    }

    /// <summary>
    /// Formats the item as [queue,entry order,entity local id].
    /// </summary>
    public override string ToString()
    {
        return String.Format("[{0},{1},{2}]", m_queue, m_order, m_update.Entity.LocalId);
    }

    /// <summary>
    /// Orders items by entry order only, so earlier entries come out first.
    /// The original author notes the assumption that the root part of an SOG
    /// is added to the update queue before its component parts.
    /// </summary>
    public int CompareTo(MinHeapItem other)
    {
        return m_order.CompareTo(other.m_order);
    }
}
#endregion
#region LookupItem
/// <summary>
/// Entry of the lookup table: records where an entity's pending update
/// currently lives so it can be found, replaced or removed.
/// </summary>
private struct LookupItem
{
// The heap (queue) that currently holds the entity's update.
internal MinHeap<MinHeapItem> Heap;
// Handle identifying the update inside that heap; passed by ref to
// MinHeap.Add and used for MinHeap.Remove / TryGetValue.
internal IHandle Handle;
}
#endregion
}
}