pescador.maps.buffer_stream

pescador.maps.buffer_stream(stream, buffer_size, partial=False, axis=None)

Buffer data from an stream into one data object.

This is useful when a stream produces one example at a time, and you want to collect buffer_size iterates into a single object.

Parameters:
streamstream

The stream to buffer

buffer_sizeint > 0

The number of examples to retain per batch.

partialbool, default=False

If True, yield a final partial batch on under-run.

axisint or None

If None (default), concatenate data along a new 0th axis. Otherwise, concatenation is performed along the specified axis.

This is primarily useful when combining data that already has a dimension for buffer index, e.g., when buffering buffers.

Yields:
batch

A batch of size at most buffer_size

Raises:
DataError

If the stream contains items that are not data-like.

Examples

This example shows how to concatenate several iterates into a batch:

>>> def mygen():
...     # Make items with x = number, y = parity of x
...     for i in range(100):
...         yield dict(x=np.asarray(i), y=np.asarray(i % 2))
>>> # Make a streamer and print the first few iterates
>>> S = pescador.Streamer(mygen)
>>> [_ for _ in S.iterate(5)]
[{'x': array(0), 'y': array(0)},
 {'x': array(1), 'y': array(1)},
 {'x': array(2), 'y': array(0)},
 {'x': array(3), 'y': array(1)},
 {'x': array(4), 'y': array(0)}]
>>> # Buffer the streamer
>>> buf = pescador.buffer_stream(S, 5)
>>> next(buf)
{'x': array([0, 1, 2, 3, 4]), 'y': array([0, 1, 0, 1, 0])}

If the iterates already have a batch index dimension, we can use it directly. This can be useful when the streamers already generate partial batches that you want to combine, rather than singletons.

>>> def mygen_idx():
...     # Make items with x = number, y = parity of x
...     for i in range(100):
...         yield dict(x=np.asarray([i]), y=np.asarray([i % 2]))
>>> # Make a streamer and print the first few iterates
>>> S = pescador.Streamer(mygen_idx)
>>> [_ for _ in S.iterate(5)]
[{'x': array([0]), 'y': array([0])},
 {'x': array([1]), 'y': array([1])},
 {'x': array([2]), 'y': array([0])},
 {'x': array([3]), 'y': array([1])},
 {'x': array([4]), 'y': array([0])}]
>>> # This is the wrong way to do it, since it will add another index
>>> # dimension
>>> buf_wrong = pescador.buffer_stream(S, 5)
>>> next(buf_wrong)
{'x': array([[0],
    [1],
    [2],
    [3],
    [4]]), 'y': array([[0],
    [1],
    [0],
    [1],
    [0]])}
>>> # The right way to do it, using the existing buffer index
>>> buf_right = pescador.buffer_stream(S, 5, axis=0)
>>> next(buf_right)
{'x': array([0, 1, 2, 3, 4]), 'y': array([0, 1, 0, 1, 0])}