API Reference
Streamer
- class pescador.Streamer(streamer, *args, **kwargs)
A wrapper class for recycling iterables and generator functions, i.e. streamers.
Wrapping streamers within an object provides two useful features:
- Streamer objects can be serialized (as long as the wrapped streamer can be).
- Streamer objects can instantiate a generator multiple times.
The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
Examples
Generate the integers 0 through 4
>>> from pescador import Streamer
>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4
Or with a maximum number of examples
>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2
Or infinitely many examples, restarting the generator as needed
>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...
Or finitely many examples, restarting the generator as needed
>>> for i in stream.cycle(max_iter=7):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1
An alternate interface for the same:
>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...
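The examples above depend on the wrapper keeping the generator function and its arguments, so that a fresh generator can be built on every pass. The following is a minimal plain-Python sketch of that idea. `RestartableStream` is a hypothetical stand-in written for illustration; it is not pescador's implementation and omits activation, copying, and serialization.

```python
import itertools


class RestartableStream:
    """Hypothetical stand-in for a streamer wrapper (not pescador code)."""

    def __init__(self, generator_fn, *args, **kwargs):
        # Store the recipe for the generator rather than a live generator.
        self.generator_fn = generator_fn
        self.args = args
        self.kwargs = kwargs

    def iterate(self, max_iter=None):
        # Each call builds a brand-new generator from the stored recipe.
        fresh = self.generator_fn(*self.args, **self.kwargs)
        # islice with a stop of None runs until the generator is exhausted.
        return itertools.islice(fresh, max_iter)

    def __iter__(self):
        return self.iterate()


def my_generator(n):
    for i in range(n):
        yield i


stream = RestartableStream(my_generator, 5)
first_pass = list(stream)
second_pass = list(stream)  # restarts; a bare generator would be empty here
truncated = list(stream.iterate(max_iter=3))
```

Storing the callable and its arguments, rather than a live generator, is what makes the second pass possible at all.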
- Attributes:
- streamer : generator or iterable
Any generator function or iterable Python object.
- args : list
- kwargs : dict
Parameters provided to streamer, if callable.
- __call__(max_iter=None, cycle=False)
Allow streamers to act like callables.
- Parameters:
- max_iter : None or int > 0
Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
- cycle : bool
If True, cycle indefinitely.
- Yields:
- obj : Objects yielded by the generator provided on init.
- property active
Return True if the stream is active (i.e., there are still open / existing streams).
- cycle(max_iter=None)
Iterate from the streamer infinitely.
This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.
- Parameters:
- max_iter : None or int > 0
Maximum number of iterations to yield. If None, iterate indefinitely.
- Yields:
- obj : Objects yielded by the streamer provided on init.
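The restart-on-exhaustion behavior described for cycle can be sketched in plain Python as follows. `cycle_stream` and `make_generator` are hypothetical names used only for this illustration, and the sketch makes no claim about how pescador implements the method internally.

```python
def cycle_stream(make_generator, max_iter=None):
    """Yield items repeatedly, restarting the source whenever it runs out.

    make_generator is a zero-argument callable returning a fresh iterator.
    Stops after max_iter items, or never if max_iter is None.
    """
    count = 0
    while True:
        produced_any = False
        for item in make_generator():
            if max_iter is not None and count >= max_iter:
                return
            produced_any = True
            count += 1
            yield item
        if max_iter is not None and count >= max_iter:
            return
        if not produced_any:
            # Guard against spinning forever on an empty source.
            return


items = list(cycle_stream(lambda: iter(range(5)), max_iter=7))
exact = list(cycle_stream(lambda: iter(range(5)), max_iter=5))
empty = list(cycle_stream(lambda: iter([])))
```

The guard for an empty source is a design choice of this sketch; without it, cycling over a generator that yields nothing would loop forever.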
- property is_activated_copy
is_activated_copy is true if this object is a copy of the original Streamer and has been activated.
Parallel streaming
- class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)
Parallel data streaming over zeromq sockets.
This allows a data generator to run in a separate process from the consumer.
A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
Examples
>>> import pescador
>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)
- __call__(max_iter=None, cycle=False)
Allow streamers to act like callables.
- Parameters:
- max_iter : None or int > 0
Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
- cycle : bool
If True, cycle indefinitely.
- Yields:
- obj : Objects yielded by the generator provided on init.
- property active
Return True if the stream is active (i.e., there are still open / existing streams).
- cycle(max_iter=None)
Iterate from the streamer infinitely.
This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.
- Parameters:
- max_iter : None or int > 0
Maximum number of iterations to yield. If None, iterate indefinitely.
- Yields:
- obj : Objects yielded by the streamer provided on init.
- property is_activated_copy
is_activated_copy is true if this object is a copy of the original Streamer and has been activated.
- iterate(max_iter=None)
Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.
- Yields:
- data : dict
Data drawn from streamer(max_iter).
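The producer/consumer split described for ZMQStreamer can be illustrated without ZeroMQ. The sketch below is a deliberately simplified stand-in: it assumes a background thread and a standard-library `queue.Queue` in place of a separate process and a zeromq socket. `background_stream` is a hypothetical helper, not part of pescador.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def background_stream(make_generator, max_size=8):
    """Run a generator in a background thread and yield its items here.

    Assumption: a thread plus queue.Queue stands in for a separate
    process plus a zeromq socket.
    """
    buffer = queue.Queue(maxsize=max_size)

    def producer():
        try:
            for item in make_generator():
                buffer.put(item)  # blocks when the consumer falls behind
        finally:
            buffer.put(_DONE)

    worker = threading.Thread(target=producer, daemon=True)
    worker.start()

    while True:
        item = buffer.get()
        if item is _DONE:
            break
        yield item
    worker.join()


def my_generator():
    for i in range(5):
        yield {"X": i}


received = [data["X"] for data in background_stream(my_generator)]
```

The bounded queue keeps the producer from running arbitrarily far ahead of the consumer. A real cross-process transport would also need to serialize each item, which is why the serializability of Streamer objects matters for parallel use.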
Multiplexing
Defines the interface and several varieties of mux. A mux is a Streamer which wraps N other Streamer objects, and at every step yields a sample from one of its sub-streamers.
This module defines the following Mux types:
StochasticMux
A Mux which chooses its active streams stochastically, and chooses samples from the active streams stochastically.
StochasticMux is equivalent to the pescador.Mux from versions <2.0.
StochasticMux has a mode parameter which selects how it operates, with the following modes:
- with_replacement
Sample streamers with replacement. This allows a single stream to be used multiple times simultaneously.
- exhaustive
Each streamer is consumed at most once and never re-activated.
- single_active
Each stream in the candidate pool is either active or not. Streams are revived when they are exhausted. With this setting, streams in the active pool are uniquely selected from the candidate pool, whereas with_replacement allows the same stream to be used more than once.
ShuffledMux
A ShuffledMux interleaves samples from all given streamers.
RoundRobinMux
Iterates over all the streamers in strict order.
ChainMux
As in itertools.chain(), runs the first streamer to exhaustion, then the second, then the third, etc. Uses only a single stream at a time.
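The difference between the with_replacement and single_active modes comes down to how the active set is drawn from the candidate pool. Below is a minimal sketch using only the standard library. `choose_active` is a hypothetical helper; the real StochasticMux also handles exhaustion and revival of streams, which is omitted here, as is the exhaustive mode.

```python
import random


def choose_active(n_streams, n_active, mode, rng):
    """Pick indices of streams to activate from a pool of n_streams."""
    pool = range(n_streams)
    if mode == "with_replacement":
        # The same stream index may appear more than once.
        return rng.choices(pool, k=n_active)
    if mode == "single_active":
        # Every active index is distinct.
        return rng.sample(pool, k=n_active)
    raise ValueError(f"unsupported mode in this sketch: {mode!r}")


rng = random.Random(0)
unique = choose_active(n_streams=5, n_active=3, mode="single_active", rng=rng)
repeated = choose_active(n_streams=2, n_active=6, mode="with_replacement", rng=rng)
```

Drawing six active slots from a pool of two is only possible with replacement, which is the practical reason to choose that mode when the pool is small.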
| StochasticMux | Stochastic Mux |
| ShuffledMux | A variation on a mux, which takes N streamers, and samples from them equally, guaranteeing all N streamers to be "active". |
| RoundRobinMux | A Mux which iterates over all streamers in strict order. |
| ChainMux | As in itertools.chain(). |
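The strict-order and chained behaviors can be mimicked on plain iterators with the standard library. This is only an analogy for RoundRobinMux and ChainMux. `round_robin` is a hypothetical helper, and how the real classes treat exhausted or unequal-length streams is not covered here.

```python
import itertools


def round_robin(*iterables):
    """Take one item from each iterable in turn, dropping exhausted ones."""
    iterators = [iter(it) for it in iterables]
    while iterators:
        still_alive = []
        for it in iterators:
            try:
                yield next(it)
            except StopIteration:
                continue
            still_alive.append(it)
        iterators = still_alive


a, b, c = "AB", "12", "xy"
interleaved = list(round_robin(a, b, c))  # one item per source, in order
chained = list(itertools.chain(a, b, c))  # each source run to exhaustion
```

Round-robin keeps every source open at once, while chaining holds only one open at a time, matching the "uses only a single stream at a time" remark for ChainMux.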
Maps (Transformers)
Map functions perform operations on a stream.
Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.
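The note above matters because a generator can be consumed only once, whereas a Streamer can be restarted. The plain-Python sketch below shows the difference. `batches` is a hypothetical map function, and `functools.partial` is used here merely as a stand-in for re-wrapping the result in a Streamer.

```python
import functools


def batches(stream, size):
    """Hypothetical map: group consecutive items into lists of length size.

    A trailing group shorter than size is dropped in this sketch.
    """
    group = []
    for item in stream:
        group.append(item)
        if len(group) == size:
            yield group
            group = []


# A map applied directly gives a one-shot generator.
one_shot = batches(range(5), 2)
first = list(one_shot)
second = list(one_shot)  # already exhausted, so nothing is left

# Keeping the recipe instead lets a fresh generator be built on demand.
restartable = functools.partial(batches, range(5), 2)
again_1 = list(restartable())
again_2 = list(restartable())
```

With pescador itself, the analogous step would presumably be passing the map function and its arguments to Streamer, following the class signature documented earlier.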
| buffer_stream | Buffer data from a stream into one data object. |
| tuples | Reformat data as tuples. |
| keras_tuples | Reformat data objects as keras-compatible tuples. |
| cache | Stochastic stream caching. |
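As an illustration of the "Reformat data as tuples" entry, here is a plain-Python sketch that turns dictionary samples into tuples ordered by key. `as_tuples` is a hypothetical function written for this example; the signature and behavior of pescador's own map functions may differ.

```python
def as_tuples(stream, *keys):
    """Yield a tuple of the requested fields from each dict in stream."""
    if not keys:
        raise ValueError("at least one key is required")
    for sample in stream:
        yield tuple(sample[key] for key in keys)


samples = [{"X": 1, "Y": 10}, {"X": 2, "Y": 20}]
pairs = list(as_tuples(samples, "X", "Y"))
```

Like the map functions described above, this returns a generator, so it would need re-wrapping before it could be restarted.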