API Reference
Streamer
class pescador.Streamer(streamer, *args, **kwargs)
A wrapper class for reusable generators.
Wrapping generators/iterators within an object provides two useful features:
- Streamer objects can be serialized (as long as the generator can be)
- Streamer objects can instantiate a generator multiple times.
The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).
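The second feature can be illustrated with a minimal sketch. This is not pescador's implementation: the `ReusableGenerator` class and the `count_up` helper are hypothetical names introduced here, and the sketch only shows why storing the generator function together with its arguments allows the stream to be restarted.

```python
class ReusableGenerator:
    """Hypothetical stand-in for the wrapper idea; not pescador's code."""

    def __init__(self, func, *args, **kwargs):
        # Store the recipe (function plus parameters), not a live generator.
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __call__(self):
        # Each call builds a brand-new generator from the stored recipe.
        return self.func(*self.args, **self.kwargs)


def count_up(n):
    # A finite generator that yields dict-valued iterates.
    for i in range(n):
        yield {'X': i}


stream = ReusableGenerator(count_up, 3)
first_pass = list(stream())
second_pass = list(stream())  # a bare generator object would be empty here
```

Both passes produce the same three items, whereas a bare generator object would be exhausted after the first pass.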
Examples
Generate batches of random 3-dimensional vectors
>>> import numpy as np
>>> from pescador import Streamer
>>> def my_generator(n):
...     for i in range(n):
...         yield {'X': np.random.randn(1, 3)}
>>> GS = Streamer(my_generator, 5)
>>> for i in GS():
...     print(i)
Or with a maximum number of items
>>> for i in GS(max_batches=3):
...     print(i)
Or infinitely many examples, restarting the generator as needed
>>> for i in GS.cycle():
...     print(i)
An alternate interface for the same:
>>> for i in GS(cycle=True):
...     print(i)
Attributes
generator : iterable or Streamer
A generator function or iterable collection to draw from. May be another instance or subclass of Streamer.
args : list
kwargs : dict
If generator is a function, then args and kwargs provide the parameters to the function.
__call__(max_batches=None, cycle=False)
Parameters:
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
Yields:
batch : dict
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
activate()
Activates the stream.
active
Returns True if the stream is active (i.e., a StopIteration has not been raised).
cycle()
Generates from the streamer infinitely.
This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
Yields: batch
Items from the contained generator.
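The restart behaviour described for cycle can be sketched in plain Python. This is an illustration under assumptions, not the library's code: `cycle_stream` and `three_items` are hypothetical names, and the early return on an empty source is a safeguard added for this sketch only.

```python
import itertools


def cycle_stream(make_generator):
    """Yield forever, rebuilding the generator whenever it is exhausted."""
    while True:
        produced = False
        for item in make_generator():
            produced = True
            yield item
        if not produced:
            # Guard added for this sketch: stop instead of spinning
            # forever on a source that yields nothing.
            return


def three_items():
    for i in range(3):
        yield {'X': i}


# Draw 7 items from a source that only holds 3.
values = [item['X'] for item in itertools.islice(cycle_stream(three_items), 7)]
```

Here `values` is `[0, 1, 2, 0, 1, 2, 0]`: the finite source is restarted each time it runs out.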
generate(max_batches=None)
Instantiate the generator.
Parameters:
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
Yields:
batch : dict
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
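The max_batches semantics ("at most max_batches, or everything if None") match what `itertools.islice` provides. The sketch below assumes that equivalence for illustration; `limited` and `five_items` are hypothetical helpers, not part of pescador.

```python
import itertools


def limited(make_generator, max_batches=None):
    # islice with stop=None passes every item through until exhaustion;
    # with an integer stop it truncates after that many items.
    return itertools.islice(make_generator(), max_batches)


def five_items():
    for i in range(5):
        yield {'X': i}


everything = list(limited(five_items))          # None: exhaust the source
first_three = list(limited(five_items, 3))      # cap below the source length
capped_high = list(limited(five_items, 10))     # cap above the source length
```

A cap larger than the stream simply yields the whole finite stream, which is why cycle is needed to obtain more items than the source contains.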
tuples(*items, **kwargs)
Generate data in tuple-form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters:
*items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
Yields:
batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
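The dict-to-tuple mapping described for tuples can be sketched as follows. `as_tuples` is a hypothetical helper written for this illustration, and the sample batches are made up; only the `(batch[item1], ..., batch[itemk])` rule comes from the description above.

```python
def as_tuples(batches, *items):
    # For each dict-valued batch, pick the requested keys in order.
    for batch in batches:
        yield tuple(batch[item] for item in items)


batches = [
    {'X': [1, 2], 'Y': 0, 'meta': 'a'},
    {'X': [3, 4], 'Y': 1, 'meta': 'b'},
]

# Keys not requested (here 'meta') are dropped from the output.
pairs = list(as_tuples(batches, 'X', 'Y'))
```

Each element of `pairs` is an `(X, Y)` tuple, the shape that a consumer expecting input/target pairs can unpack directly.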
Buffered Streamers
class pescador.BufferedStreamer(streamer, buffer_size, strict_batch_size=True)
Buffers a stream into batches of examples.
Examples
>>> import numpy as np
>>> import pescador
>>> def my_generator(n):
...     # Generates a single 30-dimensional example vector for each iterate
...     for i in range(n):
...         yield dict(X=np.random.randn(1, 30))
>>> # Wrap the generator in a Streamer
>>> S = pescador.Streamer(my_generator, 128)
>>> # A buffered streamer will combine N iterates into a single batch
>>> N = 10
>>> B = pescador.BufferedStreamer(S, N)
>>> for batch in B():
...     # Work on a batch of N=10 examples
...     MY_PROCESS_FUNCTION(batch)
__call__(max_batches=None, cycle=False)
Parameters:
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
Yields:
batch : dict
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
activate()
Activates the stream.
active
Returns True if the stream is active (i.e., a StopIteration has not been raised).
cycle()
Generates from the streamer infinitely.
This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
Yields: batch
Items from the contained generator.
generate(max_batches=None)
Generate samples from the streamer.
Parameters:
max_batches : int
For the BufferedStreamer, max_batches is the number of buffered batches that are generated, not the number of individual samples.
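The difference between individual samples and buffered batches can be seen in a small sketch. It is not the library's implementation: `merge`, `buffer_stream`, and `examples` are hypothetical helpers, plain lists stand in for arrays, and the `strict` flag reflects an assumption about what strict_batch_size means (drop a trailing partial batch), which this reference does not spell out.

```python
def merge(items):
    # Concatenate the per-key lists of several single-example dicts.
    merged = {}
    for item in items:
        for key, value in item.items():
            merged.setdefault(key, []).extend(value)
    return merged


def buffer_stream(stream, buffer_size, strict=True):
    buffer = []
    for item in stream:
        buffer.append(item)
        if len(buffer) == buffer_size:
            yield merge(buffer)
            buffer = []
    # Assumption: in strict mode a trailing partial batch is discarded.
    if buffer and not strict:
        yield merge(buffer)


def examples(n):
    for i in range(n):
        yield {'X': [i]}


strict_batches = list(buffer_stream(examples(7), 3))
loose_batches = list(buffer_stream(examples(7), 3, strict=False))
```

Seven samples with a buffer of three give two full batches in strict mode and a third batch holding the single leftover sample otherwise, so a limit counted in batches covers several samples at a time.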
tuples(*items, **kwargs)
Generate data in tuple-form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters:
*items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
Yields:
batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
Multiplexing
class pescador.Mux(seed_pool, k, lam=256.0, pool_weights=None, with_replacement=True, prune_empty_seeds=True, revive=False, random_state=None)
Stochastic multiplexor for Streamers.
Examples
>>> # Create a collection of streamers
>>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
>>> # Multiplex them together into a single streamer
>>> # Use at most 3 streams at once
>>> mux = pescador.Mux(seeds, k=3)
>>> for batch in mux():
...     MY_FUNCTION(batch)
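The idea of drawing from at most k streams at once can be sketched in a heavily simplified form. This is an assumption-laden illustration, not how Mux is implemented: `simple_mux` and `make_source` are hypothetical, streams are chosen uniformly at random, and the lam, pool_weights, with_replacement, prune_empty_seeds, and revive parameters are ignored entirely.

```python
import random


def simple_mux(factories, k, seed=None):
    rng = random.Random(seed)
    pending = list(factories)      # generator factories not yet opened
    rng.shuffle(pending)
    active = [factory() for factory in pending[:k]]
    pending = pending[k:]
    while active:
        # Pick one of the currently open streams at random.
        index = rng.randrange(len(active))
        try:
            yield next(active[index])
        except StopIteration:
            if pending:
                # Replace the exhausted stream with an unopened one.
                active[index] = pending.pop()()
            else:
                active.pop(index)


def make_source(label, n):
    def factory():
        for _ in range(n):
            yield {'source': label}
    return factory


factories = [make_source(i, 2) for i in range(5)]
samples = list(simple_mux(factories, k=3, seed=0))
```

Every source is eventually drained, but at any moment no more than three are open, so the interleaving order is random while the total content is fixed.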
__call__(max_batches=None, cycle=False)
Parameters:
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
Yields:
batch : dict
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
activate()
Activates the seed pool.
active
Returns True if the stream is active (i.e., a StopIteration has not been raised).
cycle()
Generates from the streamer infinitely.
This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
Yields: batch
Items from the contained generator.
tuples(*items, **kwargs)
Generate data in tuple-form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters:
*items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
Yields:
batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
See also: generate, cycle
Parallel streaming
class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=None)
Parallel data streaming over zeromq sockets.
This allows a data generator to run in a separate process from the consumer.
A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.
Examples
>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for batch in Z():
...     MY_FUNCTION(batch)
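The producer/consumer split can be sketched with the standard library alone. Note the substitution: this sketch uses a background thread and a `queue.Queue` as a stand-in, whereas ZMQStreamer uses a separate process and zeromq sockets. `background_stream` and `five_items` are hypothetical names, and nothing here reflects pescador's internals.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream


def background_stream(make_generator, maxsize=4):
    q = queue.Queue(maxsize=maxsize)

    def worker():
        # Producer side: runs the generator and ships items to the consumer.
        for item in make_generator():
            q.put(item)
        q.put(_DONE)

    threading.Thread(target=worker, daemon=True).start()

    # Consumer side: yield items as they arrive until the sentinel shows up.
    while True:
        item = q.get()
        if item is _DONE:
            return
        yield item


def five_items():
    for i in range(5):
        yield {'X': i}


received = list(background_stream(five_items))
```

The bounded queue lets the producer run ahead of the consumer by a few items without buffering the whole stream, which is the same decoupling a separate worker process provides.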
__call__(max_batches=None, cycle=False)
Parameters:
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
cycle : bool
If True, cycle indefinitely.
Yields:
batch : dict
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
activate()
Activates the stream.
active
Returns True if the stream is active (i.e., a StopIteration has not been raised).
cycle()
Generates from the streamer infinitely.
This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
Yields: batch
Items from the contained generator.
generate(max_batches=None)
Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.
Yields:
batch
Data drawn from streamer(max_batches).
tuples(*items, **kwargs)
Generate data in tuple-form instead of dicts.
This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
Parameters:
*items
One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
cycle : bool
If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
max_batches : None or int > 0
Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
Yields:
batch : tuple
Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.