Buffered Streaming

In a machine learning setting, it is common to train a model on multiple input datapoints simultaneously, in what are commonly referred to as “minibatches”. To achieve this, pescador provides the pescador.maps.buffer_stream map transformer, which “buffers” a stream of single samples into batches of a fixed size.

Following up on the first example, we use the noisy_samples generator.

    import pescador

    # Create an initial streamer
    streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

    minibatch_size = 128
    # Wrap your streamer
    buffered_sample_gen = pescador.maps.buffer_stream(streamer, minibatch_size)

    # Generate batches in exactly the same way as you would from the base streamer
    for batch in buffered_sample_gen:
        ...

A few important points to note about using pescador.maps.buffer_stream:

  • pescador.maps.buffer_stream will stack your arrays along a new first dimension whose length is the batch size (minibatch_size in the above example). E.g., if your samples are shaped (4, 5), a batch size of 10 will produce arrays shaped (10, 4, 5)

  • Each key in a generated batch is stacked in this way, across all of the buffered samples.

  • pescador.maps.buffer_stream, like all pescador.maps transformers, returns a generator, not a Streamer. If you want the result to behave like a Streamer, you must wrap it in one. Following up on the previous example:

    batch_streamer = pescador.Streamer(buffered_sample_gen)

    # Generate batches as a streamer:
    for batch in batch_streamer:
        # batch['X'].shape == (minibatch_size, ...)
        # batch['Y'].shape == (minibatch_size, 1)
        ...


    # Or, another way:
    batch_streamer = pescador.Streamer(pescador.maps.buffer_stream, streamer, minibatch_size)