API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for reusable generators.

Wrapping generators/iterators within an object provides two useful features:

  1. Streamer objects can be serialized (as long as the generator can be)
  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
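
The second feature can be illustrated with a minimal sketch. It assumes a hypothetical stand-in class, ReusableStream, which is not pescador's implementation: it stores the generator function and its arguments, so each call builds a fresh generator.

```python
import itertools


class ReusableStream:
    """Hypothetical stand-in for a reusable generator wrapper (not pescador's code)."""

    def __init__(self, func, *args, **kwargs):
        # Store the generator function and its arguments, not a live generator
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self, max_batches=None):
        # Every call instantiates a fresh generator from the stored recipe
        stream = self.func(*self.args, **self.kwargs)
        # islice with stop=None runs until the generator is exhausted
        return itertools.islice(stream, max_batches)


def count_up(n):
    for i in range(n):
        yield {'X': i}


stream = ReusableStream(count_up, 3)
first_pass = list(stream())
second_pass = list(stream())
truncated = list(stream(max_batches=2))
```

A bare generator would be empty on the second pass. Here both passes produce the same three items because the wrapper keeps the function and arguments rather than the generator itself.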

Examples

Generate batches of random 3-dimensional vectors

>>> import numpy as np
>>> from pescador import Streamer
>>> def my_generator(n):
...     for i in range(n):
...         yield {'X': np.random.randn(1, 3)}
>>> GS = Streamer(my_generator, 5)
>>> for i in GS():
...     print(i)

Or with a maximum number of batches

>>> for i in GS(max_batches=3):
...     print(i)

Or infinitely many examples, restarting the generator as needed

>>> for i in GS.cycle():
...     print(i)

An alternate interface for the same:

>>> for i in GS(cycle=True):
...     print(i)

Attributes

generator (iterable or Streamer) A generator function or iterable collection to draw from. May be another instance or subclass of Streamer.
args (list)
kwargs (dict) If generator is a function, then args and kwargs provide the parameters to the function.

__call__(max_batches=None, cycle=False)

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns True if the stream is active (i.e., a StopIteration has not been raised).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.
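
The restart behavior described above can be sketched as follows. This is an illustration of the idea only, not pescador's code; cycle_stream and finite are hypothetical names introduced for the example.

```python
import itertools


def cycle_stream(make_generator):
    """Restart a finite generator each time it is exhausted (illustrative only)."""
    while True:
        # Leaving the inner loop means the generator raised StopIteration,
        # so the outer loop builds a fresh generator and continues.
        for item in make_generator():
            yield item


def finite():
    for i in range(3):
        yield {'X': i}


# Take 7 items from a 3-item source; the source is restarted twice
items = [d['X'] for d in itertools.islice(cycle_stream(finite), 7)]
```

Because the stream never terminates on its own, the consumer must bound it, for example with itertools.islice as shown. A source that yields nothing would make this sketch loop forever.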

generate(max_batches=None)

Instantiate the generator

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle
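
As a rough illustration of the dict-to-tuple conversion described for tuples, the sketch below selects keys from each iterate in the order given. The helper as_tuples and the sample batches are hypothetical and stand in for a real stream.

```python
def as_tuples(batches, *items):
    """Convert dict iterates into tuples ordered by the given keys (illustrative)."""
    for batch in batches:
        yield tuple(batch[item] for item in items)


# Two hand-written iterates standing in for a stream of batches
batches = [{'X': [1, 2], 'Y': 0, 'meta': 'a'},
           {'X': [3, 4], 'Y': 1, 'meta': 'b'}]
pairs = list(as_tuples(batches, 'X', 'Y'))
```

Keys that are not requested ('meta' here) are left out of the tuples, and the tuple order follows the order of the arguments.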

Buffered Streamers

class pescador.BufferedStreamer(streamer, buffer_size, strict_batch_size=True)

Buffers a stream into batches of examples

Examples

>>> import numpy as np
>>> import pescador
>>> def my_generator(n):
...     # Generates a single 30-dimensional example vector for each iterate
...     for i in range(n):
...         yield dict(X=np.random.randn(1, 30))
>>> # Wrap the generator in a Streamer
>>> S = pescador.Streamer(my_generator, 128)
>>> # A buffered streamer will combine N iterates into a single batch
>>> N = 10
>>> B = pescador.BufferedStreamer(S, N)
>>> for batch in B():
...     # Work on a batch of N=10 examples
...     MY_PROCESS_FUNCTION(batch)

__call__(max_batches=None, cycle=False)

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns True if the stream is active (i.e., a StopIteration has not been raised).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

generate(max_batches=None)

Generate samples from the streamer.

Parameters:

max_batches : int

For the BufferedStreamer, max_batches is the number of buffered batches that are generated, not the number of individual samples.
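
The distinction between samples and buffered batches can be sketched with plain Python lists. This is not pescador's implementation: buffer_stream, merge, and the drop_partial flag are hypothetical, and dropping a trailing partial buffer is an assumption made for the example.

```python
def buffer_stream(stream, buffer_size, drop_partial=True):
    """Group single-example dicts into batches of buffer_size (illustrative)."""
    buffered = []
    for example in stream:
        buffered.append(example)
        if len(buffered) == buffer_size:
            yield merge(buffered)
            buffered = []
    # Assumption: a trailing partial buffer is dropped unless requested
    if buffered and not drop_partial:
        yield merge(buffered)


def merge(examples):
    # Concatenate the per-key rows of every example into one batch
    # (with NumPy arrays, np.concatenate along axis 0 would play this role)
    return {key: [row for ex in examples for row in ex[key]]
            for key in examples[0]}


# Seven single-row examples, buffered three at a time
examples = ({'X': [[i, i + 1]]} for i in range(7))
batches = list(buffer_stream(examples, buffer_size=3))
```

Seven samples yield two batches of three rows each, so a limit of two batches would consume six samples here.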

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle

Multiplexing

class pescador.Mux(seed_pool, k, lam=256.0, pool_weights=None, with_replacement=True, prune_empty_seeds=True, revive=False, random_state=None)

Stochastic multiplexor for Streamers
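
The general idea of stochastic multiplexing can be sketched as below: keep at most k streams active, draw each sample from a randomly chosen active stream, and replace a stream when it is exhausted. This is a simplified illustration, not pescador's algorithm. It ignores lam, pool_weights, with_replacement, and revive, and the names mux and make_seed are hypothetical.

```python
import random


def mux(seed_factories, k, rng):
    """Interleave samples from at most k concurrently active streams (illustrative)."""
    pending = list(seed_factories)
    rng.shuffle(pending)
    # Activate up to k streams; the rest wait until a slot frees up
    active = [factory() for factory in pending[:k]]
    pending = pending[k:]
    while active:
        idx = rng.randrange(len(active))
        try:
            yield next(active[idx])
        except StopIteration:
            # Replace the exhausted stream with a waiting one, if any remain
            if pending:
                active[idx] = pending.pop()()
            else:
                del active[idx]


def make_seed(label, n=2):
    # Returns a generator function that yields n labelled samples
    def gen():
        for i in range(n):
            yield (label, i)
    return gen


rng = random.Random(0)
samples = list(mux([make_seed(c) for c in 'abcde'], k=3, rng=rng))
```

In this sketch every seed is used exactly once, so the output is a random interleaving of all ten samples.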

Examples

>>> # Create a collection of streamers
>>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
>>> # Multiplex them together into a single streamer
>>> # Use at most 3 streams at once
>>> mux = pescador.Mux(seeds, k=3)
>>> for batch in mux():
...     MY_FUNCTION(batch)

__call__(max_batches=None, cycle=False)

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the seed pool

active

Returns True if the stream is active (i.e., a StopIteration has not been raised).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=None)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
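
The producer/consumer hand-off behind this pattern can be sketched with the standard library alone. The sketch is only an analogy: it uses a background thread and queue.Queue as stand-ins for the separate process and the zeromq socket, and the names producer, consume_in_background, and finite are hypothetical.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def producer(make_generator, channel):
    # Runs in the background and pushes each item into the channel
    for item in make_generator():
        channel.put(item)
    channel.put(_DONE)


def consume_in_background(make_generator, maxsize=4):
    """Yield items produced concurrently by a background worker (illustrative)."""
    channel = queue.Queue(maxsize=maxsize)
    worker = threading.Thread(target=producer,
                              args=(make_generator, channel),
                              daemon=True)
    worker.start()
    while True:
        item = channel.get()
        if item is _DONE:
            break
        yield item
    worker.join()


def finite():
    for i in range(5):
        yield {'X': i}


received = [d['X'] for d in consume_in_background(finite)]
```

The bounded queue lets the producer run ahead of the consumer by a few items without accumulating unbounded data, which is the main benefit of decoupling the two sides.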

Examples

>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for batch in Z():
...     MY_FUNCTION(batch)

__call__(max_batches=None, cycle=False)

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns True if the stream is active (i.e., a StopIteration has not been raised).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

generate(max_batches=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:

batch

Data drawn from streamer(max_batches).

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle