This document will walk through some advanced usage of pescador.
We will assume a working understanding of the simple example in the previous section.
Stream re-use and multiplexingΒΆ
The Mux streamer provides a powerful interface for randomly interleaving samples from multiple input streams. Mux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.
As a concrete example, we can simulate a mixture of noisy streams with differing variances.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 | from __future__ import print_function
import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score
from pescador import Streamer, Mux
def noisy_samples(X, Y, batch_size=16, sigma=1.0):
'''Copied over from the previous example'''
n, d = X.shape
while True:
i = np.random.randint(0, n, size=batch_size)
noise = sigma * np.random.randn(batch_size, d)
yield dict(X=X[i] + noise, Y=Y[i])
# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']
classes = np.unique(Y)
for train, test in ShuffleSplit(len(X), n_splits=1, test_size=0.1):
# Instantiate a linear classifier
estimator = SGDClassifier()
# Build a collection of Streamers with different noise scales
streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
for sigma in [0, 0.5, 1.0, 2.0, 4.0]]
# Build a mux stream, keeping 3 streams alive at once
mux_stream = Mux(streams,
k=3, # Keep 3 streams alive at once
lam=64) # Use a poisson rate of 64
# Fit the model to the stream, use at most 5000 batches
for batch in mux_stream(max_batches=5000):
estimator.partial_fit(batch['X'], batch['Y'], classes=classes)
# And report the accuracy
Ypred = estimator.predict(X[test])
print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))
|
In the above example, each Streamer in streams can make infinitely many samples. The lam=64 argument to Mux says that each stream should produce some n batches, where n is sampled from a Poisson distribution of rate lam. When a stream exceeds its bound, it is deactivated, and a new streamer is activated to fill its place.
Setting lam=None disables the random stream bounding, and mux() simply runs each active stream until exhaustion.
The Mux streamer can sampled with or without replacement from its input streams, according to the with_replacement option. Setting this parameter to False means that each stream can be active at most once.
Streams can also be sampled with non-uniform weighting by specifying a vector of pool_weights.
Finally, exhausted streams can be removed by setting prune_empty_seeds to True. If False, then exhausted streams may be reactivated at any time.