One Pipeline to Train Them All: Part 2 – Creating the Flexible Classifier Pipeline

Blog / Shannon Pace / November 9, 2016

Part I of this series described a pipeline based on a plug-in architecture designed for creating classifier models. However, describing is not enough. We need implementation! Fortunately, the pipeline’s structure enables various abstractions that make an implementation, in the abstract, quite simple. In this post we’ll go even further and provide example extensions, demonstrate usage of the pipeline, and look at how the textual model definitions, described in Part I, can be used to modify the pipeline’s behaviour without modifying its code. The example code in this post is in Python, but the pipeline can be implemented in any mainstream language.

The AdaptedModel Abstraction

In Part I, the pipeline was described as a process that outputs an “Adapter combined with a Model”. This process can be captured by a class that explicitly pairs an Adapter with a Model and encodes the steps of the pipeline, more or less, in its methods. An example is the AdaptedModel class:

import numpy

class AdaptedModel:
    """Pairs an adapter with a model for binary classification."""
    def __init__(self, adapter, model):
        """Sets this instance's adapter and model."""
        self.adapter = adapter
        self.model = model

    def classify(self, example):
        """Returns the classification of the given example."""
        return round(self.model.predict(self.adapter.adapt(example)))

    def accuracy(self, testing_set):
        """Returns the accuracy of the underlying model on the testing subset of the given testing dataset."""
        is_correct = lambda x: self.classify(x.example) == x.classification
        return len(list(filter(is_correct, testing_set))) / len(testing_set)

    def _adapt_training_set(self, training_set):
        """Prepares sequences and labels from the training subset of the given dataset which can be provided to this AdaptedModel instance's model."""
        input_shape = (len(training_set),) + self.adapter.shape
        sequences = numpy.zeros(input_shape, self.adapter.dtype)
        labels = numpy.zeros((len(training_set), 1), dtype = "int")
        for index, example in enumerate(training_set):
            sequences[index] = self.adapter.adapt(example.example)
            labels[index] = numpy.array([example.classification])
        return sequences, labels

    def train(self, dataset):
        """Trains this AdaptedModel instance's adapter, then uses it to generate input to train the instance's model. Returns the model's accuracy."""
        dataset.prepare_subsets()
        self.adapter.fit(dataset.get_training_set())
        sequences, labels = self._adapt_training_set(dataset.get_training_set())
        self.model.train(sequences, labels)
        return self.accuracy(dataset.get_testing_set())

The pipeline is implemented in the class’ train method. As you can see, this method:

  1. Prepares the dataset’s training and testing subsets.
  2. Fits the adapter over the training set.
  3. Uses the adapter to prepare input for the model.
  4. Trains the model using that input.
  5. Tests the model using the adapted testing set.

The usage of the class is straightforward:

  • An AdaptedModel instance is prepared for use by calling its train method, which pulls examples from a Dataset, passes them through its Adapter, then feeds them into its Model.
  • A prepared AdaptedModel instance is used by calling its classify method, which returns the rounded prediction that its Model computes for the sequence produced by its Adapter.
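In code, that two-step usage looks roughly like the following sketch, where SomeAdapter, SomeModel, some_dataset and unseen_example are placeholders for the concrete components introduced below:

# Placeholder components; concrete implementations appear later in this post.
adapted_model = AdaptedModel(SomeAdapter(), SomeModel())

# Step 1: execute the pipeline, which returns the model's accuracy on the testing set.
accuracy = adapted_model.train(some_dataset)

# Step 2: classify previously unseen input.
label = adapted_model.classify(unseen_example)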

Later in this post, it will be shown that only a few lines of code are necessary to convert a textual model definition into an AdaptedModel instance that is ready to classify input. But first, let’s look at the small amount of infrastructure required to power the AdaptedModel.

Datasets, Adapters and Models

The AdaptedModel example assumes the existence of classes that implement the Dataset, Adapter and Model interfaces. The class/object concept is a useful abstraction for these software components for a number of reasons:

  • Each component has a small number of well-defined actions:
    • The Dataset a) prepares a training set and b) prepares a testing set.
    • The Adapter a) fits itself on examples and b) adapts examples for the model.
    • The Model a) trains itself on provided examples and b) classifies examples.
  • All three components can have internal state that changes in response to input; e.g. model training changes the internal state of the model.
  • Classes can implement interfaces, which provides a mechanism for ‘plugging’ components into the pipeline.

To support the presented AdaptedModel, the Dataset, Adapter and Model components need the following interfaces:

class Dataset:
    def __init__(self, *args): pass

    def prepare_subsets(self): pass

    def get_training_set(self): pass

    def get_testing_set(self): pass


class Adapter:
    def __init__(self, *args): pass

    def fit(self, examples): pass

    def adapt(self, example): pass


class Model:
    def __init__(self, *args): pass

    def train(self, sequences, labels): pass

    def predict(self, sequence): pass
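Python has no built-in interface construct, so the skeleton classes above mainly serve as documentation of the expected methods. If stricter enforcement is wanted, the same contracts could optionally be expressed with the standard library's abc module; a minimal sketch for the Adapter contract:

import abc

class Adapter(abc.ABC):
    @abc.abstractmethod
    def fit(self, examples): pass

    @abc.abstractmethod
    def adapt(self, example): pass

Instantiating a subclass that fails to implement fit or adapt then raises a TypeError, which surfaces wiring mistakes early.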

A KnownExample class is also useful for pairing examples with their known classification:

class KnownExample:
    def __init__(self, example, classification):
        self.example = example
        self.classification = classification

Let’s look at concrete examples of each pipeline component. This implementation of Dataset fetches examples of the data to be classified from a database, restricted to a specific time period:

import random

import dateutil.parser

class SpanDataset:
    def __init__(self, start_datetime_str, end_datetime_str, training_ratio):
        """Sets the period from which to load examples and the proportion of examples assigned to the training subset."""
        self.start_datetime = dateutil.parser.parse(start_datetime_str)
        self.end_datetime = dateutil.parser.parse(end_datetime_str)
        self.training_ratio = training_ratio

    def prepare_subsets(self):
        """Fetches examples from the database and uses them to prepare training and testing subsets."""
        import psycopg2
        connection = psycopg2.connect(DATABASE_CONNECTION_URL)
        cursor = connection.cursor()
        cursor.execute("SELECT example, classification FROM examples "
            "WHERE timestamp>=%s AND timestamp<=%s;",
            (self.start_datetime, self.end_datetime)
        )
        examples = []
        for example, classification in cursor.fetchall():
            examples.append(KnownExample(example, classification))
        random.shuffle(examples)
        subset_partition = int(len(examples) * self.training_ratio)
        self.training_set = examples[:subset_partition]
        self.testing_set = examples[subset_partition:]

    def get_training_set(self):
        """Returns the examples in this dataset's training set."""
        return self.training_set

    def get_testing_set(self):
        """Returns the examples in this dataset's test set."""
        return self.testing_set
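On its own, the dataset component can be exercised as in this brief sketch (the dates and ratio are illustrative, and a populated examples table is assumed):

dataset = SpanDataset("2016-1-1", "2016-1-2", training_ratio = 0.8)
dataset.prepare_subsets()
print(len(dataset.get_training_set()), "training examples")
print(len(dataset.get_testing_set()), "testing examples")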

This implementation of Adapter converts text into sequences by mapping each text token to an integer representing the frequency of the token in the training dataset:

import collections

class WordFrequencyAdapter:
    dtype = "int"

    def __init__(self, sequence_length, vocabulary_size = None):
        """Sets the length of sequences generated by this adapter; that is, the number of tokens in the example to be mapped to the word frequency index. Also sets the maximum size of the word frequency index (None means unlimited)."""
        self.shape = (sequence_length,)
        self.vocabulary_size = vocabulary_size

    def fit(self, examples):
        """Builds a word frequency index from the tokens in the given examples."""
        tokens = (x for example in examples for x in tokenise(example.example))
        frequency_list = collections.Counter(tokens).most_common()
        self.index = dict(frequency_list[:self.vocabulary_size])

    def adapt(self, example):
        """Returns a list of integers representing the sequence of words in the given example. The function 'pad_sequence' simply appends zeroes to the sequence so that it fits the shape of 'self.shape'."""
        sequence = [self.index.get(token) for token in tokenise(example)]
        return pad_sequence([x for x in sequence if x], self.shape)
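The adapter relies on two helpers, tokenise and pad_sequence, which are not part of the pipeline itself. Their exact behaviour will depend on the data being classified, but a minimal sketch might look like this (the tokenisation rule is illustrative only):

import re
import numpy

def tokenise(text):
    """Splits lower-cased text into simple word tokens."""
    return re.findall(r"[a-z0-9']+", text.lower())

def pad_sequence(sequence, shape):
    """Truncates the sequence to the target length and pads it with zeroes so that it matches the adapter's output shape."""
    padded = numpy.zeros(shape, dtype = "int")
    trimmed = sequence[:shape[0]]
    padded[:len(trimmed)] = trimmed
    return padded

Returning a numpy array from pad_sequence keeps adapt compatible with ScikitModel.predict below, which reshapes the sequence before passing it to scikit-learn.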

This implementation of Model uses the scikit-learn library to represent a classifier model:

class ScikitModel:
    def __init__(self, model):
        """Sets the Scikit model that this instance wraps."""
        self.model = model

    def train(self, tokens, labels):
        """Trains the underlying model."""
        self.model.fit(tokens, labels.ravel())
        return None

    def predict(self, sequence):
        """Predicts the class of the given sequence using the trained model."""
        return self.model.predict(sequence.reshape(1, -1)).item()
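A quick standalone check of ScikitModel, using toy data purely for illustration:

import numpy
import sklearn.svm

model = ScikitModel(sklearn.svm.SVC(kernel = "rbf", degree = 3, tol = 1e-3))
model.train(numpy.array([[0, 1], [1, 0], [1, 1], [0, 0]]), numpy.array([[1], [1], [0], [0]]))
print(model.predict(numpy.array([1, 0])))   # prints the predicted class, 0 or 1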

A useful way of organising the project is to place all classes implementing the Dataset interface into a datasets module, all Adapter classes in an adapters module, and so on. This enables Python’s getattr to be used to create a simple mapping between model definitions and program code.
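For example, under that organisation (the file names here are one possible choice) component classes can be looked up by the names that appear in a definition:

# datasets.py  defines SpanDataset and other Dataset implementations
# adapters.py  defines WordFrequencyAdapter and other Adapter implementations
# models.py    defines ScikitModel and other Model implementations
import adapters

AdapterClass = getattr(adapters, "WordFrequencyAdapter")   # resolve the class by name
adapter = AdapterClass(sequence_length = 100)              # then instantiate it as usual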

Putting It All Together

With those example components in place, let’s look at a basic use of the presented pipeline. First, we define our model using a Python dict:

import sklearn.svm

definition = {
    "dataset": {
        "name": "SpanDataset",
        "options": {
            "start_datetime_str": "2016-1-1",
            "end_datetime_str": "2016-1-2",
            "training_ratio": 0.8
        }
    },
    "adapter": {
        "name": "WordFrequencyAdapter",
        "options": {
            "sequence_length": 100
        }
    },
    "model": {
        "name": "ScikitModel",
        "options": {
            "model": sklearn.svm.SVC(kernel = "rbf", degree = 3, tol = 1e-3)
        }
    }
}

Then, using the pipeline is as simple as…

import datasets, adapters, models

# Initialise the dataset, adapter and model objects.
dataset = getattr(datasets, definition["dataset"]["name"])(**definition["dataset"]["options"])
adapter = getattr(adapters, definition["adapter"]["name"])(**definition["adapter"]["options"])
model = getattr(models, definition["model"]["name"])(**definition["model"]["options"])

# Train the model (i.e. execute the pipeline).
adapted_model = AdaptedModel(adapter, model)
print("Accuracy:", adapted_model.train(dataset))

# Use the prepared adapted model to classify (hypothetical) unseen examples.
for example_id, example in get_unseen_examples(...):
    print("{}: {}".format(example_id, adapted_model.classify(example)))

And that’s all that’s required! In just a few lines, a model definition is converted into an adapter-model pair that is ready to perform classification.
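If many definitions need to be processed, those few lines could be wrapped in a small helper; the function below is a hypothetical convenience, not part of the pipeline itself:

import datasets, adapters, models

def build_adapted_model(definition):
    """Builds the dataset and the adapter-model pair described by a model definition."""
    dataset = getattr(datasets, definition["dataset"]["name"])(**definition["dataset"]["options"])
    adapter = getattr(adapters, definition["adapter"]["name"])(**definition["adapter"]["options"])
    model = getattr(models, definition["model"]["name"])(**definition["model"]["options"])
    return dataset, AdaptedModel(adapter, model)

dataset, adapted_model = build_adapted_model(definition)
print("Accuracy:", adapted_model.train(dataset))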

The power of this approach is that a few small changes to the non-code model definition can change the behaviour of the pipeline substantially. For example, the following definition swaps the word frequency adapter for a word2vec adapter (the hypothetical Word2VecAdapter), and swaps the support vector machine for a neural network with a single-layer GRU structure (the hypothetical KerasModel):

definition = {
    "dataset": {
        "name": "SpanDataset",
        "options": {
            "start_datetime_str": "2016-1-1",
            "end_datetime_str": "2016-1-2",
            "training_ratio": 0.8
        }
    },
    "adapter": {
        "name": "Word2VecAdapter",
        "options": {
            "sequence_length": 100
        }
    },
    "model": {
        "name": "KerasModel",
        "options": {
            "model": ...
        }
    }
}

To make the model definitions JSON serialisable, all that is required is a set of helper functions that map between definitions and the interfaces of external libraries. For example, a JSON representation of the "model" portion of the first definition could be:

    ...
    "model": {
        "name": "scikit_model_svm",
        "options": {
            "kernel": "rbf", "degree": 3, "tolerance": 1e-3
        }
    }
    ...

The corresponding helper function is as simple as:

def scikit_model_svm(kernel, degree, tolerance):
    return ScikitModel(sklearn.svm.SVC(kernel = kernel, degree = degree, tol = tolerance))
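To go from a JSON document to a component, the definition only has to be parsed and the named helper resolved. A minimal sketch, assuming the helpers live in the models module:

import json
import models   # assumed to contain scikit_model_svm and similar helper functions

def model_from_definition(json_str):
    """Builds the Model component named in a JSON model definition."""
    model_definition = json.loads(json_str)["model"]
    factory = getattr(models, model_definition["name"])    # e.g. scikit_model_svm
    return factory(**model_definition["options"])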

All the Valves and Whistles

This, in a nutshell, is the core machine learning infrastructure we use in a system that will classify hundreds of thousands of examples daily, using a variety of adapters and models that are continuously trained and rotated through the system. Note, however, that this is only the core: applying the pipeline will inevitably require additional features that depend on the use case. We’ve integrated many such features without modifying the pipeline’s structure, including the collection of training metadata, the monitoring of hardware utilisation, the serialisation of adapters and models, the partial verification of model definitions, and thorough error reporting. For machine learning projects that require flexibility in terms of both data exploration and software functionality, consider this pipeline!