System.IO.Pipelines in .NET

System.IO.Pipelines is a library designed to make high-performance I/O in .NET easier. It is a library targeting .NET Standard that works on all .NET implementations.

Problems solved by System.IO.Pipelines

Apps that parse streaming data are made up of boilerplate code with many specialized and unusual code flows. The boilerplate and special-case code is complex and difficult to maintain.

System.IO.Pipelines was designed to:

  • Parse streaming data with high performance.
  • Reduce code complexity.

The following code is typical of a TCP server that receives line-delimited messages (delimited by '\n') from a client:

The preceding code has several problems:

  • The entire message (up to the end of line) might not be received in a single call to ReadAsync.
  • It ignores the result of stream.ReadAsync, which returns how much data was read.
  • It doesn't handle the case where multiple lines are read in a single ReadAsync call.
  • It allocates a byte array for each read.

To fix the preceding problems, the following changes are required:

  • Buffer the incoming data until a new line is found.

  • Parse all the lines returned in the buffer.

  • The line might be larger than 1 KB (1,024 bytes). The code needs to resize the input buffer until the delimiter is found, so that the complete line fits in the buffer.

    • If the buffer is resized, more buffer copies are made as longer lines appear in the input.
    • To reduce wasted space, compact the buffer used for reading lines.
  • Consider using buffer pooling to avoid allocating memory repeatedly.

The following code addresses some of these problems:
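A minimal sketch of that hand-written approach follows. It assumes '\n'-delimited UTF-8 lines read from any Stream; ManualLineReader and ReadLinesAsync are hypothetical names, not part of .NET. It honors the count returned by ReadAsync, parses every complete line per read, rents its buffer from ArrayPool<byte>.Shared, doubles the buffer when a line doesn't fit, and compacts unparsed bytes to the front:

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ManualLineReader
{
    // Hypothetical helper: reads '\n'-delimited UTF-8 lines without System.IO.Pipelines.
    public static async Task<List<string>> ReadLinesAsync(Stream stream)
    {
        var lines = new List<string>();
        byte[] buffer = ArrayPool<byte>.Shared.Rent(1024); // pooled, not allocated per read
        int bytesBuffered = 0;                               // valid bytes at the front of buffer
        try
        {
            while (true)
            {
                // Buffer is full and holds no complete line: rent a larger one and copy.
                if (bytesBuffered == buffer.Length)
                {
                    byte[] larger = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
                    Array.Copy(buffer, larger, bytesBuffered);
                    ArrayPool<byte>.Shared.Return(buffer);
                    buffer = larger;
                }

                // Honor the return value: a read may deliver fewer bytes than requested.
                int bytesRead = await stream.ReadAsync(buffer, bytesBuffered, buffer.Length - bytesBuffered);
                if (bytesRead == 0)
                {
                    break; // EOF (an unterminated trailing line is dropped in this sketch)
                }
                bytesBuffered += bytesRead;

                // Parse every complete line in the buffer, not just the first one.
                int consumed = 0;
                int newline;
                while ((newline = Array.IndexOf(buffer, (byte)'\n', consumed, bytesBuffered - consumed)) >= 0)
                {
                    lines.Add(Encoding.UTF8.GetString(buffer, consumed, newline - consumed));
                    consumed = newline + 1;
                }

                // Compact: move the unparsed remainder to the start of the buffer.
                bytesBuffered -= consumed;
                Array.Copy(buffer, consumed, buffer, 0, bytesBuffered);
            }
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(buffer);
        }
        return lines;
    }
}
```

Even this version silently drops a final line that has no trailing '\n', and it copies bytes on every resize and compaction.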

The preceding code is complex and doesn't address all the problems identified. High-performance networking usually means writing complex code to maximize performance. System.IO.Pipelines was designed to make writing this type of code easier.

Pipe

The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written into the PipeWriter is available in the PipeReader:
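A minimal sketch of that round trip, assuming UTF-8 text (PipePairDemo and RoundTripAsync are hypothetical names): bytes written and flushed through pipe.Writer come back out of pipe.Reader.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipePairDemo
{
    // Writes text into a Pipe's writer and reads it back from the same Pipe's reader.
    public static async Task<string> RoundTripAsync(string text)
    {
        var pipe = new Pipe();
        PipeReader reader = pipe.Reader;
        PipeWriter writer = pipe.Writer;

        // WriteAsync copies the bytes into the pipe and flushes them to the reader.
        await writer.WriteAsync(Encoding.UTF8.GetBytes(text));
        await writer.CompleteAsync(); // no more data will be written

        var received = new MemoryStream();
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            foreach (ReadOnlyMemory<byte> segment in result.Buffer)
            {
                received.Write(segment.Span);
            }
            reader.AdvanceTo(result.Buffer.End); // everything was consumed
            if (result.IsCompleted)
            {
                break;
            }
        }
        await reader.CompleteAsync();
        return Encoding.UTF8.GetString(received.ToArray());
    }
}
```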

Basic use of Pipe

There are two loops:

  • One loop reads from the Socket and writes to the PipeWriter.
  • The other loop reads from the PipeReader and parses incoming lines.

No explicit buffers are allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for consuming code to focus solely on the business logic.

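In the first loop:

  • PipeWriter.GetMemory(Int32) is called to get memory from the underlying writer.
  • PipeWriter.Advance(Int32) is called to tell the PipeWriter how much data was written to the buffer.
  • PipeWriter.FlushAsync is called to make the data available to the PipeReader.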

In the second loop, the PipeReader consumes the buffers written by the PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:

  • Returns a ReadResult that contains two important pieces of information:

    • The data that was read, in the form of a ReadOnlySequence<byte>.
    • A Boolean, IsCompleted, that indicates whether the end of data (EOF) has been reached.

After finding the end-of-line (EOL) delimiter and parsing the line:

  • The logic processes the buffer to skip what has already been processed.
  • PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.

The reader and writer loops end by calling Complete. Complete lets the underlying Pipe release the memory it allocated.
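The two loops can be sketched as follows. This is an illustration under stated assumptions rather than the article's original listing: a Stream stands in for the Socket so that the example is self-contained, lines are '\n'-delimited UTF-8, and LinePipeline, FillPipeAsync, ReadPipeAsync, and TryReadLine are hypothetical names.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class LinePipeline
{
    public static async Task<List<string>> ProcessLinesAsync(Stream source)
    {
        var pipe = new Pipe();
        var lines = new List<string>();
        Task writing = FillPipeAsync(source, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader, lines);
        await Task.WhenAll(reading, writing);
        return lines;
    }

    // First loop: source -> PipeWriter.
    static async Task FillPipeAsync(Stream source, PipeWriter writer)
    {
        const int minimumBufferSize = 512;
        try
        {
            while (true)
            {
                // Get at least 512 bytes of memory from the PipeWriter.
                Memory<byte> memory = writer.GetMemory(minimumBufferSize);
                int bytesRead = await source.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    break; // end of the source
                }
                // Tell the PipeWriter how much was written into that memory.
                writer.Advance(bytesRead);

                // Make the data available to the PipeReader.
                FlushResult result = await writer.FlushAsync();
                if (result.IsCompleted)
                {
                    break; // the reader has completed; stop producing
                }
            }
        }
        finally
        {
            // Tell the PipeReader that no more data is coming.
            await writer.CompleteAsync();
        }
    }

    // Second loop: PipeReader -> parsed lines.
    static async Task ReadPipeAsync(PipeReader reader, List<string> lines)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;

                while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
                {
                    lines.Add(Encoding.UTF8.GetString(line.ToArray()));
                }

                // Consumed: up to the start of any partial line. Examined: everything.
                reader.AdvanceTo(buffer.Start, buffer.End);

                if (result.IsCompleted)
                {
                    break; // a trailing line without '\n' is ignored in this sketch
                }
            }
        }
        finally
        {
            // Lets the Pipe release the memory it allocated.
            await reader.CompleteAsync();
        }
    }

    static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }
        // The line excludes the '\n'; the buffer resumes just after it.
        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }
}
```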

Backpressure and flow control

Ideally, reading and parsing work together:

  • The reading thread consumes data from the network and puts it in buffers.
  • The parsing thread is responsible for constructing the appropriate data structures.

Typically, parsing takes more time than just copying blocks of data from the network:

  • The reading thread gets ahead of the parsing thread.
  • The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.

For optimal performance, there's a balance between frequent pauses and allocating more memory.

To solve the preceding problem, the Pipe has two settings to control the flow of data:

  • PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
  • ResumeWriterThreshold: Determines how much data the reader has to observe before calls to PipeWriter.FlushAsync resume.

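PipeWriter.FlushAsync:

  • Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
  • Completes the ValueTask<FlushResult> when that amount becomes lower than ResumeWriterThreshold.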

Two values are used to prevent rapid cycling, which can occur if only one value is used.

Examples
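A minimal sketch of both thresholds, assuming deliberately tiny values (pause past 10 bytes, resume below 5) so that the effect is visible; BackpressureDemo is a hypothetical name. Writing 20 bytes leaves FlushAsync incomplete until the reader consumes the data:

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Returns true if the flush was paused by the pause threshold and then
    // resumed after the reader consumed the data.
    public static async Task<bool> PauseAndResumeAsync()
    {
        // FlushAsync stops completing once unconsumed data crosses 10 bytes and
        // completes again after it drops below 5 bytes.
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5));

        // WriteAsync writes and flushes; 20 bytes is well past the pause threshold.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[20]);
        bool paused = !flush.IsCompleted;

        // Consume all 20 bytes, dropping the unconsumed amount to 0 (below 5).
        ReadResult read = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(read.Buffer.End);

        // The pending flush can now complete.
        FlushResult flushed = await flush;
        return paused && !flushed.IsCanceled;
    }
}
```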

PipeScheduler

Typically, when using async and await, asynchronous code resumes on either a TaskScheduler or the current SynchronizationContext.

When doing I/O, it's important to have fine-grained control over where the I/O is performed. This control allows CPU caches to be used effectively. Efficient caching is critical for high-performance apps like web servers. PipeScheduler provides control over where asynchronous callbacks run. By default:

  • The current SynchronizationContext is used.
  • If there's no SynchronizationContext, the thread pool is used to run callbacks.

PipeScheduler.ThreadPool is the PipeScheduler implementation that queues callbacks to the thread pool. PipeScheduler.ThreadPool is the default and generally the best choice. PipeScheduler.Inline can cause unintended consequences, such as deadlocks.
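A sketch of plugging custom schedulers into a Pipe through PipeOptions. SingleThreadPipeScheduler is a hypothetical PipeScheduler subclass (not part of .NET) that runs every scheduled callback on one dedicated background thread, and SchedulerDemo is a hypothetical usage; setting useSynchronizationContext to false makes the Pipe ignore any current SynchronizationContext:

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical scheduler: runs every callback on one dedicated background thread.
public sealed class SingleThreadPipeScheduler : PipeScheduler
{
    private readonly BlockingCollection<(Action<object?> Action, object? State)> _queue = new();

    public SingleThreadPipeScheduler()
    {
        var thread = new Thread(DoWork) { IsBackground = true };
        thread.Start();
    }

    public override void Schedule(Action<object?> action, object? state) => _queue.Add((action, state));

    private void DoWork()
    {
        foreach (var (action, state) in _queue.GetConsumingEnumerable())
        {
            action(state);
        }
    }
}

public static class SchedulerDemo
{
    public static async Task<int> RunAsync()
    {
        var options = new PipeOptions(
            readerScheduler: new SingleThreadPipeScheduler(),
            writerScheduler: new SingleThreadPipeScheduler(),
            useSynchronizationContext: false);
        var pipe = new Pipe(options);

        // No data yet, so this read is pending; its continuation is expected to be
        // dispatched through the reader scheduler once data is flushed.
        ValueTask<ReadResult> pendingRead = pipe.Reader.ReadAsync();
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });

        ReadResult result = await pendingRead;
        int length = (int)result.Buffer.Length;
        pipe.Reader.AdvanceTo(result.Buffer.End);
        return length;
    }
}
```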

Pipe reset

It's frequently efficient to reuse the Pipe object. To reset the pipe, call Pipe.Reset when both the PipeReader and the PipeWriter are complete.
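A minimal sketch of reusing one Pipe, assuming both ends are completed before Reset is called (PipeResetDemo is a hypothetical name):

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeResetDemo
{
    // Uses one Pipe twice and returns the length read during the second use.
    public static async Task<long> ReuseAsync()
    {
        var pipe = new Pipe();

        // First use: write, read, then complete both ends.
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });
        ReadResult first = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(first.Buffer.End);
        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();

        // Reset is only valid once both the reader and the writer have completed.
        pipe.Reset();

        // Second use of the same instance.
        await pipe.Writer.WriteAsync(new byte[] { 4, 5 });
        ReadResult second = await pipe.Reader.ReadAsync();
        long length = second.Buffer.Length;
        pipe.Reader.AdvanceTo(second.Buffer.End);
        return length;
    }
}
```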

PipeReader

PipeReader manages memory on the caller's behalf. Always call PipeReader.AdvanceTo after calling PipeReader.ReadAsync. This lets the PipeReader know when the caller is done with the memory so that it can be tracked. The ReadOnlySequence<byte> returned from PipeReader.ReadAsync is only valid until the call to PipeReader.AdvanceTo. It's illegal to use the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo.

PipeReader.AdvanceTo takes two SequencePosition arguments:

  • The first argument determines how much memory was consumed.
  • The second argument determines how much of the buffer was examined.

Marking data as consumed means that the pipe can return the memory to the underlying buffer pool. Marking data as examined controls what the next call to PipeReader.ReadAsync does. Marking everything as examined means that the next call to PipeReader.ReadAsync won't return until more data is written to the pipe. Any other value makes the next call to PipeReader.ReadAsync return immediately with the examined and unexamined data, but not with data that has already been consumed.
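The difference between the two positions can be observed directly. In this sketch (AdvanceToDemo is a hypothetical name), examining nothing makes the next ReadAsync complete immediately with the same bytes, while examining everything makes it wait for new data:

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class AdvanceToDemo
{
    public static async Task<(bool ReturnedImmediately, bool WaitedForData, long FinalLength)> RunAsync()
    {
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });

        // Consume nothing, examine nothing: the next read completes at once
        // with the same 3 bytes.
        ReadResult r1 = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(r1.Buffer.Start);
        ValueTask<ReadResult> next = pipe.Reader.ReadAsync();
        bool returnedImmediately = next.IsCompleted;
        ReadResult r2 = await next;

        // Consume nothing, examine everything: the next read waits for new data.
        pipe.Reader.AdvanceTo(r2.Buffer.Start, r2.Buffer.End);
        ValueTask<ReadResult> waiting = pipe.Reader.ReadAsync();
        bool waitedForData = !waiting.IsCompleted;

        // New data arrives; the read returns the 3 unconsumed bytes plus the new one.
        await pipe.Writer.WriteAsync(new byte[] { 4 });
        ReadResult r3 = await waiting;
        long finalLength = r3.Buffer.Length;
        pipe.Reader.AdvanceTo(r3.Buffer.End);

        return (returnedImmediately, waitedForData, finalLength);
    }
}
```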

Scenarios for reading streaming data

There are a couple of typical patterns that emerge when trying to read streaming data:

  • Given a stream of data, parse a single message.
  • Given a stream of data, parse all available messages.

The following examples use the TryParseLines method for parsing messages from a ReadOnlySequence<byte>. TryParseLines parses a single message and updates the input buffer to trim the parsed message from the buffer. TryParseLines is not part of .NET; it's a user-written method used in the following sections.
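A minimal sketch of such a helper, assuming each message is one '\n'-terminated UTF-8 line and that a message is simply a string (the Message type isn't shown, so both choices are assumptions). MessageParser is a hypothetical name:

```csharp
using System;
using System.Buffers;
using System.Text;

public static class MessageParser
{
    // Hypothetical user-written parser: one message = one '\n'-terminated UTF-8 line.
    // On success, 'buffer' is trimmed so that it starts right after the parsed message.
    public static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out string message)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline == null)
        {
            message = string.Empty;
            return false; // no complete message; buffer is left unchanged
        }
        // Copy the message out of the buffer so it outlives PipeReader.AdvanceTo.
        message = Encoding.UTF8.GetString(buffer.Slice(0, newline.Value).ToArray());
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }
}
```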

Reading a single message

The following code reads a single message from a PipeReader and returns it to the caller.
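A sketch of that reader, assuming '\n'-terminated UTF-8 line messages. It includes a line-based version of the user-written TryParseLines so that the example is self-contained, and returns null once the pipe is completed with no further message. SingleMessageReader is a hypothetical name:

```csharp
#nullable enable
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class SingleMessageReader
{
    // Returns the next message, or null once the writer has completed and no data remains.
    public static async ValueTask<string?> ReadSingleMessageAsync(
        PipeReader reader, CancellationToken cancellationToken = default)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            // If no message is parsed: consume nothing and examine the whole buffer,
            // so the next ReadAsync waits for more data.
            SequencePosition consumed = buffer.Start;
            SequencePosition examined = buffer.End;

            try
            {
                if (TryParseLines(ref buffer, out string message))
                {
                    // TryParseLines trimmed the message, so buffer.Start is just past it.
                    // Examined == consumed lets the next call see a following message.
                    consumed = buffer.Start;
                    examined = consumed;
                    return message;
                }

                if (result.IsCompleted)
                {
                    if (buffer.Length > 0)
                    {
                        // Leftover bytes can never become a complete message.
                        throw new InvalidDataException("Incomplete message.");
                    }
                    return null;
                }
            }
            finally
            {
                reader.AdvanceTo(consumed, examined);
            }
        }
    }

    // Line-based version of the user-written TryParseLines helper.
    static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out string message)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline == null)
        {
            message = string.Empty;
            return false;
        }
        // Copy the message out of pipe memory before AdvanceTo is called.
        message = Encoding.UTF8.GetString(buffer.Slice(0, newline.Value).ToArray());
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }
}
```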

The preceding code:

  • Parses a single message.
  • Updates the consumed SequencePosition and the examined SequencePosition to point to the start of the trimmed input buffer.

The two SequencePosition arguments are updated because TryParseLines removes the parsed message from the input buffer. Generally, when parsing a single message from the buffer, the examined position should be one of the following:

  • The end of the message.
  • The end of the received buffer, if no message was found.

The single-message case has the most potential for errors. Passing the wrong value for examined can result in an out-of-memory exception or an infinite loop. For more information, see the General PipeReader problems section in this article.

Reading multiple messages

The following code reads all messages from a PipeReader and calls ProcessMessageAsync for each message.
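A sketch of that loop, assuming '\n'-terminated UTF-8 line messages. The ProcessMessageAsync step is passed in as a delegate because its implementation isn't shown, and MultiMessageReader is a hypothetical name:

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class MultiMessageReader
{
    public static async Task ProcessMessagesAsync(
        PipeReader reader, Func<string, Task> processMessageAsync, CancellationToken cancellationToken = default)
    {
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(cancellationToken);
                ReadOnlySequence<byte> buffer = result.Buffer;
                try
                {
                    // Process every message in the buffer; TryParseLines trims each one off.
                    while (TryParseLines(ref buffer, out string message))
                    {
                        await processMessageAsync(message);
                    }

                    // Check for completion only after the buffer has been processed.
                    if (result.IsCompleted)
                    {
                        if (buffer.Length > 0)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    // All complete messages were processed, so the remaining buffer's
                    // Start is consumed and its End is examined.
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
    }

    // Line-based version of the user-written TryParseLines helper.
    static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out string message)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline == null)
        {
            message = string.Empty;
            return false;
        }
        message = Encoding.UTF8.GetString(buffer.Slice(0, newline.Value).ToArray());
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }
}
```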

Cancellation

PipeReader.ReadAsync:

  • Supports passing a CancellationToken.
  • Throws an OperationCanceledException if the CancellationToken is canceled while a read is pending.
  • Supports canceling the current read operation through PipeReader.CancelPendingRead, which avoids throwing an exception. Calling PipeReader.CancelPendingRead causes the current or next call to PipeReader.ReadAsync to return a ReadResult with IsCanceled set to true. This can be useful for halting the existing read loop in a non-destructive, non-exceptional way.
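A minimal sketch of non-exceptional cancellation (CancelReadDemo is a hypothetical name): a read is started on an empty pipe, CancelPendingRead wakes it, and the returned ReadResult reports IsCanceled instead of throwing:

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class CancelReadDemo
{
    // Returns ReadResult.IsCanceled for a read that was canceled while pending.
    public static async Task<bool> CancelPendingReadAsync()
    {
        var pipe = new Pipe();

        // Nothing has been written yet, so this read cannot complete on its own.
        ValueTask<ReadResult> pending = pipe.Reader.ReadAsync();

        // Wake the pending read without throwing OperationCanceledException.
        pipe.Reader.CancelPendingRead();

        ReadResult result = await pending;
        // A read loop would typically exit here when result.IsCanceled is true.
        pipe.Reader.AdvanceTo(result.Buffer.Start);
        return result.IsCanceled;
    }
}
```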

General PipeReader problems

  • Passing the wrong values to consumed or examined can result in reading data that was already read.

  • Passing buffer.End as examined can result in:

    • Stalled data.
    • Possibly an eventual out-of-memory (OOM) exception if data isn't consumed. For example, PipeReader.AdvanceTo(position, buffer.End) when processing a single message at a time from the buffer.
  • Passing the wrong values to consumed or examined can result in an infinite loop. For example, if buffer.Start hasn't changed, PipeReader.AdvanceTo(buffer.Start) causes the next call to PipeReader.ReadAsync to return immediately, before new data arrives.

  • Passing the wrong values to consumed or examined can result in infinite buffering (eventually OOM).

  • Using the ReadOnlySequence<byte> after calling PipeReader.AdvanceTo can result in memory corruption (use after free).

  • Failing to call PipeReader.Complete or PipeReader.CompleteAsync can result in a memory leak.

  • Checking ReadResult.IsCompleted and exiting the reading logic before processing the buffer results in data loss. The loop exit condition should be based on ReadResult.Buffer.IsEmpty and ReadResult.IsCompleted. Doing this incorrectly can result in an infinite loop.

Problematic code

Data loss

The ReadResult can return the final segment of data when IsCompleted is set to true. Not reading that data before exiting the read loop results in data loss.

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.

Infinite loop

A read loop that exits only when ReadResult.IsCompleted is true and the buffer is empty can loop forever if IsCompleted is true but there's never a complete message in the buffer.

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.

Another pattern has the same problem: it checks for a non-empty buffer before checking ReadResult.IsCompleted. Because the IsCompleted check sits in an else if branch, the loop runs forever if there's never a complete message in the buffer.

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.

App stops responding unexpectedly

Unconditionally calling PipeReader.AdvanceTo with buffer.End as the examined position can cause the app to stop responding when parsing a single message. The next call to PipeReader.ReadAsync won't return until:

  • More data is written to the pipe.
  • And the new data wasn't previously examined.

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.

Out of memory (OOM)

Under the following conditions, a read loop that keeps buffering until it finds a complete message continues until an OutOfMemoryException occurs:

  • There's no maximum message size.
  • The data returned from the PipeReader never makes a complete message. For example, the message isn't complete because the other side is writing a very large message (for example, a 4-GB message).

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.
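By contrast, a reader can cap how much unterminated data it is willing to buffer. In this sketch, the 1,024-byte limit, the '\n'-delimited format, and the BoundedMessageReader name are all assumptions; it throws InvalidDataException instead of buffering without bound:

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class BoundedMessageReader
{
    // Hypothetical cap on bytes buffered while waiting for a '\n'.
    public const int MaxBufferedBytes = 1024;

    public static async Task<List<string>> ReadMessagesAsync(PipeReader reader)
    {
        var messages = new List<string>();
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync();
                ReadOnlySequence<byte> buffer = result.Buffer;
                try
                {
                    while (TryParseLines(ref buffer, out string message))
                    {
                        messages.Add(message);
                    }

                    // The unterminated remainder is already too large: stop here
                    // instead of buffering until an OutOfMemoryException.
                    if (buffer.Length > MaxBufferedBytes)
                    {
                        throw new InvalidDataException("Message exceeds the maximum size.");
                    }

                    if (result.IsCompleted)
                    {
                        if (buffer.Length > 0)
                        {
                            throw new InvalidDataException("Incomplete message.");
                        }
                        break;
                    }
                }
                finally
                {
                    reader.AdvanceTo(buffer.Start, buffer.End);
                }
            }
        }
        finally
        {
            await reader.CompleteAsync();
        }
        return messages;
    }

    static bool TryParseLines(ref ReadOnlySequence<byte> buffer, out string message)
    {
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        if (newline == null)
        {
            message = string.Empty;
            return false;
        }
        message = Encoding.UTF8.GetString(buffer.Slice(0, newline.Value).ToArray());
        buffer = buffer.Slice(buffer.GetPosition(1, newline.Value));
        return true;
    }
}
```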

Memory corruption

When writing helpers that read the buffer, copy any returned payload before calling AdvanceTo. Returning a slice of the buffer instead hands back memory that the Pipe has discarded and may reuse for the next operation (read/write).

Warning

Do NOT use code that follows this pattern. It results in data loss, unresponsive apps, and security issues, and it should NOT be copied. The pattern is described only to help identify common PipeReader problems.
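A safe variant copies the payload out of the pipe's memory before calling AdvanceTo. In this sketch (SafePayload and ReadPayloadAsync are hypothetical names), the payload is assumed to be a fixed number of leading bytes:

```csharp
#nullable enable
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class SafePayload
{
    // Returns a copy of the next 'count' bytes, or null if the pipe completes first.
    public static async Task<byte[]?> ReadPayloadAsync(PipeReader reader, int count)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            if (buffer.Length >= count)
            {
                ReadOnlySequence<byte> payload = buffer.Slice(0, count);
                // Copy first: after AdvanceTo the pipe may reuse this memory.
                byte[] copy = payload.ToArray();
                reader.AdvanceTo(payload.End);
                return copy;
            }

            // Not enough data yet: consume nothing, examine everything.
            reader.AdvanceTo(buffer.Start, buffer.End);
            if (result.IsCompleted)
            {
                return null;
            }
        }
    }
}
```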

PipeWriter

PipeWriter manages buffers for writing on the caller's behalf. PipeWriter implements IBufferWriter<byte>. IBufferWriter<byte> makes it possible to get access to buffers to perform writes without extra buffer copies.
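A minimal sketch of writing directly into pipe-owned memory with GetMemory, Advance, and FlushAsync, assuming the ASCII string "Hello" as the payload (HelloWriter is a hypothetical name):

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class HelloWriter
{
    public static async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
    {
        // Request at least 5 bytes; the returned buffer may be larger.
        Memory<byte> memory = writer.GetMemory(5);

        // Encode directly into the pipe's buffer, with no intermediate array.
        int written = Encoding.ASCII.GetBytes("Hello".AsSpan(), memory.Span);

        // Report how many bytes were actually written.
        writer.Advance(written);

        // Make the bytes available to the reader or underlying device.
        await writer.FlushAsync(cancellationToken);
    }
}
```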

The preceding code:

  • Requests a buffer of at least 5 bytes from the PipeWriter using GetMemory.
  • Writes bytes for the ASCII string "Hello" to the returned Memory<byte>.
  • Calls Advance to indicate how many bytes were written to the buffer.
  • Flushes the PipeWriter, which sends the bytes to the underlying device.

The preceding method of writing uses the buffers provided by the PipeWriter. Alternatively, PipeWriter.WriteAsync:

  • Copies the existing buffer to the PipeWriter.
  • Calls GetSpan and Advance as needed, and then calls FlushAsync.
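The same write expressed with PipeWriter.WriteAsync, as a sketch (HelloWriteAsync is a hypothetical name): the bytes live in a separate array, and WriteAsync copies them into the pipe and flushes, so no explicit GetMemory or Advance call is needed.

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class HelloWriteAsync
{
    public static async Task WriteHelloAsync(PipeWriter writer, CancellationToken cancellationToken = default)
    {
        byte[] helloBytes = Encoding.ASCII.GetBytes("Hello");

        // WriteAsync copies helloBytes into the pipe, advances, and flushes.
        await writer.WriteAsync(helloBytes, cancellationToken);
    }
}
```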

Cancellation

FlushAsync supports passing a CancellationToken. Passing a CancellationToken results in an OperationCanceledException if the token is canceled while a flush is pending. PipeWriter.FlushAsync supports canceling the current flush operation via PipeWriter.CancelPendingFlush without throwing an exception. Calling PipeWriter.CancelPendingFlush causes the current or next call to PipeWriter.FlushAsync or PipeWriter.WriteAsync to return a FlushResult with IsCanceled set to true. This can be useful for halting a pending flush in a non-destructive, non-exceptional way.
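A minimal sketch of CancelPendingFlush, assuming tiny thresholds so that a single 8-byte write leaves the flush pending (CancelFlushDemo is a hypothetical name):

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class CancelFlushDemo
{
    // Returns FlushResult.IsCanceled for a flush that was canceled while pending.
    public static async Task<bool> CancelPendingFlushAsync()
    {
        // Small thresholds so that one write makes FlushAsync wait for the reader.
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 4, resumeWriterThreshold: 2));
        ValueTask<FlushResult> pending = pipe.Writer.WriteAsync(new byte[8]);

        // Release the waiting flush without throwing an exception.
        pipe.Writer.CancelPendingFlush();

        FlushResult result = await pending;
        return result.IsCanceled;
    }
}
```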

General PipeWriter problems

  • GetSpan and GetMemory return a buffer with at least the requested amount of memory. Don't assume exact buffer sizes.
  • There's no guarantee that successive calls will return the same buffer or a buffer of the same size.
  • A new buffer must be requested after calling Advance to continue writing more data. The previously acquired buffer can't be written to.
  • Calling GetMemory or GetSpan while there's an incomplete call to FlushAsync isn't safe.
  • Calling Complete or CompleteAsync while there's unflushed data can result in memory corruption.

IDuplexPipe

IDuplexPipe is a contract for types that support both reading and writing. For example, a network connection would be represented by an IDuplexPipe.

Unlike Pipe, which contains a PipeReader and a PipeWriter, IDuplexPipe represents a single side of a full-duplex connection. That means what is written to the PipeWriter will not be read from the PipeReader.
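To illustrate the difference, the following sketch builds an in-memory connection from two Pipe instances. DuplexEndpoint is a hypothetical IDuplexPipe implementation, not a .NET type: each endpoint's Output feeds the other endpoint's Input, so an endpoint never reads back what it wrote.

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical IDuplexPipe implementation: one end of an in-memory connection.
public sealed class DuplexEndpoint : IDuplexPipe
{
    private DuplexEndpoint(PipeReader input, PipeWriter output)
    {
        Input = input;
        Output = output;
    }

    public PipeReader Input { get; }   // data sent by the other endpoint
    public PipeWriter Output { get; }  // data sent to the other endpoint

    public static (DuplexEndpoint Client, DuplexEndpoint Server) CreatePair()
    {
        var clientToServer = new Pipe();
        var serverToClient = new Pipe();
        return (new DuplexEndpoint(serverToClient.Reader, clientToServer.Writer),
                new DuplexEndpoint(clientToServer.Reader, serverToClient.Writer));
    }

    // Sends UTF-8 text on this endpoint's Output.
    public async Task SendAsync(string text) => await Output.WriteAsync(Encoding.UTF8.GetBytes(text));

    // Receives whatever is currently available on this endpoint's Input.
    public async Task<string> ReceiveAsync()
    {
        ReadResult result = await Input.ReadAsync();
        string text = Encoding.UTF8.GetString(result.Buffer.ToArray());
        Input.AdvanceTo(result.Buffer.End);
        return text;
    }
}
```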

Streams

When reading or writing stream data, you typically read data using a deserializer and write data using a serializer. Most of these read and write APIs have a Stream parameter. To make it easier to integrate with these existing APIs, PipeReader and PipeWriter expose an AsStream method. AsStream returns a Stream implementation around the PipeReader or PipeWriter.
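A sketch of handing pipe ends to Stream-based APIs, here StreamReader and StreamWriter (AsStreamDemo is a hypothetical name). It assumes UTF-8 text; leaveOpen: true on the writer side keeps the PipeWriter usable after the StreamWriter is disposed, so it can be completed explicitly:

```csharp
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class AsStreamDemo
{
    // Writes text through a Stream view of the PipeWriter, then completes the writer.
    public static async Task WriteTextAsync(PipeWriter writer, string text)
    {
        // leaveOpen: true, so disposing the stream does not complete the PipeWriter.
        await using (var textWriter = new StreamWriter(writer.AsStream(leaveOpen: true)))
        {
            await textWriter.WriteAsync(text);
        } // disposing the StreamWriter flushes the text into the pipe
        await writer.CompleteAsync();
    }

    // Reads all text through a Stream view of the PipeReader.
    public static async Task<string> ReadAllTextAsync(PipeReader reader)
    {
        // With the default leaveOpen: false, disposing the stream also completes the reader.
        using var textReader = new StreamReader(reader.AsStream());
        return await textReader.ReadToEndAsync();
    }
}
```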

Stream example

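PipeReader and PipeWriter instances can be created using the static Create methods, given a Stream object and, optionally, corresponding creation options.

StreamPipeReaderOptions allows control over the creation of the PipeReader instance with the following parameters:

  • BufferSize: the minimum buffer size, in bytes, used when renting memory from the pool. Defaults to 4096.
  • LeaveOpen: determines whether the underlying stream is left open after the PipeReader completes. Defaults to false.
  • MinimumReadSize: the threshold of remaining bytes in the buffer before a new buffer is allocated. Defaults to 1024.
  • Pool: the MemoryPool<byte> used when allocating memory. Defaults to null.

StreamPipeWriterOptions allows control over the creation of the PipeWriter instance with the following parameters:

  • LeaveOpen: determines whether the underlying stream is left open after the PipeWriter completes. Defaults to false.
  • MinimumBufferSize: the minimum buffer size to use when renting memory from the Pool. Defaults to 4096.
  • Pool: the MemoryPool<byte> used when allocating memory. Defaults to null.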

Important

When creating PipeReader and PipeWriter instances using the Create methods, you need to consider the lifetime of the Stream object. If you need access to the stream after the reader or writer is done with it, set the LeaveOpen flag to true on the creation options. Otherwise, the stream will be closed.

The following code shows how to create PipeReader and PipeWriter instances from a stream using the Create methods.

The application uses a StreamReader to read the lorem-ipsum.txt file as a stream. The FileStream is passed to PipeReader.Create, which instantiates a PipeReader object. The console application then passes its standard output stream to PipeWriter.Create using Console.OpenStandardOutput(). The example supports cancellation.
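A simplified sketch of the same idea, using in-memory streams instead of lorem-ipsum.txt and the console so that it is self-contained (StreamPipeCopy and CopyAsync are hypothetical names). Both Create calls pass leaveOpen: true, so the streams stay usable after the reader and writer complete:

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class StreamPipeCopy
{
    // Copies 'source' to 'destination' through a PipeReader/PipeWriter pair
    // created over the two streams.
    public static async Task CopyAsync(Stream source, Stream destination, CancellationToken cancellationToken = default)
    {
        PipeReader reader = PipeReader.Create(source, new StreamPipeReaderOptions(leaveOpen: true));
        PipeWriter writer = PipeWriter.Create(destination, new StreamPipeWriterOptions(leaveOpen: true));
        try
        {
            while (true)
            {
                ReadResult result = await reader.ReadAsync(cancellationToken);
                ReadOnlySequence<byte> buffer = result.Buffer;

                // Forward each segment; WriteAsync also flushes it to the destination stream.
                foreach (ReadOnlyMemory<byte> segment in buffer)
                {
                    await writer.WriteAsync(segment, cancellationToken);
                }
                reader.AdvanceTo(buffer.End);

                // Stop at end of stream, or when CancelPendingRead was called.
                if (result.IsCompleted || result.IsCanceled)
                {
                    break;
                }
            }
        }
        finally
        {
            await reader.CompleteAsync();
            await writer.CompleteAsync();
        }
    }
}
```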