Examples

Simple example

This document will walk through the basics of training models using pescador.

Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.

Before we can get started, we’ll need to introduce a few core concepts. We will assume some basic familiarity with scikit-learn and generators.

Batch generators

Not all Python generators are valid for machine learning. Pescador assumes that generators produce output in a particular format, which we will refer to as a batch. Specifically, a batch is a Python dictionary whose values are np.ndarray objects. For unsupervised learning (e.g., MiniBatchKMeans), a valid batch contains only one key: X. For supervised learning (e.g., SGDClassifier), a valid batch must contain both X and Y keys, whose values have equal length.
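
For example, a minimal supervised batch might look like the following sketch (the shapes here are arbitrary, chosen only for illustration):

import numpy as np

# A valid supervised batch: X and Y are np.ndarray values of equal length
batch = dict(X=np.random.randn(16, 4),
             Y=np.random.randint(0, 3, size=16))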

Here’s a simple example generator that draws random batches of a specified batch_size from the Iris dataset and adds Gaussian noise to the features.

import numpy as np

def noisy_samples(X, Y, batch_size=16, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    batch_size : int > 0
        Size of the batches to generate

    sigma : float > 0
        Standard deviation of the additive noise

    Yields
    ------
    batch
    '''

    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=batch_size)

        noise = sigma * np.random.randn(batch_size, d)

        yield dict(X=X[i] + noise, Y=Y[i])

In the code above, noisy_samples is a generator that can be sampled indefinitely because it contains an infinite loop. Each iterate of noisy_samples is a dictionary containing the batch's features and labels.
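
To see this in action, we can draw a few batches and inspect their shapes. This is a quick sketch, assuming X and Y hold the Iris features and labels (loaded as in the next example):

from itertools import islice

# Take three batches from the infinite stream and check their structure
for batch in islice(noisy_samples(X, Y, batch_size=8), 3):
    print(batch['X'].shape, batch['Y'].shape)   # (8, 4) (8,)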

StreamLearner

Many scikit-learn classes provide an iterative learning interface via partial_fit(), which can update an existing model after observing a new batch of samples. Pescador provides an additional layer (StreamLearner) which interfaces between batch generators and partial_fit().

The following example illustrates how to use StreamLearner.

from __future__ import print_function

import numpy as np
import sklearn.datasets
from sklearn.cross_validation import ShuffleSplit
from sklearn.linear_model import SGDClassifier
from sklearn.metrics import accuracy_score

import pescador

# Load the Iris dataset
data = sklearn.datasets.load_iris()
X, Y = data.data, data.target

# Get the space of class labels
classes = np.unique(Y)

# Generate a single 90/10 train/test split
for train, test in ShuffleSplit(len(X), n_iter=1, test_size=0.1):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Wrap the estimator object in a stream learner
    model = pescador.StreamLearner(estimator, max_batches=1000)

    # Build a data stream
    batch_stream = noisy_samples(X[train], Y[train])

    # Fit the model to the stream
    model.iter_fit(batch_stream, classes=classes)

    # And report the accuracy
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test],
                                                        model.predict(X[test]))))

A few things to note here:

  • Because noisy_samples is an infinite generator, we need to provide an explicit bound on the number of batches to draw when fitting. This is done with the max_batches parameter to StreamLearner.
  • StreamLearner objects transparently wrap the methods of their contained estimator object, so model.predict(X[test]) and model.estimator.predict(X[test]) are equivalent, as the sketch below illustrates.
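
A minimal sketch of the second point, assuming model has been fit as in the example above:

import numpy as np

# StreamLearner delegates to the wrapped estimator, so both calls
# produce identical predictions
assert np.all(model.predict(X[test]) == model.estimator.predict(X[test]))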

Advanced example

This document will walk through advanced usage of pescador.

We will assume a working understanding of the simple example in the previous section.

Streamers

Generators in Python have a couple of limitations for common stream learning pipelines. First, once instantiated, a generator cannot be “restarted”. Second, an instantiated generator cannot be serialized directly, so it is difficult to use in distributed computation environments.

Pescador provides the Streamer object to circumvent these issues. Streamer simply provides an object container for an uninstantiated generator (and its parameters), along with an access method generate(). Calling generate() multiple times on a Streamer object is equivalent to restarting the generator, and can therefore be used to implement multiple-pass streams simply. Similarly, because a Streamer can be serialized, it is easy to pass a streamer object to a separate process for parallel computation.

Here’s a simple example, using the generator from the previous section.

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

batch_stream2 = streamer.generate()

Iterating over streamer.generate() is equivalent to iterating over noisy_samples(X[train], Y[train]).
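
Because the Streamer stores the uninstantiated generator, calling generate() a second time restarts the stream from scratch. A quick sketch:

from itertools import islice

# Each call to generate() instantiates a fresh copy of the generator,
# so the stream can be traversed multiple times
first_pass = list(islice(streamer.generate(), 10))
second_pass = list(islice(streamer.generate(), 10))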

Additionally, a Streamer can be bounded easily by calling streamer.generate(max_batches=N) for some maximum number of batches N.
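
For example, the following sketch draws at most 5 batches, even though the underlying generator is infinite:

# generate(max_batches=5) terminates the stream after five batches
for batch in streamer.generate(max_batches=5):
    print(batch['X'].shape)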

Stream re-use and multiplexing

The mux() function provides a powerful interface for randomly interleaving samples from multiple input streams. mux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.

As a concrete example, we can simulate a mixture of noisy streams with differing variances.

for train, test in ShuffleSplit(len(X), n_iter=1, test_size=0.1):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Wrap the estimator object in a stream learner
    model = pescador.StreamLearner(estimator, max_batches=1000)

    # Build a collection of streamers with different variance scales
    streams = [pescador.Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
               for sigma in [0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping only 2 streams alive at once
    batch_stream = pescador.mux(streams,
                                1000,   # Generate 1000 batches in total
                                2,      # Keep 2 streams alive at once
                                lam=16) # Use a Poisson rate of 16


    # Fit the model to the stream
    model.iter_fit(batch_stream, classes=classes)

    # And report the accuracy
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test],
                                                        model.predict(X[test]))))

In the above example, each noisy_samples streamer is infinite. The lam=16 argument to mux says that each stream should produce some n batches, where n is sampled from a Poisson distribution of rate lam. When a stream exceeds its bound, it is deactivated, and a new stream is activated to fill its place.

Setting lam=None disables the random stream bounding, and mux() simply runs each active stream until exhaustion.

Streams can be sampled with or without replacement according to the with_replacement option. Setting this parameter to False means that each stream can be active at most once.

Streams can also be sampled with non-uniform weighting by specifying a vector pool_weights.

Finally, exhausted streams can be removed by setting prune_empty_seeds to True. If False, then exhausted streams may be reactivated at any time.
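
As a sketch, these options can be combined in a single call, reusing the four streamers from the example above (parameter names follow the documentation here):

# Weighted sampling without replacement, pruning exhausted seeds
batch_stream = pescador.mux(streams, 1000, 2,
                            lam=16,
                            with_replacement=False,
                            pool_weights=[0.4, 0.3, 0.2, 0.1],
                            prune_empty_seeds=True)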

Note that because mux() itself is a generator, it too can be wrapped in a Streamer object.
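
For example:

# Wrapping mux in a Streamer makes the multiplexed stream itself
# restartable and serializable
mux_streamer = pescador.Streamer(pescador.mux, streams, 1000, 2)

batch_stream = mux_streamer.generate()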

API Reference

The Streamer object

The StreamLearner object

Stream manipulation

Utility functions for stream manipulations

mux(seed_pool, n_samples, k[, lam, ...])
    Stochastic multiplexor for generator seeds.
buffer_batch(generator, buffer_size)
    Buffer an iterable of batches into larger (or smaller) batches.
buffer_streamer(streamer, buffer_size, ...)
    Buffer a stream of batches.
batch_length(batch)
    Determine the number of samples in a batch.

Parallelism

ZMQ-based stream multiplexing

zmq_stream(streamer[, max_batches, ...])
    Parallel data streaming over zeromq sockets.

Changes

v0.1.3

  • Added support for joblib>=0.10

v0.1.2

  • Added pescador.mux parameter revive. Calling mux with with_replacement=False, revive=True will use each seed at most once at any given time.
  • Added pescador.zmq_stream parameter timeout. Setting this to a positive number will terminate dangling worker threads after timeout is exceeded on join. See also: multiprocessing.Process.join.

v0.1.1

  • pescador.mux now throws a RuntimeError exception if the seed pool is empty

v0.1.0

Initial public release