Introduction
Pescador is a library for streaming (numerical) data for use in iterative machine learning applications.
The core concept is the Streamer object, which encapsulates a Python generator to allow for re-use and inter-process communication.
The basic use case is as follows:
- Define a generator function g which yields a dictionary of numpy arrays at each step
- Construct a Streamer object stream = Streamer(g, args...)
- Iterate over examples generated by stream().
On top of this basic functionality, pescador provides the following tools:
- Buffering of sampled data into fixed-size batches (see Buffered Streamers)
- Multiplexing multiple data streams with dynamic activation and deactivation of streams (see Multiplexing)
- Parallel processing (see Parallel streaming)
For examples of each of these use-cases, refer to the Examples section.
Basic Usage
Basic example
This document will walk through the basics of using pescador to stream samples from a generator.
Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.
Before we can get started, we’ll need to introduce a few core concepts. We will assume some basic familiarity with generators.
Batch generators
Not all Python generators are suitable for machine learning. Pescador assumes that generators produce output in a particular format, which we will refer to as a batch. Specifically, a batch is a Python dictionary whose values are np.ndarray objects. For unsupervised learning (e.g., MiniBatchKMeans), valid batches contain only one key: X. For supervised learning (e.g., SGDClassifier), valid batches must contain both X and Y keys, and the two arrays must have equal length.
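As a quick illustration of this convention, here is a hypothetical helper, `is_valid_batch` (it is not part of pescador), that checks a dictionary against the rules as stated: unsupervised batches hold only `X`, while supervised batches hold `X` and `Y` of equal length, and every value is an `np.ndarray`.

```python
import numpy as np


def is_valid_batch(batch, supervised=False):
    """Hypothetical helper: check `batch` against the batch convention.

    Unsupervised: a dict with exactly one key, 'X'.
    Supervised: a dict with 'X' and 'Y' of equal length.
    All values must be np.ndarray.
    """
    if not isinstance(batch, dict) or 'X' not in batch:
        return False
    if not all(isinstance(v, np.ndarray) for v in batch.values()):
        return False
    if supervised:
        return 'Y' in batch and len(batch['Y']) == len(batch['X'])
    return set(batch) == {'X'}


unsup = {'X': np.zeros((4, 2))}
sup = {'X': np.zeros((4, 2)), 'Y': np.array([0, 1, 0, 1])}
print(is_valid_batch(unsup))                 # True
print(is_valid_batch(sup, supervised=True))  # True
print(is_valid_batch(sup))                   # False: unsupervised batches hold only X
```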
Here’s a simple example generator that draws random batches of a specified batch_size from Iris and adds Gaussian noise to the features.
import numpy as np


def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    batch_size : int > 0
        Size of the batches to generate

    sigma : float > 0
        Standard deviation of the additive noise

    Yields
    ------
    batch : dict
        batch['X'] is an `np.ndarray` of shape `(batch_size, d)`

        batch['Y'] is an `np.ndarray` of shape `(batch_size,)`
    '''
    n, d = X.shape

    while True:
        # Draw batch_size row indices uniformly, with replacement
        i = np.random.randint(0, n, size=batch_size)

        noise = sigma * np.random.randn(batch_size, d)

        yield dict(X=X[i] + noise, Y=Y[i])
In the code above, noisy_samples is a generator function whose body is an infinite loop, so the generator it returns can be sampled indefinitely. Each iterate of noisy_samples is a dictionary containing the sampled batch’s features and labels.
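Because such a generator never terminates on its own, consuming it requires an external bound. The sketch below substitutes random synthetic data shaped like Iris (150 samples, 4 features, 3 classes; an assumption so that it runs without scikit-learn) and uses `itertools.islice` to take exactly five batches.

```python
import itertools

import numpy as np


def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    # Same generator as above: an infinite loop over random noisy batches
    n, d = X.shape
    while True:
        i = np.random.randint(0, n, size=batch_size)
        noise = sigma * np.random.randn(batch_size, d)
        yield dict(X=X[i] + noise, Y=Y[i])


# Synthetic stand-in for Iris: 150 samples, 4 features, 3 classes
X = np.random.randn(150, 4)
Y = np.repeat(np.arange(3), 50)

# islice bounds the infinite generator to its first 5 batches
batches = list(itertools.islice(noisy_samples(X, Y, batch_size=8), 5))
for batch in batches:
    print(batch['X'].shape, batch['Y'].shape)  # (8, 4) (8,)
```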
Streamers
Generators in Python have a couple of limitations for common stream learning pipelines. First, once instantiated, a generator cannot be “restarted”. Second, an instantiated generator cannot be serialized directly, which makes generators difficult to use in distributed computation environments.
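Both limitations are easy to observe in plain Python. In this small sketch (the generator `count_to` is purely illustrative), a second pass over an exhausted generator yields nothing, and `pickle` refuses to serialize a generator object.

```python
import pickle


def count_to(n):
    # A finite, illustrative generator function
    for i in range(n):
        yield i


gen = count_to(3)
first_pass = list(gen)   # [0, 1, 2]
second_pass = list(gen)  # []: the generator is already exhausted

try:
    pickle.dumps(count_to(3))
    picklable = True
except TypeError:
    # Generator objects cannot be pickled
    picklable = False

print(first_pass, second_pass, picklable)  # [0, 1, 2] [] False
```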
Pescador provides the Streamer object to circumvent these issues. Streamer simply provides an object container for an uninstantiated generator (and its parameters), and an access method generate(). Calling generate() multiple times on a streamer object is equivalent to restarting the generator, which makes multi-pass streams simple to implement. Similarly, because Streamer can be serialized, it is simple to pass a streamer object to a separate process for parallel computation.
Here’s a simple example, using the generator from the previous section.
import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])
batch_stream2 = streamer.generate()
Iterating over streamer.generate() is equivalent to iterating over noisy_samples(X[train], Y[train]).
A stream can also be bounded by calling streamer.generate(max_batches=N), where N is the maximum number of batches to produce.
Finally, because generate() is such a common operation with streamer objects, a shorthand is provided: calling the streamer object directly, as if it were a generator function, is equivalent to calling generate():
import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

# Equivalent to batch_stream2 above
batch_stream3 = streamer()
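To make the idea concrete, here is a hypothetical minimal class, `ReusableStream`, that mimics the behavior described for Streamer: it stores the generator function and its arguments, builds a fresh generator on each `generate()` call, supports a `max_batches` bound, and treats calling the object as shorthand for `generate()`. It is a sketch under those assumptions, not pescador's actual implementation.

```python
import itertools
import pickle


class ReusableStream:
    """Hypothetical minimal sketch of the Streamer idea (not pescador's code).

    Stores an uninstantiated generator function plus its arguments and
    builds a fresh generator every time generate() is called.
    """

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def generate(self, max_batches=None):
        # A brand-new generator on each call, so each call restarts the stream
        gen = self.func(*self.args, **self.kwargs)
        # islice(gen, None) yields everything; an int stops after that many items
        return itertools.islice(gen, max_batches)

    def __call__(self, max_batches=None):
        # Shorthand: calling the object is the same as calling generate()
        return self.generate(max_batches=max_batches)


def count_to(n):
    # A finite, illustrative generator function
    for i in range(n):
        yield {'x': i}


stream = ReusableStream(count_to, 4)
pass1 = [b['x'] for b in stream.generate()]        # [0, 1, 2, 3]
pass2 = [b['x'] for b in stream()]                 # [0, 1, 2, 3] again
bounded = [b['x'] for b in stream(max_batches=2)]  # [0, 1]

# Only a function reference and its arguments are stored, so the wrapper pickles
clone = pickle.loads(pickle.dumps(stream))
print([b['x'] for b in clone()])                   # [0, 1, 2, 3]
```

Storing the function rather than a live generator is what makes both restarting and serialization possible: a generator object carries execution state that cannot be rewound or pickled, whereas a module-level function and its arguments can be.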
This document will walk through some advanced usage of pescador.
We will assume a working understanding of the simple example in the previous section.
Stream re-use and multiplexing
The Mux streamer provides a powerful interface for randomly interleaving samples from multiple input streams. Mux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.
As a concrete example, we can simulate a mixture of noisy streams with differing variances.
from __future__ import print_function

import numpy as np

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score

from pescador import Streamer, Mux


def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=batch_size)

        noise = sigma * np.random.randn(batch_size, d)

        yield dict(X=X[i] + noise, Y=Y[i])


# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']

classes = np.unique(Y)

# model_selection.ShuffleSplit takes the split settings up front;
# the data itself is passed to .split()
splitter = ShuffleSplit(n_splits=1, test_size=0.1)

for train, test in splitter.split(X):
    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Build a collection of Streamers with different noise scales
    streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
               for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping 3 streams alive at once
    mux_stream = Mux(streams,
                     k=3,    # Keep 3 streams alive at once
                     lam=64)  # Use a poisson rate of 64

    # Fit the model to the stream, use at most 5000 batches
    for batch in mux_stream(max_batches=5000):
        estimator.partial_fit(batch['X'], batch['Y'], classes=classes)

    # And report the accuracy
    Ypred = estimator.predict(X[test])
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))
In the above example, each Streamer in streams can make infinitely many samples. The lam=64 argument to Mux says that each stream should produce some n batches, where n is sampled from a Poisson distribution of rate lam. When a stream exceeds its bound, it is deactivated, and a new streamer is activated to fill its place.
Setting lam=None disables the random stream bounding, and mux() simply runs each active stream until exhaustion.
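The activation logic described above can be sketched in a few lines. `simple_mux` below is a hypothetical, simplified stand-in written for illustration, not pescador's Mux: it keeps `k` streams active, gives each newly activated stream a budget drawn from a Poisson distribution with rate `lam` (or no budget when `lam=None`), samples uniformly among the active streams, and replaces a stream when its budget runs out or it is exhausted. It assumes each entry of `stream_fns` is a zero-argument callable that returns a new generator.

```python
import itertools

import numpy as np


def simple_mux(stream_fns, k, lam, max_batches, seed=0):
    """Hypothetical, simplified sketch of Poisson-bounded multiplexing.

    stream_fns: zero-argument callables, each returning a fresh generator.
    Yields (stream_index, item) pairs. Not pescador's implementation.
    """
    rng = np.random.default_rng(seed)

    def activate():
        # Choose a stream uniformly at random (streams may repeat)
        idx = int(rng.integers(len(stream_fns)))
        # Draw this activation's budget; None means "no bound"
        budget = None if lam is None else int(rng.poisson(lam))
        return [stream_fns[idx](), budget, idx]

    active = [activate() for _ in range(k)]
    produced = 0
    while produced < max_batches:
        # Pick one of the k active slots uniformly at random
        slot = int(rng.integers(k))
        gen, budget, idx = active[slot]
        if budget == 0:
            # Bound reached: deactivate and fill the slot with a new stream
            active[slot] = activate()
            continue
        try:
            item = next(gen)
        except StopIteration:
            # Stream exhausted: replace it as well
            active[slot] = activate()
            continue
        if budget is not None:
            active[slot][1] = budget - 1
        produced += 1
        yield idx, item


# Five infinite streams; stream v always yields the value v
streams = [lambda v=v: itertools.repeat(v) for v in range(5)]

mixed = list(simple_mux(streams, k=3, lam=4, max_batches=200))
# With k=1 and lam=None, the single active stream is never replaced
single = list(simple_mux(streams, k=1, lam=None, max_batches=50))
```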
The Mux streamer can sample with or without replacement from its input streams, according to the with_replacement option. Setting this parameter to False means that each stream can be active at most once.
Streams can also be sampled with non-uniform weighting by specifying a vector of pool_weights.
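Weighted selection and sampling without replacement can be illustrated separately. The hypothetical helper `choose_seeds` draws stream indices in proportion to a weight vector (playing the role of pool_weights); with `with_replacement=False` it zeroes out each chosen weight, so every stream is picked at most once. This is a sketch of the behavior described here, not pescador's code.

```python
import numpy as np


def choose_seeds(weights, n_draws, with_replacement=True, seed=0):
    """Hypothetical sketch: choose which streams to activate, by weight."""
    rng = np.random.default_rng(seed)
    w = np.asarray(weights, dtype=float).copy()
    chosen = []
    for _ in range(n_draws):
        if w.sum() == 0:
            break  # every stream has been used up
        # Sample an index with probability proportional to its weight
        idx = int(rng.choice(len(w), p=w / w.sum()))
        chosen.append(idx)
        if not with_replacement:
            # A stream chosen once can never be chosen again
            w[idx] = 0.0
    return chosen


# Stream 2 has zero weight, so it is never selected
print(choose_seeds([0.5, 0.5, 0.0], 10))
# Without replacement, each of the 3 streams appears at most once
print(choose_seeds([1, 2, 3], 10, with_replacement=False))
```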
Finally, exhausted streams can be removed by setting prune_empty_seeds to True. If False, then exhausted streams may be reactivated at any time.
Sampling from disk
A common use case for pescador is to sample data from a large collection of existing archives. As a concrete example, consider the problem of fitting a statistical model to a large corpus of musical recordings. When the corpus is sufficiently large, it is impossible to fit the entire set in memory while estimating the model parameters. Instead, one can pre-process each song to store pre-computed features (and, optionally, target labels) in a NumPy zip (NPZ) archive. The problem then becomes sampling data from a collection of NPZ archives.
Here, we will assume that the pre-processing has already been done so that each NPZ file contains a numpy array of features X and labels Y. We will define infinite samplers that pull n examples per iterate.
import numpy as np
import pescador


def sample_npz(npz_file, n):
    '''Generate an infinite sequence of contiguous samples
    from the input `npz_file`.

    Each iterate has `n > 0` rows.
    '''
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset; randint excludes its upper bound,
            # so the + 1 lets the final window be drawn as well
            idx = np.random.randint(n_total - n + 1)
            yield dict(X=data['X'][idx:idx + n],
                       Y=data['Y'][idx:idx + n])
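To check that this sampler yields contiguous windows, the sketch below writes a small synthetic archive to a temporary directory (the file name `toy.npz` and its contents are assumptions for illustration) in which row `i` of `X` is `[i, i]` and `Y[i] = i`, then draws a few windows. It picks offsets with `np.random.randint(n_total - n + 1)`; because `randint` excludes its upper bound, the `+ 1` lets the last full window be chosen.

```python
import os
import tempfile

import numpy as np


def sample_npz(npz_file, n):
    '''Infinite contiguous windows of n rows from npz_file.'''
    with np.load(npz_file) as data:
        n_total = len(data['X'])
        while True:
            # randint excludes its upper bound, so + 1 allows the final window
            idx = np.random.randint(n_total - n + 1)
            yield dict(X=data['X'][idx:idx + n],
                       Y=data['Y'][idx:idx + n])


# Toy archive in a temporary directory: row i of X is [i, i], and Y[i] == i
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, 'toy.npz')
np.savez(path, X=np.repeat(np.arange(20), 2).reshape(20, 2), Y=np.arange(20))

gen = sample_npz(path, 5)
windows = [next(gen) for _ in range(10)]
gen.close()  # exits the `with` block, closing the archive

# Each window holds 5 consecutive rows, so its labels increase by exactly 1
for w in windows:
    print(w['Y'])
```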
Applying the sample_npz function above to a list of npz_files, we can make a multiplexed streamer object as follows:
n = 16
npz_files = [...]  # LIST OF PRE-COMPUTED NPZ FILES (placeholder)
streams = [pescador.Streamer(sample_npz, npz_f, n) for npz_f in npz_files]

# Keep 32 streams alive at once
# Draw on average 16 batches from each stream before deactivating
mux_stream = pescador.Mux(streams, k=32, lam=16)

for batch in mux_stream(max_batches=1000):
    # DO LEARNING HERE
    pass
Memory-mapping
The NPZ file format requires loading the entire contents of each archive into memory. This can lead to high memory consumption when the number of active streams is large. Note also that memory usage for each NPZ file will persist for as long as there is a reference to its contents. This can be circumvented, at the cost of some latency, by copying data within the streamer function:
def sample_npz_copy(npz_file, n):
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n + 1)
            yield dict(X=data['X'][idx:idx + n].copy(),  # <-- Note the explicit copy
                       Y=data['Y'][idx:idx + n].copy())
The above modification will ensure that memory is freed as quickly as possible.
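The explicit copy matters because a NumPy slice is a view: it keeps a reference to the whole array it was cut from. This sketch uses a large in-memory array as a stand-in for an array loaded from an NPZ archive.

```python
import numpy as np

# Stand-in for an array loaded from an NPZ archive (8 MB of float64)
big = np.arange(1_000_000, dtype=np.float64)

view = big[10:26]           # basic slicing returns a view into `big`
copied = big[10:26].copy()  # an independent 16-element array

print(view.base is big)            # True: the view keeps all of `big` alive
print(copied.base is None)         # True: the copy owns its memory
print(view.nbytes, copied.nbytes)  # 128 128
```

Both arrays report 128 bytes, but as long as `view` is referenced, the full 8 MB buffer behind `big` cannot be freed; `copied` has no such tie.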
Alternatively, memory-mapping can be used to load data only as needed, but this requires each array to be stored in its own NPY file:
def sample_npy_mmap(npy_x, npy_y, n):
    # Open each file in "copy-on-write" mode: data is read from disk on demand,
    # and in-memory modifications are never written back to the files
    X = np.load(npy_x, mmap_mode='c')
    Y = np.load(npy_y, mmap_mode='c')

    n_total = len(X)

    while True:
        # Compute the index offset
        idx = np.random.randint(n_total - n + 1)
        yield dict(X=X[idx:idx + n],
                   Y=Y[idx:idx + n])


# Using this streamer is similar to the first example, but now you need a separate
# NPY file for each X and Y
npy_x_files = [...]  # LIST OF PRE-COMPUTED NPY FILES (X), placeholder
npy_y_files = [...]  # LIST OF PRE-COMPUTED NPY FILES (Y), placeholder

streams = [pescador.Streamer(sample_npy_mmap, npy_x, npy_y, n)
           for (npy_x, npy_y) in zip(npy_x_files, npy_y_files)]
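To see what copy-on-write memory-mapping means in practice, the sketch below writes small X and Y arrays to NPY files in a temporary directory (the file names are illustrative), loads them with `mmap_mode='c'`, and modifies the mapped array. The change is visible in memory but never reaches the file on disk.

```python
import os
import tempfile

import numpy as np

# Write X and Y to separate NPY files (illustrative names in a temp directory)
tmpdir = tempfile.mkdtemp()
x_path = os.path.join(tmpdir, 'X.npy')
y_path = os.path.join(tmpdir, 'Y.npy')
np.save(x_path, np.arange(12, dtype=np.float32).reshape(6, 2))
np.save(y_path, np.arange(6))

# Copy-on-write maps: pages are read from disk on demand, and writes
# stay in this process's memory instead of going back to the file
X = np.load(x_path, mmap_mode='c')
Y = np.load(y_path, mmap_mode='c')
print(type(X).__name__, X.shape, Y.shape)  # memmap (6, 2) (6,)

X[0, 0] = -1.0             # changes only the in-memory copy
X_again = np.load(x_path)  # a fresh, ordinary read from disk
print(X[0, 0], X_again[0, 0])  # -1.0 0.0
```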
Buffered Streaming
In a machine learning setting, it is common to train a model on multiple input datapoints simultaneously, in what are commonly referred to as “minibatches”. To achieve this, pescador provides BufferedStreamer, which “buffers” the iterates of a streamer into batches of a fixed size.
Following up on the first example, we use the noisy_samples generator.
import pescador

# Create an initial streamer
streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

minibatch_size = 128
# Wrap your streamer
buffered_streamer = pescador.BufferedStreamer(streamer, minibatch_size)

# Generate batches in exactly the same way as you would from the base streamer
for batch in buffered_streamer():
    ...
A few important points to note about using Buffered Streamers:
- Buffered Streamers concatenate your arrays along the first axis, so the first dimension of each buffered array is the number of samples in the minibatch (minibatch_size in the above example).
- Every key in the generated batches is concatenated in this way (across all the buffered iterates).
- Consequently, your generators must yield batches in which every key holds an array shaped (N, ...), where N is the number of samples in that iterate and is the same for every key.
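The concatenate-then-split behavior described in these points can be sketched as follows. `buffer_stream` is a hypothetical, simplified function, not pescador's BufferedStreamer: it assumes every iterate is a dict of arrays sharing a first dimension, concatenates pending iterates key by key along axis 0, emits exactly `buffer_size` rows at a time, and (in this sketch) drops any leftover rows when the input ends.

```python
import numpy as np


def buffer_stream(stream, buffer_size):
    """Hypothetical sketch of buffering (not pescador's implementation)."""
    pending = []  # iterates (or remainders) not yet emitted
    n_rows = 0
    for batch in stream:
        pending.append(batch)
        n_rows += len(batch['X'])
        while n_rows >= buffer_size:
            # Join everything pending, key by key, along the samples axis
            joined = {k: np.concatenate([b[k] for b in pending]) for k in pending[0]}
            yield {k: v[:buffer_size] for k, v in joined.items()}
            # Carry the remainder over to the next buffered batch
            rest = {k: v[buffer_size:] for k, v in joined.items()}
            pending = [rest]
            n_rows = len(rest['X'])


def make_batches(n_batches, rows=16, d=4):
    # Iterate i: every X row is filled with i, and every Y entry is i
    for i in range(n_batches):
        yield {'X': np.full((rows, d), i, dtype=float), 'Y': np.full(rows, i)}


# 20 iterates of 16 rows = 320 rows -> two batches of 128; 64 rows are dropped
out = list(buffer_stream(make_batches(20), 128))
print(len(out), out[0]['X'].shape, out[0]['Y'].shape)  # 2 (128, 4) (128,)
```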
Examples
Gallery of Pescador Examples
Advanced Examples
Examples of more advanced Pescador usage.
ZMQ Example
An example of how to use a ZMQStreamer to generate samples, with some small benchmarks along the way.
# Imports
import numpy as np
import pescador
import time
Batch Generator
As always, you have to start with a generator function, which yields some simple batches. Since this is a toy example, we’re just yielding some random numbers of the appropriate shape.
It is important to remember that the first dimension is always the “samples dimension” (batch size), since the BufferedStreamer will concatenate batch components together along this dimension. Therefore, we force the target to be 2-dimensional, with shape (1, 1).
def batch_gen():
    """
    Yields
    ------
    batch_dict : dict
        A batch which looks like it might come from some
        machine learning problem, with X as Features, and Y as targets.
    """
    while True:
        yield dict(X=np.random.random((1, 10)),
                   Y=np.atleast_2d(np.random.randint(10)))
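The effect of `np.atleast_2d` on the target can be checked directly. This short sketch shows that a single integer label becomes a `(1, 1)` array, that ten such targets concatenate into a `(10, 1)` array along the samples axis, and that bare 0-dimensional arrays cannot be concatenated at all.

```python
import numpy as np

label = np.atleast_2d(np.random.randint(10))
print(label.shape)  # (1, 1): one sample, one target value

# Ten such iterates stack along the samples axis
targets = np.concatenate([np.atleast_2d(np.random.randint(10)) for _ in range(10)])
print(targets.shape)  # (10, 1)

try:
    np.concatenate([np.array(3), np.array(4)])
    concatenated_scalars = True
except ValueError:
    # 0-dimensional arrays have no axis to concatenate along
    concatenated_scalars = False
print(concatenated_scalars)  # False
```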
Basic ZMQ Usage
Here is a trivial ZMQStreamer example, using it directly on top of a single Streamer. We leave it to your imagination to decide what you would actually do with the batches you receive here.
n_test_batches = 1000

# Construct a streamer
s = pescador.Streamer(batch_gen)

# Wrap the streamer in a ZMQ streamer
zs = pescador.ZMQStreamer(s)

# Get batches from the stream as you would normally.
t0 = time.time()
batch_count = 0
for batch in zs(max_batches=n_test_batches):
    batch_count += len(batch['X'])
    # Train your network, etc.

duration = time.time() - t0
print("Generated {} samples from ZMQ\n\t"
      "in {:.5f}s ({:.5f} / sample)".format(
          batch_count, duration, duration / batch_count))

# Outputs:
# > Generated 1000 samples from ZMQ
# > in 0.57073s (0.00057 / sample)
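Because the generator runs in a separate process, every batch has to cross a process boundary as raw bytes. The sketch below shows one plain-NumPy way to do that, with hypothetical helpers `pack_batch` and `unpack_batch` that pair a small JSON header (key, dtype, shape) with each array's buffer. It illustrates the idea only; it is not pescador's or ZeroMQ's wire format.

```python
import json

import numpy as np


def pack_batch(batch):
    """Hypothetical sketch: turn a dict of arrays into (header, buffers)."""
    keys = sorted(batch)
    header = json.dumps([(k, str(batch[k].dtype), batch[k].shape) for k in keys]).encode()
    # tobytes() on a C-contiguous copy gives a flat, self-describing buffer
    buffers = [np.ascontiguousarray(batch[k]).tobytes() for k in keys]
    return header, buffers


def unpack_batch(header, buffers):
    """Rebuild the dict of arrays from the header and the raw buffers."""
    batch = {}
    for (key, dtype, shape), buf in zip(json.loads(header), buffers):
        # frombuffer gives a read-only view; copy() makes a writable array
        batch[key] = np.frombuffer(buf, dtype=dtype).reshape(shape).copy()
    return batch


batch = dict(X=np.random.random((1, 10)), Y=np.atleast_2d(np.random.randint(10)))
header, buffers = pack_batch(batch)
restored = unpack_batch(header, buffers)
print(restored['X'].shape, restored['Y'].shape)  # (1, 10) (1, 1)
```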
Buffering ZMQ
You could also wrap the ZMQStreamer in a BufferedStreamer, to produce “mini-batches” for training, etc.
Note: You could put the BufferedStreamer before or after the ZMQStreamer; it should work both ways.
buffer_size = 10
buffered_zmq = pescador.BufferedStreamer(zs, buffer_size)

# Get batches from the stream as you would normally.
iter_count = 0
batch_count = 0
t0 = time.time()
for batch in buffered_zmq(max_batches=n_test_batches):
    iter_count += 1
    batch_count += len(batch['X'])

duration = time.time() - t0
print("Generated {} batches of {} samples from Buffered ZMQ Streamer"
      "\n\tin {:.5f}s ({:.5f} / sample)".format(iter_count,
                                                batch_count,
                                                duration,
                                                duration / batch_count))

# Outputs
# > Generated 1000 batches of 10000 samples from Buffered ZMQ Streamer
# > in 1.69138s (0.00017 / sample)
Muxing Multiple Datasets Together
Let’s say you have a machine learning task like digit recognition. You have multiple datasets, and you would like to sample from them evenly. Pescador’s Mux Streamer is a perfect tool to facilitate this sort of setup.
For this example, to simulate this experience, we will split the canonical MNIST training set evenly into three pieces, and save each piece to its own .npz file.
import numpy as np
from keras.datasets import mnist
import pescador
Prepare Datasets
dataset1_path = "/tmp/dataset1_train.npz"
dataset2_path = "/tmp/dataset2_train.npz"
dataset3_path = "/tmp/dataset3_train.npz"
datasets = [dataset1_path, dataset2_path, dataset3_path]


def load_mnist():
    # the data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    return (x_train, y_train), (x_test, y_test)


def split_and_save_datasets(X, Y, paths):
    """Shuffle X and Y into len(paths) datasets of roughly n / len(paths)
    samples each, and save them to disk at the locations provided in paths.
    """
    shuffled_idxs = np.random.permutation(np.arange(len(X)))

    for i in range(len(paths)):
        # Take every len(paths)-th item, starting at i.
        # With three paths, this gives [0::3], [1::3], [2::3]
        X_i = X[shuffled_idxs[i::len(paths)]]
        Y_i = Y[shuffled_idxs[i::len(paths)]]
        np.savez(paths[i], X=X_i, Y=Y_i)


(X_train, Y_train), (X_test, Y_test) = load_mnist()
split_and_save_datasets(X_train, Y_train, datasets)
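The strided slicing in `split_and_save_datasets` partitions the shuffled indices: each index lands in exactly one split, and split sizes differ by at most one. A small check with 10 indices and 3 splits (sizes chosen only for illustration):

```python
import numpy as np

n, n_parts = 10, 3
shuffled_idxs = np.random.permutation(np.arange(n))

# Same strided slicing as split_and_save_datasets: [0::3], [1::3], [2::3]
parts = [shuffled_idxs[i::n_parts] for i in range(n_parts)]
print([len(p) for p in parts])  # [4, 3, 3]

# Every original index appears in exactly one part
print(np.array_equal(np.sort(np.concatenate(parts)), np.arange(n)))  # True
```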
Create Generator and Streams for each dataset.
def npz_generator(npz_path):
    """Generate data from an npz file."""
    npz_data = np.load(npz_path)
    X = npz_data['X']
    # Y holds integer class labels; indexing with np.newaxis below keeps a
    # leading samples axis, so each iterate's Y has shape (1,)
    y = npz_data['Y']

    n = X.shape[0]

    while True:
        i = np.random.randint(0, n)
        yield {'X': X[np.newaxis, i], 'Y': y[np.newaxis, i]}


streams = [pescador.Streamer(npz_generator, x) for x in datasets]
Option 1: Stream equally from each dataset
If you can easily fit all the datasets in memory and you want to sample from them equally, you would set up your Mux as follows:
mux = pescador.mux.Mux(streams,
                       # Three streams, always active.
                       k=len(streams),
                       # We want to sample from each stream infinitely,
                       # so we turn off the lam parameter, which
                       # controls how long to sample from each stream.
                       lam=None)
Option 2: Sample from one at a time.
Another approach might be to restrict sampling to one stream at a time. Now, the lam parameter controls (statistically) how long to sample from a stream before activating a new stream.
mux = pescador.mux.Mux(streams,
                       # Only allow one active stream
                       k=1,
                       # Sample on average 1000 samples from a stream before
                       # moving onto another one.
                       lam=1000)
Use the mux as a streamer
At this point, you can use the Mux as a streamer normally:
for data in mux():
    # process() is a placeholder for your own training or processing code
    process(data)
Framework Examples
Examples of how to use pescador with different ML Frameworks.
Contributions/Pull Requests are welcomed and encouraged.
A Keras Example
An example of how to use Pescador with Keras.
3/18/2017: Updated to Keras 2.0 API.
# Original Code source: https://github.com/fchollet/keras/blob/master/examples/mnist_cnn.py
Setup and Definitions
from __future__ import print_function
import numpy as np
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import pescador
batch_size = 128
num_classes = 10
epochs = 12
# input image dimensions
img_rows, img_cols = 28, 28
Load and preprocess data
def setup_data():
    # the data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    if K.image_data_format() == 'channels_first':
        x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
        x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)

    return input_shape, (x_train, y_train), (x_test, y_test)
Setup Keras model
def build_model(input_shape):
    model = Sequential()
    model.add(Conv2D(32, kernel_size=(3, 3),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, kernel_size=(3, 3),
                     activation='relu'))
    model.add(MaxPooling2D(pool_size=(2, 2)))
    model.add(Dropout(0.25))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=keras.optimizers.Adadelta(),
                  metrics=['accuracy'])

    return model
Define Data Generators
To add a little complexity, and to show a little of what you could do with Keras, we’ll add a second generator that simply adds a small amount of Gaussian noise to the data.
def data_generator(X, y):
    '''A basic generator for the data.'''
    X = np.atleast_2d(X)
    # y's are binary vectors, and should be of shape (1, 10) after this.
    y = np.atleast_2d(y)

    n = X.shape[0]

    while True:
        i = np.random.randint(0, n)
        yield {'X': X[np.newaxis, i], 'y': y[np.newaxis, i]}


def noisy_generator(X, y, scale=1e-1):
    '''A modified version of the original generator which adds Gaussian
    noise to the original sample.
    '''
    noise_shape = X.shape[1:]
    for sample in data_generator(X, y):
        noise = scale * np.random.randn(*noise_shape)
        yield {'X': sample['X'] + noise, 'y': sample['y']}
Put it all together
The key method for interfacing with Keras is the streamer's tuples() method, which takes as arguments the names of the batch keys to pass to Keras. This is needed because Keras’s fit_generator consumes tuples, not dictionaries, from the generator.
input_shape, (X_train, Y_train), (X_test, Y_test) = setup_data()
steps_per_epoch = len(X_train) // batch_size

# Create two streams from the same data, where one of the streams
# adds a small amount of Gaussian noise. You could easily perform
# other data augmentations using the same basic strategy.
basic_stream = pescador.Streamer(data_generator, X_train, Y_train)
noisy_stream = pescador.Streamer(noisy_generator, X_train, Y_train)

# Mux the two streams together.
mux = pescador.mux.Mux([basic_stream, noisy_stream],
                       # Two streams, always active.
                       2,
                       # We want to sample from each stream infinitely.
                       lam=None)

# Generate batches from the stream
training_streamer = pescador.BufferedStreamer(mux, batch_size)

model = build_model(input_shape)
model.fit_generator(
    training_streamer.tuples('X', 'y', cycle=True),
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    verbose=1,
    validation_data=(X_test, Y_test))

score = model.evaluate(X_test, Y_test, verbose=0)
print('Test score:', score[0])
print('Test accuracy:', score[1])
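Conceptually, the tuple conversion is simple. The hypothetical `as_tuples` function below (a sketch, not pescador's implementation) yields `(batch[key1], batch[key2], ...)` for each batch dict and ignores any other keys, which is the shape of iterate that `fit_generator` expects.

```python
import numpy as np


def as_tuples(stream, *keys):
    """Hypothetical sketch: convert a stream of batch dicts into tuples."""
    for batch in stream:
        # Keep only the requested keys, in the requested order
        yield tuple(batch[k] for k in keys)


def toy_stream(n):
    # Illustrative batches with an extra key that the consumer does not need
    for i in range(n):
        yield {'X': np.full((2, 3), i), 'y': np.array([i, i]), 'unused': np.zeros(1)}


pairs = list(as_tuples(toy_stream(3), 'X', 'y'))
x0, y0 = pairs[0]
print(len(pairs), x0.shape, y0.tolist())  # 3 (2, 3) [0, 0]
```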
API Reference
Streamer

class pescador.Streamer(streamer, *args, **kwargs)
    A wrapper class for reusable generators.

    Wrapping generators/iterators within an object provides two useful features:
    - Streamer objects can be serialized (as long as the generator can be)
    - Streamer objects can instantiate a generator multiple times.

    The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).

    Examples

    Generate batches of random 3-dimensional vectors

    >>> def my_generator(n):
    ...     for i in range(n):
    ...         yield {'X': np.random.randn(1, 3)}
    >>> GS = Streamer(my_generator, 5)
    >>> for i in GS():
    ...     print(i)

    Or with a maximum number of batches

    >>> for i in GS(max_batches=3):
    ...     print(i)

    Or infinitely many examples, restarting the generator as needed

    >>> for i in GS.cycle():
    ...     print(i)

    An alternate interface for the same:

    >>> for i in GS(cycle=True):
    ...     print(i)

    Attributes
        generator : iterable or Streamer
            A generator function or iterable collection to draw from. May be another instance or subclass of Streamer.
        args : list
        kwargs : dict
            If generator is a function, then args and kwargs provide the parameters to the function.

    __call__(max_batches=None, cycle=False)
        Parameters:
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
            cycle : bool
                If True, cycle indefinitely.
        Yields:
            batch : dict
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

    activate()
        Activates the stream.

    active
        Returns True if the stream is active (i.e., a StopIteration has not been raised).

    cycle()
        Generates from the streamer infinitely.
        This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
        Yields:
            batch
                Items from the contained generator.

    generate(max_batches=None)
        Instantiate the generator.
        Parameters:
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
        Yields:
            batch : dict
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

    tuples(*items, **kwargs)
        Generate data in tuple-form instead of dicts.
        This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
        Parameters:
            *items
                One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
            cycle : bool
                If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
        Yields:
            batch : tuple
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
Buffered Streamers

class pescador.BufferedStreamer(streamer, buffer_size, strict_batch_size=True)
    Buffers a stream into batches of examples.

    Examples

    >>> def my_generator(n):
    ...     # Generates a single 30-dimensional example vector for each iterate
    ...     for i in range(n):
    ...         yield dict(X=np.random.randn(1, 30))
    >>> # Wrap the generator in a Streamer
    >>> S = pescador.Streamer(my_generator, 128)
    >>> # A buffered streamer will combine N iterates into a single batch
    >>> N = 10
    >>> B = pescador.BufferedStreamer(S, N)
    >>> for batch in B():
    ...     # Work on a batch of N=10 examples
    ...     MY_PROCESS_FUNCTION(batch)

    __call__(max_batches=None, cycle=False)
        Parameters:
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
            cycle : bool
                If True, cycle indefinitely.
        Yields:
            batch : dict
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

    activate()
        Activates the stream.

    active
        Returns True if the stream is active (i.e., a StopIteration has not been raised).

    cycle()
        Generates from the streamer infinitely.
        This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
        Yields:
            batch
                Items from the contained generator.

    generate(max_batches=None)
        Generate samples from the streamer.
        Parameters:
            max_batches : int
                For the BufferedStreamer, max_batches is the number of buffered batches that are generated, not the number of individual samples.

    tuples(*items, **kwargs)
        Generate data in tuple-form instead of dicts.
        This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
        Parameters:
            *items
                One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
            cycle : bool
                If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
        Yields:
            batch : tuple
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
Multiplexing

class pescador.Mux(seed_pool, k, lam=256.0, pool_weights=None, with_replacement=True, prune_empty_seeds=True, revive=False, random_state=None)
    Stochastic multiplexor for Streamers.

    Examples

    >>> # Create a collection of streamers
    >>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
    >>> # Multiplex them together into a single streamer
    >>> # Use at most 3 streams at once
    >>> mux = pescador.Mux(seeds, k=3)
    >>> for batch in mux():
    ...     MY_FUNCTION(batch)

    __call__(max_batches=None, cycle=False)
        Parameters:
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
            cycle : bool
                If True, cycle indefinitely.
        Yields:
            batch : dict
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

    activate()
        Activates the seed pool.

    active
        Returns True if the stream is active (i.e., a StopIteration has not been raised).

    cycle()
        Generates from the streamer infinitely.
        This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
        Yields:
            batch
                Items from the contained generator.

    tuples(*items, **kwargs)
        Generate data in tuple-form instead of dicts.
        This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
        Parameters:
            *items
                One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
            cycle : bool
                If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
        Yields:
            batch : tuple
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
        See also: generate, cycle
Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=None)
    Parallel data streaming over zeromq sockets.

    This allows a data generator to run in a separate process from the consumer.

    A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.

    Examples

    >>> # Construct a streamer object
    >>> S = pescador.Streamer(my_generator)
    >>> # Wrap the streamer in a ZMQ streamer
    >>> Z = pescador.ZMQStreamer(S)
    >>> # Process as normal
    >>> for batch in Z():
    ...     MY_FUNCTION(batch)

    __call__(max_batches=None, cycle=False)
        Parameters:
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
            cycle : bool
                If True, cycle indefinitely.
        Yields:
            batch : dict
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

    activate()
        Activates the stream.

    active
        Returns True if the stream is active (i.e., a StopIteration has not been raised).

    cycle()
        Generates from the streamer infinitely.
        This function will force an infinite stream, restarting the generator even if a StopIteration is raised.
        Yields:
            batch
                Items from the contained generator.

    generate(max_batches=None)
        Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.
        Yields:
            batch
                Data drawn from streamer(max_batches).

    tuples(*items, **kwargs)
        Generate data in tuple-form instead of dicts.
        This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.
        Parameters:
            *items
                One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.
            cycle : bool
                If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.
            max_batches : None or int > 0
                Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.
        Yields:
            batch : tuple
                Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.
Changes
v1.0.0
This release constitutes a major revision over the 0.x series, and the new interface is not backward-compatible.
- #23 Preserve memory alignment of numpy arrays across ZMQ streams
- #34 Rewrite of all core functionality under a unified object interface Streamer.
- #35, #52 Removed the StreamLearner interface and scikit-learn dependency
- #44 Added a base exception class PescadorError
- #45 Removed the util submodule
- #47, #60 Improvements to documentation
- #48 Fixed a timeout bug in ZMQ streamer
- #53 Added testing and support for python 3.6
- #57 Added the .tuples() interface for Streamers
- #61 Improved test coverage
- #63 Added random state to Mux
- #64 Added __call__ interface to Streamer
v0.1.3
- Added support for joblib>=0.10
v0.1.2
- Added pescador.mux parameter revive. Calling with with_replacement=False, revive=True will use each seed at most once at any given time.
- Added pescador.zmq_stream parameter timeout. Setting this to a positive number will terminate dangling worker threads after timeout is exceeded on join. See also: multiprocessing.Process.join.
v0.1.1
- pescador.mux now throws a RuntimeError exception if the seed pool is empty
v0.1.0
Initial public release