mscorlib(4.0.0.0) API with additions
BlockingCollection.cs
2 using System.Diagnostics;
6 using System.Threading;
7 
9 {
12  [ComVisible(false)]
13  [DebuggerTypeProxy(typeof(SystemThreadingCollections_BlockingCollectionDebugView<>))]
14  [DebuggerDisplay("Count = {Count}, Type = {m_collection}")]
15  [global::__DynamicallyInvokable]
16  [HostProtection(SecurityAction.LinkDemand, Synchronization = true, ExternalThreading = true)]
18  {
19  private IProducerConsumerCollection<T> m_collection;
20 
21  private int m_boundedCapacity;
22 
23  private const int NON_BOUNDED = -1;
24 
25  private SemaphoreSlim m_freeNodes;
26 
27  private SemaphoreSlim m_occupiedNodes;
28 
29  private bool m_isDisposed;
30 
31  private CancellationTokenSource m_ConsumersCancellationTokenSource;
32 
33  private CancellationTokenSource m_ProducersCancellationTokenSource;
34 
35  private volatile int m_currentAdders;
36 
37  private const int COMPLETE_ADDING_ON_MASK = int.MinValue;
38 
42  [global::__DynamicallyInvokable]
43  public int BoundedCapacity
44  {
45  [global::__DynamicallyInvokable]
46  get
47  {
48  CheckDisposed();
49  return m_boundedCapacity;
50  }
51  }
52 
56  [global::__DynamicallyInvokable]
57  public bool IsAddingCompleted
58  {
59  [global::__DynamicallyInvokable]
60  get
61  {
62  CheckDisposed();
63  return m_currentAdders == int.MinValue;
64  }
65  }
66 
70  [global::__DynamicallyInvokable]
71  public bool IsCompleted
72  {
73  [global::__DynamicallyInvokable]
74  get
75  {
76  CheckDisposed();
78  {
79  return m_occupiedNodes.CurrentCount == 0;
80  }
81  return false;
82  }
83  }
84 
88  [global::__DynamicallyInvokable]
89  public int Count
90  {
91  [global::__DynamicallyInvokable]
92  get
93  {
94  CheckDisposed();
95  return m_occupiedNodes.CurrentCount;
96  }
97  }
98 
102  [global::__DynamicallyInvokable]
104  {
105  [global::__DynamicallyInvokable]
106  get
107  {
108  CheckDisposed();
109  return false;
110  }
111  }
112 
116  [global::__DynamicallyInvokable]
117  object ICollection.SyncRoot
118  {
119  [global::__DynamicallyInvokable]
120  get
121  {
122  throw new NotSupportedException(SR.GetString("ConcurrentCollection_SyncRoot_NotSupported"));
123  }
124  }
125 
126  private static bool IsSTAThread => Thread.CurrentThread.GetApartmentState() == ApartmentState.STA;
127 
129  [global::__DynamicallyInvokable]
132  {
133  }
134 
138  [global::__DynamicallyInvokable]
139  public BlockingCollection(int boundedCapacity)
140  : this((IProducerConsumerCollection<T>)new ConcurrentQueue<T>(), boundedCapacity)
141  {
142  }
143 
150  [global::__DynamicallyInvokable]
151  public BlockingCollection(IProducerConsumerCollection<T> collection, int boundedCapacity)
152  {
153  if (boundedCapacity < 1)
154  {
155  throw new ArgumentOutOfRangeException("boundedCapacity", boundedCapacity, SR.GetString("BlockingCollection_ctor_BoundedCapacityRange"));
156  }
157  if (collection == null)
158  {
159  throw new ArgumentNullException("collection");
160  }
161  int count = collection.Count;
162  if (count > boundedCapacity)
163  {
164  throw new ArgumentException(SR.GetString("BlockingCollection_ctor_CountMoreThanCapacity"));
165  }
166  Initialize(collection, boundedCapacity, count);
167  }
168 
172  [global::__DynamicallyInvokable]
174  {
175  if (collection == null)
176  {
177  throw new ArgumentNullException("collection");
178  }
179  Initialize(collection, -1, collection.Count);
180  }
181 
182  private void Initialize(IProducerConsumerCollection<T> collection, int boundedCapacity, int collectionCount)
183  {
184  m_collection = collection;
185  m_boundedCapacity = boundedCapacity;
186  m_isDisposed = false;
187  m_ConsumersCancellationTokenSource = new CancellationTokenSource();
188  m_ProducersCancellationTokenSource = new CancellationTokenSource();
189  if (boundedCapacity == -1)
190  {
191  m_freeNodes = null;
192  }
193  else
194  {
195  m_freeNodes = new SemaphoreSlim(boundedCapacity - collectionCount);
196  }
197  m_occupiedNodes = new SemaphoreSlim(collectionCount);
198  }
199 
204  [global::__DynamicallyInvokable]
205  public void Add(T item)
206  {
207  TryAddWithNoTimeValidation(item, -1, default(CancellationToken));
208  }
209 
216  [global::__DynamicallyInvokable]
217  public void Add(T item, CancellationToken cancellationToken)
218  {
219  TryAddWithNoTimeValidation(item, -1, cancellationToken);
220  }
221 
227  [global::__DynamicallyInvokable]
228  public bool TryAdd(T item)
229  {
230  return TryAddWithNoTimeValidation(item, 0, default(CancellationToken));
231  }
232 
241  [global::__DynamicallyInvokable]
242  public bool TryAdd(T item, TimeSpan timeout)
243  {
244  ValidateTimeout(timeout);
245  return TryAddWithNoTimeValidation(item, (int)timeout.TotalMilliseconds, default(CancellationToken));
246  }
247 
256  [global::__DynamicallyInvokable]
257  public bool TryAdd(T item, int millisecondsTimeout)
258  {
259  ValidateMillisecondsTimeout(millisecondsTimeout);
260  return TryAddWithNoTimeValidation(item, millisecondsTimeout, default(CancellationToken));
261  }
262 
273  [global::__DynamicallyInvokable]
274  public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
275  {
276  ValidateMillisecondsTimeout(millisecondsTimeout);
277  return TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
278  }
279 
280  private bool TryAddWithNoTimeValidation(T item, int millisecondsTimeout, CancellationToken cancellationToken)
281  {
282  CheckDisposed();
283  if (cancellationToken.IsCancellationRequested)
284  {
285  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), cancellationToken);
286  }
287  if (IsAddingCompleted)
288  {
289  throw new InvalidOperationException(SR.GetString("BlockingCollection_Completed"));
290  }
291  bool flag = true;
292  if (m_freeNodes != null)
293  {
294  CancellationTokenSource cancellationTokenSource = null;
295  try
296  {
297  flag = m_freeNodes.Wait(0);
298  if (!flag && millisecondsTimeout != 0)
299  {
300  cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ProducersCancellationTokenSource.Token);
301  flag = m_freeNodes.Wait(millisecondsTimeout, cancellationTokenSource.Token);
302  }
303  }
304  catch (OperationCanceledException)
305  {
306  if (cancellationToken.IsCancellationRequested)
307  {
308  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), cancellationToken);
309  }
310  throw new InvalidOperationException(SR.GetString("BlockingCollection_Add_ConcurrentCompleteAdd"));
311  }
312  finally
313  {
314  cancellationTokenSource?.Dispose();
315  }
316  }
317  if (flag)
318  {
319  SpinWait spinWait = default(SpinWait);
320  while (true)
321  {
322  int currentAdders = m_currentAdders;
323  if ((currentAdders & int.MinValue) != 0)
324  {
325  spinWait.Reset();
326  while (m_currentAdders != int.MinValue)
327  {
328  spinWait.SpinOnce();
329  }
330  throw new InvalidOperationException(SR.GetString("BlockingCollection_Completed"));
331  }
332  if (Interlocked.CompareExchange(ref m_currentAdders, currentAdders + 1, currentAdders) == currentAdders)
333  {
334  break;
335  }
336  spinWait.SpinOnce();
337  }
338  try
339  {
340  bool flag2 = false;
341  try
342  {
343  cancellationToken.ThrowIfCancellationRequested();
344  flag2 = m_collection.TryAdd(item);
345  }
346  catch
347  {
348  if (m_freeNodes != null)
349  {
350  m_freeNodes.Release();
351  }
352  throw;
353  }
354  if (!flag2)
355  {
356  throw new InvalidOperationException(SR.GetString("BlockingCollection_Add_Failed"));
357  }
358  m_occupiedNodes.Release();
359  return flag;
360  }
361  finally
362  {
363  Interlocked.Decrement(ref m_currentAdders);
364  }
365  }
366  return flag;
367  }
368 
373  [global::__DynamicallyInvokable]
374  public T Take()
375  {
376  if (!TryTake(out T item, -1, CancellationToken.None))
377  {
378  throw new InvalidOperationException(SR.GetString("BlockingCollection_CantTakeWhenDone"));
379  }
380  return item;
381  }
382 
389  [global::__DynamicallyInvokable]
390  public T Take(CancellationToken cancellationToken)
391  {
392  if (!TryTake(out T item, -1, cancellationToken))
393  {
394  throw new InvalidOperationException(SR.GetString("BlockingCollection_CantTakeWhenDone"));
395  }
396  return item;
397  }
398 
405  [global::__DynamicallyInvokable]
406  public bool TryTake(out T item)
407  {
408  return TryTake(out item, 0, CancellationToken.None);
409  }
410 
421  [global::__DynamicallyInvokable]
422  public bool TryTake(out T item, TimeSpan timeout)
423  {
424  ValidateTimeout(timeout);
425  return TryTakeWithNoTimeValidation(out item, (int)timeout.TotalMilliseconds, CancellationToken.None, null);
426  }
427 
437  [global::__DynamicallyInvokable]
438  public bool TryTake(out T item, int millisecondsTimeout)
439  {
440  ValidateMillisecondsTimeout(millisecondsTimeout);
441  return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, CancellationToken.None, null);
442  }
443 
455  [global::__DynamicallyInvokable]
456  public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
457  {
458  ValidateMillisecondsTimeout(millisecondsTimeout);
459  return TryTakeWithNoTimeValidation(out item, millisecondsTimeout, cancellationToken, null);
460  }
461 
462  private bool TryTakeWithNoTimeValidation(out T item, int millisecondsTimeout, CancellationToken cancellationToken, CancellationTokenSource combinedTokenSource)
463  {
464  CheckDisposed();
465  item = default(T);
466  if (cancellationToken.IsCancellationRequested)
467  {
468  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), cancellationToken);
469  }
470  if (IsCompleted)
471  {
472  return false;
473  }
474  bool flag = false;
475  CancellationTokenSource cancellationTokenSource = combinedTokenSource;
476  try
477  {
478  flag = m_occupiedNodes.Wait(0);
479  if (!flag && millisecondsTimeout != 0)
480  {
481  if (combinedTokenSource == null)
482  {
483  cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
484  }
485  flag = m_occupiedNodes.Wait(millisecondsTimeout, cancellationTokenSource.Token);
486  }
487  }
488  catch (OperationCanceledException)
489  {
490  if (cancellationToken.IsCancellationRequested)
491  {
492  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), cancellationToken);
493  }
494  return false;
495  }
496  finally
497  {
498  if (cancellationTokenSource != null && combinedTokenSource == null)
499  {
500  cancellationTokenSource.Dispose();
501  }
502  }
503  if (flag)
504  {
505  bool flag2 = false;
506  bool flag3 = true;
507  try
508  {
509  cancellationToken.ThrowIfCancellationRequested();
510  flag2 = m_collection.TryTake(out item);
511  flag3 = false;
512  if (!flag2)
513  {
514  throw new InvalidOperationException(SR.GetString("BlockingCollection_Take_CollectionModified"));
515  }
516  return flag;
517  }
518  finally
519  {
520  if (flag2)
521  {
522  if (m_freeNodes != null)
523  {
524  m_freeNodes.Release();
525  }
526  }
527  else if (flag3)
528  {
529  m_occupiedNodes.Release();
530  }
531  if (IsCompleted)
532  {
533  CancelWaitingConsumers();
534  }
535  }
536  }
537  return flag;
538  }
539 
549  [global::__DynamicallyInvokable]
550  public static int AddToAny(BlockingCollection<T>[] collections, T item)
551  {
552  return TryAddToAny(collections, item, -1, CancellationToken.None);
553  }
554 
566  [global::__DynamicallyInvokable]
567  public static int AddToAny(BlockingCollection<T>[] collections, T item, CancellationToken cancellationToken)
568  {
569  return TryAddToAny(collections, item, -1, cancellationToken);
570  }
571 
581  [global::__DynamicallyInvokable]
582  public static int TryAddToAny(BlockingCollection<T>[] collections, T item)
583  {
584  return TryAddToAny(collections, item, 0, CancellationToken.None);
585  }
586 
598  [global::__DynamicallyInvokable]
599  public static int TryAddToAny(BlockingCollection<T>[] collections, T item, TimeSpan timeout)
600  {
601  ValidateTimeout(timeout);
602  return TryAddToAnyCore(collections, item, (int)timeout.TotalMilliseconds, CancellationToken.None);
603  }
604 
616  [global::__DynamicallyInvokable]
617  public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
618  {
619  ValidateMillisecondsTimeout(millisecondsTimeout);
620  return TryAddToAnyCore(collections, item, millisecondsTimeout, CancellationToken.None);
621  }
622 
636  [global::__DynamicallyInvokable]
637  public static int TryAddToAny(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
638  {
639  ValidateMillisecondsTimeout(millisecondsTimeout);
640  return TryAddToAnyCore(collections, item, millisecondsTimeout, cancellationToken);
641  }
642 
643  private static int TryAddToAnyCore(BlockingCollection<T>[] collections, T item, int millisecondsTimeout, CancellationToken externalCancellationToken)
644  {
645  ValidateCollectionsArray(collections, isAddOperation: true);
646  int num = millisecondsTimeout;
647  uint startTime = 0u;
648  if (millisecondsTimeout != -1)
649  {
650  startTime = (uint)Environment.TickCount;
651  }
652  int num2 = TryAddToAnyFast(collections, item);
653  if (num2 > -1)
654  {
655  return num2;
656  }
657  CancellationToken[] cancellationTokens;
658  List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, isAddOperation: true, out cancellationTokens);
659  while (millisecondsTimeout == -1 || num >= 0)
660  {
661  num2 = -1;
662  using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokens))
663  {
664  handles.Add(cancellationTokenSource.Token.WaitHandle);
665  num2 = WaitHandle.WaitAny(handles.ToArray(), num, exitContext: false);
666  handles.RemoveAt(handles.Count - 1);
667  if (cancellationTokenSource.IsCancellationRequested)
668  {
669  if (externalCancellationToken.IsCancellationRequested)
670  {
671  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), externalCancellationToken);
672  }
673  throw new ArgumentException(SR.GetString("BlockingCollection_CantAddAnyWhenCompleted"), "collections");
674  }
675  }
676  if (num2 == 258)
677  {
678  return -1;
679  }
680  if (collections[num2].TryAdd(item))
681  {
682  return num2;
683  }
684  if (millisecondsTimeout != -1)
685  {
686  num = UpdateTimeOut(startTime, millisecondsTimeout);
687  }
688  }
689  return -1;
690  }
691 
692  private static int TryAddToAnyFast(BlockingCollection<T>[] collections, T item)
693  {
694  for (int i = 0; i < collections.Length; i++)
695  {
696  if (collections[i].m_freeNodes == null)
697  {
698  collections[i].TryAdd(item);
699  return i;
700  }
701  }
702  return -1;
703  }
704 
705  private static List<WaitHandle> GetHandles(BlockingCollection<T>[] collections, CancellationToken externalCancellationToken, bool isAddOperation, out CancellationToken[] cancellationTokens)
706  {
707  List<WaitHandle> list = new List<WaitHandle>(collections.Length + 1);
708  List<CancellationToken> list2 = new List<CancellationToken>(collections.Length + 1);
709  list2.Add(externalCancellationToken);
710  if (isAddOperation)
711  {
712  for (int i = 0; i < collections.Length; i++)
713  {
714  if (collections[i].m_freeNodes != null)
715  {
716  list.Add(collections[i].m_freeNodes.AvailableWaitHandle);
717  list2.Add(collections[i].m_ProducersCancellationTokenSource.Token);
718  }
719  }
720  }
721  else
722  {
723  for (int j = 0; j < collections.Length; j++)
724  {
725  if (!collections[j].IsCompleted)
726  {
727  list.Add(collections[j].m_occupiedNodes.AvailableWaitHandle);
728  list2.Add(collections[j].m_ConsumersCancellationTokenSource.Token);
729  }
730  }
731  }
732  cancellationTokens = list2.ToArray();
733  return list;
734  }
735 
736  private static int UpdateTimeOut(uint startTime, int originalWaitMillisecondsTimeout)
737  {
738  if (originalWaitMillisecondsTimeout == 0)
739  {
740  return 0;
741  }
742  uint num = (uint)(Environment.TickCount - (int)startTime);
743  if (num > int.MaxValue)
744  {
745  return 0;
746  }
747  int num2 = originalWaitMillisecondsTimeout - (int)num;
748  if (num2 <= 0)
749  {
750  return 0;
751  }
752  return num2;
753  }
754 
764  [global::__DynamicallyInvokable]
765  public static int TakeFromAny(BlockingCollection<T>[] collections, out T item)
766  {
767  return TakeFromAny(collections, out item, CancellationToken.None);
768  }
769 
781  [global::__DynamicallyInvokable]
782  public static int TakeFromAny(BlockingCollection<T>[] collections, out T item, CancellationToken cancellationToken)
783  {
784  return TryTakeFromAnyCore(collections, out item, -1, isTakeOperation: true, cancellationToken);
785  }
786 
796  [global::__DynamicallyInvokable]
797  public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item)
798  {
799  return TryTakeFromAny(collections, out item, 0);
800  }
801 
813  [global::__DynamicallyInvokable]
814  public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, TimeSpan timeout)
815  {
816  ValidateTimeout(timeout);
817  return TryTakeFromAnyCore(collections, out item, (int)timeout.TotalMilliseconds, isTakeOperation: false, CancellationToken.None);
818  }
819 
831  [global::__DynamicallyInvokable]
832  public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
833  {
834  ValidateMillisecondsTimeout(millisecondsTimeout);
835  return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, isTakeOperation: false, CancellationToken.None);
836  }
837 
851  [global::__DynamicallyInvokable]
852  public static int TryTakeFromAny(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
853  {
854  ValidateMillisecondsTimeout(millisecondsTimeout);
855  return TryTakeFromAnyCore(collections, out item, millisecondsTimeout, isTakeOperation: false, cancellationToken);
856  }
857 
858  private static int TryTakeFromAnyCore(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
859  {
860  ValidateCollectionsArray(collections, isAddOperation: false);
861  for (int i = 0; i < collections.Length; i++)
862  {
863  if (!collections[i].IsCompleted && collections[i].m_occupiedNodes.CurrentCount > 0 && collections[i].TryTake(out item))
864  {
865  return i;
866  }
867  }
868  return TryTakeFromAnyCoreSlow(collections, out item, millisecondsTimeout, isTakeOperation, externalCancellationToken);
869  }
870 
871  private static int TryTakeFromAnyCoreSlow(BlockingCollection<T>[] collections, out T item, int millisecondsTimeout, bool isTakeOperation, CancellationToken externalCancellationToken)
872  {
873  int num = millisecondsTimeout;
874  uint startTime = 0u;
875  if (millisecondsTimeout != -1)
876  {
877  startTime = (uint)Environment.TickCount;
878  }
879  while (millisecondsTimeout == -1 || num >= 0)
880  {
881  CancellationToken[] cancellationTokens;
882  List<WaitHandle> handles = GetHandles(collections, externalCancellationToken, isAddOperation: false, out cancellationTokens);
883  if (handles.Count == 0 && isTakeOperation)
884  {
885  throw new ArgumentException(SR.GetString("BlockingCollection_CantTakeAnyWhenAllDone"), "collections");
886  }
887  if (handles.Count == 0)
888  {
889  break;
890  }
891  using (CancellationTokenSource cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationTokens))
892  {
893  handles.Add(cancellationTokenSource.Token.WaitHandle);
894  int num2 = WaitHandle.WaitAny(handles.ToArray(), num, exitContext: false);
895  if (cancellationTokenSource.IsCancellationRequested && externalCancellationToken.IsCancellationRequested)
896  {
897  throw new OperationCanceledException(SR.GetString("Common_OperationCanceled"), externalCancellationToken);
898  }
899  if (!cancellationTokenSource.IsCancellationRequested)
900  {
901  if (num2 == 258)
902  {
903  break;
904  }
905  if (collections.Length != handles.Count - 1)
906  {
907  for (int i = 0; i < collections.Length; i++)
908  {
909  if (collections[i].m_occupiedNodes.AvailableWaitHandle == handles[num2])
910  {
911  num2 = i;
912  break;
913  }
914  }
915  }
916  if (collections[num2].TryTake(out item))
917  {
918  return num2;
919  }
920  }
921  }
922  if (millisecondsTimeout != -1)
923  {
924  num = UpdateTimeOut(startTime, millisecondsTimeout);
925  }
926  }
927  item = default(T);
928  return -1;
929  }
930 
933  [global::__DynamicallyInvokable]
934  public void CompleteAdding()
935  {
936  CheckDisposed();
937  if (IsAddingCompleted)
938  {
939  return;
940  }
941  SpinWait spinWait = default(SpinWait);
942  while (true)
943  {
944  int currentAdders = m_currentAdders;
945  if ((currentAdders & int.MinValue) != 0)
946  {
947  spinWait.Reset();
948  while (m_currentAdders != int.MinValue)
949  {
950  spinWait.SpinOnce();
951  }
952  return;
953  }
954  if (Interlocked.CompareExchange(ref m_currentAdders, currentAdders | int.MinValue, currentAdders) == currentAdders)
955  {
956  break;
957  }
958  spinWait.SpinOnce();
959  }
960  spinWait.Reset();
961  while (m_currentAdders != int.MinValue)
962  {
963  spinWait.SpinOnce();
964  }
965  if (Count == 0)
966  {
967  CancelWaitingConsumers();
968  }
969  CancelWaitingProducers();
970  }
971 
972  private void CancelWaitingConsumers()
973  {
974  m_ConsumersCancellationTokenSource.Cancel();
975  }
976 
977  private void CancelWaitingProducers()
978  {
979  m_ProducersCancellationTokenSource.Cancel();
980  }
981 
983  [global::__DynamicallyInvokable]
984  public void Dispose()
985  {
986  Dispose(disposing: true);
987  GC.SuppressFinalize(this);
988  }
989 
992  [global::__DynamicallyInvokable]
993  protected virtual void Dispose(bool disposing)
994  {
995  if (!m_isDisposed)
996  {
997  if (m_freeNodes != null)
998  {
999  m_freeNodes.Dispose();
1000  }
1001  m_occupiedNodes.Dispose();
1002  m_isDisposed = true;
1003  }
1004  }
1005 
1009  [global::__DynamicallyInvokable]
1010  public T[] ToArray()
1011  {
1012  CheckDisposed();
1013  return m_collection.ToArray();
1014  }
1015 
1023  [global::__DynamicallyInvokable]
1024  public void CopyTo(T[] array, int index)
1025  {
1026  ((ICollection)this).CopyTo((Array)array, index);
1027  }
1028 
1036  [global::__DynamicallyInvokable]
1037  void ICollection.CopyTo(Array array, int index)
1038  {
1039  CheckDisposed();
1040  T[] array2 = m_collection.ToArray();
1041  try
1042  {
1043  Array.Copy(array2, 0, array, index, array2.Length);
1044  }
1045  catch (ArgumentNullException)
1046  {
1047  throw new ArgumentNullException("array");
1048  }
1050  {
1051  throw new ArgumentOutOfRangeException("index", index, SR.GetString("BlockingCollection_CopyTo_NonNegative"));
1052  }
1053  catch (ArgumentException)
1054  {
1055  throw new ArgumentException(SR.GetString("BlockingCollection_CopyTo_TooManyElems"), "index");
1056  }
1057  catch (RankException)
1058  {
1059  throw new ArgumentException(SR.GetString("BlockingCollection_CopyTo_MultiDim"), "array");
1060  }
1061  catch (InvalidCastException)
1062  {
1063  throw new ArgumentException(SR.GetString("BlockingCollection_CopyTo_IncorrectType"), "array");
1064  }
1065  catch (ArrayTypeMismatchException)
1066  {
1067  throw new ArgumentException(SR.GetString("BlockingCollection_CopyTo_IncorrectType"), "array");
1068  }
1069  }
1070 
1074  [global::__DynamicallyInvokable]
1076  {
1078  }
1079 
1085  [global::__DynamicallyInvokable]
1087  {
1088  CancellationTokenSource linkedTokenSource = null;
1089  try
1090  {
1091  linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, m_ConsumersCancellationTokenSource.Token);
1092  while (!IsCompleted)
1093  {
1094  if (TryTakeWithNoTimeValidation(out T item, -1, cancellationToken, linkedTokenSource))
1095  {
1096  yield return item;
1097  }
1098  }
1099  }
1100  finally
1101  {
1102  linkedTokenSource?.Dispose();
1103  }
1104  }
1105 
1109  [global::__DynamicallyInvokable]
1111  {
1112  CheckDisposed();
1113  return m_collection.GetEnumerator();
1114  }
1115 
1119  [global::__DynamicallyInvokable]
1121  {
1122  return ((IEnumerable<T>)this).GetEnumerator();
1123  }
1124 
1125  private static void ValidateCollectionsArray(BlockingCollection<T>[] collections, bool isAddOperation)
1126  {
1127  if (collections == null)
1128  {
1129  throw new ArgumentNullException("collections");
1130  }
1131  if (collections.Length < 1)
1132  {
1133  throw new ArgumentException(SR.GetString("BlockingCollection_ValidateCollectionsArray_ZeroSize"), "collections");
1134  }
1135  if ((!IsSTAThread && collections.Length > 63) || (IsSTAThread && collections.Length > 62))
1136  {
1137  throw new ArgumentOutOfRangeException("collections", SR.GetString("BlockingCollection_ValidateCollectionsArray_LargeSize"));
1138  }
1139  int num = 0;
1140  while (true)
1141  {
1142  if (num < collections.Length)
1143  {
1144  if (collections[num] == null)
1145  {
1146  throw new ArgumentException(SR.GetString("BlockingCollection_ValidateCollectionsArray_NullElems"), "collections");
1147  }
1148  if (collections[num].m_isDisposed)
1149  {
1150  throw new ObjectDisposedException("collections", SR.GetString("BlockingCollection_ValidateCollectionsArray_DispElems"));
1151  }
1152  if (isAddOperation && collections[num].IsAddingCompleted)
1153  {
1154  break;
1155  }
1156  num++;
1157  continue;
1158  }
1159  return;
1160  }
1161  throw new ArgumentException(SR.GetString("BlockingCollection_CantAddAnyWhenCompleted"), "collections");
1162  }
1163 
1164  private static void ValidateTimeout(TimeSpan timeout)
1165  {
1166  long num = (long)timeout.TotalMilliseconds;
1167  if ((num < 0 || num > int.MaxValue) && num != -1)
1168  {
1169  throw new ArgumentOutOfRangeException("timeout", timeout, string.Format(CultureInfo.InvariantCulture, SR.GetString("BlockingCollection_TimeoutInvalid"), new object[1]
1170  {
1171  int.MaxValue
1172  }));
1173  }
1174  }
1175 
1176  private static void ValidateMillisecondsTimeout(int millisecondsTimeout)
1177  {
1178  if (millisecondsTimeout < 0 && millisecondsTimeout != -1)
1179  {
1180  throw new ArgumentOutOfRangeException("millisecondsTimeout", millisecondsTimeout, string.Format(CultureInfo.InvariantCulture, SR.GetString("BlockingCollection_TimeoutInvalid"), new object[1]
1181  {
1182  int.MaxValue
1183  }));
1184  }
1185  }
1186 
1187  private void CheckDisposed()
1188  {
1189  if (m_isDisposed)
1190  {
1191  throw new ObjectDisposedException("BlockingCollection", SR.GetString("BlockingCollection_Disposed"));
1192  }
1193  }
1194  }
1195 }
static Thread CurrentThread
Gets the currently running thread.
Definition: Thread.cs:134
static CultureInfo InvariantCulture
Gets the T:System.Globalization.CultureInfo object that is culture-independent (invariant).
Definition: CultureInfo.cs:263
Represents a lightweight alternative to T:System.Threading.Semaphore that limits the number of thread...
The exception that is thrown when a null reference (Nothing in Visual Basic) is passed to a method th...
static int WaitAny(WaitHandle[] waitHandles, int millisecondsTimeout, bool exitContext)
Waits for any of the elements in the specified array to receive a signal, using a 32-bit signed integ...
Definition: WaitHandle.cs:457
Propagates notification that operations should be canceled.
static int AddToAny(BlockingCollection< T >[] collections, T item, CancellationToken cancellationToken)
Adds the specified item to any one of the specified T:System.Collections.Concurrent....
Encapsulates operating system–specific objects that wait for exclusive access to shared resources.
Definition: WaitHandle.cs:15
static int TakeFromAny(BlockingCollection< T >[] collections, out T item)
Takes an item from any one of the specified T:System.Collections.Concurrent.BlockingCollection`1 inst...
void ThrowIfCancellationRequested()
Throws a T:System.OperationCanceledException if this token has had cancellation requested.
static int TryAddToAny(BlockingCollection< T >[] collections, T item)
Tries to add the specified item to any one of the specified T:System.Collections.Concurrent....
int Count
Gets the number of elements contained in the T:System.Collections.Generic.List`1.
Definition: List.cs:296
void CompleteAdding()
Marks the T:System.Collections.Concurrent.BlockingCollection`1 instances as not accepting any more ad...
void Reset()
Resets the spin counter.
Definition: SpinWait.cs:76
void RemoveAt(int index)
Removes the element at the specified index of the T:System.Collections.Generic.List`1.
Definition: List.cs:1375
static void SuppressFinalize(object obj)
Requests that the common language runtime not call the finalizer for the specified object.
Definition: GC.cs:308
IEnumerable< T > GetConsumingEnumerable(CancellationToken cancellationToken)
Provides a consuming T:System.Collections.Generic.IEnumerable`1 for items in the collection.
Provides support for spin-based waiting.
Definition: SpinWait.cs:8
bool TryAdd(T item, int millisecondsTimeout)
Tries to add the specified item to the T:System.Collections.Concurrent.BlockingCollection`1 within th...
static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token1, CancellationToken token2)
Creates a T:System.Threading.CancellationTokenSource that will be in the canceled state when any of t...
Provides a mechanism for releasing unmanaged resources.To browse the .NET Framework source code for t...
Definition: IDisposable.cs:8
void Cancel()
Communicates a request for cancellation.
void Add(T item, CancellationToken cancellationToken)
Adds the item to the T:System.Collections.Concurrent.BlockingCollection`1.
Definition: __Canon.cs:3
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, int millisecondsTimeout)
Tries to remove an item from any one of the specified T:System.Collections.Concurrent....
int Release()
Releases the T:System.Threading.SemaphoreSlim object once.
The exception that is thrown when the value of an argument is outside the allowable range of values a...
T [] ToArray()
Copies the items from the T:System.Collections.Concurrent.BlockingCollection`1 instance into a new ar...
double TotalMilliseconds
Gets the value of the current T:System.TimeSpan structure expressed in whole and fractional milliseco...
Definition: TimeSpan.cs:180
CancellationToken Token
Gets the T:System.Threading.CancellationToken associated with this T:System.Threading....
T Take()
Removes an item from the T:System.Collections.Concurrent.BlockingCollection`1.
void Dispose()
Releases all resources used by the current instance of the T:System.Threading.SemaphoreSlim class.
Represents a strongly-typed, read-only collection of elements.
void Add(T item)
Adds the item to the T:System.Collections.Concurrent.BlockingCollection`1.
int Count
Gets the number of items contained in the T:System.Collections.Concurrent.BlockingCollection`1.
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, int millisecondsTimeout, CancellationToken cancellationToken)
Tries to remove an item from any one of the specified T:System.Collections.Concurrent....
bool IsCancellationRequested
Gets whether cancellation has been requested for this token.
int CurrentCount
Gets the number of remaining threads that can enter the T:System.Threading.SemaphoreSlim object.
void Dispose()
Releases all resources used by the current instance of the T:System.Collections.Concurrent....
Defines methods to manipulate generic collections.
Definition: ICollection.cs:9
BlockingCollection(int boundedCapacity)
Initializes a new instance of the T:System.Collections.Concurrent.BlockingCollection`1 class with the...
Provides blocking and bounding capabilities for thread-safe collections that implement T:System....
WaitHandle AvailableWaitHandle
Returns a T:System.Threading.WaitHandle that can be used to wait on the semaphore.
void Add(T item)
Adds an object to the end of the T:System.Collections.Generic.List`1.
Definition: List.cs:510
Exposes an enumerator, which supports a simple iteration over a non-generic collection....
Definition: IEnumerable.cs:9
SecurityAction
Specifies the security actions that can be performed using declarative security.
Provides information about, and means to manipulate, the current environment and platform....
Definition: Environment.cs:21
virtual void Dispose(bool disposing)
Releases resources used by the T:System.Collections.Concurrent.BlockingCollection`1 instance.
static int TakeFromAny(BlockingCollection< T >[] collections, out T item, CancellationToken cancellationToken)
Takes an item from any one of the specified T:System.Collections.Concurrent.BlockingCollection`1 inst...
IEnumerable< T > GetConsumingEnumerable()
Provides a consuming T:System.Collections.Generic.IEnumerator`1 for items in the collection.
ApartmentState GetApartmentState()
Returns an T:System.Threading.ApartmentState value indicating the apartment state.
Definition: Thread.cs:876
T Take(CancellationToken cancellationToken)
Removes an item from the T:System.Collections.Concurrent.BlockingCollection`1.
static int CompareExchange(ref int location1, int value, int comparand)
Compares two 32-bit signed integers for equality and, if they are equal, replaces the first value.
static int TryAddToAny(BlockingCollection< T >[] collections, T item, TimeSpan timeout)
Tries to add the specified item to any one of the specified T:System.Collections.Concurrent....
void Dispose()
Releases all resources used by the current instance of the T:System.Threading.CancellationTokenSource...
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item)
Tries to remove an item from any one of the specified T:System.Collections.Concurrent....
void Wait()
Blocks the current thread until it can enter the T:System.Threading.SemaphoreSlim.
void CopyTo(T[] array, int index)
Copies all of the items in the T:System.Collections.Concurrent.BlockingCollection`1 instance to a com...
Provides methods for creating, manipulating, searching, and sorting arrays, thereby serving as the ba...
Definition: Array.cs:17
bool IsSynchronized
Gets a value indicating whether access to the T:System.Collections.ICollection is synchronized (threa...
Definition: ICollection.cs:33
static int TickCount
Gets the number of milliseconds elapsed since the system started.
Definition: Environment.cs:306
bool TryAdd(T item)
Tries to add the specified item to the T:System.Collections.Concurrent.BlockingCollection`1.
int BoundedCapacity
Gets the bounded capacity of this T:System.Collections.Concurrent.BlockingCollection`1 instance.
static int TryTakeFromAny(BlockingCollection< T >[] collections, out T item, TimeSpan timeout)
Tries to remove an item from any one of the specified T:System.Collections.Concurrent....
bool IsCompleted
Gets whether this T:System.Collections.Concurrent.BlockingCollection`1 has been marked as complete fo...
bool IsAddingCompleted
Gets whether this T:System.Collections.Concurrent.BlockingCollection`1 has been marked as complete fo...
IEnumerator GetEnumerator()
Returns an enumerator that iterates through a collection.
static CancellationToken None
Returns an empty T:System.Threading.CancellationToken value.
static int TryAddToAny(BlockingCollection< T >[] collections, T item, int millisecondsTimeout)
Tries to add the specified item to any one of the specified T:System.Collections.Concurrent....
bool TryTake(out T item, int millisecondsTimeout)
Tries to remove an item from the T:System.Collections.Concurrent.BlockingCollection`1 in the specifie...
Defines methods to manipulate thread-safe collections intended for producer/consumer usage....
Controls the system garbage collector, a service that automatically reclaims unused memory.
Definition: GC.cs:11
The exception that is thrown when one of the arguments provided to a method is not valid.
WaitHandle WaitHandle
Gets a T:System.Threading.WaitHandle that is signaled when the token is canceled.
bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken)
Tries to add the specified item to the T:System.Collections.Concurrent.BlockingCollection`1 within th...
bool TryTake(out T item)
Tries to remove an item from the T:System.Collections.Concurrent.BlockingCollection`1.
static void Copy(Array sourceArray, Array destinationArray, int length)
Copies a range of elements from an T:System.Array starting at the first element and pastes them into ...
Definition: Array.cs:1275
BlockingCollection()
Initializes a new instance of the T:System.Collections.Concurrent.BlockingCollection`1 class without ...
Represents a strongly typed list of objects that can be accessed by index. Provides methods to search...
Definition: List.cs:14
The exception that is thrown in a thread upon cancellation of an operation that the thread was execut...
void SpinOnce()
Performs a single spin.
Definition: SpinWait.cs:48
Creates or manipulates threads other than its own, which might be harmful to the host.
Represents a time interval.To browse the .NET Framework source code for this type,...
Definition: TimeSpan.cs:12
Signals to a T:System.Threading.CancellationToken that it should be canceled.
BlockingCollection(IProducerConsumerCollection< T > collection, int boundedCapacity)
Initializes a new instance of the T:System.Collections.Concurrent.BlockingCollection`1 class with the...
static int TryAddToAny(BlockingCollection< T >[] collections, T item, int millisecondsTimeout, CancellationToken cancellationToken)
Tries to add the specified item to any one of the specified T:System.Collections.Concurrent....
Represents a thread-safe first in-first out (FIFO) collection.
The exception that is thrown when a method call is invalid for the object's current state.
bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
Tries to remove an item from the T:System.Collections.Concurrent.BlockingCollection`1 in the specifie...
bool TryAdd(T item, TimeSpan timeout)
Tries to add the specified item to the T:System.Collections.Concurrent.BlockingCollection`1.
static int Decrement(ref int location)
Decrements a specified variable and stores the result, as an atomic operation.
Definition: Interlocked.cs:40
Provides information about a specific culture (called a locale for unmanaged code development)....
Definition: CultureInfo.cs:16
The exception that is thrown when an invoked method is not supported, or when there is an attempt to ...
bool IsCancellationRequested
Gets whether cancellation has been requested for this T:System.Threading.CancellationTokenSource.
Defines size, enumerators, and synchronization methods for all nongeneric collections.
Definition: ICollection.cs:8
bool TryTake(out T item, TimeSpan timeout)
Tries to remove an item from the T:System.Collections.Concurrent.BlockingCollection`1 in the specifie...
void CopyTo(Array array, int index)
Copies the elements of the T:System.Collections.ICollection to an T:System.Array, starting at a parti...
T [] ToArray()
Copies the elements of the T:System.Collections.Generic.List`1 to a new array.
Definition: List.cs:1531
BlockingCollection(IProducerConsumerCollection< T > collection)
Initializes a new instance of the T:System.Collections.Concurrent.BlockingCollection`1 class without ...
Provides atomic operations for variables that are shared by multiple threads.
Definition: Interlocked.cs:10
static int AddToAny(BlockingCollection< T >[] collections, T item)
Adds the specified item to any one of the specified T:System.Collections.Concurrent....
Supports a simple iteration over a non-generic collection.
Definition: IEnumerator.cs:9
ApartmentState
Specifies the apartment state of a T:System.Threading.Thread.
Creates and controls a thread, sets its priority, and gets its status.
Definition: Thread.cs:18