Streaming data

This example will walk through the basics of using pescador to stream samples from a generator.

Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.

Sample generators

Streamers are intended to transparently pass data without modifying it. However, Pescador assumes that Streamers produce output in a particular format. Specifically, each sample is expected to be a Python dictionary where each value is an np.ndarray. For unsupervised learning (e.g., SKLearn's MiniBatchKMeans), a sample might contain only one key: X. For supervised learning (e.g., SGDClassifier), a valid sample would contain both X and Y keys, with values of equal length.
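
To make the expected format concrete, here is a minimal sketch of valid samples for both cases; the array shapes are purely illustrative:

import numpy as np

# Unsupervised: features only
unsupervised_sample = dict(X=np.random.randn(1, 4))

# Supervised: features and labels of equal length
supervised_sample = dict(X=np.random.randn(1, 4), Y=np.array([0]))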

Here’s a simple example generator that draws random samples from the Iris dataset and adds Gaussian noise to the features.

import numpy as np

def noisy_samples(X, Y, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    sigma : float > 0
        Standard deviation of the additive noise

    Yields
    ------
    sample : dict
        sample['X'] is an `np.ndarray` of shape `(d,)`

        sample['Y'] is a scalar `np.ndarray` of shape `()`
    '''

    n, d = X.shape

    while True:
        # Pick a random index into the dataset
        i = np.random.randint(0, n)

        # Gaussian noise with the same dimension as the features,
        # so the yielded sample keeps shape (d,)
        noise = sigma * np.random.randn(d)

        yield dict(X=X[i] + noise, Y=Y[i])

In the code above, noisy_samples is a generator that can be sampled indefinitely, because it contains an infinite loop. Each sample drawn from noisy_samples is a dictionary containing that sample’s features and labels.
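
Before introducing any pescador machinery, the generator can be sampled directly. The following sketch assumes the Iris data is loaded via scikit-learn's load_iris, and uses itertools.islice only to bound the otherwise infinite stream:

from itertools import islice

from sklearn.datasets import load_iris

data = load_iris()
X, Y = data.data, data.target

# Draw three noisy samples directly from the generator
for sample in islice(noisy_samples(X, Y, sigma=0.5), 3):
    print(sample['X'].shape, sample['Y'])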

Streamers

Generators in Python have a couple of limitations for common stream-learning pipelines. First, once instantiated, a generator cannot be “restarted”. Second, an instantiated generator cannot be serialized directly, which makes generators difficult to use in distributed computation environments.
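
Both limitations are easy to demonstrate with a small, finite generator (a sketch; the exact exception message may vary across Python versions):

import pickle

gen = (i**2 for i in range(3))

print(list(gen))  # [0, 1, 4]
print(list(gen))  # []: the generator is exhausted and cannot be restarted

try:
    pickle.dumps(gen)
except TypeError as err:
    print(err)  # generators cannot be pickled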

Pescador provides the Streamer class to circumvent these issues. A Streamer is simply an object container for an uninstantiated generator (and its parameters), with an access method iterate(). Calling iterate() multiple times on a Streamer object is equivalent to restarting the generator, so multiple-pass streams are simple to implement. Similarly, because a Streamer can be serialized, it is easy to pass one to a separate process for parallel computation.

Here’s a simple example, using the generator from the previous section.

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

stream2 = streamer.iterate()

Iterating over streamer.iterate() is equivalent to iterating over noisy_samples(X[train], Y[train]).

Additionally, a Streamer can easily be bounded by calling streamer.iterate(max_iter=N), which yields at most N samples.
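
For example, the following draws two bounded passes from the streamer defined above; because each call to iterate() restarts the underlying generator, the two passes are independent:

# Each pass yields at most 10 samples and starts from a fresh stream
first_pass = list(streamer.iterate(max_iter=10))
second_pass = list(streamer.iterate(max_iter=10))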

Finally, because iterate() is such a common operation on streamer objects, a shorthand interface is provided by treating the streamer object as if it were a generator:

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

# Equivalent to stream2 above
stream3 = streamer()

Iterating over any of these would then look like the following:

for sample in streamer.iterate():
    # do something
    ...

# For convenience, the object directly behaves as an iterator.
for sample in streamer:
    # do something
    ...
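
Putting the pieces together, here is a minimal end-to-end sketch of stream-based learning with the running example. It assumes the Iris data is loaded via scikit-learn's load_iris and uses SGDClassifier.partial_fit for incremental updates; the noise level and iteration count are arbitrary illustrative choices:

import numpy as np
import pescador
from sklearn.datasets import load_iris
from sklearn.linear_model import SGDClassifier

data = load_iris()
X, Y = data.data, data.target

model = SGDClassifier()
classes = np.unique(Y)

streamer = pescador.Streamer(noisy_samples, X, Y, sigma=0.5)

# Fit the model incrementally, one noisy sample at a time
for sample in streamer.iterate(max_iter=1000):
    model.partial_fit(np.atleast_2d(sample['X']),
                      np.atleast_1d(sample['Y']),
                      classes=classes)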

Stream decorators

In the example above, we implemented a streamer by first defining a generator function, and then wrapping it inside a Streamer object. This allows the function to be used either directly as an iterator, or indirectly through pescador. However, if you are implementing a generator that will only be used via pescador, and you do not need direct iterator access, you can use slightly cleaner syntax, as in the following example:

import pescador

@pescador.streamable
def my_generator(n):
    for i in range(n):
        yield i**2

With this syntax, calling my_generator(5) produces a Streamer object rather than an iterator. We generally recommend using this syntax unless direct access to the iterator is necessary, as the resulting code tends to be more readable. Further examples are given elsewhere in the documentation.
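
For example, the decorated function above behaves like any other streamer:

squares = my_generator(5)  # a Streamer, not a generator

for x in squares:
    print(x)  # 0, 1, 4, 9, 16

# Because it is a Streamer, it can be iterated again from the start
total = sum(squares)  # 30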