AcUtils
A high performance abstraction layer for AccuRev
LimitedConcurrencyLevelTaskScheduler.cs
Go to the documentation of this file.
1 
13 using System.Collections.Generic;
14 using System.Linq;
15 
16 namespace System.Threading.Tasks.Schedulers
17 {
22  public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
23  {
25  [ThreadStatic]
26  private static bool _currentThreadIsProcessingItems;
28  private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
30  private readonly int _maxDegreeOfParallelism;
32  private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
33 
39  public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
40  {
41  if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
42  _maxDegreeOfParallelism = maxDegreeOfParallelism;
43  }
44 
47  protected sealed override void QueueTask(Task task)
48  {
49  // Add the task to the list of tasks to be processed. If there aren't enough
50  // delegates currently queued or running to process tasks, schedule another.
51  lock (_tasks)
52  {
53  _tasks.AddLast(task);
54  if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
55  {
56  ++_delegatesQueuedOrRunning;
57  NotifyThreadPoolOfPendingWork();
58  }
59  }
60  }
61 
66  {
67  ThreadPool.UnsafeQueueUserWorkItem(_ =>
68  {
69  // Note that the current thread is now processing work items.
70  // This is necessary to enable inlining of tasks into this thread.
71  _currentThreadIsProcessingItems = true;
72  try
73  {
74  // Process all available items in the queue.
75  while (true)
76  {
77  Task item;
78  lock (_tasks)
79  {
80  // When there are no more items to be processed,
81  // note that we're done processing, and get out.
82  if (_tasks.Count == 0)
83  {
84  --_delegatesQueuedOrRunning;
85  break;
86  }
87 
88  // Get the next item from the queue
89  item = _tasks.First.Value;
90  _tasks.RemoveFirst();
91  }
92 
93  // Execute the task we pulled out of the queue
94  base.TryExecuteTask(item);
95  }
96  }
97  // We're done processing items on the current thread
98  finally { _currentThreadIsProcessingItems = false; }
99  }, null);
100  }
101 
106  protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
107  {
108  // If this thread isn't already processing a task, we don't support inlining
109  if (!_currentThreadIsProcessingItems) return false;
110 
111  // If the task was previously queued, remove it from the queue
112  if (taskWasPreviouslyQueued) TryDequeue(task);
113 
114  // Try to run the task.
115  return base.TryExecuteTask(task);
116  }
117 
121  protected sealed override bool TryDequeue(Task task)
122  {
123  lock (_tasks) return _tasks.Remove(task);
124  }
125 
127  public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
128 
131  protected sealed override IEnumerable<Task> GetScheduledTasks()
132  {
133  bool lockTaken = false;
134  try
135  {
136  Monitor.TryEnter(_tasks, ref lockTaken);
137  if (lockTaken) return _tasks.ToArray();
138  else throw new NotSupportedException();
139  }
140  finally
141  {
142  if (lockTaken) Monitor.Exit(_tasks);
143  }
144  }
145  }
146 }
static bool _currentThreadIsProcessingItems
Whether the current thread is processing work items.
sealed override bool TryDequeue(Task task)
Attempts to remove a previously scheduled task from the scheduler.
LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the specified degree o...
void NotifyThreadPoolOfPendingWork()
Informs the ThreadPool that there's work to be executed for this scheduler.
sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
Attempts to execute the specified task on the current thread.
sealed override IEnumerable< Task > GetScheduledTasks()
Gets an enumerable of the tasks currently scheduled on this scheduler.
sealed override void QueueTask(Task task)
Queues a task to the scheduler.
readonly int _maxDegreeOfParallelism
The maximum concurrency level allowed by this scheduler.
Provides a task scheduler that ensures a maximum concurrency level while running on top of the Thread...