Note: This article is reproduced from stackoverflow.com.
c# parallel-processing plinq

Equivalent of do() while{} in Parallel

Published on 2020-03-27 10:26:05

How can I create the parallel equivalent of a do-while or similar in the Update() method below?

Another thread in the app writes to TestBuffer at random times. TestBuffer.RemoveItemAndDoSomethingWithIt() should run until TestBuffer is empty. Currently Update() only processes the items that were in the collection when it was enumerated, which makes sense.

internal class UnOrderedBuffer<T> where T : class
{
    ConcurrentBag<T> GenericBag = new ConcurrentBag<T>();
}

internal class Tester
{
    private UnOrderedBuffer<Data> TestBuffer;

    public void Update()
    {
        Parallel.ForEach(TestBuffer, Item =>
        {
            TestBuffer.RemoveItemAndDoSomethingWithIt();
        });
    }
}
Questioner: Canacourse
Viewed: 50
1,064 2020-03-19 01:23

You could force a single execution by 'prepending' a null/default value:

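// Extension methods must be declared inside a static, non-generic class.
static IEnumerable<T> YieldOneDefault<T>(this IEnumerable<T> values)
{
    // Emit one placeholder first so the loop body runs at least once,
    // even when 'values' is empty.
    yield return default(T);
    foreach (var item in values)
        yield return item;
}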

And then use it as follows:

Parallel.ForEach(TestBuffer.YieldOneDefault(), Item =>
{
    if (Item != null)
        TestBuffer.RemoveItemAndDoSomethingWithIt();
    else
        DoSomethingDuringTheFirstPass();
});
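
To make the effect of the prepended default value concrete, here is a minimal, self-contained sketch. The class name YieldOneDefaultDemo, the Run helper, and the use of ConcurrentBag<string> as the source are assumptions made for illustration and are not part of the question's code. It only counts how often each branch runs, so it assumes the source itself never contains null.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class YieldOneDefaultDemo
{
    // Same idea as the extension method above: one default(T) before the real items.
    public static IEnumerable<T> YieldOneDefault<T>(this IEnumerable<T> values)
    {
        yield return default(T);
        foreach (var item in values)
            yield return item;
    }

    // Hypothetical helper: counts how often each branch of the loop body runs.
    public static (int FirstPass, int Items) Run(IEnumerable<string> source)
    {
        int firstPass = 0, items = 0;
        Parallel.ForEach(source.YieldOneDefault(), item =>
        {
            if (item != null)
                Interlocked.Increment(ref items);     // a real element
            else
                Interlocked.Increment(ref firstPass); // the prepended placeholder
        });
        return (firstPass, items);
    }

    public static void Main()
    {
        var empty = Run(new ConcurrentBag<string>());
        Console.WriteLine($"empty source: firstPass={empty.FirstPass}, items={empty.Items}");

        var three = Run(new ConcurrentBag<string>(new[] { "a", "b", "c" }));
        Console.WriteLine($"three items:  firstPass={three.FirstPass}, items={three.Items}");
    }
}
```

Even with an empty source the body runs once, which is the "do" part of a do-while. Note that this does not by itself pick up items added after the source was enumerated.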

Although I suspect you might be looking for the following extension methods:

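// Outer sequence: hands each parallel worker its own "drain the collection" sequence.
// Enumeration ends as soon as TryTake finds the collection empty.
public static IEnumerable<IEnumerable<T>> GetParrallelConsumingEnumerable<T>(this IProducerConsumerCollection<T> collection)
{
    T item;
    while (collection.TryTake(out item))
    {
        yield return GetParrallelConsumingEnumerableInner(collection, item);
    }
}

// Inner sequence: yields the item already taken, then keeps removing items
// (including ones added in the meantime) until the collection is empty.
private static IEnumerable<T> GetParrallelConsumingEnumerableInner<T>(IProducerConsumerCollection<T> collection, T item)
{
    yield return item;
    while (collection.TryTake(out item))
    {
        yield return item;
    }
}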

This gets you the following result (which I think is what you are after):

Parallel.ForEach(TestBuffer.GetParrallelConsumingEnumerable(), Items =>
{
    foreach (var item in Items)
    {
        DoSomethingWithItem(item);
    }
});
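
As a rough check that this approach also picks up items added while the loop is running, here is a self-contained sketch. The names ConsumingDemo, Drain, and ProcessAll are hypothetical. The "other thread" from the question is simulated by having the loop body add follow-up work to the bag, and a plain ConcurrentBag<int> stands in for TestBuffer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ConsumingDemo
{
    // Outer sequence: one draining sequence per item taken by the partitioner.
    public static IEnumerable<IEnumerable<T>> GetParrallelConsumingEnumerable<T>(
        this IProducerConsumerCollection<T> collection)
    {
        T item;
        while (collection.TryTake(out item))
            yield return Drain(collection, item);
    }

    // Inner sequence: the item already taken, then whatever can still be removed.
    private static IEnumerable<T> Drain<T>(IProducerConsumerCollection<T> collection, T item)
    {
        yield return item;
        while (collection.TryTake(out item))
            yield return item;
    }

    // Hypothetical helper: processes the bag and returns how many items were handled.
    public static int ProcessAll(ConcurrentBag<int> bag)
    {
        int processed = 0;
        Parallel.ForEach(bag.GetParrallelConsumingEnumerable(), items =>
        {
            foreach (var n in items)
            {
                Interlocked.Increment(ref processed);
                // Assumption for the demo: each positive item produces one
                // follow-up item, standing in for a producer adding work mid-loop.
                if (n > 0)
                    bag.Add(n - 1);
            }
        });
        return processed;
    }

    public static void Main()
    {
        // 3 leads to 2, 1, 0 (4 items) and 2 leads to 1, 0 (3 items).
        var bag = new ConcurrentBag<int>(new[] { 3, 2 });
        Console.WriteLine($"processed {ProcessAll(bag)} items, {bag.Count} left");
    }
}
```

One caveat of this design: the loop finishes as soon as every worker finds the bag momentarily empty, so anything a producer adds after that point waits for the next call to Update().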