Pescador

Pescador is a library for streaming (numerical) data, primarily for use in machine learning applications.

Pescador addresses the following use cases:

  • Hierarchical sampling
  • Out-of-core learning
  • Parallel streaming

These use cases arise in the following common scenarios:

  • Say you have three data sources (A, B, C) that you want to sample. For example, each data source could contain all the examples of a particular category.

    Pescador can dynamically interleave these sources to provide a randomized stream D <- (A, B, C). The distribution over (A, B, C) need not be uniform: you can specify any distribution you like!

  • Now, say you have 3000 data sources, each of which may contain a large number of samples. Maybe that’s too much data to fit in RAM at once.

    Pescador makes it easy to interleave these sources while maintaining a small working set. Not all sources are simultaneously active, but Pescador manages the working set so you don’t have to. This way, you can process the full data set out of core, but using a bounded amount of memory.

  • If loading data incurs substantial latency (e.g., due to storage access or pre-processing), this can become a bottleneck.

    Pescador can seamlessly move data generation into a background process, so that your main thread can continue working.

To make this all possible, Pescador provides the following utilities:

  • Streamer objects encapsulate data generators for re-use, infinite sampling, and inter-process communication.
  • Multiplexing objects allow flexible sampling from multiple streams
  • Parallel streaming provides parallel processing with low communication overhead
  • Transform or modify streams with Maps (see Processing Data Streams)
  • Buffering of sampled data into fixed-size batches (see pescador.buffer_stream)

Installation

Pescador can be installed from PyPI through pip:

pip install pescador

or via conda using the conda-forge channel:

conda install -c conda-forge pescador

Introduction

Pescador’s primary goal is to provide fine-grained control over data streaming and sampling. These problems can get complex quickly, so this section provides an overview of the concepts underlying Pescador’s design, and a summary of the provided functionality.

Definitions

To understand what pescador does, it will help to establish some common terminology. If you’re not already familiar with Python’s iterator and generator concepts, here’s a quick synopsis:

  1. An iterator is an object that produces a sequence of data, i.e. via __next__ / next().
  2. An iterable is an object that can produce iterators, i.e. via __iter__ / iter().
  3. A generator (or more precisely generator function) is a callable object that returns a single iterator.
  4. Pescador defines a stream as the sequence of objects produced by an iterator.
For example:
  • range is a callable that produces iterables
  • range(8) is an iterable, and its iterator produces the stream: 0, 1, 2, 3, ...
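For concreteness, here is a small, self-contained illustration of these four terms in plain Python (count_up is an ad hoc example, not part of Pescador):

def count_up(n):
    # A generator function: calling it returns an iterator
    i = 0
    while i < n:
        yield i
        i += 1

iterable = range(3)        # an iterable
iterator = iter(iterable)  # an iterator over that iterable
print(next(iterator))      # 0
print(next(iterator))      # 1

stream = count_up(3)       # the stream produced by this iterator is 0, 1, 2
print(list(stream))        # [0, 1, 2]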

Streaming Data

  1. Pescador defines an object called a Streamer for the purposes of (re)creating iterators indefinitely and (optionally) interrupting them prematurely.

  2. Streamer implements the iterable interface, and can be iterated directly.

  3. A Streamer can be initialized with one of two types:
    • Any iterable type, e.g. range(7), ['foo', 'bar'], "abcdef", or another Streamer
    • A generator function and its arguments + keyword arguments.
  4. A Streamer transparently yields the data stream flowing through it

    • A Streamer should not modify objects in its stream.
    • In the spirit of encapsulation, the modification of data streams is achieved through separate functionality (see Processing Data Streams)
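To illustrate points 3 and 4 above, here is a minimal sketch of both initialization styles (countdown is an ad hoc generator function used only for illustration):

import pescador

# 1) From an iterable (a list, a range, a string, or another Streamer)
s1 = pescador.Streamer(range(7))

# 2) From a generator function plus its arguments
def countdown(n):
    while n > 0:
        yield n
        n -= 1

s2 = pescador.Streamer(countdown, 5)

# Both transparently yield the underlying stream
print(list(s1))  # [0, 1, 2, 3, 4, 5, 6]
print(list(s2))  # [5, 4, 3, 2, 1]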

Multiplexing Data Streams

  1. Pescador defines a family of multiplexer or Mux classes for the purposes of multiplexing streams of data. For stochastic sampling applications, ShuffledMux and StochasticMux are the most useful classes.
  2. BaseMux inherits from Streamer, which makes all muxes both iterable and recomposable. Muxes allow you to construct arbitrary trees of data streams. This is useful for hierarchical sampling.
  3. Muxes are initialized with a container of one or more streamers, and parameters to control the mux’s sampling behavior.
  4. As a subclass of Streamer, a Mux also transparently yields the stream flowing through it, i.e. Streaming Data.
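For example, because muxes are themselves Streamers, a small two-level hierarchy can be sketched as follows (the constant label streams are toy stand-ins for real data sources):

import pescador

def constant(label):
    while True:
        yield dict(label=label)

a = pescador.Streamer(constant, 'a')
b = pescador.Streamer(constant, 'b')
c = pescador.Streamer(constant, 'c')

# A mux can take other muxes as inputs, forming a tree of streams
inner = pescador.ShuffledMux([a, b])
outer = pescador.ShuffledMux([inner, c])

for sample in outer(max_iter=5):
    print(sample['label'])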

Processing Data Streams

Pescador adopts the concept of “transformers” for processing data streams.

  1. A transformer takes as input a single object in the stream.
  2. A transformer yields an object.
  3. Transformers are iterators, i.e. implement a __next__ method, to preserve iteration.
  4. An example of a built-in transformer from the standard library is enumerate.
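A hypothetical transformer, written as a plain generator function, might look like the following sketch (one_to_five and square_values are illustrative, not part of Pescador):

import pescador

def one_to_five():
    for i in range(1, 6):
        yield dict(value=i)

def square_values(stream):
    # A transformer: consume one object at a time, yield a new object
    for data in stream:
        yield dict(value=data['value'] ** 2)

streamer = pescador.Streamer(one_to_five)
print([data['value'] for data in square_values(streamer)])  # [1, 4, 9, 16, 25]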

Why Pescador?

Pescador was developed in response to a variety of recurring problems related to data streaming for training machine learning models. After implementing custom solutions each time these problems occurred, we converged on a set of common solutions that can be applied more broadly. The solutions provided by Pescador may or may not fit your problem. This section of the documentation will attempt to help you figure out if Pescador is useful for your application.

Hierarchical sampling

Hierarchical sampling refers to any process where you want to sample data from a distribution by conditioning on one or more variables. For example, say you have a distribution over real-valued observations X and categorical labels Y, and you want to sample labeled observations (X, Y). A hierarchical sampler might first select a value for Y, and then randomly draw an example X that has that label. This is equivalent to exploiting the laws of conditional probability: \(P[X, Y] = P[X|Y] \times P[Y]\).

Hierarchical sampling can be useful when dealing with highly imbalanced data, where it may sometimes be better to learn from a balanced sample and then explicitly correct for imbalance within the model.

It can also be useful when dealing with data that has natural grouping substructure beyond categories. For example, when modeling a large collection of audio files, each file may generate multiple observations, which will all be mutually correlated. Hierarchical sampling can be useful in neutralizing this bias during the training process.

Pescador implements hierarchical sampling through a family of Multiplexing classes. In its simplest form, the ShuffledMux takes as input a set of Streamer objects from which samples are drawn randomly. This effectively generates data by a process similar to the following pseudo-code:

while True:
    stream_id = random_choice(range(len(streamers)))
    yield next(streamers[stream_id])

The ShuffledMux object also lets you specify an arbitrary distribution over the set of streamers, giving you fine-grained control over the resulting distribution of samples.

Muxes are also Streamers, so sampling hierarchies can be nested arbitrarily deep.
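For example, the weighted sampling described above might look like the following sketch (the constant label streams are toy stand-ins for real data sources, and any of them could itself be a mux):

import pescador

def constant(label):
    while True:
        yield dict(label=label)

streams = [pescador.Streamer(constant, name) for name in 'ABC']

# Draw from A about half the time, and from B and C a quarter of the time each
mux = pescador.ShuffledMux(streams, weights=[0.5, 0.25, 0.25])

for sample in mux(max_iter=8):
    print(sample['label'])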

Out-of-core sampling

Another common problem occurs when the dataset is too large to fit in RAM at once. Going back to the audio example above, consider a problem where there are 30,000 source files, each of which generates 1GB of observation data, and the machine can only fit 100 source files in memory at any given time.

To facilitate this use case, the StochasticMux object allows you to specify a maximum number of simultaneously active streams (i.e., the working set). In this case, you would most likely implement a generator for each file as follows:

def sample_file(filename):
    # Load observation data
    X = np.load(filename)

    while True:
        # Generate a random row as a dictionary
        yield dict(X=X[np.random.choice(len(X))])

streamers = [pescador.Streamer(sample_file, fname) for fname in ALL_30K_FILES]

# Keep 100 streamers active at a time
# Replace a streamer after it has generated (on average) 8 samples
for item in pescador.StochasticMux(streamers, n_active=100, rate=8):
    model.partial_fit(item['X'])

Note that data is not loaded until the generator is instantiated. If you specify a working set of size n_active=100, then StochasticMux will select 100 streamers at random to form the working set, and only sample data from within that set. StochasticMux will then randomly evict streamers from the working set and replace them with new streamers, according to its rate parameter. This provides a simple interface for drawing data from all input sources while using a bounded amount of memory.

StochasticMux provides a great deal of flexibility over how streamers are replaced, what to do when streamers are exhausted, etc.

In addition to ShuffledMux and StochasticMux, there are also deterministic multiplexers ChainMux and RoundRobinMux, which are useful when random sampling is undesirable.
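As a quick illustrative sketch of the deterministic muxes (the outputs shown in comments are the expected behavior; see the API reference for exact mode semantics):

import pescador

a = pescador.Streamer([1, 2, 3])
b = pescador.Streamer([10, 20, 30])

# ChainMux: exhaust the first streamer, then move on to the second
print(list(pescador.ChainMux([a, b], mode='exhaustive')))       # expected: [1, 2, 3, 10, 20, 30]

# RoundRobinMux: visit the streamers in strict alternating order
print(list(pescador.RoundRobinMux([a, b], mode='exhaustive')))  # expected: [1, 10, 2, 20, 3, 30]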

Parallel processing

In the above example, all of the data I/O was handled within the generator function. If the generator requires high-latency operations such as disk-access, this can become a computational bottleneck.

Pescador makes it easy to migrate data generation into a background process, so that high-latency operations do not stall the main thread. This is facilitated by the ZMQStreamer object, which acts as a simple wrapper around any streamer that produces samples in the form of dictionaries of numpy arrays. Continuing the above example:

mux_stream = pescador.StochasticMux(streamers, n_active=100, rate=8)

for item in pescador.ZMQStreamer(mux_stream):
    model.partial_fit(item['X'])

Simple interface

Finally, Pescador is intended to work with a variety of machine learning frameworks, such as scikit-learn and Keras. While many frameworks provide custom tools for handling data pipelines, each one is different, and many require using specific data structures and formats.

Pescador is meant to be framework-agnostic, and allow you to write your own data generation logic using standard Python data structures (dictionaries and numpy arrays). We also provide helper utilities to integrate with Keras’s tuple generator interface.
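For example, adapting a dictionary stream to Keras-style (inputs, outputs) tuples with pescador.maps.keras_tuples might look like this sketch (data_gen is a stand-in generator):

import numpy as np
import pescador

def data_gen():
    while True:
        yield dict(X=np.random.randn(1, 10), Y=np.array([0]))

streamer = pescador.Streamer(data_gen)

# Reformat each sample dictionary as an (inputs, outputs) tuple
keras_stream = pescador.maps.keras_tuples(streamer, inputs='X', outputs='Y')

for inputs, outputs in keras_stream:
    # hand these to your training loop, e.g. model.train_on_batch(inputs, outputs)
    break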

Basic examples

Streaming data

This example will walk through the basics of using pescador to stream samples from a generator.

Our running example will be learning from an infinite stream of stochastically perturbed samples from the Iris dataset.

Sample generators

Streamers are intended to transparently pass data without modifying it. However, Pescador assumes that Streamers produce output in a particular format. Specifically, each sample is expected to be a Python dictionary where each value is an np.ndarray. For unsupervised learning (e.g., scikit-learn's MiniBatchKMeans), a sample might contain only one key: X. For supervised learning (e.g., SGDClassifier), valid samples would contain both X and Y keys, both of equal length.
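For example, valid samples might look like the following (the shapes here are arbitrary):

import numpy as np

# Unsupervised: a single key 'X'
unsupervised_sample = dict(X=np.random.randn(1, 4))

# Supervised: features and labels with matching first dimensions
supervised_sample = dict(X=np.random.randn(1, 4), Y=np.array([0]))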

Here’s a simple example generator that draws random samples of data from the Iris dataset, and adds gaussian noise to the features.

import numpy as np

def noisy_samples(X, Y, sigma=1.0):
    '''Generate an infinite stream of noisy samples from a labeled dataset.

    Parameters
    ----------
    X : np.ndarray, shape=(n, d)
        Features

    Y : np.ndarray, shape=(n,)
        Labels

    sigma : float > 0
        Variance of the additive noise

    Yields
    ------
    sample : dict
        sample['X'] is an `np.ndarray` of shape `(1, d)`

        sample['Y'] is the scalar label of the sampled row
    '''

    n, d = X.shape

    while True:
        i = np.random.randint(0, n)

        noise = sigma * np.random.randn(1, d)

        yield dict(X=X[i] + noise, Y=Y[i])

In the code above, noisy_samples is a generator that can be sampled indefinitely because noisy_samples contains an infinite loop. Each iterate of noisy_samples will be a dictionary containing the sample’s features and labels.

Streamers

Generators in python have a couple of limitations for common stream learning pipelines. First, once instantiated, a generator cannot be “restarted”. Second, an instantiated generator cannot be serialized directly, so they are difficult to use in distributed computation environments.

Pescador provides the Streamer class to circumvent these issues. Streamer simply provides an object container for an uninstantiated generator (and its parameters), and an access method iterate(). Calling iterate() multiple times on a Streamer object is equivalent to restarting the generator, and can therefore be used to implement multiple-pass streams simply. Similarly, because Streamer can be serialized, it is simple to pass a streamer object to a separate process for parallel computation.

Here’s a simple example, using the generator from the previous section.

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

stream2 = streamer.iterate()

Iterating over streamer.iterate() is equivalent to iterating over noisy_samples(X[train], Y[train]).

Additionally, a Streamer can easily be bounded by calling streamer.iterate(max_iter=N) for some maximum number of samples N.
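Because each call to iterate() re-instantiates the underlying generator, repeated passes (epochs) over the data are straightforward. A minimal sketch, reusing the streamer defined above:

# Draw at most 1000 samples per epoch; each call to iterate()
# restarts noisy_samples from scratch
for epoch in range(5):
    for sample in streamer.iterate(max_iter=1000):
        pass  # update your model here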

Finally, because iterate() is such a common operation with streamer objects, a short-hand interface is provided by treating the streamer object as if it was a generator:

import pescador

streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

# Equivalent to stream2 above
stream3 = streamer()

Iterating over any of these would then look like the following:

for sample in streamer.iterate():
    # do something
    ...

# For convenience, the object directly behaves as an iterator.
for sample in streamer:
    # do something
    ...

Stream re-use and multiplexing

This example demonstrates how to re-use and multiplex streamers.

We will assume a working understanding of the simple example in the previous section.

The StochasticMux streamer provides a powerful interface for randomly interleaving samples from multiple input streams. StochasticMux can also dynamically activate and deactivate individual Streamers, which allows it to operate on a bounded subset of streams at any given time.

As a concrete example, we can simulate a mixture of noisy streams with differing variances.

from __future__ import print_function

import numpy as np

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import ShuffleSplit
from sklearn.metrics import accuracy_score

from pescador import Streamer, StochasticMux

def noisy_samples(X, Y, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape

    while True:
        i = np.random.randint(0, n, size=1)

        noise = sigma * np.random.randn(1, d)

        yield dict(X=X[i] + noise, Y=Y[i])

# Load some example data from sklearn
raw_data = load_breast_cancer()
X, Y = raw_data['data'], raw_data['target']

classes = np.unique(Y)

rs = ShuffleSplit(n_splits=1, test_size=0.1)
for train, test in rs.split(X):

    # Instantiate a linear classifier
    estimator = SGDClassifier()

    # Build a collection of Streamers with different noise scales
    streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
               for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

    # Build a mux stream, keeping 3 streams alive at once
    mux_stream = StochasticMux(streams,
                               3,        # Keep 3 streams alive at once
                               rate=64)  # Use a poisson rate of 64

    # Fit the model to the stream, use at most 5000 samples
    for sample in mux_stream(max_iter=5000):
        estimator.partial_fit(sample['X'], sample['Y'], classes=classes)

    # And report the accuracy
    Ypred = estimator.predict(X[test])
    print('Test accuracy: {:.3f}'.format(accuracy_score(Y[test], Ypred)))

In the above example, each Streamer in streams can produce infinitely many samples. The rate=64 argument to StochasticMux says that each stream should produce some n samples, where n is sampled from a Poisson distribution with rate rate. When a stream exceeds its bound, it is deactivated, and a new streamer is activated to fill its place.

Setting rate=None disables the random stream bounding, and simply runs each active stream until exhaustion.

The StochasticMux streamer can sample with or without replacement from its input streams, according to the mode option. Setting this parameter to single_active means that a given stream can occupy at most one slot in the active set at any time.

Streams can also be sampled with non-uniform weighting by specifying a vector of weights.

Finally, exhausted streams can be removed by setting mode='exhaustive'.
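Putting these options together, a sketch that reuses the streams, estimator, and classes defined in the example above might look like this (the weights are illustrative):

# Favor the low-noise streams, keep 3 active at once, and ensure each
# stream occupies at most one active slot at a time ('single_active')
weighted_mux = StochasticMux(streams,
                             n_active=3,
                             rate=64,
                             weights=[0.4, 0.25, 0.15, 0.1, 0.1],
                             mode='single_active')

for sample in weighted_mux(max_iter=5000):
    estimator.partial_fit(sample['X'], sample['Y'], classes=classes)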

Sampling from disk

A common use case for pescador is to sample data from a large collection of existing archives. As a concrete example, consider the problem of fitting a statistical model to a large corpus of musical recordings. When the corpus is sufficiently large, it is impossible to fit the entire set in memory while estimating the model parameters. Instead, one can pre-process each song to store pre-computed features (and, optionally, target labels) in a numpy zip NPZ archive. The problem then becomes sampling data from a collection of NPZ archives.

Here, we will assume that the pre-processing has already been done so that each NPZ file contains a numpy array of features X and labels Y. We will define infinite samplers that pull n examples per iterate.

import numpy as np
import pescador

def sample_npz(npz_file, n):
    '''Generate an infinite sequence of contiguous samples
    from the input `npz_file`.

    Each iterate has `n > 0` rows.
    '''
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n)
            yield dict(X=data['X'][idx:idx + n],
                       Y=data['Y'][idx:idx + n])

Applying the sample_npz function above to a list of npz_files, we can make a multiplexed streamer object as follows:

n = 16
npz_files = #LIST OF PRE-COMPUTED NPZ FILES
streams = [pescador.Streamer(sample_npz, npz_f, n) for npz_f in npz_files]

# Keep 32 streams alive at once
# Draw on average 16 patches from each stream before deactivating
mux_stream = pescador.StochasticMux(streams, n_active=32, rate=16)

for batch in mux_stream(max_iter=1000):
    # DO LEARNING HERE
    pass

Memory-mapping

The NPZ file format requires loading the entire contents of each archive into memory. This can lead to high memory consumption when the number of active streams is large. Note also that memory usage for each NPZ file will persist for as long as there is a reference to its contents. This can be circumvented, at the cost of some latency, by copying data within the streamer function:

def sample_npz_copy(npz_file, n):
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n)
            yield dict(X=data['X'][idx:idx + n].copy(),  # <-- Note the explicit copy
                       Y=data['Y'][idx:idx + n].copy())

The above modification will ensure that memory is freed as quickly as possible.

Alternatively, memory-mapping can be used to only load data as needed, but requires that each array is stored in its own NPY file:

def sample_npy_mmap(npy_x, npy_y, n):

    # Open each file in "copy-on-write" mode, so that the files are read-only
    X = np.load(npy_x, mmap_mode='c')
    Y = np.load(npy_y, mmap_mode='c')

    n_total = len(X)

    while True:
        # Compute the index offset
        idx = np.random.randint(n_total - n)
        yield dict(X=X[idx:idx + n],
                   Y=Y[idx:idx + n])

# Using this streamer is similar to the first example, but now you need a separate
# NPY file for each X and Y
npy_x_files = #LIST OF PRE-COMPUTED NPY FILES (X)
npy_y_files = #LIST OF PRE-COMPUTED NPY FILES (Y)
streams = [pescador.Streamer(sample_npy_mmap, npy_x, npy_y, n)
           for (npy_x, npy_y) in zip(npy_x_files, npy_y_files)]

# Then construct the `StochasticMux` from the streams, as above
mux_stream = pescador.StochasticMux(streams, n_active=32, rate=16)

...

Buffered Streaming

In a machine learning setting, it is common to train a model with multiple input datapoints simultaneously, in what are commonly referred to as “minibatches”. To achieve this, pescador provides the pescador.maps.buffer_stream map transformer, which will “buffer” a data stream into fixed batch sizes.

Following up on the first example, we use the noisy_samples generator.

import pescador

# Create an initial streamer
streamer = pescador.Streamer(noisy_samples, X[train], Y[train])

minibatch_size = 128
# Wrap your streamer
buffered_sample_gen = pescador.buffer_stream(streamer, minibatch_size)

# Generate batches in exactly the same way as you would from the base streamer
for batch in buffered_sample_gen:
    ...

A few important points to note about using pescador.maps.buffer_stream:

  • pescador.maps.buffer_stream will concatenate your arrays, adding a new leading dimension whose size is the number of samples per buffer (minibatch_size in the above example). E.g., if your samples are shaped (4, 5), a batch size of 10 will produce arrays shaped (10, 4, 5)
  • Each key in the batches generated will be concatenated (across all the samples buffered).
  • pescador.maps.buffer_stream, like all pescador.maps transformers, returns a generator, not a Streamer. So, if you still want it to behave like a streamer, you have to wrap it in a streamer. Following up on the previous example:
batch_streamer = pescador.Streamer(buffered_sample_gen)

# Generate batches as a streamer:
for batch in batch_streamer:
    # batch['X'].shape == (minibatch_size, ...)
    # batch['Y'].shape == (minibatch_size, 1)
    ...


# Or, another way:
batch_streamer = pescador.Streamer(pescador.buffer_stream, streamer, minibatch_size)


API Reference

Streamer

class pescador.Streamer(streamer, *args, **kwargs)

A wrapper class for recycling iterables and generator functions, i.e. streamers.

Wrapping streamers within an object provides two useful features:

  1. Streamer objects can be serialized (as long as their streamer can be)
  2. Streamer objects can instantiate a generator multiple times.

The first feature is important for parallelization (see zmq_stream), while the second feature facilitates infinite streaming from finite data (i.e., oversampling).

Examples

Generate a stream of integers from 0 to n-1

>>> def my_generator(n):
...     for i in range(n):
...         yield i
>>> stream = Streamer(my_generator, 5)
>>> for i in stream:
...     print(i)  # Displays 0, 1, 2, 3, 4

Or with a maximum number of items

>>> for i in stream(max_iter=3):
...     print(i)  # Displays 0, 1, 2

Or infinitely many examples, restarting the generator as needed

>>> for i in stream.cycle():
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Or finitely many examples, restarting the generator as needed

>>> for i in stream.cycle(max_iter=7):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1

An alternate interface for the same:

>>> for i in stream(cycle=True):
...     print(i)  # Displays 0, 1, 2, 3, 4, 0, 1, 2, ...

Attributes

streamer (generator or iterable) Any generator function or iterable python object.
args (list)
kwargs (dict) Parameters provided to streamer, if callable.

__call__(max_iter=None, cycle=False)

Convenience interface for interacting with the Streamer.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

obj : Objects yielded by the generator provided on init.

See also

iterate, cycle

active

Returns true if the stream is active (i.e., there are still open / existing streams)

cycle(max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:

obj : Objects yielded by the streamer provided on init.

is_activated_copy

is_active is true if this object is a copy of the original Streamer and has been activated.

iterate(max_iter=None)

Instantiate an iterator.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, exhaust the stream.

Yields:

obj : Objects yielded by the streamer provided on init.

See also

cycle
force an infinite stream.

Parallel streaming

class pescador.ZMQStreamer(streamer, min_port=49152, max_port=65535, max_tries=100, copy=False, timeout=5)

Parallel data streaming over zeromq sockets.

This allows a data generator to run in a separate process from the consumer.

A typical usage pattern is to construct a Streamer object from a generator and then use ZMQStreamer to execute the stream in one process while the other process consumes data.

Examples

>>> # Construct a streamer object
>>> S = pescador.Streamer(my_generator)
>>> # Wrap the streamer in a ZMQ streamer
>>> Z = pescador.ZMQStreamer(S)
>>> # Process as normal
>>> for data in Z:
...     MY_FUNCTION(data)

__call__(max_iter=None, cycle=False)

Convenience interface for interacting with the Streamer.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, attempt to exhaust the stream. For finite streams, call iterate again, or use cycle=True to force an infinite stream.

cycle: bool

If True, cycle indefinitely.

Yields:

obj : Objects yielded by the generator provided on init.

See also

iterate, cycle

active

Returns true if the stream is active (i.e., there are still open / existing streams)

cycle(max_iter=None)

Iterate from the streamer infinitely.

This function will force an infinite stream, restarting the streamer even if a StopIteration is raised.

Parameters:

max_iter : None or int > 0

Maximum number of iterations to yield. If None, iterate indefinitely.

Yields:

obj : Objects yielded by the streamer provided on init.

is_activated_copy

is_active is true if this object is a copy of the original Streamer and has been activated.

iterate(max_iter=None)

Note: A ZMQStreamer does not activate its stream, but allows the zmq_worker to do that.

Yields:

data : dict

Data drawn from streamer(max_iter).

Multiplexing

Defines the interface and several varieties of mux. A mux is a Streamer which wraps N other Streamer objects, and at every step yields a sample from one of its sub-streamers.

This module defines the following Mux types:

StochasticMux

A Mux which chooses its active streams stochastically, and chooses samples from the active streams stochastically. StochasticMux is equivalent to the pescador.Mux from versions <2.0.

StochasticMux has a mode parameter which selects how it operates, with the following modes:

with_replacement

Sample streamers with replacement. This allows a single stream to be used multiple times simultaneously.

exhaustive

Each streamer is consumed at most once and never re-activated.

single_active

Each stream in the candidate pool is either active or not. Streams are revived when they are exhausted. This setting makes it so that streams in the active pool are uniquely selected from the candidate pool, whereas with_replacement allows the same stream to be used more than once.

ShuffledMux

A ShuffledMux interleaves samples from all given streamers.

RoundRobinMux

Iterates over all the streamers in strict order.

ChainMux

As in itertools.chain(), runs the first streamer to exhaustion, then the second, then the third, etc. Uses only a single stream at a time.

Mux

The pescador<2.0 Mux is still available and works the same, but is deprecated.

We recommend replacing all uses of Mux with StochasticMux.
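As a rough sketch of the migration (assuming a collection streamers as in the earlier examples; check your installed version for the exact signatures):

# pescador < 2.0:
#   mux = pescador.Mux(streamers, k=32, rate=16)

# pescador >= 2.0:
mux = pescador.StochasticMux(streamers, n_active=32, rate=16)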

StochasticMux(streamers, n_active, rate[, …])
    Stochastic Mux.

ShuffledMux(streamers[, weights, random_state])
    A variation on a mux, which takes N streamers and samples from them equally, guaranteeing all N streamers to be “active”.

RoundRobinMux(streamers[, mode, random_state])
    A Mux which iterates over all streamers in strict order.

ChainMux(streamers[, mode, random_state])
    As in itertools.chain().

Mux(streamers, k[, rate, weights, …])
    Stochastic multiplexor for Streamers.

Maps (Transformers)

Map functions perform operations on a stream.

Important note: map functions return a generator, not another Streamer, so if you need it to behave like a Streamer, you have to wrap the function in a Streamer again.

buffer_stream(stream, buffer_size[, partial])
    Buffer “data” from a stream into one data object.

tuples(stream, *keys)
    Reformat data as tuples.

keras_tuples(stream[, inputs, outputs])
    Reformat data objects as keras-compatible tuples.
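For example, a sketch using tuples on a toy dictionary stream, and re-wrapping the result in a Streamer as noted above (data_gen is illustrative):

import numpy as np
import pescador

def data_gen():
    while True:
        yield dict(X=np.random.randn(1, 3), Y=np.array([1]))

streamer = pescador.Streamer(data_gen)

# Reformat dictionary samples as (X, Y) tuples
for X, Y in pescador.maps.tuples(streamer, 'X', 'Y'):
    print(X.shape, Y)
    break

# Map functions return generators; re-wrap in a Streamer to recover
# Streamer behavior (re-iteration, muxing, etc.)
tuple_streamer = pescador.Streamer(pescador.maps.tuples, streamer, 'X', 'Y')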

Release notes

Changes

v2.0.0 (2018-02-05)

This release is the second major revision of the pescador architecture, and includes many substantial changes to the API.

This release contains no changes from the release candidate 2.0.0rc0.

  • #103 Deprecation and refactor of the Mux class. Its functionality has been superseded by new classes StochasticMux, ShuffledMux, ChainMux, and RoundRobinMux.
  • #109 Removed deprecated features from the 1.x series: the BufferedStreamer class and the Streamer.tuples method
  • #111 Removed the internally-facing StreamActivator class
  • #113 Bugfix: multiply-activated streamers (and muxes) no longer share state
  • #116 Streamer.cycle now respects the max_iter parameter
  • #121 Added minimum dependency version requirements
  • #106 Added more advanced examples in the documentation

v1.1.0 (2017-08-25)

This is primarily a maintenance release, and will be the last in the 1.x series.

  • #97 Fixed an infinite loop in Mux
  • #91 Changed the default timeout for ZMQStreamer to 5 seconds.
  • #90 Fixed conda-forge package distribution
  • #89 Refactored internals of the Mux class toward the 2.x series
  • #88, #100 improved unit tests
  • #73, #95 Updated documentation

v1.0.0 (2017-03-18)

This release constitutes a major revision over the 0.x series, and the new interface is not backward-compatible.

  • #23 Preserve memory alignment of numpy arrays across ZMQ streams
  • #34 Rewrite of all core functionality under a unified object interface Streamer.
  • #35, #52 Removed the StreamLearner interface and scikit-learn dependency
  • #44 Added a base exception class PescadorError
  • #45 Removed the util submodule
  • #47, #60 Improvements to documentation
  • #48 Fixed a timeout bug in ZMQ streamer
  • #53 Added testing and support for python 3.6
  • #57 Added the .tuple() interface for Streamers
  • #61 Improved test coverage
  • #63 Added random state to Mux
  • #64 Added __call__ interface to Streamer

v0.1.3 (2016-07-28)

  • Added support for joblib>=0.10

v0.1.2 (2016-02-22)

  • Added pescador.mux parameter revive. Calling with with_replacement=False, revive=True will use each seed at most once at any given time.
  • Added pescador.zmq_stream parameter timeout. Setting this to a positive number will terminate dangling worker threads after timeout is exceeded on join. See also: multiprocessing.Process.join.

v0.1.1 (2016-01-07)

  • pescador.mux now throws a RuntimeError exception if the seed pool is empty

v0.1.0 (2015-10-20)

Initial public release
