601 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C#
		
	
	
			
		
		
	
	
			601 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			C#
		
	
	
| // Ami Bar
 | |
| // amibar@gmail.com
 | |
| 
 | |
| using System;
 | |
| using System.Threading;
 | |
| 
 | |
| namespace Amib.Threading.Internal
 | |
| {
 | |
|     #region WorkItemsQueue class
 | |
| 
 | |
|     /// <summary>
 | |
|     /// WorkItemsQueue class.
 | |
|     /// </summary>
 | |
|     public class WorkItemsQueue : IDisposable
 | |
|     {
 | |
|         #region Member variables
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Waiters queue (implemented as stack).
 | |
|         /// </summary>
 | |
|         private WaiterEntry _headWaiterEntry = new WaiterEntry();
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Waiters count
 | |
|         /// </summary>
 | |
|         private int _waitersCount = 0;
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Work items queue
 | |
|         /// </summary>
 | |
|         private PriorityQueue _workItems = new PriorityQueue();
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Indicate that work items are allowed to be queued
 | |
|         /// </summary>
 | |
|         private bool _isWorkItemsQueueActive = true;
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Each thread in the thread pool keeps its own waiter entry.
 | |
|         /// </summary>
 | |
|         [ThreadStatic]
 | |
|         private static WaiterEntry _waiterEntry;
 | |
| 
 | |
|         /// <summary>
 | |
|         /// A flag that indicates if the WorkItemsQueue has been disposed.
 | |
|         /// </summary>
 | |
|         private bool _isDisposed = false;
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #region Public properties
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Returns the current number of work items in the queue
 | |
|         /// </summary>
 | |
|         public int Count
 | |
|         {
 | |
|             get
 | |
|             {
 | |
|                 lock(this)
 | |
|                 {
 | |
|                     ValidateNotDisposed();
 | |
|                     return _workItems.Count;
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Returns the current number of waiters
 | |
|         /// </summary>
 | |
|         public int WaitersCount
 | |
|         {
 | |
|             get
 | |
|             {
 | |
|                 lock(this)
 | |
|                 {
 | |
|                     ValidateNotDisposed();
 | |
|                     return _waitersCount;
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #region Public methods
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Enqueue a work item to the queue.
 | |
|         /// </summary>
 | |
|         public bool EnqueueWorkItem(WorkItem workItem)
 | |
|         {
 | |
|             // A work item cannot be null, since null is used in the
 | |
|             // WaitForWorkItem() method to indicate timeout or cancel
 | |
|             if (null == workItem)
 | |
|             {
 | |
|                 throw new ArgumentNullException("workItem" , "workItem cannot be null");
 | |
|             }
 | |
| 
 | |
|             bool enqueue = true;
 | |
| 
 | |
|             // First check if there is a waiter waiting for work item. During 
 | |
|             // the check, timed out waiters are ignored. If there is no 
 | |
|             // waiter then the work item is queued.
 | |
|             lock(this)
 | |
|             {
 | |
|                 ValidateNotDisposed();
 | |
| 
 | |
|                 if (!_isWorkItemsQueueActive)
 | |
|                 {
 | |
|                     return false;
 | |
|                 }
 | |
| 
 | |
|                 while(_waitersCount > 0)
 | |
|                 {
 | |
|                     // Dequeue a waiter.
 | |
|                     WaiterEntry waiterEntry = PopWaiter();
 | |
| 
 | |
|                     // Signal the waiter. On success break the loop
 | |
|                     if (waiterEntry.Signal(workItem))
 | |
|                     {
 | |
|                         enqueue = false;
 | |
|                         break;
 | |
|                     }
 | |
|                 }
 | |
| 
 | |
|                 if (enqueue)
 | |
|                 {
 | |
|                     // Enqueue the work item
 | |
|                     _workItems.Enqueue(workItem);
 | |
|                 }
 | |
|             }
 | |
|             return true;
 | |
|         }
 | |
| 
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Waits for a work item or exits on timeout or cancel
 | |
|         /// </summary>
 | |
|         /// <param name="millisecondsTimeout">Timeout in milliseconds</param>
 | |
|         /// <param name="cancelEvent">Cancel wait handle</param>
 | |
|         /// <returns>Returns true if the resource was granted</returns>
 | |
|         public WorkItem DequeueWorkItem(
 | |
|             int millisecondsTimeout, 
 | |
|             WaitHandle cancelEvent)
 | |
|         {
 | |
|             /// This method cause the caller to wait for a work item.
 | |
|             /// If there is at least one waiting work item then the 
 | |
|             /// method returns immidiately with true.
 | |
|             /// 
 | |
|             /// If there are no waiting work items then the caller 
 | |
|             /// is queued between other waiters for a work item to arrive.
 | |
|             /// 
 | |
|             /// If a work item didn't come within millisecondsTimeout or 
 | |
|             /// the user canceled the wait by signaling the cancelEvent 
 | |
|             /// then the method returns false to indicate that the caller 
 | |
|             /// didn't get a work item.
 | |
| 
 | |
|             WaiterEntry waiterEntry = null;
 | |
|             WorkItem workItem = null;
 | |
| 
 | |
|             lock(this)
 | |
|             {
 | |
|                 ValidateNotDisposed();
 | |
| 
 | |
|                 // If there are waiting work items then take one and return.
 | |
|                 if (_workItems.Count > 0)
 | |
|                 {
 | |
|                     workItem = _workItems.Dequeue() as WorkItem;
 | |
|                     return workItem;
 | |
|                 }
 | |
|                     // No waiting work items ...
 | |
|                 else
 | |
|                 {
 | |
|                     // Get the wait entry for the waiters queue
 | |
|                     waiterEntry = GetThreadWaiterEntry();
 | |
| 
 | |
|                     // Put the waiter with the other waiters
 | |
|                     PushWaiter(waiterEntry);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             // Prepare array of wait handle for the WaitHandle.WaitAny()
 | |
|             WaitHandle [] waitHandles = new WaitHandle [] { 
 | |
|                                                                 waiterEntry.WaitHandle, 
 | |
|                                                                 cancelEvent };
 | |
| 
 | |
|             // Wait for an available resource, cancel event, or timeout.
 | |
| 
 | |
|             // During the wait we are supposes to exit the synchronization 
 | |
|             // domain. (Placing true as the third argument of the WaitAny())
 | |
|             // It just doesn't work, I don't know why, so I have lock(this) 
 | |
|             // statments insted of one.
 | |
| 
 | |
|             int index = WaitHandle.WaitAny(
 | |
|                 waitHandles,
 | |
|                 millisecondsTimeout, 
 | |
|                 true);
 | |
| 
 | |
|             lock(this)
 | |
|             {
 | |
|                 // success is true if it got a work item.
 | |
|                 bool success = (0 == index);
 | |
| 
 | |
|                 // The timeout variable is used only for readability.
 | |
|                 // (We treat cancel as timeout)
 | |
|                 bool timeout = !success;
 | |
| 
 | |
|                 // On timeout update the waiterEntry that it is timed out
 | |
|                 if (timeout)
 | |
|                 {
 | |
|                     // The Timeout() fails if the waiter has already been signaled
 | |
|                     timeout = waiterEntry.Timeout();
 | |
| 
 | |
|                     // On timeout remove the waiter from the queue.
 | |
|                     // Note that the complexity is O(1).
 | |
|                     if(timeout)
 | |
|                     {
 | |
|                         RemoveWaiter(waiterEntry, false);
 | |
|                     }
 | |
| 
 | |
|                     // Again readability
 | |
|                     success = !timeout;
 | |
|                 }
 | |
| 
 | |
|                 // On success return the work item
 | |
|                 if (success)
 | |
|                 {
 | |
|                     workItem = waiterEntry.WorkItem;
 | |
| 
 | |
|                     if (null == workItem)
 | |
|                     {
 | |
|                         workItem = _workItems.Dequeue() as WorkItem;
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|             // On failure return null.
 | |
|             return workItem;
 | |
|         }
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Cleanup the work items queue, hence no more work 
 | |
|         /// items are allowed to be queue
 | |
|         /// </summary>
 | |
|         protected virtual void Cleanup()
 | |
|         {
 | |
|             lock(this)
 | |
|             {
 | |
|                 // Deactivate only once
 | |
|                 if (!_isWorkItemsQueueActive)
 | |
|                 {
 | |
|                     return;
 | |
|                 }
 | |
| 
 | |
|                 // Don't queue more work items
 | |
|                 _isWorkItemsQueueActive = false;
 | |
| 
 | |
|                 foreach(WorkItem workItem in _workItems)
 | |
|                 {
 | |
|                     workItem.DisposeOfState();
 | |
|                 }
 | |
| 
 | |
|                 // Clear the work items that are already queued
 | |
|                 _workItems.Clear();
 | |
| 
 | |
|                 // Note: 
 | |
|                 // I don't iterate over the queue and dispose of work items's states, 
 | |
|                 // since if a work item has a state object that is still in use in the 
 | |
|                 // application then I must not dispose it.
 | |
| 
 | |
|                 // Tell the waiters that they were timed out.
 | |
|                 // It won't signal them to exit, but to ignore their
 | |
|                 // next work item.
 | |
|                 while(_waitersCount > 0)
 | |
|                 {
 | |
|                     WaiterEntry waiterEntry = PopWaiter();
 | |
|                     waiterEntry.Timeout();
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #region Private methods
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Returns the WaiterEntry of the current thread
 | |
|         /// </summary>
 | |
|         /// <returns></returns>
 | |
|         /// In order to avoid creation and destuction of WaiterEntry
 | |
|         /// objects each thread has its own WaiterEntry object.
 | |
|         private WaiterEntry GetThreadWaiterEntry()
 | |
|         {
 | |
|             if (null == _waiterEntry)
 | |
|             {
 | |
|                 _waiterEntry = new WaiterEntry();
 | |
|             }
 | |
|             _waiterEntry.Reset();
 | |
|             return _waiterEntry;
 | |
|         }
 | |
| 
 | |
|         #region Waiters stack methods
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Push a new waiter into the waiter's stack
 | |
|         /// </summary>
 | |
|         /// <param name="newWaiterEntry">A waiter to put in the stack</param>
 | |
|         public void PushWaiter(WaiterEntry newWaiterEntry)
 | |
|         {
 | |
|             // Remove the waiter if it is already in the stack and 
 | |
|             // update waiter's count as needed
 | |
|             RemoveWaiter(newWaiterEntry, false);
 | |
| 
 | |
|             // If the stack is empty then newWaiterEntry is the new head of the stack 
 | |
|             if (null == _headWaiterEntry._nextWaiterEntry)
 | |
|             {
 | |
|                 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
 | |
|                 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
 | |
| 
 | |
|             }
 | |
|             // If the stack is not empty then put newWaiterEntry as the new head 
 | |
|             // of the stack.
 | |
|             else
 | |
|             {
 | |
|                 // Save the old first waiter entry
 | |
|                 WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
 | |
| 
 | |
|                 // Update the links
 | |
|                 _headWaiterEntry._nextWaiterEntry = newWaiterEntry;
 | |
|                 newWaiterEntry._nextWaiterEntry = oldFirstWaiterEntry;
 | |
|                 newWaiterEntry._prevWaiterEntry = _headWaiterEntry;
 | |
|                 oldFirstWaiterEntry._prevWaiterEntry = newWaiterEntry;
 | |
|             }
 | |
| 
 | |
|             // Increment the number of waiters
 | |
|             ++_waitersCount;
 | |
|         }
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Pop a waiter from the waiter's stack
 | |
|         /// </summary>
 | |
|         /// <returns>Returns the first waiter in the stack</returns>
 | |
|         private WaiterEntry PopWaiter()
 | |
|         {
 | |
|             // Store the current stack head
 | |
|             WaiterEntry oldFirstWaiterEntry = _headWaiterEntry._nextWaiterEntry;
 | |
| 
 | |
|             // Store the new stack head
 | |
|             WaiterEntry newHeadWaiterEntry = oldFirstWaiterEntry._nextWaiterEntry;
 | |
| 
 | |
|             // Update the old stack head list links and decrement the number
 | |
|             // waiters.
 | |
|             RemoveWaiter(oldFirstWaiterEntry, true);
 | |
| 
 | |
|             // Update the new stack head
 | |
|             _headWaiterEntry._nextWaiterEntry = newHeadWaiterEntry;
 | |
|             if (null != newHeadWaiterEntry)
 | |
|             {
 | |
|                 newHeadWaiterEntry._prevWaiterEntry = _headWaiterEntry;
 | |
|             }
 | |
| 
 | |
|             // Return the old stack head
 | |
|             return oldFirstWaiterEntry;
 | |
|         }
 | |
| 
 | |
|         /// <summary>
 | |
|         /// Remove a waiter from the stack
 | |
|         /// </summary>
 | |
|         /// <param name="waiterEntry">A waiter entry to remove</param>
 | |
|         /// <param name="popDecrement">If true the waiter count is always decremented</param>
 | |
|         private void RemoveWaiter(WaiterEntry waiterEntry, bool popDecrement)
 | |
|         {
 | |
|             // Store the prev entry in the list
 | |
|             WaiterEntry prevWaiterEntry = waiterEntry._prevWaiterEntry;
 | |
| 
 | |
|             // Store the next entry in the list
 | |
|             WaiterEntry nextWaiterEntry = waiterEntry._nextWaiterEntry;
 | |
| 
 | |
|             // A flag to indicate if we need to decrement the waiters count.
 | |
|             // If we got here from PopWaiter then we must decrement.
 | |
|             // If we got here from PushWaiter then we decrement only if
 | |
|             // the waiter was already in the stack.
 | |
|             bool decrementCounter = popDecrement;
 | |
| 
 | |
|             // Null the waiter's entry links
 | |
|             waiterEntry._prevWaiterEntry = null;
 | |
|             waiterEntry._nextWaiterEntry = null;
 | |
| 
 | |
|             // If the waiter entry had a prev link then update it.
 | |
|             // It also means that the waiter is already in the list and we
 | |
|             // need to decrement the waiters count.
 | |
|             if (null != prevWaiterEntry)
 | |
|             {
 | |
|                 prevWaiterEntry._nextWaiterEntry = nextWaiterEntry;
 | |
|                 decrementCounter = true;
 | |
|             }
 | |
| 
 | |
|             // If the waiter entry had a next link then update it.
 | |
|             // It also means that the waiter is already in the list and we
 | |
|             // need to decrement the waiters count.
 | |
|             if (null != nextWaiterEntry)
 | |
|             {
 | |
|                 nextWaiterEntry._prevWaiterEntry = prevWaiterEntry;
 | |
|                 decrementCounter = true;
 | |
|             }
 | |
| 
 | |
|             // Decrement the waiters count if needed
 | |
|             if (decrementCounter)
 | |
|             {
 | |
|                 --_waitersCount;
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #region WaiterEntry class 
 | |
| 
 | |
|         // A waiter entry in the _waiters queue.
 | |
|         public class WaiterEntry : IDisposable
 | |
|         {
 | |
|             #region Member variables
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Event to signal the waiter that it got the work item.
 | |
|             /// </summary>
 | |
|             private AutoResetEvent _waitHandle = new AutoResetEvent(false);
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Flag to know if this waiter already quited from the queue 
 | |
|             /// because of a timeout.
 | |
|             /// </summary>
 | |
|             private bool _isTimedout = false;
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Flag to know if the waiter was signaled and got a work item. 
 | |
|             /// </summary>
 | |
|             private bool _isSignaled = false;
 | |
| 
 | |
|             /// <summary>
 | |
|             /// A work item that passed directly to the waiter withou going 
 | |
|             /// through the queue
 | |
|             /// </summary>
 | |
|             private WorkItem _workItem = null;
 | |
| 
 | |
|             private bool _isDisposed = false;
 | |
| 
 | |
|             // Linked list members
 | |
|             internal WaiterEntry _nextWaiterEntry = null;
 | |
|             internal WaiterEntry _prevWaiterEntry = null;
 | |
| 
 | |
|             #endregion
 | |
| 
 | |
|             #region Construction
 | |
| 
 | |
|             public WaiterEntry()
 | |
|             {
 | |
|                 Reset();
 | |
|             }
 | |
|                         
 | |
|             #endregion
 | |
| 
 | |
|             #region Public methods
 | |
| 
 | |
|             public WaitHandle WaitHandle
 | |
|             {
 | |
|                 get { return _waitHandle; }
 | |
|             }
 | |
| 
 | |
|             public WorkItem WorkItem
 | |
|             {
 | |
|                 get
 | |
|                 {
 | |
|                     lock(this)
 | |
|                     {
 | |
|                         return _workItem;
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Signal the waiter that it got a work item.
 | |
|             /// </summary>
 | |
|             /// <returns>Return true on success</returns>
 | |
|             /// The method fails if Timeout() preceded its call
 | |
|             public bool Signal(WorkItem workItem)
 | |
|             {
 | |
|                 lock(this)
 | |
|                 {
 | |
|                     if (!_isTimedout)
 | |
|                     {
 | |
|                         _workItem = workItem;
 | |
|                         _isSignaled = true;
 | |
|                         _waitHandle.Set();
 | |
|                         return true;
 | |
|                     }
 | |
|                 }
 | |
|                 return false;
 | |
|             }
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Mark the wait entry that it has been timed out
 | |
|             /// </summary>
 | |
|             /// <returns>Return true on success</returns>
 | |
|             /// The method fails if Signal() preceded its call
 | |
|             public bool Timeout()
 | |
|             {
 | |
|                 lock(this)
 | |
|                 {
 | |
|                     // Time out can happen only if the waiter wasn't marked as
 | |
|                     // signaled
 | |
|                     if (!_isSignaled)
 | |
|                     {
 | |
|                         // We don't remove the waiter from the queue, the DequeueWorkItem 
 | |
|                         // method skips _waiters that were timed out.
 | |
|                         _isTimedout = true;
 | |
|                         return true;
 | |
|                     }
 | |
|                 }
 | |
|                 return false;
 | |
|             }
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Reset the wait entry so it can be used again
 | |
|             /// </summary>
 | |
|             public void Reset()
 | |
|             {
 | |
|                 _workItem = null;
 | |
|                 _isTimedout = false;
 | |
|                 _isSignaled = false;
 | |
|                 _waitHandle.Reset();
 | |
|             }
 | |
| 
 | |
|             /// <summary>
 | |
|             /// Free resources
 | |
|             /// </summary>
 | |
|             public void Close()
 | |
|             {
 | |
|                 if (null != _waitHandle)
 | |
|                 {
 | |
|                     _waitHandle.Close();
 | |
|                     _waitHandle = null;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             #endregion
 | |
| 
 | |
|             #region IDisposable Members
 | |
| 
 | |
|             public void Dispose()
 | |
|             {
 | |
|                 if (!_isDisposed)
 | |
|                 {
 | |
|                     Close();
 | |
|                     _isDisposed = true;
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             ~WaiterEntry()
 | |
|             {
 | |
|                 Dispose();
 | |
|             }
 | |
| 
 | |
|             #endregion
 | |
|         }
 | |
| 
 | |
|         #endregion
 | |
| 
 | |
|         #region IDisposable Members
 | |
| 
 | |
|         public void Dispose()
 | |
|         {
 | |
|             if (!_isDisposed)
 | |
|             {
 | |
|                 Cleanup();
 | |
|                 _isDisposed = true;
 | |
|                 GC.SuppressFinalize(this);
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         ~WorkItemsQueue()
 | |
|         {
 | |
|             Cleanup();
 | |
|         }
 | |
| 
 | |
|         private void ValidateNotDisposed()
 | |
|         {
 | |
|             if(_isDisposed)
 | |
|             {
 | |
|                 throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown");
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         #endregion
 | |
|     }
 | |
| 
 | |
|     #endregion
 | |
| }
 | |
| 
 |