Pipe
Pipe<T> is a concurrency object for pushing values in one thread and pulling them from another.
Pipe<int> pipe = new Pipe<int>();
Pipe.Create(elementType) creates a pipe whose element type is supplied as a Type argument.
Pipe<int> pipe = (Pipe<int>)Pipe.Create(typeof(int));
Pipe implements IProducerConsumerCollection<T>.
IProducerConsumerCollection<int> pipe = new Pipe<int>();
pipe.TryAdd(1);
pipe.TryTake(out int value);
.Subscribe(IObserver<T>) adds an observer that receives an event for each newly added value.
var pipe = new Pipe<int>();
using IDisposable handle = pipe.Subscribe(new Observer());
pipe.TryAdd(1);
/// <summary>
/// Minimal <see cref="IObserver{T}"/> of <c>int</c> that writes each notification it receives to the console.
/// </summary>
public class Observer : IObserver<int>
{
/// <summary>Writes the text "Completed".</summary>
public void OnCompleted() => WriteLine("Completed");
/// <summary>Writes the reported exception.</summary>
/// <param name="error">The exception passed by the observed source.</param>
public void OnError(Exception error) => WriteLine(error);
/// <summary>Writes the received value, prefixed with "OnValue: ".</summary>
/// <param name="value">The value passed by the observed source.</param>
public void OnNext(int value) => WriteLine($"OnValue: {value}");
}
.ToArray() returns a snapshot of the values currently in the queue.
var pipe = new Pipe<int>();
pipe.TryAdd(1);
pipe.TryAdd(2);
int[] array = pipe.ToArray();
IEnumerator<T> returns values as long as the pipe is alive. When the pipe is disposed, the IEnumerator iteration ends. A pipe can be monitored with a foreach loop in one thread and fed with values from another.
// Producer/consumer demo: a background task feeds the pipe while the current thread enumerates it.
Pipe<int> pipe = new Pipe<int>();
// Producer: started as a separate task so that the foreach below can consume concurrently.
Task.Factory.StartNew(() =>
{
// 200 parallel iterations, each adding 10000 values (2,000,000 TryAdd calls in total).
Parallel.For(0, 200, i =>
{
for (int ix = 0; ix < 10000; ix++)
// i * 100000 + ix is distinct for every (i, ix) pair because ix stays below 100000.
pipe.TryAdd(i * 100000 + ix);
});
// Dispose only after Parallel.For has returned; disposing the pipe is what ends the consumer's iteration.
pipe.Dispose();
});
// Consumer: copies every enumerated value until the iteration ends.
List<int> receivedCopy = new List<int>();
foreach (int ii in pipe)
{
receivedCopy.Add(ii);
}
// Second enumeration of the same pipe, started after the first loop has ended.
// NOTE(review): presumably yields nothing because the pipe is already disposed by then - confirm.
int count = 0;
foreach (int ii in pipe)
count++;
WriteLine($"first count = {receivedCopy.Count}, second count={count}");
When the pipe is disposed, its observers and enumerators are closed.
pipe.Dispose();
Full Example
Full example
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Avalanche.Utilities;
using static System.Console;
/// <summary>
/// Sample code for <see cref="Pipe{T}"/>. Each braced scope inside <see cref="Run"/> is an
/// independent example with its own local <c>pipe</c> variable.
/// </summary>
/// <remarks>
/// The numbered comment pairs ("// &lt;NN&gt;" ... "// &lt;/NN&gt;") appear to delimit snippet regions
/// that are extracted elsewhere (TODO confirm); keep the code between them unchanged when editing.
/// </remarks>
public class pipe
{
/// <summary>Executes every example scope in order on the calling thread.</summary>
public static void Run()
{
// Example 01: construct a Pipe<int> directly.
{
// <01>
Pipe<int> pipe = new Pipe<int>();
// </01>
}
// Example 02: construct through the non-generic Pipe.Create factory, passing the element type
// as a Type object and casting the result to Pipe<int>.
{
// <02>
Pipe<int> pipe = (Pipe<int>)Pipe.Create(typeof(int));
// </02>
}
// Example 03: use the pipe through IProducerConsumerCollection<int>: add one value, then take one.
{
// <03>
IProducerConsumerCollection<int> pipe = new Pipe<int>();
pipe.TryAdd(1);
pipe.TryTake(out int value);
// </03>
}
// Example 04: subscribe an Observer, then add a value. The using declaration disposes the
// subscription handle when this scope ends.
{
// <04>
var pipe = new Pipe<int>();
using IDisposable handle = pipe.Subscribe(new Observer());
pipe.TryAdd(1);
// </04>
}
// Example 05: add two values and copy the pipe's contents into an array.
{
// <05>
var pipe = new Pipe<int>();
pipe.TryAdd(1);
pipe.TryAdd(2);
int[] array = pipe.ToArray();
// </05>
}
// Example 06: producer/consumer. A background task runs 200 parallel iterations that each add
// 10000 values (i * 100000 + ix is distinct per pair because ix < 100000) and disposes the pipe
// once Parallel.For has returned. Meanwhile this thread enumerates the pipe into receivedCopy,
// then enumerates the same pipe a second time and prints both counts.
// NOTE(review): the first foreach presumably ends only when the producer disposes the pipe, and
// the second presumably yields nothing on the disposed pipe - confirm against Pipe<T>.
{
// <06>
Pipe<int> pipe = new Pipe<int>();
Task.Factory.StartNew(() =>
{
Parallel.For(0, 200, i =>
{
for (int ix = 0; ix < 10000; ix++)
pipe.TryAdd(i * 100000 + ix);
});
pipe.Dispose();
});
List<int> receivedCopy = new List<int>();
foreach (int ii in pipe)
{
receivedCopy.Add(ii);
}
int count = 0;
foreach (int ii in pipe)
count++;
WriteLine($"first count = {receivedCopy.Count}, second count={count}");
// </06>
}
// Example 07: dispose a pipe. The construction sits outside the tagged region, so only the
// Dispose call belongs to the snippet.
{
Pipe<int> pipe = new Pipe<int>();
// <07>
pipe.Dispose();
// </07>
}
}
/// <summary>
/// Minimal <see cref="IObserver{T}"/> of <c>int</c> used by example 04; writes each notification to the console.
/// </summary>
// <99>
public class Observer : IObserver<int>
{
public void OnCompleted() => WriteLine("Completed");
public void OnError(Exception error) => WriteLine(error);
public void OnNext(int value) => WriteLine($"OnValue: {value}");
}
// </99>
}