513 lines
18 KiB
C#
513 lines
18 KiB
C#
// Ami Bar
|
|
// amibar@gmail.com
|
|
|
|
using System;
|
|
using System.Threading;
|
|
using System.Runtime.CompilerServices;
|
|
using System.Diagnostics;
|
|
|
|
namespace Amib.Threading.Internal
|
|
{
|
|
#region WorkItemsGroup class
|
|
|
|
/// <summary>
|
|
/// Summary description for WorkItemsGroup.
|
|
/// </summary>
|
|
public class WorkItemsGroup : IWorkItemsGroup
|
|
{
|
|
#region Private members
|
|
|
|
private object _lock = new object();
|
|
/// <summary>
|
|
/// Contains the name of this instance of SmartThreadPool.
|
|
/// Can be changed by the user.
|
|
/// </summary>
|
|
private string _name = "WorkItemsGroup";
|
|
|
|
/// <summary>
|
|
/// A reference to the SmartThreadPool instance that created this
|
|
/// WorkItemsGroup.
|
|
/// </summary>
|
|
private SmartThreadPool _stp;
|
|
|
|
/// <summary>
|
|
/// The OnIdle event
|
|
/// </summary>
|
|
private event WorkItemsGroupIdleHandler _onIdle;
|
|
|
|
/// <summary>
|
|
/// Defines how many work items of this WorkItemsGroup can run at once.
|
|
/// </summary>
|
|
private int _concurrency;
|
|
|
|
/// <summary>
|
|
/// Priority queue to hold work items before they are passed
|
|
/// to the SmartThreadPool.
|
|
/// </summary>
|
|
private PriorityQueue _workItemsQueue;
|
|
|
|
/// <summary>
|
|
/// Indicate how many work items are waiting in the SmartThreadPool
|
|
/// queue.
|
|
/// This value is used to apply the concurrency.
|
|
/// </summary>
|
|
private int _workItemsInStpQueue;
|
|
|
|
/// <summary>
|
|
/// Indicate how many work items are currently running in the SmartThreadPool.
|
|
/// This value is used with the Cancel, to calculate if we can send new
|
|
/// work items to the STP.
|
|
/// </summary>
|
|
private int _workItemsExecutingInStp = 0;
|
|
|
|
/// <summary>
|
|
/// WorkItemsGroup start information
|
|
/// </summary>
|
|
private WIGStartInfo _workItemsGroupStartInfo;
|
|
|
|
/// <summary>
|
|
/// Signaled when all of the WorkItemsGroup's work item completed.
|
|
/// </summary>
|
|
private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
|
|
|
|
/// <summary>
|
|
/// A common object for all the work items that this work items group
|
|
/// generate so we can mark them to cancel in O(1)
|
|
/// </summary>
|
|
private CanceledWorkItemsGroup _canceledWorkItemsGroup = new CanceledWorkItemsGroup();
|
|
|
|
#endregion
|
|
|
|
#region Construction
|
|
|
|
public WorkItemsGroup(
|
|
SmartThreadPool stp,
|
|
int concurrency,
|
|
WIGStartInfo wigStartInfo)
|
|
{
|
|
if (concurrency <= 0)
|
|
{
|
|
throw new ArgumentOutOfRangeException("concurrency", concurrency, "concurrency must be greater than zero");
|
|
}
|
|
_stp = stp;
|
|
_concurrency = concurrency;
|
|
_workItemsGroupStartInfo = new WIGStartInfo(wigStartInfo);
|
|
_workItemsQueue = new PriorityQueue();
|
|
|
|
// The _workItemsInStpQueue gets the number of currently executing work items,
|
|
// because once a work item is executing, it cannot be cancelled.
|
|
_workItemsInStpQueue = _workItemsExecutingInStp;
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region IWorkItemsGroup implementation
|
|
|
|
/// <summary>
|
|
/// Get/Set the name of the SmartThreadPool instance
|
|
/// </summary>
|
|
public string Name
|
|
{
|
|
get
|
|
{
|
|
return _name;
|
|
}
|
|
|
|
set
|
|
{
|
|
_name = value;
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemCallback callback)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="workItemPriority">The priority of the work item</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, WorkItemPriority workItemPriority)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, workItemPriority);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="workItemInfo">Work item info</param>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <param name="workItemPriority">The work item priority</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemCallback callback, object state, WorkItemPriority workItemPriority)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, workItemPriority);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="workItemInfo">Work item information</param>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(WorkItemInfo workItemInfo, WorkItemCallback callback, object state)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, workItemInfo, callback, state);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <param name="postExecuteWorkItemCallback">
|
|
/// A delegate to call after the callback completion
|
|
/// </param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(
|
|
WorkItemCallback callback,
|
|
object state,
|
|
PostExecuteWorkItemCallback postExecuteWorkItemCallback)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <param name="postExecuteWorkItemCallback">
|
|
/// A delegate to call after the callback completion
|
|
/// </param>
|
|
/// <param name="workItemPriority">The work item priority</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(
|
|
WorkItemCallback callback,
|
|
object state,
|
|
PostExecuteWorkItemCallback postExecuteWorkItemCallback,
|
|
WorkItemPriority workItemPriority)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, workItemPriority);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <param name="postExecuteWorkItemCallback">
|
|
/// A delegate to call after the callback completion
|
|
/// </param>
|
|
/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(
|
|
WorkItemCallback callback,
|
|
object state,
|
|
PostExecuteWorkItemCallback postExecuteWorkItemCallback,
|
|
CallToPostExecute callToPostExecute)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Queue a work item
|
|
/// </summary>
|
|
/// <param name="callback">A callback to execute</param>
|
|
/// <param name="state">
|
|
/// The context object of the work item. Used for passing arguments to the work item.
|
|
/// </param>
|
|
/// <param name="postExecuteWorkItemCallback">
|
|
/// A delegate to call after the callback completion
|
|
/// </param>
|
|
/// <param name="callToPostExecute">Indicates on which cases to call to the post execute callback</param>
|
|
/// <param name="workItemPriority">The work item priority</param>
|
|
/// <returns>Returns a work item result</returns>
|
|
public IWorkItemResult QueueWorkItem(
|
|
WorkItemCallback callback,
|
|
object state,
|
|
PostExecuteWorkItemCallback postExecuteWorkItemCallback,
|
|
CallToPostExecute callToPostExecute,
|
|
WorkItemPriority workItemPriority)
|
|
{
|
|
WorkItem workItem = WorkItemFactory.CreateWorkItem(this, _workItemsGroupStartInfo, callback, state, postExecuteWorkItemCallback, callToPostExecute, workItemPriority);
|
|
EnqueueToSTPNextWorkItem(workItem);
|
|
return workItem.GetWorkItemResult();
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait for the thread pool to be idle
|
|
/// </summary>
|
|
public void WaitForIdle()
|
|
{
|
|
WaitForIdle(Timeout.Infinite);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait for the thread pool to be idle
|
|
/// </summary>
|
|
public bool WaitForIdle(TimeSpan timeout)
|
|
{
|
|
return WaitForIdle((int)timeout.TotalMilliseconds);
|
|
}
|
|
|
|
/// <summary>
|
|
/// Wait for the thread pool to be idle
|
|
/// </summary>
|
|
public bool WaitForIdle(int millisecondsTimeout)
|
|
{
|
|
_stp.ValidateWorkItemsGroupWaitForIdle(this);
|
|
return _isIdleWaitHandle.WaitOne(millisecondsTimeout, false);
|
|
}
|
|
|
|
public int WaitingCallbacks
|
|
{
|
|
get
|
|
{
|
|
return _workItemsQueue.Count;
|
|
}
|
|
}
|
|
|
|
public event WorkItemsGroupIdleHandler OnIdle
|
|
{
|
|
add
|
|
{
|
|
_onIdle += value;
|
|
}
|
|
remove
|
|
{
|
|
_onIdle -= value;
|
|
}
|
|
}
|
|
|
|
public void Cancel()
|
|
{
|
|
lock(_lock)
|
|
{
|
|
_canceledWorkItemsGroup.IsCanceled = true;
|
|
_workItemsQueue.Clear();
|
|
_workItemsInStpQueue = 0;
|
|
_canceledWorkItemsGroup = new CanceledWorkItemsGroup();
|
|
}
|
|
}
|
|
|
|
public void Start()
|
|
{
|
|
lock (this)
|
|
{
|
|
if (!_workItemsGroupStartInfo.StartSuspended)
|
|
{
|
|
return;
|
|
}
|
|
_workItemsGroupStartInfo.StartSuspended = false;
|
|
}
|
|
|
|
for(int i = 0; i < _concurrency; ++i)
|
|
{
|
|
EnqueueToSTPNextWorkItem(null, false);
|
|
}
|
|
}
|
|
|
|
#endregion
|
|
|
|
#region Private methods
|
|
|
|
private void RegisterToWorkItemCompletion(IWorkItemResult wir)
|
|
{
|
|
IInternalWorkItemResult iwir = wir as IInternalWorkItemResult;
|
|
iwir.OnWorkItemStarted += new WorkItemStateCallback(OnWorkItemStartedCallback);
|
|
iwir.OnWorkItemCompleted += new WorkItemStateCallback(OnWorkItemCompletedCallback);
|
|
}
|
|
|
|
public void OnSTPIsStarting()
|
|
{
|
|
lock (this)
|
|
{
|
|
if (_workItemsGroupStartInfo.StartSuspended)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
|
|
for(int i = 0; i < _concurrency; ++i)
|
|
{
|
|
EnqueueToSTPNextWorkItem(null, false);
|
|
}
|
|
}
|
|
|
|
private object FireOnIdle(object state)
|
|
{
|
|
FireOnIdleImpl(_onIdle);
|
|
return null;
|
|
}
|
|
|
|
[MethodImpl(MethodImplOptions.NoInlining)]
|
|
private void FireOnIdleImpl(WorkItemsGroupIdleHandler onIdle)
|
|
{
|
|
if(null == onIdle)
|
|
{
|
|
return;
|
|
}
|
|
|
|
Delegate[] delegates = onIdle.GetInvocationList();
|
|
foreach(WorkItemsGroupIdleHandler eh in delegates)
|
|
{
|
|
try
|
|
{
|
|
eh(this);
|
|
}
|
|
// Ignore exceptions
|
|
catch{}
|
|
}
|
|
}
|
|
|
|
private void OnWorkItemStartedCallback(WorkItem workItem)
|
|
{
|
|
lock(_lock)
|
|
{
|
|
++_workItemsExecutingInStp;
|
|
}
|
|
}
|
|
|
|
private void OnWorkItemCompletedCallback(WorkItem workItem)
|
|
{
|
|
EnqueueToSTPNextWorkItem(null, true);
|
|
}
|
|
|
|
private void EnqueueToSTPNextWorkItem(WorkItem workItem)
|
|
{
|
|
EnqueueToSTPNextWorkItem(workItem, false);
|
|
}
|
|
|
|
private void EnqueueToSTPNextWorkItem(WorkItem workItem, bool decrementWorkItemsInStpQueue)
|
|
{
|
|
lock(_lock)
|
|
{
|
|
// Got here from OnWorkItemCompletedCallback()
|
|
if (decrementWorkItemsInStpQueue)
|
|
{
|
|
--_workItemsInStpQueue;
|
|
|
|
if(_workItemsInStpQueue < 0)
|
|
{
|
|
_workItemsInStpQueue = 0;
|
|
}
|
|
|
|
--_workItemsExecutingInStp;
|
|
|
|
if(_workItemsExecutingInStp < 0)
|
|
{
|
|
_workItemsExecutingInStp = 0;
|
|
}
|
|
}
|
|
|
|
// If the work item is not null then enqueue it
|
|
if (null != workItem)
|
|
{
|
|
workItem.CanceledWorkItemsGroup = _canceledWorkItemsGroup;
|
|
|
|
RegisterToWorkItemCompletion(workItem.GetWorkItemResult());
|
|
_workItemsQueue.Enqueue(workItem);
|
|
//_stp.IncrementWorkItemsCount();
|
|
|
|
if ((1 == _workItemsQueue.Count) &&
|
|
(0 == _workItemsInStpQueue))
|
|
{
|
|
_stp.RegisterWorkItemsGroup(this);
|
|
Trace.WriteLine("WorkItemsGroup " + Name + " is NOT idle");
|
|
_isIdleWaitHandle.Reset();
|
|
}
|
|
}
|
|
|
|
// If the work items queue of the group is empty than quit
|
|
if (0 == _workItemsQueue.Count)
|
|
{
|
|
if (0 == _workItemsInStpQueue)
|
|
{
|
|
_stp.UnregisterWorkItemsGroup(this);
|
|
Trace.WriteLine("WorkItemsGroup " + Name + " is idle");
|
|
_isIdleWaitHandle.Set();
|
|
_stp.QueueWorkItem(new WorkItemCallback(this.FireOnIdle));
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (!_workItemsGroupStartInfo.StartSuspended)
|
|
{
|
|
if (_workItemsInStpQueue < _concurrency)
|
|
{
|
|
WorkItem nextWorkItem = _workItemsQueue.Dequeue() as WorkItem;
|
|
_stp.Enqueue(nextWorkItem, true);
|
|
++_workItemsInStpQueue;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#endregion
|
|
}
|
|
|
|
#endregion
|
|
}
|