mscorlib(4.0.0.0) API with additions
ConcurrentQueue.cs
2 using System.Diagnostics;
6 using System.Threading;
7 
9 {
12  [Serializable]
13  [ComVisible(false)]
14  [DebuggerDisplay("Count = {Count}")]
15  [DebuggerTypeProxy(typeof(SystemCollectionsConcurrent_ProducerConsumerCollectionDebugView<>))]
16  [__DynamicallyInvokable]
17  [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
19  {
/// <summary>
/// One fixed-capacity link in the queue's chain of storage segments. A segment holds up to
/// SEGMENT_SIZE items: producers claim slots by advancing m_high, consumers by advancing m_low.
/// </summary>
private class Segment
{
    /// <summary>Item storage; slot i is only read after m_state[i].m_value has become true.</summary>
    internal volatile T[] m_array;

    /// <summary>Per-slot flags, set to true by the producer after it has written the matching m_array slot.</summary>
    internal volatile VolatileBool[] m_state;

    /// <summary>The following segment, or null until Grow / UnsafeGrow links one.</summary>
    private volatile Segment m_next;

    /// <summary>Position of this segment in the chain; every successor is created with m_index + 1.</summary>
    internal readonly long m_index;

    /// <summary>Index of the next slot to hand out to a consumer.</summary>
    private volatile int m_low;

    /// <summary>Index of the most recently claimed producer slot; starts at -1 (nothing appended).</summary>
    private volatile int m_high;

    /// <summary>The queue that owns this segment; its m_head / m_tail are updated from here.</summary>
    private volatile ConcurrentQueue<T> m_source;

    /// <summary>Gets the segment linked after this one, or null if there is none yet.</summary>
    internal Segment Next => m_next;

    /// <summary>Gets whether no unclaimed slot is currently visible in this segment.</summary>
    internal bool IsEmpty => Low > High;

    /// <summary>Gets m_low clamped to SEGMENT_SIZE.</summary>
    internal int Low => Math.Min(m_low, SEGMENT_SIZE);

    /// <summary>
    /// Gets m_high clamped to the last valid slot. The clamp is needed because TryAppend
    /// increments m_high unconditionally, so racing producers can push it past the end.
    /// </summary>
    internal int High => Math.Min(m_high, SEGMENT_SIZE - 1);

    /// <summary>Creates an empty segment.</summary>
    /// <param name="index">Position of the segment in the chain.</param>
    /// <param name="source">The queue that owns the segment.</param>
    internal Segment(long index, ConcurrentQueue<T> source)
    {
        m_array = new T[SEGMENT_SIZE];
        m_state = new VolatileBool[SEGMENT_SIZE];
        m_high = -1;
        m_index = index;
        m_source = source;
    }

    /// <summary>
    /// Appends <paramref name="value"/> with plain (non-interlocked) operations and no capacity check.
    /// The caller must guarantee single-threaded access and a free slot.
    /// </summary>
    internal void UnsafeAdd(T value)
    {
        m_high++;
        m_array[m_high] = value;
        m_state[m_high].m_value = true;
    }

    /// <summary>
    /// Links a new segment after this one without touching the owning queue's tail.
    /// </summary>
    /// <returns>The newly created segment.</returns>
    internal Segment UnsafeGrow()
    {
        return m_next = new Segment(m_index + 1, m_source);
    }

    /// <summary>
    /// Links a new segment after this one and publishes it as the owning queue's tail.
    /// </summary>
    internal void Grow()
    {
        m_next = new Segment(m_index + 1, m_source);
        m_source.m_tail = m_next;
    }

    /// <summary>Tries to append <paramref name="value"/> to this segment.</summary>
    /// <returns>true if a slot was obtained and filled; false if the segment is full.</returns>
    internal bool TryAppend(T value)
    {
        const int lastSlot = SEGMENT_SIZE - 1;

        // Cheap early-out; the authoritative check is the interlocked increment below.
        if (m_high >= lastSlot)
        {
            return false;
        }

        int slot = SEGMENT_SIZE;
        try
        {
        }
        finally
        {
            // Deliberately inside a finally block: reserving the slot, writing the value and
            // raising the published flag then complete as a unit even if an exception unwinds
            // through here. Consumers spin on m_state (see TryRemove), so a reserved-but-never-
            // published slot would make them wait forever.
            // NOTE(review): presumably this guards against asynchronous thread aborts - confirm.
            slot = Interlocked.Increment(ref m_high);
            if (slot <= lastSlot)
            {
                m_array[slot] = value;
                m_state[slot].m_value = true;
            }

            // Whoever fills the last slot is responsible for linking the next segment.
            if (slot == lastSlot)
            {
                Grow();
            }
        }
        return slot <= lastSlot;
    }

    /// <summary>Tries to remove the item at the front of this segment.</summary>
    /// <param name="result">The removed item, or default(T) when nothing was removed.</param>
    /// <returns>true if an item was removed; false if the segment had no unclaimed item.</returns>
    internal bool TryRemove(out T result)
    {
        SpinWait retrySpin = default(SpinWait);
        int low = Low;
        int high = High;
        while (low <= high)
        {
            // Claim slot 'low' by moving m_low past it; only one consumer can win this CAS.
            if (Interlocked.CompareExchange(ref m_low, low + 1, low) == low)
            {
                // The producer may have reserved the slot but not yet published it.
                SpinWait waitSpin = default(SpinWait);
                while (!m_state[low].m_value)
                {
                    waitSpin.SpinOnce();
                }
                result = m_array[low];

                // Only clear the slot when no snapshot operation has announced itself through
                // m_numSnapshotTakers; such an operation may still need to read this slot.
                if (m_source.m_numSnapshotTakers <= 0)
                {
                    m_array[low] = default(T);
                }

                // The last slot was taken: wait for the producer's Grow() to link the next
                // segment, then advance the queue's head to it.
                if (low + 1 >= SEGMENT_SIZE)
                {
                    waitSpin = default(SpinWait);
                    while (m_next == null)
                    {
                        waitSpin.SpinOnce();
                    }
                    m_source.m_head = m_next;
                }
                return true;
            }

            // Lost the race for this slot; back off and re-read the bounds.
            retrySpin.SpinOnce();
            low = Low;
            high = High;
        }
        result = default(T);
        return false;
    }

    /// <summary>Tries to read the item at the front of this segment without removing it.</summary>
    /// <param name="result">The front item, or default(T) when the segment has none.</param>
    /// <returns>true if an item was read; otherwise false.</returns>
    internal bool TryPeek(out T result)
    {
        result = default(T);
        int low = Low;
        if (low > High)
        {
            return false;
        }

        // Wait for the producer to publish the slot before reading it.
        SpinWait waitSpin = default(SpinWait);
        while (!m_state[low].m_value)
        {
            waitSpin.SpinOnce();
        }
        result = m_array[low];
        return true;
    }

    /// <summary>
    /// Appends the items in slots <paramref name="start"/>..<paramref name="end"/> (inclusive)
    /// to <paramref name="list"/>, waiting for each slot to be published first.
    /// </summary>
    internal void AddToList(List<T> list, int start, int end)
    {
        for (int i = start; i <= end; i++)
        {
            SpinWait waitSpin = default(SpinWait);
            while (!m_state[i].m_value)
            {
                waitSpin.SpinOnce();
            }
            list.Add(m_array[i]);
        }
    }
}
165 
166  [NonSerialized]
167  private volatile Segment m_head;
168 
169  [NonSerialized]
170  private volatile Segment m_tail;
171 
172  private T[] m_serializationArray;
173 
174  private const int SEGMENT_SIZE = 32;
175 
176  [NonSerialized]
177  internal volatile int m_numSnapshotTakers;
178 
// Property whose getter unconditionally returns false.
// NOTE(review): the declaration line is absent from this listing (it jumps from 181 to 183);
// presumably this is the explicit ICollection.IsSynchronized implementation - confirm against
// the full source before editing.
181  [__DynamicallyInvokable]
183  {
184  [__DynamicallyInvokable]
185  get
186  {
187  return false;
188  }
189  }
190 
/// <summary>
/// Explicit <c>ICollection.SyncRoot</c> implementation. Not supported by this collection.
/// </summary>
/// <exception cref="NotSupportedException">Always thrown by the getter.</exception>
[__DynamicallyInvokable]
object ICollection.SyncRoot
{
    [__DynamicallyInvokable]
    get
    {
        string message = Environment.GetResourceString("ConcurrentCollection_SyncRoot_NotSupported");
        throw new NotSupportedException(message);
    }
}
203 
/// <summary>
/// Gets whether the queue currently holds no items.
/// </summary>
[__DynamicallyInvokable]
public bool IsEmpty
{
    [__DynamicallyInvokable]
    get
    {
        Segment current = m_head;
        if (current.IsEmpty)
        {
            // An empty head with no successor means the whole queue is empty.
            if (current.Next == null)
            {
                return true;
            }

            // The head is exhausted but a successor exists, so m_head is about to be
            // advanced by a dequeuer; keep re-reading it until it settles.
            SpinWait spinner = default(SpinWait);
            while (current.IsEmpty)
            {
                if (current.Next == null)
                {
                    return true;
                }
                spinner.SpinOnce();
                current = m_head;
            }
        }
        return false;
    }
}
234 
/// <summary>
/// Gets the number of items in the queue, computed from a consistent reading of the
/// head and tail positions.
/// </summary>
[__DynamicallyInvokable]
public int Count
{
    [__DynamicallyInvokable]
    get
    {
        GetHeadTailPositions(out Segment first, out Segment last, out int firstLow, out int lastHigh);
        if (first != last)
        {
            // Items left in the head segment, plus every full segment strictly between
            // head and tail, plus the items already placed in the tail segment.
            int inHead = SEGMENT_SIZE - firstLow;
            int inMiddle = SEGMENT_SIZE * (int)(last.m_index - first.m_index - 1);
            int inTail = lastHigh + 1;
            return inHead + inMiddle + inTail;
        }

        // Single segment: the inclusive slot range [firstLow, lastHigh].
        return lastHigh - firstLow + 1;
    }
}
253 
// Creates a single Segment with index 0 owned by this queue and stores it in both
// m_tail and m_head, i.e. the queue starts with one empty segment.
// NOTE(review): the declaration line is absent from this listing (it jumps from 255 to 257);
// presumably this is the parameterless ConcurrentQueue() constructor - confirm against the
// full source before editing.
255  [__DynamicallyInvokable]
257  {
258  m_head = (m_tail = new Segment(0L, this));
259  }
260 
/// <summary>
/// Builds the segment chain from <paramref name="collection"/>, using the non-thread-safe
/// Segment helpers. Sets m_head to the first segment and m_tail to the last one created.
/// </summary>
/// <param name="collection">Items to copy into the queue, in enumeration order.</param>
private void InitializeFromCollection(IEnumerable<T> collection)
{
    Segment current = new Segment(0L, this);
    m_head = current;

    int filled = 0;
    foreach (T item in collection)
    {
        current.UnsafeAdd(item);

        // Segment is full: chain a fresh one and continue filling there.
        if (++filled >= SEGMENT_SIZE)
        {
            current = current.UnsafeGrow();
            filled = 0;
        }
    }

    m_tail = current;
}
277 
/// <summary>
/// Initializes the queue with the items of <paramref name="collection"/>.
/// </summary>
/// <param name="collection">The items to copy into the new queue.</param>
/// <exception cref="ArgumentNullException"><paramref name="collection"/> is null.</exception>
[__DynamicallyInvokable]
public ConcurrentQueue(IEnumerable<T> collection)
{
    if (collection != null)
    {
        InitializeFromCollection(collection);
        return;
    }

    throw new ArgumentNullException("collection");
}
290 
/// <summary>
/// Serialization callback: captures the current contents into m_serializationArray
/// (m_head and m_tail are marked [NonSerialized]).
/// </summary>
[OnSerializing]
private void OnSerializing(StreamingContext context) => m_serializationArray = ToArray();
296 
/// <summary>
/// Deserialization callback: rebuilds the segment chain from m_serializationArray and
/// then drops the array reference.
/// </summary>
[OnDeserialized]
private void OnDeserialized(StreamingContext context)
{
    T[] restoredItems = m_serializationArray;
    InitializeFromCollection(restoredItems);
    m_serializationArray = null;
}
303 
/// <summary>
/// Explicit <c>ICollection.CopyTo</c> implementation: copies a snapshot of the queue into
/// <paramref name="array"/> starting at <paramref name="index"/>.
/// </summary>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is null.</exception>
[__DynamicallyInvokable]
void ICollection.CopyTo(Array array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array");
    }

    // Remaining argument validation is delegated to the snapshot list's own CopyTo.
    ICollection snapshot = ToList();
    snapshot.CopyTo(array, index);
}
322 
// Forwards to the generic enumerator: casts this instance to IEnumerable<T> and returns
// the result of its GetEnumerator().
// NOTE(review): the declaration line is absent from this listing (it jumps from 325 to 327);
// presumably this is the explicit non-generic IEnumerable.GetEnumerator implementation -
// confirm against the full source before editing.
325  [__DynamicallyInvokable]
327  {
328  return ((IEnumerable<T>)this).GetEnumerator();
329  }
330 
/// <summary>
/// Explicit <c>IProducerConsumerCollection&lt;T&gt;.TryAdd</c> implementation; enqueues
/// <paramref name="item"/>.
/// </summary>
/// <returns>Always true.</returns>
[__DynamicallyInvokable]
bool IProducerConsumerCollection<T>.TryAdd(T item)
{
    // Enqueue has no failure return value, so adding is always reported as successful.
    Enqueue(item);
    return true;
}
340 
/// <summary>
/// Explicit <c>IProducerConsumerCollection&lt;T&gt;.TryTake</c> implementation; forwards to
/// <see cref="TryDequeue"/>.
/// </summary>
/// <param name="item">The removed item, or default(T) if the queue was empty.</param>
/// <returns>true if an item was removed; otherwise false.</returns>
[__DynamicallyInvokable]
bool IProducerConsumerCollection<T>.TryTake(out T item) => TryDequeue(out item);
349 
/// <summary>
/// Copies a snapshot of the queue's items into a new array.
/// </summary>
/// <returns>A new array containing the snapshot, in queue order.</returns>
[__DynamicallyInvokable]
public T[] ToArray()
{
    List<T> snapshot = ToList();
    return snapshot.ToArray();
}
357 
/// <summary>
/// Copies a snapshot of the queue's items into a new list. m_numSnapshotTakers is raised for
/// the duration so that concurrent dequeues do not clear slots the snapshot still has to read.
/// </summary>
/// <returns>A new list with the items between the captured head and tail positions.</returns>
private List<T> ToList()
{
    Interlocked.Increment(ref m_numSnapshotTakers);
    List<T> items = new List<T>();
    try
    {
        GetHeadTailPositions(out Segment first, out Segment last, out int firstLow, out int lastHigh);
        if (first == last)
        {
            // Everything lives in one segment.
            first.AddToList(items, firstLow, lastHigh);
        }
        else
        {
            // Rest of the head segment, then each full middle segment, then the
            // filled part of the tail segment.
            first.AddToList(items, firstLow, SEGMENT_SIZE - 1);
            Segment middle = first.Next;
            while (middle != last)
            {
                middle.AddToList(items, 0, SEGMENT_SIZE - 1);
                middle = middle.Next;
            }
            last.AddToList(items, 0, lastHigh);
        }
        return items;
    }
    finally
    {
        Interlocked.Decrement(ref m_numSnapshotTakers);
    }
}
383 
/// <summary>
/// Captures a mutually consistent view of the head/tail segments and their low/high indices.
/// The four values are re-read, with spinning in between, until a second look shows that none
/// of them changed and the head segment does not come after the tail segment.
/// </summary>
/// <param name="head">Receives the head segment.</param>
/// <param name="tail">Receives the tail segment.</param>
/// <param name="headLow">Receives the head segment's Low index.</param>
/// <param name="tailHigh">Receives the tail segment's High index.</param>
private void GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh)
{
    SpinWait spinner = default(SpinWait);
    while (true)
    {
        head = m_head;
        tail = m_tail;
        headLow = head.Low;
        tailHigh = tail.High;

        // Re-check every captured value in the same order they were read.
        bool stable = head == m_head
            && tail == m_tail
            && headLow == head.Low
            && tailHigh == tail.High
            && head.m_index <= tail.m_index;
        if (stable)
        {
            return;
        }

        spinner.SpinOnce();
    }
}
400 
/// <summary>
/// Copies a snapshot of the queue's items into <paramref name="array"/>, starting at
/// <paramref name="index"/>.
/// </summary>
/// <param name="array">Destination array.</param>
/// <param name="index">Position in <paramref name="array"/> at which copying begins.</param>
/// <exception cref="ArgumentNullException"><paramref name="array"/> is null.</exception>
[__DynamicallyInvokable]
public void CopyTo(T[] array, int index)
{
    if (array == null)
    {
        throw new ArgumentNullException("array");
    }

    // Index/length validation is delegated to List<T>.CopyTo.
    List<T> snapshot = ToList();
    snapshot.CopyTo(array, index);
}
419 
// Raises m_numSnapshotTakers, captures the current head/tail positions, and returns the
// iterator produced by the private GetEnumerator(Segment, Segment, int, int) overload.
// The matching decrement of m_numSnapshotTakers is in that iterator's finally block, so it
// only happens once the returned enumerator runs to completion or is disposed.
// NOTE(review): the declaration line is absent from this listing (it jumps from 422 to 424);
// presumably this is the public IEnumerator<T> GetEnumerator() - confirm against the full
// source before editing.
422  [__DynamicallyInvokable]
424  {
425  Interlocked.Increment(ref m_numSnapshotTakers);
426  GetHeadTailPositions(out Segment head, out Segment tail, out int headLow, out int tailHigh);
427  return GetEnumerator(head, tail, headLow, tailHigh);
428  }
429 
/// <summary>
/// Iterator over the items between a previously captured head position and tail position.
/// Decrements m_numSnapshotTakers in its finally block, i.e. when iteration completes or the
/// enumerator is disposed.
/// </summary>
/// <param name="head">Segment in which enumeration starts.</param>
/// <param name="tail">Segment in which enumeration ends.</param>
/// <param name="headLow">First slot to yield from <paramref name="head"/>.</param>
/// <param name="tailHigh">Last slot (inclusive) to yield from <paramref name="tail"/>.</param>
private IEnumerator<T> GetEnumerator(Segment head, Segment tail, int headLow, int tailHigh)
{
    try
    {
        SpinWait spin = default(SpinWait);
        Segment segment = head;
        while (true)
        {
            // The head segment starts at headLow and the tail segment stops at tailHigh;
            // every other segment is walked in full.
            int firstSlot = segment == head ? headLow : 0;
            int lastSlot = segment == tail ? tailHigh : SEGMENT_SIZE - 1;

            for (int slot = firstSlot; slot <= lastSlot; slot++)
            {
                // Wait until the producer has published this slot.
                spin.Reset();
                while (!segment.m_state[slot].m_value)
                {
                    spin.SpinOnce();
                }
                yield return segment.m_array[slot];
            }

            if (segment == tail)
            {
                break;
            }
            segment = segment.Next;
        }
    }
    finally
    {
        Interlocked.Decrement(ref m_numSnapshotTakers);
    }
}
486 
/// <summary>
/// Adds <paramref name="item"/> to the end of the queue.
/// </summary>
/// <param name="item">The item to add.</param>
[__DynamicallyInvokable]
public void Enqueue(T item)
{
    SpinWait backoff = default(SpinWait);

    // m_tail is re-read on every attempt: when the current tail segment is full, TryAppend
    // fails until the producer that filled it has published the next segment as m_tail.
    while (!m_tail.TryAppend(item))
    {
        backoff.SpinOnce();
    }
}
503 
/// <summary>
/// Tries to remove and return the item at the front of the queue.
/// </summary>
/// <param name="result">The removed item, or default(T) if the queue was empty.</param>
/// <returns>true if an item was removed; false if the queue was empty.</returns>
[__DynamicallyInvokable]
public bool TryDequeue(out T result)
{
    for (;;)
    {
        if (IsEmpty)
        {
            result = default(T);
            return false;
        }

        // m_head is re-read on every attempt because a failed TryRemove can mean the head
        // segment was exhausted and replaced in the meantime.
        if (m_head.TryRemove(out result))
        {
            return true;
        }
    }
}
522 
/// <summary>
/// Tries to return the item at the front of the queue without removing it.
/// </summary>
/// <param name="result">The front item, or default(T) if the queue was empty.</param>
/// <returns>true if an item was returned; false if the queue was empty.</returns>
[__DynamicallyInvokable]
public bool TryPeek(out T result)
{
    // Announce a reader so that concurrent dequeues leave array slots intact
    // (Segment.TryRemove only clears a slot while m_numSnapshotTakers <= 0).
    Interlocked.Increment(ref m_numSnapshotTakers);
    try
    {
        while (!IsEmpty)
        {
            Segment head = m_head;
            if (head.TryPeek(out result))
            {
                return true;
            }
        }
        result = default(T);
        return false;
    }
    finally
    {
        // Balanced in finally, matching ToList: if an exception escaped between the increment
        // and the decrement the counter would stay raised and dequeued slots would never be
        // cleared again.
        Interlocked.Decrement(ref m_numSnapshotTakers);
    }
}
544  }
545 }
object SyncRoot
Gets an object that can be used to synchronize access to the T:System.Collections.ICollection.
Definition: ICollection.cs:23
The exception that is thrown when a null reference (Nothing in Visual Basic) is passed to a method that does not accept it as a valid argument.
void Reset()
Resets the spin counter.
Definition: SpinWait.cs:76
Provides support for spin-based waiting.
Definition: SpinWait.cs:8
void CopyTo(T[] array, int index)
Copies the T:System.Collections.Concurrent.ConcurrentQueue`1 elements to an existing one-dimensional T:System.Array, starting at the specified array index.
static sbyte Min(sbyte val1, sbyte val2)
Returns the smaller of two 8-bit signed integers.
Definition: Math.cs:762
Definition: __Canon.cs:3
new IEnumerator< T > GetEnumerator()
Returns an enumerator that iterates through the collection.
bool TryPeek(out T result)
Tries to return an object from the beginning of the T:System.Collections.Concurrent.ConcurrentQueue`1 without removing it.
Exposes the enumerator, which supports a simple iteration over a collection of a specified type....
Definition: IEnumerable.cs:9
bool TryDequeue(out T result)
Tries to remove and return the object at the beginning of the concurrent queue.
Represents a strongly-typed, read-only collection of elements.
ConcurrentQueue(IEnumerable< T > collection)
Initializes a new instance of the T:System.Collections.Concurrent.ConcurrentQueue`1 class that contains elements copied from the specified collection.
void CopyTo(T[] array, int arrayIndex)
Copies the elements of the T:System.Collections.Generic.ICollection`1 to an T:System.Array, starting at a particular T:System.Array index.
Describes the source and destination of a given serialized stream, and provides an additional caller-defined context.
bool IsEmpty
Gets a value that indicates whether the T:System.Collections.Concurrent.ConcurrentQueue`1 is empty.
Supports a simple iteration over a generic collection.
Definition: IEnumerator.cs:6
Defines methods to manipulate generic collections.
Definition: ICollection.cs:9
Exposes an enumerator, which supports a simple iteration over a non-generic collection....
Definition: IEnumerable.cs:9
SecurityAction
Specifies the security actions that can be performed using declarative security.
Provides information about, and means to manipulate, the current environment and platform....
Definition: Environment.cs:21
Specifies that the process performs time-critical tasks that must be executed immediately,...
static int CompareExchange(ref int location1, int value, int comparand)
Compares two 32-bit signed integers for equality and, if they are equal, replaces the first value.
static int Increment(ref int location)
Increments a specified variable and stores the result, as an atomic operation.
Definition: Interlocked.cs:18
Provides methods for creating, manipulating, searching, and sorting arrays, thereby serving as the ba...
Definition: Array.cs:17
bool IsSynchronized
Gets a value indicating whether access to the T:System.Collections.ICollection is synchronized (thread safe).
Definition: ICollection.cs:33
void Enqueue(T item)
Adds an object to the end of the T:System.Collections.Concurrent.ConcurrentQueue`1.
ConcurrentQueue()
Initializes a new instance of the T:System.Collections.Concurrent.ConcurrentQueue`1 class.
Defines methods to manipulate thread-safe collections intended for producer/consumer usage....
IEnumerator< T > GetEnumerator()
Returns an enumerator that iterates through the T:System.Collections.Concurrent.ConcurrentQueue`1.
T [] ToArray()
Copies the elements stored in the T:System.Collections.Concurrent.ConcurrentQueue`1 to a new array.
Represents a strongly typed list of objects that can be accessed by index. Provides methods to search, sort, and manipulate lists.
Definition: List.cs:14
void SpinOnce()
Performs a single spin.
Definition: SpinWait.cs:48
Specifies that the class can be serialized.
Represents a thread-safe first in-first out (FIFO) collection.
static int Decrement(ref int location)
Decrements a specified variable and stores the result, as an atomic operation.
Definition: Interlocked.cs:40
Provides constants and static methods for trigonometric, logarithmic, and other common mathematical f...
Definition: Math.cs:10
The exception that is thrown when an invoked method is not supported, or when there is an attempt to read, seek, or write to a stream that does not support the invoked functionality.
Defines size, enumerators, and synchronization methods for all nongeneric collections.
Definition: ICollection.cs:8
int Count
Gets the number of elements contained in the T:System.Collections.Concurrent.ConcurrentQueue`1.
Provides atomic operations for variables that are shared by multiple threads.
Definition: Interlocked.cs:10