API Reference
Streamer
class pescador.Streamer(streamer, *args, **kwargs)
A wrapper class for recycling iterables and generator functions, i.e., streamers.
Wrapping streamers within an object provides two useful features:
- Streamer objects can be serialized (as long as their streamer can be).
- Streamer objects can instantiate a generator multiple times.
The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
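The following plain-Python sketch (no pescador involved) illustrates why a wrapper is useful: a generator object is single-use and cannot be pickled, whereas a generator function plus its arguments can rebuild a fresh generator whenever one is needed. The function name count_up is hypothetical.

```python
import pickle


def count_up(n):
    """Example generator function: yields 0, 1, ..., n - 1."""
    yield from range(n)


gen = count_up(3)
first_pass = list(gen)   # [0, 1, 2]
second_pass = list(gen)  # []: a generator object cannot be rewound

# Generator objects cannot be pickled either, which rules out
# shipping them to another process as-is.
try:
    pickle.dumps(count_up(3))
    generator_pickles = True
except TypeError:
    generator_pickles = False

# Keeping the *function* and its arguments avoids both problems:
# a brand-new generator can be created on demand.
func, args, kwargs = count_up, (3,), {}
fresh_pass = list(func(*args, **kwargs))  # [0, 1, 2]
```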
Examples
Generate a sequence of integers
>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4
Or with a maximum number of items
>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2
Or infinitely many examples, restarting the generator as needed
>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...
An alternate interface for the same:
>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...
Attributes
streamer : generator or iterable
Any generator function or iterable Python object.
args : list
kwargs : dict
Parameters provided to streamer, if callable.
Initializer
Parameters: streamer : iterable or generator function
Any generator function or object that is iterable when instantiated.
args, kwargs
Additional positional or keyword arguments passed to streamer() if it is callable.
Raises: PescadorError
If streamer is not a generator or an Iterable object.
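To make the constructor contract and the restart behavior concrete, here is a toy wrapper written from this description alone. ToyStreamer, StreamerError and count_up are hypothetical names, not pescador's implementation; in particular, letting max_iter cap a cycled stream is a choice made by this sketch.

```python
import inspect
import itertools
from collections.abc import Iterable


class StreamerError(TypeError):
    """Hypothetical stand-in for pescador.PescadorError."""


class ToyStreamer:
    """Toy restartable wrapper around a generator function or iterable."""

    def __init__(self, streamer, *args, **kwargs):
        # Mirror the documented contract: generator function or iterable only.
        if not (inspect.isgeneratorfunction(streamer)
                or isinstance(streamer, Iterable)):
            raise StreamerError(f"{streamer!r} is not a generator or iterable")
        self.streamer, self.args, self.kwargs = streamer, args, kwargs

    def _activate(self):
        # Build a fresh iterator on every call, so each pass starts over.
        # (A bare generator object passed in would still be single-use.)
        if inspect.isgeneratorfunction(self.streamer):
            return iter(self.streamer(*self.args, **self.kwargs))
        return iter(self.streamer)

    def iterate(self, max_iter=None):
        # islice(..., None) exhausts the stream; an int caps the item count.
        yield from itertools.islice(self._activate(), max_iter)

    def cycle(self):
        # Restart whenever the stream ends; an empty stream would spin forever.
        while True:
            yield from self.iterate()

    def __call__(self, max_iter=None, cycle=False):
        source = self.cycle() if cycle else self._activate()
        return itertools.islice(source, max_iter)

    def __iter__(self):
        return self.iterate()


def count_up(n):
    """Example generator function: yields 0, 1, ..., n - 1."""
    yield from range(n)
```

Iterating a ToyStreamer(count_up, 5) twice yields 0 through 4 both times, because each pass calls count_up again rather than reusing an exhausted generator.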
__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)
Convenience interface for interacting with the Streamer.
Parameters: max_iter : None or int > 0
Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
max_batches : None or int > 0
Warning
This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.
Yields: obj : Objects yielded by the generator provided on init.
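For callers, migrating away from the deprecated name is a keyword rename: stream(max_batches=3) becomes stream(max_iter=3). The sketch below shows one common way a library can keep an old keyword working while warning about it. resolve_max_iter is a hypothetical helper, and its rule that max_iter wins when both names are given is an assumption of this sketch, not documented pescador behavior.

```python
import warnings


def resolve_max_iter(max_iter=None, max_batches=None):
    """Hypothetical helper mapping a deprecated keyword onto its new name."""
    if max_batches is not None:
        # Tell the caller (stacklevel=2 points at their line) to migrate.
        warnings.warn(
            "max_batches is deprecated; use max_iter instead",
            DeprecationWarning,
            stacklevel=2,
        )
        if max_iter is None:
            # Only fall back to the old name when the new one is absent.
            max_iter = max_batches
    return max_iter
```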
activate()
Activates the stream.
active
Returns True if the stream is active (i.e., a StopIteration has not been thrown).
cycle()
Iterate from the streamer infinitely.
This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.
Yields: obj : Objects yielded by the streamer provided on init.
iterate(max_iter=None)
Instantiate an iterator.
Parameters: max_iter : None or int > 0
Maximum number of iterations to yield. If None, exhaust the stream.
Yields: obj : Objects yielded by the streamer provided on init.
See also
cycle - force an infinite stream.
tuples(*items, **kwargs)
Generate data in tuple form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters: *items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.
Yields: batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
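The key-to-tuple conversion described above amounts to the following sketch. to_tuples is a hypothetical stand-in rather than pescador's function, and the example batches are made up for illustration.

```python
def to_tuples(stream, *keys):
    """Yield (item[k1], ..., item[kn]) for each dict produced by stream."""
    if not keys:
        raise ValueError("at least one key is required")
    for item in stream:
        # Keep the caller's key order, e.g. (inputs, targets) for Keras.
        yield tuple(item[key] for key in keys)


batches = [{"X": [1, 2], "Y": 0}, {"X": [3, 4], "Y": 1}]
pairs = list(to_tuples(batches, "X", "Y"))  # [([1, 2], 0), ([3, 4], 1)]
```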
Multiplexing
class pescador.Mux(streamers, k, rate=256.0, weights=None, with_replacement=True, prune_empty_streams=True, revive=False, random_state=None, seed_pool=<DEPRECATED parameter>, lam=<DEPRECATED parameter>, pool_weights=<DEPRECATED parameter>, prune_empty_seeds=<DEPRECATED parameter>)
Stochastic multiplexor for Streamers.
Examples
>>> # Create a collection of streamers
>>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
>>> # Multiplex them together into a single streamer
>>> # Use at most 3 streams at once
>>> mux = pescador.Mux(seeds, k=3)
>>> for batch in mux():
...     MY_FUNCTION(batch)
Mux([stream, range(8), stream2])
Given an array (pool) of streamer types, do the following:
- Select k streams at random to iterate from.
- Assign each activated stream a sample count ~ Poisson(rate).
- Yield samples from the streams by randomly multiplexing from the active set.
- When a stream is exhausted, select a new one from streamers.
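The four steps above can be sketched in plain Python as follows. toy_mux and poisson are hypothetical names and this is not pescador's implementation: the sketch always samples streams with replacement, picks among active streams uniformly, and assumes no stream is empty (an all-empty pool would loop forever, which is the hazard prune_empty_streams guards against).

```python
import math
import random


def poisson(rng, lam):
    """Draw from Poisson(lam) with Knuth's multiplication method."""
    threshold, count, product = math.exp(-lam), 0, 1.0
    while True:
        product *= rng.random()
        if product <= threshold:
            return count
        count += 1


def toy_mux(factories, k, rate=16.0, weights=None, seed=None, max_iter=None):
    """Hypothetical multiplexer; factories are callables returning iterables."""
    rng = random.Random(seed)
    indices = list(range(len(factories)))

    def new_stream():
        # Step 1: pick a stream (weighted, with replacement).
        idx = rng.choices(indices, weights=weights)[0]
        # Step 2: give it a sample budget; rate=None means unlimited.
        budget = math.inf if rate is None else poisson(rng, rate)
        return [iter(factories[idx]()), budget]

    active = [new_stream() for _ in range(k)]
    produced = 0
    while max_iter is None or produced < max_iter:
        slot = rng.randrange(k)  # Step 3: choose an active stream
        iterator, budget = active[slot]
        if budget <= 0:
            active[slot] = new_stream()  # budget spent: replace it
            continue
        try:
            item = next(iterator)
        except StopIteration:
            active[slot] = new_stream()  # Step 4: exhausted, replace it
            continue
        active[slot][1] -= 1
        produced += 1
        yield item
```

Passing rate=None gives every activated stream an unlimited budget, in line with the documented meaning of rate=None, so a stream is only replaced once it is exhausted.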
Parameters: streamers : iterable of streamers
The collection of streamer-type objects.
k : int > 0
The number of streams to keep active at any time.
rate : float > 0 or None
Rate parameter for the Poisson distribution governing sample counts for individual streams. If None, sample infinitely from each stream.
weights : np.ndarray or None
Optional weighting for streamers. If None, then weights are assumed to be uniform. Otherwise, weights[i] defines the sampling proportion of streamers[i].
Must have the same length as streamers.
with_replacement : bool
Sample streamers with replacement. This allows a single stream to be used multiple times (even simultaneously). If False, then each streamer is consumed at most once and never revisited.
prune_empty_streams : bool
Disable streamers that produce no data. If True, streamers that previously produced no data are never revisited. Note:
1. This may be undesirable for streams where past emptiness may not imply future emptiness.
2. Failure to prune truly empty streams with revive=True can result in infinite looping behavior. Disable with caution.
revive : bool
If with_replacement is False, setting revive=True will re-insert previously exhausted streams into the candidate set.
This configuration allows a stream to be active at most once at any time.
random_state : None, int, or np.random.RandomState
If int, random_state is the seed used by the random number generator;
If RandomState instance, random_state is the random number generator;
If None, the random number generator is the RandomState instance used by np.random.
seed_pool : iterable of streamers
Warning
This parameter name was deprecated in pescador 1.1. Use the streamers parameter instead. The seed_pool parameter will be removed in pescador 2.0.
lam : float > 0.0
Warning
This parameter name was deprecated in pescador 1.1. Use the rate parameter instead. The lam parameter will be removed in pescador 2.0.
pool_weights : np.ndarray or None
Warning
This parameter name was deprecated in pescador 1.1. Use the weights parameter instead. The pool_weights parameter will be removed in pescador 2.0.
prune_empty_seeds : bool
Warning
This parameter name was deprecated in pescador 1.1. Use the prune_empty_streams parameter instead. The prune_empty_seeds parameter will be removed in pescador 2.0.
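How with_replacement and revive interact is easiest to see by tracking only which streams get activated, ignoring the data they produce. In this hypothetical sketch (activation_order is not a pescador function), each "event" pretends that one randomly chosen active stream has run dry and must be replaced.

```python
import random


def activation_order(n_streams, k, n_events, with_replacement=True,
                     revive=False, seed=0):
    """Hypothetical sketch: the stream indices activated, in order."""
    rng = random.Random(seed)
    candidates = list(range(n_streams))  # streams eligible for activation
    active, order = [], []

    def activate_one():
        if not candidates:
            return  # nothing left to draw from
        idx = rng.choice(candidates)
        if not with_replacement:
            candidates.remove(idx)  # cannot be drawn while active or consumed
        active.append(idx)
        order.append(idx)

    for _ in range(k):
        activate_one()
    for _ in range(n_events):
        if not active:
            break  # every stream consumed and none revived
        finished = active.pop(rng.randrange(len(active)))
        if not with_replacement and revive:
            candidates.append(finished)  # eligible again once it is inactive
        activate_one()
    return order
```

With with_replacement=False and revive=False, each of five streams is activated exactly once and the pool eventually runs dry; with revive=True, exhausted streams rejoin the candidates, yet a stream that is currently active is never drawn a second time.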
__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)
Convenience interface for interacting with the Streamer.
Parameters: max_iter : None or int > 0
Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
max_batches : None or int > 0
Warning
This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.
Yields: obj : Objects yielded by the generator provided on init.
See also
iterate, cycle
activate()
Activates a number of streams.
active
Returns True if the stream is active (i.e., a StopIteration has not been thrown).
cycle()
Iterate from the streamer infinitely.
This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.
Yields: obj : Objects yielded by the streamer provided on init.
tuples(*items, **kwargs)
Generate data in tuple form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters: *items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.
Yields: batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
Maps (Transformers)
Map functions perform operations on a stream.
Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.
buffer_stream(stream, buffer_size[, ...]) | Buffer “data” from a stream into one data object.
tuples(stream, *keys) | Reformat data as tuples.
keras_tuples(stream[, inputs, outputs]) | Reformat data objects as Keras-compatible tuples.
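As a sketch of what a buffering map does, the hypothetical buffer_dicts below groups consecutive dicts into one dict of lists. It uses plain lists where pescador works with array data, and its keep_remainder flag is an invented name, not part of buffer_stream's documented signature.

```python
def _merge(buffer):
    """Combine a list of dicts with the same keys into one dict of lists."""
    return {key: [entry[key] for entry in buffer] for key in buffer[0]}


def buffer_dicts(stream, buffer_size, keep_remainder=False):
    """Hypothetical map: yield one merged dict per buffer_size input dicts."""
    buffer = []
    for item in stream:
        buffer.append(item)
        if len(buffer) == buffer_size:
            yield _merge(buffer)
            buffer = []
    if keep_remainder and buffer:
        # Emit a final, smaller batch instead of silently dropping it.
        yield _merge(buffer)
```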
Parallel streaming
class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)
Parallel data streaming over zeromq sockets.
This allows a data generator to run in a separate process from the consumer.
A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
Examples
>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)
Parameters: streamer : pescador.Streamer
The streamer object
min_port : int > 0
max_port : int > min_port
The range of TCP ports to use
max_tries : int > 0
The maximum number of connection attempts to make
copy : bool
Set True to enable data copying
timeout : [optional] number > 0
Maximum time (in seconds) to wait before killing subprocesses. If None, then the streamer will wait indefinitely for subprocesses to terminate.
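The producer/consumer split that ZMQStreamer provides can be approximated with the standard library. This sketch deliberately swaps in a worker thread and a bounded queue.Queue for the separate process and zeromq socket, and it omits the port, retry and timeout handling described above; background_stream is a hypothetical name.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def background_stream(make_iterable, maxsize=8):
    """Produce items in a worker thread; consume them in the caller."""
    buffer = queue.Queue(maxsize=maxsize)  # bounded: producer can't run far ahead

    def produce():
        try:
            for item in make_iterable():
                buffer.put(item)
        finally:
            buffer.put(_DONE)  # always signal completion, even on error

    worker = threading.Thread(target=produce, daemon=True)
    worker.start()
    while True:
        item = buffer.get()
        if item is _DONE:
            break
        yield item
    worker.join()
```

If the consumer stops early, the daemon worker may stay blocked on a full queue; a real implementation needs an explicit shutdown path, which is what the timeout parameter addresses for ZMQStreamer's subprocesses.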
__call__(max_iter=None, cycle=False, max_batches=<DEPRECATED parameter>)
Convenience interface for interacting with the Streamer.
Parameters: max_iter : None or int > 0
Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
max_batches : None or int > 0
Warning
This parameter name was deprecated in pescador 1.1. Use the max_iter parameter instead. The max_batches parameter will be removed in pescador 2.0.
Yields: obj : Objects yielded by the generator provided on init.
See also
iterate, cycle
activate()
Activates the stream.
active
Returns True if the stream is active (i.e., a StopIteration has not been thrown).
cycle()
Iterate from the streamer infinitely.
This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.
Yields: obj : Objects yielded by the streamer provided on init.
iterate(max_iter=None)
Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.
Yields: data : dict
Data drawn from streamer(max_iter).
tuples(*items, **kwargs)
Generate data in tuple form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters: *items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the iterate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call iterate again, or use cycle to force an infinite stream.
Yields: batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
See also
cycle, tuples, keras_tuples