API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for recycling iterables and generator functions, i.e. streamers.

Wrapping streamers within an object provides two useful features:

  1. Streamer objects can be serialized (as long as the wrapped streamer can be).

  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
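
The second feature can be illustrated without pescador itself. The sketch below is not pescador's implementation: `MiniStreamer` and `count_up` are hypothetical names, and the class only shows the idea of storing a generator function together with its arguments so that a fresh generator can be built for every pass.

```python
import itertools


class MiniStreamer:
    """Hypothetical stand-in for a Streamer: remembers how to rebuild a generator."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def iterate(self, max_iter=None):
        # Each call builds a brand-new generator from the stored recipe.
        fresh = self.func(*self.args, **self.kwargs)
        # islice with stop=None runs until the generator is exhausted.
        return itertools.islice(fresh, max_iter)

    def __iter__(self):
        return self.iterate()


def count_up(n):
    for i in range(n):
        yield i


plain = count_up(3)
first_pass = list(plain)        # consumes the generator
second_pass = list(plain)       # nothing left to yield

wrapped = MiniStreamer(count_up, 3)
wrapped_first = list(wrapped)
wrapped_second = list(wrapped)  # restarted from scratch
```

A bare generator is spent after one pass, while the wrapper can be iterated again because it calls the generator function anew each time.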

Examples

Generate the integers 0 through n - 1

>>> from pescador import Streamer
>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4

Or with a maximum number of examples

>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2

Or infinitely many examples, restarting the generator as needed

>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Or finitely many examples, restarting the generator as needed

>>> for i in stream.cycle(max_iter=7):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1

An alternate interface for the same:

>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Attributes:

streamer : generator or iterable

Any generator function or iterable Python object.

args : list
kwargs : dict

Parameters provided to streamer, if callable.

__call__(max_iter=None, cycle=False)

Allow streamers to act like callables

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:
obj : Objects yielded by the generator provided on init.

See also

iterate
cycle

property active

Return True if the stream is active (i.e., there are still open or existing streams).

cycle(max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
obj : Objects yielded by the streamer provided on init.
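
The restart-on-exhaustion behaviour described for cycle can be sketched in plain Python. This is an illustration under stated assumptions, not pescador's code: `cycle_stream` and `factory` are hypothetical names, and the guard against empty streams is a choice made for this sketch only.

```python
def cycle_stream(factory, max_iter=None):
    """Yield from factory() over and over; stop after max_iter items if given."""
    count = 0
    while True:
        produced = False
        for item in factory():
            produced = True
            if max_iter is not None and count >= max_iter:
                return
            yield item
            count += 1
        if not produced:
            # Sketch-only guard: an empty stream cannot be cycled.
            return


seven = list(cycle_stream(lambda: iter(range(5)), max_iter=7))
```

With a five-item stream and max_iter=7, the stream is restarted once and the second pass is cut short after two items.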

property is_activated_copy

is_activated_copy is True if this object is a copy of the original Streamer and has been activated.

iterate(max_iter=None)

Instantiate an iterator.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, exhaust the stream.

Yields:
obj : Objects yielded by the streamer provided on init.

See also

cycle

force an infinite stream.

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
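
The producer/consumer split behind this pattern can be sketched with the standard library alone. Note the substitution: the sketch uses a background thread and a bounded `queue.Queue` as a stand-in for a separate process and ZeroMQ sockets, so it shows the decoupling idea rather than ZMQStreamer's transport. `background_stream`, `squares`, and `maxsize` are hypothetical names.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def background_stream(factory, maxsize=4):
    """Run factory() in a worker thread and yield its items in the caller."""
    q = queue.Queue(maxsize=maxsize)

    def worker():
        # The producer blocks on put() when the queue is full.
        for item in factory():
            q.put(item)
        q.put(_DONE)

    # Daemon thread: it will not keep the program alive if the consumer stops early.
    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


def squares(n):
    for i in range(n):
        yield i * i


received = list(background_stream(lambda: squares(5)))
```

The bounded queue keeps the producer at most a few items ahead of the consumer, which is the same role a buffer between two processes would play.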

Examples

>>> import pescador
>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)

__call__(max_iter=None, cycle=False)

Allow streamers to act like callables

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle : bool

If True, cycle indefinitely.

Yields:
obj : Objects yielded by the generator provided on init.

See also

iterate
cycle

property active

Return True if the stream is active (i.e., there are still open or existing streams).

cycle(max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
obj : Objects yielded by the streamer provided on init.

property is_activated_copy

is_activated_copy is True if this object is a copy of the original Streamer and has been activated.

iterate(max_iter=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:
data : dict

Data drawn from streamer(max_iter).

Multiplexing

Defines the interface and several varieties of mux. A mux is a Streamer which wraps N other Streamer objects, and at every step yields a sample from one of its sub-streamers.

This module defines the following Mux types:

StochasticMux

A Mux which chooses its active streams stochastically, and chooses samples from the active streams stochastically. StochasticMux is equivalent to the pescador.Mux from versions <2.0.

StochasticMux has a mode parameter which selects how it operates, with the following modes:

with_replacement

Sample streamers with replacement. This allows a single stream to be used multiple times simultaneously.

exhaustive

Each streamer is consumed at most once and never re-activated.

single_active

Each stream in the candidate pool is either active or not. Streams are revived when they are exhausted. With this setting, streams in the active pool are uniquely selected from the candidate pool, whereas with_replacement allows the same stream to be used more than once.

ShuffledMux

A ShuffledMux interleaves samples from all given streamers.

RoundRobinMux

Iterates over all the streamers in strict order.

ChainMux

As in itertools.chain(), runs the first streamer to exhaustion, then the second, then the third, etc. Uses only a single stream at a time.
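
The two deterministic orderings above can be sketched in a few lines of plain Python. These are illustrations only, not the pescador classes: `chain_mux`, `round_robin_mux`, and `factories` are hypothetical names, and dropping exhausted streams in the round-robin case is an assumption of this sketch.

```python
def chain_mux(factories):
    """Run each stream to exhaustion before starting the next."""
    for factory in factories:
        yield from factory()


def round_robin_mux(factories):
    """Take one sample from each stream in turn, dropping exhausted streams."""
    active = [iter(factory()) for factory in factories]
    while active:
        still_active = []
        for it in active:
            try:
                yield next(it)
            except StopIteration:
                continue  # this stream is finished; leave it out of the next round
            still_active.append(it)
        active = still_active


factories = [lambda: iter("ab"), lambda: iter("xyz")]
chained = list(chain_mux(factories))
interleaved = list(round_robin_mux(factories))
```

Both functions see the same samples; only the order in which the streams are visited differs.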

StochasticMux(streamers, n_active, rate[, ...])

Stochastic Mux

ShuffledMux(streamers[, weights, random_state])

A variation on a mux, which takes N streamers, and samples from them equally, guaranteeing all N streamers to be "active".

RoundRobinMux(streamers[, mode, random_state])

A Mux which iterates over all streamers in strict order.

ChainMux(streamers[, mode, random_state])

As in itertools.chain().
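
For the stochastic case, the sketch below imitates only the exhaustive mode described earlier: a limited number of streams are active at once, samples are drawn from a randomly chosen active stream, and an exhausted stream is replaced from the pool and never reused. It is a simplified illustration, not StochasticMux: the `rate` parameter is ignored, and `stochastic_mux_exhaustive`, `make`, and `seed` are hypothetical names.

```python
import random


def stochastic_mux_exhaustive(factories, n_active, seed=0):
    """Sample from up to n_active streams at a time; each stream is used once."""
    if n_active < 1:
        raise ValueError("n_active must be at least 1")
    rng = random.Random(seed)
    pool = list(factories)
    rng.shuffle(pool)
    active = []
    while pool or active:
        # Top up the active set from the remaining pool.
        while pool and len(active) < n_active:
            active.append(iter(pool.pop()()))
        idx = rng.randrange(len(active))
        try:
            yield next(active[idx])
        except StopIteration:
            # Exhausted streams are discarded, never re-activated.
            del active[idx]


def make(values):
    # Helper so that each factory captures its own list.
    return lambda: iter(values)


sources = [make([1, 2, 3]), make([10, 20]), make([100])]
samples = list(stochastic_mux_exhaustive(sources, n_active=2))
```

The order of `samples` depends on the seed, but every item from every stream appears exactly once.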

Maps (Transformers)

Map functions perform operations on a stream.

Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.
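
The difference matters because a generator can be consumed only once. The plain-Python sketch below uses a hypothetical map function `double` (not part of pescador) to show the single-use behaviour, and a small factory function to show what re-wrapping buys you. Given the Streamer signature documented above, the pescador equivalent would presumably be to pass the map function and its arguments to Streamer, but that call is not shown here.

```python
def double(stream):
    """Hypothetical map function: yields each item of `stream` doubled."""
    for item in stream:
        yield item * 2


def numbers():
    yield from (1, 2, 3)


mapped = double(numbers())  # a plain generator: single use
once = list(mapped)
again = list(mapped)        # already exhausted


def restartable():
    # Building the pipeline inside a function makes it repeatable.
    return double(numbers())


first = list(restartable())
second = list(restartable())
```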

buffer_stream(stream, buffer_size[, ...])

Buffer data from a stream into one data object.

tuples(stream, *keys)

Reformat data as tuples.

keras_tuples(stream[, inputs, outputs])

Reformat data objects as keras-compatible tuples.

cache(stream, n_cache[, prob, random_state])

Stochastic stream caching.
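
To make the first two map functions concrete, here is a rough stdlib-only sketch of buffering and tuple reformatting over a stream of dict samples. It is not the pescador code: `buffer_lists` and `as_tuples` are hypothetical names, values are collected into plain lists rather than arrays, and dropping a trailing partial buffer is an assumption of this sketch.

```python
def buffer_lists(stream, buffer_size):
    """Group dict samples into dicts of lists with `buffer_size` entries each."""
    batch = []
    for sample in stream:
        batch.append(sample)
        if len(batch) == buffer_size:
            yield {key: [s[key] for s in batch] for key in batch[0]}
            batch = []
    # Sketch-only choice: a trailing partial batch is dropped.


def as_tuples(stream, *keys):
    """Turn each dict sample into a tuple ordered by `keys`."""
    for sample in stream:
        yield tuple(sample[key] for key in keys)


samples = [{"X": i, "Y": i % 2} for i in range(5)]
batches = list(buffer_lists(iter(samples), 2))
pairs = list(as_tuples(iter(samples[:2]), "X", "Y"))
```

Five samples with a buffer size of two give two full batches; the fifth sample is left over and discarded in this sketch.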