
How to manage Python libraries in Hopsworks

5/12/2021
Robin Andersson

TLDR: Hopsworks is the Data-Intensive AI platform with a Feature Store for building complete end-to-end machine learning pipelines. This tutorial gives an overview of how to install and manage Python libraries in the platform. Hopsworks provides a Python environment per project that is shared among all the users in the project. All common installation alternatives are supported, such as the pip and conda package managers, libraries packaged in a .whl or .egg file, and libraries that reside in a git repository. Furthermore, the service includes automatic conflict/dependency detection, upgrade notifications for platform libraries and easily accessible installation logs.


Introduction

The Python ecosystem is huge and seemingly ever growing. In a similar fashion, the number of ways to install your favorite package also seems to increase. As such, a data analysis platform needs to support a great variety of these options.

The Hopsworks installation ships with a Miniconda environment that comes preinstalled with the most popular libraries you can find in a data scientist's toolkit, including TensorFlow, PyTorch and scikit-learn. The environment is managed using the Hopsworks Python service, which lets you install or uninstall libraries that may then be used in Jupyter or the Jobs service in the platform.

In this blog post we will describe how to install a Python library, wherever it may reside, in the Hopsworks platform. As an example, this tutorial will demonstrate how to install the Hopsworks Feature Store client library, called hsfs. The library is Apache V2 licensed, available on GitHub and published on PyPI.

Prerequisites

To follow this tutorial you should have a Hopsworks instance running on https://hopsworks.ai. You can register for free, without providing credit card information, and receive USD 4000 worth of free credits to get started. The only thing you need to do is connect your cloud account.

Navigate to Python service

The first step to get started with the platform and install libraries in the python environment is to create a project and then navigate to the Python service.

Inspecting the environment

When a project is created, the Python environment is also initialized for the project. An overview of all the libraries and their versions, along with the package manager that was used to install them, is listed under the Manage Environment tab.

Installing hsfs by name

The simplest way to install the hsfs library is to enter its name and click Install, as shown in the example below. The installation can then be tracked under the Ongoing Operations tab, and when it is finished the library appears under the Manage Environment tab.

If hsfs were also available in an Anaconda repo, which is currently not the case, we would need to specify the channel where it resides. Searching for libraries in Anaconda repos is done by setting Conda as the package location.

Search and install specific version

If you need a specific version of hsfs, for example one compatible with a certain Hopsworks installation, the search functionality shows all the versions that have been published to PyPI in a dropdown. Simply pick a version and press Install, as the example below demonstrates.

Installing a distribution

Many popular Python libraries provide a variety of builds for different platforms and architectures, and with different build flags enabled. As such it is also important to support installing a distribution directly. Installing hsfs as a wheel requires that the .whl file has previously been uploaded to a Dataset in the project. After that, select the Upload tab, which indicates that the library to install is contained in an uploaded file, then click the Browse button to navigate to the distribution file in the file selector and click Install, as the following example demonstrates.

Installing a requirements.txt

Installing libraries one by one can be tedious and error prone; in that case it may be easier to use a requirements.txt file to define all the dependencies. This makes it easier to move an existing Python environment to Hopsworks, instead of having to install each library one by one.

The file may for example look like this.
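For instance, a minimal requirements.txt matching the description that follows could contain (the entries here simply mirror that description):

hsfs==2.2.7
imageio==2.9.0
mahotas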

This dependency list defines that version 2.2.7 of hsfs should be installed, along with version 2.9.0 of imageio and the latest available release for mahotas. The file needs to be uploaded to a dataset as in the previous example and then selected in the UI. 

Installing from a git repository

A great deal of Python libraries are hosted on git repositories, which makes it especially handy to install a library directly from a git repository during the development phase. The source code for the hsfs package is, as previously mentioned, hosted on a public GitHub repository, which means we only need to supply the URL to the repository and some optional query parameters. The subdirectory=python query parameter indicates that the setup.py file, which is needed to install the package, is in a subdirectory of the repository called python. An example of such an installation string is shown below.
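A pip-style requirement using a git URL plus the subdirectory parameter would look something like the following (the repository URL here is an assumption about where hsfs is hosted; substitute the actual repository):

git+https://github.com/logicalclocks/feature-store-api.git#egg=hsfs&subdirectory=python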

Automatic conflict detection

After each installation or uninstallation of a library, the environment is analyzed to detect libraries that may not work properly. As hsfs depends on several libraries, it is important to analyze the environment and check for dependency issues. For example, pandas is a dependency of hsfs, and if it is uninstalled, an alert will appear informing the user that the library is missing. In the following example pandas is uninstalled to demonstrate that.

Notification on library updates

When new releases are available for the hsfs library, a notification is shown to make it simple to upgrade to the latest version. Currently, only the hops and hsfs libraries are monitored for new releases as they are utility libraries used to interact with the platform services. By clicking the upgrade text, the library is upgraded to the recommended version automatically.

When installations fail

Speaking from experience, Python library installations can fail for a seemingly endless number of reasons. As such, to find the cause, it is crucial to be able to access the installation logs in a simple way to find a meaningful error. In the following example, an incorrect version of hsfs is being installed, and the cause of failure can be found a few seconds later in the logs.

Get started

Hopsworks is available both on AWS and Azure as a managed platform. Visit hopsworks.ai to try it out.

From 100 to ZeRO: PyTorch and DeepSpeed ZeRO on any Spark Cluster with Maggy

4/27/2021
Moritz Meister

TLDR; Maggy is an open source framework that lets you write generic PyTorch training code (as if it is written to run on a single machine) and execute that training distributed across a GPU cluster. Maggy enables you to write and debug PyTorch code on your local machine, and then run the same code at scale without having to change a single line in your program. Going even further, Maggy provides the distribution transparent use of ZeRO, a sharded optimizer recently proposed by Microsoft. You can use ZeRO to improve your memory efficiency with a single change in your Maggy configuration. You can try Maggy in the Hopsworks managed platform for free.

Distributed learning - An introduction

Deep learning has seen a surge in activity with the availability of high level frameworks such as PyTorch to build and train models. A few lines of code in a notebook are sufficient to create powerful classifiers from scratch. However, both the data and model sizes to achieve state-of-the-art performance are ever increasing, so that training on your local GPU becomes a hopeless endeavour. 

Enter distributed training. Distributed training allows you to train the same model on multiple GPUs on different shards of your data to speed up training times. In the ideal case, training on 4 GPUs simultaneously should reduce your training time by 75%. In distributed training, each GPU computes a forward and backward pass over its own batch of the data. For the model update, the computed gradients are shared and averaged between the nodes. This way, all models update their parameters with the same combined gradient and stay in sync. This additional communication step introduces additional overhead of course, which is why ideal scaling is never truly achieved. 

So if distributed training is such a great tool to accelerate training, why is its use still uncommon among normal PyTorch users? Because it is too tedious to use! A dummy example for starting distributed training might look something like this.

import os
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp

def main(args):
    # One training process is spawned per GPU on this node.
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = '10.51.45.25'
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))

def train(gpu, args):
    # Global rank of this process across all nodes.
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(backend='nccl',
                            init_method='env://',
                            world_size=args.world_size,
                            rank=rank)
    torch.manual_seed(0)
    model = args.Module()
    torch.cuda.set_device(gpu)
    model = nn.parallel.DistributedDataParallel(
        model.cuda(gpu),
        device_ids=[gpu])
    ...

Going even further, you would need to launch your code on all of your nodes and take care of graceful shutdowns and collecting the results. This is where Maggy comes in. Maggy allows you to launch your PyTorch training script without any changes on Spark clusters. It takes care of the training processes for each node, the resource isolation and node connections.

Next we will explore what is needed to run distribution transparent training on Maggy as well as the restrictions that still exist with the framework.

Building blocks for distribution transparent training

Configuring Maggy

First of all, Maggy requires its experiment to be configured for distributed training. In the most common use case this means passing your model, hyperparameters and your training/test set. Configuring is as easy as creating a config object. Hyperparameters, train and test set are optional and can also be directly loaded in the training loop. If your training loop consists of more than one module such as in training GANs with a Generator and Discriminator or Policy gradient methods in RL, you can also pass a list of modules.

from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=models.resnet50, train_set=train_set, test_set=test_set)

Writing the training function

Maggy’s API requires the training function to follow a unified signature. Users have to pass their model class, its hyperparameters and the train and test set to the training function. 

def train(module, hparams, train_set, test_set):
    ...

If you want to load your datasets on each node by yourself, you can also omit passing the datasets in the config. In fact, this is highly recommended when working with larger dataset objects. Additionally, every module used in the training function should be imported within that function. Think of your training function as completely self contained. Last but not least, users should use the PyTorch DataLoader (as is best practice anyway). Alternatively, you can also use Maggy’s custom PetastormDataLoader to load large datasets from Petastorm parquet files. When using the latter, users need to ensure that datasets are even, that is, they should have the same number of batches per epoch on all nodes. When using PyTorch’s DataLoader, you do not have to care about this. So to summarize, your training function needs to:

  • Implement the correct signature
  • Import all used modules inside the function
  • Use the PyTorch DataLoader (or Maggy’s PetastormDataLoader with even Datasets)

Distributed training on Maggy - A complete example

It’s time to combine all the elements we introduced so far in a complete example of distributed training with Maggy. In this example, we are going to create some arbitrary training data, define a function approximator for scalar fields, write our training loop and launch the distributed training.

Generate some training data

In order to not rely on specific datasets, we are going to create our own dataset. For this example, a scalar field should suffice. So first of all we randomly sample x and y and compute some function we want our neural network to approximate. PyTorch’s TensorDataset can then be used to form a proper dataset from this data.

import torch
import torch.nn as nn
import torch.nn.functional as F


coord = torch.rand((10000,2)) * 10 - 5  # Create random x/y coordinates in [-5,5]
z = torch.sin(coord[:,1]) + torch.cos(coord[:,0])  # Calculate scalar field for all points to get a dataset
train_set = torch.utils.data.TensorDataset(coord[:8000,:], z[:8000])
test_set = torch.utils.data.TensorDataset(coord[8000:,:], z[8000:])

Define the approximator

Next up we define our function approximator. For our example a standard neural network with 3 layers suffices, although in real applications you would of course train much larger networks.

class Approximator(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.l1 = torch.nn.Linear(2,100)
        self.l2 = torch.nn.Linear(100,100)
        self.l3 = torch.nn.Linear(100,1)
        
    def forward(self, x):
        # ReLU activations are assumed here (the original snippet omits the body);
        # squeeze so predictions have shape [batch], matching the scalar targets.
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        return self.l3(x).squeeze(-1)

Writing the training loop

At the heart of every PyTorch program lies the training loop. Following the APIs introduced earlier, we define our training function as follows.

def train(module, hparams, train_set, test_set):
    import torch
    model = module()

    n_epochs = 100
    batch_size = 64
    lr = 1e-5    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_criterion = torch.nn.MSELoss()
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size)    
    def eval_model():
        loss = 0
        model.eval()
        for coord, z in test_loader:
            prediction = model(coord).detach()
            loss += loss_criterion(prediction, z)
        return loss

    for epoch in range(n_epochs):
        model.train()
        for coord, z in train_loader:
            optimizer.zero_grad()
            prediction = model(coord)
            loss = loss_criterion(prediction, z)
            loss.backward()
            optimizer.step()
    return eval_model()

As you can see, there is no additional code for distributed training. Maggy takes care of all the necessary things. 

Starting the training

All that remains now is to configure Maggy and run our training. For this, we have to create the config object and run the lagom function.

from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=Approximator, train_set=train_set, test_set=test_set)
experiment.lagom(train, config)  # Starts the training loop

Evaluating the training

After running the training on 4 nodes, we can see that our approximator has converged to a good estimate of our scalar field. Of course, this would also be possible on a local node. But with more complex models and larger training sets such as the ImageNet dataset, distributed learning becomes necessary to leverage your workloads.

Try it for yourself

Maggy is open-source and documentation is available at maggy.ai. Give us a star or get in touch if you have more questions. Maggy is also available for all Hopsworks users in the managed platform on AWS or Azure. You can get started for free (no credit card required).


Beyond Brainless AI with a Feature Store

4/27/2021
Jim Dowling

TLDR; Machine learning models are only as good as the data (features) they are trained on. In Enterprises, data scientists can often train very effective models in the lab - when given a free hand on which data to use. However, many of those data sources are not available in production environments due to disconnected systems and data silos. An AI-powered product that is limited to the data available within its application silo cannot recall historical data about its users or relevant contextual data from external sources. It is like a jellyfish - its autonomic system makes it functional and useful, but it lacks a brain. You can, however, evolve your models from brain-free AI to Total Recall AI with the help of a Feature Store, a centralized platform that can provide models with low latency access to data spanning the whole enterprise.

Autonomic AI

Jellyfish are undoubtedly complex creatures with sophisticated behaviour - they move, mate, and munch. They eat and discard waste from the same opening. Yet they have no brain - their autonomic system suffices for their needs. The biggest breakthroughs in AI in recent years have been enabled by deep learning, which requires large volumes of data and specialized compute hardware (e.g., GPUs). However, just like a jellyfish, recent successes in image processing and NLP with deep learning required no brain - no working memory, history, or context. Much of deep learning today is Jellyfish AI. We have made incredible progress in identifying objects in images and translating natural language, yet such deep learning models typically only require the immediate input - the image or the text - to make their predictions. The input signal is information-rich. These image and NLP models seldom require a 'brain' to augment the input with context or memories. Google Translate doesn’t need to know about the historical enmity between the Scots and the Irish to decide whether it’s spelt Whisky or Whiskey. Jellyfish AI is impressive - the input data is information rich and models can learn fantastically advanced behaviour from labeled examples. All “knowledge” needed to make predictions is embedded in the model. The model does not need to have working memory (e.g., it doesn’t need to know the user has clicked 10 times on your website in the last minute).

Now compare using AI for image classification or NLP to building a web application that will use AI to interact with a user browsing a website. The immediate input data your application receives from your web browser are clicks on a mouse or a keyboard. The input signal is information-light - it is difficult to train a useful model using only user clicks. However, large Internet companies collect reams of information about users from many different sources, and transform that user data into features (information-rich signals that are ready to be used for either training models or making predictions with models). Models can then combine the click features with historical features about users and contextual features to build information-rich inputs to models. For example, you could augment the user’s action with everything you know about a user’s history and context to increase the user's engagement with the product. The feature store for machine learning (ML) stores and serves these features to models. We believe that AI-powered products that can easily access historical and contextual features will lead the next wave of AI in the Enterprise, and those products will need a feature store for ML.

Data Scientist and ML Engineer Disconnect

A frequent source of tension in Enterprises is between “naive” data scientists and “street-wise” ML engineers. Motivated by good software engineering practices, many ML engineers believe that ML models should be self-contained and tension can arise with data scientists who want to include features in their models that are “obviously not available in the production system”. 

However, data scientists are tasked with building the best models they can to add to the bottom line - engage more users, increase revenue, reduce costs. They know they can train better models with more data and more diverse sources of data. For example, a data scientist trying to predict if a financial transaction is suspected of money laundering or not might discover that a powerful feature is the graph of financial transfers related to this individual in the previous day/week/month. They can reduce false alerts of money laundering by a factor of 100*, reducing the costs of investigating the false alerts and saving the business millions of dollars per year. The data scientist hands the model over the wall to the ML engineer, who dismisses the idea of including the graph-based features in the production environment, and tension arises when communicating what is and is not possible in production. The data scientist is crestfallen - but need not be.

The Feature Store is now the de facto Enterprise platform for storing historical and contextual features for AI-powered products. The Feature Store is, in effect, the brain for AI-powered products, the three-eyed Raven that enables the model to access the history and state of the whole Enterprise, not just the local state in the application. 

Feature Stores enable applications or model serving infrastructure to take information-light inputs (such as a cookie identifying a user or a shopping cart session) and enrich them with features harvested from anywhere in the Enterprise or beyond to build feature vectors capable of making better predictions. And as we know from Deep Learning, model accuracy improves predictably with more features and data, so there will be an increasing trend towards adding more and more features to models to improve their accuracy. Andrew Ng has recently been advocating this approach, which he calls data-centric development, instead of the more traditional model-centric development. Another noticeable trend in large Enterprises is building faster and more scalable Feature Stores that can supply those features within the time budget available to the AI-powered product. But AI is going to revolutionize Enterprise software products, so how do we make sure our AI-enabled products are not just Jellyfish AI?

Enabling AI-Enabled Products with a Feature Store

Anti-Pattern: Re-implementing the “feature engineering” code for the serving layer violates the DRY principle. It introduces the risk of ‘skew’ between the features used to train models and the features served to operational models.

How do we avoid limiting AI-enabled products to only using the input features collected by the application itself? Models will benefit from access to all data that the Enterprise has collected about the user, product, or its context. A potential source of friction here, however, is the dominant architectural preference for microservices and data stove-pipes. Models themselves are being deployed as microservices in model-serving infrastructure, like KFServing or TensorFlow Serving or Nvidia Triton. How can we give these models access to more features?

Anti-Pattern: microservice-based Online Feature Store. Microservices can be used to compute features in real-time from raw input data, but when features can be pre-computed, microservices are an anti-pattern. This architecture adds latency, needs to be made highly available, has to handle hotspots, and microservices consume resources even when they are not needed. Serverless functions might be acceptable in cases where seconds of warmup latency are tolerable. But the microservices should still be reused to compute the training data - otherwise, there is a risk of training/serving skew.

Without a Feature Store, applications could contact microservices or databases to compute or retrieve the historical and contextual features (data), respectively. Computing the features in the application itself is an anti-pattern as it duplicates the feature engineering code - that code should already exist to generate the training data for the model. Re-implementing feature engineering logic in applications also introduces the risk of skew between the features computed in the application and the features computed for training. If serving and training environments use the same programming language, they could avoid non-DRY code by reusing a versioned library that computes the features. However, even if feature engineering logic is written in Python in both training and serving, it may use PySpark for training and Python for serving or different versions of Python. Versioned libraries can help, but are not a general solution to the feature skew problem. 

The Feature Store solves the training/serving skew problem by computing the features once in a feature pipeline. The feature pipeline is then reused to (1) create training data, and (2) save those pre-engineered features to the Feature Store. The serving infrastructure can then retrieve those features when needed to make predictions. For example, when an application wants to make a prediction about a user, it would supply the user’s ID, shopping cart ID, session ID, or location to retrieve pre-engineered features from the Feature Store. The features are retrieved as a feature vector, and the feature vector is sent to the model that makes the prediction. The Feature Store service for retrieving feature vectors is commonly known as the Online Feature Store. The logic for retrieving features from the Online Feature Store can also be implemented in model serving infrastructure, not just in applications. The advantage of looking up features in serving infrastructure is that it keeps the application logic cleaner, and the application just sends IDs and real-time features to the model serving infrastructure, that in turn, builds the feature vector, sends it to the model for prediction, and returns the result to the application. Low latency and high throughput are important properties for the online feature store - the faster you can retrieve features and the more features you can include in a given time budget, the more accurate models you should be able to deploy in production. To quote DoorDash:

“Latency on feature stores is a part of model serving, and model serving latencies tend to be in the low milliseconds range. Thus, read latency has to be proportionately lower” 
AI-enabled products use models to make predictions, and those models need an Online Feature Store to provide them with historical and contextual data (features) to make better predictions.
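To make this enrichment pattern concrete, here is a minimal Python sketch that uses a plain dict as a stand-in for the online feature store; the keys, feature values and function names are purely illustrative, not a real API.

# Stand-in for an online feature store: pre-engineered features keyed by entity ID.
online_store = {
    ("user_features", 42): [0.7, 12.0, 3.0],  # e.g. historical clicks, purchases, tenure
}

def build_feature_vector(user_id, realtime_features):
    # Enrich the information-light input (an ID plus a few real-time signals)
    # with pre-engineered historical features before calling the model.
    historical = online_store[("user_features", user_id)]
    return historical + realtime_features

feature_vector = build_feature_vector(42, realtime_features=[1.0, 0.0])
# feature_vector is then sent to the model (or model serving infrastructure) for prediction.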

Data-Centric AI with the Online Feature Store 

So, to summarize, if you want to give your ML models a brain, connect them up to a feature store. For Enterprises building personalized services, the feature store can enrich their models with a 360-degree, Enterprise-wide view of the customer - not just a product-specific view of the customer. The feature store enables more accurate predictions through more data being available to make those predictions, and this ultimately enables products with a better user experience, increased engagement, and the product intelligence now expected by users.

* This is based on a true story.


How to transform Amazon Redshift data into features with Hopsworks Feature Store

2/9/2021
Ermias Gebremeskel

TLDR: Hopsworks Feature Store provides an open ecosystem that connects to the largest number of data storage, data pipelines, and data science platforms. You can connect the Feature Store to Amazon Redshift to transform your data into features to train models and make predictions.

Introduction

Amazon Redshift is a popular managed data warehouse on AWS. Companies use Redshift to store structured data for traditional data analytics. This makes Redshift a popular source of raw data for computing features for training machine learning models.

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on AWS and it includes all the tools needed to transform data in Redshift into features that will be used when training and serving ML models.

In this blog post, we show how you can configure the Redshift storage connector to ingest data, transform that data into features (feature engineering) and save the pre-computed features in the Hopsworks Feature Store.

Prerequisites

To follow this tutorial you should have a Hopsworks Feature Store instance running on https://hopsworks.ai. You can register for free, with no credit card required, and receive USD 4000 of credits to get started.

You should also have an existing Redshift cluster. If you don't have one, you can create one by following the AWS documentation.

Step 1 - Configure the Redshift Storage Connector

The first step to be able to ingest Redshift data into the feature store is to configure a storage connector. The Redshift connector requires you to specify the following properties. Most of them are available in the properties area of your cluster in the Redshift UI.

  • Cluster identifier: The name of the cluster
  • Database driver: You can use the default JDBC Redshift Driver `com.amazon.redshift.jdbc42.Driver` (More on this later)
  • Database endpoint: The endpoint for the database. Should be in the format of `[UUID].eu-west-1.redshift.amazonaws.com`
  • Database name: The name of the database to query
  • Database port: The port of the cluster. Defaults to 5439
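For reference, the endpoint, port and database name together form a standard Redshift JDBC connection string of the following form (shown here only as an illustration of how the properties fit together):

jdbc:redshift://[cluster-endpoint]:5439/[database-name]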

There are two options available for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all the members of the project.

The second option is to configure an IAM role. With IAM roles, Jobs or notebooks launched on Hopsworks  do not need to explicitly authenticate with Redshift, as the HSFS library will transparently use the IAM role to acquire a temporary credential to authenticate the specified user. 

In Hopsworks, there are two different ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs or notebooks will be run with the selected IAM role.  For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (like a data owner) can assume a given role. 

With regards to the database driver, the library to interact with Redshift *is not* included in Hopsworks - you need to upload the driver yourself. First, you need to download the library from here. Select the driver version without the AWS SDK. You then upload the driver files to the “Resources” dataset in your project, see the screenshot below.

Upload the Redshift Driver (jar file) from your local machine to the Resources dataset.

Then, you add the file to your notebook or job before launching it, as shown in the screenshots below.

Before starting the JupyterLab server, add the Redshift driver jar file, so that it becomes available to jobs run in the notebook.

Step 2 - Define an external (on-demand) Feature Group

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in Hopsworks feature store. For external feature groups, only metadata for features is stored in the feature store - not the actual feature data which is read from the external database/object-store. When the external feature group is accessed from a Spark or Python job, the feature data is read on-demand using a connector from the external store. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source. 

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance” in the Hopsworks Web UI: you can track which features are stored in which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

# We named the storage connector defined in step 1 telco_redshift_cluster
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")

telco_on_dmd = fs.create_on_demand_feature_group(name="telco_redshift",
                                                 version=1,
                                                 query="select * from telco",
                                                 description="On-demand feature group for telecom customer data",
                                                 storage_connector=redshift_conn,
                                                 statistics_config=True)
telco_on_dmd.save()

When running `save()` the metadata is stored in the feature store and statistics for the data are computed and made available through the Hopsworks UI. Statistics help data scientists in their quest to build better features from raw data.

Step 3 - Engineer features and save to the Feature Store

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.

telco_on_dmd\
    .select(['customer_id', 'internet_service', 'phone_service', 'total_charges', 'churn'])\
    ...

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets. This helper guide  explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

spark_df = telco_on_dmd.read()

You can then transform your `spark_df` into features using feature engineering libraries. It is also possible to retrieve the dataframe as a Pandas dataframe and perform the feature engineering steps in Python code. 
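As an illustration only, a simple transformation on the Spark DataFrame could look like the following; the column names follow the telco example above and the derived feature is made up for this sketch.

from pyspark.sql import functions as F

# Cast the charges column to a numeric type and derive a simple boolean feature.
spark_df = spark_df \
    .withColumn("total_charges", F.col("total_charges").cast("double")) \
    .withColumn("has_phone_service", F.col("phone_service") == "Yes")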

You can then save the final dataframe containing the engineered features to the feature store as a cached feature group:

telco_fg = fs.create_feature_group(name="telco_customer_features",
                                   version=1,
                                   description="Telecom customer features",
                                   online_enabled=True,
                                   time_travel_format="HUDI",
                                   primary_key=["customer_id"],
                                   statistics_config=True)
telco_fg.save(spark_df)

Storing feature groups as cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First it allows users to leverage Hudi for incremental ingestion (with ACID properties, ensuring the integrity of the feature group) and time travel capabilities. As new data is ingested, new commits are tracked by Hopsworks allowing users to see what has changed over time. On each commit, statistics are computed and tracked in Hopsworks, allowing users to understand how the data has changed over time.

Cached feature groups can also be stored in the online feature store (`online_enabled=True`), thus enabling low latency access to the features using the online feature store API.

Get started

Get started today on Hopsworks.ai by configuring your Redshift storage connector and running this notebook.

Elasticsearch is dead, long live Open Distro for Elasticsearch

1/14/2021
Mahmoud Ismail

TLDR: The need for an open-source alternative to Elasticsearch has recently become more evident; platforms that bundle Open Distro for Elasticsearch are able to future-proof open-source support for free-text search and Elasticsearch. In this post, we describe how Hopsworks leverages the authentication and authorization support in Open Distro for Elasticsearch to make free text search a project-based multi-tenant service in Hopsworks. More concretely, Hopsworks now supports dynamic role-based access control (RBAC) to indexes in elasticsearch with no performance penalty by building on Open Distro for Elasticsearch (ODES).

Need Open-Source Elasticsearch?
Try Open Distro.

In January 2021, Elastic switched both Elasticsearch and Kibana from the Apache V2 open-source license to a non open-source license, the Server Side Public License (SSPL).

Hopsworks is an open-source platform that includes Open Distro for Elasticsearch (a fork of Elasticsearch) and Kibana.

In Hopsworks, we use Elasticsearch to provide free-text search for AI assets (features, models, experiments, datasets, etc). We also make Elasticsearch indexes available for use by programs run in Hopsworks. As we interpret it, the latter functionality means we contravene the licensing terms of the SSPL:

“If you make the functionality of the Program or a modified version available to third parties as a service... (license conditions apply)”

Luckily, we recently made the switch from Elasticsearch to Open Distro for Elasticsearch, supported by AWS, which is Apache v2 licensed.

Dynamic RBAC for Elasticsearch

Open Distro for Elasticsearch supports Active Directory and LDAP for authentication and authorization. Using the Security plugin, you can use RBAC to control the actions a user can perform. A role defines the cluster operations and index operations a user can perform, including access to indices, and even fine-grained field and document level access. RBAC allows an administrator to define a single security policy and apply it to all members of a department. But individuals may be members of multiple departments, so a user might be given multiple roles. With dynamic role-based access control you can change the set of roles a user can hold at a given time.

For example, if a user is a member of two departments - one for accessing banking data and another one for accessing trading data, with dynamic RBAC, you could restrict the user to only allow her to hold one of those roles at a given time. The policy for deciding which role the user holds could, for example, depend on what VPN (virtual private network) the user is logged in to or what building the user is present in. In effect, dynamic roles would allow the user to hold only one of the roles at a time and sandbox her inside one of the domains - banking or trading. It would prevent her from cross-linking or copying data between the different trading and banking domains.

Hopsworks implements a dynamic role-based access control model through its project-based multi-tenant security model.  Every Project has an owner with full read-write privileges and zero or more members.  A project owner may invite other users to his/her project as either a Data Scientist (read-only privileges and run jobs privileges) or Data Owner (full privileges). Users can be members of (or own) multiple Projects, but inside each project, each member (user) has a unique identity - we call it a project-user identity.  For example, user Alice in Project A is different from user Alice in Project B - (in fact, the system-wide (project-user) identities are ProjectA__Alice and ProjectB__Alice, respectively).

As such, each project-user identity is effectively a role with the project-level privileges to access data and run programs inside that project. If a user is a member of multiple projects, she has, in effect, multiple possible roles, but only one role can be active at a time when performing an action inside Hopsworks. When a user performs an action (for example, runs a program) it will be executed with the project-user identity. That is, the action will only have the privileges associated with that project. The figure below illustrates how Alice has a different identity for each of the two projects (A and B) that she is a member of. Each project contains its own separate private assets. Alice can use only one identity at a time which guarantees that she can’t access assets from both projects at the same time.

Hopsworks enables you to host sensitive data in a shared cluster using a project-based access control security model (an implementation of dynamic role-based access control). In Hopsworks, a project is a secure sandbox with members, data, code, and services. Similar to GitHub repositories, projects are self-service: users manage membership, roles, and can securely share data assets with other projects. This project-based multi-tenant security model enables users to host both sensitive and shared data in a single Hopsworks cluster - you do not need to manage and pay for separate clusters. 

An important aspect of project-based multi-tenancy is that assets can be shared between projects - sharing does not mean that data is duplicated. The current assets that can be shared between projects are: files/directories in HopsFS, Hive databases, feature stores, and Kafka topics. For example, in the figure below there are three users (User1, User2, and User3)  and two projects (A and B). User1 is a member of project A, while User2 and User3 are members of project B. All three users (User1, User2, User3) can access the assets shared between project A and project B. As sharing does not mean copying, the access control rules for the asset are updated to give users in the other project read or write permissions on the shared asset.


Project-user identity is primarily based on an X.509 certificate issued internally by Hopsworks. Access control policies, however, are implemented by the platform services (HopsFS, Hive, Feature Store, Kafka), and for Elasticsearch Open Distro, permissions are managed using an open-source Hopsworks project-based authorizer plugin.

Using Elastic Index from Spark in Hopsworks

The following PySpark code snippet, available as a notebook when you run the Spark Tour on Hopsworks, shows how to read from an index that is private to a project from PySpark. There is also an equivalent Scala/Spark notebook.

from hops import elasticsearch, hdfs

df = spark.read.option("header", "true").csv(
    "hdfs:///Projects/" + hdfs.project_name() + "/Resources/akc_breed_info.csv")

# Write df to the project's private index called 'newindex'
df.write.format('org.elasticsearch.spark.sql') \
    .options(**elasticsearch.get_elasticsearch_config("newindex")) \
    .mode("Overwrite").save()

# Read from the project's private index called 'newindex'
reader = spark.read.format("org.elasticsearch.spark.sql") \
    .options(**elasticsearch.get_elasticsearch_config("newindex"))
df = reader.load().orderBy("breed")
df.show()

Access Control using JWT and Hopsworks Project Membership

In Hopsworks, we use Public Key Infrastructure (PKI) with X.509 certificates to authenticate and authorize users. Every user and every service in a Hopsworks cluster has a private key and an X.509 certificate. Hopsworks projects also support multi-tenant services that are not currently backed by X.509 certificates, including Elasticsearch. Open Distro for Elasticsearch supports authentication and access control using JSON Web Tokens (JWT).  Similar to application X.509 certificates, Hopsworks’ resource manager (HopsYARN) issues a JWT for each submitted job and propagates it to running containers. Using the JWT, user code can then securely make calls to Elasticsearch indexes owned by the project. The JWT is rotated automatically before it expires and invalidated by HopsYARN once the application has finished.

For every Hopsworks project, a number of private indexes can be created in Elasticsearch: an index for real-time logs of applications in that project (accessible via Kibana), an index for ML experiments in the project, and an index for provenance for the project’s applications and file operations. Elastic indexes are private to the project - they are not accessible by users that are not members of the project. This access control is implemented as follows: when a request is made on Elasticsearch using a JWT token, our authorizer plugin extracts the project-specific username from the JWT token, which is of the form:

ProjectA__Alice 

The index names have the following form:

ProjectA__ElasticIndex 

Our plugin checks if a project-specific user is allowed to read/write an index by checking that the prefix (ProjectA) of both the user and the index match one another. We plan to add support for sharing elasticsearch indexes between projects by storing a list of projects allowed to perform read and write operations, respectively, on the indexes belonging to a project.
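In Python terms, the check the plugin performs amounts to something like this minimal sketch (the actual implementation is the Open Distro authorizer plugin mentioned above; this is illustrative only):

def is_authorized(project_user, index_name):
    # "ProjectA__Alice" -> "ProjectA", "ProjectA__ElasticIndex" -> "ProjectA"
    user_project = project_user.split("__")[0]
    index_project = index_name.split("__")[0]
    return user_project == index_project

assert is_authorized("ProjectA__Alice", "ProjectA__ElasticIndex")
assert not is_authorized("ProjectB__Alice", "ProjectA__ElasticIndex")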

X.509 Service certificates

In Hopsworks, services communicate with each other using their own certificate to authenticate and encrypt all traffic. Each service in Hopsworks, that supports TLS encryption and/or authentication, has its own service-specific X.509 certificate, including all services in the ELK Stack (Elasticsearch, Kibana, and Logstash). Service certificates contain the Fully Qualified Domain Name (FQDN) of the host they are installed on and the login name of the system user that the process runs as. They are generated when a user provisions Hopsworks, and they have a long lifespan. Service certificates can be rotated automatically in configurable intervals or upon request of the administrator. 

Securely accessing Elastic Indexes in Jobs on Kubernetes 


Hopsworks can be integrated with Kubernetes by configuring it to use one of the available authentication mechanisms: API tokens, credentials, certificates, and IAM roles for AWS’ managed EKS offering. Hopsworks can run users’ jobs on Kubernetes with project-specific security material, X.509 certificates and JWTs, materialized to the launched Pods so that user code can securely access services in Hopsworks, such as Open Distro for Elasticsearch. That is, Kubernetes jobs launched from within a project in Hopsworks are only allowed to access those Elasticsearch indexes that belong to that project.

Summary

In this post, we gave an overview of Hopsworks project-based multi-tenant security model and how we use Hopsworks projects and JWT tokens to make Open Distro for Elasticsearch a multi-tenant service.



Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store

11/17/2020
Theofilos Kakantousis

TLDR; Hopsworks 2.0 is now generally available. Hopsworks is the world's first Enterprise Feature Store along with an advanced end-to-end ML platform. This version includes improvements to the User Interface, client APIs and improved integration with Kubernetes. Hopsworks 2.0 also comes with upgrades for popular ML frameworks such as TensorFlow 2.3 and PyTorch 1.6. 

The Hopsworks 2.0 platform:

In addition to bug and documentation fixes, Hopsworks 2.0 includes the following new features and improvements:

  • Feature Store: New Rich APIs and Extended Integration with External Data Sources
  • Provenance: Track and Navigate from Models to Training Datasets to Features
  • Python Programs as Schedulable Feature Engineering Jobs
  • Fine-grained User Access Control for Feature Stores
  • GitLab Support 
  • New example programs for feature engineering and usage for training and serving

Detailed release notes are available at the Hopsworks GitHub repository.

Feature Store: New Rich APIs and Extended Integration with External Data Sources

The Feature Store API is now more intuitive, following a Pandas-like programming style for feature engineering and working with abstractions such as feature groups and training datasets. This allows for an improved workflow and addresses the biggest pain point of professional users working with Big Data and AI: being able to organize and discover data with as few steps as possible. An all new API documentation page is accessible here.

This release brings the new ability to enrich the Feature Group abstraction by attaching metadata in the form of tags and then allowing free-text search on these tags, available across the feature store to all users.

More details on the upgraded feature store experience are outlined in the Hopsworks Feature Store API 2.0 blog post and the user guide.

Provenance: Track and Navigate from Models to Training Datasets to Features

Hopsworks is built from the ground-up with support for implicit provenance. What that means is data engineers and data scientists that develop machine learning pipelines, from data ingestion, to feature engineering, to model training and serving, get lineage information for free as the data flows and is transformed through the pipeline. Users can then navigate from a model served in production back to the ML experiment that created the model along with its training dataset and finally all the way back to the feature groups and ingestion pipelines the training dataset was created from.  

Python Programs as Schedulable Feature Engineering Jobs

Until now, Hopsworks’ Jobs service provided users with the means to run and monitor applications and do feature engineering using popular distributed processing frameworks such as Apache Spark/PySpark and Apache Flink. Hopsworks now further extends its reach to Feature Store users by enabling them to run such feature engineering jobs as pure Python programs, untangling them, for example, from the need to run their code within a PySpark context/cluster. Developers can now do feature engineering with Pandas or their library of choice and still benefit from Jobs capabilities, such as including them in Airflow data and ML pipelines, as well as running Python Jupyter notebooks as jobs, simplifying the process of including notebooks in development and production pipelines. The user guide is available here.

Fine-Grained User Access Control for Feature Stores

Sharing datasets across projects of a Hopsworks instance is now made simpler by using a fine-grained access control mechanism. Users can now select from a list of options that enable a dataset to be shared with different permissions for each project and for each role users have in the target project (Data Owner, Data Scientist). In addition, these new dataset sharing semantics now make it easier to share feature stores with different permissions across projects. For more details please check the user guide here.

GitLab Support

Hopsworks developers can now use GitLab for code version control, in addition to the existing GitHub support. JupyterLab in Hopsworks ships with the Git plugin, so developers can work with Git directly from within the JupyterLab IDE. Further details are available in the user guide here.

Python Environment Updates and Examples

Hopsworks 2.0 is shipped with the latest and the greatest frameworks of the data engineering and data science Python world. Every Hopsworks instance now comes pre-installed with Pandas version 1.1.2, TensorFlow 2.3.1, PyTorch 1.6.0 and many more. 

A plethora of new feature engineering and machine learning examples are now available at our examples GitHub repository. Data engineers can now follow examples that demonstrate how to work with the Feature Store using Redshift or Snowflake. New end-to-end and parallel experiments with deep learning and TensorFlow 2.3 are also available.

Get Started

Much more is to come from Hopsworks with the upcoming releases so make sure to follow us on Twitter and, if you like it, star us on GitHub.

Visit http://hopsworks.ai to get started with a free instance of Hopsworks and explore the new Hopsworks Feature Store and data science support of the Hopsworks 2.0 series and don’t forget to join our growing community!

Feature Store for MLOps? Feature reuse means JOIN

10/23/2020
Jim Dowling

TLDR; Many engineers conflate operational Feature Stores with key-value (KV) stores, like DynamoDB and Cassandra. In fact, KV stores are missing a key mechanism needed to make features reusable: JOINs. JOINs enable features to be reused by different models. This blog is about how to scale your ML infrastructure by reusing cached features so that the number of feature pipelines you manage does not grow linearly with the number of models you run in production.

Image from an OSDC talk by Twitter. Twitter evaluates their Feature Store by the number of features that are reused across teams.

The Cost of No JOINs: One Feature Pipeline per Model

If you don’t reuse features across different models, you will need a new feature pipeline for every new model you put in production. In the diagram below, there is a 1:1 mapping between train/test datasets and models. For every new model you put in production, you will write a new feature pipeline from your data stores that transforms and validates the raw data and performs aggregations, materializing train/test data to files. A ML training program (or pipeline) then trains and validates a model with the train/test data, after which it is tested and deployed to production.

It is very difficult to reuse the features in the materialized train/test datasets, as they tend to be stored in a format specific to an ML framework: .tfrecord for TensorFlow, .npy for PyTorch - and they cannot be easily combined with features stored in other train/test datasets. This is often due to the limitations of the file formats: neither TFRecord nor NPY support projections (selecting just a subset of columns).

If you intend to run hundreds of models in production, running hundreds of pipelines will explode your technical debt.


In online feature stores that do not support JOINs, it is typically the responsibility of the application to perform the JOIN of the cached features to create the feature vector. So, if you build your own online feature store using Cassandra or DynamoDB, you will need to also add logic for joining (and ordering) features in your applications/models. 

Every new feature you add to that model will need an update to the feature pipeline and changes to the application logic - making it costly and potentially cross-team work.

JOINs enable Feature Reuse


A JOIN is a Structured Query Language (SQL) command that combines data from two different database tables. JOINs are used to create a view over the data that originates from one or more tables. If we assume that we store features normalized in database tables, then we are able to create sets of features (training datasets) by selecting different features from different tables and joining them together using a common join key. The join key(s) identify the common entity these feature values represent.

If we now assume that features are stored as columns in tables, then what is a train/test dataset? It is a set of features along with a target column. Assuming the features are already present in existing tables, we can just join those features (columns) together to create a view - this view is the train/test dataset. The order of features (in a view) that makes up a train/test dataset is significant - a train/test dataset is a permutation of features, not a combination. Views (enabled by JOINs) enable a massive number of train/test datasets to be defined over a small number of shared features (stored in tables).

In Hopsworks, we store features in tables that we call “Feature Groups”. Feature Groups introduce a level of indirection between the raw input data and the train/test datasets used to train models, see below. They store a cached copy of the features, computed using the same feature pipeline from earlier, but this time, the feature pipeline writes to one or both of the stores that make up a feature store: (1) a scalable store for train/test features (offline feature store) and (2) a low latency, high throughput store for features for serving (online feature store).

In Hopsworks, we use Apache Hive as a scalable database for the offline store, and MySQL Cluster (NDB) as the online store. Our version of Hive stores its metadata in NDB and its data files in HopsFS/S3. In Hopsworks, FeatureGroups are database tables in Hive and MySQL Cluster, along with feature metadata (also tables in the same NDB database). 

As we can see in the diagram below, if we have N features available in the Feature Store, we can create an unlimited number of train/test datasets by simply joining features together from the offline feature store. 
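As a hedged sketch of what this looks like with the HSFS query API (the feature group names and selected columns are illustrative, and fs is a feature store handle obtained from an HSFS connection):

customers_fg = fs.get_feature_group("telco_customer_features", version=1)
transactions_fg = fs.get_feature_group("transaction_features", version=1)

# Joining selected columns from two Feature Groups defines a new train/test dataset
# without re-running any feature pipeline.
query = customers_fg.select(["internet_service", "total_charges", "churn"]) \
                    .join(transactions_fg.select(["monthly_spend"]))

td = fs.create_training_dataset(name="churn_training_data", version=1,
                                data_format="tfrecord")
td.save(query)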

In Hopsworks, we use Spark, with its cost-based optimizer, to perform JOINs. Spark, together with Parquet/Hudi/ORC, helps optimize joins by supporting partitioned data, push-down projections and SortMergeJoin. Spark selects a join strategy based on:

  • a hint;
  • a join type (inner, left, outer, etc);
  • a join condition (equi-join);
  • an estimation of the input data size

JOINs in Online Feature Stores

If you use a KV store as your online store, you are back in the same situation as at the start - you need a feature pipeline for every new model you put in production. But with Feature Groups (tables in NDB), we can materialize feature vectors (the individual row of features that is fed directly to the model for scoring) from the different tables by performing a join.

NDB supports sophisticated optimizations for JOINs, and can push-down many JOINs to the database nodes. In practice, in NDB even complex queries can “achieve latency down to 5-10 ms for a transaction that contains around 30 SQL statements”.

Conclusions

Reuse features to save on infrastructure and on the number of feature pipelines needed to maintain models in production. JOINs are the method we use in Hopsworks to reuse cached features across different models - both for training and serving. We use Spark as the JOIN engine for the offline feature store and NDB, with its parallelized push-down JOINs, for low-latency joins in the online feature store.

ML Engineer Guide: Feature Store vs Data Warehouse

>
10/8/2020
>
Jim Dowling

TLDR; The feature store is a data warehouse of features for machine learning (ML). Architecturally, it differs from the traditional data warehouse in that it is a dual-database, with one database (row-oriented) serving features at low latency to online applications and the other database (column-oriented) storing large volumes of features, used by Data Scientists to create train/test datasets and by batch applications doing offline model scoring.

Features Store: Data Warehouse Déjà Vu

Data warehouses democratized access to Enterprise data by centralizing data in a single platform and then empowering business analysts with visual tools, such as Tableau and Power BI. No longer did they need to know what data resides where and how to query that data in that platform. They could derive historical insights into the business using BI tools. 

Data scientists, in contrast, build predictive models to derive business insights. The feature store is the data warehouse for Data Science - it is a central vault for storing documented, curated, and access-controlled features that can be used across many different models. The feature store ingests data from the Enterprise’s many different sources after transforming, aggregating, and validating the data. 

Feature pipelines need to be written to ensure that data reliably flows from existing sources and is available in a format ready to be consumed by ML training pipelines and models.

Most Data Scientists currently do not have a feature store. They spend most of their time looking for, cleaning, and featurizing data. Hence, the (very real) cliché that 80% of data science is data wrangling. Data Scientists without a feature store work in an era akin to how business analysts worked before the advent of data warehouses, with low individual and organizational productivity.

The Data Warehouse is an input to the Feature Store 

Both platforms are a central store of curated data used to generate insights into the data. Both platforms have pipelines (ETL and feature pipelines, respectively) to ingest data from one or more disparate sources (operational databases, data lakes, etc).

Both benefit from metadata catalogs to organize data sets and access control to share data with only authorized actors. 

Both platforms can be designed to scale out on commodity hardware and store large volumes of data, although typically a data warehouse stores only data relevant to analysis (modern data lakehouses are designed to store large volumes of data more cost efficiently).

Feature Store as a Dual Database

The main architectural difference between a data warehouse and a feature store is that the data warehouse is typically a single columnar database, while the feature store is typically implemented as two databases:

  • an offline feature store for serving large batches of features to (1) create train/test datasets and (2) batch applications scoring models using those batches of features, and
  • an online feature store for serving a single row of features (a feature vector) to be used as input features for an online model for an individual prediction.

The offline feature store is typically required to efficiently serve and store large amounts of feature data, while the online feature store is required to return feature vectors in very low latency (e.g., < 10ms). Examples of databases used for the offline feature store are Apache Hive and BigQuery and examples of online feature stores include MySQL Cluster, Redis, and DynamoDB. 

Note that if you want to reuse features in different train/test datasets for different models, your database or application will need to join features together. This is true for both the offline and online feature stores. If your feature store does not support joining features, that is, you do not reuse features across different models, you (or some system) will need to create a new ingestion pipeline for every new model you support in production.

Detailed Comparison

In the table below, we see an overview of the main architectural differences between feature stores and data warehouses. Data warehouses are used primarily by business analysts for interactive querying and for generating historical reports/dashboards on the business. Feature stores are used by both data scientists and by the online/batch applications, and they are fed data by feature pipelines, typically written in Python or Scala/Java. 

Data scientists typically use Python programs to create train/test datasets by joining existing features in the feature store together and materializing the train/test datasets in a file format best suited to the framework they are going to train their model in (e.g., TFRecord for TensorFlow, NPY for PyTorch). Data warehouses and SQL currently lack this capability to create train/test datasets in ML file formats.

Feature Data should be Validated before Ingestion

The table also shows the differences in the types of data stored, as well as how the data is stored, validated, and queried. A data warehouse stores data in tables along with schemas for describing the type of data and constraints for columns. Similarly, the feature store stores typed data (typically in tables), but as features are typically stored as ready-to-consume numerical values, vectors (embeddings), or tensors, there is less need for a rich set of column types compared to a data warehouse. Foreign key constraints are typically not supported in feature stores, due to the difficulty of enforcing such constraints between online and offline stores.

As model training is very sensitive to bad data (null values, outliers cause numerical instability, missing values), feature data should be validated before ingestion. Data validation frameworks, such as Great Expectations and Deequ, have appeared to help implement feature pipelines that apply predicates (data validation rules) on all the features ingested into the feature store, ensuring high data quality in the feature store. 
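
As a minimal sketch of such a predicate, the snippet below uses the (classic) pandas API of Great Expectations on a hypothetical batch of features; the column names and value ranges are illustrative assumptions, not rules from any real pipeline.

import great_expectations as ge
import pandas as pd

# Hypothetical batch of computed features (columns and values are illustrative)
features = pd.DataFrame({
    "customer_id": [1, 2, 3],
    "num_tx_7d": [4, 0, 12],
    "avg_tx_amount_7d": [37.5, 0.0, 112.3],
})

batch = ge.from_pandas(features)

# Predicates (data validation rules) applied before ingestion
batch.expect_column_values_to_not_be_null("customer_id")
result = batch.expect_column_values_to_be_between("num_tx_7d", min_value=0, max_value=10000)

if not result.success:
    raise ValueError("Feature validation failed - aborting ingestion into the feature store")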

Domain specific languages (DSL) are sometimes used to define the feature transformations, aggregations, and data validation rules in feature pipelines, but general purpose languages (Python, Scala) are commonly used when non-trivial feature engineering is required. 

Using the feature store to create train/test data

Data scientists are one of the main users of the feature store. They use a feature repository to perform exploratory data analysis (EDA) - searching/browsing for available features and inspecting feature values/schemas/statistics. Data Scientists mainly use Python to select features to create train/test datasets. This typically involves joining features together to create a train/test dataset in their file format of choice (.tfrecord, .csv, .npy, .petastorm, etc). Some feature stores also support a DSL (domain specific language) for creating train/test datasets, or other languages such as Scala/Java. 
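
The sketch below shows that workflow with plain pandas and NumPy rather than the hsfs API: two hypothetical feature DataFrames are joined on an entity key and then materialized as .csv and .npy files for the training framework (all names and values are illustrative).

import numpy as np
import pandas as pd

# Hypothetical features selected from two feature groups (values are illustrative)
profiles = pd.DataFrame({"customer_id": [1, 2, 3], "age": [31, 45, 27]})
activity = pd.DataFrame({"customer_id": [1, 2, 3], "num_tx_7d": [4, 0, 12], "label": [0, 0, 1]})

# Join the features on the common entity key
dataset = profiles.merge(activity, on="customer_id")

# Materialize the training data in the file format required by the training framework
dataset.to_csv("train.csv", index=False)
np.save("X_train.npy", dataset[["age", "num_tx_7d"]].to_numpy(dtype="float32"))
np.save("y_train.npy", dataset["label"].to_numpy(dtype="float32"))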

Online feature store

Online applications use the online feature store to retrieve feature values with low latency to build feature vectors that are sent to models for predictions. In contrast to higher latency data warehouses, feature stores may be required to return feature vectors in single millisecond latency - only really achievable in row-oriented or key-value stores. 

The typical access pattern for retrieving features is a key-value lookup, but if features are to be reused in the online feature store, then joins are again required (either in the database or in the application). In some databases (such as MySQL Cluster), a small number of joins can be performed at very low latency.
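
As a minimal sketch of that access pattern - and of an application-side join when the online store cannot join - the snippet below reads two hypothetical feature groups stored as Redis hashes; the key scheme and feature names are illustrative assumptions.

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_feature_vector(customer_id):
    # One key-value lookup per feature group (hypothetical key scheme)
    profile = r.hgetall(f"profile_fg:{customer_id}")
    activity = r.hgetall(f"activity_fg:{customer_id}")

    # Application-side "join": merge the rows and order the features
    # exactly as the model expects them
    row = {**profile, **activity}
    feature_order = ["age", "country_code", "num_tx_7d", "avg_tx_amount_7d"]
    return [float(row[name]) for name in feature_order]

# feature_vector = get_feature_vector(42)   # sent to the model for a prediction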

Feature statistics to monitor for feature drift and data drift

Descriptive statistics (e.g., mean, standard deviation) for features are also useful for identifying data drift in online models. Your monitoring infrastructure can compute statistics on live prediction traffic and compare them with the values stored in the feature store to identify data drift in the live traffic, potentially requiring retraining of the model.
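
A minimal sketch of such a check is shown below, assuming the training-time mean and standard deviation of a feature have already been fetched from the feature store; the simple z-score threshold is an illustrative stand-in for whatever statistical test your monitoring uses.

import numpy as np

def detect_drift(live_values, train_mean, train_std, threshold=3.0):
    # Flag drift if the live window's mean deviates too far from the training mean
    live_mean = float(np.mean(live_values))
    z_score = abs(live_mean - train_mean) / max(train_std, 1e-9)
    return z_score > threshold

# Training statistics (would come from the feature store's metadata)
train_mean, train_std = 42.0, 5.0

# Feature values observed in the last hour of live prediction traffic
live_window = [61.2, 58.7, 64.1, 59.9]

if detect_drift(live_window, train_mean, train_std):
    print("Data drift detected - consider retraining the model")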

Time-Travel 

Temporal databases support time-travel: the ability to query data as it was at a given point in time, or to query the changes to data in a given time interval. The “AS OF SYSTEM TIME” syntax was introduced in SQL:2011 to standardize point-in-time queries, while the “VERSIONS BETWEEN SYSTEM TIME ... AND ...” syntax was introduced to identify the versioned changes to data in a time interval. Time-travel is supported in some data warehouses, but it does not have universal support across all vendors.

For a feature store, time-travel has several important uses: when creating train/test data (e.g., training data is data from the years 2010-2018, while test data is data from the range 2019-2020); when making changes to a dataset (e.g., rolling back a bad commit of data); and when comparing metadata (statistics) for features and how they change over time. We rarely require time-travel for features used in serving. Time-travel is also important when performing point-in-time joins, where we ensure that there is no data leakage from the future when we create train/test datasets from historical data.
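
The snippet below is a minimal sketch of a point-in-time join using pandas.merge_asof: each label row only picks up the latest feature value computed at or before the label's event time, so no future data leaks into the training set (the DataFrames and column names are illustrative).

import pandas as pd

# Labels with the time the prediction target was observed (illustrative values)
labels = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "event_time": pd.to_datetime(["2020-03-10", "2020-06-01", "2020-05-20"]),
    "label": [0, 1, 0],
})

# Feature values with the time they were computed
features = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "feature_time": pd.to_datetime(["2020-01-01", "2020-04-01", "2020-02-15"]),
    "num_tx_30d": [3, 9, 5],
})

# Point-in-time join: for each label, take the latest feature value
# computed at or before event_time (no leakage from the future)
train_df = pd.merge_asof(
    labels.sort_values("event_time"),
    features.sort_values("feature_time"),
    left_on="event_time", right_on="feature_time",
    by="customer_id", direction="backward",
)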

Feature Pipelines 

Data warehouses typically have timed triggers for running ETL jobs (or data pipelines) to ingest the latest data from operational databases, message queues, and data lakes. Similarly, feature pipelines can use timed triggers to transform and aggregate the latest data from different sources before storing it in both the online and offline feature stores for scoring by online and offline applications. However, additional pipelines can also feed features to the feature store. 

Predictions made by models can be stored in the feature store along with the outcomes for those predictions. There can be long lags of days, months, or even years before outcomes become available (e.g., a prediction on whether a loan will be repaid or not), but as they arrive, new training data becomes available that can be used to trigger retraining of models.

Conclusion

Data warehouses can be used to store pre-computed features, but they do not provide much more functionality beyond that for ML pipelines. When Data Scientists need to create train/test data using Python or when online features (for serving features to online models) are needed at low latency, you need a feature store. Similarly, if you want to detect feature drift or data drift, you need support for computing feature statistics and identifying drift.


One function is all you need for ML Experiments

>
9/30/2020
>
Robin Andersson

TLDR; Hopsworks provides support for machine learning (ML) experiments. That is, it can automatically track the artifacts, graphs, performance, logs, metadata, and dependencies of your ML programs. Many of you already know about platforms like MLflow, so why should you read about Hopsworks Experiments? Because you do not have to rewrite your TensorFlow/PyTorch/Scikit-learn programs to get tracking and distributed ML for free, and TensorBoard comes built-in. We discuss how Hopsworks uniquely supports implicit provenance to transparently create metadata and how it is combined with the oblivious training function to make your training distribution transparent. 

Hopsworks Introduction

Hopsworks is a single platform for both data science and data engineering that is available as both an open-source platform and a SaaS platform, including a built-in feature store. You can train models on GPUs at scale, easily install any Python libraries you want using pip/conda, run Jupyter notebooks as jobs, put those jobs in Airflow pipelines, and even write (Py)Spark or Flink applications that run at scale. 

Hopsworks provides a central, collaborative development environment that enables machine learning teams to easily share results and experiments with teammates or generate reports for project stakeholders. All resources have strong security, data governance, backup, and high availability support in Hopsworks, while assets are stored in a single distributed file system (with data stored on S3 in the cloud).

A Hopsworks ML experiment stores information about your ML training run: logs, images, metrics of interest (accuracy, loss), the program used to train the model, its input training data, and the conda dependencies used. Optional outputs are hyperparameters, a TensorBoard, and a Spark history server.


The logs of each hyperparameter trial are retrieved by clicking on its log, and TensorBoard visualizes the results of the different trials. The TensorBoard HParams plugin is also available to drill down further into the trials.

Tracking

When you run a Python or PySpark application on the Hopsworks platform, it can create an experiment that includes both the traditional information a program generates (results, logs, errors) as well as ML-specific information to help track, debug, and reproduce your program and its inputs and outputs:

  • hyperparameters: parameters for training runs that are not updated by the ML programs themselves; 
  • metrics: the loss or accuracy of the model(s) trained in this experiment;
  • program artifacts: python/pyspark/airflow programs, and their conda environments;
  • model artifacts: serialized model objects, model schemas, and model checkpoints;
  • executions: information to be able to re-execute the experiment, including parameters, versioned features for input, output files,  etc; 
  • versioned features: to be able to reproduce an experiment, we need the exact training/test data from the run and how it was created from the feature store;
  • visualizations: images generated during training and scoring. You can also use TensorBoard to visualize training runs - Hopsworks aggregates results from all workers transparently;
  • logs (for debugging): model weights, gradients, losses, optimizer state;
  • custom metadata: tag experiments and free-text search for them, govern experiments (label as ‘PII’, ‘data-retention-period’, etc), and reproduce training runs.

Experiment Tracking and Distributed ML in One Library


-- CODE language-bash --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")                 # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)          # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}   # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]), min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", optimizer='randomsearch',
#                  direction='max', num_trials=15)

Platforms that support experiment tracking require the user to refactor their training code in a function or some explicit scope (such as “with … as xx:” in MLFlow, see Appendix A) to identify when an experiment begins and when an experiment ends. In Hopsworks, we require the developer to write their training code inside a function. 

We call this Python function an oblivious training function because the function is oblivious of whether it is being run on a Python kernel in a Jupyter notebook or on many workers in a cluster, see our blog and Spark/AI summit talk for details. That is, you write your training code once and reuse the same function when training a small model on your laptop or when performing hyperparameter tuning or distributed training on a large cluster of GPUs or CPUs.

We double down on this “wrapper” Python function by also using it to start/stop experiment tracking. Experiment tracking and distribution transparency in a single function, nice! 

In Hopsworks, the Maggy library runs experiments, see the code snippet above. As you can see, the only code changes a user needs, compared to a best-practice TensorFlow program, are: 

  1. factor the training code in a user-defined function (def train(..):);
  2. return a Python dict containing the results, images, and files that the user wants to be tracked for the experiment and accessible later in the Experiments UI; and
  3. invoke the training function using the experiment.lagom function.

The hyperparameters can be fixed for a single execution run, or as shown in the last 4 lines of the code snippet, you can execute the train function as a distributed hyperparameter tuning job across many workers in parallel (with GPUs, if needed). 

Hopsworks will automatically:

  • track all parameters of the train function as hyperparameters for this experiment, 
  • auto-log using Keras callbacks in model.fit;
  • create a versioned directory in HopsFS, where a copy of the program, its conda environment, and all logs from all workers are aggregated;
  • track all provenance information for this application - input data from HopsFS used in this experiment (train/test datasets from the Feature Store), and all output artifacts (models, model checkpoints, application logs);
  • redirect all print statements executed in workers to the Jupyter notebook cell for easier debugging (see GIF below - each print statement is prefixed by the worker ID).

In Hopsworks, logs from workers can be printed in your Jupyter notebook during training. Take that, Databricks!

TensorBoard support

-- CODE language-bash --
def train():
    from maggy import tensorboard
    ...
    model.fit(.., callbacks=[TensorBoard(log_dir=tensorboard.logdir(), ..)], ...)

TensorBoard is arguably the most common and powerful tool used to visualize, profile and debug machine learning experiments. Hopsworks Experiments integrates seamlessly with TensorBoard. Inside the training function, the data scientist can simply import the tensorboard python module and get the folder location to write all the TensorBoard files. The content of the folder is then collected from each Executor and placed in the experiment directory in HopsFS. As TensorBoard supports showing multiple experiment runs in the same graph, visualizing and comparing multiple hyperparameter combinations becomes as simple as starting the TensorBoard integrated in the Experiments service. By default, Tensorboard is configured with useful plugins such as HParam, Profiler, and Debugging. 

Profiling and debugging

Hopsworks 1.4.0 comes with TensorFlow 2.3, which includes the TensorFlow profiler - a long-awaited feature that finally allows users to profile model training to identify bottlenecks in the training process, such as slow data loading or poor operation placement in CPU + GPU configurations. 

TensorFlow 2.3 also includes Debugger V2, making it easy to find model issues, such as NaNs, whose root cause is non-trivial to track down in complex models.

Model Registry

In the training code, models may be exported and saved to HopsFS. Using the model python module in the hops library, it is easy to version and attach meaningful metadata to models to reflect the performance of a given model version. 

The Hopsworks Model Registry is a service where all models are listed, along with useful information such as which user created the model, its different versions, the time of creation, and evaluation metrics such as accuracy. 

The Model Registry provides functionality to filter based on the model name, version number, and the user that exported the model. Furthermore, the evaluation metrics of model versions can be sorted in the UI to find the best version for a given model. 

In the Model Registry UI, you can also navigate to the experiment used to train the model, and from there to the train/test data used to train the model, and from there to the features in the feature store used to create the train/test data. Thanks, provenance!

Exporting a model

A model can be exported programmatically using the export function in the model module. Prior to exporting, the experiment needs to have written the model to a folder or path on HopsFS. That path is then supplied to the function, along with the name of the model and the evaluation metrics that should be attached. The export call uploads the contents of the folder to your Models dataset, and the model will also appear in the Model Registry with an incrementing version number for each export.

-- CODE language-bash --
import os
from hops import model

# local path to directory containing model (e.g. .pb or .pk)
path = os.getcwd() + "/model_dir"

# uploads path to the model repository, metadata is a dict of metrics
model.export(path, "mnist", metrics={'accuracy': acc})

Get the best model version

When deploying a model to real-time serving infrastructure or loading a model for offline batch inference, applications can query the model repository to find the best version based on metadata attached to the model versions - such as the accuracy of the model. In the following example, the model version for MNIST with the highest accuracy is returned.

-- CODE language-bash --
from hops import model
from hops.model import Metric

MODEL_NAME = "mnist"
EVALUATION_METRIC = "accuracy"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])

The Devil is in the Details

That was the brief overview of Hopsworks Experiments and the Model Registry. You can now try it out on www.hopsworks.ai or install Hopsworks Community or Enterprise on any servers or VMs you can get your hands on. If you want to read more about how we implemented the plumbing, then read on.

Transparent Distributed ML with PySpark

Hopsworks uses PySpark to transparently distribute the oblivious training function for execution on workers. If workers use GPUs, Spark allocates GPUs to them, and dynamic executors are supported, which ensures that GPUs are released after the training function has returned (read more here). This enables you to keep your notebook open and interactively visualize results from training, without having to worry that you are still paying for the GPUs. 

The advantage of the Hopsworks programming model, compared to approaches where training code is supplied as Docker images such as AWS Sagemaker, is that you can write custom training code in place and debug it directly in your notebook. You also don’t need to write Dockerfiles for training code, and Python dependencies are managed by simply installing libraries using PIP or Conda from the Hopsworks UI (we compile the Docker images transparently for you).

The oblivious training function can run in different execution contexts: on a Jupyter notebook in a Python kernel (far left), for parallel ML experiments (middle), and for collective allreduce data parallel training (far right). Maggy and Hopsworks take care of complex tasks such as scheduling tasks, collecting results, and generating new hyperparameter trials.

HopsFS stores experiment data and logs generated by workers during training. When an experiment is started through the API, a subfolder in the Experiments dataset in HopsFS is created and metadata about the experiment is attached to the folder. Hopsworks automatically synchronizes this metadata to elasticsearch using implicit provenance. 

The metadata may include information such as the name of the experiment, the type of the experiment, the exported model, and so on. As the existence of an experiment is tracked by a directory, deleting the folder also deletes the experiment and its associated metadata from the tracking service. 

Tracking metadata with Implicit Provenance

Existing systems for tracking the lineage of ML artifacts, such as TensorFlow Extended or MLFlow, require developers to change their application or library code to log tracking events to an external metadata store. 

In Hopsworks, we primarily use implicit provenance to capture metadata, where we instrument our distributed file system, HopsFS, and some libraries to capture changes to ML artifacts, requiring minimal code changes to standard TensorFlow, PyTorch, or Scikit-learn programs (see details in our USENIX OpML’20 paper). 

File system events, such as reading features from a train/test dataset or saving a model to a directory, are implicitly recorded as metadata in HopsFS and then transparently indexed in Elasticsearch. This enables free-text search for ML artifacts, metadata, and experiments in the UI.

Experiments in Hopsworks are the first part of a ML training pipeline that starts at the Feature Store and ends at model serving. ML Artifacts (train/test datasets, experiments, models, etc) can be stored on HopsFS, and they can also have custom metadata attached to them. 

The custom metadata is tightly coupled to the artifact (remove the file, and its metadata is automatically cleaned up) - this is achieved by storing the metadata in the same scaleout metadata layer used by HopsFS. This custom metadata is also automatically synchronized to Elasticsearch (using a service called ePipe), enabling free-text search for metadata in Hopsworks.

That’s all for now Folks!

Of all the developer tools for Data Science, platforms for managing ML experiments have seen the most innovation in recent years. Open-source platforms have appeared, such as MLFlow and our Hopsworks platform, alongside proprietary SaaS offerings such as WandB, Neptune, Comet.ml, and Valohai. 

What makes Hopsworks Experiments different? You can write clean Python code and get experiment tracking and distributed ML for free with the help of implicit provenance and the oblivious training function, respectively. 

There is growing consensus that platforms should keep track of what goes in and out of ML experiments for both debugging and reproducibility. You can instrument your code to keep track of inputs/outputs, or you can let the framework manage it for you with implicit provenance. 

Hopsworks Experiments are a key component in our mission to reduce the complexity of putting ML in production. Further groundbreaking innovations are coming in the next few months in the areas of real-time feature engineering and monitoring operational models. Stay tuned!

Appendix A

In the code snippet below, we compare how you write a Hopsworks experiment with MLFlow. There are more similarities than differences, but explicit logging to a tracking server is not needed in Hopsworks.

-- CODE language-bash --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")                 # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)          # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}   # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]), min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", optimizer='randomsearch',
#                  direction='max', num_trials=15)

-- CODE language-bash --
def train(data_path, max_depth, min_child_weight, estimators, model_name):
    # distribution external
    X_train, X_test, y_train, y_test = build_data(..)
    mlflow.set_tracking_uri("jdbc:mysql://username:password@host:3306/database")
    mlflow.set_experiment("My Experiment")
    with mlflow.start_run() as run:
        ...
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("min_child_weight", min_child_weight)
        mlflow.log_param("estimators", estimators)
        with open("test.txt", "w") as f:
            f.write("hello world!")
        mlflow.log_artifacts("/full/path/to/test.txt")
        ...
        model.fit(X_train, y_train)      # auto-logging
        ...
        mlflow.tensorflow.log_model(model, "tensorflow-model",
                                    registered_model_name=model_name)

Like MLFlow, but better?

Appendix B

Pipelines are the programs that orchestrate the execution of an end-to-end training and model deployment job. In Hopsworks, you can run Jupyter notebooks as schedulable Jobs, and these jobs can be run as part of an Airflow pipeline (Airflow also comes as part of Hopsworks). After a pipeline run, data scientists can quickly inspect the training results in the Experiments service. 

The typical steps that make up a full training-and-deploy pipeline include the following (a minimal Airflow sketch follows the list):

  • materialization of train/test data by selecting features from a feature store, 
  • model training on the train/test data and exporting the model to the Model Registry,
  • evaluation and validation of the model and, if it passes robustness, bias, and accuracy tests, model deployment.
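
Below is a minimal Airflow sketch of such a pipeline, with plain PythonOperators standing in for the actual Hopsworks job launches; the DAG settings and the callables are illustrative assumptions, not operators shipped with Hopsworks.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def materialize_training_data():
    ...  # select features from the feature store and write train/test files

def train_and_export_model():
    ...  # train on the materialized data and export the model to the Model Registry

def validate_and_deploy_model():
    ...  # run robustness/bias/accuracy tests, then deploy the model if they pass

with DAG(
    dag_id="train_and_deploy",
    start_date=datetime(2020, 9, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    t1 = PythonOperator(task_id="materialize_data", python_callable=materialize_training_data)
    t2 = PythonOperator(task_id="train_model", python_callable=train_and_export_model)
    t3 = PythonOperator(task_id="validate_and_deploy", python_callable=validate_and_deploy_model)

    t1 >> t2 >> t3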

Unifying Single-host and Distributed Machine Learning with Maggy

>
6/26/2020
>
Moritz Meister

This blog covers the oblivious training function and the internals of Maggy presented at Spark+AI Summit 2020, on June 26th.

TLDR; Maggy is an open-source framework for distributed machine learning. In this post, we introduce a new unified framework for writing core ML training logic as “oblivious training functions”. Maggy enables you to reuse the same training code whether you are training small models on your laptop or scaling out hyperparameter tuning or distributed deep learning on a cluster. Maggy enables the replacement of the current waterfall development process for distributed ML applications, where code is rewritten at every stage, with an iterative development process.

Most of the publicly available ML source code for training models is not built to scale-out on many servers or GPUs. Getting started with deep learning is relatively easy these days, thanks to fast.ai, GitHub, and the blogosphere. The hard part for practitioners starts when the code examples found online need to be applied to more challenging domains, with larger and custom datasets, which in turn will require a bigger customized version of the model to fit that dataset. Using publicly available code as a starting point for model development on clusters, you will end up in a process similar to the one depicted in Figure 1.

Figure 1: A simplified view of the ML model development process, illustrating its iterative nature.

The software development process for ML models is rarely the perfect waterfall development model, as shown in Figure 1 without the green arrows. In the (discredited) waterfall development process, you would start out with requirements, then move on to design, implementation and test. The (current!) equivalent process in ML model development is the following, as shown in Figure 1 with the green arrows. You start out on your local machine with a subset of the data in order to explore and design the model architecture. Then you move to use a cluster of resources (such as GPUs) to more quickly find hyperparameters, run lots of parallel ablation studies (many skip this stage!), and finally scale out the training of the model on the large dataset using lots of resources. Then, you’re done, right? Wrong! You typically iterate through the stages, finding better hyperparameters, adding new features, rewriting for distribution, going from your laptop to the cluster and back again.

We rewrite our model training code for distribution as it offers many benefits – faster training of models using more GPUs, parallelizing hyperparameter tuning over many GPUs, and parallelizing ablation studies to help understand the behaviour and performance of deep neural networks. However, not only will the boilerplate model training code need to be modified; as you move along the process, distribution will introduce additional obtrusive code artifacts and modifications, depending on the frameworks used. This leads to a mix of infrastructure code and model code, with duplicated training logic, hyperparameters hard-coded into the training loop, additional tracking code to keep a record of your changes, and config files for experiments:

Figure 2: Model development creates a mix of code artefacts duplicating code for every step, making iterative development hard.

With such a code base, iterating becomes near impossible as it requires adapting many copies of redundant code. And finally, imagine handing the code off to an ML engineer to productionize the model.

The Oblivious Training Function

Figure 3: The oblivious training function makes training code reusable among all steps of the process.

We introduce an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark or Distributed TensorFlow programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). We call this new abstraction for ML model development the oblivious training function, as the core model training logic supports distribution transparency, that is, the training code is not aware (oblivious) of whether it is being run on a single host or whether it is being executed on hundreds of devices in parallel. 

What does it mean for training code to be distribution transparent?

Transparency in distributed systems refers to hiding distribution-specific aspects of an application from the developer - for example, a developer invoking a function may not know (or need to know) if the function she is calling is local to her application or on a remote server. This means, distribution transparency enables developers to write code that is reusable between single-host and distributed instantiations of a program:

Figure 4: Distribution Transparency hides complexities related to distribution from the developer, making the same code executable on a single-host as well as in a large cluster. Transparency leads to DRY training code.

Building Blocks for Distribution Transparency

How does ML code have to be structured in order to be transparently distributed? Firstly, developers have to follow best practices and, secondly, developers must be aware of the difference between distribution contexts, that is, what characterizes, for example, distributed hyperparameter tuning vs. distributed training.

1. ML Development Best Practices:

The ML community has recently developed some best practices, which are already widespread among developers. Taking a look at the new, well-illustrated Keras Guides, you will notice a common approach with four techniques.

  • Modularize: By modularizing code into reusable functions, these functions become building blocks, making the code pluggable in order to construct different configurations of the model for hyperparameter optimization or ablation.
  • Parametrize: Instead of hardcoding parameters such as learning rate, regularization penalty or other hyperparameters, developers are encouraged to replace this with variables whenever possible, to have a single place for them to be changed.
  • Higher-order training functions: instead of using instantiated objects (for example, for the training dataset), the input logic related to the data can be encapsulated in a function that is used by a higher-order function. By doing so, the data input pipeline can also be parametrized. The same holds for the generation of the model, which can be encapsulated in a function returning the model.
  • Usage of callbacks at runtime: in order to be able to intercept and interact with the actual training loop, most ML frameworks, such as TensorFlow and PyTorch, offer the possibility to use callback functions that are invoked by the framework at certain points during training, such as at the end of every epoch or batch. Callback functions enable runtime monitoring of training and can, for example, also be used to add support for stopping the training early (important in hyperparameter optimization).

2. Distribution Context

While a single-host environment is self-explanatory, there is a difference between the context of ML experiments, such as hyperparameter optimization or parallel ablation studies, and the distributed training of a single model. Both hyperparameter optimization and parallel ablation studies have weak scaling requirements (also known as embarrassingly parallel), because all workers execute independent pieces of work and have limited communication. For example, hyperparameter tuning involves training independent copies of the model with different hyperparameters or different architectures, in order to find the best performing configuration. Distributed training, however, is strong scaling, as it introduces  significant communication and coordination between the workers. As workers are training a single model, they continually exchange gradients, which are computed on independent shards of data (data parallel training). Many distributed training problems, in fact, become (network or disk) I/O bound as they scale.  Figure 5 illustrates the three contexts and the step in the model development process that they are applicable to.

Figure 5: Single-host vs. parallel multi-host vs. distributed multi-host context and their applicability to the steps of the process.

Once you are aware of the different contexts and apply these popular programming idioms, it becomes apparent what the oblivious training function means in practice. It is no longer the developer herself who instantiates and launches the training function; instead, the framework invokes the training function, as it is aware of the current context, and it takes care of the distribution-related complexities. For exploration, the framework can be used to fix all parameters. For hyperparameter optimization experiments, the framework will take care of generating potentially good hyperparameter combinations and parametrizing the oblivious training function with them to be launched on different workers. For distributed training, it means setting up the environment for workers to discover each other and wrapping the model code with a distribution strategy.

Figure 6: The oblivious training function as an abstraction allows us to let the system take care of distributed system related complexities.

Putting it all together

Having the building blocks at hand, how do we write the model training code in Maggy? Let us take a look at the latest best-practice MNIST example, which already factors the model configuration, dataset preparation, and training logic into functions. Building on this example, we will show the modifications needed to construct an oblivious training function in Maggy. It is important to note that all modifications are still vanilla Python code and can, therefore, be run as is in a single-host environment. Let’s start with the boilerplate: the two functions and the training logic:

1. Model Definition

import tensorflow as tf

def get_model():
   model = tf.keras.Sequential([
       tf.keras.Input(shape=(784,)),
       tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(32, name='conv1', kernel_size=3, activation='relu'),
       tf.keras.layers.MaxPooling2D(pool_size=2),
       tf.keras.layers.Conv2D(64, name='conv2', kernel_size=2, activation='relu'),
       tf.keras.layers.MaxPooling2D(name='pool1', pool_size=2),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dropout(0.1),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)
   ])
   return model

2. Data set generation

def get_dataset():
    batch_size = 32
    num_val_samples = 10000
    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")
    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
            .shuffle(50000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val))
            .shuffle(10000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test))
            .shuffle(10000).repeat().batch(batch_size),
    )

3. Training logic

model = get_model()
model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=['accuracy', tf.keras.metrics.SparseCategoricalAccuracy()],
)

# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, steps_per_epoch=1000,
          validation_data=val_dataset, validation_steps=100)

# Test the model on all available devices.
metrics = model.evaluate(test_dataset, steps=100)
metric_dict = {out: metrics[i] for i, out in enumerate(model.metrics_names)}

1. Model generation

We are parametrizing the model itself, by replacing hyperparameters with arguments.

Parametrizing the model definition
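
The parametrized model definition might look like the sketch below, assuming the kernel size, pool size, and dropout rate are the hyperparameters being exposed (which hyperparameters to expose is an illustrative choice, not prescribed by Maggy).

def get_model(kernel, pool, dropout):
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(784,)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, name='conv1', kernel_size=kernel, activation='relu'),
        tf.keras.layers.MaxPooling2D(pool_size=pool),
        tf.keras.layers.Conv2D(64, name='conv2', kernel_size=kernel, activation='relu'),
        tf.keras.layers.MaxPooling2D(name='pool1', pool_size=pool),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dropout(dropout),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    return model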

2. Dataset generation

The dataset generation function stays unchanged in this case but, similar to the model generator, it could also be parametrized:

def get_dataset():
    batch_size = 32
    num_val_samples = 10000
    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")
    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
            .shuffle(50000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val))
            .shuffle(10000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test))
            .shuffle(10000).repeat().batch(batch_size),
    )

3. Training logic

The training logic is wrapped in a parametrized and pluggable function: the oblivious training function. Again, hyperparameters are passed as arguments to the function. Additionally, the dataset and model generation functions are replaced with arguments, in order to let the system, for example, swap in an alternative dataset generator - we use this to drop features for ablation studies. Last, but not least, the training function should return its current performance as the metric to be optimized in hyperparameter optimization. This is needed to make Maggy aware of the desired optimization metric.

Adjust Training Logic to be callable with different parameters
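
A sketch of the adjusted training logic is shown below, assuming the same hyperparameters as the model generator above and accuracy as the metric to optimize; the exact signature is illustrative rather than the one required by Maggy.

def train(model_function, dataset_function, kernel, pool, dropout):
    model = model_function(kernel, pool, dropout)
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=['accuracy'],
    )
    train_dataset, val_dataset, test_dataset = dataset_function()
    model.fit(train_dataset, epochs=2, steps_per_epoch=1000,
              validation_data=val_dataset, validation_steps=100)

    # Return the metric to be optimized in hyperparameter optimization
    metrics = model.evaluate(test_dataset, steps=100)
    return metrics[1]   # accuracy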

Note that, up to this point, all modifications are pure Python code; hence, the training function can still be run in a single-host environment by calling it yourself in a notebook with a fixed set of parameters and by passing the model and dataset generation functions as arguments.

Finally, to execute the function in a different distribution context, Maggy is used:

from maggy import experiment

experiment.set_dataset_generator(gen_dataset)
experiment.set_model_generator(gen_model)

# Hyperparameter optimization
experiment.set_context('optimization', 'randomsearch', searchspace)
result = experiment.lagom(train_fun)
params = result.get('best_hp')

# Distributed Training
experiment.set_context('dist_training', 'MultiWorkerMirroredStrategy', params)
experiment.lagom(train_fun)

# Ablation study
experiment.set_context('ablation', 'loco', ablation_study, params)
experiment.lagom(train_fun)

Maggy requires additional configuration information for hyperparameter optimization, such as a search space definition and the optimization strategy to be used. In the case of distributed training, the distribution strategy is needed as well as a set of parameters to fix the model to. These parameters can either be taken from the previous hyperparameter tuning experiments or input manually. Lagom is the API to launch the function on a Spark cluster. 

Future Work

You can try out Maggy for hyperparameter optimization or ablation studies now on Hopsworks.ai. Keep an eye on Maggy's GitHub repo for the oblivious training function to be released as a pure Spark version, or wait until the next release of Hopsworks, which will include full support. Maggy is still a project under heavy development, and our mission with Maggy is to provide a new way of writing machine learning applications that reduces the burden on Data Scientists of becoming distributed systems experts. By following these best practices, we are able to keep the high-level APIs of frameworks like Keras and PyTorch free of obtrusive distribution code. 

Summary

In this blog, we introduced a new feature to an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). In a single Jupyter notebook, developers can mix vanilla Python code to develop and test models on their laptop with PySpark-specific cells that can be run when a cluster is available using a PySpark kernel, such as Sparkmagic. This way, iterative development of deep learning models now becomes possible, moving from the laptop to the cluster and back again, with DRY code in the training function – as all phases reuse the same training code.

Watch our demo presented at the Spark+AI Summit 2020

References

Meister et al. (2020). Towards Distribution Transparency for Supervised ML With Oblivious Training Functions. Published in the MLOps workshop of MLSys’20.

Hello Asynchronous Search for PySpark, published on October 19, 2019.

Manage your own Feature Store on Kubeflow with Hopsworks

>
6/15/2020
>
Jim Dowling

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts, we introduced the feature store, MLOps with a feature store, and how to use the Hopsworks Feature Store in Databricks and AWS Sagemaker. In this blog post, we focus on how to integrate Kubeflow with the Hopsworks Feature Store. Hopsworks is available as an open-source platform, but integrations with external platforms, like Kubeflow, are only available on either our SaaS (www.hopsworks.ai) or Enterprise versions.

Hopsworks is a modular platform that includes a feature store, a compute engine ((Py)Spark, Python, Flink), a data science studio, a file system (HopsFS/S3), and model serving/monitoring support. The Hopsworks Feature Store can be used as a standalone feature store by Kubeflow.  As Kubernetes has limited support for Spark (Spark-on-k8s still has problems with shuffle, as of June 2020), Hopsworks is often used for both its feature store and Spark and scale-out deep learning capabilities. 

Hopsworks offers a centralized platform to manage, govern, discover, and use features. Features can be used:

  • at scale to create train/test datasets, 
  • for model scoring in analytical (batch application) models, 
  • at low latency by operational models to enrich feature vectors.

Get Started

Before you begin, make sure you have started a Hopsworks cluster. If you are on AWS, we recommend using our managed platform Hopsworks.ai. If you are on GCP, Azure or on-premises, as of June 2020, you have to use the hopsworks-installer script to install Hopsworks.

Hopsworks should be installed so that Kubeflow has access to the private IPs of the feature store services: Hive Server 2 and the Hopsworks REST API endpoint. On GCP, this means you should install your Hopsworks cluster in the same Region/Zone as your Kubeflow cluster. Similarly, on AWS, your Hopsworks cluster should be installed in the same VPC/subnet as Kubeflow. And on Azure, your Hopsworks cluster should be installed in the same resource group as your Kubeflow cluster.

API Key

From a Jupyter notebook in Kubeflow, you need to be able to authenticate and interact with the Hopsworks Feature Store. As such, you need to get an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, then clicking on Settings and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should save your API key to a file, with the path API_KEY_FILE, that will be accessible from your Jupyter notebook in Kubeflow. 

Hopsworks-cloud-sdk

With the API key configured correctly, in your KubeFlow Jupyter notebook, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) using PIP:

>>> !pip install hopsworks-cloud-sdk ~= 1.3

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection


From your Jupyter notebook, the API Key should now be readable from the local file system at the path API_KEY_FILE, and the hopsworks-cloud-sdk library should be installed. You should now be able to establish a connection to the feature store and start using the Hopsworks - Kubeflow integration with this connect call:

import hops.featurestore as fs

fs.connect(
    'my_instance',              # Private IP/DNS of your Feature Store instance
    'my_project',               # Name of your Hopsworks Feature Store project
    secrets_store='file',
    api_key_file=API_KEY_FILE
)

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a python-kernel in a Jupyter notebook. It will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.

Learn more with our demo!



How to Build your own Feature Store

>
5/26/2020
>
Jim Dowling

As of May 2020, Logical Clocks are the only vendor of a Feature Store for machine learning (ML) and the only maker of a fully open-source and cloud-native Feature Store for ML. As such, we have many conversations with companies and organizations who are deciding between building their own feature store and buying one. Given the increasing interest in building feature stores, we thought we would share our experience of building one and motivate some of the decisions and choices we took (and did not take) to help others who are considering following us down the same path. 

8 Benefits of a Feature Store for ML

Here we list some of the reasons for wanting a feature store for ML in the first place.

1. Consistent Feature Engineering for Training Data and Serving 

The feature store can eliminate the need for two different implementations for features: one when training a model and one when serving a model. With a feature store, you can have a single feature pipeline that computes the features and stores them in both an offline and an online store, for use in training models and serving models, respectively. The offline feature store needs to support large volumes of data for model training and for use by batch (analytical) applications for model scoring. The online store needs to support low-latency access to features for models served in production.
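
A minimal sketch of that idea is shown below, with pandas standing in for the feature computation, a Parquet file standing in for the offline store, and Redis standing in for the online store; all file names, keys, and column names are illustrative.

import pandas as pd
import redis

# Compute the features once, in a single feature pipeline (logic is illustrative)
raw = pd.read_csv("transactions.csv")
features = raw.groupby("customer_id").agg(
    num_tx_7d=("amount", "count"),
    avg_tx_amount_7d=("amount", "mean"),
).reset_index()

# Offline store: large volumes of features for training and batch scoring
features.to_parquet("offline_store/card_features.parquet")

# Online store: the same feature values, keyed for low-latency lookups
r = redis.Redis(host="localhost", port=6379)
for row in features.itertuples(index=False):
    r.hset(f"card_features:{row.customer_id}",
           mapping={"num_tx_7d": int(row.num_tx_7d),
                    "avg_tx_amount_7d": float(row.avg_tx_amount_7d)})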

2. Encourage Feature Reuse

Features should be reused between different models. At Twitter, they use a “sharing adoption” metric to evaluate the success of their internal feature store. The “sharing adoption” metric measures the number of teams that reuse, in their production models, features created by other teams.

3. System support for Serving of Features 

Operational models often need low latency access to features that may be computationally complex or generated from historical data. These types of features are often difficult or impossible to compute inside the applications themselves, either because of the lack of available data or because of the excessive time required to compute the features. The feature store can solve this problem by acting as a low-latency store for precomputed features for operational models (used by online applications). 

4. Exploratory Data Analysis with a Feature Store

Data scientists can discover the available pre-computed features, the types of those features (numerical, categorical), their descriptive statistics, and the distribution of feature values. They can also view a small sample of feature values to help quickly identify any potential issues with using a given feature in a model.

5. Temporal Queries for Features

You would like to augment feature store queries with temporal logic. That is, you want to know the value of a given feature at:

  • an instant in time (for example, when joining features together from different feature groups);
  • a time interval - a length of time (for example, the last 3 months);
  • a time period: an anchored duration of time (for example, training data for years 2012-2018, test data for 2019). 

In relational databases, temporal queries are typically supported by a table that keeps a full history of data changes and allows easy point-in-time analysis, as in SQL Server. In scalable SQL systems, examples of columnar storage formats that support temporal queries are Apache Hudi and Delta Lake, as well as streaming support in Apache Flink. These systems require increased storage (to store updates as well as the current values) in return for the ability to query the value of features at points in time, over time intervals, or over time periods.

6. Security, Governance, and Tracking

The feature store is a central repository for an organization’s features, enabling them to be access controlled, governed, and to have their usage tracked. The feature store also provides common standards for metadata, consistent documentation, and coding standards for features. The repository can maintain popularity counts for features to show which ones are widely used and which ones could potentially be removed, enabling better management of features.

7. Reproducibility for Training Datasets 

The feature store should enable the re-creation of training datasets for given points-in-time to enable the reproducibility of models. An alternative to recreating training datasets is to archive them, but for many industries, such as healthcare and insurance, they will need to be reproducible for regulatory reasons. The ability to recreate training datasets for models is also useful for debugging models, even when you are not required by law to keep the training dataset.

8. Data Drift for Model Serving

The feature store can compute and store statistics over training datasets and make those statistics available to model serving platforms via API calls. In operational models, the training data statistics can then be compared with statistics computed over time windows of live data (last minute, hour, day) sent to the model for predictions. Simple (or complex) statistical tests can identify data drift - when live feature values diverge significantly from the model’s training data. 

Feature Store Design Flow Chart

The feature store design flow chart below shows some of the decisions that need to be taken when deciding to roll your own feature store. This set of decisions is obviously not complete, and we omitted systems design issues that are commonly found in any scalable system, such as schema design, API design, language support, and platform support (on-premises, cloud-native).

The first decision point is whether you really want to go ahead and build a data platform to support your ML efforts, knowing the considerable time it will take (at least 6-12 months before you will have anything production-ready) and the future costs of maintaining and updating your platform. If you decide that building a feature store is too much, then give us a shout - mention that Jim sent you and ask for the blog reader discount :)

If you are still determined to build one, you need to take your first big decision: is your feature store a library or a store (a materialized cache of your features)? If the only problem you want to solve is consistent features between training and serving, then the library approach may be suitable. In the library approach, you write your feature encoding functions in versioned libraries that are included in both training and serving pipelines, as sketched below. A downside to this is that both training and serving pipelines need to be implemented in the same (or compatible) programming languages. Another downside is that you may need to wait a long time to backfill training data, as you need to run a job to compute the features. Netflix’s early fact/feature store introduced shared, versioned feature encoders to ensure consistent feature engineering.
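A minimal sketch of the library approach is shown here (the module name and function are hypothetical); the same versioned encoding function is imported by both the training pipeline and the serving application, so the feature is computed identically in both places:

# feature_encoders.py (hypothetical shared library, version 1.2.0)
def encode_clicks_per_day(total_clicks: int, days_active: int) -> float:
    """Encode user activity as average clicks per active day."""
    if days_active <= 0:
        return 0.0
    return total_clicks / days_active

# the training pipeline applies this function to historical rows when building training data,
# and the serving application applies it to raw request values before calling the model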

Your next decision is whether you want to reuse features across different models or not. Reusing features means you will need to join normalized features together, both to create training datasets and when serving features. If you decide you do not want to reuse features, you will still be able to solve the problems of consistent feature engineering and system support for serving features. There are feature stores, like Condé Nast’s Cassandra-based one, that use a single datastore for storing both training and serving features. 

The next decision is consequential: if you are only working with analytical models and not online (operational) models, you might decide you only need a single (scalable) database to store your features. Assuming you are a typical user who needs the feature store for both training and serving models, you then need to decide if you need support for time-travel queries. Many online models with windowed features (how many times a user did ‘X’ in the last 15 minutes) often need time-travel support to ensure consistent features for feature serving and creating training data. Assuming you decide to add time-travel support, you now need to build on a system with, or add application support for, temporal queries.

Your next decision also depends on your choice of data stores for the offline (scalable SQL) and online (low latency feature serving) stores. In Hopsworks, uniquely, we have a unified metadata layer for our file system, HopsFS, our feature serving layer, MySQL Cluster, and our scalable SQL layer, Apache Hive (-on-Hops). We could easily add extended metadata to our unified metadata layer to describe features, their statistics, and tag them. A CDC (change-data-capture) API to our unified metadata layer enabled us to also index feature descriptions in Elasticsearch, which supports free-text search and searches do not affect the performance of the unified metadata service. The bad news if you do not have unified metadata for your dual databases is that you need to design and develop agreement protocols to ensure that the 3 different systems are kept in sync, presenting a consistent view of your features: your offline, online, and feature metadata platforms. This is a tough distributed systems engineering challenge - good luck!

Finally, you need to decide on a compute engine (maybe internal or external to your platform), both for joining features to create training datasets and for computing your features. You may decide on a domain-specific language (like Michelangelo or Zipline) or a more general purpose language or framework like Python (Feast) or Spark (Hopsworks).

Apart from all these issues, you also need to decide on whether you need a UI to discover and manage features (Hopsworks, Michelangelo, Twitter, Zipline) or not (Feast). You also need to decide on whether you need to support multiple feature stores (such as development, production, and sensitive feature stores), and access control to those different feature stores (as in Hopsworks).

Phew! Finally you have navigated some of the decisions that you need to make to tailor your feature store platform for your needs. Although we have not covered the problem of API design to your feature store, we can confide that most feature store platforms have gone through more than one generation of API (we were not immune to that, either). So, good luck, give us a call if you decide to buy rather than build.


Hopsworks Feature Store for Databricks

>
4/23/2020
>
Fabio Buso


TLDR; Feature Stores have become the key piece of data infrastructure for machine learning platforms. They manage the whole lifecycle of features: from training different models to providing low-latency access to features by online-applications for model inference. This article introduces the Hopsworks Feature Store for Databricks, and how it can accelerate and govern your model development and operations on Databricks.

What is a Feature Store?

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency for online applications;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 


The Feature Store for ML consists of both an Online and Offline database and Databricks can be used to transform raw data from backend systems into engineered features cached in the online and offline stores. Those features are made available to online and batch applications for inferencing and for creating train/test data for model training.

Engineer Features in Databricks, publish to the Feature Store

The process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines. Feature Pipelines can be developed in Spark or Pandas applications that are run on Databricks. They can be combined with data validation libraries like Deequ to ensure feature data is correct and complete.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). One popular framework for data validation with Spark is AWS Deequ, as it allows you to extend traditional schema-based support for validating data (e.g., this column should contain integers) with data validation rules for numerical or categorical values. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column’s values are unique, not null, or that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are then cached in the feature store, and subsequently used both to train models and for batch/online model inferencing.
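Before the feature engineering snippet below, here is a minimal sketch of such validation rules using the PyDeequ Python bindings for Deequ; it assumes PyDeequ is installed, a SparkSession named spark, and a DataFrame raw_data_df with hypothetical user_id and revenue columns:

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult

# value-level rules on top of the schema: completeness, uniqueness, expected range
check = (Check(spark, CheckLevel.Error, "revenue feature checks")
         .isComplete("user_id")
         .isUnique("user_id")
         .hasMin("revenue", lambda v: v >= 0.0)
         .hasMax("revenue", lambda v: v <= 1000000.0))

check_result = (VerificationSuite(spark)
                .onData(raw_data_df)
                .addCheck(check)
                .run())
VerificationResult.checkResultsAsDataFrame(spark, check_result).show()

The validated DataFrame can then be featurized and written to the Feature Store, as in the following snippet.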


import hsfs
# "prod" is the production feature store

conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# read raw data and use Spark to engineer features
from pyspark.sql.functions import col
raw_data_df = spark.read.parquet('/parquet_partitioned')
# square each numeric column as a simple example of polynomial features
polynomial_features = raw_data_df.select([(col(c) ** 2).alias(c) for c in raw_data_df.columns])

# Features computed together in a DataFrame belong to the same feature group
fg = featurestore.create_feature_group(name='fg_revenue',
                                       version=1,
                                       type='offline')

fg.create(polynomial_features)
fg.compute_statistics()

In this code snippet, we connect to the Hopsworks Feature Store, read some raw data into a DataFrame from a parquet file, and transform the data into polynomial features. Then, we create a feature group with version ‘1’ that is only to be stored in the ‘offline’ feature store. Finally, we ingest our new polynomial_features DataFrame into the feature group, and compute statistics over the feature group that are also stored in the Hopsworks Feature Store. Note that Pandas DataFrames are supported as well as Spark DataFrames, and there are both Python and Scala/Java APIs.

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.

You can find an example notebook for feature engineering with PySpark in Databricks and registering features with Hopsworks here.


Model Training Pipelines in Databricks start at the Feature Store 

Model training pipelines in Databricks can read in train/test data either directly as Spark Dataframes from the Hopsworks Feature Store or as train/test files in S3 (in a file format like .tfrecords or .npy or .csv or .petastorm). Notebooks/jobs in Databricks can use the Hopsworks Feature Store to join features together to create such train/test datasets on S3.


Model training with a feature store typically involves at least three stages:

  1. select the features from feature groups and join them together to build a train/test dataset. You may also want to filter out data here and include an optional timestamp to retrieve features exactly as they were at a point in time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

import hsfs
conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# get feature groups from which you want to create a training dataset
fg1 = featurestore.get_feature_group('fg_revenue', version=1)
fg2 = featurestore.get_feature_group('fg_users', version=2)
# lazily join features

joined_features = fg1.select_all() \
                  .join(fg2.select(['user_id', 'age']), on='user_id')

sink = featurestore.get_storage_connector('S3-training-dataset-bucket')

td = featurestore.create_training_dataset(name='revenue_prediction',
                    version=1,
                    data_format='tfrecords',
                    storage_connector=sink,
                    split={'train': 0.8, 'test': 0.2})

td.seed = 1234
td.create(joined_features)

Data Scientists are able to rely on the quality and business logic correctness in published features and can therefore quickly export and create training datasets in their favourite data format.
You can find an example notebook for getting started with creating train/test datasets from Hopsworks in Databricks here.

Deploying the Hopsworks Feature Store for Databricks

The Hopsworks Feature Store is available as a managed platform for AWS (Hopsworks.ai) and as an Enterprise platform for Azure.

Hopsworks.ai for AWS

Hopsworks.ai is our new managed platform for the Hopsworks Feature Store on AWS. In its current version, it will deploy a Hopsworks Feature Store into your AWS account. From Hopsworks.ai, you can stop/start/backup your Hopsworks Feature Store.


The details for how to launch a Hopsworks Feature Store inside an existing VPC/subnet used by Databricks are found in our documentation. The following figures from Hopsworks.ai show how to pick the same Region/VPC/Zone used by your Databricks cluster when launching Hopsworks.


You also need to expose the Feature Store service for use by Databricks, see the figure below. 


For some Enterprises, an alternative to deploying Hopsworks in the same VPC as Databricks is VPC peering. VPC peering requires manual work, and you can contact us for help setting it up.

Enterprise Hopsworks for Databricks Azure

On Azure, by default, Databricks is deployed to a locked resource group with all data plane resources, including a virtual network (VNet) that all clusters will be associated with. However, with VNet injection, you can deploy Azure Databricks into the same virtual network where the Hopsworks Feature Store is deployed. Contact us for more details on how to install and set up VNet injection for Azure with Hopsworks Feature Store. An alternative to VNet injection is VNet peering, and you can contact us for help setting it up.


Summary

A new key piece of infrastructure for machine learning has now arrived for Databricks users - the Hopsworks Feature Store. It enables you to centralize your features for ML for easier discovery and governance, it enables the reuse of features in different ML projects, and it provides a single pipeline for engineering features for both training and inference. The Hopsworks Feature Store is available today as either a managed platform on AWS, so you can spin up a cluster in just a few minutes, or as an Enterprise platform for either AWS or Azure. 



ExtremeEarth scales AI to the Earth Observation Community with Hopsworks

>
4/15/2020
>
Theofilos Kakantousis

This article was originally published at Extreme Earth.

In recent years, unprecedented volumes of data are generated in various domains. Copernicus, a European Union flagship programme, produces more than three petabytes (PB) of Earth Observation (EO) data annually from Sentinel satellites [1]. This data is made readily available to researchers who are using it, among other things, to develop Artificial Intelligence (AI) algorithms, in particular using Deep Learning (DL) techniques that are suitable for Big Data. One of the greatest challenges that researchers face, however, is the lack of tools that can help them unlock the potential of this data deluge and develop predictive and classification AI models.

ExtremeEarth is an EU-funded project that aims to develop use-cases that demonstrate how researchers can apply Deep Learning in order to make use of Copernicus data in the various EU Thematic Exploitation Platforms (TEPs). A main differentiator of ExtremeEarth to other such projects is the use of Hopsworks, a Data-Intensive AI software platform for scalable Deep Learning. Hopsworks is being extended as part of ExtremeEarth to bring specialized AI tools for EO data and the EO data community in general.


Hopsworks, Earth Observation Data and AI in one Platform

Hopsworks is a Data-Intensive AI platform which brings a collaborative data science environment to researchers who need a horizontally scalable solution to developing AI models using Deep Learning. Collaborative means that users of the platform get access to different workspaces, called projects, where they can share data and programs with their colleagues, hence improving collaboration and increasing productivity. The Python programming language has become the lingua franca amongst data scientists and Hopsworks is a Python-first platform, as it provides all the tools needed to get started programming with Python and Big Data. Hopsworks integrates with Apache Spark and PySpark, a popular distributed processing framework. 

Hopsworks brings to the Copernicus program and the EO data community essential features required for developing AI applications at scale, such as distributed Deep Learning with Graphics Processing Units (GPUs) on multiple servers, as demanded by the Copernicus volumes of data. Hopsworks provides services that facilitate conducting Deep Learning experiments, all the way from doing feature engineering with the Feature Store [2], to developing Deep Learning models with the Experiments and Models services that allow users to manage and monitor Deep Learning artifacts such as experiments, models and automated code-versioning, and much more [3]. The Hopsworks storage and metadata layer is built on top of HopsFS, the award-winning highly scalable distributed file system [4], which enables Hopsworks to meet the extreme storage and computational demands of the ExtremeEarth project.

Hopsworks brings horizontally scalable Deep Learning for EO data close to where the data lives, as it can be deployed on Data and Information Access Services (DIAS) [5]. The latter provides centralised access to Copernicus data and information; combined with the AI capabilities for EO data that Hopsworks brings, this makes an unparalleled data science environment available to researchers and data scientists of the EO data community.


Challenges of Deep Learning with EO Data


Recent years have witnessed the performance leaps of Deep Learning (DL) models thanks to the availability of big datasets (e.g. ImageNet) and the improvement of computation capabilities (e.g., GPUs and cloud environments). Hence, with the massive amount of data coming from earth observation satellites such as the Sentinel constellation, DL models can be used for a variety of EO-related tasks. Examples of these tasks are sea-ice classification, monitoring of water flows, and calculating vegetation indices.

However, together with the performance gains come many challenges in applying DL to EO tasks, including, but not limited to:


Labeled datasets for training; 

While collecting raw Synthetic-Aperture Radar (SAR) images from the satellites is one thing, labeling those images to make them suitable for supervised DL is yet another time-consuming task. Should we seek help from unsupervised or semi-supervised learning approaches to eliminate the need for labeled datasets? Or should we start building tools to make annotating the datasets easier?

Interpretable and human-understandable models for EO tasks;

Given enough labeled data, we can probably build a model with satisfactory performance. But how can we justify the reasons behind why the model makes certain predictions given certain inputs? While we can extract the intermediate predictions for given outputs, can we reach interpretations that can be better understood by humans?

Management of very large datasets;

Managing terabytes (TB) of data that can still fit into a single machine is one thing, but managing petabytes (PB) of data that requires distributed storage and provides a good service for the DL algorithms so as not to slow down the training and serving process is a totally different challenge. To further complicate the management, what about partial failures in the distributed file system? How shall we handle them? 

Heterogeneous data sources and modalities (e.g., SAR images from satellites, sensor readings from ground weather stations); 

How can we build models that effectively use multi-modalities? For example, how can we utilize the geo-location information in an image classification model?

DL architectures and learning algorithms for spectral, spatial, and temporal data;

While we might be able to perform preprocessing and design model architectures for RGB image classification, how do these apply to SAR images? Can we use the same model architectures? How to extract useful information from multi-spectral images? 

Training and fine-tuning (hyperparameter optimizations) of DL models;

Hyperparameters are those parameters of the training process (e.g., the learning rate of the optimizer, or the size of the convolution windows) that should be manually set before training. How can we effectively train models and tune the hyperparameters? Should we change the code manually? Or can we use frameworks to provide some kind of automation?

The real time requirements for serving DL models;

Once the training is done, we want to use our trained model to predict outcomes based on the newly observed data. Often these predictions have to be made in real-time or near-real-time to make quick decisions. For example, we want to update the ice charts of the shipping routes every hour. How to serve our DL models online to meet these real-time requirements?


Deep Learning Pipelines for EO Data with Hopsworks

A Data Science application in the domain of Big Data typically consists of a set of stages that form a Deep Learning pipeline. This pipeline is responsible for managing the lifecycle of data that comes into the platform and is to be used for developing machine learning models. In the EO data domain in particular, these pipelines need to scale to the petabyte-scale data that is available within the Copernicus program. Hopsworks provides data scientists with all the required tools to build and orchestrate each stage of the pipeline, depicted in the following diagram.

 In detail, a typical Deep Learning pipeline would consist of:

  • Data Ingestion: The first step is to collect and insert data into the AI platform where the pipeline is to be run. A great variety of data sources can be used such as Internet of Things (IoT) devices, web-service APIs etc. In ExtremeEarth, the data typically resides on the DIAS which can be directly accessed from Hopsworks.
  • Data Validation: Tools such as Apache Spark that can cope with Big Data are typically employed to validate incoming data that is to be used in later stages. For example, data might need to be parsed and cleaned of duplicate or missing values, or a simple transformation of an alphanumeric field to a numeric one might be needed.
  • Feature Engineering: Before making use of the validated data to develop DL models, the features that will be used to develop such models need to be defined, computed and persisted. Hopsworks Feature Store is the service that data engineers and data scientists use for such tasks, as it provides rich APIs, scalability and elasticity to cope with varying data volumes and complex data types and relations. For example, users can create groups of features or compute new features such as aggregations of existing ones. 
  • Model development (Training): Data scientists can greatly benefit from a rich experiment API provided by Hopsworks to run their machine learning code, whether it be TensorFlow, Keras, PyTorch or another framework with a Python API. In addition, Hopsworks manages GPU allocation across the entire cluster and facilitates distributed training, which involves making use of multiple machines with multiple GPUs per machine in order to train bigger models faster [9].
  • Model Serving & Monitoring: Typically the output of the previous stage is a DL model. To make use of it, users can submit inference requests by using the Hopsworks built-in elastic model serving infrastructure for TensorFlow and scikit-learn, two popular machine learning frameworks (a minimal request sketch follows this list). Models can also be exported in the previous pipeline stage and downloaded from Hopsworks directly to be embedded into external applications, such as iceberg detection and water availability detection in food crops. Hopsworks also provides infrastructure for model monitoring, that is, continuously monitoring the requests submitted to the model and its responses; users can then apply their own business logic to decide which actions to take depending on how the monitoring metrics change over time.
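As a sketch of what an inference request could look like against a TensorFlow Serving style REST endpoint, see below; the host, port, model name and feature values are hypothetical placeholders, not the actual Hopsworks serving API:

import requests

# hypothetical endpoint exposing a TensorFlow Serving style REST API
url = "http://model-serving.example.com:8501/v1/models/my_model:predict"
payload = {"instances": [[0.12, 0.56, 0.33, 0.91]]}  # one feature vector per instance

response = requests.post(url, json=payload, timeout=5)
print(response.json()["predictions"])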



Example Use Case: Iceberg Classification with Hopsworks


Drifting icebergs pose major threats to the safety of navigation in areas where icebergs might appear, e.g., the Northern Sea Route and North-West Passage. Currently, domain experts manually conduct what is known as an “ice chart” on a daily basis, and send it to ships and vessels. This is a time-consuming and repetitive task, and automating it using DL models for iceberg classification would result in generation of more accurate and more frequent ice charts, which in turn leads to safer navigation in concerned routes.

Iceberg classification is concerned with telling whether a given SAR image patch contains an iceberg or not. The details of the classification depend on the dataset that will be used. For example, given the Statoil/C-CORE Iceberg Classifier Challenge dataset [6], the main task is to train a DL model that can predict whether an image contains a ship or an iceberg (binary classification). 

Satellite radar images from the Statoil/C-CORE Iceberg Classifier Challenge [6]


The steps we took to develop and serve the model were the following:

  • First step is preprocessing. We read the data which is stored in JSON format and create a new feature which is the average of the satellite image bands.
  • Second step is inserting the data into the Feature Store which provides APIs for managing feature groups and creating training and test datasets. In this case, we created the training and test datasets in TFRecord format after scaling the images as we are using TensorFlow for training. 
  • Third step is building and training our DL model on Hopsworks (a minimal model sketch follows this list). Since the dataset is not very complicated and we have a binary classification task, using a DL model that is very similar to LeNet-5 [7] yields 87% accuracy on the validation set after 20 epochs of training, which takes 60 seconds on an Nvidia GTX 1080. This step also includes hyperparameter tuning. Ablation studies, in which we remove different components (e.g., different convolutional layers, or dataset features), can also be employed to gain more insights about the model. Hopsworks provides efficient and easy support for hyperparameter tuning and ablation studies through a Python-based framework called Maggy [8]. Finally, to further increase the training speed, the distributed training strategy provided in Hopsworks can be used.
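The following is a minimal Keras sketch of a small LeNet-5 style convolutional network for the binary iceberg/ship task; the 75x75x2 input shape corresponds to the two SAR bands in the Statoil/C-CORE dataset, and the exact architecture and hyperparameters used in the project may differ:

from tensorflow.keras import layers, models

# small LeNet-5 style CNN for binary classification of 75x75 two-band SAR patches
model = models.Sequential([
    layers.Conv2D(16, (5, 5), activation='relu', input_shape=(75, 75, 2)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(32, (5, 5), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(64, activation='relu'),
    layers.Dense(1, activation='sigmoid'),  # iceberg vs. ship
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
# model.fit(train_dataset, validation_data=val_dataset, epochs=20)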

The final step is to export and serve the model. The model is exported and saved into the Hopsworks “Models” dataset. Then we use the Hopsworks elastic model serving infrastructure to host TensorFlow Serving, which can scale with the number of inference requests.

Conclusion

In this blog post we described how the ExtremeEarth project brings new tools and capabilities with Hopsworks to the EO data community and the Copernicus program. We also showed how we have developed a practical use case using Copernicus data and Hopsworks. We keep developing Hopsworks to align it even more closely with the tools and processes used by researchers across the EO community, and we continue to develop our use cases with more sophisticated models and more advanced distributed Deep Learning training techniques. 


References

  1. https://workshop.copernicus.eu/sites/default/files/content/attachments/ajax/copernicus_overview.pdf 
  2. https://www.logicalclocks.com/blog/feature-store-the-missing-data-layer-in-ml-pipelines
  3. https://hopsworks.readthedocs.io/en/latest/
  4. https://www.arcos.inf.uc3m.es/ccgrid2017/calls/10th-ieee-international-scalable-computing-challenge-scale-2017/
  5. https://www.copernicus.eu/en/access-data/dias
  6. https://www.kaggle.com/c/statoil-iceberg-classifier-challenge/data 
  7. LeCun, Yann. "LeNet-5, convolutional neural networks." URL: http://yann.lecun.com/exdb/lenet 20 (2015): 5.
  8. https://maggy.readthedocs.io/en/latest/
  9. https://www.logicalclocks.com/blog/when-deep-learning-with-gpus-use-a-cluster-manager

MLOps with a Feature Store

>
2/14/2020
>
Fabio Buso

TLDR; If AI is to become embedded in the DNA of Enterprise computing systems, Enterprises must first re-align their machine learning (ML) development processes to include data engineers, data scientists and ML engineers in a single automated development, integration, testing, and deployment pipeline. This blog introduces platforms and methods for continuous integration (CI), continuous delivery (CD), and continuous training (CT) with machine learning platforms, with details on how to do CI/CD machine learning operations (MLOps) with a Feature Store. We will see how the Feature Store refactors the monolithic end-to-end ML pipeline into a feature engineering and a model training pipeline.

What is MLOps?

MLOps is a recent term that describes how to apply DevOps principles to automating the building, testing, and deployment of ML systems. The Continuous Delivery Foundation’s SIG-MLOps defines MLOps as  “the extension of the DevOps methodology to include Machine Learning and Data Science assets as first class citizens within the DevOps ecology”. MLOps aims to unify ML application development and the operation of ML applications, making it easier for teams to deploy better models more frequently. Martinfowler.com defines MLOps as:
“a software engineering approach in which a cross-functional team produces machine learning applications based on code, data, and models in small and safe increments that can be reproduced and reliably released at any time, in short adaptation cycles.”

Some of the major challenges of MLOps, compared to DevOps, are how to deal with versioned data, not just versioned code, how to manage specialized hardware (graphical processing units (GPUs)), and how to manage data governance and compliance for models. 

DevOps vs MLOps

Traditional DevOps CI/CD Workflow triggered by changes to source code.


Git is the world’s most popular source-code version control system. It is used to track changes in source code over time and to support different versions of source code. Support for version control is a prerequisite for automation and continuous integration (CI) solutions as it enables reproducible provisioning of any environment in a fully automated fashion. That is, we assume the configuration information required to provision the environment is stored in a version control system, as well as the source code for the system we will be testing. Typically, when working with DevOps, every commit to Git triggers the automated creation of packages that can be deployed to any environment using only information in version control.

In most DevOps setups, Jenkins is used together with Git as an automation server that builds, tests and deploys your versioned code in a controlled and predictable way. The typical steps Jenkins follows for a CI/CD pipeline are to: provision testing virtual machines (VMs)/containers, checkout code onto machines, compile the code, run tests, package binaries, and deploy binaries. For Java, this involves running a build tool like maven to compile, test, and package Java binaries before deploying the binaries in some staging or production system. For Docker, this means compiling a Dockerfile and deploying the Docker image to a Docker registry.


High Level MLOps CI/CD Workflow triggered by changes in either source code or data.


Perhaps the most defining characteristic of MLOps is the need to version data as well as code to enable reproducible workflows for training models. Git is not suitable as a platform for versioning data, as it does not scale to store large volumes of data. Luckily, others have been working on alternative platforms in recent years.

However, Git and Jenkins are not enough for MLOps, where the build process involves running a complex distributed workflow and we need both versioned code and versioned data to ensure reproducible automated builds. The workflow is what we call a ML pipeline, a graph of components, where each component takes input parameters and data, and at the end, a successful workflow run deploys a trained model to production.  A standard ML pipeline consists of at least the following components: validate incoming data, compute features on the incoming data, generate train/test data, train the model, validate the model, deploy the model, and monitor the model in production. This simplified pipeline can, in practice, be even more complex, where the model training stage can be broken into smaller components: hyperparameter tuning, ablation studies, and distributed training. 

There are already several end-to-end ML frameworks that support orchestration frameworks to run ML pipelines: TensorFlow Extended (TFX) supports Airflow, Beam and Kubeflow pipelines, Hopsworks supports Airflow, MLFlow supports Spark, and Kubeflow supports Kubeflow pipelines. These frameworks enable the automated execution of workflows, the ability to repeat steps, such as re-training a model, with only input parameter changes, the ability to pass data between components, and the ability to specify event-based triggering of workflows (e.g., at a specific time of day, on the arrival of new data, or when model performance degrades below a given level). TFX, MLFlow, and Hopsworks also support distributed processing using Beam and/or Spark, enabling scale-out execution on clusters using very large amounts of data.

MLOps: versioned Code and Data 

Data Versioning, Git-Style

DVC, developed by the affable Dmitry Petrov, provides an open-source tool for versioning files/objects in cloud storage that uses Git to store metadata about files and reflinks (that support transparent copy-on-write for data files) to ensure consistency between git entries and the data files. Similarly, Pachyderm, a ML platform on Kubernetes, also provides a data versioning platform using git-like semantics. However, these git-like approaches just track immutable files; they do not store the differences between files. They cannot handle time-travel queries, such as “give me the train/test data for the range between the years 2016-2018” or “give me the value of these features on the 6th September 2018”. Without time-travel, they cannot support incremental feature engineering: computing features only on the data that has changed since the last run (1 hour ago, a day ago, etc.).

Data Versioning with Time-Travel Queries and Incremental Pulling

An alternative to git-like data versioning systems is to use a transactional data-lake that provides versioned, structured datasets. A versioned dataset does not just have a version of the schema for its data (schemas may evolve over time), but also updates to the data-lake are executed atomically and identified by a commit. The most well known such platforms are the open-source projects: Delta Lake, Apache Hudi, Apache Iceberg. Here users can perform time-travel queries that return the data at a given point-in-time (commit-id), or the data for a given time-interval, or the changes to the data since a given point in time. They execute time travel queries efficiently using indexes (bloom filters, z-indexes, data-skipping indexes) that massively reduce the amount of data that needs to be read from the file system or object store. Transactional data lakes also enable incremental feature engineering - compute features only for the data that has changed in the last hour or day - by enabling clients to read only those changes in a dataset since a given point in time.
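The sketch below shows what time travel and incremental pulling could look like with Apache Hudi’s Spark DataSource; it assumes a SparkSession named spark, a hypothetical table path, and a begin-instant timestamp, and the option names follow the Hudi documentation (they may vary between Hudi versions):

# incremental query: only read the rows that changed since a given commit instant
begin_time = "20200101000000"  # hypothetical Hudi commit timestamp (yyyyMMddHHmmss)

incremental_df = (spark.read.format("hudi")
                  .option("hoodie.datasource.query.type", "incremental")
                  .option("hoodie.datasource.read.begin.instanttime", begin_time)
                  .load("/feature_store/fg_revenue"))
incremental_df.show()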

Hopsworks Feature Store 

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 
The Feature Store for ML consists of both an Online and Offline database and transforms raw data from backend systems into engineered features that are made available to online and batch applications for inferencing and to Data Scientists to create train/test data for model development.

Most hyperscale AI companies have built internal feature stores (Uber, Twitter, AirBnb, Google, Facebook, Netflix, Comcast), but there are also two open-source Feature Stores: Hopsworks Feature Store (built on Apache Hudi/Hive, MySQL Cluster and HopsFS) and Feast (built on Big Query, BigTable, and Redis). Other databases used by existing Feature Stores include Cassandra, S3, and Kafka, and custom key-value stores.


End-to-End ML Pipelines with the Hopsworks Feature Store

Both MLOps and DataOps CI/CD pipelines differ from traditional DevOps in that they may be triggered by new data arriving for processing (as well as triggering due to updates to the source code for the data engineering or model training pipelines). DataOps is concerned with automating test and deployment of data processing pipelines (or feature pipelines, in our case), with stages such as data validation and data pipelines. MLOps, on the other hand, is concerned with the automation of training and deploying production ML models, with stages such as model training, model validation, and model deployment.


The Feature Store enables ML workflows to be decomposed into two workflows: (1) a “DataOps” workflow for engineering features and validating incoming data that caches the features in the Feature Store, and (2) a “MLOps” workflow for training models using features from the Feature Store, analyzing and validating those models, deploying them into online model serving infrastructure, and monitoring model performance in production.


Some ML lifecycle frameworks, such as TensorFlow Extended (TFX) and MLFlow, are based around end-to-end ML pipelines that start with raw data and end in production models. However, the first steps of an end-to-end ML pipeline, which take raw data and turn it into training data for models, can be very expensive. According to Airbnb, without a feature store, creating train/test data can take up to 60-80% of data scientists’ time. The feature store enables transformed data (features) to be reused in different models. When you have a feature store, you no longer need end-to-end ML pipelines from raw data to models. You can decompose end-to-end ML pipelines into two separate pipelines that each run at their own cadence: (1) feature pipelines that ingest data from backend systems, validate it, featurize it and cache it in the feature store, and (2) training pipelines that train models from feature data, validate those models, and deploy them to production. 

The motivation for introducing the Feature Store for MLOps is that the process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.

ML Pipelines are Stateful 

The best practice when developing data pipelines is for them to be stateless and idempotent, so that they can be safely re-run in case of failures. ML pipelines, however, have state. Before you deploy a model to production, you need some contextual information - does this model perform better than the currently deployed model? This decision requires state about the currently deployed model. Ideally, we also want historical state, so we can observe and evaluate the performance of models over time and the processing time/success-rate of building models over time. Hopsworks, TFX and MLFlow provide a metadata store to enable ML pipelines to make stateful decisions, to log their execution steps, store the artifacts they produce, and store the provenance of the final models. Both TFX and MLFlow are obtrusive - they make developers re-write the code at each of the stages with their component models (with well-defined inputs and outputs to each stage). This way, they can intercept input parameters to components and log them to the metadata store. Hopsworks provides an unobtrusive metadata model, where pipelines read/write to the HopsFS (HDFS) file system and use Hopsworks APIs to interact with its feature store. This way, metadata events, artifacts, executions, and provenance are implicitly stored to the metadata store without the need to rewrite notebooks or python programs, as is needed in TFX or MLFlow.

Feature Pipelines feed the Hopsworks Feature Store


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). Two popular frameworks for data validation are TFX data validation and AWS Deequ, as they allow you to extend traditional schema-based support for validating data (e.g., this column should contain integers) with data validation rules for checking whether numerical or categorical values are as expected. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column’s values are unique, not null, or that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are then cached in the feature store, and subsequently used both to train models and for batch/online model inferencing. 
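For TFX data validation (TFDV), a minimal sketch could look as follows; it assumes the tensorflow_data_validation package and two hypothetical Pandas DataFrames, features_df (training data) and new_features_df (a new batch to validate):

import tensorflow_data_validation as tfdv

# compute descriptive statistics over the training data and infer a schema from them
train_stats = tfdv.generate_statistics_from_dataframe(features_df)
schema = tfdv.infer_schema(statistics=train_stats)

# validate a new batch of data against the inferred schema
new_batch_stats = tfdv.generate_statistics_from_dataframe(new_features_df)
anomalies = tfdv.validate_statistics(statistics=new_batch_stats, schema=schema)
print(anomalies)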


Feature Pipelines belong to the DataOps paradigm, where frameworks like Spark, PySpark, Pandas, and Featuretools are used along with data validation libraries like TFX data validation and Deequ.


Feature pipelines share many of the same DevOps best practices with data pipelines. The types of automated tests for data/features include (a minimal range-test sketch follows the list):

  • unit test and integration tests for all featurization code (Jenkins can run these tests when code is pushed to Git);
  • test that feature values fall within expected ranges (TFX data validation or Deequ);
  • test the uniqueness, completeness, and distinctness of features (Deequ);
  • test that feature distributions match your expectations (TFX data validation or Deequ);
  • test the relationship between each feature and the label, and the pairwise correlations between individual signals (Deequ);
  • test the cost of each feature (custom tests);
  • test that personally identifiable information is not leaking into features (custom tests).
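As an example of the range test above, here is a minimal pytest sketch; the featurization function and its bounds are hypothetical:

import pandas as pd

# hypothetical featurization function under test
def compute_click_rate(df: pd.DataFrame) -> pd.Series:
    return (df["clicks"] / df["impressions"].clip(lower=1)).clip(0.0, 1.0)

def test_click_rate_within_expected_range():
    df = pd.DataFrame({"clicks": [0, 5, 50], "impressions": [0, 10, 40]})
    click_rate = compute_click_rate(df)
    assert click_rate.between(0.0, 1.0).all()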

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.

Model Training Pipelines start at the Feature Store

Model training pipelines belong to the MLOps paradigm, where versioned features are read from Apache Hudi in the Hopsworks Feature Store to create train/test data that is used to train models that are then deployed and monitored in production. Provenance of ML artifacts and executions are stored in the Metadata store in Hopsworks, and ML pipelines are orchestrated by Hopsworks.


Model training with a feature store typically involves at least three stages (or programs) in a workflow:

  1. select the features to include, the file format, and the file system (or object store) for the train/test dataset that will be created from features in the feature store. Note that for Hopsworks Feature store, a timestamp (corresponding to Hudi commit-ids), can also be supplied to reproduce a train/test dataset exactly as it was at a point of time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

In the Hopsworks platform, these three steps would typically be python programs or Jupyter notebooks and they are executed as part of an Airflow DAG (directed acyclic graph). That is, Airflow orchestrates the execution of the pipeline. Airflow enables DAGs to be scheduled periodically, but it can also be configured to run workflows when new feature data arrives in the feature store or when Git commits are pushed for model training pipeline code.
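A minimal sketch of such an Airflow DAG is shown below; the callables and schedule are hypothetical placeholders for the three programs described above, and the PythonOperator import path may differ between Airflow versions:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# hypothetical placeholder callables for the three pipeline steps described above
def create_training_data(): ...
def train_model(): ...
def validate_and_deploy(): ...

with DAG(dag_id="model_training_pipeline",
         start_date=datetime(2020, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    t1 = PythonOperator(task_id="create_training_data", python_callable=create_training_data)
    t2 = PythonOperator(task_id="train_model", python_callable=train_model)
    t3 = PythonOperator(task_id="validate_and_deploy", python_callable=validate_and_deploy)
    t1 >> t2 >> t3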

The type of automated tests that are performed during the model validation step include:

  • test how the model performs on different data slices to check for bias,
  • test the robustness of the model to out-of-distribution feature vectors.

Hopsworks supports Google’s What-If Tool for model analysis using Jupyter notebooks. It is useful to investigate counterfactuals (compare a datapoint to the most similar point where your model predicts a different result), making it easier to develop model validation tests that can subsequently be used in production pipelines.

Google’s What-If Tool can be used to analyze a model, asking counterfactuals and testing for bias on different slices of data. Knowledge discovered here can be transferred into model validation tests.

Monitoring Online Models

When a model is deployed to a model server for use by online applications, we need to monitor the model’s performance and its input features. We need to identify if the input features in production are statistically different from the input features used to train the model. In practice, we can do this by comparing statistics computed over the training data (accessible through feature store API calls) with statistics collected from input features at runtime. In Hopsworks, we log all prediction requests sent to models to a topic in Kafka. You can then write a Spark Streaming or Flink application that processes the prediction requests in Kafka, computing statistics in time-based windows and comparing those statistics with the training data statistics from the Feature Store. If the time-based window statistics for a given feature diverge significantly from the training statistics, your streaming application can notify ML engineers that input features are not as expected. Your streaming application will typically also compute business-level KPIs for the models and provide a UI to enable operators to visualize model performance. More concretely, the error signals to look for in online monitoring include:
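A sketch of such a monitoring job using Spark Structured Streaming is shown below; the Kafka brokers, topic, the JSON layout of the logged requests, and the training statistic are hypothetical, and only a single feature’s windowed mean is compared:

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, TimestampType

# hypothetical schema of the logged prediction requests
request_schema = StructType([
    StructField("event_time", TimestampType()),
    StructField("amount", DoubleType()),
])

requests_df = (spark.readStream.format("kafka")
               .option("kafka.bootstrap.servers", "broker1:9092")
               .option("subscribe", "model-prediction-requests")
               .load()
               .select(F.from_json(F.col("value").cast("string"), request_schema).alias("r"))
               .select("r.*"))

# mean of the 'amount' feature over 1-minute windows, compared with a training-data statistic
train_mean_amount = 42.0  # hypothetical value fetched from the Feature Store statistics API
windowed_stats = (requests_df
                  .withWatermark("event_time", "2 minutes")
                  .groupBy(F.window("event_time", "1 minute"))
                  .agg(F.avg("amount").alias("live_mean_amount"))
                  .withColumn("drift", F.abs(F.col("live_mean_amount") - F.lit(train_mean_amount))))

query = windowed_stats.writeStream.outputMode("update").format("console").start()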

Concept drift

In a model, the target variable is what the model is trying to predict. It could be, for example, whether a financial transaction is suspected as fraud or not fraud. When the statistical properties of the target variable change over time in an unexpected way (for example, a new fraud scheme appears that increases the overall amount of fraud), we have concept drift.

Data drift

If, however, the statistical properties of the input features change over time in an unexpected way, it will negatively impact the model’s performance. For example, if users execute many more financial transactions than normal due to it being a holiday period, but the model was not trained to handle holiday periods, then the model performance may degrade (either missing fraud or flagging up too many transactions as suspicious).

Feature pipeline changes

If there are changes in how a feature is computed in a feature pipeline, and an online model enriches its feature vector with that feature data from the online feature store, then this can negatively impact the model’s performance. For example, if you change how to compute the number of transactions a user carries out, it may negatively impact the model’s performance.

Summary

We have now covered an end-to-end ML pipeline with a Feature Store based on MLOps principles. Updates to pipeline code or newly arrived data enable changes to be continuously tested and models to be continually updated and deployed in production. We showed how the Feature Store enables monolithic end-to-end ML pipelines to be decomposed into feature pipelines and model training pipelines. We also discussed how data versioning is possible with modern data lake frameworks such as Apache Hudi. In the next blog, we will cover ML pipelines and reproducible experiments in Hopsworks in more detail, and how to easily move pipelines from development to production environments. We will also show how to develop both feature pipelines and model training pipelines using Airflow. 


Guide to File Formats for Machine Learning: Columnar, Training, and Inferencing

>
10/25/2019
>
Jim Dowling

TLDR; Most machine learning models are trained using data from files. This post is a guide to the popular file formats used in open source frameworks for machine learning in Python, including TensorFlow/Keras, PyTorch, Scikit-Learn, and PySpark. We will also describe how a Feature Store can make the Data Scientist’s life easier by generating training/test data in a file format of choice on a file system of choice.

A file format defines the structure and encoding of the data stored in it and it is typically identified by its file extension – for example, a filename ending in .txt indicates the file is a text file. However, even though files are used to store much of the world’s data, most of that data is not in a format that can be used directly to train ML models. 

This post is mostly concerned with file formats for structured data and we will discuss how the Hopsworks Feature Store enables the easy creation of training data in popular file formats for ML, such as .tfrecords, .csv, .npy, and .petastorm, as well as the file formats used to store models, such as .pb and .pkl. We will not cover other well-known file formats, such as image file formats (e.g., .png, .jpeg), video file formats (e.g., .mp4, .mkv, etc.), archive file formats (e.g., .zip, .gz, .tar, .bzip2), document file formats (e.g., .docx, .pdf, .txt) or web file formats (e.g., .html).

Data sources

Files are stored on file systems, and, in the cloud, increasingly on object stores. File systems come in different flavors. A local file system (POSIX) usually stores its data on one or more disks (magnetic (hard drives), SSD (solid-state storage), or NVMe drives). Such file systems can be accessible over the network (e.g., NFS), or if large amounts of capacity are needed distributed file systems can be used that scale to store hundreds of Petabytes of data across thousands of servers, like HDFS, HopsFS, and CephFS. In the cloud, object stores are the cheapest file storage option where the files can be read/written with reasonable performance by applications. 

File formats are linked to their file system. For example, if you ran a TensorFlow/Pytorch application to train ImageNet on images stored in S3 (object store) on a Nvidia V100 GPU, you might be able to process 100 images/sec, as that is what a single client can read from S3. However, the V100 can potentially process 1000 images/sec – you are bottlenecking on the I/O to the filesystem. New file formats appear to get around just this type of problem. For example, the petastorm file format was developed by Uber to store PBs of data for self-driving vehicles on HDFS. petastorm files are large, splittable, and compressed, with readers for TensorFlow and PyTorch enabling them to feed many V100 GPUs in parallel, ensuring they do not bottleneck on file I/O – as they would have if they worked with traditional multimedia file formats on slower network file systems. An alternative, but much more expensive solution, would be to work with the traditional file formats on storage devices made up of hundreds or thousands of NVMe disks.

Machine learning frameworks want to consume training data as a sequence of samples, so file formats for training ML models should have easily consumable layouts with no impedance mismatch with the storage platform or the language used to read/write the files. Additionally, distributed training (training ML models on many GPUs at the same time to make training go faster)  requires files to be splittable and accessible over a distributed file system or object store, so that different GPUs can read different shards (partitions) of the data in parallel from different servers.

Big Data – from Binary to Structured Data

Machine learning has produced technological breakthroughs in a number of domains, such as image classification, voice recognition, voice synthesis, natural language processing, and neural machine translation. The types of file formats used in these systems are typically compressed binary or plaintext data formats. 

Machine learning has extended its impact beyond these first domains and is now being applied to Enterprise data to solve business problems that can be modelled as supervised machine learning problems.  However, much of Enterprise data is available as structured data – originating from Data Warehouses, databases, document repositories, and Data Lakes. Structured Enterprise data can also be used in a variety of text-based and binary file formats. In general, if you have large amounts of data, then you should consider using a binary file format (instead of a traditional text-based format like CSV), as binary file formats may significantly improve the throughput of your import pipeline, helping reduce model training time. Binary file formats require less space on disk and take less time to read from disk. Binary file formats are significant as the other main trend in machine learning is the increasing use of deep learning. Deep learning is ravenous for data - it just keeps getting better the more data it is trained on – and with increased volumes of data, efficient, compressed file formats have a role to play in the deep learning wars to come.


File Formats in Machine Learning Frameworks

Older file formats (e.g., .csv) may not be compressed, may not be splittable (e.g., HDF5 and netCDF) so that they can work seamlessly when training with many workers in parallel, and may make it difficult to combine multiple datasets. However, if the framework you use for machine learning, such as TensorFlow, PyTorch, or Scikit-Learn, does not provide data import and preprocessing functionality that is integrated seamlessly with those file format features and data sources, then you may not get the benefit of better file formats. For example, the TFRecord file format was designed for TensorFlow and has full support in tf.data, while PyTorch’s DataLoader was designed, first and foremost, around Numpy files and then extended to other file formats. Similarly, Scikit-Learn was first designed to work with CSV and Pandas and then extended to work with other file formats. So, while it is clear you should be using more modern file formats for machine learning, you still have the challenge of converting your data to this format, and there may be limited documentation available on how to do that. Fortunately, the Feature Store (introduced later) enables you to easily convert your data into the most important file formats for machine learning with no effort.

File Formats

This section introduces the most widely used file formats for ML, grouping them into well-known classes of file formats: columnar, tabular, nested, array-based, and hierarchical. There are also newer file formats designed for model serving, which are described below.

Columnar Data File Formats

Apart from possibly Excel, the most common location for Enterprise data is the Data Warehouse or Data Lake. This data is typically accessed using Structured Query Language (SQL) and the data may be stored in either row-oriented format (typically OLTP databases that provide low latency access, high write throughput) or, more commonly, column-oriented format (OLAP/columnar databases that can scale from TBs to PBs and provide faster queries and aggregations). In Data Lakes, structured data may be stored as files (.parquet and .orc) that are still accessible via SQL using scalable SQL engines such as SparkSQL, Hive, and Presto. These columnar data files and backend databases are often the source of training data for Enterprise ML models, and feature engineering on them often requires data-parallel processing frameworks (Spark, Beam, and Flink) to enable feature engineering to scale out over many servers. This scalable processing is possible because the path to data in parquet/orc/petastorm is actually a directory - not a file. The directory contains many files that can be processed in parallel. So, typically you supply the base path (directory) when you want to read data in a columnar file format, and the processing engine figures out which file(s) to read. If you only want to read a subset of columns from a table, files containing excluded columns will not be read from disk, and if you perform a range scan, statistics (max/min values for columns in files) enable data skipping - skip a file if your query asks for column values outside the range of values stored in that file.

While both parquet and orc have similar properties, Petastorm is uniquely designed to support ML data - it is the only columnar file format that natively supports multi-dimensional data. Columnar file formats typically assume 2-dimensional relational data, but tensors can have much higher dimensionality than 1-d vector or 2-d relational data sources.  Petastorm provides multi-dimensional data capability by extending Parquet with its own Unischema designed explicitly for machine learning use-cases. The Unischema enables petastorm files to store multi-dimensional tensors natively in Parquet. The unischema is also compatible with PyTorch and TensorFlow so you can convert the petastorm schema directly to TensorFlow schema or PyTorch schema, which enables native TensorFlow/PyTorch readers for petastorm.
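
As a minimal sketch of what such a schema looks like (assuming the petastorm library is installed; the field names and shapes below are purely illustrative), a Unischema declares, for each field, a numpy dtype, a tensor shape, and a codec:

import numpy as np
from pyspark.sql.types import IntegerType
from petastorm.codecs import NdarrayCodec, ScalarCodec
from petastorm.unischema import Unischema, UnischemaField

# Each field declares a name, numpy dtype, tensor shape, codec, and nullability.
SampleSchema = Unischema('SampleSchema', [
    UnischemaField('sample_id', np.int32, (), ScalarCodec(IntegerType()), False),
    UnischemaField('image', np.uint8, (128, 128, 3), NdarrayCodec(), False),
    UnischemaField('label', np.int32, (), ScalarCodec(IntegerType()), False),
])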

Columnar file formats are designed for use on distributed file systems (HDFS, HopsFS) and object stores (S3, GCS, ADL) where workers can read the different files in parallel.

  • File formats: .parquet, .orc, .petastorm.
  • Feature Engineering: PySpark, Beam, Flink.
  • Training: .petastorm has native readers in TensorFlow and PyTorch;
                    .orc, .parquet have native readers in Spark;
                    JDBC/Hive sources supported by Spark
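
To make the column-projection and data-skipping behaviour described above concrete, here is a minimal PySpark sketch (the path and column names are hypothetical): only the selected columns are read from the Parquet part-files, and the filter can be pushed down so that files whose min/max statistics exclude the requested range are skipped.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The "file" is really a directory of Parquet part-files that are read in parallel.
df = spark.read.parquet("hdfs:///Projects/demo/Resources/clicks_parquet")

# Column projection: only these two columns are read from disk.
# Predicate pushdown: part-files outside the requested range can be skipped entirely.
recent = df.select("user_id", "clicked_at").where(df.clicked_at >= "2019-10-01")
recent.show(5)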

Tabular Text-based File Formats

Tabular data for machine learning is typically found in .csv files. CSV files are text-based files containing comma-separated values (CSV). CSV files are popular for ML as they are easy to view/debug and easy to read/write from programs (no compression/indexing). However, they have no support for column types, there is no distinction between text and numeric columns, and their performance degrades noticeably when the volume of data grows to GBs or more - they are not splittable, they do not have indexes, and they do not support column filtering. .csv files can be compressed using GZIP to save space. Other popular tabular formats that are not typically used in ML are spreadsheet file formats (e.g., .xlsx and .xls) and unstructured text formats (.txt).

  • File formats: .csv, .xlsx
  • Feature Engineering: Pandas, Scikit-Learn, PySpark, Beam, and lots more
  • Training: .csv has native readers in TensorFlow, PyTorch, Scikit-Learn, Spark

Nested File Formats

Nested file formats store their records (entries) in an n-level hierarchical format and have a schema to describe their structure. A hierarchical format means that a record could have one parent (or be the root, with no parent), but it could also have children. Nested file format schemas can be extended (attributes can be added while maintaining backwards compatibility), and the order of attributes is typically not significant. The .json and .xml file formats are the best known plain-text nested file formats, while binary nested file formats include protocol buffers (.pb) and Avro (.avro).

A TFRecord file is a sequence of binary records, typically protobufs with either an “Example” or a “SequenceExample” schema. The developer decides whether to store samples as an “Example” or a “SequenceExample”; choose a SequenceExample if your features are lists of identically typed data. A TFRecords “file” can also be a directory (containing lots of .tfrecords files), and it supports compression with Gzip. More details on how to use TFRecords can be found in the official documentation and this good blog post.
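
As a rough sketch of how individual samples end up in a .tfrecords file (using TensorFlow's tf.io API; the feature names and values are made up), each sample is serialized as an Example protobuf and later parsed back with a matching schema:

import tensorflow as tf

def serialize_example(features, label):
    # Pack one sample into a tf.train.Example protobuf.
    example = tf.train.Example(features=tf.train.Features(feature={
        "features": tf.train.Feature(float_list=tf.train.FloatList(value=features)),
        "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
    }))
    return example.SerializeToString()

with tf.io.TFRecordWriter("train.tfrecords") as writer:
    writer.write(serialize_example([0.1, 0.2, 0.3], 1))

# The parsing schema must match what was written above.
schema = {
    "features": tf.io.FixedLenFeature([3], tf.float32),
    "label": tf.io.FixedLenFeature([], tf.int64),
}
dataset = tf.data.TFRecordDataset("train.tfrecords").map(
    lambda proto: tf.io.parse_single_example(proto, schema))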

  • File formats: .tfrecords, .json, .xml, .avro
  • Feature Engineering: Pandas, Scikit-Learn, PySpark, Beam, and lots more
  • Training: .tfrecords is the native file format for TensorFlow;
                    .json has native readers in TensorFlow, PyTorch, Scikit-Learn, Spark
                    .avro files can be used as training data in TensorFlow with LinkedIn’s library

Array-Based Formats

Numpy is an abbreviation of Numerical Python and it is a massively popular library for scientific computing and data analysis. Through support for vectorization, Numpy (.npy) is also a high performance file format. A Numpy array is a densely packed array with elements of the same type. The file format, .npy, is a binary file format that stores a single NumPy array (including nested record arrays and object arrays).
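
A quick sketch of the .npy round trip (the array and file names are illustrative):

import numpy as np

features = np.random.rand(1000, 10).astype(np.float32)

# .npy stores a single densely packed array, including its dtype and shape.
np.save("train_features.npy", features)

restored = np.load("train_features.npy")
assert restored.shape == (1000, 10)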

  • File formats: .npy
  • Feature Engineering: PyTorch, Numpy, Scikit-Learn, TensorFlow;
  • Training: .npy has native readers in PyTorch, TensorFlow, Scikit-Learn.

Hierarchical Data Formats 

HDF5 (.h5 or .hdf5) and NetCDF (.nc) are popular hierarchical data file formats (HDF) that are designed to support large, heterogeneous, and complex datasets. In particular, HDF formats are suitable for high dimensional data that does not map well to columnar formats like parquet (although petastorm is both columnar and supports high dimensional data). Lots of medical device data is stored in HDF files or related file formats, such as BAM, VCF for genomic data. Internally, HDF5 and NetCDF store data in a compressed layout. NetCDF is popular in domains such as climate science and astronomy. HDF5 is popular in domains such as GIS systems.  They are not splittable, so not suitable for distributed processing (with engines like Spark). 
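
A minimal sketch, assuming the h5py library is used to read and write HDF5 files (the file and dataset names are illustrative):

import h5py
import numpy as np

# An HDF5 file can hold many named, compressed, multi-dimensional datasets.
with h5py.File("measurements.h5", "w") as f:
    f.create_dataset("images", data=np.random.rand(100, 64, 64), compression="gzip")
    f.create_dataset("labels", data=np.arange(100))

with h5py.File("measurements.h5", "r") as f:
    first_batch = f["images"][:32]  # slicing reads only the requested rows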

  • File formats: .h5 (HDF5), .nc (NetCDF)
  • Feature Engineering: Pandas, Dask, XArray;
  • Training: .h5 has no native readers in TensorFlow or PyTorch;
                    .nc has no native readers we are aware of.

Model File Formats

In supervised machine learning, the artefact created after training that is used to make predictions on new data is called a model. For example, after training a deep neural network (DNN), the trained model is basically a file containing the layers and weights of the DNN. Models can often be saved in a file that can potentially be compressed, so model files typically have a binary file format. TensorFlow saves models as protocol buffer files, with a .pb file extension. Keras saves models natively as .h5 files. Scikit-Learn saves models as pickled Python objects, with a .pkl file extension. An older format for model serving based on XML, the predictive model markup language (.pmml), is still usable in some frameworks, such as Scikit-Learn.
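
To make the mapping from framework to model file format concrete, here is a hedged sketch (TensorFlow 2.x-style Keras API; the models, data, and paths are illustrative only):

import pickle
import tensorflow as tf
from sklearn.linear_model import LogisticRegression

# Keras: a single .h5 file containing the architecture and weights.
keras_model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
keras_model.save("model.h5")

# TensorFlow SavedModel: a directory containing a saved_model.pb protobuf.
tf.saved_model.save(keras_model, "exported_model")

# Scikit-Learn: the fitted estimator pickled to a .pkl file.
clf = LogisticRegression().fit([[0.0, 0.0], [1.0, 1.0]], [0, 1])
with open("model.pkl", "wb") as f:
    pickle.dump(clf, f)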

Model files are used to make predictions on new data by either (1) batch applications that typically read the model in as a file or (2) a real-time model serving server (such as TensorFlow Serving) that reads the model into memory and may even keep multiple versions of a model in memory for A/B testing.

Other model file formats that are used include SparkML models that can be saved in the MLeap file format and served in real time using an MLeap model server (files are packaged in .zip format). Apple developed the .mlmodel file format to store models embedded in iOS applications as part of its Core ML framework (which has superior support for the Objective-C and Swift languages). Models trained in TensorFlow, Scikit-Learn, and other frameworks need to be converted to the .mlmodel file format for use on iOS, with tools like coremltools and the TensorFlow converter available to help with file format conversion. ONNX is an ML-framework-independent file format, supported by Microsoft, Facebook, and Amazon. In theory, any ML framework should be able to export its models in the .onnx file format, so it offers great promise in unifying model serving across the different frameworks. However, as of late 2019, ONNX does not support all operations for the most popular ML frameworks (TensorFlow, PyTorch, Scikit-Learn), so ONNX is not yet practical for those frameworks. In PyTorch, the recommended way to serve models is to use Torch Script to trace and save a model as a .pt file and serve it from a C++ application.

One final file format to mention here is YAML that is used to package models as part of the MLFlow framework for ML pipelines on Spark. MLFlow stores a YAML file that describes the files it packages for model serving, so that deployment tools can understand the model file format and know what files to deploy.

  • File formats: .pb, .onnx, .pkl, .mlmodel, .zip, .pmml, .pt
  • Inference: .pb files are served by TensorFlowServing Server;
                      .onnx files are served by Microsoft’s commercial model serving platform;
                      .pkl files are served for Scikit-Learn models, often on Flask servers;
                      .mlmodel files are served by iOS platforms;
                      .zip files are used to package up MLeap files that are served on the MLeap runtime;
                      .pt files are used to package PyTorch models that can be served inside C++ applications.

ML Data File Formats Summary

A summary of the different file formats for the different ML pipeline stages (feature engineering / dataprep, training, and serving) is shown in the tables below:


And the winner is...

The most feature-complete, language-independent, and scalable of the file formats for deep learning training data is petastorm. Not only does it support high-dimensional data and have native readers in TensorFlow and PyTorch, it also scales to many parallel workers, supports push-down index scans (only read the columns you request from disk and even skip files whose values fall outside the requested range), and scales to store many TBs of data.

For model serving, we cannot really find any file format superior to the others. The easiest model serving solution to deploy and operate is protocol buffers and the TensorFlow Serving server. While both ONNX and Torch Script have potential, the open-source model serving servers are not there yet for them. Similarly, serving .pkl files on a Flask server, while used heavily in production, still requires a lot more work on the part of the ML operations engineer - you have to write your own Python serving program, manage security, load balancing, and much more.

Hopsworks and ML File Formats

In this section, we discuss support for converting ML feature data into the file format of your choice using the Hopsworks Feature Store, and the support for the main ML frameworks file formats in Hopsworks using the HopsFS file system.

Feature Store for File Format Conversion 

Hopsworks Feature Store can create train/test data in your file format of choice, on your data store of choice (S3, HDFS, HopsFS).


The Hopsworks Feature Store is a warehouse for storing reusable features for ML. It is designed to act as a cache for feature data. Feature data is the output of feature engineering, which transforms raw data from backend systems into features that can be used directly to train ML models. Data scientists interact with the feature store by browsing the available features and, on finding the features they need to build their predictive model, generating train/test data in the file format of their choice, on the storage platform of their choice. In the example shown in the figure above, we have 3 features and a target variable from the Titanic dataset, and we can then pick a file format (from the 10+ available file formats) and a target file system where the train/test data will be created. The Feature Store enables teams to collaborate on solving a problem by easily trying out solutions in different frameworks using different approaches (for example, deep learning in TensorFlow and PyTorch, and decision trees in Scikit-Learn) without the effort of converting the train/test data into the most efficient, easy-to-use file format for the respective framework.

File Formats Examples in Hopsworks 

Hopsworks provides HopsFS (a next-generation HDFS file system) as its default distributed file system. HopsFS is a great distributed file system for machine learning due to its high throughput and low latency as well as widespread native HDFS readers in popular frameworks for ML: Spark, TensorFlow, Pandas (Scikit-Learn), and PyTorch (through petastorm). 

Some examples of Notebooks that use different file formats with HopsFS in Hopsworks are included here, followed by specific examples from popular frameworks for ML in Python (PySpark, TensorFlow/Keras, PyTorch, Scikit-Learn):

In the next sections we give a brief overview of the recommended file formats for the major python ML frameworks: PySpark, TensorFlow/Keras, PyTorch, and Scikit-Learn, along with an example code snippet and a link to a Python notebook from Hopsworks.

PySpark

File formats: .csv, .parquet, .orc, .json, .avro, .petastorm

Data sources: local filesystem, HDFS , S3
Model serving  file formats: .zip (MLeap)

Columnar file formats work better with PySpark (.parquet, .orc, .petastorm) as they compress better, are splittable, and support selective reading of columns (only the columns specified will be read from files on disk). Avro files are frequently used when you need to write fast with PySpark, as they are row-oriented and splittable. PySpark can read files from the local filesystem, HDFS, and S3 data sources.
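
A short PySpark sketch of the recommendation above (the paths and column names are hypothetical): read a one-off CSV export, then persist it as a splittable, compressed columnar format that supports column pruning for training:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A one-off CSV export of raw data...
raw_df = spark.read.option("header", "true").option("inferSchema", "true") \
              .csv("hdfs:///Projects/demo/Resources/raw_events.csv")

# ...is better persisted as Parquet (or ORC): compressed, splittable, column-prunable.
raw_df.write.mode("overwrite").parquet("hdfs:///Projects/demo/Resources/raw_events_parquet")

train_df = spark.read.parquet("hdfs:///Projects/demo/Resources/raw_events_parquet") \
                .select("feature_1", "feature_2", "label")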

Open Example PySpark Notebook


Pandas/Scikit-Learn

File formats: .csv, .npy, .parquet, .h5, .json, .xlsx
Data sources: local filesystem, HDFS , S3
Model serving file formats: .pkl

Pandas can natively read files in .csv, .parquet, .hdf5, .json, and .xlsx formats, and also from SQL sources. Pandas can read files from the local filesystem, HDFS, S3, http, and ftp data sources. In Hopsworks, you can read files in HopsFS using Pandas’ native HDFS reader with a helper class:
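
A hedged sketch of what that can look like; the exact helper in the hops library may differ from what is shown here, and the project path and file name are illustrative:

import pandas as pd
from hops import hdfs

# Assumption: hdfs.project_path() returns the HopsFS root of the current project
# (an hdfs:// URL) and pandas can read that URL through its HDFS/pyarrow support.
csv_path = hdfs.project_path() + "Resources/train_set.csv"

df = pd.read_csv(csv_path)
print(df.head())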

Open Example Pandas Notebook

TensorFlow/Keras

File formats: .csv, .npy, .tfrecords, .petastorm

Data sources: local filesystem, HDFS , S3
Model serving file formats: .pb

The native file format for TensorFlow is .tfrecords, and if your dataset is not huge and you do not need to read only a subset of columns from the dataset, then it is our recommended file format. If, however, you only want to read a subset of columns (a projection in database terminology) from the dataset, then .petastorm is the file format to use. TensorFlow can read files from the local filesystem, HDFS, and S3 data sources.
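
A minimal sketch of feeding .tfrecords into a Keras model with tf.data (the path, feature names, and shapes are illustrative; reading an hdfs:// path assumes TensorFlow's HDFS support is available):

import tensorflow as tf

files = tf.io.gfile.glob("hdfs:///Projects/demo/Resources/train_tfrecords/part-r-*")

schema = {
    "features": tf.io.FixedLenFeature([8], tf.float32),
    "label": tf.io.FixedLenFeature([], tf.int64),
}

def decode(proto):
    example = tf.io.parse_single_example(proto, schema)
    return example["features"], example["label"]

dataset = (tf.data.TFRecordDataset(files)
           .map(decode)
           .shuffle(10000)
           .batch(128))

model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")
model.fit(dataset, epochs=5)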

Open Example TensorFlow Notebook

PyTorch 

Training File formats: .csv, .npy, .petastorm

Data sources: local filesystem, HDFS (petastorm), S3
Model serving file formats: .pt

PyTorch is tightly integrated with Numpy, and .npy is the native file format for PyTorch. However, np.load() does not work with HopsFS natively, so you have to use a wrapper function that we include in our library that first materializes the data from HopsFS to the local filesystem before reading it in as a numpy array.  If you have large datasets, we recommend using .petastorm, which works natively with PyTorch through its own reader.
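
A hedged sketch using petastorm's PyTorch reader (the dataset path and field names are illustrative):

from petastorm import make_reader
from petastorm.pytorch import DataLoader

# make_reader streams a petastorm dataset (a directory of Parquet files) in parallel.
with make_reader("hdfs:///Projects/demo/Resources/train_petastorm") as reader:
    loader = DataLoader(reader, batch_size=64)
    for batch in loader:
        features, labels = batch["features"], batch["label"]
        # the forward/backward pass on the PyTorch model would go here
        break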

Open Example PyTorch Notebook


Numpy/Scikit-Learn

File formats: .npy

Data sources: local filesystem
Model serving file formats: .pkl

The native data format for Scikit-Learn is numeric data, most commonly stored as numpy arrays or pandas DataFrames (convertible to numpy arrays). PyTorch also builds directly on numpy arrays. As such, .npy is a popular file format for training data with Scikit-Learn. In Hopsworks, we provide a numpy_helper to read .npy files from HopsFS.
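
A minimal sketch, assuming the .npy training data has already been materialized on the local filesystem (for example with the numpy_helper mentioned above) and that the last column holds the label; the file name and layout are illustrative:

import numpy as np
from sklearn.ensemble import RandomForestClassifier

data = np.load("train_set.npy")
X, y = data[:, :-1], data[:, -1]

clf = RandomForestClassifier(n_estimators=100)
clf.fit(X, y)
print(clf.score(X, y))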

Open Example Scikit-Learn Notebook

References

[hdf5] https://www.neonscience.org/about-hdf5

[numpy] https://towardsdatascience.com/why-you-should-start-using-npy-file-more-often-df2a13cc0161

[tftransform] https://www.tensorflow.org/tfx/transform/get_started

[featurestore] https://www.logicalclocks.com/featurestore 

[tfrecords] https://medium.com/mostly-ai/tensorflow-records-what-they-are-and-how-to-use-them-c46bc4bbb564 

[bigdataFileFormats] https://luminousmen.com/post/big-data-file-formats

[petastorm] https://qcon.ai/system/files/presentation-slides/yevgeni_-_petastorm_16th_apr_2019_.pdf

Optimizing GPU utilization in Hops

>
9/28/2018
>
Robin Andersson

TLDR; This article describes how we use dynamic executors in PySpark to ensure GPUs are allocated to executors only when they are training neural networks. When training is finished, GPUs are immediately freed up for use by other applications in the cluster. We also make hyperparameter optimization fault-tolerant using Spark’s executor blacklisting capability.

Click here to see a Spark Summit talk on PySpark, TensorFlow, and Hops.

When developers use TensorFlow/Keras/PyTorch to train deep neural networks in Hops, Hops uses PySpark to transparently distribute the Python code to containers with GPUs. Hops enables AutoML, the automated search for good neural network architectures and hyperparameters, by running parallel experiments on different combinations of hyperparameters and model architectures. In PySpark, Hops runs a different experiment on each executor, and not all of the experiments will finish at the same time: some experiments finish early, some later. GPUs cannot currently be shared (multiplexed) by concurrent applications. Population-based approaches for AutoML typically proceed in stages or iterations, meaning all experiments wait for the other experiments to finish, resulting in idle GPU time: GPUs lie idle waiting for other experiments to finish.

As such, we have the problem of how to free up GPUs as soon as their experiments are finished. Hops leverages dynamic executors in PySpark/YARN to free up the GPU(s) attached to an executor immediately if it sits idle waiting for other experiments to finish, ensuring that (expensive) GPUs are held no longer than needed.

Spark/TensorFlow on Hops

Each Spark executor runs a local TensorFlow process. Hops also supports cluster-wide Conda for managing python library dependencies. Hops supports the creation of projects, and each project has its own conda environment, replicated at all hosts in the cluster. When you launch a PySpark job, it uses the local conda environment for that project. This way, users can install whatever libraries they like using conda and pip, and then use them directly inside Spark Executors. It makes programming PySpark one step closer to the single-host experience of programming Python. Hops also supports Jupyter and the SparkMagic kernel for running PySpark jobs.

GPUs are a ‘special resource’

YARN, Kubernetes, and Mesos cannot ensure that GPU resources allocated to applications will be highly utilized. This is not a problem for traditional resources like CPU and memory. For example, if a workload of CPU-intensive applications hogs CPU resources but overprovisions memory, this problem can be addressed by the resource scheduler over-provisioning the amount of memory available at a host. That is, if the host has 256GB of RAM, you can tell YARN the host has 350GB of memory available for applications. You should, of course, ensure the operating system has enough virtual memory available to handle the configured level of over-provisioning, and, like an airline selling more seats than it has available, ensure that disk swapping almost never happens. Another alternative is to use a system like Dr Elephant that monitors applications’ resource utilization and warns applications when they overprovision memory, so that applications can be right-sized. This, of course, works best where workloads are predictable for a cluster, which is not always the case.

But GPUs are a special resource. We can’t overprovision them, because we can’t run multiple applications on GPUs at the same time. An application is given a GPU by a resource scheduler, and the resource scheduler trusts that the application will fully utilize the GPU while it has it. But as the Germans say, ‘trust is good, but control is better’. In Hops, we use Spark’s Dynamic Executors to ensure that GPUs are released immediately, when no longer needed.

For deep neural networks (Tensorflow/Keras/PyTorch), users wrap their code in a function like this:

   from hops import experiment

   def train_fn(learning_rate, dropout):
       # tensorflow training code goes here
       ...

   args_dict = {'learning_rate': [0.001, 0.0005, 0.0001], 'dropout': [0.45, 0.7]}
   experiment.grid_search(train_fn, args_dict)

The above code is an example of hyperparameter optimization for TensorFlow. The program will run 6 instances of train_fn, each with a different combination of the learning_rate and dropout parameters (3 learning rates × 2 dropout rates). If our PySpark application has allocated 6 executors, then each executor will run one of the hyperparameter combinations. It is also possible that fewer than 6 executors were allocated to the application. In that case, hyperparameter tasks (or trials) are placed in a queue, and executors pick them from the queue until all tasks are finished.

In the above example, we use grid_search as the exploration algorithm (to explore the performance of different combinations of hyperparameters). Another approach supported in Hops is genetic algorithms, which perform a population-based search for good combinations of hyperparameters (aka AutoML or neural architecture search). Other potential approaches are random walk, Bayesian optimization, and Hyperband.

Fault-tolerant hyperparameter optimization

If a task fails on a PySpark executor, the task is typically retried up to N times on that same executor. If the task does not succeed after these N attempts, the whole Spark job fails. Put that into the perspective of a large hyperparameter search that may run for days. Let’s assume that as the PySpark executors are running the experiments, one of the servers experiences a hardware malfunction, for example a disk breaks. The executors running on this machine will fail all their tasks and the job as a whole will fail, due to this hardware failure. We need a smarter mechanism to rerun the specific experiments that failed on another machine.

Spark 2.1.0 added support for blacklisting problematic executors and even entire nodes. We take advantage of this mechanism to ensure hyperparameter optimization jobs are fault-tolerant. The behaviour that we configure is that an executor is blacklisted after an experiment fails on it for whatever reason, and the allocated CPU/memory/GPU(s) are reclaimed by YARN. The second time that the same experiment fails, the machine that the executor is running on is blacklisted, which means that no more experiments will be run on that machine for the entire duration of the hyperparameter optimization job; the assumption here is that the failure may be hardware related. The experiment that failed will now run on a new machine, and if it fails a third time the job fails as a whole, since the problem is most likely due to an application error and not a hardware failure. In practice, we have seen that this mechanism can also help remedy code errors, e.g. code that fails occasionally is rerun up to three times.
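
The kind of blacklisting configuration this corresponds to looks roughly as follows; this is a sketch, and the exact values that Hops configures may differ:

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.blacklist.enabled", "true")
        # one failed attempt per executor before the task is retried elsewhere...
        .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
        # ...and after two failures on the same node, that node is blacklisted
        .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
        # a task (experiment) that fails three times fails the job as a whole
        .set("spark.task.maxFailures", "3"))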

GPU Support in Spark/YARN

In Hops, we have already added support for GPUs in a fork of Spark. This work is ongoing in the Spark community, with YARN/GPU support being developed in SPARK-2473.

Feature Store: The Missing Data Layer in ML Pipelines?

>
12/30/2018
>
Kim Hammar

TLDR; A feature store is a central vault for storing documented, curated, and access-controlled features. In this blog post, we discuss the state-of-the-art in data management for deep learning and present the first open-source feature store, available in Hopsworks.

What is a Feature Store?

The concept of a feature store was introduced by Uber in 2017 [11]. The feature store is a central place to store curated features within an organization. A feature is a measurable property of some data-sample. It could be for example an image-pixel, a word from a piece of text, the age of a person, a coordinate emitted from a sensor, or an aggregate value like the average number of purchases within the last hour. Features can be extracted directly from files and database tables, or can be derived values, computed from one or more data sources.

Features are the fuel for AI systems, as we use them to train machine learning models so that we can make predictions for feature values that we have never seen before.

Figure 1. A feature store is the interface between feature engineering and model development.

The feature store has two interfaces:

Writing to the feature store: The interface for Data Engineers. At the end of the feature engineering pipeline, instead of writing features to a file or a project-specific database, features are written to the feature store.

Data Engineer example:

from hops import featurestore
from pyspark.sql.functions import col

raw_data = spark.read.parquet(filename)

# Square each column to create the polynomial features (** is exponentiation; ^ would be bitwise XOR)
polynomial_features = raw_data.select([(col(c) ** 2).alias(c) for c in raw_data.columns])

featurestore.insert_into_featuregroup(polynomial_features, "polynomial_featuregroup")

Reading from the feature store: The interface for Data Scientists. To train a model on a set of features, the features can be read from the feature store directly.

Data Scientist example:

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

A feature store is not a simple data storage service; it is also a data transformation service, as it makes feature engineering a first-class construct. Feature engineering is the process of transforming raw data into a format that is understandable for predictive models.

Why You Need a Feature Store

At Logical Clocks we are committed to developing technologies to operate machine learning workflows at large scale and to help organizations distill intelligence from data. Machine learning is an extremely powerful method that has the potential to help us move from a historical understanding of the world to a predictive modeling of the world around us. However, building machine learning systems is hard and requires specialized platforms and tools.

Although ad-hoc feature engineering and training pipelines are a quick way for Data Scientists to experiment with machine learning models, such pipelines have a tendency to become complex over time. As the number of models increases, it quickly becomes a pipeline jungle that is hard to manage. This motivates the use of standardized methods and tools for the feature engineering process, helping reduce the cost of developing new predictive models. The feature store is a service designed for this purpose.

Technical Debt in Machine Learning Systems

“Machine Learning: The High-Interest Credit Card of Technical Debt”

Google [3]

Machine learning systems have a tendency to assemble technical debt [1]. Examples of technical debt in machine learning systems are:

  • There is no principled way to access features during model serving.
  • Features cannot easily be re-used between multiple machine learning pipelines.
  • Data science projects work in isolation without collaboration and re-use.
  • Features used for training and serving are inconsistent.
  • When new data arrives, there is no way to pin down exactly which features need to be recomputed, rather the entire pipeline needs to be run to update features.

Several organizations that we have spoken to struggle to scale their machine learning workflows due to the technical complexity, and some teams are even reluctant to adopt machine learning given its high technical cost. Using a feature store is a best practice that can reduce the technical debt of machine learning workflows.

“Pipeline jungles can only be avoided by thinking holistically about data collection and feature extraction”

Google [1]

Data Engineering is the hardest problem in Machine Learning

“Data is the hardest part of ML and the most important piece to get right. Modelers spend most of their time selecting and transforming features at training time and then building the pipelines to deliver those features to production models. Broken data is the most common cause of problems in production ML systems”

Uber [2]

Delivering machine learning solutions in production and at large-scale is very different from fitting a model to a pre-processed dataset. In practice, a large part of the effort that goes into developing a model is spent on feature engineering and data wrangling.

Figure 2. Model development is just one part of the work that goes into a machine learning project.

There are many different ways to extract features from raw data, but common feature engineering steps include:

  • Converting categorical data into numeric data;
  • Normalizing data (to alleviate ill-conditioned optimization when features originate from different distributions);
  • One-hot-encoding/binarization;
  • Feature binning (e.g., convert continuous features into discrete);
  • Feature hashing (e.g., to reduce the memory footprint of one-hot-encoded features);
  • Computing polynomial features;
  • Representation learning (e.g.,  extract features using clustering, embeddings, or generative models);
  • Computing aggregate features (e.g., count, min, max, stdev).

To illustrate the importance of feature engineering, let’s consider a classification task on a dataset with just one feature, x1, that looks like this:

Figure 3. A dataset with a single feature, x1, and two classes (filled circle and non-filled circle) that are not linearly separable.

We are doomed to fail if we try to fit a linear model directly to this dataset, as it is not linearly separable. During feature engineering we can extract an additional feature, x2, where the function for deriving x2 from the raw dataset is x2 = (x1)^2. The resulting two-dimensional dataset might look like the one depicted in Figure 4.

Figure 4. A dataset with two features, x1 and x2, and two classes (filled circle and non-filled circle) that are linearly separable (e.g., by the red line).

By adding an extra feature, the dataset becomes linearly separable and can be fitted by our model. This was a simple example; in practice the process of feature engineering can involve much more complex transformations.
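
A small scikit-learn sketch of exactly this trick (the toy data is made up): adding the squared feature lets a linear classifier separate the two classes.

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PolynomialFeatures

# Class 1 sits at the extremes of x1, class 0 in the middle: not linearly separable in x1 alone.
x1 = np.array([[-2.0], [-1.0], [-0.5], [0.5], [1.0], [2.0]])
y = np.array([1, 0, 0, 0, 0, 1])

# PolynomialFeatures(degree=2) adds x2 = x1**2, which makes the classes linearly separable.
model = make_pipeline(PolynomialFeatures(degree=2, include_bias=False), LogisticRegression())
model.fit(x1, y)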

In the case of deep learning, deep models tend to perform better the more data they are trained on (more data samples during training can have a regularizing effect and combat overfitting). Consequently, a trend in machine learning is to train on increasingly larger datasets. This trend further complicates the feature engineering process as Data Engineers must think about scalability and efficiency in addition to the feature engineering logic. With a standardized and scalable feature platform, the complexity of feature engineering can be managed more effectively.

Life Before the Feature Store

Figure 5. A typical machine learning infrastructure without a feature store.

No Feature Store

In Figure 5, feature code is duplicated across training jobs, and there are also features that have different implementations: one for training and one for deployment (inference) (Model C). Having different implementations for computing features for training and deployment entails non-DRY code and can lead to prediction problems. Moreover, without a feature store, features are typically not reusable as they are embedded in training/serving jobs. This also means that Data Scientists have to write low-level code for accessing data stores, requiring data engineering skills. There is also no service to search for feature implementations, and there is no management or governance of features.

Figure 6. A machine learning Infrastructure with a feature store.

With a Feature Store

Data Scientists can now search for features, and with API support, easily use them to build models with minimal data engineering. In addition, features can be cached and reused by other models, reducing model training time and infrastructure costs. Features are now a managed, governed asset in the Enterprise.

Economies of Scale for Machine Learning Organizations

A frequent pitfall for organizations that apply machine learning is to think of data science teams as individual groups that work independently with limited collaboration. Having this mindset results in machine learning workflows where there is no standardized way to share features across different teams and machine learning models. Not being able to share features across models and teams limits Data Scientists’ productivity and makes it harder to build new models. By using a shared feature store, organizations can achieve an economies-of-scale effect. As the feature store is built up with more features, it becomes easier and cheaper to build new models, as the new models can re-use features that already exist in the feature store.

Figure 7. By centralizing feature storage within the organization, the ramp-up period for new models and machine learning projects is reduced.

Hopsworks Feature Store

With Hopsworks 0.8.0 we are releasing the first open-source feature store service that will be integrated in the HopsML framework [8]. In this section we cover the technical details of the system and how it is used.

The Components of a Feature Store and a Comparison of Existing Feature Stores

During 2018, a number of large companies that are at the forefront of applying machine learning at scale announced the development of proprietary feature stores. Uber, LinkedIn, and Airbnb built their feature stores on Hadoop data lakes, while Comcast built a feature store on an AWS data lake, and GO-JEK built a feature store on Google’s data platform.

These existing feature stores consist of five main components:

  • The feature engineering jobs that compute the features; the dominant frameworks for feature computation are Samza (Uber [4]), Spark (Uber [4], Airbnb [5], Comcast [6]), Flink (Airbnb [5], Comcast [6]), and Beam (GO-JEK [7]).
  • The storage layer for storing feature data. Common solutions for storing features are Hive (Uber [4], Airbnb [5]), S3 (Comcast [6]), and BigQuery (GO-JEK [7]).
  • The metadata layer used for storing code to compute features, feature version information, feature analysis data, and feature documentation.
  • The Feature Store API used for reading/writing features from/to the feature store.
  • The feature registry, a user interface (UI) service where Data Scientists can share, discover, and order computation of features.

Before we dive into the feature store API and its usage, let’s have a look at the technology stack that we built our feature store on.

Hopsworks Feature Store Architecture

Figure 8. Architecture of Hopsworks Feature Store

Feature Engineering Frameworks

At Logical Clocks we specialize in Python-first ML pipelines, and for feature engineering we focus our support on Spark, PySpark, Numpy, and Pandas. The motivation for using Spark/PySpark to do feature engineering is that it is the preferred choice for data wrangling among our users that are working with large-scale datasets. However, we have also observed that users working with small datasets prefer to do the feature engineering with frameworks such as Numpy and Pandas, which is why we decided to provide native support for those frameworks as well. Users can submit feature engineering jobs on the Hopsworks platform using notebooks, python files, or .jar files.

The Storage Layer

We have built the storage layer for the feature data on top of Hive/HopsFS with additional abstractions for modeling feature data. The reason for using Hive as the underlying storage layer is two-fold: (1) it is not uncommon that our users are working with datasets in terabyte-scale or larger, demanding scalable solutions that can be deployed on HopsFS (see blog post on HopsFS [9]); and (2) data modeling of features is naturally done in a relational manner, grouping relational features into tables and using SQL to query the feature store. This type of data modelling and access patterns fits well with Hive in combination with columnar storage formats such as Parquet or ORC.

The Metadata Layer

To provide automatic versioning, documentation, feature analysis, and feature sharing we store extended metadata about features in a metadata store. For the metadata store we utilize NDB (MySQL Cluster) which allows us to keep feature metadata that is strongly consistent with other metadata in Hopsworks, such as metadata about feature engineering jobs and datasets.

Feature Data Modeling

We introduce three new concepts to our users for modeling data in the feature store.

  • The feature is an individual versioned and documented data column in the feature store, e.g., the average rating of a customer.
  • The feature group is a documented and versioned group of features stored as a Hive table. The feature group is linked to a specific Spark/Numpy/Pandas job that takes in raw data and outputs the computed features.
  • The training dataset is a versioned and managed dataset of features and labels (potentially from multiple different feature groups). Training datasets are stored in HopsFS as tfrecords, parquet, csv, tsv, hdf5, or .npy files.
Figure 9. A feature group contains a group of features and a training dataset contains a set of features, potentially from many different feature groups.

When designing feature groups, it is a best practice to put all features that are computed from the same raw dataset in the same feature group. It is common that several feature groups share a common column, such as a timestamp or a customer id, which allows feature groups to be joined together into a training dataset.

The Feature Store API

The feature store has two interfaces: one interface for writing curated features to the feature store and one interface for reading features from the feature store to use for training or serving.

Creating Features

The feature store is agnostic to the method for computing the features. The only requirement is that the features can be grouped together in a Pandas, Numpy, or Spark dataframe. The user provides a dataframe with features and associated feature metadata (metadata can also be edited later through the feature registry UI) and the feature store library takes care of creating a new version of the feature group, computing feature statistics, and linking the features to the job to compute them.

Insert Features

from hops import featurestore

featurestore.insert_into_featuregroup(features_df, featuregroup_name)

Create Feature Group

from hops import featurestore

featurestore.create_featuregroup(
   features_df,
   featuregroup_name,
   featuregroup_description,
   feature_engineering_job,
   featuregroup_version
)

Reading From the Feature Store (Query Planner)

To read features from the feature store, users can use either SQL or APIs in Python and Scala. Based on our experience with users on our platform, Data Scientists can have diverse backgrounds. Although some Data Scientists are very comfortable with SQL, others prefer higher level APIs. This motivated us to develop a query-planner to simplify user queries. The query-planner enables users to express the bare minimum information to fetch features from the feature store. For example, a user can request 100 features that are spread across 20 different feature groups by just providing a list of feature names. The query-planner uses the metadata in the feature store to infer where to fetch the features from and how to join them together.

Figure 10. Users query the feature store programmatically or with SQL queries. The output is provided as Pandas, Numpy, or Spark dataframes.

To fetch the features “average_attendance” and “average_player_age” from the feature store, all the user has to write is this.

Example to fetch features

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

The returned “features_df” is a (Pandas, Numpy, or Spark) dataframe that can then be used to generate training datasets for models.

Creating Training Datasets

Organizations typically have many different types of raw datasets that can be used to extract features. For example, in the context of user recommendation there might be one dataset with demographic data of users and another dataset with user activities. Features from the same dataset are naturally grouped into a feature group, and it is common to generate one feature group per dataset. When training a model, you want to include all features that have predictive power for the prediction task; these features can potentially span multiple feature groups. The training dataset abstraction in the Hopsworks Feature Store is used for this purpose. A training dataset allows users to group a set of features with labels for training a model to do a particular prediction task.

Once a user has fetched a set of features from different feature groups in the feature store, the features can be joined with labels (in case of supervised learning) and materialized into a training dataset. By creating a training dataset using the feature store API, the dataset becomes managed by the feature store. Managed training datasets are automatically analyzed for data anomalies, versioned, documented, and shared with the organization.

Figure 11. The life-cycle of data in HopsML. Raw data is transformed into features which are grouped together into training datasets that are used to train models.

To create a managed training dataset, the user supplies a Pandas, Numpy or Spark dataframe with features, labels, and metadata.

Create a managed training dataset

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

featurestore.create_training_dataset(
   features_df,
   training_dataset_name,
   training_dataset_description,
   computation_job,
   training_dataset_version,
   data_format="tfrecords"
)

Once the training dataset has been created, the dataset is discoverable in the feature registry and users can use it to train models. Below is an example code snippet for training a model using a training dataset stored in a distributed manner in the tfrecords format on HopsFS.

Training a model using a training dataset

from hops import featurestore

import tensorflow as tf

dataset_dir = featurestore.get_training_dataset_path(td_name)


# the tf records are written in a distributed manner using partitions

input_files = tf.gfile.Glob(dataset_dir + "/part-r-*")


# tf record schemas are managed by the feature store

tf_record_schema = featurestore.get_training_dataset_tf_record_schema(td_name)


# tf records are a sequence of *binary* (serialized with protobuf) records that need to be decoded.

def decode(example_proto):

   return tf.parse_single_example(example_proto, tf_record_schema)

dataset = (tf.data.TFRecordDataset(input_files)
   .map(decode)
   .shuffle(shuffle_buffer_size)
   .batch(batch_size)
   .repeat(num_epochs))


# three-layer MLP for regression (using tf.keras.layers so no extra import is needed)

model = tf.keras.Sequential([
   tf.keras.layers.Dense(64, activation="relu"),
   tf.keras.layers.Dense(64, activation="relu"),
   tf.keras.layers.Dense(1)
])

model.compile(optimizer=tf.train.AdamOptimizer(lr), loss="mse")

model.fit(dataset, epochs=num_epochs, steps_per_epoch=spe)

The Feature Registry

The feature registry is the user interface for publishing and discovering features and training datasets. The feature registry also serves as a tool for analyzing feature evolution over time by comparing feature versions. When a new data science project is started, Data Scientists within the project typically begin by scanning the feature registry for available features, and only add new features for their model that do not already exist in the feature store.

Figure 12. Feature Registry on Hopsworks.

The feature registry provides:

  • Keyword search on feature/feature group/training dataset metadata.
  • Create/Update/Delete/View operations on feature/feature group/training dataset metadata.
  • Automatic feature analysis.
  • Feature dependency tracking.
  • Feature job tracking.
  • Feature data preview.

Automatic Feature Analysis

When a feature group or training dataset is updated in the feature store, a data analysis step is performed. In particular, we look at cluster analysis, feature correlation, feature histograms, and descriptive statistics. We have found that these are the most common types of statistics that our users find useful in the feature modeling phase. For example, feature correlation information can be used to identify redundant features, feature histograms can be used to monitor feature distributions between different versions of a feature to discover covariate shift, and cluster analysis can be used to spot outliers. Having such statistics accessible in the feature registry helps users decide on which features to use.

Figure 13. Viewing the feature correlation for a training dataset using the feature registry.


Figure 14. Viewing the distribution of a feature in a feature group using the feature registry.

Feature Dependencies and Automatic Backfilling

When the feature store increases in size, the process of scheduling jobs to recompute features should be automated to avoid a potential management bottleneck. Feature groups and training datasets in the Hopsworks feature store are linked to Spark/Numpy/Pandas jobs, which enables the reproduction and recomputation of the features when necessary. Moreover, each feature group and training dataset can have a set of data dependencies. By linking feature groups and training datasets to jobs and data dependencies, the features in the Hopsworks feature store can be automatically backfilled using workflow management systems such as Airflow [10].

Figure 15. Feature dependency tracking.

A Multi-Tenant Feature Store Service

We believe that the biggest benefit of a feature store comes when it is centralized across the entire organization. The more high-quality features available in the feature store the better. For example, in 2017 Uber reported that they had approximately 10000 features in their feature store [11].

Despite the benefit of centralizing features, we have identified a need to enforce access control to features. Several organizations that we have talked to work partially with sensitive data that requires specific access rights that are not granted to everyone in the organization. For example, it might not be feasible to publish features that are extracted from sensitive data to a feature store that is public within the organization.

To solve this problem we utilize the multi-tenancy property built into the architecture of the Hopsworks platform [12]. Feature stores in Hopsworks are by default project-private and can be shared across projects, which means that an organization can combine public and private feature stores. An organization can have a central public feature store that is shared with everyone in the organization, as well as private feature stores containing features of a sensitive nature that are only accessible to users with the appropriate permissions.

Figure 16. Based on the organization need, features can be divided into several feature stores to preserve data access control.

Future Work

The feature store covered in this blog post is a so-called batch feature store, meaning that it is a feature store designed for training and non-real-time model serving. In future work, we plan to extend the feature store to meet the real-time guarantees that are required when serving user-facing models. Moreover, we are currently in the process of evaluating the need for a Domain Specific Language (DSL) for feature engineering. With a DSL, users who are not proficient in Spark/Pandas/Numpy can provide an abstract, declarative description of how features should be extracted from raw data, and the library then translates that description into a Spark job for computing the features. Finally, we are also looking into supporting Petastorm [13] as a data format for training datasets. By storing training datasets in Petastorm we can feed Parquet data directly into machine learning models in an efficient manner. We consider Petastorm a potential replacement for tfrecords that can make it easier to re-use training datasets with ML frameworks other than TensorFlow, such as PyTorch.

Summary

Building successful AI systems is hard. At Logical Clocks we have observed that our users spend a lot of effort on the data engineering phase of machine learning. From the release of version 0.8.0, Hopsworks provides the world’s first open-source feature store. A feature store is a data management layer for machine learning that allows Data Scientists and Data Engineers to share and discover features, better understand features over time, and make the machine learning workflow more efficient.

References

[1] Hidden Technical Debt in Machine Learning Systems: https://papers.nips.cc/paper/5656-hidden-technical-debt-in-machine-learning-systems.pdf

[2] Scaling Machine Learning at Uber with Michelangelo: https://eng.uber.com/scaling-michelangelo/

[3] Machine Learning: The High-Interest Credit Card of Technical Debt: https://static.googleusercontent.com/media/research.google.com/sv//pubs/archive/43146.pdf

[4] Scaling Machine Learning as a Service (Uber): http://proceedings.mlr.press/v67/li17a/li17a.pdf

[5] Zipline: Airbnb’s Machine Learning Data Management Platform: https://databricks.com/session/zipline-airbnbs-machine-learning-data-management-platform

[6] Operationalizing Machine Learning—Managing Provenance from Raw Data to Predictions: https://databricks.com/session/operationalizing-machine-learning-managing-provenance-from-raw-data-to-predictions

[7] Building a Feature Platform to Scale Machine Learning | DataEngConf BCN '18: https://www.youtube.com/watch?v=0iCXY6VnpCc

[8] HopsML, Python-First ML Pipelines: https://hops.readthedocs.io/en/latest/hopsml/hopsML.html

[9] Fixing the Small Files Problem in HDFS: https://www.logicalclocks.com/fixing-the-small-files-problem-in-hdfs/

[10] Airflow: https://airflow.apache.org/

[11] Meet Michelangelo: Uber’s Machine Learning Platform: https://eng.uber.com/michelangelo/

[12] Introducing Hopsworks: https://www.logicalclocks.com/introducing-hopsworks/

[13] Petastorm: https://github.com/uber/petastorm

[14] Deep learning scaling is predictable, empirically: https://blog.acolyer.org/2018/03/28/deep-learning-scaling-is-predictable-empirically/

[15] Feature Store at LinkedIn: https://www.thestrangeloop.com/2018/democratizing-ai—back-fitting-end-to-end-machine-learning-at-linkedin-scale.html