10 [__DynamicallyInvokable]
11 [HostProtection(
SecurityAction.LinkDemand, Synchronization =
true, ExternalThreading =
true)]
16 [__DynamicallyInvokable]
19 [__DynamicallyInvokable]
29 [__DynamicallyInvokable]
35 [__DynamicallyInvokable]
42 [__DynamicallyInvokable]
48 [__DynamicallyInvokable]
49 [HostProtection(
SecurityAction.LinkDemand, Synchronization =
true, ExternalThreading =
true)]
50 public static class Partitioner
54 protected readonly TSourceReader m_sharedReader;
56 protected static int s_defaultMaxChunkSize = GetDefaultChunkSize<TSource>();
58 protected SharedInt m_currentChunkSize;
60 protected SharedInt m_localOffset;
62 private const int CHUNK_DOUBLING_RATE = 3;
64 private int m_doublingCountdown;
66 protected readonly
int m_maxChunkSize;
68 protected readonly SharedLong m_sharedIndex;
70 protected abstract bool HasNoElementsLeft
89 protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
90 : this(sharedReader, sharedIndex, useSingleChunking: false)
94 protected DynamicPartitionEnumerator_Abstract(TSourceReader sharedReader, SharedLong sharedIndex,
bool useSingleChunking)
96 m_sharedReader = sharedReader;
97 m_sharedIndex = sharedIndex;
98 m_maxChunkSize = (useSingleChunking ? 1 : s_defaultMaxChunkSize);
101 protected abstract bool GrabNextChunk(
int requestedChunkSize);
103 public abstract void Dispose();
107 throw new NotSupportedException();
110 public bool MoveNext()
112 if (m_localOffset ==
null)
114 m_localOffset =
new SharedInt(-1);
115 m_currentChunkSize =
new SharedInt(0);
116 m_doublingCountdown = 3;
118 if (m_localOffset.Value < m_currentChunkSize.Value - 1)
120 m_localOffset.Value++;
123 int requestedChunkSize;
124 if (m_currentChunkSize.Value == 0)
126 requestedChunkSize = 1;
128 else if (m_doublingCountdown > 0)
130 requestedChunkSize = m_currentChunkSize.Value;
134 requestedChunkSize = Math.Min(m_currentChunkSize.Value * 2, m_maxChunkSize);
135 m_doublingCountdown = 3;
137 m_doublingCountdown--;
138 if (GrabNextChunk(requestedChunkSize))
140 m_localOffset.Value = 0;
147 private class DynamicPartitionerForIEnumerable<TSource> : OrderablePartitioner<TSource>
149 private class InternalPartitionEnumerable :
IEnumerable<KeyValuePair<long, TSource>>,
IEnumerable, IDisposable
153 private SharedLong m_sharedIndex;
157 private volatile int m_FillBufferSize;
159 private volatile int m_FillBufferCurrentPosition;
161 private volatile int m_activeCopiers;
163 private SharedBool m_hasNoElementsLeft;
165 private SharedBool m_sourceDepleted;
167 private object m_sharedLock;
169 private bool m_disposed;
171 private SharedInt m_activePartitionCount;
173 private readonly
bool m_useSingleChunking;
175 internal InternalPartitionEnumerable(
IEnumerator<TSource> sharedReader,
bool useSingleChunking,
bool isStaticPartitioning)
177 m_sharedReader = sharedReader;
178 m_sharedIndex =
new SharedLong(-1L);
179 m_hasNoElementsLeft =
new SharedBool(value:
false);
180 m_sourceDepleted =
new SharedBool(value:
false);
181 m_sharedLock =
new object();
182 m_useSingleChunking = useSingleChunking;
183 if (!m_useSingleChunking)
185 int num = (PlatformHelper.ProcessorCount <= 4) ? 1 : 4;
188 if (isStaticPartitioning)
190 m_activePartitionCount =
new SharedInt(0);
194 m_activePartitionCount =
null;
202 throw new ObjectDisposedException(Environment.GetResourceString(
"PartitionerStatic_CanNotCallGetEnumeratorAfterSourceHasBeenDisposed"));
204 return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex, m_hasNoElementsLeft, m_sharedLock, m_activePartitionCount,
this, m_useSingleChunking);
209 return GetEnumerator();
212 private void TryCopyFromFillBuffer(
KeyValuePair<long, TSource>[] destArray,
int requestedChunkSize, ref
int actualNumElementsGrabbed)
214 actualNumElementsGrabbed = 0;
216 if (fillBuffer !=
null && m_FillBufferCurrentPosition < m_FillBufferSize)
219 int num =
Interlocked.
Add(ref m_FillBufferCurrentPosition, requestedChunkSize);
220 int num2 = num - requestedChunkSize;
221 if (num2 < m_FillBufferSize)
223 actualNumElementsGrabbed = ((num < m_FillBufferSize) ? num : (m_FillBufferSize - num2));
224 Array.Copy(fillBuffer, num2, destArray, 0, actualNumElementsGrabbed);
232 actualNumElementsGrabbed = 0;
233 if (m_hasNoElementsLeft.Value)
237 if (m_useSingleChunking)
239 return GrabChunk_Single(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
241 return GrabChunk_Buffered(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
248 if (!m_hasNoElementsLeft.Value)
252 if (m_sharedReader.MoveNext())
254 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
256 actualNumElementsGrabbed = 1;
259 m_sourceDepleted.Value =
true;
260 m_hasNoElementsLeft.Value =
true;
265 m_sourceDepleted.Value =
true;
266 m_hasNoElementsLeft.Value =
true;
274 internal bool GrabChunk_Buffered(
KeyValuePair<long, TSource>[] destArray,
int requestedChunkSize, ref
int actualNumElementsGrabbed)
276 TryCopyFromFillBuffer(destArray, requestedChunkSize, ref actualNumElementsGrabbed);
277 if (actualNumElementsGrabbed == requestedChunkSize)
281 if (m_sourceDepleted.Value)
283 m_hasNoElementsLeft.Value =
true;
285 return actualNumElementsGrabbed > 0;
289 if (m_sourceDepleted.Value)
291 return actualNumElementsGrabbed > 0;
295 if (m_activeCopiers > 0)
298 while (m_activeCopiers > 0)
303 while (actualNumElementsGrabbed < requestedChunkSize)
305 if (!m_sharedReader.MoveNext())
307 m_sourceDepleted.Value =
true;
310 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
312 actualNumElementsGrabbed++;
315 if (!m_sourceDepleted.Value && fillBuffer !=
null && m_FillBufferCurrentPosition >= fillBuffer.Length)
317 for (
int i = 0; i < fillBuffer.Length; i++)
319 if (!m_sharedReader.MoveNext())
321 m_sourceDepleted.
Value =
true;
322 m_FillBufferSize = i;
325 m_sharedIndex.Value = checked(m_sharedIndex.Value + 1);
328 m_FillBufferCurrentPosition = 0;
333 m_sourceDepleted.Value =
true;
334 m_hasNoElementsLeft.Value =
true;
338 return actualNumElementsGrabbed > 0;
341 public void Dispose()
346 m_sharedReader.Dispose();
351 private class InternalPartitionEnumerator : DynamicPartitionEnumerator_Abstract<TSource, IEnumerator<TSource>>
355 private readonly SharedBool m_hasNoElementsLeft;
357 private readonly
object m_sharedLock;
359 private readonly SharedInt m_activePartitionCount;
361 private InternalPartitionEnumerable m_enumerable;
363 protected override bool HasNoElementsLeft
367 return m_hasNoElementsLeft.
Value;
371 m_hasNoElementsLeft.Value =
true;
379 if (m_currentChunkSize ==
null)
381 throw new InvalidOperationException(Environment.GetResourceString(
"PartitionerStatic_CurrentCalledBeforeMoveNext"));
383 return m_localList[m_localOffset.Value];
387 internal InternalPartitionEnumerator(
IEnumerator<TSource> sharedReader, SharedLong sharedIndex, SharedBool hasNoElementsLeft,
object sharedLock, SharedInt activePartitionCount, InternalPartitionEnumerable enumerable,
bool useSingleChunking)
388 : base(sharedReader, sharedIndex, useSingleChunking)
390 m_hasNoElementsLeft = hasNoElementsLeft;
391 m_sharedLock = sharedLock;
392 m_enumerable = enumerable;
393 m_activePartitionCount = activePartitionCount;
394 if (m_activePartitionCount !=
null)
400 protected override bool GrabNextChunk(
int requestedChunkSize)
402 if (HasNoElementsLeft)
406 if (m_localList ==
null)
410 return m_enumerable.GrabChunk(m_localList, requestedChunkSize, ref m_currentChunkSize.Value);
413 public override void Dispose()
415 if (m_activePartitionCount !=
null &&
Interlocked.
Decrement(ref m_activePartitionCount.Value) == 0)
417 m_enumerable.Dispose();
424 private readonly
bool m_useSingleChunking;
429 : base(keysOrderedInEachPartition: true, keysOrderedAcrossPartitions: false, keysNormalized: true)
437 if (partitionCount <= 0)
439 throw new ArgumentOutOfRangeException(
"partitionCount");
443 for (
int i = 0; i < partitionCount; i++)
452 return new InternalPartitionEnumerable(m_source.
GetEnumerator(), m_useSingleChunking, isStaticPartitioning:
false);
456 private abstract class DynamicPartitionerForIndexRange_Abstract<TSource, TCollection> : OrderablePartitioner<TSource>
458 private TCollection m_data;
462 protected DynamicPartitionerForIndexRange_Abstract(TCollection data)
463 : base(keysOrderedInEachPartition: true, keysOrderedAcrossPartitions: false, keysNormalized: true)
472 if (partitionCount <= 0)
474 throw new ArgumentOutOfRangeException(
"partitionCount");
478 for (
int i = 0; i < partitionCount; i++)
480 array[i] = orderableDynamicPartitions_Factory.
GetEnumerator();
487 return GetOrderableDynamicPartitions_Factory(m_data);
491 private abstract class DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSourceReader> : DynamicPartitionEnumerator_Abstract<TSource, TSourceReader>
493 protected int m_startIndex;
495 protected abstract int SourceCount
500 protected override bool HasNoElementsLeft
504 return Volatile.
Read(ref m_sharedIndex.Value) >= SourceCount - 1;
511 protected DynamicPartitionEnumeratorForIndexRange_Abstract(TSourceReader sharedReader, SharedLong sharedIndex)
512 : base(sharedReader, sharedIndex)
516 protected override bool GrabNextChunk(
int requestedChunkSize)
518 while (!HasNoElementsLeft)
521 if (HasNoElementsLeft)
525 long num2 = Math.Min(SourceCount - 1, num + requestedChunkSize);
528 m_currentChunkSize.Value = (int)(num2 - num);
529 m_localOffset.Value = -1;
530 m_startIndex = (int)(num + 1);
537 public override void Dispose()
542 private class DynamicPartitionerForIList<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, IList<TSource>>
548 private SharedLong m_sharedIndex;
552 m_sharedReader = sharedReader;
553 m_sharedIndex =
new SharedLong(-1L);
558 return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
563 return GetEnumerator();
567 private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, IList<TSource>>
569 protected override int SourceCount => m_sharedReader.Count;
575 if (m_currentChunkSize ==
null)
577 throw new InvalidOperationException(Environment.GetResourceString(
"PartitionerStatic_CurrentCalledBeforeMoveNext"));
583 internal InternalPartitionEnumerator(
IList<TSource> sharedReader, SharedLong sharedIndex)
584 : base(sharedReader, sharedIndex)
596 return new InternalPartitionEnumerable(m_data);
600 private class DynamicPartitionerForArray<TSource> : DynamicPartitionerForIndexRange_Abstract<TSource, TSource[]>
604 private readonly TSource[] m_sharedReader;
606 private SharedLong m_sharedIndex;
608 internal InternalPartitionEnumerable(TSource[] sharedReader)
610 m_sharedReader = sharedReader;
611 m_sharedIndex =
new SharedLong(-1L);
616 return GetEnumerator();
621 return new InternalPartitionEnumerator(m_sharedReader, m_sharedIndex);
625 private class InternalPartitionEnumerator : DynamicPartitionEnumeratorForIndexRange_Abstract<TSource, TSource[]>
627 protected override int SourceCount => m_sharedReader.Length;
633 if (m_currentChunkSize ==
null)
635 throw new InvalidOperationException(Environment.GetResourceString(
"PartitionerStatic_CurrentCalledBeforeMoveNext"));
641 internal InternalPartitionEnumerator(TSource[] sharedReader, SharedLong sharedIndex)
642 : base(sharedReader, sharedIndex)
647 internal DynamicPartitionerForArray(TSource[] source)
654 return new InternalPartitionEnumerable(m_data);
658 private abstract class StaticIndexRangePartitioner<TSource, TCollection> : OrderablePartitioner<TSource>
660 protected abstract int SourceCount
665 protected StaticIndexRangePartitioner()
666 : base(keysOrderedInEachPartition: true, keysOrderedAcrossPartitions: true, keysNormalized: true)
674 if (partitionCount <= 0)
676 throw new ArgumentOutOfRangeException(
"partitionCount");
679 int num = Math.DivRem(SourceCount, partitionCount, out result);
682 for (
int i = 0; i < partitionCount; i++)
685 num2 = ((i >= result) ? (num3 + num - 1) : (num3 + num));
686 array[i] = CreatePartition(num3, num2);
692 private abstract class StaticIndexRangePartition<TSource> :
IEnumerator<KeyValuePair<long, TSource>>, IDisposable,
IEnumerator 694 protected readonly
int m_startIndex;
696 protected readonly
int m_endIndex;
698 protected volatile int m_offset;
713 protected StaticIndexRangePartition(
int startIndex,
int endIndex)
715 m_startIndex = startIndex;
716 m_endIndex = endIndex;
717 m_offset = startIndex - 1;
720 public void Dispose()
726 throw new NotSupportedException();
729 public bool MoveNext()
731 if (m_offset < m_endIndex)
736 m_offset = m_endIndex + 1;
741 private class StaticIndexRangePartitionerForIList<TSource> : StaticIndexRangePartitioner<TSource, IList<TSource>>
745 protected override int SourceCount => m_list.
Count;
754 return new StaticIndexRangePartitionForIList<TSource>(m_list, startIndex, endIndex);
758 private class StaticIndexRangePartitionForIList<TSource> : StaticIndexRangePartition<TSource>
766 if (m_offset < m_startIndex)
768 throw new InvalidOperationException(Environment.GetResourceString(
"PartitionerStatic_CurrentCalledBeforeMoveNext"));
774 internal StaticIndexRangePartitionForIList(
IList<TSource> list,
int startIndex,
int endIndex)
775 : base(startIndex, endIndex)
781 private class StaticIndexRangePartitionerForArray<TSource> : StaticIndexRangePartitioner<TSource, TSource[]>
783 private TSource[] m_array;
785 protected override int SourceCount => m_array.Length;
787 internal StaticIndexRangePartitionerForArray(TSource[] array)
794 return new StaticIndexRangePartitionForArray<TSource>(m_array, startIndex, endIndex);
798 private class StaticIndexRangePartitionForArray<TSource> : StaticIndexRangePartition<TSource>
800 private volatile TSource[] m_array;
806 if (m_offset < m_startIndex)
808 throw new InvalidOperationException(Environment.GetResourceString(
"PartitionerStatic_CurrentCalledBeforeMoveNext"));
814 internal StaticIndexRangePartitionForArray(TSource[] array,
int startIndex,
int endIndex)
815 : base(startIndex, endIndex)
821 private class SharedInt
823 internal volatile int Value;
825 internal SharedInt(
int value)
831 private class SharedBool
833 internal volatile bool Value;
835 internal SharedBool(
bool value)
841 private class SharedLong
845 internal SharedLong(
long value)
851 private const int DEFAULT_BYTES_PER_CHUNK = 512;
858 [__DynamicallyInvokable]
867 return new DynamicPartitionerForIList<TSource>(list);
869 return new StaticIndexRangePartitionerForIList<TSource>(list);
877 [__DynamicallyInvokable]
886 return new DynamicPartitionerForArray<TSource>(array);
888 return new StaticIndexRangePartitionerForArray<TSource>(array);
895 [__DynamicallyInvokable]
907 [__DynamicallyInvokable]
918 return new DynamicPartitionerForIEnumerable<TSource>(source, partitionerOptions);
926 [__DynamicallyInvokable]
930 if (toExclusive <= fromInclusive)
934 long num2 = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num);
948 [__DynamicallyInvokable]
951 if (toExclusive <= fromInclusive)
964 bool shouldQuit =
false;
965 for (
long i = fromInclusive; i < toExclusive; i += rangeSize)
975 num = checked(i + rangeSize);
982 if (num > toExclusive)
986 yield
return new Tuple<long, long>(item, num);
995 [__DynamicallyInvokable]
999 if (toExclusive <= fromInclusive)
1003 int num2 = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num);
1017 [__DynamicallyInvokable]
1020 if (toExclusive <= fromInclusive)
1033 bool shouldQuit =
false;
1034 for (
int i = fromInclusive; i < toExclusive; i += rangeSize)
1044 num = checked(i + rangeSize);
1051 if (num > toExclusive)
1055 yield
return new Tuple<int, int>(item, num);
1059 private static int GetDefaultChunkSize<TSource>()
1061 if (typeof(TSource).IsValueType)
1069 return 512 / IntPtr.Size;
The exception that is thrown when a null reference (Nothing in Visual Basic) is passed to a method th...
TValue Value
Gets the value in the key/value pair.
Provides support for spin-based waiting.
Represents a non-generic collection of objects that can be individually accessed by index.
The content of the collection was cleared.
Provides a mechanism for releasing unmanaged resources.To browse the .NET Framework source code for t...
LayoutKind
Controls the layout of an object when exported to unmanaged code.
Specifies the current position within a stream.
new IEnumerator< T > GetEnumerator()
Returns an enumerator that iterates through the collection.
The exception that is thrown when the value of an argument is outside the allowable range of values a...
static int SizeOf(object structure)
Returns the unmanaged size of an object in bytes.
Exposes the enumerator, which supports a simple iteration over a collection of a specified type....
Supports a simple iteration over a generic collection.
The exception that is thrown when an arithmetic, casting, or conversion operation in a checked contex...
Lets you control the physical layout of the data fields of a class or structure in memory.
Partitioner()
Creates a new partitioner instance.
SecurityAction
Specifies the security actions that can be performed using declarative security.
Provides information about, and means to manipulate, the current environment and platform....
LayoutKind Value
Gets the T:System.Runtime.InteropServices.LayoutKind value that specifies how the class or structure ...
Represents a particular manner of splitting an orderable data source into multiple partitions.
EnumerablePartitionerOptions
Specifies options to control the buffering behavior of a partitioner
static int CompareExchange(ref int location1, int value, int comparand)
Compares two 32-bit signed integers for equality and, if they are equal, replaces the first value.
static int Increment(ref int location)
Increments a specified variable and stores the result, as an atomic operation.
virtual IEnumerable< TSource > GetDynamicPartitions()
Creates an object that can partition the underlying collection into a variable number of partitions.
static OrderablePartitioner< Tuple< long, long > > Create(long fromInclusive, long toExclusive, long rangeSize)
Creates a partitioner that chunks the user-specified range.
Provides a collection of methods for allocating unmanaged memory, copying unmanaged memory blocks,...
Contains methods for performing volatile memory operations.
new T Current
Gets the element in the collection at the current position of the enumerator.
virtual bool SupportsDynamicPartitions
Gets whether additional partitions can be created dynamically.
static OrderablePartitioner< Tuple< int, int > > Create(int fromInclusive, int toExclusive)
Creates a partitioner that chunks the user-specified range.
static bool Read(ref bool location)
Reads the value of the specified field. On systems that require it, inserts a memory barrier that pre...
Represents a collection of objects that can be individually accessed by index.
static OrderablePartitioner< TSource > Create< TSource >(IList< TSource > list, bool loadBalance)
Creates an orderable partitioner from an T:System.Collections.Generic.IList`1 instance.
int Count
Gets the number of elements contained in the T:System.Collections.Generic.ICollection`1.
void SpinOnce()
Performs a single spin.
static OrderablePartitioner< Tuple< int, int > > Create(int fromInclusive, int toExclusive, int rangeSize)
Creates a partitioner that chunks the user-specified range.
static int Decrement(ref int location)
Decrements a specified variable and stores the result, as an atomic operation.
The exception that is thrown when an invoked method is not supported, or when there is an attempt to ...
static int Add(ref int location1, int value)
Adds two 32-bit integers and replaces the first integer with the sum, as an atomic operation.
Provides atomic operations for variables that are shared by multiple threads.
Represents a particular manner of splitting a data source into multiple partitions.
Supports a simple iteration over a non-generic collection.
static OrderablePartitioner< Tuple< long, long > > Create(long fromInclusive, long toExclusive)
Creates a partitioner that chunks the user-specified range.
abstract IList< IEnumerator< TSource > > GetPartitions(int partitionCount)
Partitions the underlying collection into the given number of partitions.