API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for recycling iterables and generator functions, i.e. streamers.

Wrapping streamers within an object provides two useful features:

  1. Streamer objects can be serialized (as long as their underlying streamer can be)
  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
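
The second feature can be illustrated without pescador itself. The sketch below is a simplified stand-in, not pescador's implementation: the hypothetical RestartableStream class stores a generator function together with its arguments, so every call to iterate creates a fresh generator, whereas a bare generator object is empty after one pass.

```python
class RestartableStream:
    """Hypothetical stand-in for a Streamer: keeps a recipe, not a live generator."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def iterate(self):
        # Calling the generator function again returns a brand-new generator.
        return self.func(*self.args, **self.kwargs)


def count_to(n):
    for i in range(n):
        yield i


stream = RestartableStream(count_to, 3)
first_pass = list(stream.iterate())
second_pass = list(stream.iterate())  # a second full pass over the same data

gen = count_to(3)
list(gen)             # exhausts the bare generator
leftover = list(gen)  # nothing is left for a second pass
```

Because the wrapper holds only a function and its arguments, it can also be handed to another process and instantiated there, provided the function itself can be serialized.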

Examples

Generate a sequence of integers

>>> from pescador import Streamer
>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4

Or with a maximum number of items

>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2

Or infinitely many examples, restarting the generator as needed

>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Or finitely many examples, restarting the generator as needed

>>> for i in stream.cycle(max_iter=7):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1

An alternate interface for the same:

>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Attributes:
streamer : generator or iterable

Any generator function or iterable Python object.

args : list
kwargs : dict

Parameters provided to streamer, if callable.

__call__(self, max_iter=None, cycle=False)

Convenience interface for interacting with the Streamer.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:
obj : Objects yielded by the generator provided on init.

See also

iterate
cycle

active

Returns True if the stream is active (i.e., there are still open or existing streams).

cycle(self, max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
obj : Objects yielded by the streamer provided on init.
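
The restart behaviour of cycle can be sketched with a plain generator. This is an illustration under stated assumptions, not pescador's code: cycle_stream and its make_iterator argument (a zero-argument factory returning a fresh iterator) are hypothetical names, and the guard against an empty source is a design choice made for this sketch.

```python
import itertools


def cycle_stream(make_iterator, max_iter=None):
    """Restart the iterator from `make_iterator` whenever it runs out.

    With max_iter=None the generator never stops on its own.
    """
    count = 0
    while True:
        produced = False
        for item in make_iterator():
            produced = True
            if max_iter is not None and count >= max_iter:
                return
            yield item
            count += 1
        if not produced:
            return  # an empty source would otherwise loop forever


# Finite cycling: seven items drawn from a five-item source.
seven = list(cycle_stream(lambda: iter(range(5)), max_iter=7))

# Unbounded cycling: the consumer decides when to stop.
endless = cycle_stream(lambda: iter(range(2)))
first_five = list(itertools.islice(endless, 5))
```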

is_activated_copy

True if this object is a copy of the original Streamer and has been activated.

iterate(self, max_iter=None)

Instantiate an iterator.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, exhaust the stream.

Yields:
obj : Objects yielded by the streamer provided on init.

See also

cycle : Force an infinite stream.

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
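
The producer/consumer split described above can be sketched with the standard library alone. This is not how ZMQStreamer is implemented: a background thread and a queue.Queue stand in for the separate process and the ZeroMQ socket, and background_stream and make_iterator are hypothetical names chosen for the illustration.

```python
import queue
import threading

_DONE = object()  # sentinel that marks the end of the stream


def background_stream(make_iterator, maxsize=4):
    """Yield items that a worker thread produces ahead of the consumer."""
    buffer = queue.Queue(maxsize=maxsize)

    def worker():
        # Producer side: runs the generator and pushes each item.
        for item in make_iterator():
            buffer.put(item)
        buffer.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()

    # Consumer side: pull items in arrival order until the sentinel shows up.
    while True:
        item = buffer.get()
        if item is _DONE:
            return
        yield item


squares = list(background_stream(lambda: (i * i for i in range(5))))
```

The bounded queue keeps the producer from running arbitrarily far ahead of the consumer. In this sketch, a consumer that stops early leaves the worker blocked on a full queue until the program exits.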

Examples

>>> import pescador
>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)

__call__(self, max_iter=None, cycle=False)

Convenience interface for interacting with the Streamer.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:
obj : Objects yielded by the generator provided on init.

See also

iterate
cycle

active

Returns True if the stream is active (i.e., there are still open or existing streams).

cycle(self, max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
obj : Objects yielded by the streamer provided on init.

is_activated_copy

True if this object is a copy of the original Streamer and has been activated.

iterate(self, max_iter=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:
data : dict

Data drawn from streamer(max_iter).

Multiplexing

Defines the interface and several varieties of mux. A mux is a Streamer which wraps N other Streamer objects, and at every step yields a sample from one of its sub-streamers.

This module defines the following Mux types:

StochasticMux

A Mux which chooses its active streams stochastically, and chooses samples from the active streams stochastically. StochasticMux is equivalent to the pescador.Mux from versions <2.0.

StochasticMux has a mode parameter which selects how it operates, with the following modes:

with_replacement

Sample streamers with replacement. This allows a single stream to be used multiple times simultaneously.

exhaustive

Each streamer is consumed at most once and never re-activated.

single_active

Each stream in the candidate pool is either active or not. Streams are revived when they are exhausted. In this mode, streams in the active pool are selected uniquely from the candidate pool, whereas with_replacement allows the same stream to be used more than once.

ShuffledMux

A ShuffledMux interleaves samples from all given streamers.

RoundRobinMux

Iterates over all the streamers in strict order.

ChainMux

As in itertools.chain(), runs the first streamer to exhaustion, then the second, then the third, etc. Uses only a single stream at a time.
StochasticMux(streamers, n_active, rate[, …]) Stochastic Mux
ShuffledMux(streamers[, weights, random_state]) A variation on a mux, which takes N streamers, and samples from them equally, guaranteeing all N streamers to be “active”.
RoundRobinMux(streamers[, mode, random_state]) A Mux which iterates over all streamers in strict order.
ChainMux(streamers[, mode, random_state]) As in itertools.chain().
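
The ordering behaviour of three of the Mux types above can be sketched with plain iterators. These helper functions are hypothetical, standard-library stand-ins rather than pescador's classes. They take zero-argument factories instead of Streamer objects, and they simply drop a stream once it is exhausted, which is a simplifying assumption.

```python
import itertools
import random


def chain_mux(factories):
    """Run each stream to exhaustion before the next one is even created."""
    return itertools.chain.from_iterable(make() for make in factories)


def round_robin_mux(factories):
    """Take one sample from each live stream in strict order."""
    active = [make() for make in factories]
    while active:
        still_active = []
        for stream in active:
            try:
                item = next(stream)
            except StopIteration:
                continue  # exhausted streams leave the rotation
            yield item
            still_active.append(stream)
        active = still_active


def shuffled_mux(factories, seed=None):
    """Draw every sample from a randomly chosen live stream."""
    rng = random.Random(seed)
    active = [make() for make in factories]
    while active:
        index = rng.randrange(len(active))
        try:
            item = next(active[index])
        except StopIteration:
            del active[index]
            continue
        yield item


def letters():
    return iter("ab")


def numbers():
    return iter([1, 2, 3])


chained = list(chain_mux([letters, numbers]))
alternating = list(round_robin_mux([letters, numbers]))
shuffled = list(shuffled_mux([letters, numbers], seed=0))
```

Here chained holds all letters followed by all numbers, and alternating switches between the two sources until one runs out. shuffled mixes them in a random order while preserving the order within each source.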

Maps (Transformers)

Map functions perform operations on a stream.

Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.

buffer_stream(stream, buffer_size[, …]) Buffer data from a stream into one data object.
tuples(stream, *keys) Reformat data as tuples.
keras_tuples(stream[, inputs, outputs]) Reformat data objects as keras-compatible tuples.
cache(stream, n_cache[, prob, random_state]) Stochastic stream caching.
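
To make the idea of a map concrete, here is a minimal sketch of two such transformations written as ordinary generator functions. tuples_sketch and buffer_sketch are hypothetical, simplified stand-ins for illustration, not the pescador functions: they work on dicts of plain Python values, and the buffering version drops an incomplete final batch by assumption.

```python
def tuples_sketch(stream, *keys):
    """Turn each dict in `stream` into a tuple ordered by `keys`."""
    for data in stream:
        yield tuple(data[key] for key in keys)


def buffer_sketch(stream, buffer_size):
    """Group `buffer_size` dicts into one dict of lists."""
    batch = []
    for data in stream:
        batch.append(data)
        if len(batch) == buffer_size:
            yield {key: [item[key] for item in batch] for key in batch[0]}
            batch = []


def samples():
    for i in range(4):
        yield {"X": i, "Y": i * 10}


pairs = list(tuples_sketch(samples(), "X", "Y"))
batches = list(buffer_sketch(samples(), 2))

# A map returns a one-shot generator: a second pass yields nothing.
mapped = tuples_sketch(samples(), "X")
first = list(mapped)
again = list(mapped)
```

The empty second pass shows why a map's output needs to be wrapped again when restartable, Streamer-like behaviour is required.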