• Avalanche.Core
Search Results for

    Show / Hide Table of Contents
    • Avalanche.Accessor
      • Introduction
      • IAccessor
        • IAccessor
        • IListAccessor
        • IMapAccessor
        • IRecordAccessor
        • IContentAccessor
        • IOneOfAccessor
        • IAnyAccessor
      • .Net
        • Introduction
        • IList<T>
        • IDictionary<K,V>
        • FieldInfo
        • OneOfAttribute
        • StructLayoutAttribute
        • Class
      • Protobuf
        • Introduction
      • Articles
        • Dependency Injection
        • AccessorMessages
    • Avalanche.Binding
      • Introduction
    • Avalanche.Configuration
      • Introduction
      • Configuration Binding
      • ConfigurationExtensions
      • MemoryConfiguration
      • PrintTree
      • Saving IOptions
      • Yaml
    • Avalanche.Converter
      • Introduction
      • EnumConverter
      • Func<,>
      • HexConverter
      • PrimitiveConverter
      • StringConverter
    • Avalanche.Core
      • License
    • Avalanche.DataType
      • Introduction
      • DataType
        • IDataType
        • IListType
        • IMapType
        • IRecordType
        • IFieldType
        • IOneOfType
        • IAnyType
        • IStringType
        • IValueType
        • IIntegerType
        • IEnumerationType
        • IRealType
      • .Net
        • Introduction
        • IList<T>
        • IDictionary<K,V>
        • FieldInfo
        • Enum
        • OneOfAttribute
        • StructLayoutAttribute
        • Class
      • Protobuf
        • Introduction
      • Articles
        • DataTypeRequest
        • PrintTree
        • DataTypeMessages
    • Avalanche.Emit
      • Introduction
      • TypeBuilder
      • ConstructorBuilder
      • MethodBuilder
      • PropertyBuilder
      • FieldBuilder
      • Emit
      • Utilities
    • Avalanche.FileSystem
      • Introduction
      • Abstractions
        • IFileSystem
          • IFileSystemBrowse
          • IFileSystemCreateDirectory
          • IFileSystemDelete
          • IFileSystemFileAttribute
          • IFileSystemMount
          • IFileSystemMove
          • IFileSystemObserve
          • IFileSystemOpen
        • IEvent
        • IEntry
        • IOption
        • IToken
      • FileSystem
      • VirtualFileSystem
      • MemoryFileSystem
      • EmbeddedFileSystem
      • HttpFileSystem
      • Decoration
      • IFileProvider
      • Events
      • Utilities
        • Dispose
        • File Scanner
        • Visit Tree
        • File Operation
        • FilterEnumerable
        • PollingFilterWatchToken
    • Avalanche.Identity
      • Introduction
      • Identity
      • IdentityParts
      • IdentityInterner
      • IdentityComparer
      • Print Tree
      • IdentityAccessors
        • Introduction
        • TypeName
    • Avalanche.Localization
      • Introduction
      • Localization
      • LocalizationFile
      • LocalizationFiles
      • LocalizationFileSystem
      • LocalizationFileFormat
      • LocalizationLine
      • LocalizationLines
      • TemplateFormat
      • CultureProvider
      • FallbackCultureProvider
      • ResourceManager
      • LocalizationError
      • Microsoft.Extensions
        • Introduction
        • DependencyInjection
        • FileProvider
        • Logging
        • ITextLocalizer
        • IFileLocalizer
        • Localization
      • Asp.Net
        • Introduction
        • Supplying localization
        • Inject to pages
        • Culture Assigned
        • Minimalistic Api
        • Diagnostics
      • Pluralization
        • Introduction
        • Multiple plural parameters
        • Custom PluralRules
        • Invariant Culture
        • Unit Prefix
        • IPluralRule
        • IPluralNumber
        • IPluralRules
        • CLDRs
        • Unicode.CLDR40
        • Unicode.CLDR41
        • Unicode.CLDR42
      • Articles
        • Alphabet localization
        • Benchmarks
        • Caching
        • Class Library
        • Demo
        • Diagnostics
        • Embedded resources
        • Emplacement
        • File localization
        • Text localization
        • Printing templates
    • Avalanche.Message
      • Introduction
      • IMessage
      • IMessageProvider
      • IMessageDescription
      • IMessageDescriptions
      • MessageLevel
      • Message printing
      • Messages and Exceptions
      • Microsoft.Extensions
        • DependencyInjection
      • Articles
        • Aggregate Messages
        • Localization
        • Logging
        • Validation
    • Avalanche.Options
      • Introduction
      • OptionsExtensions
      • OptionsMonitorCast
    • Avalanche.Service
      • Introduction
      • Service
        • Introduction
        • IService
        • IServiceDisposable
        • IServiceDecoration
        • IServiceCast
        • IServiceObservable
        • IServiceContainer
        • Construction
        • Query
        • CancellationToken
        • CachePolicy
        • Scope
      • Handler
        • Introduction
        • IHandler
        • IHandlerCast
        • IHandlerDecoration
        • IHandlerWithOrder
        • CancellationToken
        • Cyclicity
        • Delegates
        • Invokable
        • ExportAttribute
        • OrderAttribute
        • PrintTree
        • Recursion
      • Query
        • Introduction
        • IQuery
        • IQueryCast
        • IQueryDecoration
      • Entry
        • Introduction
        • IEntry
        • IEntryCast
        • IEntryDecoration
        • IEntryObservable
        • IEntryVisitable
        • EntryState
      • Request
        • Introduction
        • IRequest
        • IRequestFor
        • IRequestToBeCached
        • IRequestToBeDisposed
        • RequestAttribute
        • ContextParameterAttribute
        • Print Tree
      • Dependency Injection
        • Introduction
        • Asp.Net
        • ServiceRequest<T>
        • Decorating a service
        • Handler
        • CachePolicy
        • CancellationToken
        • QueryLogger
        • IHostBuilder
      • Examples
        • NodeCount
        • Expression
        • Mapper
      • Articles
        • Benchmarks
        • Error Handling
        • ServiceMessages
    • Avalanche.StatusCode
      • Introduction
      • HResult
        • Introduction
        • HResult.Facilities
        • BasicMessages
        • RpcMessages
        • DispatchMessages
        • StorageMessages
        • ItfMessages
        • Win32Messages
        • WindowsMessages
        • SspiMessages
        • CertMessages
        • MediaServerMessages
        • SetupApiMessages
        • ScardMessages
        • ComPlusMessages
        • ClrMessages
        • UserModeFilterManagerMessages
        • GraphicsMessages
        • TpmServicesMessages
        • TpmSoftwareMessages
        • PlaMessages
        • FveMessages
        • FwpMessages
        • NdisMessages
        • DltMessages
      • System
        • Introduction
        • AccessControlMessages
        • AggregateMessages
        • AppDomainMessages
        • ArgumentMessages
        • ArgumentNullMessages
        • ArgumentOutOfRangeMessages
        • ArithmeticMessages
        • ArrayMessages
        • AssemblyMessages
        • BadImageFormatMessages
        • CodeContractMessages
        • CodePageMessages
        • CollectionsMessages
        • CompilerServiceMessages
        • CryptographyMessages
        • CultureMessages
        • DiagnosticsMessages
        • EventSourceMessages
        • ExecutionEngineMessages
        • FormatMessages
        • HostProtectionMessages
        • IOMessages
        • IndexOutOfRangeMessages
        • InteropServiceMessages
        • InvalidCastMessages
        • InvalidOperationMessages
        • IsolatedStorageMessages
        • LazyMessages
        • MarshalerMessages
        • MemoryMessages
        • MiscellaneousMessages
        • NotImplementedMessages
        • NotSupportedMessages
        • ObjectDisposedMessages
        • OperationCanceledMessages
        • OverflowMessages
        • PlatformMessages
        • PolicyMessages
        • PrincipalMessages
        • ProgramMessages
        • ReferenceMessages
        • ReflectionMessages
        • RegionMessages
        • RemotingMessages
        • ResourcesMessages
        • SecurityMessages
        • SerializationMessages
        • StackMessages
        • TaskMessages
        • TextMessages
        • ThreadingMessages
        • TimeZoneMessages
        • TypeMessages
        • XmlMessages
      • HttpStatusCode
      • OpcUaStatusCode
    • Avalanche.Template
      • Introduction
      • TemplateFormats
      • ITemplatePrintable
      • ITemplateFormatPrintable
      • ITemplateText
      • ITemplateBreakdown
      • ITemplateFormat
      • ITemplateFormats
      • Extract Arguments
      • Emplacement
    • Avalanche.Tokenizer
      • Introduction
      • IToken
      • ITokenizer
      • Tokenizers
    • Avalanche.Utilities
      • Introduction
      • Collections
        • Tuples
        • StructList
        • ArrayList
        • BijectionMap
        • LockableDictionary
        • LockableList
        • MapList
        • Pipe
        • RingQueue
        • EnumerableExtensions
        • TupleUtilities
        • ArrayUtilities
      • Comparers
        • IGraphComparer
        • IGraphComparable
        • AlphaNumericComparer
        • EnumerableComparer
        • EnumerableGraphComparer
        • ReferenceComparer
        • KeyValuePairComparer
        • DefaultComparerProvider
        • RecordComparer
      • Cloners
        • ICloner
        • IGraphCloner
        • IGraphCloneable
        • ListCloner
        • DictionaryCloner
        • FieldCloner
        • PassthroughCloner
        • RecordCloner
        • ClonerProvider
      • Dispose
        • IDisposeAttachable
        • IDisposeBelatable
      • Provider
        • Introduction
        • ProviderBase
        • Delegate
        • Pipe
        • Cache
        • ResultCapture
        • AsReadOnly
        • AsService
        • IProviderEvent
      • Record
        • IRecordDescription
        • IFieldDescription
        • IConstructorDescription
        • IConstructionDescription
        • IParameterDescription
        • IRecordProviders
        • RecordDelegates
          • RecordCreate
          • RecordClone
          • RecordCopy
          • IRecordDelegates
        • FieldDelegates
          • FieldRead
          • FieldWrite
          • RecreateWith
          • IFieldDelegates
      • Reflection
        • EnumDescription
      • String
        • IEscaper
        • UnicodeString
        • Hex
      • Miscellaneous
        • IIdGenerator
        • Permutation
        • IReadOnly
        • IUserDataContainer
        • ITreeNode
        • Void
    • Avalanche.Writer
      • Introduction
      • ConstantWriter
      • Context
      • ConvertWriter
      • DefaultConstructor
      • DelegateWriter
      • PassthroughWriter
      • Referer
      • TypeCast
      • Writer
      • WriterPipe
      • WriterMessages

    Pipe

    Pipe<T> is a concurrency object for pushing values in one thread and pulling them in another.

    Pipe<int> pipe = new Pipe<int>();
    

    Pipe.Create(elementType) creates a pipe whose element type is supplied as a Type argument.

    Pipe<int> pipe = (Pipe<int>)Pipe.Create(typeof(int));
    

    Pipe implements IProducerConsumerCollection<T>.

    // Use the pipe through the standard producer/consumer collection interface.
    IProducerConsumerCollection<int> pipe = new Pipe<int>();
    // Push one value into the pipe.
    pipe.TryAdd(1);
    // Try to pull a value back out; the bool result (ignored here) reports whether one was taken.
    pipe.TryTake(out int value);
    

    .Subscribe(IObserver<T>) adds an observer that is notified of newly added values.

    var pipe = new Pipe<int>();
    // Subscribe returns a handle; the using declaration disposes it at the end of the scope.
    // NOTE(review): disposing the handle presumably unsubscribes the observer (IObservable convention) - confirm.
    using IDisposable handle = pipe.Subscribe(new Observer());
    // Adding a value is what the subscribed observer is notified about.
    pipe.TryAdd(1);
    
    // Observer that writes every notification it receives to the console.
    public class Observer : IObserver<int>
    {
        // Writes "Completed" when the source signals completion.
        public void OnCompleted() => WriteLine("Completed");
        // Writes the reported exception.
        public void OnError(Exception error) => WriteLine(error);
        // Writes the received value, prefixed with "OnValue: ".
        public void OnNext(int value) => WriteLine($"OnValue: {value}");
    }
    

    .ToArray() returns a snapshot of the values in the queue.

    var pipe = new Pipe<int>();
    // Queue two values.
    pipe.TryAdd(1);
    pipe.TryAdd(2);
    // Copy the currently queued values into an array.
    // NOTE(review): presumably the values stay in the pipe after the snapshot - confirm.
    int[] array = pipe.ToArray();
    

    IEnumerator<T> returns values as long as the pipe is alive. When the pipe is disposed, the IEnumerator iteration ends. A pipe can be monitored with a foreach loop in one thread and be fed with values from another.

    // The pipe is shared by the producer task and the consuming foreach loops below.
    Pipe<int> pipe = new Pipe<int>();
    
    // Producer: a separately scheduled task adds 200 x 10000 values in parallel
    // (each value encodes the outer index i and inner index ix), then disposes the pipe.
    Task.Factory.StartNew(() =>
    {
        Parallel.For(0, 200, i =>
        {
            for (int ix = 0; ix < 10000; ix++)
                pipe.TryAdd(i * 100000 + ix);
        });
        // Disposing the pipe is what ends the consumer's enumeration.
        pipe.Dispose();
    });
    
    // Consumer: collects values from the pipe until its enumeration ends.
    List<int> receivedCopy = new List<int>();
    foreach (int ii in pipe)
    {
        receivedCopy.Add(ii);
    }
    
    // Second enumeration, started after the first one has ended.
    // NOTE(review): the pipe is presumably disposed by now, so this loop is expected to yield nothing - confirm.
    int count = 0;
    foreach (int ii in pipe)
        count++;
    
    // Report how many values each enumeration produced.
    WriteLine($"first count = {receivedCopy.Count}, second count={count}");
    

    When the pipe is disposed, its observers and enumerators are closed.

    pipe.Dispose();
    

    Full Example

    Full example
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Avalanche.Utilities;
    using static System.Console;
    
    // Complete sample for Pipe<T>. Each numbered marker pair (// <NN> ... // </NN>) brackets one
    // fragment that is shown on its own in the article above; keep the code between markers in
    // sync with those fragments.
    public class pipe
    {
        // Runs every fragment in turn. Each fragment lives in its own brace scope so that the
        // local name "pipe" can be reused.
        public static void Run()
        {
            // 01: construct a Pipe<int> directly.
            {
                // <01>
                Pipe<int> pipe = new Pipe<int>();
                // </01>
            }
            // 02: construct a pipe from a runtime element type and cast it to Pipe<int>.
            {
                // <02>
                Pipe<int> pipe = (Pipe<int>)Pipe.Create(typeof(int));
                // </02>
            }
            // 03: use the pipe through IProducerConsumerCollection<int> (TryAdd / TryTake).
            {
                // <03>
                IProducerConsumerCollection<int> pipe = new Pipe<int>();
                pipe.TryAdd(1);
                pipe.TryTake(out int value);
                // </03>
            }
            // 04: subscribe an Observer, then add a value for it to be notified about.
            // The subscription handle is disposed when this scope ends.
            {
                // <04>
                var pipe = new Pipe<int>();
                using IDisposable handle = pipe.Subscribe(new Observer());
                pipe.TryAdd(1);
                // </04>
            }
            // 05: copy the queued values into an array with ToArray().
            {
                // <05>
                var pipe = new Pipe<int>();
                pipe.TryAdd(1);
                pipe.TryAdd(2);
                int[] array = pipe.ToArray();
                // </05>
            }
    
            // 06: a separately scheduled task adds 200 x 10000 values in parallel and then disposes
            // the pipe, while this thread enumerates it with foreach. The enumeration ends once the
            // pipe is disposed. A second foreach is then started on the same pipe.
            // NOTE(review): the second loop is presumably expected to yield nothing - confirm.
            {
                // <06>
                Pipe<int> pipe = new Pipe<int>();
    
                Task.Factory.StartNew(() =>
                {
                    Parallel.For(0, 200, i =>
                    {
                        for (int ix = 0; ix < 10000; ix++)
                            pipe.TryAdd(i * 100000 + ix);
                    });
                    pipe.Dispose();
                });
    
                List<int> receivedCopy = new List<int>();
                foreach (int ii in pipe)
                {
                    receivedCopy.Add(ii);
                }
    
                int count = 0;
                foreach (int ii in pipe)
                    count++;
    
                WriteLine($"first count = {receivedCopy.Count}, second count={count}");
                // </06>
            }
            // 07: dispose the pipe; the construction line sits outside the markers so that only
            // the Dispose call is part of the fragment.
            {
                Pipe<int> pipe = new Pipe<int>();
                // <07>
                pipe.Dispose();
                // </07>
            }
        }
    
        // 99: observer used by fragment 04; writes every notification it receives to the console.
        // <99>
        public class Observer : IObserver<int>
        {
            public void OnCompleted() => WriteLine("Completed");
            public void OnError(Exception error) => WriteLine(error);
            public void OnNext(int value) => WriteLine($"OnValue: {value}");
        }
        // </99>
    }
    
    
    In This Article
    Back to top Copyright © Toni Kalajainen