Pescador¶
Pescador is a library for streaming (numerical) data, primarily for use in machine learning applications.
Pescador addresses the following use cases:
- Hierarchical sampling
- Out-of-core learning
- Parallel streaming
These use cases arise in the following common scenarios:
Say you have three data sources (A, B, C) that you want to sample. For example, each data source could contain all the examples of a particular category.
Pescador can dynamically interleave these sources to provide a randomized stream D <- (A, B, C). The distribution over (A, B, C) need not be uniform: you can specify any distribution you like!
Now, say you have 3000 data sources, each of which may contain a large number of samples. Maybe that’s too much data to fit in RAM at once.
Pescador makes it easy to interleave these sources while maintaining a small working set. Not all sources are simultaneously active, but Pescador manages the working set so you don't have to. This way, you can process the full data set out of core, using a bounded amount of memory.
If loading data incurs substantial latency (e.g., due to storage access or pre-processing), this can become a bottleneck.
Pescador can seamlessly move data generation into a background process, so that your main thread can continue working.
To make this all possible, Pescador provides the following utilities:
- Streamer objects encapsulate data generators for re-use, infinite sampling, and inter-process communication.
- Multiplexing objects allow flexible sampling from multiple streams
- Parallel streaming provides parallel processing with low communication overhead
- Transform or modify streams with Maps (see Processing Data Streams)
- Buffering of sampled data into fixed-size batches (see pescador.buffer_stream)
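As a quick sketch of how these pieces fit together (a minimal example using toy in-memory data; a realistic pipeline would stream from files or other sources):

import numpy as np
import pescador

def noisy_stream(data, sigma=0.1):
    # Yield random rows of `data` with additive Gaussian noise, forever
    while True:
        idx = np.random.randint(len(data))
        yield dict(X=data[idx] + sigma * np.random.randn(*data[idx].shape))

# Three toy "sources", e.g. one per category
sources = [np.random.randn(100, 5) + offset for offset in (0, 5, 10)]
streamers = [pescador.Streamer(noisy_stream, src) for src in sources]

# Interleave the sources at random, then buffer into minibatches of 32
mux = pescador.ShuffledMux(streamers)
for batch in pescador.buffer_stream(mux, 32):
    # batch['X'] collects 32 samples; train a model here
    break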
Installation¶
Pescador can be installed from PyPI through pip:
pip install pescador
or via conda using the conda-forge channel:
conda install -c conda-forge pescador
Introduction¶
Pescador’s primary goal is to provide fine-grained control over data streaming and sampling. These problems can get complex quickly, so this section provides an overview of the concepts underlying Pescador’s design, and a summary of the provided functionality.
Definitions¶
To understand what pescador does, it will help to establish some common terminology. If you’re not already familiar with Python’s iterator and generator concepts, here’s a quick synopsis:
- An iterator is an object that produces a sequence of data, i.e. via __next__ / next().
- An iterable is an object that can produce iterators, i.e. via __iter__ / iter().
  - See: iterable definition
- A generator (or more precisely generator function) is a callable object that returns a single iterator.
  - See: generator definition
- Pescador defines a stream as the sequence of objects produced by an iterator.
  - For example, range(8) is an iterable, and its iterator produces the stream: 0, 1, 2, 3, ..., 7
Streaming Data¶
Pescador defines an object called a Streamer for the purposes of (re)creating iterators indefinitely and (optionally) interrupting them prematurely.
Streamer implements the iterable interface, and can be iterated directly.
- A Streamer can be initialized with one of two types (see the example below):
  - Any iterable type, e.g. range(7), ['foo', 'bar'], "abcdef", or another Streamer
  - A generator function and its arguments + keyword arguments.
- A Streamer transparently yields the data stream flowing through it.
  - A Streamer should not modify objects in its stream.
  - In the spirit of encapsulation, the modification of data streams is achieved through separate functionality (see Processing Data Streams).
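For instance, both initialization styles behave the same way once iterated (a small sketch):

import pescador

# From an iterable
s1 = pescador.Streamer(range(4))

# From a generator function and its arguments
def countdown(n):
    while n > 0:
        yield n
        n -= 1

s2 = pescador.Streamer(countdown, 4)

print(list(s1))  # [0, 1, 2, 3]
print(list(s2))  # [4, 3, 2, 1]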
Multiplexing Data Streams¶
- Pescador defines a family of multiplexer or Mux classes for the purposes of multiplexing streams of data. For stochastic sampling applications, ShuffledMux and StochasticMux are the most useful classes.
- BaseMux inherits from Streamer, which makes all muxes both iterable and recomposable. Muxes allow you to construct arbitrary trees of data streams. This is useful for hierarchical sampling.
- Muxes are initialized with a container of one or more streamers, and parameters to control the mux's sampling behavior.
- As a subclass of Streamer, a Mux also transparently yields the stream flowing through it, i.e. Streaming Data.
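For example, here is a small sketch of a mux built from two streamers, and of nesting that mux inside another one (the letter generator is made up for illustration):

import pescador

def letters(alphabet):
    # Cycle through the characters of `alphabet` forever
    while True:
        for c in alphabet:
            yield c

lower = pescador.Streamer(letters, 'abc')
upper = pescador.Streamer(letters, 'ABC')

# A mux is itself a Streamer, so it can be iterated (and bounded) directly...
mux = pescador.ShuffledMux([lower, upper])
print(''.join(mux(max_iter=10)))

# ...or nested inside another mux to build a sampling tree
digits = pescador.Streamer(letters, '0123')
tree = pescador.ShuffledMux([mux, digits])
print(''.join(tree(max_iter=10)))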
Processing Data Streams¶
Pescador adopts the concept of “transformers” for processing data streams.
- A transformer takes as input a single object in the stream.
- A transformer yields an object.
- Transformers are iterators, i.e. implement a __next__ method, to preserve iteration.
- An example of a built-in transformer is enumerate.
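For illustration, a custom transformer is just a generator function that consumes a stream and yields (possibly modified) objects; the scaling operation below is made up for this sketch:

import numpy as np
import pescador

def scale(stream, factor=2.0):
    # Transformer: multiply the 'X' field of each sample by `factor`
    for data in stream:
        data['X'] = data['X'] * factor
        yield data

def source():
    while True:
        yield dict(X=np.ones(3))

base = pescador.Streamer(source)

# Chain the transformer onto the stream, and wrap the result in a new
# Streamer so that it stays re-usable and composable
scaled = pescador.Streamer(scale, base, factor=10.0)

for sample in scaled(max_iter=2):
    print(sample['X'])  # [10. 10. 10.]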
Why Pescador?¶
Pescador was developed in response to a variety of recurring problems related to data streaming for training machine learning models. After implementing custom solutions each time these problems occurred, we converged on a set of common solutions that can be applied more broadly. The solutions provided by Pescador may or may not fit your problem. This section of the documentation will attempt to help you figure out if Pescador is useful for your application.
Hierarchical sampling¶
Hierarchical sampling refers to any process where you want to sample data from a distribution by conditioning on one or more variables. For example, say you have a distribution over real-valued observations X and categorical labels Y, and you want to sample labeled observations (X, Y). A hierarchical sampler might first select a value for Y, and then randomly draw an example X that has that label. This is equivalent to exploiting the laws of conditional probability: \(P[X, Y] = P[X|Y] \times P[Y]\).
Hierarchical sampling can be useful when dealing with highly imbalanced data, where it may sometimes be better to learn from a balanced sample and then explicitly correct for imbalance within the model.
It can also be useful when dealing with data that has natural grouping substructure beyond categories. For example, when modeling a large collection of audio files, each file may generate multiple observations, which will all be mutually correlated. Hierarchical sampling can be useful in neutralizing this bias during the training process.
Pescador implements hierarchical sampling through a family of Multiplexing classes. In its simplest form, the ShuffledMux takes as input a set of Streamer objects from which samples are drawn randomly. This effectively generates data by a process similar to the following pseudo-code:
while True:
    stream_id = random_choice(streamers)
    yield next(streamers[stream_id])
The ShuffledMux object also lets you specify an arbitrary distribution over the set of streamers, giving you fine-grained control over the resulting distribution of samples.
Muxes are also Streamers, so sampling hierarchies can be nested arbitrarily deep.
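For instance, a sketch of weighted sampling over three category streams (the generator and weights here are purely illustrative):

import pescador

def label_stream(label):
    # Stand-in for a per-category data source
    while True:
        yield dict(y=label)

streamers = [pescador.Streamer(label_stream, y) for y in ('cat', 'dog', 'bird')]

# Draw 'cat' samples about half the time, 'dog' and 'bird' a quarter each
mux = pescador.ShuffledMux(streamers, weights=[0.5, 0.25, 0.25])

print([sample['y'] for sample in mux(max_iter=8)])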
Out-of-core sampling¶
Another common problem occurs when the dataset is too large to fit in RAM all at once. Going back to the audio example above, consider a problem where there are 30,000 source files, each of which generates 1GB of observation data, and the machine can only fit 100 source files in memory at any given time.
To facilitate this use case, the StochasticMux object allows you to specify a maximum number of simultaneously active streams (i.e., the working set). In this case, you would most likely implement a generator for each file as follows:
import numpy as np
import pescador

def sample_file(filename):
    # Load observation data
    X = np.load(filename)

    while True:
        # Generate a random row as a dictionary
        yield dict(X=X[np.random.choice(len(X))])

streamers = [pescador.Streamer(sample_file, fname) for fname in ALL_30K_FILES]

# Keep 100 streamers active at a time
# Replace a streamer after it has generated (on average) 8 samples
for item in pescador.StochasticMux(streamers, n_active=100, rate=8):
    model.partial_fit(item['X'])
Note that data is not loaded until the generator is instantiated. If you specify a working set of size n_active=100, then StochasticMux will select 100 streamers at random to form the working set, and only sample data from within that set. StochasticMux will then randomly evict streamers from the working set and replace them with new streamers, according to its rate parameter. The result is a simple interface for drawing data from all input sources while using limited memory.
StochasticMux provides a great deal of flexibility over how streamers are replaced, what to do when streamers are exhausted, etc.
In addition to ShuffledMux and StochasticMux, there are also deterministic multiplexers ChainMux and RoundRobinMux, which are useful when random sampling is undesirable.
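As a small sketch of the difference between these two (expected outputs shown as comments, assuming the default modes):

import pescador

a = pescador.Streamer([1, 2, 3])
b = pescador.Streamer([10, 20, 30])

# RoundRobinMux alternates between streams in strict order
print(list(pescador.RoundRobinMux([a, b])))  # expected: [1, 10, 2, 20, 3, 30]

# ChainMux exhausts each stream before moving to the next
print(list(pescador.ChainMux([a, b])))       # expected: [1, 2, 3, 10, 20, 30]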
Parallel processing¶
In the above example, all of the data I/O was handled within the generator function. If the generator requires high-latency operations such as disk access, this can become a computational bottleneck.
Pescador makes it easy to migrate data generation into a background process, so that high-latency operations do not stall the main thread. This is facilitated by the ZMQStreamer object (see Parallel streaming), which acts as a simple wrapper around any streamer that produces samples in the form of dictionaries of numpy arrays. Continuing the above example:
mux_stream = pescador.StochasticMux(streamers, n_active=100, rate=8)

for item in pescador.ZMQStreamer(mux_stream):
    model.partial_fit(item['X'])
Simple interface¶
Finally, Pescador is intended to work with a variety of machine learning frameworks, such as scikit-learn and Keras. While many frameworks provide custom tools for handling data pipelines, each one is different, and many require using specific data structures and formats.
Pescador is meant to be framework-agnostic, allowing you to write your own data generation logic using standard Python data structures (dictionaries and numpy arrays). We also provide helper utilities to integrate with Keras's tuple generator interface.
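For example, dictionary samples can be buffered into minibatches and then reformatted as (X, Y) tuples with the tuples helper (a sketch; the sampler and shapes are illustrative, and the exact batch shapes follow the buffering behavior described later):

import numpy as np
import pescador

def sampler():
    while True:
        yield dict(X=np.random.randn(5), Y=np.random.randint(2, size=(1,)))

streamer = pescador.Streamer(sampler)

# Buffer into minibatches of 32, then reformat each batch as an (X, Y) tuple
batches = pescador.Streamer(pescador.buffer_stream, streamer, 32)
for X, Y in pescador.tuples(batches, 'X', 'Y'):
    print(X.shape, Y.shape)  # roughly (32, 5) and (32, 1)
    break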
Basic examples¶
Streaming data¶
This example will walk through the basics of using pescador to stream samples from a generator.
Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.
Sample generators¶
Streamers are intended to transparently pass data without modifying it. However, Pescador assumes that Streamers produce output in a particular format. Specifically, each sample is expected to be a Python dictionary where each value is an np.ndarray. For unsupervised learning (e.g., sklearn's MiniBatchKMeans), a sample might contain only one key: X. For supervised learning (e.g., SGDClassifier), valid samples would contain both X and Y keys, both of equal length.
Here's a simple example generator that draws random samples of data from the Iris dataset, and adds Gaussian noise to the features.
import numpy as np

def noisy_samples(X, Y, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    sigma : float > 0
        Variance of the additive noise

    Yields
    ------
    sample : dict
        sample['X'] is an `np.ndarray` of shape `(1, d)`

        sample['Y'] is a scalar `np.ndarray`
    '''
    n, d = X.shape

    while True:
        i = np.random.randint(0, n)

        noise = sigma * np.random.randn(1, d)

        yield dict(X=X[i] + noise, Y=Y[i])
In the code above, noisy_samples is a generator that can be sampled indefinitely because noisy_samples contains an infinite loop. Each iterate of noisy_samples will be a dictionary containing the sample’s features and labels.
Streamers¶
Generators in Python have a couple of limitations for common stream learning pipelines. First, once instantiated, a generator cannot be "restarted". Second, an instantiated generator cannot be serialized directly, so generators are difficult to use in distributed computation environments.
Pescador provides the Streamer class to circumvent these issues. Streamer simply provides an object container for an uninstantiated generator (and its parameters), along with an access method iterate(). Calling iterate() multiple times on a Streamer object is equivalent to restarting the generator, and can therefore be used to implement multiple-pass streams simply. Similarly, because a Streamer can be serialized, it is easy to pass a streamer object to a separate process for parallel computation.
Here’s a simple example, using the generator from the previous section.
import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

stream2 = streamer.iterate()
Iterating over streamer.iterate() is equivalent to iterating over noisy_samples(X[train], Y[train]).
Additionally, Streamer can be bounded easily by saying streamer.iterate(max_iter=N) for some N maximum number of samples.
Finally, because iterate() is such a common operation with streamer objects, a short-hand interface is provided by treating the streamer object as if it was a generator:
import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

# Equivalent to stream2 above
stream3 = streamer()
Iterating over any of these would then look like the following:
for sample in streamer.iterate():
    # do something
    ...

# For convenience, the object directly behaves as an iterator.
for sample in streamer:
    # do something
    ...
Stream re-use and multiplexing¶
This example demonstrates how to re-use and multiplex streamers.
We will assume a working understanding of the simple example in the previous section.
The StochasticMux streamer provides a powerful interface for randomly interleaving samples from multiple input streams. StochasticMux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.
As a concrete example, we can simulate a mixture of noisy streams with differing variances.
from __future__ import print_function

import numpy as np

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score

from pescador import Streamer, StochasticMux

def noisy_samples(X, Y, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=1)

        noise = sigma * np.random.randn(1, d)

        yield dict(X=X[i] + noise, Y=Y[i])

# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']

classes = np.unique(Y)

rs = ShuffleSplit(n_splits=1, test_size=0.1)
for train, test in rs.split(X):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Build a collection of Streamers with different noise scales
    streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
               for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping 3 streams alive at once
    mux_stream = StochasticMux(streams,
                               3,        # Keep 3 streams alive at once
                               rate=64)  # Use a poisson rate of 64

    # Fit the model to the stream, use at most 5000 samples
    for sample in mux_stream(max_iter=5000):
        estimator.partial_fit(sample['X'], sample['Y'], classes=classes)

    # And report the accuracy
    Ypred = estimator.predict(X[test])
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))
In the above example, each Streamer in streams can make infinitely many samples. The rate=64 argument to StochasticMux says that each stream should produce some n samples, where n is sampled from a Poisson distribution of rate rate. When a stream exceeds its bound, it is deactivated, and a new streamer is activated to fill its place.
Setting rate=None disables the random stream bounding, and simply runs each active stream until exhaustion.
The StochasticMux streamer can sample with or without replacement from its input streams, according to the mode option. Setting mode='single_active' means that each stream can be active at most once at a time.
Streams can also be sampled with non-uniform weighting by specifying a vector of weights.
Finally, exhausted streams can be removed from consideration by setting mode='exhaustive'.
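Putting those options together, a sketch of a weighted, single-active mux (assuming the weights and mode keywords shown here; the constant-valued streams are just for illustration):

import pescador

def constant(value):
    while True:
        yield value

streamers = [pescador.Streamer(constant, v) for v in 'abcd']

# Keep 2 streams active, favor the first two sources, and never let the
# same streamer occupy two active slots at once
mux = pescador.StochasticMux(streamers,
                             n_active=2,
                             rate=8,
                             weights=[0.4, 0.4, 0.1, 0.1],
                             mode='single_active')

print(''.join(mux(max_iter=30)))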
Sampling from disk¶
A common use case for pescador is to sample data from a large collection of existing archives. As a concrete example, consider the problem of fitting a statistical model to a large corpus of musical recordings. When the corpus is sufficiently large, it is impossible to fit the entire set in memory while estimating the model parameters. Instead, one can pre-process each song to store pre-computed features (and, optionally, target labels) in a numpy zip NPZ archive. The problem then becomes sampling data from a collection of NPZ archives.
Here, we will assume that the pre-processing has already been done so that each NPZ file contains a numpy array of features X and labels Y. We will define infinite samplers that pull n examples per iterate.
import numpy as np
import pescador
def sample_npz(npz_file, n):
'''Generate an infinite sequence of contiguous samples
from the input `npz_file`.
Each iterate has `n > 0` rows.
'''
with np.load(npz_file) as data:
# How many rows are in the data?
# We assume that data['Y'] has the same length
n_total = len(data['X'])
while True:
# Compute the index offset
idx = np.random.randint(n_total - n)
yield dict(X=data['X'][idx:idx + n],
Y=data['Y'][idx:idx + n])
Applying the sample_npz function above to a list of npz_files, we can make a multiplexed streamer object as follows:
n = 16
npz_files = #LIST OF PRE-COMPUTED NPZ FILES
streams = [pescador.Streamer(sample_npz, npz_f, n) for npz_f in npz_files]
# Keep 32 streams alive at once
# Draw on average 16 patches from each stream before deactivating
mux_stream = pescador.StochasticMux(streams, n_active=32, rate=16)
for batch in mux_stream(max_iter=1000):
# DO LEARNING HERE
pass
Memory-mapping¶
The NPZ file format requires loading the entire contents of each archive into memory. This can lead to high memory consumption when the number of active streams is large. Note also that memory usage for each NPZ file will persist for as long as there is a reference to its contents. This can be circumvented, at the cost of some latency, by copying data within the streamer function:
def sample_npz_copy(npz_file, n):
with np.load(npz_file) as data:
# How many rows are in the data?
# We assume that data['Y'] has the same length
n_total = len(data['X'])
while True:
# Compute the index offset
idx = np.random.randint(n_total - n)
yield dict(X=data['X'][idx:idx + n].copy(), # <-- Note the explicit copy
Y=data['Y'][idx:idx + n].copy())
The above modification will ensure that memory is freed as quickly as possible.
Alternatively, memory-mapping can be used to only load data as needed, but requires that each array is stored in its own NPY file:
def sample_npy_mmap(npy_x, npy_y, n):
# Open each file in "copy-on-write" mode, so that the files are read-only
X = np.load(npy_x, mmap_mode='c')
Y = np.load(npy_y, mmap_mode='c')
n_total = len(X)
while True:
# Compute the index offset
idx = np.random.randint(n_total - n)
yield dict(X=X[idx:idx + n],
Y=Y[idx:idx + n])
# Using this streamer is similar to the first example, but now you need a separate
# NPY file for each X and Y
npy_x_files = #LIST OF PRE-COMPUTED NPY FILES (X)
npy_y_files = #LIST OF PRE-COMPUTED NPY FILES (Y)
streams = [pescador.Streamer(sample_npy_mmap, npy_x, npy_y, n)
for (npy_x, npy_y) in zip(npy_x_files, npy_y_files)]
# Then construct the `StochasticMux` from the streams, as above
mux_streamer = pescador.StochasticMux(streams, n_active=32, rate=16)
...
Buffered Streaming¶
In a machine learning setting, it is common to train a model with multiple input datapoints simultaneously, in what are commonly referred to as “minibatches”. To achieve this, pescador provides the pescador.maps.buffer_stream map transformer, which will “buffer” a data stream into fixed batch sizes.
Following up on the first example, we use the noisy_samples generator.
import pescador

# Create an initial streamer
streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

minibatch_size = 128
# Wrap your streamer
buffered_sample_gen = pescador.buffer_stream(streamer, minibatch_size)

# Generate batches in exactly the same way as you would from the base streamer
for batch in buffered_sample_gen:
    ...
A few important points to note about using pescador.maps.buffer_stream:
- pescador.maps.buffer_stream will stack your arrays, adding a new sample dimension such that the first dimension is the batch size (minibatch_size in the above example). E.g., if your samples are shaped (4, 5), a batch size of 10 will produce arrays shaped (10, 4, 5).
- Each key in the generated batches is stacked across all the samples buffered.
- pescador.maps.buffer_stream, like all pescador.maps transformers, returns a generator, not a Streamer. So, if you still want it to behave like a streamer, you have to wrap it in a streamer. Following up on the previous example:
batch_streamer = pescador.Streamer(buffered_sample_gen)

# Generate batches as a streamer:
for batch in batch_streamer:
    # batch['X'].shape == (minibatch_size, ...)
    # batch['Y'].shape == (minibatch_size, 1)
    ...

# Or, another way:
batch_streamer = pescador.Streamer(pescador.buffer_stream, streamer, minibatch_size)
Advanced examples¶
Examples of more advanced Pescador usage.
Parallel streaming¶
An example of how to use a ZMQStreamer to generate samples in a background process, with some small benchmarks along the way.
# Imports
import numpy as np
import pescador
import time
Computational Load¶
Often, the act of generating samples will be computationally expensive, place a heavy load on disk I/O, or both. Here, we can mimic costly processing by doing a bunch of pointless math on an array.
def costly_function(X, n_ops=100):
for n in range(n_ops):
if (n % 2):
X = X ** 3.0
else:
X = X ** (1. / 3)
return X
Sample Generator¶
Here we define a sampler function that yields some simple data. We’ll run some computation on the inside to slow things down a bit.
def data_gen(n_ops=100):
"""Yield data, while optionally burning compute cycles.
Parameters
----------
n_ops : int, default=100
Number of operations to run between yielding data.
Returns
-------
data : dict
        An object which looks like it might come from some
        machine learning problem, with X as features, and y as targets.
"""
while True:
X = np.random.uniform(size=(64, 64))
yield dict(X=costly_function(X, n_ops),
y=np.random.randint(10, size=(1,)))
def timed_sampling(stream, n_iter, desc):
    start_time = time.time()
    for data in stream(max_iter=n_iter):
        costly_function(data['X'])

    duration = time.time() - start_time
    print("{} :: Average time per iteration: {:0.3f} sec"
          .format(desc, duration / n_iter))
max_iter = 1e2
# Construct a streamer
stream = pescador.Streamer(data_gen)
timed_sampling(stream, max_iter, 'Single-threaded')
# Single-threaded :: Average time per iteration: 0.024 sec
Basic ZMQ Usage¶
Here is a trivial ZMQStreamer example, using it directly on top of a single Streamer. We leave it to your imagination to decide what you would actually do with the batches you receive here.
# Wrap the streamer in a ZMQ streamer
zstream = pescador.ZMQStreamer(stream)
timed_sampling(zstream, max_iter, 'ZMQ')
# ZMQ :: Average time per iteration: 0.012 sec
ZMQ with Map Functions¶
You will also likely want to buffer samples for building mini-batches. Here, we demonstrate best practices for using map functions in a stream pipeline.
buffer_size = 16
# Get batches from the stream as you would normally.
batches = pescador.Streamer(pescador.buffer_stream, stream, buffer_size)
timed_sampling(batches, max_iter, 'Single-threaded Batches')
# Single-threaded Batches :: Average time per iteration: 0.392 sec
zstream = pescador.ZMQStreamer(batches)
timed_sampling(zstream, max_iter, 'ZMQ Batches')
# ZMQ Batches :: Average time per iteration: 0.196 sec
Framework Examples¶
Examples of how to use pescador with different ML Frameworks.
Contributions/Pull Requests are welcomed and encouraged.
A Keras Example¶
An example of how to use Pescador with Keras.
Original Code source: https://github.com/fchollet/keras/blob/master/examples/mnist_cnn.py
Setup and Definitions¶
from __future__ import print_function
import datetime
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import numpy as np
import pescador
batch_size = 128
num_classes = 10
epochs = 12
# input image dimensions
img_rows, img_cols = 28, 28
Load and preprocess data¶
def setup_data():
"""Load and shape data for training with Keras + Pescador.
Returns
-------
input_shape : tuple, len=3
Shape of each sample; adapts to channel configuration of Keras.
X_train, y_train : np.ndarrays
Images and labels for training.
X_test, y_test : np.ndarrays
Images and labels for test.
"""
# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()
if K.image_data_format() == 'channels_first':
x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
# convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
return input_shape, (x_train, y_train), (x_test, y_test)
Setup Keras model¶
def build_model(input_shape):
"""Create a compiled Keras model.
Parameters
----------
input_shape : tuple, len=3
Shape of each image sample.
Returns
-------
model : keras.Model
Constructed model.
"""
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
activation='relu',
input_shape=input_shape))
model.add(Conv2D(64, kernel_size=(3, 3),
activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=keras.optimizers.Adadelta(),
metrics=['accuracy'])
return model
Define Data Sampler¶
def sampler(X, y):
'''A basic generator for sampling data.
Parameters
----------
X : np.ndarray, len=n_samples, ndim=4
Image data.
y : np.ndarray, len=n_samples, ndim=2
One-hot encoded class vectors.
Yields
------
data : dict
Single image sample, like {X: np.ndarray, y: np.ndarray}
'''
X = np.atleast_2d(X)
# y's are binary vectors, and should be of shape (10,) after this.
y = np.atleast_1d(y)
n = X.shape[0]
while True:
i = np.random.randint(0, n)
yield {'X': X[i], 'y': y[i]}
Define a Custom Map Function¶
def additive_noise(stream, key='X', scale=1e-1):
'''Add noise to a data stream.
Parameters
----------
stream : iterable
A stream that yields data objects.
key : string, default='X'
Name of the field to add noise.
scale : float, default=0.1
Scale factor for gaussian noise.
Yields
------
data : dict
Updated data objects in the stream.
'''
for data in stream:
noise_shape = data[key].shape
noise = scale * np.random.randn(*noise_shape)
data[key] = data[key] + noise
yield data
Put it all together¶
input_shape, (X_train, Y_train), (X_test, Y_test) = setup_data()
steps_per_epoch = len(X_train) // batch_size
# Create two streams from the same data, where one of the streams
# adds a small amount of Gaussian noise. You could easily perform
# other data augmentations using the same 'map' strategy.
stream = pescador.Streamer(sampler, X_train, Y_train)
noisy_stream = pescador.Streamer(additive_noise, stream, 'X')
# Multiplex the two streamers together.
mux = pescador.StochasticMux([stream, noisy_stream],
# Two streams, always active.
n_active=2,
# We want to sample from each stream infinitely.
rate=None)
# Buffer the stream into minibatches.
batches = pescador.buffer_stream(mux, batch_size)
model = build_model(input_shape)
try:
print("Start time: {}".format(datetime.datetime.now()))
model.fit_generator(
pescador.tuples(batches, 'X', 'y'),
steps_per_epoch=steps_per_epoch,
epochs=epochs,
verbose=1,
validation_data=(X_test, Y_test))
except KeyboardInterrupt:
print("Stopping early")
finally:
print("Finished: {}".format(datetime.datetime.now()))
scores = model.evaluate(X_test, Y_test, verbose=0)
for val, name in zip(scores, model.metrics_names):
print('Test {}: {:0.4f}'.format(name, val))
Multiplexing examples¶
Examples of how to use stream multiplexing in applications.
Using ChainMux for repeatable streams¶
Some applications call for deterministic, repeatable data streams, rather than stochastic samples. A common use case is validation in machine learning applications, where a held-out set of data is used to estimate the quality of a model during training. The validation score is computed repeatedly as the model changes, and the resulting scores are compared to each other to find the best version of the model. The simplest way to ensure that the validation scores are comparable is to use the same sample set each time. With Pescador, this is most easily achieved by using the ChainMux.
# Imports
import numpy as np
import pescador
Setup¶
We'll assume that the validation data lives in some N files. Each file produces M examples, so the total validation set has N*M examples.
val_files = ['file1.npz', 'file2.npz']
N = len(val_files)
M = 10 # or whatever the number of examples per file is
Data generator¶
We'll make a simple generator that streams the first m examples from an npz file. The npz file is assumed to store two arrays: X and Y, containing inputs and outputs (e.g., images and class labels). Once the streamer produces m examples, it exits.
def data_gen(filename, m):
data = np.load(filename)
X = data['X']
Y = data['Y']
for i in range(m):
yield dict(X=X[i], y=Y[i])
Constructing the streamers¶
First, we’ll make a streamer for each validation example.
val_streams = [pescador.Streamer(data_gen, fn, M) for fn in val_files]
Building the mux¶
The ChainMux can be used to combine data from all val_streams in order. We’ll use cycle mode here, so that the chain restarts after all of the streamers have been exhausted. This produces an infinite stream of data from a finite sequence that repeats every N*M steps. This can be used in keras’s fit_generator function with validation_steps=N*M to ensure that the validation set is constant at each epoch.
val_stream = pescador.ChainMux(val_streams, mode='cycle')
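A validation pass over this stream might then look like the following sketch (the model and its evaluate call are placeholders, not part of pescador):

def validate(model, val_stream, n_steps):
    # One full pass over the validation set is exactly N * M samples
    losses = []
    for sample in val_stream(max_iter=n_steps):
        losses.append(model.evaluate(sample['X'], sample['y']))  # placeholder
    return sum(losses) / len(losses)

# score = validate(model, val_stream, N * M)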
Using cycle mode to create data epochs¶
In some machine learning applications, it is common to train in epochs where each epoch consists of a full pass through the data set. There are a few ways to produce this behavior with pescador, depending on the exact sampling properties you have in mind.
- If presentation order does not matter, and a deterministic sequence is acceptable, then this can be achieved with ChainMux as demonstrated in Using ChainMux for repeatable streams. This is typically a good approach for validation or evaluation, but not training, since the deterministic sequence order could bias the model.
- If you want random presentation order, but want to ensure that all data is touched once per epoch, then the StochasticMux can be used in cycle mode to restart all streamers once they’ve been exhausted.
# Imports
import numpy as np
import pescador
Setup¶
We'll assume that the data lives in some N files. For convenience, we'll assume that each file produces M examples. Our goal is to ensure that each example is generated once per epoch.
files = ['file1.npz', 'file2.npz', 'file3.npz']
N = len(files)
M = 10 # or whatever the number of examples per file is
Data generator¶
We’ll make a simple generator that streams the first m examples from a given file. The npz file is assumed to store two arrays: X and Y containing inputs and outputs. Once the streamer produces m examples, it exits.
def data_gen(filename, m):
data = np.load(filename)
X = data['X']
Y = data['Y']
for i in range(m):
yield dict(X=X[i], y=Y[i])
Constructing the streamers¶
We’ll make a streamer for each source file
streams = [pescador.Streamer(data_gen, fn, M) for fn in files]
Epochs with StochasticMux¶
The StochasticMux has three modes of operation, which control how its input streams are activated and replaced:
- mode='with_replacement' allows each streamer to be activated multiple times, even simultaneously.
- mode='single_active' does not allow a streamer to be active more than once at a time, but an inactive streamer can be activated at any time.
- mode='exhaustive' is like single_active, but does not allow previously used streamers to be re-activated.
For epoch-based sampling, we will use exhaustive mode to ensure that streamers are not reactivated within the epoch.
Since each data stream produces exactly M examples, this would lead to a finite sample stream (i.e., only one epoch). To prevent the mux from exiting after the first epoch, we’ll use cycle mode.
k = 100 # or however many streamers you want simultaneously active
# We'll use `rate=None` here so that the number of samples per stream is
# determined by the streamer (`M`) and not the mux.
mux = pescador.StochasticMux(streams, k, rate=None, mode='exhaustive')
epoch_stream = mux(cycle=True)
The epoch_stream will produce an infinite sequence of iterates. The same samples are presented (in random order) in the first N*M, second N*M, etc. disjoint sub-sequences, each of which may be considered as an epoch.
NOTE: for this approach to work with something like keras’s fit_generator method, you need to be able to explicitly calculate the duration of an epoch, which means that the number of samples per streamer (M here) must be known in advance.
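If you are not using a framework helper, you can also carve epochs out of the infinite stream manually, e.g. with itertools.islice (a sketch; the training step is a placeholder):

import itertools

n_epochs = 5
samples_per_epoch = N * M

for epoch in range(n_epochs):
    for sample in itertools.islice(epoch_stream, samples_per_epoch):
        pass  # update your model with sample['X'] and sample['y'] here
    print('finished epoch', epoch)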
Hierarchical sampling with ShuffledMux¶
Hierarchical sampling can be done in pescador by composing mux objects. This example will illustrate how to use different types of muxen to achieve different effects.
As a motivating example, consider a machine learning problem where training data are images belonging to 100 classes, such as car, boat, pug, etc. For each class, you have a list of images belonging to that class. Perhaps the total number of images is larger than fits comfortably in memory, but you still want to produce a stream of training data with uniform class presentation.
To solve this in pescador, we will first create a StochasticMux for each sub-population (e.g., one for cars, one for boats, etc.), which will maintain a small active set. We will then combine those sub-population muxen using ShuffledMux to produce the output stream.
# Code source: Brian McFee
# License: BSD 3 Clause
from __future__ import print_function
import pescador
Setup¶
We’ll demonstrate this with a simpler problem involving two populations of streamers:
- Population 1 generates upper-case letters
- Population 2 generates lower-case letters
# First, let's make a simple generator that makes an infinite
# sequence of a given letter.
def letter(c):
while True:
yield c
# Let's make the two populations of streamers
pop1 = [pescador.Streamer(letter, c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ']
pop2 = [pescador.Streamer(letter, c) for c in 'abcdefghijklmnopqrstuvwxyz']
# We'll sample population 1 with 3 streamers active at any time.
# Each streamer will generate, on average, 5 samples before being
# replaced.
mux1 = pescador.StochasticMux(pop1, 3, 5)
# Let's have 5 active streamers for population 2, and replace
# them after 2 examples on average.
mux2 = pescador.StochasticMux(pop2, 5, 2)
Mux composition¶
We multiplex the two populations using a ShuffledMux. The ShuffledMux keeps all of its input streamers active, and draws samples independently at random from each one.
# This should generate an approximately equal number of upper- and
# lower-case letters, with more diversity among the lower-case letters.
hier_mux = pescador.ShuffledMux([mux1, mux2])
print(''.join(hier_mux(max_iter=80)))
Weighted sampling¶
If you want to specify the sampling probability of mux1 and mux2, you can supply weights to the ShuffledMux. By default, each input is equally likely.
# This should generate three times as many upper-case as lower-case letters.
weight_mux = pescador.ShuffledMux([mux1, mux2], weights=[0.75, 0.25])
print(''.join(weight_mux(max_iter=80)))
Muxing Multiple Datasets Together¶
Let’s say you have a machine learning task like digit recognition. You have multiple datasets, and you would like to sample from them evenly. Pescador’s Mux Streamer is a perfect tool to facilitate this sort of setup.
For this example, to simulate this experience, we will split the canonical MNIST training set evenly into three pieces, and save them to their own .npz files.
import numpy as np
from keras.datasets import mnist
import pescador
Prepare Datasets¶
dataset1_path = "/tmp/dataset1_train.npz"
dataset2_path = "/tmp/dataset2_train.npz"
dataset3_path = "/tmp/dataset3_train.npz"
datasets = [dataset1_path, dataset2_path, dataset3_path]
def load_mnist():
# the data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()
return (x_train, y_train), (x_test, y_test)
def split_and_save_datasets(X, Y, paths):
"""Shuffle X and Y into n / len(paths) datasets, and save them
to disk at the locations provided in paths.
"""
shuffled_idxs = np.random.permutation(np.arange(len(X)))
for i in range(len(paths)):
# Take every len(paths) item, starting at i.
# len(paths) is 3, so this would be [0::3], [1::3], [2::3]
X_i = X[shuffled_idxs[i::len(paths)]]
Y_i = Y[shuffled_idxs[i::len(paths)]]
np.savez(paths[i], X=X_i, Y=Y_i)
(X_train, Y_train), (X_test, Y_test) = load_mnist()
split_and_save_datasets(X_train, Y_train, datasets)
Create Generator and Streams for each dataset.¶
def npz_generator(npz_path):
"""Generate data from an npz file."""
npz_data = np.load(npz_path)
X = npz_data['X']
    # Y is a binary matrix with shape=(n, k), each y will have shape=(k,)
y = npz_data['Y']
n = X.shape[0]
while True:
i = np.random.randint(0, n)
yield {'X': X[i], 'Y': y[i]}
streams = [pescador.Streamer(npz_generator, x) for x in datasets]
Option 1: Stream equally from each dataset¶
If you can easily fit all the datasets in memory and you want to sample from them equally, you would set up your Mux as follows:
mux = pescador.Mux(streams,
# Three streams, always active.
k=len(streams),
# We want to sample from each stream infinitely,
# so we turn off the rate parameter, which
# controls how long to sample from each stream.
rate=None)
Option 2: Sample from one at a time.¶
Another approach might be to restrict sampling to one stream at a time. Now, the rate parameter controls (statistically) how long to sample from a stream before activating a new stream.
mux = pescador.Mux(streams,
# Only allow one active stream
k=1,
# Sample on average 1000 samples from a stream before
# moving onto another one.
rate=1000)
Use the mux as a streamer¶
At this point, you can use the Mux as a streamer normally:
for data in mux(max_iter=10):
print(dict((k, v.shape) for k, v in data.items()))
API Reference¶
Streamer¶
class pescador.Streamer(streamer, *args, **kwargs)¶
A wrapper class for recycling iterables and generator functions, i.e. streamers.

Wrapping streamers within an object provides two useful features:

- Streamer objects can be serialized (as long as their streamer can be)
- Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).

Examples

Wrap a simple generator function:

>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4

Or with a maximum number of items:

>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2

Or infinitely many examples, restarting the generator as needed:

>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Or finitely many examples, restarting the generator as needed:

>>> for i in stream.cycle(max_iter=7):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1

An alternate interface for the same:

>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Attributes:
- streamer : generator or iterable
  Any generator function or iterable python object.
- args : list
- kwargs : dict
  Parameters provided to streamer, if callable.

__call__(max_iter=None, cycle=False)¶
Convenience interface for interacting with the Streamer.

Parameters:
- max_iter : None or int > 0
  Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
- cycle : bool
  If True, cycle indefinitely.

Yields:
- obj : Objects yielded by the generator provided on init.

active¶
Returns true if the stream is active (i.e. there are still open / existing streams).

cycle(max_iter=None)¶
Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
- max_iter : None or int > 0
  Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
- obj : Objects yielded by the streamer provided on init.

is_activated_copy¶
is_active is true if this object is a copy of the original Streamer and has been activated.
Parallel streaming¶
class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)¶
Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.

Examples

>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)

__call__(max_iter=None, cycle=False)¶
Convenience interface for interacting with the Streamer.

Parameters:
- max_iter : None or int > 0
  Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.
- cycle : bool
  If True, cycle indefinitely.

Yields:
- obj : Objects yielded by the generator provided on init.

active¶
Returns true if the stream is active (i.e. there are still open / existing streams).

cycle(max_iter=None)¶
Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:
- max_iter : None or int > 0
  Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:
- obj : Objects yielded by the streamer provided on init.

is_activated_copy¶
is_active is true if this object is a copy of the original Streamer and has been activated.

iterate(max_iter=None)¶
Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:
- data : dict
  Data drawn from streamer(max_iter).
Multiplexing¶
Defines the interface and several varieties of mux. A mux is a Streamer which wraps N other Streamer objects, and at every step yields a sample from one of its sub-streamers.
This module defines the following Mux types:
- StochasticMux
  A Mux which chooses its active streams stochastically, and chooses samples from the active streams stochastically. StochasticMux is equivalent to the pescador.Mux from versions <2.0.
  StochasticMux has a mode parameter which selects how it operates, with the following modes:
  - with_replacement
    Sample streamers with replacement. This allows a single stream to be used multiple times simultaneously.
  - exhaustive
    Each streamer is consumed at most once and never re-activated.
  - single_active
    Each stream in the candidate pool is either active or not. Streams are revived when they are exhausted. This setting makes it so that streams in the active pool are uniquely selected from the candidate pool, whereas with_replacement allows the same stream to be used more than once.
- ShuffledMux
  A ShuffledMux interleaves samples from all given streamers.
- RoundRobinMux
  Iterates over all the streamers in strict order.
- ChainMux
  As in itertools.chain(), runs the first streamer to exhaustion, then the second, then the third, etc. Uses only a single stream at a time.
- Mux
  The pescador<2.0 Mux is still available and works the same, but is deprecated. We recommend replacing all uses of Mux with StochasticMux (see the migration sketch below).
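Migrating is usually just a renaming exercise: Mux's k argument corresponds to StochasticMux's n_active (a sketch; other keyword arguments may need similar adjustment depending on your usage):

import pescador

streamers = [pescador.Streamer(range(10)) for _ in range(5)]

# Old (deprecated):
# mux = pescador.Mux(streamers, k=3, rate=16)

# New:
mux = pescador.StochasticMux(streamers, n_active=3, rate=16)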
- StochasticMux(streamers, n_active, rate[, …]): Stochastic Mux
- ShuffledMux(streamers[, weights, random_state]): A variation on a mux, which takes N streamers and samples from them equally, guaranteeing all N streamers to be "active".
- RoundRobinMux(streamers[, mode, random_state]): A Mux which iterates over all streamers in strict order.
- ChainMux(streamers[, mode, random_state]): As in itertools.chain().
- Mux(streamers, k[, rate, weights, …]): Stochastic multiplexor for Streamers
Maps (Transformers)¶
Map functions perform operations on a stream.
Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.
- buffer_stream(stream, buffer_size[, …]): Buffer "data" from a stream into one data object.
- tuples(stream, *keys): Reformat data as tuples.
- keras_tuples(stream[, inputs, outputs]): Reformat data objects as keras-compatible tuples.
Release notes¶
Changes¶
v2.0.0 (2018-02-05)¶
This release is the second major revision of the pescador architecture, and includes many substantial changes to the API.
This release contains no changes from the release candidate 2.0.0rc0.
- #103 Deprecation and refactor of the Mux class. Its functionality has been superseded by new classes StochasticMux, ShuffledMux, ChainMux, and RoundRobinMux.
- #109 Removed deprecated features from the 1.x series:
  - BufferedStreamer class
  - Streamer.tuples method
- #111 Removed the internally-facing StreamActivator class
- #113 Bugfix: multiply-activated streamers (and muxes) no longer share state
- #116 Streamer.cycle now respects the max_iter parameter
- #121 Added minimum dependency version requirements
- #106 Added more advanced examples in the documentation
v1.1.0 (2017-08-25)¶
This is primarily a maintenance release, and will be the last in the 1.x series.
v1.0.0 (2017-03-18)¶
This release constitutes a major revision over the 0.x series, and the new interface is not backward-compatible.
- #23 Preserve memory alignment of numpy arrays across ZMQ streams
- #34 Rewrite of all core functionality under a unified object interface, Streamer.
- #35, #52 Removed the StreamLearner interface and scikit-learn dependency
- #44 Added a base exception class PescadorError
- #45 Removed the util submodule
- #47, #60 Improvements to documentation
- #48 Fixed a timeout bug in ZMQ streamer
- #53 Added testing and support for python 3.6
- #57 Added the .tuple() interface for Streamers
- #61 Improved test coverage
- #63 Added random state to Mux
- #64 Added __call__ interface to Streamer
v0.1.3 (2016-07-28)¶
- Added support for joblib>=0.10
v0.1.2 (2016-02-22)¶
- Added pescador.mux parameter revive. Calling with with_replacement=False, revive=True will use each seed at most once at any given time.
- Added pescador.zmq_stream parameter timeout. Setting this to a positive number will terminate dangling worker threads after timeout is exceeded on join. See also: multiprocessing.Process.join.
v0.1.1 (2016-01-07)¶
- pescador.mux now throws a RuntimeError exception if the seed pool is empty
v0.1.0 (2015-10-20)¶
Initial public release