This example demonstrates how to reuse and multiplex streamers.

We will assume a working understanding of the simple example in the previous section.

Stream reuse and multiplexing

The StochasticMux streamer randomly interleaves samples drawn from multiple input streamers. It can also dynamically activate and deactivate individual streamers, so that only a bounded subset of them (the active set) is in use at any given time.
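To make the idea of a bounded active set concrete, here is a minimal pure-Python sketch of stochastic multiplexing. It is not pescador's implementation. The function `toy_mux` is hypothetical, and it assumes each input is a zero-argument callable that returns a fresh iterator. The sketch keeps `n_active` slots, picks a slot at random for every sample, and swaps in a new randomly chosen stream when a slot's Poisson-distributed sample budget runs out.

```python
import itertools

import numpy as np


def toy_mux(streams, n_active, rate, n_samples, seed=0):
    """Hypothetical sketch of stochastic multiplexing over a bounded active set.

    `streams` is a list of zero-argument callables, each returning a fresh
    iterator. Returns a list of (stream_index, sample) pairs.
    """
    rng = np.random.default_rng(seed)

    def activate():
        # Pick any stream uniformly at random (with replacement, so the same
        # stream may occupy more than one slot) and give it a sample budget.
        idx = int(rng.integers(len(streams)))
        return [idx, streams[idx](), int(rng.poisson(rate))]

    active = [activate() for _ in range(n_active)]
    out = []
    while len(out) < n_samples:
        slot = int(rng.integers(n_active))  # choose an active slot at random
        idx, it, budget = active[slot]
        if budget <= 0:
            active[slot] = activate()  # budget used up: swap in a new stream
            continue
        try:
            sample = next(it)
        except StopIteration:
            active[slot] = activate()  # stream ran dry: swap in a new stream
            continue
        active[slot][2] = budget - 1
        out.append((idx, sample))
    return out


# Five toy "streams", each repeating its own index forever.
streams = [lambda k=k: itertools.repeat(k) for k in range(5)]
samples = toy_mux(streams, n_active=3, rate=4, n_samples=200)
```

Tagging each sample with its stream index makes it easy to check where every sample came from. In the sketch, at most three streams contribute at any moment, but over time all five get a turn.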

As a concrete example, we can simulate a mixture of noisy streams with differing variances.

In this example, we use a slightly different notation for defining streamers, introduced in pescador 2.1. Instead of defining a generator function noisy_samples and then wrapping it with a Streamer object, we decorate the generator function with @pescador.streamable. The result is the same in both cases: calling noisy_samples(...) returns a Streamer. The decorator simply gives a cleaner syntax.
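The equivalence between explicit wrapping and decoration can be illustrated with a toy version. `ToyStreamer` and `toy_streamable` below are hypothetical stand-ins, not pescador's `Streamer` and `streamable`. They reproduce only the core idea: the streamer stores the generator function with its arguments, and it builds a fresh generator on every iteration, so it can be reused.

```python
import functools
import itertools


class ToyStreamer:
    """Hypothetical stand-in for a Streamer: remembers a generator function
    and its arguments, and creates a new generator each time it is iterated."""

    def __init__(self, gen_func, *args, **kwargs):
        self.gen_func = gen_func
        self.args = args
        self.kwargs = kwargs

    def iterate(self, max_iter=None):
        # A fresh generator per call; islice with None means "no limit".
        return itertools.islice(self.gen_func(*self.args, **self.kwargs), max_iter)


def toy_streamable(gen_func):
    """Hypothetical decorator: calling the decorated function returns a
    ToyStreamer instead of a bare generator."""
    @functools.wraps(gen_func)
    def wrapper(*args, **kwargs):
        return ToyStreamer(gen_func, *args, **kwargs)
    return wrapper


def count_up(start):
    """Plain generator function."""
    while True:
        yield start
        start += 1


@toy_streamable
def count_up_decorated(start):
    """The same generator, decorated."""
    while True:
        yield start
        start += 1


s1 = ToyStreamer(count_up, 10)   # explicit wrapping
s2 = count_up_decorated(10)      # decoration
```

Both `s1` and `s2` yield 10, 11, 12, ... and can be iterated again from the start. This reuse property is what lets a mux deactivate a stream and later reactivate it.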

```python
import numpy as np

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score

import pescador
from pescador import StochasticMux


@pescador.streamable
def noisy_samples(X, Y, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape

    while True:
        # size=1 returns an index array, so X[i] keeps its 2-d shape (1, d)
        i = np.random.randint(0, n, size=1)

        noise = sigma * np.random.randn(1, d)

        yield dict(X=X[i] + noise, Y=Y[i])

# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']

classes = np.unique(Y)

rs = ShuffleSplit(n_splits=1, test_size=0.1)
for train, test in rs.split(X):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Build a collection of Streamers with different noise scales
    streams = [noisy_samples(X[train], Y[train], sigma=sigma)
               for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping 3 streams alive at once
    mux_stream = StochasticMux(streams,
                               3,        # Keep 3 streams alive at once
                               rate=64)  # Use a Poisson rate of 64

    # Fit the model to the stream, use at most 5000 samples
    for sample in mux_stream(max_iter=5000):
        estimator.partial_fit(sample['X'], sample['Y'], classes=classes)

    # And report the accuracy
    Ypred = estimator.predict(X[test])
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))
```

In the above example, each Streamer in streams can generate infinitely many samples. The rate=64 argument to StochasticMux bounds each activation. When a stream is activated, it produces n samples, where n is drawn from a Poisson distribution with rate parameter rate (here 64). Once a stream reaches its limit, it is deactivated, and a new streamer is activated to fill its slot.

Setting rate=None disables this random bounding. Each active stream then simply runs until it is exhausted.
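The effect of rate on a single activation can be sketched as follows. `bounded_activation` is a hypothetical helper, not part of pescador. With a numeric rate it draws n from a Poisson distribution and truncates the stream to at most n samples. With rate=None it passes the stream through unchanged.

```python
import itertools

import numpy as np


def bounded_activation(stream, rate, rng):
    """Hypothetical sketch of one activation of a stream.

    With a numeric `rate`, draw n ~ Poisson(rate) and yield at most n samples.
    With rate=None, yield samples until the stream itself is exhausted.
    """
    if rate is None:
        return stream
    n = int(rng.poisson(rate))
    return itertools.islice(stream, n)


rng = np.random.default_rng(0)

# An infinite stream is cut off after a Poisson-distributed number of samples;
# averaged over many activations, the length is close to the rate.
lengths = [len(list(bounded_activation(itertools.count(), 64, rng)))
           for _ in range(1000)]
mean_length = float(np.mean(lengths))

# With rate=None, a finite stream runs to completion.
finite = list(bounded_activation(iter(range(10)), None, rng))
```

One consequence: every noisy_samples streamer in the example above is infinite. So with rate=None, the first streams to be activated would never be deactivated, and the remaining streams would never be sampled.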

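The StochasticMux streamer can sample with or without replacement from its input streams, according to the mode option. Setting mode='single_active' means that each stream can be active at most once at any given time. Under the default 'with_replacement' mode, the same stream may occupy several active slots simultaneously.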
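The difference between the two modes comes down to which streams are eligible when a slot needs refilling. The following sketch uses a hypothetical helper, `pick_stream`, and is not pescador's code. It assumes there are more streams than active slots, so the single_active candidate list is never empty. In pescador itself, the choice is made by passing, for example, `mode='single_active'` to StochasticMux.

```python
import numpy as np


def pick_stream(n_streams, active, mode, rng):
    """Hypothetical sketch of choosing a stream to fill a free slot.

    'with_replacement': any stream may be picked, even one already active.
    'single_active': only streams that are not currently active are eligible.
    """
    if mode == 'with_replacement':
        candidates = np.arange(n_streams)
    elif mode == 'single_active':
        candidates = np.array([i for i in range(n_streams) if i not in active])
    else:
        raise ValueError('unsupported mode: {}'.format(mode))
    return int(rng.choice(candidates))


rng = np.random.default_rng(0)
active = {0, 1, 2}  # streams currently occupying the three slots

# single_active never re-picks a stream that is already active ...
single = [pick_stream(5, active, 'single_active', rng) for _ in range(100)]
# ... while with_replacement may pick any of the five.
replace = [pick_stream(5, active, 'with_replacement', rng) for _ in range(100)]
```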

Streams can also be sampled with non-uniform weighting by passing a vector of weights (the weights parameter), with one entry per input stream.
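Weighted selection amounts to drawing a stream index with probability proportional to its weight. The sketch below uses a hypothetical helper, `weighted_pick`, and normalizes the weights itself. Whether StochasticMux requires pre-normalized weights is not covered here, so check the API reference. With the example's streams ordered by sigma [0, 0.5, 1.0, 2.0, 4.0], the illustrative weights favor the low-noise streams and exclude the noisiest one.

```python
import numpy as np


def weighted_pick(weights, rng):
    """Hypothetical sketch: pick a stream index with probability proportional
    to its weight (weights are normalized here, so they need not sum to 1)."""
    p = np.asarray(weights, dtype=float)
    p = p / p.sum()
    return int(rng.choice(len(p), p=p))


rng = np.random.default_rng(0)
weights = [4, 2, 1, 1, 0]  # illustrative: favor low-noise streams, drop sigma=4
picks = [weighted_pick(weights, rng) for _ in range(8000)]
counts = np.bincount(picks, minlength=len(weights))
```

Over many draws, `counts` is roughly proportional to the weights, and a stream with weight 0 is never selected.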

Finally, setting mode='exhaustive' removes each stream from the pool once it is exhausted, so it is never reactivated.
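The exhaustive behavior can be sketched with finite streams. `exhaustive_mux` is a hypothetical, simplified generator: unlike StochasticMux, it has no active-set limit and no rate bound. It draws from a randomly chosen stream and permanently drops that stream when it raises StopIteration, so the mux ends once every input has run dry.

```python
import numpy as np


def exhaustive_mux(iterables, rng):
    """Hypothetical sketch: draw samples from randomly chosen streams and drop
    each stream from the pool permanently once it is exhausted."""
    pool = [iter(it) for it in iterables]
    while pool:
        k = int(rng.integers(len(pool)))
        try:
            yield next(pool[k])
        except StopIteration:
            # Exhausted: remove it so it is never selected again.
            pool.pop(k)


rng = np.random.default_rng(0)
out = list(exhaustive_mux([range(0, 3), range(10, 15), range(100, 102)], rng))
```

Every sample from every stream appears exactly once in `out`. The streams are interleaved randomly, but each stream's internal order is preserved.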