API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for recycling iterables and generator functions, i.e. streamers.

Wrapping streamers within an object provides two useful features:

  1. Streamer objects can be serialized (as long as their underlying streamer can be)
  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
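The second feature can be illustrated without pescador. The sketch below contrasts a bare generator, which is exhausted after one pass, with a small wrapper that stores the generator function and its arguments. `TinyStreamer` and `count_to` are hypothetical names used only for illustration; this is not pescador's implementation.

```python
class TinyStreamer:
    """Hypothetical stand-in for illustration; not pescador's implementation."""

    def __init__(self, streamer, *args, **kwargs):
        self.streamer = streamer
        self.args = args
        self.kwargs = kwargs

    def __iter__(self):
        # Calling the generator function again produces a fresh generator,
        # so every pass over the wrapper starts from the beginning.
        if callable(self.streamer):
            return iter(self.streamer(*self.args, **self.kwargs))
        return iter(self.streamer)


def count_to(n):
    for i in range(n):
        yield i


# A bare generator can only be consumed once.
raw = count_to(3)
first_raw = list(raw)
second_raw = list(raw)

# The wrapper re-instantiates the generator on every pass.
wrapped = TinyStreamer(count_to, 3)
first_wrapped = list(wrapped)
second_wrapped = list(wrapped)
```

Because the wrapper holds only a function and its arguments, rather than a live generator, it is also the kind of object that can be serialized when the function itself can be.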

Examples

Wrap a generator function that yields the integers 0 through n - 1

>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4

Or with a maximum number of items

>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2

Or infinitely many examples, restarting the generator as needed

>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

An alternate interface for the same:

>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Attributes

streamer (generator or iterable) Any generator function or iterable Python object.
args (list), kwargs (dict) Parameters provided to streamer, if callable.

Initializer

Parameters:

streamer : iterable or generator function

Any generator function or object that is iterable when instantiated.

args, kwargs

Additional positional arguments or keyword arguments passed to streamer() if it is callable.

Raises:

PescadorError

If streamer is neither a generator function nor an iterable object.

__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)

Convenience interface for interacting with the Streamer.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

max_batches : None or int > 0

Warning

This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.

Yields:

obj : Objects yielded by the generator provided on init.

See also

iterate, cycle

activate()

Activates the stream.

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Yields:

obj : Objects yielded by the streamer provided on init.

iterate(max_iter=None)

Instantiate an iterator.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, exhaust the stream.

Yields:

obj : Objects yielded by the streamer provided on init.
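The difference between a bounded pass and an infinite stream can be sketched with the standard library alone. In the sketch below, `iterate` and `cycle` are hypothetical free functions that mimic the documented behavior of the methods with the same names; they are an assumption-laden illustration, not pescador's code.

```python
import itertools


def count_to(n):
    for i in range(n):
        yield i


def make_stream():
    # Factory that builds a fresh generator on every call.
    return count_to(5)


def iterate(make_iter, max_iter=None):
    """Yield at most max_iter items from a fresh iterator (all items if None)."""
    return itertools.islice(make_iter(), max_iter)


def cycle(make_iter):
    """Restart the iterator whenever it is exhausted, yielding forever."""
    while True:
        yield from make_iter()


limited = list(iterate(make_stream, max_iter=3))
everything = list(iterate(make_stream))
looped = list(itertools.islice(cycle(make_stream), 8))
```

Note that this `cycle` would spin forever without yielding if the underlying stream were empty, so an infinite stream only makes sense over a source that produces at least one item.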

See also

cycle : Force an infinite stream.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

cycle, tuples, keras_tuples
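The dict-to-tuple conversion described for tuples can be sketched as follows. `as_tuples` is a hypothetical helper, and the keys "X", "Y", and "id" are made up for the example; the sketch only mirrors the documented mapping from a dict iterate to (batch[item1], ..., batch[itemk]).

```python
def as_tuples(stream, *items):
    """Yield one tuple per dict, keeping only the requested keys, in order."""
    for batch in stream:
        yield tuple(batch[item] for item in items)


# Any iterable of dicts works as the source in this sketch.
data = [
    {"X": [1, 2], "Y": 0, "id": "a"},
    {"X": [3, 4], "Y": 1, "id": "b"},
]

pairs = list(as_tuples(data, "X", "Y"))
```

Keys that are not requested (here "id") are simply left out of the tuple.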

Multiplexing

class pescador.Mux(streamers, k, rate=256.0, weights=None, with_replacement=True, prune_empty_streams=True, revive=False, random_state=None, seed_pool=<DEPRECATED parameter>, lam=<DEPRECATED parameter>, pool_weights=<DEPRECATED parameter>, prune_empty_seeds=<DEPRECATED parameter>)

Stochastic multiplexor for Streamers

Examples

>>> # Create a collection of streamers
>>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
>>> # Multiplex them together into a single streamer
>>> # Use at most 3 streams at once
>>> mux = pescador.Mux(seeds, k=3)
>>> for batch in mux():
...     MY_FUNCTION(batch)

Mux([stream, range(8), stream2])

Given an array (pool) of streamer types, do the following:

  1. Select k streams at random to iterate from
  2. Assign each activated stream a sample count ~ Poisson(rate)
  3. Yield samples from the streams by randomly multiplexing from the active set.
  4. When a stream is exhausted, select a new one from streamers.
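The four steps above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not pescador's implementation: `toy_mux`, `poisson`, and `letters` are hypothetical names, streams are always sampled with replacement and uniform weights, and the `1 +` added to each Poisson draw is a choice of this sketch so that no stream receives an empty budget.

```python
import math
import random


def poisson(rng, rate):
    """Draw a Poisson(rate) sample with Knuth's method (fine for small rates)."""
    threshold = math.exp(-rate)
    k, p = 0, 1.0
    while True:
        p *= rng.random()
        if p <= threshold:
            return k
        k += 1


def toy_mux(make_streams, k, rate, rng, max_iter):
    """Hypothetical sketch of the four steps; not pescador's implementation."""

    def activate():
        # Steps 1-2: pick a stream at random and give it a sample budget.
        make = rng.choice(make_streams)
        return [iter(make()), 1 + poisson(rng, rate)]

    active = [activate() for _ in range(k)]
    produced = 0
    while produced < max_iter:
        # Step 3: draw the next sample from a randomly chosen active stream.
        slot = rng.randrange(k)
        stream, budget = active[slot]
        try:
            item = next(stream)
        except StopIteration:
            budget = 0  # the stream ran dry before its budget did
        else:
            yield item
            produced += 1
            budget -= 1
        if budget <= 0:
            # Step 4: replace the exhausted stream with a new selection.
            active[slot] = activate()
        else:
            active[slot][1] = budget


def letters(tag):
    def make():
        for i in range(4):
            yield f"{tag}{i}"
    return make


rng = random.Random(0)
pool = [letters("a"), letters("b"), letters("c")]
samples = list(toy_mux(pool, k=2, rate=2.0, rng=rng, max_iter=10))
```

The sketch assumes every stream in the pool yields at least one item; with only empty streams the loop would never terminate, which is the situation that pruning of empty streams is meant to avoid.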

Parameters:

streamers : iterable of streamers

The collection of streamer-type objects

k : int > 0

The number of streams to keep active at any time.

rate : float > 0 or None

Rate parameter for the Poisson distribution governing sample counts for individual streams. If None, sample infinitely from each stream.

weights : np.ndarray or None

Optional weighting for streamers. If None, then weights are assumed to be uniform. Otherwise, weights[i] defines the sampling proportion of streamers[i].

Must have the same length as streamers.

with_replacement : bool

Sample streamers with replacement. This allows a single stream to be used multiple times (even simultaneously). If False, then each streamer is consumed at most once and never revisited.

prune_empty_streams : bool

Disable streamers that produce no data. If True, streamers that previously produced no data are never revisited.

Note:

  1. This may be undesirable for streams where past emptiness may not imply future emptiness.
  2. Failure to prune truly empty streams with revive=True can result in infinite looping behavior. Disable with caution.

revive: bool

If with_replacement is False, setting revive=True will re-insert previously exhausted streams into the candidate set.

This configuration allows a stream to be active at most once at any time.

random_state : None, int, or np.random.RandomState

If int, random_state is the seed used by the random number generator;

If RandomState instance, random_state is the random number generator;

If None, the random number generator is the RandomState instance used by np.random.

seed_pool : iterable of streamers

Warning

This parameter name was deprecated in pescador 1.1. Use the streamers parameter instead. The seed_pool parameter will be removed in pescador 2.0.

lam : float > 0.0

Warning

This parameter name was deprecated in pescador 1.1. Use the rate parameter instead. The lam parameter will be removed in pescador 2.0.

pool_weights : np.ndarray or None

Warning

This parameter name was deprecated in pescador 1.1. Use the weights parameter instead. The pool_weights parameter will be removed in pescador 2.0.

prune_empty_seeds : bool

Warning

This parameter name was deprecated in pescador 1.1. Use the prune_empty_streams parameter instead. The prune_empty_seeds parameter will be removed in pescador 2.0.

__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)

Convenience interface for interacting with the Streamer.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

max_batches : None or int > 0

Warning

This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.

Yields:

obj : Objects yielded by the generator provided on init.

See also

iterate, cycle

activate()

Activates a number of streams

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Yields:

obj : Objects yielded by the streamer provided on init.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

cycle, tuples, keras_tuples

Maps (Transformers)

Map functions perform operations on a stream.

Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.
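The practical consequence of this note is that the result of a map function can only be consumed once. The sketch below uses a hypothetical map function `double` and a hypothetical wrapper `Restartable` standing in for a Streamer; neither is part of pescador.

```python
def double(stream):
    """A map-style function: consumes a stream and returns a plain generator."""
    for x in stream:
        yield 2 * x


source = [1, 2, 3]

# The generator returned by the map function is single-use.
mapped = double(source)
first_pass = list(mapped)
second_pass = list(mapped)


class Restartable:
    """Hypothetical stand-in for a Streamer: stores the callable and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def __iter__(self):
        # Re-run the map function to get a fresh generator on each pass.
        return self.func(*self.args)


wrapped = Restartable(double, source)
again_1 = list(wrapped)
again_2 = list(wrapped)
```

In pescador terms, this presumably corresponds to passing the map function and its arguments to Streamer, since Streamer accepts a generator function plus arguments; treat that as an assumption and check it against the installed version.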

buffer_stream(stream, buffer_size[, ...]) Buffer “data” from a stream into one data object.
tuples(stream, *keys) Reformat data as tuples.
keras_tuples(stream[, inputs, outputs]) Reformat data objects as keras-compatible tuples.
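As a rough picture of what buffering means here, the sketch below groups every buffer_size dicts from a stream into a single dict of lists. `toy_buffer` is a hypothetical function; how pescador actually combines the buffered items and how it treats a final partial buffer are not specified in this section, so both choices below are assumptions of the sketch.

```python
def toy_buffer(stream, buffer_size):
    """Group every buffer_size dicts into one dict of lists (hypothetical sketch)."""
    pending = []
    for datum in stream:
        pending.append(datum)
        if len(pending) == buffer_size:
            # Collect the values for each key across the buffered items.
            yield {key: [d[key] for d in pending] for key in pending[0]}
            pending = []
    # Leftover items that do not fill a whole buffer are dropped in this sketch.


data = [{"X": i, "Y": i % 2} for i in range(5)]
buffered = list(toy_buffer(data, 2))
```

Like the other map functions, `toy_buffer` returns a plain generator, so it is single-use unless it is wrapped again.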

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
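The producer/consumer split behind this pattern can be sketched with the standard library. In the sketch below a background thread and an in-process queue stand in for the separate process and the zeromq socket, so it shows only the decoupling of production from consumption, not true multi-process streaming. All names (`producer`, `consume`, `count_to`, `_DONE`) are hypothetical.

```python
import queue
import threading


def count_to(n):
    for i in range(n):
        yield {"value": i}


def make_stream():
    return count_to(5)


_DONE = object()  # sentinel marking the end of the stream


def producer(make_iter, channel):
    # Runs in the background and pushes every item into the channel.
    for item in make_iter():
        channel.put(item)
    channel.put(_DONE)


def consume(make_iter, maxsize=2):
    """Yield items produced by a background worker (thread-based stand-in)."""
    channel = queue.Queue(maxsize=maxsize)
    worker = threading.Thread(
        target=producer, args=(make_iter, channel), daemon=True
    )
    worker.start()
    while True:
        item = channel.get()
        if item is _DONE:
            break
        yield item
    worker.join()


received = [d["value"] for d in consume(make_stream)]
```

The bounded queue keeps the producer from running arbitrarily far ahead of the consumer, which is one reason to place a channel between the two sides.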

Examples

>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)

Parameters:

streamer : pescador.Streamer

The streamer object

min_port : int > 0

max_port : int > min_port

The range of TCP ports to use

max_tries : int > 0

The maximum number of connection attempts to make

copy : bool

Set True to enable data copying

timeout : [optional] number > 0

Maximum time (in seconds) to wait before killing subprocesses. If None, then the streamer will wait indefinitely for subprocesses to terminate.

__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)

Convenience interface for interacting with the Streamer.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

max_batches : None or int > 0

Warning

This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.

Yields:

obj : Objects yielded by the generator provided on init.

See also

iterate, cycle

activate()

Activates the stream.

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Yields:

obj : Objects yielded by the streamer provided on init.

iterate(max_iter=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:

data : dict

Data drawn from streamer(max_iter).

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

cycle, tuples, keras_tuples