2013-05-01 22:00:46 +00:00
#region Release History
// Smart Thread Pool
// 7 Aug 2004 - Initial release
/ /
// 14 Sep 2004 - Bug fixes
/ /
// 15 Oct 2004 - Added new features
// - Work items return result.
// - Support waiting synchronization for multiple work items.
// - Work items can be cancelled.
// - Passage of the caller thread’ s context to the thread in the pool.
// - Minimal usage of WIN32 handles.
// - Minor bug fixes.
/ /
// 26 Dec 2004 - Changes:
// - Removed static constructors.
// - Added finalizers.
// - Changed Exceptions so they are serializable.
// - Fixed the bug in one of the SmartThreadPool constructors.
// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters.
// The SmartThreadPool.WaitAny() is still limited by the .NET Framework.
// - Added PostExecute with options on which cases to call it.
// - Added option to dispose of the state objects.
// - Added a WaitForIdle() method that waits until the work items queue is empty.
// - Added an STPStartInfo class for the initialization of the thread pool.
// - Changed exception handling so if a work item throws an exception it
// is rethrown at GetResult(), rather then firing an UnhandledException event.
// Note that PostExecute exception are always ignored.
/ /
// 25 Mar 2005 - Changes:
// - Fixed lost of work items bug
/ /
// 3 Jul 2005: Changes.
// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed.
/ /
// 16 Aug 2005: Changes.
// - Fixed bug where the InUseThreads becomes negative when canceling work items.
/ /
// 31 Jan 2006 - Changes:
// - Added work items priority
// - Removed support of chained delegates in callbacks and post executes (nobody really use this)
// - Added work items groups
// - Added work items groups idle event
// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array
// it returns true rather then throwing an exception.
// - Added option to start the STP and the WIG as suspended
// - Exception behavior changed, the real exception is returned by an
// inner exception
// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.)
// - Added performance counters
// - Added priority to the threads in the pool
/ /
// 13 Feb 2006 - Changes:
// - Added a call to the dispose of the Performance Counter so
// their won't be a Performance Counter leak.
// - Added exception catch in case the Performance Counters cannot
// be created.
/ /
// 17 May 2008 - Changes:
// - Changed the dispose behavior and removed the Finalizers.
// - Enabled the change of the MaxThreads and MinThreads at run time.
// - Enabled the change of the Concurrency of a IWorkItemsGroup at run
// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency
// refers to the MaxThreads.
// - Improved the cancel behavior.
// - Added events for thread creation and termination.
// - Fixed the HttpContext context capture.
// - Changed internal collections so they use generic collections
// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup
// - Added support for WinCE
// - Added support for Action<T> and Func<T>
/ /
// 07 April 2009 - Changes:
// - Added support for Silverlight and Mono
// - Added Join, Choice, and Pipe to SmartThreadPool.
// - Added local performance counters (for Mono, Silverlight, and WindowsCE)
// - Changed duration measures from DateTime.Now to Stopwatch.
// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList<T>.
/ /
// 21 December 2009 - Changes:
// - Added work item timeout (passive)
/ /
// 20 August 2012 - Changes:
// - Added set name to threads
// - Fixed the WorkItemsQueue.Dequeue.
// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... }
// - Fixed SmartThreadPool.Pipe
// - Added IsBackground option to threads
// - Added ApartmentState to threads
// - Fixed thread creation when queuing many work items at the same time.
/ /
// 24 August 2012 - Changes:
// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan
// - Added option to set MaxStackSize of threads
# endregion
using System ;
using System.Security ;
using System.Threading ;
using System.Collections ;
using System.Collections.Generic ;
using System.Diagnostics ;
using System.Runtime.CompilerServices ;
using Amib.Threading.Internal ;
namespace Amib.Threading
{
#region SmartThreadPool class
/// <summary>
/// Smart thread pool class.
/// </summary>
public partial class SmartThreadPool : WorkItemsGroupBase , IDisposable
{
#region Public Default Constants
/// <summary>
/// Default minimum number of threads the thread pool contains. (0)
/// </summary>
public const int DefaultMinWorkerThreads = 0 ;
/// <summary>
/// Default maximum number of threads the thread pool contains. (25)
/// </summary>
public const int DefaultMaxWorkerThreads = 25 ;
/// <summary>
/// Default idle timeout in milliseconds. (One minute)
/// </summary>
public const int DefaultIdleTimeout = 60 * 1000 ; // One minute
/// <summary>
/// Indicate to copy the security context of the caller and then use it in the call. (false)
/// </summary>
public const bool DefaultUseCallerCallContext = false ;
/// <summary>
/// Indicate to copy the HTTP context of the caller and then use it in the call. (false)
/// </summary>
public const bool DefaultUseCallerHttpContext = false ;
/// <summary>
/// Indicate to dispose of the state objects if they support the IDispose interface. (false)
/// </summary>
public const bool DefaultDisposeOfStateObjects = false ;
/// <summary>
/// The default option to run the post execute (CallToPostExecute.Always)
/// </summary>
public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute . Always ;
/// <summary>
/// The default post execute method to run. (None)
/// When null it means not to call it.
/// </summary>
public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback ;
/// <summary>
/// The default work item priority (WorkItemPriority.Normal)
/// </summary>
public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority . Normal ;
/// <summary>
/// The default is to work on work items as soon as they arrive
/// and not to wait for the start. (false)
/// </summary>
public const bool DefaultStartSuspended = false ;
/// <summary>
/// The default name to use for the performance counters instance. (null)
/// </summary>
public static readonly string DefaultPerformanceCounterInstanceName ;
#if !(WINDOWS_PHONE)
/// <summary>
/// The default thread priority (ThreadPriority.Normal)
/// </summary>
public const ThreadPriority DefaultThreadPriority = ThreadPriority . Normal ;
# endif
/// <summary>
/// The default thread pool name. (SmartThreadPool)
/// </summary>
public const string DefaultThreadPoolName = "SmartThreadPool" ;
/// <summary>
/// The default Max Stack Size. (SmartThreadPool)
/// </summary>
public static readonly int? DefaultMaxStackSize = null ;
/// <summary>
/// The default fill state with params. (false)
/// It is relevant only to QueueWorkItem of Action<...>/Func<...>
/// </summary>
public const bool DefaultFillStateWithArgs = false ;
/// <summary>
/// The default thread backgroundness. (true)
/// </summary>
public const bool DefaultAreThreadsBackground = true ;
#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
/// <summary>
/// The default apartment state of a thread in the thread pool.
/// The default is ApartmentState.Unknown which means the STP will not
/// set the apartment of the thread. It will use the .NET default.
/// </summary>
public const ApartmentState DefaultApartmentState = ApartmentState . Unknown ;
# endif
# endregion
#region Member Variables
/// <summary>
/// Dictionary of all the threads in the thread pool.
/// </summary>
private readonly SynchronizedDictionary < Thread , ThreadEntry > _workerThreads = new SynchronizedDictionary < Thread , ThreadEntry > ( ) ;
/// <summary>
/// Queue of work items.
/// </summary>
private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue ( ) ;
/// <summary>
/// Count the work items handled.
/// Used by the performance counter.
/// </summary>
private int _workItemsProcessed ;
/// <summary>
/// Number of threads that currently work (not idle).
/// </summary>
private int _inUseWorkerThreads ;
/// <summary>
/// Stores a copy of the original STPStartInfo.
/// It is used to change the MinThread and MaxThreads
/// </summary>
private STPStartInfo _stpStartInfo ;
/// <summary>
/// Total number of work items that are stored in the work items queue
/// plus the work items that the threads in the pool are working on.
/// </summary>
private int _currentWorkItemsCount ;
/// <summary>
/// Signaled when the thread pool is idle, i.e. no thread is busy
/// and the work items queue is empty
/// </summary>
//private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true);
private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory . CreateManualResetEvent ( true ) ;
/// <summary>
/// An event to signal all the threads to quit immediately.
/// </summary>
//private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false);
private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory . CreateManualResetEvent ( false ) ;
/// <summary>
/// A flag to indicate if the Smart Thread Pool is now suspended.
/// </summary>
private bool _isSuspended ;
/// <summary>
/// A flag to indicate the threads to quit.
/// </summary>
private bool _shutdown ;
/// <summary>
/// Counts the threads created in the pool.
/// It is used to name the threads.
/// </summary>
private int _threadCounter ;
/// <summary>
/// Indicate that the SmartThreadPool has been disposed
/// </summary>
private bool _isDisposed ;
/// <summary>
/// Holds all the WorkItemsGroup instaces that have at least one
/// work item int the SmartThreadPool
/// This variable is used in case of Shutdown
/// </summary>
private readonly SynchronizedDictionary < IWorkItemsGroup , IWorkItemsGroup > _workItemsGroups = new SynchronizedDictionary < IWorkItemsGroup , IWorkItemsGroup > ( ) ;
/// <summary>
/// A common object for all the work items int the STP
/// so we can mark them to cancel in O(1)
/// </summary>
private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup ( ) ;
/// <summary>
/// Windows STP performance counters
/// </summary>
private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters . Instance ;
/// <summary>
/// Local STP performance counters
/// </summary>
private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters . Instance ;
#if (WINDOWS_PHONE)
private static readonly Dictionary < int , ThreadEntry > _threadEntries = new Dictionary < int , ThreadEntry > ( ) ;
#elif (_WINDOWS_CE)
private static LocalDataStoreSlot _threadEntrySlot = Thread . AllocateDataSlot ( ) ;
# else
[ThreadStatic]
private static ThreadEntry _threadEntry ;
# endif
/// <summary>
/// An event to call after a thread is created, but before
/// it's first use.
/// </summary>
private event ThreadInitializationHandler _onThreadInitialization ;
/// <summary>
/// An event to call when a thread is about to exit, after
/// it is no longer belong to the pool.
/// </summary>
private event ThreadTerminationHandler _onThreadTermination ;
# endregion
#region Per thread properties
/// <summary>
/// A reference to the current work item a thread from the thread pool
/// is executing.
/// </summary>
internal static ThreadEntry CurrentThreadEntry
{
#if (WINDOWS_PHONE)
get
{
lock ( _threadEntries )
{
ThreadEntry threadEntry ;
if ( _threadEntries . TryGetValue ( Thread . CurrentThread . ManagedThreadId , out threadEntry ) )
{
return threadEntry ;
}
}
return null ;
}
set
{
lock ( _threadEntries )
{
_threadEntries [ Thread . CurrentThread . ManagedThreadId ] = value ;
}
}
#elif (_WINDOWS_CE)
get
{
//Thread.CurrentThread.ManagedThreadId
return Thread . GetData ( _threadEntrySlot ) as ThreadEntry ;
}
set
{
Thread . SetData ( _threadEntrySlot , value ) ;
}
# else
get
{
return _threadEntry ;
}
set
{
_threadEntry = value ;
}
# endif
}
# endregion
#region Construction and Finalization
/// <summary>
/// Constructor
/// </summary>
public SmartThreadPool ( )
{
_stpStartInfo = new STPStartInfo ( ) ;
Initialize ( ) ;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
public SmartThreadPool ( int idleTimeout )
{
_stpStartInfo = new STPStartInfo
{
IdleTimeout = idleTimeout ,
} ;
Initialize ( ) ;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
public SmartThreadPool (
int idleTimeout ,
int maxWorkerThreads )
{
_stpStartInfo = new STPStartInfo
{
IdleTimeout = idleTimeout ,
MaxWorkerThreads = maxWorkerThreads ,
} ;
Initialize ( ) ;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="idleTimeout">Idle timeout in milliseconds</param>
/// <param name="maxWorkerThreads">Upper limit of threads in the pool</param>
/// <param name="minWorkerThreads">Lower limit of threads in the pool</param>
public SmartThreadPool (
int idleTimeout ,
int maxWorkerThreads ,
int minWorkerThreads )
{
_stpStartInfo = new STPStartInfo
{
IdleTimeout = idleTimeout ,
MaxWorkerThreads = maxWorkerThreads ,
MinWorkerThreads = minWorkerThreads ,
} ;
Initialize ( ) ;
}
/// <summary>
/// Constructor
/// </summary>
/// <param name="stpStartInfo">A SmartThreadPool configuration that overrides the default behavior</param>
public SmartThreadPool ( STPStartInfo stpStartInfo )
{
_stpStartInfo = new STPStartInfo ( stpStartInfo ) ;
Initialize ( ) ;
}
private void Initialize ( )
{
Name = _stpStartInfo . ThreadPoolName ;
ValidateSTPStartInfo ( ) ;
// _stpStartInfoRW stores a read/write copy of the STPStartInfo.
// Actually only MaxWorkerThreads and MinWorkerThreads are overwritten
_isSuspended = _stpStartInfo . StartSuspended ;
#if (_WINDOWS_CE) || (_SILVERLIGHT) || (_MONO) || (WINDOWS_PHONE)
if ( null ! = _stpStartInfo . PerformanceCounterInstanceName )
{
throw new NotSupportedException ( "Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters" ) ;
}
# else
if ( null ! = _stpStartInfo . PerformanceCounterInstanceName )
{
try
{
_windowsPCs = new STPInstancePerformanceCounters ( _stpStartInfo . PerformanceCounterInstanceName ) ;
}
catch ( Exception e )
{
Debug . WriteLine ( "Unable to create Performance Counters: " + e ) ;
_windowsPCs = NullSTPInstancePerformanceCounters . Instance ;
}
}
# endif
if ( _stpStartInfo . EnableLocalPerformanceCounters )
{
_localPCs = new LocalSTPInstancePerformanceCounters ( ) ;
}
// If the STP is not started suspended then start the threads.
if ( ! _isSuspended )
{
StartOptimalNumberOfThreads ( ) ;
}
}
private void StartOptimalNumberOfThreads ( )
{
int threadsCount = Math . Max ( _workItemsQueue . Count , _stpStartInfo . MinWorkerThreads ) ;
threadsCount = Math . Min ( threadsCount , _stpStartInfo . MaxWorkerThreads ) ;
threadsCount - = _workerThreads . Count ;
if ( threadsCount > 0 )
{
StartThreads ( threadsCount ) ;
}
}
private void ValidateSTPStartInfo ( )
{
if ( _stpStartInfo . MinWorkerThreads < 0 )
{
throw new ArgumentOutOfRangeException (
"MinWorkerThreads" , "MinWorkerThreads cannot be negative" ) ;
}
if ( _stpStartInfo . MaxWorkerThreads < = 0 )
{
throw new ArgumentOutOfRangeException (
"MaxWorkerThreads" , "MaxWorkerThreads must be greater than zero" ) ;
}
if ( _stpStartInfo . MinWorkerThreads > _stpStartInfo . MaxWorkerThreads )
{
throw new ArgumentOutOfRangeException (
"MinWorkerThreads, maxWorkerThreads" ,
"MaxWorkerThreads must be greater or equal to MinWorkerThreads" ) ;
}
}
private static void ValidateCallback ( Delegate callback )
{
if ( callback . GetInvocationList ( ) . Length > 1 )
{
throw new NotSupportedException ( "SmartThreadPool doesn't support delegates chains" ) ;
}
}
# endregion
#region Thread Processing
/// <summary>
/// Waits on the queue for a work item, shutdown, or timeout.
/// </summary>
/// <returns>
/// Returns the WaitingCallback or null in case of timeout or shutdown.
/// </returns>
private WorkItem Dequeue ( )
{
WorkItem workItem =
_workItemsQueue . DequeueWorkItem ( _stpStartInfo . IdleTimeout , _shuttingDownEvent ) ;
return workItem ;
}
/// <summary>
/// Put a new work item in the queue
/// </summary>
/// <param name="workItem">A work item to queue</param>
internal override void Enqueue ( WorkItem workItem )
{
// Make sure the workItem is not null
Debug . Assert ( null ! = workItem ) ;
IncrementWorkItemsCount ( ) ;
workItem . CanceledSmartThreadPool = _canceledSmartThreadPool ;
_workItemsQueue . EnqueueWorkItem ( workItem ) ;
workItem . WorkItemIsQueued ( ) ;
// If all the threads are busy then try to create a new one
if ( _currentWorkItemsCount > _workerThreads . Count )
{
StartThreads ( 1 ) ;
}
}
private void IncrementWorkItemsCount ( )
{
_windowsPCs . SampleWorkItems ( _workItemsQueue . Count , _workItemsProcessed ) ;
_localPCs . SampleWorkItems ( _workItemsQueue . Count , _workItemsProcessed ) ;
int count = Interlocked . Increment ( ref _currentWorkItemsCount ) ;
//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
if ( count = = 1 )
{
IsIdle = false ;
_isIdleWaitHandle . Reset ( ) ;
}
}
private void DecrementWorkItemsCount ( )
{
int count = Interlocked . Decrement ( ref _currentWorkItemsCount ) ;
//Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString());
if ( count = = 0 )
{
IsIdle = true ;
_isIdleWaitHandle . Set ( ) ;
}
Interlocked . Increment ( ref _workItemsProcessed ) ;
if ( ! _shutdown )
{
// The counter counts even if the work item was cancelled
_windowsPCs . SampleWorkItems ( _workItemsQueue . Count , _workItemsProcessed ) ;
_localPCs . SampleWorkItems ( _workItemsQueue . Count , _workItemsProcessed ) ;
}
}
internal void RegisterWorkItemsGroup ( IWorkItemsGroup workItemsGroup )
{
_workItemsGroups [ workItemsGroup ] = workItemsGroup ;
}
internal void UnregisterWorkItemsGroup ( IWorkItemsGroup workItemsGroup )
{
if ( _workItemsGroups . Contains ( workItemsGroup ) )
{
_workItemsGroups . Remove ( workItemsGroup ) ;
}
}
/// <summary>
/// Inform that the current thread is about to quit or quiting.
/// The same thread may call this method more than once.
/// </summary>
private void InformCompleted ( )
{
// There is no need to lock the two methods together
// since only the current thread removes itself
// and the _workerThreads is a synchronized dictionary
if ( _workerThreads . Contains ( Thread . CurrentThread ) )
{
_workerThreads . Remove ( Thread . CurrentThread ) ;
_windowsPCs . SampleThreads ( _workerThreads . Count , _inUseWorkerThreads ) ;
_localPCs . SampleThreads ( _workerThreads . Count , _inUseWorkerThreads ) ;
}
}
/// <summary>
/// Starts new threads
/// </summary>
/// <param name="threadsCount">The number of threads to start</param>
private void StartThreads ( int threadsCount )
{
if ( _isSuspended )
{
return ;
}
lock ( _workerThreads . SyncRoot )
{
// Don't start threads on shut down
if ( _shutdown )
{
return ;
}
for ( int i = 0 ; i < threadsCount ; + + i )
{
// Don't create more threads then the upper limit
if ( _workerThreads . Count > = _stpStartInfo . MaxWorkerThreads )
{
return ;
}
// Create a new thread
#if (_SILVERLIGHT) || (WINDOWS_PHONE)
Thread workerThread = new Thread ( ProcessQueuedItems ) ;
# else
Thread workerThread =
_stpStartInfo . MaxStackSize . HasValue
? new Thread ( ProcessQueuedItems , _stpStartInfo . MaxStackSize . Value )
: new Thread ( ProcessQueuedItems ) ;
# endif
// Configure the new thread and start it
workerThread . Name = "STP " + Name + " Thread #" + _threadCounter ;
workerThread . IsBackground = _stpStartInfo . AreThreadsBackground ;
#if !(_SILVERLIGHT) && !(_WINDOWS_CE) && !(WINDOWS_PHONE)
if ( _stpStartInfo . ApartmentState ! = ApartmentState . Unknown )
{
workerThread . SetApartmentState ( _stpStartInfo . ApartmentState ) ;
}
# endif
#if !(_SILVERLIGHT) && !(WINDOWS_PHONE)
workerThread . Priority = _stpStartInfo . ThreadPriority ;
# endif
workerThread . Start ( ) ;
+ + _threadCounter ;
// Add it to the dictionary and update its creation time.
_workerThreads [ workerThread ] = new ThreadEntry ( this ) ;
_windowsPCs . SampleThreads ( _workerThreads . Count , _inUseWorkerThreads ) ;
_localPCs . SampleThreads ( _workerThreads . Count , _inUseWorkerThreads ) ;
}
}
}
/// <summary>
/// A worker thread method that processes work items from the work items queue.
/// </summary>
private void ProcessQueuedItems ( )
{
// Keep the entry of the dictionary as thread's variable to avoid the synchronization locks
// of the dictionary.
CurrentThreadEntry = _workerThreads [ Thread . CurrentThread ] ;
FireOnThreadInitialization ( ) ;
try
{
bool bInUseWorkerThreadsWasIncremented = false ;
// Process until shutdown.
while ( ! _shutdown )
{
// Update the last time this thread was seen alive.
// It's good for debugging.
CurrentThreadEntry . IAmAlive ( ) ;
// The following block handles the when the MaxWorkerThreads has been
// incremented by the user at run-time.
// Double lock for quit.
if ( _workerThreads . Count > _stpStartInfo . MaxWorkerThreads )
{
lock ( _workerThreads . SyncRoot )
{
if ( _workerThreads . Count > _stpStartInfo . MaxWorkerThreads )
{
// Inform that the thread is quiting and then quit.
// This method must be called within this lock or else
// more threads will quit and the thread pool will go
// below the lower limit.
InformCompleted ( ) ;
break ;
}
}
}
// Wait for a work item, shutdown, or timeout
WorkItem workItem = Dequeue ( ) ;
// Update the last time this thread was seen alive.
// It's good for debugging.
CurrentThreadEntry . IAmAlive ( ) ;
// On timeout or shut down.
if ( null = = workItem )
{
// Double lock for quit.
if ( _workerThreads . Count > _stpStartInfo . MinWorkerThreads )
{
lock ( _workerThreads . SyncRoot )
{
if ( _workerThreads . Count > _stpStartInfo . MinWorkerThreads )
{
// Inform that the thread is quiting and then quit.
// This method must be called within this lock or else
// more threads will quit and the thread pool will go
// below the lower limit.
InformCompleted ( ) ;
break ;
}
}
}
}
// If we didn't quit then skip to the next iteration.
if ( null = = workItem )
{
continue ;
}
try
{
// Initialize the value to false
bInUseWorkerThreadsWasIncremented = false ;
// Set the Current Work Item of the thread.
// Store the Current Work Item before the workItem.StartingWorkItem() is called,
// so WorkItem.Cancel can work when the work item is between InQueue and InProgress
// states.
// If the work item has been cancelled BEFORE the workItem.StartingWorkItem()
// (work item is in InQueue state) then workItem.StartingWorkItem() will return false.
// If the work item has been cancelled AFTER the workItem.StartingWorkItem() then
// (work item is in InProgress state) then the thread will be aborted
CurrentThreadEntry . CurrentWorkItem = workItem ;
// Change the state of the work item to 'in progress' if possible.
// We do it here so if the work item has been canceled we won't
// increment the _inUseWorkerThreads.
// The cancel mechanism doesn't delete items from the queue,
// it marks the work item as canceled, and when the work item
// is dequeued, we just skip it.
// If the post execute of work item is set to always or to
// call when the work item is canceled then the StartingWorkItem()
// will return true, so the post execute can run.
if ( ! workItem . StartingWorkItem ( ) )
{
continue ;
}
// Execute the callback. Make sure to accurately
// record how many callbacks are currently executing.
int inUseWorkerThreads = Interlocked . Increment ( ref _inUseWorkerThreads ) ;
_windowsPCs . SampleThreads ( _workerThreads . Count , inUseWorkerThreads ) ;
_localPCs . SampleThreads ( _workerThreads . Count , inUseWorkerThreads ) ;
// Mark that the _inUseWorkerThreads incremented, so in the finally{}
// statement we will decrement it correctly.
bInUseWorkerThreadsWasIncremented = true ;
workItem . FireWorkItemStarted ( ) ;
ExecuteWorkItem ( workItem ) ;
}
catch ( Exception ex )
{
ex . GetHashCode ( ) ;
// Do nothing
}
finally
{
workItem . DisposeOfState ( ) ;
// Set the CurrentWorkItem to null, since we
// no longer run user's code.
CurrentThreadEntry . CurrentWorkItem = null ;
// Decrement the _inUseWorkerThreads only if we had
// incremented it. Note the cancelled work items don't
// increment _inUseWorkerThreads.
if ( bInUseWorkerThreadsWasIncremented )
{
int inUseWorkerThreads = Interlocked . Decrement ( ref _inUseWorkerThreads ) ;
_windowsPCs . SampleThreads ( _workerThreads . Count , inUseWorkerThreads ) ;
_localPCs . SampleThreads ( _workerThreads . Count , inUseWorkerThreads ) ;
}
// Notify that the work item has been completed.
// WorkItemsGroup may enqueue their next work item.
workItem . FireWorkItemCompleted ( ) ;
// Decrement the number of work items here so the idle
// ManualResetEvent won't fluctuate.
DecrementWorkItemsCount ( ) ;
}
}
}
catch ( ThreadAbortException tae )
{
tae . GetHashCode ( ) ;
// Handle the abort exception gracfully.
#if !(_WINDOWS_CE) && !(_SILVERLIGHT) && !(WINDOWS_PHONE)
Thread . ResetAbort ( ) ;
# endif
}
catch ( Exception e )
{
Debug . Assert ( null ! = e ) ;
}
finally
{
InformCompleted ( ) ;
FireOnThreadTermination ( ) ;
}
}
private void ExecuteWorkItem ( WorkItem workItem )
{
_windowsPCs . SampleWorkItemsWaitTime ( workItem . WaitingTime ) ;
_localPCs . SampleWorkItemsWaitTime ( workItem . WaitingTime ) ;
try
{
workItem . Execute ( ) ;
}
finally
{
_windowsPCs . SampleWorkItemsProcessTime ( workItem . ProcessTime ) ;
_localPCs . SampleWorkItemsProcessTime ( workItem . ProcessTime ) ;
}
}
# endregion
#region Public Methods
private void ValidateWaitForIdle ( )
{
if ( null ! = CurrentThreadEntry & & CurrentThreadEntry . AssociatedSmartThreadPool = = this )
{
throw new NotSupportedException (
"WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock" ) ;
}
}
internal static void ValidateWorkItemsGroupWaitForIdle ( IWorkItemsGroup workItemsGroup )
{
if ( null = = CurrentThreadEntry )
{
return ;
}
WorkItem workItem = CurrentThreadEntry . CurrentWorkItem ;
ValidateWorkItemsGroupWaitForIdleImpl ( workItemsGroup , workItem ) ;
if ( ( null ! = workItemsGroup ) & &
( null ! = workItem ) & &
CurrentThreadEntry . CurrentWorkItem . WasQueuedBy ( workItemsGroup ) )
{
throw new NotSupportedException ( "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock" ) ;
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
private static void ValidateWorkItemsGroupWaitForIdleImpl ( IWorkItemsGroup workItemsGroup , WorkItem workItem )
{
if ( ( null ! = workItemsGroup ) & &
( null ! = workItem ) & &
workItem . WasQueuedBy ( workItemsGroup ) )
{
throw new NotSupportedException ( "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock" ) ;
}
}
/// <summary>
/// Force the SmartThreadPool to shutdown
/// </summary>
public void Shutdown ( )
{
Shutdown ( true , 0 ) ;
}
/// <summary>
/// Force the SmartThreadPool to shutdown with timeout
/// </summary>
public void Shutdown ( bool forceAbort , TimeSpan timeout )
{
Shutdown ( forceAbort , ( int ) timeout . TotalMilliseconds ) ;
}
/// <summary>
/// Empties the queue of work items and abort the threads in the pool.
/// </summary>
public void Shutdown ( bool forceAbort , int millisecondsTimeout )
{
ValidateNotDisposed ( ) ;
ISTPInstancePerformanceCounters pcs = _windowsPCs ;
if ( NullSTPInstancePerformanceCounters . Instance ! = _windowsPCs )
{
// Set the _pcs to "null" to stop updating the performance
// counters
_windowsPCs = NullSTPInstancePerformanceCounters . Instance ;
pcs . Dispose ( ) ;
}
Thread [ ] threads ;
lock ( _workerThreads . SyncRoot )
{
// Shutdown the work items queue
_workItemsQueue . Dispose ( ) ;
// Signal the threads to exit
_shutdown = true ;
_shuttingDownEvent . Set ( ) ;
// Make a copy of the threads' references in the pool
threads = new Thread [ _workerThreads . Count ] ;
_workerThreads . Keys . CopyTo ( threads , 0 ) ;
}
int millisecondsLeft = millisecondsTimeout ;
Stopwatch stopwatch = Stopwatch . StartNew ( ) ;
//DateTime start = DateTime.UtcNow;
bool waitInfinitely = ( Timeout . Infinite = = millisecondsTimeout ) ;
bool timeout = false ;
// Each iteration we update the time left for the timeout.
foreach ( Thread thread in threads )
{
// Join don't work with negative numbers
if ( ! waitInfinitely & & ( millisecondsLeft < 0 ) )
{
timeout = true ;
break ;
}
// Wait for the thread to terminate
bool success = thread . Join ( millisecondsLeft ) ;
if ( ! success )
{
timeout = true ;
break ;
}
if ( ! waitInfinitely )
{
// Update the time left to wait
//TimeSpan ts = DateTime.UtcNow - start;
millisecondsLeft = millisecondsTimeout - ( int ) stopwatch . ElapsedMilliseconds ;
}
}
if ( timeout & & forceAbort )
{
// Abort the threads in the pool
foreach ( Thread thread in threads )
{
if ( ( thread ! = null )
#if !(_WINDOWS_CE)
& & thread . IsAlive
#endif
)
{
try
{
thread . Abort ( ) ; // Shutdown
}
catch ( SecurityException e )
{
e . GetHashCode ( ) ;
}
catch ( ThreadStateException ex )
{
ex . GetHashCode ( ) ;
// In case the thread has been terminated
// after the check if it is alive.
}
}
}
}
}
/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <returns>
/// true when every work item in workItemResults has completed; otherwise false.
/// </returns>
public static bool WaitAll (
IWaitableResult [ ] waitableResults )
{
return WaitAll ( waitableResults , Timeout . Infinite , true ) ;
}
/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <returns>
/// true when every work item in workItemResults has completed; otherwise false.
/// </returns>
public static bool WaitAll (
IWaitableResult [ ] waitableResults ,
TimeSpan timeout ,
bool exitContext )
{
return WaitAll ( waitableResults , ( int ) timeout . TotalMilliseconds , exitContext ) ;
}
/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// true when every work item in workItemResults has completed; otherwise false.
/// </returns>
public static bool WaitAll (
IWaitableResult [ ] waitableResults ,
TimeSpan timeout ,
bool exitContext ,
WaitHandle cancelWaitHandle )
{
return WaitAll ( waitableResults , ( int ) timeout . TotalMilliseconds , exitContext , cancelWaitHandle ) ;
}
/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <returns>
/// true when every work item in workItemResults has completed; otherwise false.
/// </returns>
public static bool WaitAll (
IWaitableResult [ ] waitableResults ,
int millisecondsTimeout ,
bool exitContext )
{
return WorkItem . WaitAll ( waitableResults , millisecondsTimeout , exitContext , null ) ;
}
/// <summary>
/// Wait for all work items to complete
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// true when every work item in workItemResults has completed; otherwise false.
/// </returns>
public static bool WaitAll (
IWaitableResult [ ] waitableResults ,
int millisecondsTimeout ,
bool exitContext ,
WaitHandle cancelWaitHandle )
{
return WorkItem . WaitAll ( waitableResults , millisecondsTimeout , exitContext , cancelWaitHandle ) ;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled.
/// </returns>
public static int WaitAny (
IWaitableResult [ ] waitableResults )
{
return WaitAny ( waitableResults , Timeout . Infinite , true ) ;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns>
public static int WaitAny (
IWaitableResult [ ] waitableResults ,
TimeSpan timeout ,
bool exitContext )
{
return WaitAny ( waitableResults , ( int ) timeout . TotalMilliseconds , exitContext ) ;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="timeout">The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. </param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns>
public static int WaitAny (
IWaitableResult [ ] waitableResults ,
TimeSpan timeout ,
bool exitContext ,
WaitHandle cancelWaitHandle )
{
return WaitAny ( waitableResults , ( int ) timeout . TotalMilliseconds , exitContext , cancelWaitHandle ) ;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns>
public static int WaitAny (
IWaitableResult [ ] waitableResults ,
int millisecondsTimeout ,
bool exitContext )
{
return WorkItem . WaitAny ( waitableResults , millisecondsTimeout , exitContext , null ) ;
}
/// <summary>
/// Waits for any of the work items in the specified array to complete, cancel, or timeout
/// </summary>
/// <param name="waitableResults">Array of work item result objects</param>
/// <param name="millisecondsTimeout">The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely.</param>
/// <param name="exitContext">
/// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false.
/// </param>
/// <param name="cancelWaitHandle">A cancel wait handle to interrupt the wait if needed</param>
/// <returns>
/// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled.
/// </returns>
public static int WaitAny (
IWaitableResult [ ] waitableResults ,
int millisecondsTimeout ,
bool exitContext ,
WaitHandle cancelWaitHandle )
{
return WorkItem . WaitAny ( waitableResults , millisecondsTimeout , exitContext , cancelWaitHandle ) ;
}
/// <summary>
/// Creates a new WorkItemsGroup.
/// </summary>
/// <param name="concurrency">The number of work items that can be run concurrently</param>
/// <returns>A reference to the WorkItemsGroup</returns>
public IWorkItemsGroup CreateWorkItemsGroup ( int concurrency )
{
IWorkItemsGroup workItemsGroup = new WorkItemsGroup ( this , concurrency , _stpStartInfo ) ;
return workItemsGroup ;
}
/// <summary>
/// Creates a new WorkItemsGroup.
/// </summary>
/// <param name="concurrency">The number of work items that can be run concurrently</param>
/// <param name="wigStartInfo">A WorkItemsGroup configuration that overrides the default behavior</param>
/// <returns>A reference to the WorkItemsGroup</returns>
public IWorkItemsGroup CreateWorkItemsGroup ( int concurrency , WIGStartInfo wigStartInfo )
{
IWorkItemsGroup workItemsGroup = new WorkItemsGroup ( this , concurrency , wigStartInfo ) ;
return workItemsGroup ;
}
#region Fire Thread's Events
private void FireOnThreadInitialization ( )
{
if ( null ! = _onThreadInitialization )
{
foreach ( ThreadInitializationHandler tih in _onThreadInitialization . GetInvocationList ( ) )
{
try
{
tih ( ) ;
}
catch ( Exception e )
{
e . GetHashCode ( ) ;
Debug . Assert ( false ) ;
throw ;
}
}
}
}
private void FireOnThreadTermination ( )
{
if ( null ! = _onThreadTermination )
{
foreach ( ThreadTerminationHandler tth in _onThreadTermination . GetInvocationList ( ) )
{
try
{
tth ( ) ;
}
catch ( Exception e )
{
e . GetHashCode ( ) ;
Debug . Assert ( false ) ;
throw ;
}
}
}
}
# endregion
/// <summary>
/// This event is fired when a thread is created.
/// Use it to initialize a thread before the work items use it.
/// </summary>
public event ThreadInitializationHandler OnThreadInitialization
{
add { _onThreadInitialization + = value ; }
remove { _onThreadInitialization - = value ; }
}
/// <summary>
/// This event is fired when a thread is terminating.
/// Use it for cleanup.
/// </summary>
public event ThreadTerminationHandler OnThreadTermination
{
add { _onThreadTermination + = value ; }
remove { _onThreadTermination - = value ; }
}
internal void CancelAbortWorkItemsGroup ( WorkItemsGroup wig )
{
foreach ( ThreadEntry threadEntry in _workerThreads . Values )
{
WorkItem workItem = threadEntry . CurrentWorkItem ;
if ( null ! = workItem & &
workItem . WasQueuedBy ( wig ) & &
! workItem . IsCanceled )
{
threadEntry . CurrentWorkItem . GetWorkItemResult ( ) . Cancel ( true ) ;
}
}
}
# endregion
#region Properties
/// <summary>
/// Get/Set the lower limit of threads in the pool.
/// </summary>
public int MinThreads
{
get
{
ValidateNotDisposed ( ) ;
return _stpStartInfo . MinWorkerThreads ;
}
set
{
Debug . Assert ( value > = 0 ) ;
Debug . Assert ( value < = _stpStartInfo . MaxWorkerThreads ) ;
if ( _stpStartInfo . MaxWorkerThreads < value )
{
_stpStartInfo . MaxWorkerThreads = value ;
}
_stpStartInfo . MinWorkerThreads = value ;
StartOptimalNumberOfThreads ( ) ;
}
}
/// <summary>
/// Get/Set the upper limit of threads in the pool.
/// </summary>
public int MaxThreads
{
get
{
ValidateNotDisposed ( ) ;
return _stpStartInfo . MaxWorkerThreads ;
}
set
{
Debug . Assert ( value > 0 ) ;
Debug . Assert ( value > = _stpStartInfo . MinWorkerThreads ) ;
if ( _stpStartInfo . MinWorkerThreads > value )
{
_stpStartInfo . MinWorkerThreads = value ;
}
_stpStartInfo . MaxWorkerThreads = value ;
StartOptimalNumberOfThreads ( ) ;
}
}
/// <summary>
/// Get the number of threads in the thread pool.
/// Should be between the lower and the upper limits.
/// </summary>
public int ActiveThreads
{
get
{
ValidateNotDisposed ( ) ;
return _workerThreads . Count ;
}
}
/// <summary>
/// Get the number of busy (not idle) threads in the thread pool.
/// </summary>
public int InUseThreads
{
get
{
ValidateNotDisposed ( ) ;
return _inUseWorkerThreads ;
}
}
/// <summary>
/// Returns true if the current running work item has been cancelled.
/// Must be used within the work item's callback method.
/// The work item should sample this value in order to know if it
/// needs to quit before its completion.
/// </summary>
public static bool IsWorkItemCanceled
{
get
{
return CurrentThreadEntry . CurrentWorkItem . IsCanceled ;
}
}
/// <summary>
/// Checks if the work item has been cancelled, and if yes then abort the thread.
/// Can be used with Cancel and timeout
/// </summary>
public static void AbortOnWorkItemCancel ( )
{
if ( IsWorkItemCanceled )
{
Thread . CurrentThread . Abort ( ) ;
}
}
/// <summary>
/// Thread Pool start information (readonly)
/// </summary>
public STPStartInfo STPStartInfo
{
get
{
return _stpStartInfo . AsReadOnly ( ) ;
}
}
public bool IsShuttingdown
{
get { return _shutdown ; }
}
/// <summary>
/// Return the local calculated performance counters
/// Available only if STPStartInfo.EnableLocalPerformanceCounters is true.
/// </summary>
public ISTPPerformanceCountersReader PerformanceCountersReader
{
get { return ( ISTPPerformanceCountersReader ) _localPCs ; }
}
# endregion
#region IDisposable Members
public void Dispose ( )
{
if ( ! _isDisposed )
{
if ( ! _shutdown )
{
Shutdown ( ) ;
}
if ( null ! = _shuttingDownEvent )
{
_shuttingDownEvent . Close ( ) ;
_shuttingDownEvent = null ;
}
_workerThreads . Clear ( ) ;
if ( null ! = _isIdleWaitHandle )
{
_isIdleWaitHandle . Close ( ) ;
_isIdleWaitHandle = null ;
}
_isDisposed = true ;
}
}
private void ValidateNotDisposed ( )
{
if ( _isDisposed )
{
throw new ObjectDisposedException ( GetType ( ) . ToString ( ) , "The SmartThreadPool has been shutdown" ) ;
}
}
# endregion
#region WorkItemsGroupBase Overrides
/// <summary>
/// Get/Set the maximum number of work items that execute cocurrency on the thread pool
/// </summary>
public override int Concurrency
{
get { return MaxThreads ; }
set { MaxThreads = value ; }
}
/// <summary>
/// Get the number of work items in the queue.
/// </summary>
public override int WaitingCallbacks
{
get
{
ValidateNotDisposed ( ) ;
return _workItemsQueue . Count ;
}
}
/// <summary>
/// Get an array with all the state objects of the currently running items.
/// The array represents a snap shot and impact performance.
/// </summary>
public override object [ ] GetStates ( )
{
object [ ] states = _workItemsQueue . GetStates ( ) ;
return states ;
}
/// <summary>
/// WorkItemsGroup start information (readonly)
/// </summary>
public override WIGStartInfo WIGStartInfo
{
get { return _stpStartInfo . AsReadOnly ( ) ; }
}
/// <summary>
/// Start the thread pool if it was started suspended.
/// If it is already running, this method is ignored.
/// </summary>
public override void Start ( )
{
if ( ! _isSuspended )
{
return ;
}
_isSuspended = false ;
ICollection workItemsGroups = _workItemsGroups . Values ;
foreach ( WorkItemsGroup workItemsGroup in workItemsGroups )
{
workItemsGroup . OnSTPIsStarting ( ) ;
}
StartOptimalNumberOfThreads ( ) ;
}
/// <summary>
/// Cancel all work items using thread abortion
/// </summary>
/// <param name="abortExecution">True to stop work items by raising ThreadAbortException</param>
public override void Cancel ( bool abortExecution )
{
_canceledSmartThreadPool . IsCanceled = true ;
_canceledSmartThreadPool = new CanceledWorkItemsGroup ( ) ;
ICollection workItemsGroups = _workItemsGroups . Values ;
foreach ( WorkItemsGroup workItemsGroup in workItemsGroups )
{
workItemsGroup . Cancel ( abortExecution ) ;
}
if ( abortExecution )
{
foreach ( ThreadEntry threadEntry in _workerThreads . Values )
{
WorkItem workItem = threadEntry . CurrentWorkItem ;
if ( null ! = workItem & &
threadEntry . AssociatedSmartThreadPool = = this & &
! workItem . IsCanceled )
{
threadEntry . CurrentWorkItem . GetWorkItemResult ( ) . Cancel ( true ) ;
}
}
}
}
/// <summary>
/// Wait for the thread pool to be idle
/// </summary>
public override bool WaitForIdle ( int millisecondsTimeout )
{
ValidateWaitForIdle ( ) ;
return STPEventWaitHandle . WaitOne ( _isIdleWaitHandle , millisecondsTimeout , false ) ;
}
/// <summary>
/// This event is fired when all work items are completed.
/// (When IsIdle changes to true)
/// This event only work on WorkItemsGroup. On SmartThreadPool
/// it throws the NotImplementedException.
/// </summary>
public override event WorkItemsGroupIdleHandler OnIdle
{
add
{
throw new NotImplementedException ( "This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature." ) ;
//_onIdle += value;
}
remove
{
throw new NotImplementedException ( "This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature." ) ;
//_onIdle -= value;
}
}
internal override void PreQueueWorkItem ( )
{
ValidateNotDisposed ( ) ;
}
# endregion
#region Join, Choice, Pipe, etc.
/// <summary>
/// Executes all actions in parallel.
/// Returns when they all finish.
/// </summary>
/// <param name="actions">Actions to execute</param>
public void Join ( IEnumerable < Action > actions )
{
WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true } ;
IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup ( int . MaxValue , wigStartInfo ) ;
foreach ( Action action in actions )
{
workItemsGroup . QueueWorkItem ( action ) ;
}
workItemsGroup . Start ( ) ;
workItemsGroup . WaitForIdle ( ) ;
}
/// <summary>
/// Executes all actions in parallel.
/// Returns when they all finish.
/// </summary>
/// <param name="actions">Actions to execute</param>
public void Join ( params Action [ ] actions )
{
Join ( ( IEnumerable < Action > ) actions ) ;
}
private class ChoiceIndex
{
public int _index = - 1 ;
}
/// <summary>
/// Executes all actions in parallel
/// Returns when the first one completes
/// </summary>
/// <param name="actions">Actions to execute</param>
public int Choice ( IEnumerable < Action > actions )
{
WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true } ;
IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup ( int . MaxValue , wigStartInfo ) ;
ManualResetEvent anActionCompleted = new ManualResetEvent ( false ) ;
ChoiceIndex choiceIndex = new ChoiceIndex ( ) ;
int i = 0 ;
foreach ( Action action in actions )
{
Action act = action ;
int value = i ;
workItemsGroup . QueueWorkItem ( ( ) = > { act ( ) ; Interlocked . CompareExchange ( ref choiceIndex . _index , value , - 1 ) ; anActionCompleted . Set ( ) ; } ) ;
+ + i ;
}
workItemsGroup . Start ( ) ;
anActionCompleted . WaitOne ( ) ;
return choiceIndex . _index ;
}
/// <summary>
/// Executes all actions in parallel
/// Returns when the first one completes
/// </summary>
/// <param name="actions">Actions to execute</param>
public int Choice ( params Action [ ] actions )
{
return Choice ( ( IEnumerable < Action > ) actions ) ;
}
/// <summary>
/// Executes actions in sequence asynchronously.
/// Returns immediately.
/// </summary>
/// <param name="pipeState">A state context that passes </param>
/// <param name="actions">Actions to execute in the order they should run</param>
public void Pipe < T > ( T pipeState , IEnumerable < Action < T > > actions )
{
WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true } ;
IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup ( 1 , wigStartInfo ) ;
foreach ( Action < T > action in actions )
{
Action < T > act = action ;
workItemsGroup . QueueWorkItem ( ( ) = > act ( pipeState ) ) ;
}
workItemsGroup . Start ( ) ;
workItemsGroup . WaitForIdle ( ) ;
}
/// <summary>
/// Executes actions in sequence asynchronously.
/// Returns immediately.
/// </summary>
/// <param name="pipeState"></param>
/// <param name="actions">Actions to execute in the order they should run</param>
public void Pipe < T > ( T pipeState , params Action < T > [ ] actions )
{
Pipe ( pipeState , ( IEnumerable < Action < T > > ) actions ) ;
}
# endregion
}
# endregion
}