Introduction

Pescador is a library for streaming (numerical) data for use in iterative machine learning applications.

The core concept is the Streamer object, which encapsulates a Python generator to allow for re-use and inter-process communication.

The basic use case is as follows:

  1. Define a generator function g which yields a dictionary of numpy arrays at each step
  2. Construct a Streamer object stream = Streamer(g, args...)
  3. Iterate over examples generated by stream().
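
For instance, a minimal sketch of this pattern (the generator g and the toy data here are hypothetical):

import numpy as np
import pescador

# 1. Define a generator that yields a dictionary of numpy arrays at each step
def g(data, batch_size=8):
    while True:
        idx = np.random.randint(len(data), size=batch_size)
        yield dict(X=data[idx])

# 2. Construct a Streamer object around the (uninstantiated) generator
data = np.random.randn(100, 5)
stream = pescador.Streamer(g, data, batch_size=8)

# 3. Iterate over examples generated by stream()
for batch in stream(max_batches=10):
    print(batch['X'].shape)  # (8, 5)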

On top of this basic functionality, pescador provides the following tools:

  • Multiplexing collections of streamers together (Mux)
  • Buffering multiple iterates into (mini-)batches (BufferedStreamer)
  • Parallel streaming over ZeroMQ sockets (ZMQStreamer)

Examples of each of these use cases appear in the sections below.

Basic Usage

Basic example

This section walks through the basics of using pescador to stream samples from a generator.

Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.

Before we can get started, we’ll need to introduce a few core concepts. We will assume some basic familiarity with generators.

Batch generators

Not all Python generators are valid for machine learning. Pescador assumes that generators produce output in a particular format, which we will refer to as a batch. Specifically, a batch is a Python dictionary whose values are np.ndarrays. For unsupervised learning (e.g., MiniBatchKMeans), a valid batch contains only one key: X. For supervised learning (e.g., SGDClassifier), a valid batch must contain both X and Y keys, with arrays of equal length.
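
For example, a minimal sketch of a valid supervised batch (with made-up shapes) looks like this:

import numpy as np

# A valid supervised batch: 'X' and 'Y' have equal length along the first axis
batch = dict(X=np.random.randn(16, 4),            # 16 examples, 4 features each
             Y=np.random.randint(0, 3, size=16))  # 16 integer labels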

Here’s a simple example generator that draws random batches of data from Iris of a specified batch_size, and adds gaussian noise to the features.

import numpy as np

def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    batch_size : int > 0
        Size of the batches to generate

    sigma : float > 0
        Variance of the additive noise

    Yields
    ------
    batch : dict
        batch['X'] is an `np.ndarray` of shape `(batch_size, d)`

        batch['Y'] is an `np.ndarray` of shape `(batch_size,)`
    '''


    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=batch_size)

        noise = sigma * np.random.randn(batch_size, d)

        yield dict(X=X[i] + noise, Y=Y[i])

In the code above, noisy_samples is a generator that can be sampled indefinitely, because it contains an infinite loop. Each iterate of noisy_samples is a dictionary containing the sample batch's features and labels.

Streamers

Generators in Python have a couple of limitations for common stream learning pipelines. First, once instantiated, a generator cannot be "restarted". Second, an instantiated generator cannot be serialized directly, which makes generators difficult to use in distributed computation environments.
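
Both limitations are easy to demonstrate; a quick sketch:

def count_to(n):
    for i in range(n):
        yield i

gen = count_to(3)
list(gen)   # [0, 1, 2]
list(gen)   # [] -- the exhausted generator cannot be restarted

import pickle
pickle.dumps(count_to(3))  # raises TypeError: generators cannot be pickled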

Pescador provides the Streamer object to circumvent these issues. A Streamer is simply a container for an uninstantiated generator (and its parameters), with an access method generate(). Calling generate() multiple times on a Streamer object is equivalent to restarting the generator, so multiple-pass streams are simple to implement. Similarly, because a Streamer can be serialized, it is easy to pass one to a separate process for parallel computation.

Here’s a simple example, using the generator from the previous section.

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

batch_stream2 = streamer.generate()

Iterating over streamer.generate() is equivalent to iterating over noisy_samples(X[train], Y[train]).

Additionally, a Streamer can easily be bounded by calling streamer.generate(max_batches=N) for some maximum number of batches N.
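
For example:

# Draw at most 10 batches from the stream, then stop
for batch in streamer.generate(max_batches=10):
    print(batch['X'].shape)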

Finally, because generate() is such a common operation with streamer objects, a short-hand interface is provided by treating the streamer object as if it were a generator:

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

# Equivalent to batch_stream2 above
batch_stream3 = streamer()

Advanced Usage

This section will walk through some advanced usage of pescador.

We will assume a working understanding of the simple example in the previous section.

Stream re-use and multiplexing

The Mux streamer provides a powerful interface for randomly interleaving samples from multiple input streams. Mux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.

As a concrete example, we can simulate a mixture of noisy streams with differing variances.

from __future__ import print_function

import numpy as np

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score

from pescador import Streamer, Mux

def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=batch_size)

        noise = sigma * np.random.randn(batch_size, d)

        yield dict(X=X[i] + noise, Y=Y[i])

# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']

classes = np.unique(Y)

for train, test in ShuffleSplit(n_splits=1, test_size=0.1).split(X):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Build a collection of Streamers with different noise scales
    streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
               for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping 3 streams alive at once
    mux_stream = Mux(streams,
                     k=3,    # Keep 3 streams alive at once
                     lam=64) # Use a poisson rate of 64

    # Fit the model to the stream, use at most 5000 batches
    for batch in mux_stream(max_batches=5000):
        estimator.partial_fit(batch['X'], batch['Y'], classes=classes)

    # And report the accuracy
    Ypred = estimator.predict(X[test])
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))

In the above example, each Streamer in streams can produce infinitely many batches. The lam=64 argument to Mux says that each stream should produce some n batches, where n is sampled from a Poisson distribution with rate lam. When a stream exceeds its bound, it is deactivated, and a new streamer is activated to fill its place.

Setting lam=None disables the random stream bounding, in which case Mux simply runs each active stream until exhaustion.

The Mux streamer can sample with or without replacement from its input streams, according to the with_replacement option. Setting this parameter to False means that each stream can be active at most once.

Streams can also be sampled with non-uniform weighting by specifying a vector of pool_weights.

Finally, exhausted streams can be removed by setting prune_empty_seeds to True. If False, then exhausted streams may be reactivated at any time.
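
These options can be combined; here is a short sketch reusing the streams list from above (the weights are arbitrary, and the example is purely illustrative, since the noisy_samples streams are infinite and will never exhaust):

mux_stream = Mux(streams,
                 k=3,                           # Keep 3 streams alive at once
                 lam=None,                      # Run each active stream to exhaustion
                 with_replacement=False,        # Each stream is active at most once
                 pool_weights=[1, 1, 2, 2, 4],  # Favor the noisier streams
                 prune_empty_seeds=True)        # Drop exhausted streams from the pool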

Sampling from disk

A common use case for pescador is to sample data from a large collection of existing archives. As a concrete example, consider the problem of fitting a statistical model to a large corpus of musical recordings. When the corpus is sufficiently large, it is impossible to fit the entire set in memory while estimating the model parameters. Instead, one can pre-process each song to store pre-computed features (and, optionally, target labels) in a NumPy zip (NPZ) archive. The problem then becomes sampling data from a collection of NPZ archives.

Here, we will assume that the pre-processing has already been done so that each NPZ file contains a numpy array of features X and labels Y. We will define infinite samplers that pull n examples per iterate.

import numpy as np
import pescador

def sample_npz(npz_file, n):
    '''Generate an infinite sequence of contiguous samples
    from the input `npz_file`.

    Each iterate has `n > 0` rows.
    '''
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n)
            yield dict(X=data['X'][idx:idx + n],
                       Y=data['Y'][idx:idx + n])

Applying the sample_npz function above to a list of npz_files, we can make a multiplexed streamer object as follows:

n = 16
npz_files = [...]  # LIST OF PRE-COMPUTED NPZ FILES
streams = [pescador.Streamer(sample_npz, npz_f, n) for npz_f in npz_files]

# Keep 32 streams alive at once
# Draw on average 16 patches from each stream before deactivating
mux_stream = pescador.Mux(streams, k=32, lam=16)

for batch in mux_stream(max_batches=1000):
    # DO LEARNING HERE
    pass

Memory-mapping

The NPZ file format requires loading the entire contents of each archive into memory. This can lead to high memory consumption when the number of active streams is large. Note also that memory usage for each NPZ file will persist for as long as there is a reference to its contents. This can be circumvented, at the cost of some latency, by copying data within the streamer function:

def sample_npz_copy(npz_file, n):
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n)
            yield dict(X=data['X'][idx:idx + n].copy(),  # <-- Note the explicit copy
                       Y=data['Y'][idx:idx + n].copy())

The above modification will ensure that memory is freed as quickly as possible.

Alternatively, memory-mapping can be used to only load data as needed, but requires that each array is stored in its own NPY file:

def sample_npy_mmap(npy_x, npy_y, n):

    # Open each file in "copy-on-write" mode, so that the files are read-only
    X = np.load(npy_x, mmap_mode='c')
    Y = np.load(npy_y, mmap_mode='c')

    n_total = len(X)

    while True:
        # Compute the index offset
        idx = np.random.randint(n_total - n)
        yield dict(X=X[idx:idx + n],
                   Y=Y[idx:idx + n])


# Using this streamer is similar to the first example, but now you need a separate
# NPY file for each X and Y
npy_x_files = [...]  # LIST OF PRE-COMPUTED NPY FILES (X)
npy_y_files = [...]  # LIST OF PRE-COMPUTED NPY FILES (Y)
streams = [pescador.Streamer(sample_npy_mmap, npy_x, npy_y, n)
           for (npy_x, npy_y) in zip(npy_x_files, npy_y_files)]
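
These memory-mapped streamers can then be multiplexed exactly as in the NPZ example above:

# Keep 32 streams alive at once, as before
mux_stream = pescador.Mux(streams, k=32, lam=16)

for batch in mux_stream(max_batches=1000):
    # DO LEARNING HERE
    pass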

Buffered Streaming

In a machine learning setting, it is common to train a model on several input datapoints simultaneously, in what are commonly referred to as "minibatches". To achieve this, pescador provides the BufferedStreamer class, which will "buffer" your batches into fixed batch sizes.

Following up on the first example, we use the noisy_samples generator.

import pescador

# Create an initial streamer
streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

minibatch_size = 128
# Wrap your streamer
buffered_streamer = pescador.BufferedStreamer(streamer, minibatch_size)

# Generate batches in exactly the same way as you would from the base streamer
for batch in buffered_streamer():
    ...

A few important points to note about using Buffered Streamers:

  • Buffered Streamers will concatenate your arrays, such that the first dimension contains the number of samples in the buffer (minibatch_size in the above example).
  • Each key in the batches generated will be concatenated (across all the batches buffered).
  • A consequence of this is that you must make sure that your generators yield batches such that every key contains an array shaped (N, ...), where N is the number of examples in that iterate (see the sketch below).
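
A small sketch of this shape contract, using a hypothetical single-example generator:

import numpy as np
import pescador

def single_example_gen(d=5):
    '''Yield one example of dimension d per iterate.'''
    while True:
        yield dict(X=np.random.randn(1, d))

streamer = pescador.Streamer(single_example_gen)
buffered = pescador.BufferedStreamer(streamer, 128)

for batch in buffered(max_batches=1):
    print(batch['X'].shape)  # (128, 5): 128 buffered examples, 5 dimensions each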

API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for reusable generators.

Wrapping generators/iterators within an object provides two useful features:

  1. Streamer objects can be serialized (as long as the generator can be)
  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).

Examples

Generate batches of random 3-dimensional vectors

>>> def my_generator(n):
...     for i in range(n):
...         yield {'X': np.random.randn(1, 3)}
>>> GS = Streamer(my_generator, 5)
>>> for i in GS():
...     print(i)

Or with a maximum number of batches

>>> for i in GS(max_batches=3):
...     print(i)

Or infinitely many examples, restarting the generator as needed

>>> for i in GS.cycle():
...     print(i)

An alternate interface for the same:

>>> for i in GS(cycle=True):
...     print(i)

Attributes

generator (iterable or Streamer) A generator function or iterable collection to draw from. May be another instance or subclass of Streamer.
args (list)
kwargs (dict) If generator is a function, then args and kwargs provide the parameters to the function.
__call__(max_batches=None, cycle=False)
Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

generate(max_batches=None)

Instantiate the generator

Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle
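
For example, a sketch of training a Keras model from a streamer (model is a hypothetical, already-compiled classifier; X and Y are the batch keys used throughout this document):

>>> # Generate (X, Y) tuples instead of dictionaries
>>> for X, Y in streamer.tuples('X', 'Y', max_batches=100):
...     model.train_on_batch(X, Y)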

Buffered Streamers

class pescador.BufferedStreamer(streamer, buffer_size, strict_batch_size=True)

Buffers a stream into batches of examples

Examples

>>> def my_generator(n):
...     # Generates a single 30-dimensional example vector for each iterate
...     for i in range(n):
...         yield dict(X=np.random.randn(1, 30))
>>> # Wrap the generator in a Streamer
>>> S = pescador.Streamer(my_generator, 128)
>>> # A buffered streamer will combine N iterates into a single batch
>>> N = 10
>>> B = pescador.BufferedStreamer(S, N)
>>> for batch in B():
...     # Work on a batch of N=10 examples
...     MY_PROCESS_FUNCTION(batch)
__call__(max_batches=None, cycle=False)
Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

generate(max_batches=None)

Generate samples from the streamer.

Parameters:

max_batches : int

For the BufferedStreamer, max_batches is the number of buffered batches that are generated, not the number of individual samples.
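
For example, a sketch of this counting convention (assuming streamer is a Streamer whose iterates contain an 'X' key):

>>> buffered = pescador.BufferedStreamer(streamer, 128)
>>> # max_batches counts buffered batches, so this draws at most
>>> # 10 * 128 = 1280 individual samples in total
>>> n_samples = sum(len(batch['X']) for batch in buffered.generate(max_batches=10))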

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle

Multiplexing

class pescador.Mux(seed_pool, k, lam=256.0, pool_weights=None, with_replacement=True, prune_empty_seeds=True, revive=False, random_state=None)

Stochastic multiplexor for Streamers

Examples

>>> # Create a collection of streamers
>>> seeds = [pescador.Streamer(my_generator) for i in range(10)]
>>> # Multiplex them together into a single streamer
>>> # Use at most 3 streams at once
>>> mux = pescador.Mux(seeds, k=3)
>>> for batch in mux():
...     MY_FUNCTION(batch)
__call__(max_batches=None, cycle=False)
Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the seed pool

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=None)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.

Examples

>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for batch in Z():
...     MY_FUNCTION(batch)
__call__(max_batches=None, cycle=False)
Parameters:

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

batch : dict

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle, tuples

activate()

Activates the stream.

active

Returns true if the stream is active (i.e., a StopIteration has not been thrown).

cycle()

Generates from the streamer infinitely.

This function will force an infinite stream, restarting the generator even if a StopIteration is raised.

Yields:

batch

Items from the contained generator.

generate(max_batches=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:

batch

Data drawn from streamer(max_batches).

tuples(*items, **kwargs)

Generate data in tuple-form instead of dicts.

This is useful for interfacing with Keras’s generator system, which requires iterates to be provided as tuples.

Parameters:

*items

One or more dictionary keys. The generated tuples will correspond to (batch[item1], batch[item2], ..., batch[itemk]) where batch is a single iterate produced by the streamer.

cycle : bool

If True, then data is generated infinitely using the cycle method. Otherwise, data is generated according to the generate method.

max_batches : None or int > 0

Maximum number of batches to yield. If None, exhaust the generator. If the stream is finite, the generator will be exhausted when it completes. Call generate again, or use cycle to force an infinite stream.

Yields:

batch : tuple

Items from the contained generator. If max_batches is an integer, then at most max_batches are generated.

See also

generate, cycle

Changes

v1.0.0

This release constitutes a major revision over the 0.x series, and the new interface is not backward-compatible.

  • #23 Preserve memory alignment of numpy arrays across ZMQ streams
  • #34 Rewrite of all core functionality under a unified object interface Streamer.
  • #35, #52 Removed the StreamLearner interface and scikit-learn dependency
  • #44 Added a base exception class PescadorError
  • #45 Removed the util submodule
  • #47, #60 Improvements to documentation
  • #48 Fixed a timeout bug in ZMQ streamer
  • #53 Added testing and support for python 3.6
  • #57 Added the .tuple() interface for Streamers
  • #61 Improved test coverage
  • #63 Added random state to Mux
  • #64 Added __call__ interface to Streamer

v0.1.3

  • Added support for joblib>=0.10

v0.1.2

  • Added pescador.mux parameter revive. Calling with with_replacement=False, revive=True will use each seed at most once at any given time.
  • Added pescador.zmq_stream parameter timeout. Setting this to a positive number will terminate dangling worker threads after timeout is exceeded on join. See also: multiprocessing.Process.join.

v0.1.1

  • pescador.mux now throws a RuntimeError exception if the seed pool is empty

v0.1.0

Initial public release