OpenSimMirror/ThirdParty/SmartThreadPool/WorkItemsGroup.cs

362 lines
11 KiB
C#

using System;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Diagnostics;
namespace Amib.Threading.Internal
{
#region WorkItemsGroup class
/// <summary>
/// Summary description for WorkItemsGroup.
/// </summary>
public class WorkItemsGroup : WorkItemsGroupBase
{
#region Private members
private readonly object _lock = new object();
/// <summary>
/// A reference to the SmartThreadPool instance that created this
/// WorkItemsGroup.
/// </summary>
private readonly SmartThreadPool _stp;
/// <summary>
/// The OnIdle event
/// </summary>
private event WorkItemsGroupIdleHandler _onIdle;
/// <summary>
/// A flag to indicate if the Work Items Group is now suspended.
/// </summary>
private bool _isSuspended;
/// <summary>
/// Defines how many work items of this WorkItemsGroup can run at once.
/// </summary>
private int _concurrency;
/// <summary>
/// Priority queue to hold work items before they are passed
/// to the SmartThreadPool.
/// </summary>
private readonly PriorityQueue _workItemsQueue;
/// <summary>
/// Indicate how many work items are waiting in the SmartThreadPool
/// queue.
/// This value is used to apply the concurrency.
/// </summary>
private int _workItemsInStpQueue;
/// <summary>
/// Indicate how many work items are currently running in the SmartThreadPool.
/// This value is used with the Cancel, to calculate if we can send new
/// work items to the STP.
/// </summary>
private int _workItemsExecutingInStp = 0;
/// <summary>
/// WorkItemsGroup start information
/// </summary>
private readonly WIGStartInfo _workItemsGroupStartInfo;
/// <summary>
/// Signaled when all of the WorkItemsGroup's work item completed.
/// </summary>
//private readonly ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
private readonly ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true);
/// <summary>
/// A common object for all the work items that this work items group
/// generate so we can mark them to cancel in O(1)
/// </summary>
private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
#endregion
#region Construction
public WorkItemsGroup(
SmartThreadPool stp,
int concurrency,
WIGStartInfo wigStartInfo)
{
if (concurrency <= 0)
{
throw new ArgumentOutOfRangeException(
"concurrency",
#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
concurrency,
#endif
"concurrency must be greater than zero");
}
_stp = stp;
_concurrency = concurrency;
_workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo).AsReadOnly();
_workItemsQueue = new PriorityQueue();
Name = "WorkItemsGroup";
// The _workItemsInStpQueue gets the number of currently executing work items,
// because once a work item is executing, it cannot be cancelled.
_workItemsInStpQueue = _workItemsExecutingInStp;
_isSuspended = _workItemsGroupStartInfo.StartSuspended;
}
#endregion
#region WorkItemsGroupBase Overrides
public override int Concurrency
{
get { return _concurrency; }
set
{
Debug.Assert(value > 0);
int diff = value - _concurrency;
_concurrency = value;
if (diff > 0)
{
EnqueueToSTPNextNWorkItem(diff);
}
}
}
public override int WaitingCallbacks
{
get { return _workItemsQueue.Count; }
}
public override object[] GetStates()
{
lock (_lock)
{
object[] states = new object[_workItemsQueue.Count];
int i = 0;
foreach (WorkItem workItem in _workItemsQueue)
{
states[i] = workItem.GetWorkItemResult().State;
++i;
}
return states;
}
}
/// <summary>
/// WorkItemsGroup start information
/// </summary>
public override WIGStartInfo WIGStartInfo
{
get { return _workItemsGroupStartInfo; }
}
/// <summary>
/// Start the Work Items Group if it was started suspended
/// </summary>
public override void Start()
{
// If the Work Items Group already started then quit
if (!_isSuspended)
{
return;
}
_isSuspended = false;
EnqueueToSTPNextNWorkItem(Math.Min(_workItemsQueue.Count, _concurrency));
}
public override void Cancel(bool abortExecution)
{
lock (_lock)
{
_canceledWorkItemsGroup.IsCanceled = true;
_workItemsQueue.Clear();
_workItemsInStpQueue = 0;
_canceledWorkItemsGroup = new CanceledWorkItemsGroup();
}
if (abortExecution)
{
_stp.CancelAbortWorkItemsGroup(this);
}
}
/// <summary>
/// Wait for the thread pool to be idle
/// </summary>
public override bool WaitForIdle(int millisecondsTimeout)
{
SmartThreadPool.ValidateWorkItemsGroupWaitForIdle(this);
return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false);
}
public override event WorkItemsGroupIdleHandler OnIdle
{
add { _onIdle += value; }
remove { _onIdle -= value; }
}
#endregion
#region Private methods
private void RegisterToWorkItemCompletion(IWorkItemResult wir)
{
IInternalWorkItemResult iwir = (IInternalWorkItemResult)wir;
iwir.OnWorkItemStarted += OnWorkItemStartedCallback;
iwir.OnWorkItemCompleted += OnWorkItemCompletedCallback;
}
public void OnSTPIsStarting()
{
if (_isSuspended)
{
return;
}
EnqueueToSTPNextNWorkItem(_concurrency);
}
public void EnqueueToSTPNextNWorkItem(int count)
{
for (int i = 0; i < count; ++i)
{
EnqueueToSTPNextWorkItem(null, false);
}
}
private object FireOnIdle(object state)
{
FireOnIdleImpl(_onIdle);
return null;
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
{
if(null == onIdle)
{
return;
}
Delegate[] delegates = onIdle.GetInvocationList();
foreach(WorkItemsGroupIdleHandler eh in delegates)
{
try
{
eh(this);
}
catch { } // Suppress exceptions
}
}
private void OnWorkItemStartedCallback(WorkItem workItem)
{
lock(_lock)
{
++_workItemsExecutingInStp;
}
}
private void OnWorkItemCompletedCallback(WorkItem workItem)
{
EnqueueToSTPNextWorkItem(null, true);
}
internal override void Enqueue(WorkItem workItem)
{
EnqueueToSTPNextWorkItem(workItem);
}
private void EnqueueToSTPNextWorkItem(WorkItem workItem)
{
EnqueueToSTPNextWorkItem(workItem, false);
}
private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
{
lock(_lock)
{
// Got here from OnWorkItemCompletedCallback()
if (decrementWorkItemsInStpQueue)
{
--_workItemsInStpQueue;
if(_workItemsInStpQueue < 0)
{
_workItemsInStpQueue = 0;
}
--_workItemsExecutingInStp;
if(_workItemsExecutingInStp < 0)
{
_workItemsExecutingInStp = 0;
}
}
// If the work item is not null then enqueue it
if (null != workItem)
{
workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
_workItemsQueue.Enqueue(workItem);
//_stp.IncrementWorkItemsCount();
if ((1 == _workItemsQueue.Count) &&
(0 == _workItemsInStpQueue))
{
_stp.RegisterWorkItemsGroup(this);
IsIdle = false;
_isIdleWaitHandle.Reset();
}
}
// If the work items queue of the group is empty than quit
if (0 == _workItemsQueue.Count)
{
if (0 == _workItemsInStpQueue)
{
_stp.UnregisterWorkItemsGroup(this);
IsIdle = true;
_isIdleWaitHandle.Set();
if (decrementWorkItemsInStpQueue && _onIdle != null && _onIdle.GetInvocationList().Length > 0)
{
_stp.QueueWorkItem(new WorkItemCallback(FireOnIdle));
}
}
return;
}
if (!_isSuspended)
{
if (_workItemsInStpQueue < _concurrency)
{
WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
try
{
_stp.Enqueue(nextWorkItem);
}
catch (ObjectDisposedException e)
{
e.GetHashCode();
// The STP has been shutdown
}
++_workItemsInStpQueue;
}
}
}
}
#endregion
}
#endregion
}