
How to Engineer and Use Features in Azure ML Studio with the Hopsworks Feature Store

>
2/25/2021
>
Moritz Meister

TLDR: The Hopsworks Feature Store is an open platform that connects to the largest number of data stores and data science platforms, with the most comprehensive API support: Python and Spark (Python, Java/Scala). It supports Azure ML Studio Notebooks or Designer for feature engineering and as your data science platform. You can engineer and ingest features, browse existing features, and create training datasets as either DataFrames or as files on Azure Blob storage.

Introduction

Azure ML is becoming an increasingly popular data science platform to train, deploy, and manage machine learning models. Azure ML also supports AutoML, and enables you to automate and run pipelines at scale. 

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on Azure (and AWS) and it includes all the tools needed to store, retrieve and track features that will be used when training and serving ML models. The Hopsworks Feature Store integrates with many cloud platforms and storage services, one of which is Azure ML Studio.

In this blog post, we show how to connect to the Hopsworks Feature Store from Azure ML, ingest features from a Pandas dataframe, combine different features to create a training dataset, and finally save that training dataset to Azure Blob storage, from where the data can be read in Azure ML to train and validate a machine learning model.

Prerequisites

In order to follow this tutorial, you need:

  1. A Hopsworks Feature Store running on https://hopsworks.ai. You can register for free, with no credit card required, and receive USD 4000 of credits to get started. You can deploy the feature store in either your own Azure account or even in an AWS account.
  2. An existing ML Studio notebook with an attached compute cluster. If you don't have one, you can create it by following the Azure ML documentation.
  3. If you want to follow this tutorial with the same data, make sure to upload these files to your ML Studio environment.
  4. A project created within Hopsworks. If you don’t have one yet, you can simply follow the Feature Store tour, which creates a sample project for you.

Step 1: Configure a Hopsworks API Key

Connecting to the Feature Store from Azure ML requires setting up a Feature Store API key for authentication. In Hopsworks, (1) click on your username in the top-right corner, select Settings to open the user settings, and then select API keys. (2) Give the key a name and select the job, featurestore, dataset.create and project scopes, before (3) creating the key. Copy the key to your clipboard for the next step.

Step 2: Connect from an Azure Machine Learning Notebook

To access the Feature Store from Azure Machine Learning, open a Python notebook and proceed with the following steps to install the Hopsworks Feature Store client called HSFS:

-- CODE language-bash -- !pip install hsfs[hive]

Note that we are installing the latest version at the time of writing this (2.1.4) - you should always install the latest minor version that corresponds to the version of your Hopsworks Feature Store. So in this case our Hopsworks instance is running version 2.1.
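For example, a hedged way to pin to the matching release series with pip (assuming your Hopsworks instance runs a 2.1.x version; quoting avoids shell expansion of the brackets):

-- CODE language-bash --
!pip install "hsfs[hive]~=2.1.0"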

Furthermore, for Python clients (such as Azure ML), it is important to install HSFS with the `[hive]` optional extra. Spark clients do not need this.

After successfully installing HSFS, you should be able to connect to the Feature Store from your Azure ML notebook (note: you might need to restart the kernel if you had HSFS previously installed):

-- CODE language-bash --
import hsfs

connection = hsfs.connection(
    host="[UUID].cloud.hopsworks.ai",
    project="[project-name]",
    engine="hive",
    api_key_value="[api-key]"
)
fs = connection.get_feature_store()

Make sure to replace [UUID] with the UUID from the DNS name of your Hopsworks instance, [project-name] with the Hopsworks project that contains your feature store, and [api-key] with the key created in Step 1. Please note that it’s not good practice to store the API key in your notebook; instead, you should store the key safely in a permissions-protected file and use the `api_key_file` argument to pass the filename to the connection method.
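As a hedged sketch of this safer pattern (the file path below is illustrative):

-- CODE language-bash --
# Assumption: the API key was saved to a file readable only by you,
# e.g. chmod 600 /home/azureuser/hopsworks.key
connection = hsfs.connection(
    host="[UUID].cloud.hopsworks.ai",
    project="[project-name]",
    engine="hive",
    api_key_file="/home/azureuser/hopsworks.key"
)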

Once you are connected you can get a handle to the feature store with `connection.get_feature_store()`. If the project you have connected to also contains a shared feature store (it is possible to have a feature store from another project shared with the project you are using), you can also get a handle on the shared feature store using the connection object.
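For example, a minimal sketch of both calls (the shared feature store name below is hypothetical):

-- CODE language-bash --
# Handle to the project's own feature store
fs = connection.get_feature_store()

# Handle to a feature store shared from another project
# (assumes a shared feature store named "shared_fs")
shared_fs = connection.get_feature_store(name="shared_fs")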

Step 3: Ingest data from a Pandas dataframe to the Feature Store

You can simply upload some data in your favourite file format to the Azure ML workspace, or you can configure a Hopsworks Storage Connector to cloud storage or a database. The Storage Connector safely stores endpoints and credentials to external stores or databases, making it easier for Data Scientists to retrieve data from them.

If you opted to upload the data as CSV files, as shown below, simply read it into a pandas dataframe:

-- CODE language-bash --
import pandas as pd
import numpy as np

sales_csv = pd.read_csv("sales data-set.csv")
stores_csv = pd.read_csv("stores data-set.csv")

Now, we can perform some feature engineering based on the pandas dataframe. We would like to predict the weekly sales of a department, so let’s create our target feature by selecting the last week available for each department:

-- CODE language-bash --
sales_csv["date"] = pd.to_datetime(sales_csv["date"])
sales_csv.sort_values(["store", "dept", "date"], inplace=True)
target_df = sales_csv.groupby(["store", "dept"]).last().reset_index()

-- CODE language-bash -- target_df


We can create this as a feature group, also containing the `is_holiday` feature; since this information will be available at prediction time, there is no risk of data leakage.

-- CODE language-bash --
fg_target = fs.create_feature_group(
    "weekly_sales_target",
    version=1,
    description="containing the latest weekly sales of each store/department",
    primary_key=["store", "dept"],
    time_travel_format=None
)
fg_target.save(target_df)


By clicking the hyperlink in the logs underneath the notebook cell, you can follow the progress of your ingestion job in Hopsworks.


Let’s now create a few simple features based on the historical sales of each department:

-- CODE language-bash --
df = pd.merge(sales_csv, target_df[["store", "dept", "date"]],
              on=["store", "dept"], how="left")
hist_df = df[df["date_x"] != df["date_y"]]
hist_df["holiday_flag"] = df["is_holiday"].apply(lambda x: 1 if x else 0)
hist_df["non_holiday_flag"] = df["is_holiday"].apply(lambda x: 0 if x else 1)
hist_df["holiday_week_sales"] = hist_df["holiday_flag"] * hist_df["weekly_sales"]
hist_df["non_holiday_week_sales"] = hist_df["non_holiday_flag"] * \
    hist_df["weekly_sales"]

total_features = hist_df.groupby(["store", "dept"]).agg(
    {"weekly_sales": [sum, np.mean],
     "date_x": pd.Series.nunique,
     "holiday_week_sales": sum,
     "non_holiday_week_sales": sum})
total_features.columns = ['_'.join(col).strip()
                          for col in total_features.columns.values]
total_features.reset_index(inplace=True)

And again, we finish by creating a feature group with this dataframe and saving it to the feature store:

-- CODE language-bash --
weekly_sales_total = fs.create_feature_group(
    "weekly_sales_total",
    version=1,
    description="containing the total historical sales and weekly average of each store/department",
    primary_key=["store", "dept"],
    time_travel_format=None
)
weekly_sales_total.save(total_features)


Note: If you have existing feature engineering notebooks that you would like to reuse with the Hopsworks Feature Store, it should be enough to simply add the two calls (create the Feature Group, and save the dataframe to it) in order to ingest your features to the Feature Store. No other changes are required in your existing programs and you can still use your favourite Python libraries for feature engineering.

With these two feature groups we can move to the next step to create a training dataset. Since we did not disable statistics computation, you can head to the Hopsworks Feature Store and inspect the pre-computed statistics over the newly created feature groups.

Step 4: Create a training dataset in your favorite file format using the Feature Store

HSFS comes with an expressive Join API and Query Planner that allows users to join, filter and explore feature groups in order to create training datasets.

Assuming you start with a new Jupyter notebook, the first commands you need to run get handles to the previously created feature groups:

-- CODE language-bash --
target_fg = fs.get_feature_group("weekly_sales_target", version=1)
sales_fg = fs.get_feature_group("weekly_sales_total", version=1)

Note that we explicitly supply the (schema) version for the feature group (version=1), so that other developers can update the feature groups safely in higher numbered versions of the feature group.

With our two feature group objects, we would like to join the target feature with our historical features, but only select the departments for our training dataset that have a full history of 142 weeks available:

-- CODE language-bash --
td_query = target_fg.select(["weekly_sales", "is_holiday"]) \
    .join(sales_fg.filter(sales_fg.date_x_nunique == 142))
td_query.show(5)

As you can see, feature group joins work similarly to pandas dataframe joins. In this case we can omit the join key, since both feature groups have the same primary key. For more advanced joins, however, you can always specify the join key from each group, as well as the join type (left, inner, right, outer, etc.), manually.
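As a hedged sketch of such an explicit join (the `on` and `join_type` parameters follow the HSFS Query API; the key and join type chosen here are illustrative):

-- CODE language-bash --
# Explicit join key and join type instead of relying on the primary key
td_query = target_fg.select(["weekly_sales", "is_holiday"]) \
    .join(sales_fg.select_all(), on=["store", "dept"], join_type="left")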

Hopsworks Feature Store supports a variety of storage connectors to materialize your training dataset to different cloud storage systems. If you have previously configured an Azure Data Lake Storage connector, you can now use it as the destination for your training dataset:

-- CODE language-bash --
storage = fs.get_storage_connector("azure-blob", "ADLS")

Similar to feature groups, you can now create the training dataset in your favourite file format, matching the machine learning library you are planning to use - for example, choose ‘tfrecord’ for TensorFlow. The Feature Store will make sure to track all metadata related to your training dataset, even if the training dataset is created outside of Hopsworks.

-- CODE language-bash --
td = fs.create_training_dataset(
    "weekly_sales_model",
    version=1,
    data_format="tfrecord",
    splits={"train": 0.8, "test": 0.2},
    seed=12,
    label=["weekly_sales"],
    storage_connector=storage
)
td.save(td_query)

To retrieve the training dataset in your training environment you can simply get a handle to the dataset and its location, to pass it subsequently to your reader utilities:

-- CODE language-bash --
td = fs.get_training_dataset("weekly_sales_model", version=1)
td.location

Get Started

This tutorial is available as a Jupyter Notebook in our GitHub repository. For more information, visit our documentation.

How to transform Amazon Redshift data into features with Hopsworks Feature Store

>
2/9/2021
>
Ermias Gebremeskel

TLDR: Hopsworks Feature Store provides an open ecosystem that connects to the largest number of data stores, data pipelines, and data science platforms. You can connect the Feature Store to Amazon Redshift to transform your data into features to train models and make predictions.

Introduction

Amazon Redshift is a popular managed data warehouse on AWS. Companies use Redshift to store structured data for traditional data analytics. This makes Redshift a popular source of raw data for computing features for training machine learning models.

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on AWS and it includes all the tools needed to transform data in Redshift into features that will be used when training and serving ML models.

In this blog post, we show how you can configure the Redshift storage connector to ingest data, transform that data into features (feature engineering) and save the pre-computed features in the Hopsworks Feature Store.

Prerequisites

To follow this tutorial, you should have a Hopsworks Feature Store instance running on https://hopsworks.ai. You can register for free, with no credit card required, and receive USD 4000 of credits to get started.

You should also have an existing Redshift cluster. If you don't have one, you can create it by following the AWS documentation.

Step 1 - Configure the Redshift Storage Connector

The first step to be able to ingest Redshift data into the feature store is to configure a storage connector. The Redshift connector requires you to specify the following properties. Most of them are available in the properties area of your cluster in the Redshift UI.

  • Cluster identifier: The name of the cluster
  • Database driver: You can use the default JDBC Redshift Driver `com.amazon.redshift.jdbc42.Driver` (More on this later)
  • Database endpoint: The endpoint for the database. Should be in the format of `[UUID].eu-west-1.redshift.amazonaws.com`
  • Database name: The name of the database to query
  • Database port: The port of the cluster. Defaults to 5439

There are two options available for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all the members of the project.

The second option is to configure an IAM role. With IAM roles, Jobs or notebooks launched on Hopsworks do not need to explicitly authenticate with Redshift, as the HSFS library will transparently use the IAM role to acquire a temporary credential to authenticate the specified user.

In Hopsworks, there are two different ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs or notebooks will be run with the selected IAM role. For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (like a data owner) can assume a given role.

With regards to the database driver, the library to interact with Redshift *is not* included in Hopsworks - you need to upload the driver yourself. First, you need to download the library from here. Select the driver version without the AWS SDK. You then upload the driver files to the “Resources” dataset in your project, see the screenshot below.

Upload the Redshift Driver (jar file) from your local machine to the Resources dataset.

Then, you add the file to your notebook or job before launching it, as shown in the screenshots below.

Before starting the JupyterLab server, add the Redshift driver jar file, so that it becomes available to jobs run in the notebook.

Step 2 - Define an external (on-demand) Feature Group

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in Hopsworks feature store. For external feature groups, only metadata for features is stored in the feature store - not the actual feature data which is read from the external database/object-store. When the external feature group is accessed from a Spark or Python job, the feature data is read on-demand using a connector from the external store. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source. 

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance”: in the Hopsworks Web UI, you can track which features are stored on which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

-- CODE language-bash --
# We named the storage connector defined in step 1 telco_redshift_cluster
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")

telco_on_dmd = fs.create_on_demand_feature_group(
    name="telco_redshift",
    version=1,
    query="select * from telco",
    description="On-demand feature group for telecom customer data",
    storage_connector=redshift_conn,
    statistics_config=True
)
telco_on_dmd.save()

When running `save()`, the metadata is stored in the feature store, and statistics for the data are computed and made available through the Hopsworks UI. Statistics help data scientists build better features from raw data.

Step 3 - Engineer features and save to the Feature Store

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.

-- CODE language-bash --
# Select the features to use directly from the on-demand feature group
query = telco_on_dmd \
    .select(['customer_id', 'internet_service', 'phone_service',
             'total_charges', 'churn'])
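A hedged sketch of what using this selection as a training dataset source can look like (the training dataset name and format below are illustrative):

-- CODE language-bash --
# Materialize a training dataset directly from the selection above
td = fs.create_training_dataset(
    name="telco_churn_model",  # hypothetical name
    version=1,
    data_format="csv"
)
td.save(query)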

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets. This helper guide explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

-- CODE language-bash -- spark_df = telco_on_dmd.read()

You can then transform your `spark_df` into features using feature engineering libraries. It is also possible to retrieve the dataframe as a Pandas dataframe and perform the feature engineering steps in Python code. 

You can then save the final dataframe containing the engineered features to the feature store as a cached feature group:

-- CODE language-bash --
telco_fg = fs.create_feature_group(
    name="telco_customer_features",
    version=1,
    description="Telecom customer features",
    online_enabled=True,
    time_travel_format="HUDI",
    primary_key=["customer_id"],
    statistics_config=True
)
telco_fg.save(spark_df)

Storing feature groups as cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First, it allows users to leverage Hudi for incremental ingestion (with ACID properties, ensuring the integrity of the feature group) and time-travel capabilities. As new data is ingested, new commits are tracked by Hopsworks, allowing users to see what has changed. On each commit, statistics are computed and tracked in Hopsworks, allowing users to understand how the data has evolved over time.

Cached feature groups can also be stored in the online feature store (`online_enabled=True`), thus enabling low latency access to the features using the online feature store API.
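As a hedged sketch of such a low-latency lookup, assuming HSFS's `fs.sql(..., online=True)` entry point (the SQL statement, table name, and key value are illustrative):

-- CODE language-bash --
# Read a single customer's feature vector from the online feature store
row = fs.sql(
    "SELECT * FROM telco_customer_features_1 WHERE customer_id = '1234-ABCD'",
    online=True
)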

Get started

Get started today on Hopsworks.ai by configuring your Redshift storage connector and running this notebook.

Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store

>
11/17/2020
>
Theofilos Kakantousis

TLDR; Hopsworks 2.0 is now generally available. Hopsworks is the world's first Enterprise Feature Store, along with an advanced end-to-end ML platform. This version includes improvements to the user interface and client APIs, and improved integration with Kubernetes. Hopsworks 2.0 also comes with upgrades for popular ML frameworks such as TensorFlow 2.3 and PyTorch 1.6.

The Hopsworks 2.0 platform:

In addition to bug and documentation fixes, Hopsworks 2.0 includes the following new features and improvements:

  • Feature Store: New Rich APIs and Extended Integration with External Data Sources
  • Provenance: Track and Navigate from Models to Training Datasets to Features
  • Python Programs as Schedulable Feature Engineering Jobs
  • Fine-grained User Access Control for Feature Stores
  • GitLab Support 
  • New example programs for feature engineering and usage for training and serving

Detailed release notes are available at the Hopsworks GitHub repository.

Feature Store: New Rich APIs and Extended Integration with External Data Sources

The Feature Store API is now more intuitive, following a Pandas-like programming style for feature engineering and working with abstractions such as feature groups and training datasets. This allows for an improved workflow that solves the biggest pain points of professional users working with Big Data and AI: being able to organize and discover data with as few steps as possible. An all-new API documentation page is accessible here.

This release brings the ability to enrich Feature Group abstractions by attaching metadata in the form of tags, and then allowing free-text search on these tags, available across the feature store to all users.

More details on the upgraded feature store experience are outlined in the Hopsworks Feature Store API 2.0 blog post and the user guide.

Provenance: Track and Navigate from Models to Training Datasets to Features

Hopsworks is built from the ground up with support for implicit provenance. What that means is that data engineers and data scientists who develop machine learning pipelines (from data ingestion, to feature engineering, to model training and serving) get lineage information for free as the data flows and is transformed through the pipeline. Users can then navigate from a model served in production back to the ML experiment that created the model, along with its training dataset, and finally all the way back to the feature groups and ingestion pipelines the training dataset was created from.

Python Programs as Schedulable Feature Engineering Jobs

Until now, the Hopsworks Jobs service provided users with the means to run and monitor applications, and to do feature engineering, using popular distributed processing frameworks such as Apache Spark/PySpark and Apache Flink. Hopsworks now further extends its reach to Feature Store users by enabling them to run feature engineering jobs as pure Python programs, detangling them, for example, from the need to run their code within a PySpark context/cluster. Developers can now do feature engineering with Pandas or their library of choice and still benefit from the Jobs service's capabilities, such as including these jobs in Airflow data and ML pipelines, and running Python Jupyter notebooks as jobs, which simplifies including notebooks in development and production pipelines. The user guide is available here.

Fine-Grained User Access Control for Feature Stores

Sharing datasets across projects of a Hopsworks instance is now made simpler by using a fine-grained access control mechanism. Users can now select from a list of options that enable a dataset to be shared with different permissions for each project and for each role users have in the target project (Data Owner, Data Scientist). In addition, these new dataset sharing semantics now make it easier to share feature stores with different permissions across projects. For more details please check the user guide here.

GitLab Support

Hopsworks developers can now use GitLab for code version control, in addition to the existing GitHub support. JupyterLab in Hopsworks comes shipped with the Git plugin, so developers can work with Git directly from within the JupyterLab IDE. Further details are available in the user guide here.

Python Environment Updates and Examples

Hopsworks 2.0 is shipped with the latest and the greatest frameworks of the data engineering and data science Python world. Every Hopsworks instance now comes pre-installed with Pandas version 1.1.2, TensorFlow 2.3.1, PyTorch 1.6.0 and many more. 

A plethora of new feature engineering and machine learning examples are now available at our examples GitHub repository. Data engineers can now follow examples that demonstrate how to work with the Feature Store using Redshift or Snowflake. New end-to-end and parallel experiments with deep learning and TensorFlow 2.3 are also available.

Get Started

Much more is to come from Hopsworks with the upcoming releases so make sure to follow us on Twitter and, if you like it, star us on GitHub.

Visit http://hopsworks.ai to get started with a free instance of Hopsworks and explore the new Hopsworks Feature Store and data science support of the Hopsworks 2.0 series and don’t forget to join our growing community!

Hopsworks: World’s Only Cloud Native Feature Store, now Available on AWS and Azure.

>
11/17/2020
>
Steffen Grohsschmiedt

TLDR; Hopsworks is now available as a managed platform for Amazon Web Services (AWS) and Microsoft Azure with a comprehensive free tier. Hopsworks is the only currently available cloud-native Enterprise Feature Store and it also includes a Data Science platform for developing and operating machine learning models in production.

The Hopsworks managed platform:

  • Support for AWS and Microsoft Azure;
  • The Feature Store for Databricks (AWS/Azure), SageMaker, Kubeflow, and more;
  • Pay-per-use Compute: Engineer Features at Scale (Spark, Python, Flink);
  • Pay-per-use Compute for Training Machine Learning Models (including GPU support);
  • Scale Storage Independently - data stored on Amazon S3 and Azure Blob Storage;
  • Integrates with Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving;
  • CI/CD ML Pipelines using Airflow, including support for Jupyter-notebooks-as-jobs;
  • Federated IAM Roles Made Easy;
  • Managed Backups and Upgrades;
  • Active Directory/LDAP Integration;
  • Organization and User Management on Hopsworks.

Start Using Hopsworks for Free

You can get started with Hopsworks for free today. We offer free Hops Credits for you to start engineering and managing your features. You have access to all functionality of the Hopsworks Feature Store, alongside all main features of the Hopsworks platform, including compute elasticity (add/remove workers when needed), storage on S3/Azure Blob Storage, EKS, ECR and GPUs, as well as integrations with external Data Science and Data Lake/Warehouse platforms. Get started now!

Hopsworks 2.0

Hopsworks is the world's first Enterprise Feature Store for machine learning, including an end-to-end ML platform for developing and operating AI applications. Hopsworks includes a new and improved Feature Store API, integration with Kubernetes, Sagemaker, Databricks, Redshift, Snowflake, Kubeflow, and many more. Hopsworks also comes with upgrades for popular ML frameworks, such as TensorFlow, PyTorch, and Maggy.

All details about the 2.0 release can be found in the release blog post: Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store.

Support for AWS and Microsoft Azure

Hopsworks has a managed platform for AWS and we are pleased to now announce full support for Microsoft Azure. Hopsworks is now cloud-independent, allowing customers to seamlessly move from AWS to Azure or vice-versa, without the fear of being locked in to a single vendor. Enterprises can now manage features for training and serving models at scale on Hopsworks, while maintaining control of their data inside their organisation’s own cloud accounts. Check out how to get started on AWS or Microsoft Azure.

Feature Store for Databricks (AWS/Azure), SageMaker and Kubeflow

As Hopsworks is a modular platform with integrations to third party platforms, organisations can combine our Feature Store with their existing data stores and data science platforms. Hopsworks natively integrates with Databricks on both AWS and Azure, Amazon Sagemaker, Kubeflow and other ML platforms. Check out how to get started with Databricks on AWS and Azure, Amazon Sagemaker, Kubeflow and other Data and ML platforms.

Pay-per-use Compute: Engineer Features and Train Models at Scale

The generally available version of Hopsworks adds support for pay-per-use compute, allowing you to add and remove compute nodes for feature engineering and machine learning jobs as needed, to scale with your demand and minimize cost. We support different instance types, allowing you to add or remove exactly the right amount of resources (CPU, GPU, memory) that you need to run your application. You can, for example, add GPU nodes to run a large-scale distributed training job while at the same time adding or removing non-GPU nodes for feature engineering workloads. Hopsworks supports running jobs on a wide variety of frameworks: Spark, Python, Flink, TensorFlow, PyTorch, Scikit-Learn, XGBoost, and more.

Pay-per-use Compute for Training Machine Learning Models (including GPU support)

Hopsworks on AWS added support for using GPU instances for running ML workloads, scaling to multiple nodes and GPUs. GPU nodes can be added or removed at any point in time to scale with your workload and save cost when not needed. In addition, you can utilize EKS for GPU-accelerated workloads in notebooks and Python jobs.

Scale Storage Independently - Amazon S3 and Azure Blob Storage

The Feature Store stores its historical data on HopsFS (a next generation HDFS) that now uses both S3 and Azure Blob storage as its backing store, reducing costs and enabling storage to scale independently of the size of a Hopsworks cluster. As a customer, you only have to provide your bucket and an IAM role/Managed identity with sufficient rights when creating a cluster. All data is kept private and stored inside your cloud account. HopsFS provides both performance improvements (3.4X read throughput vs S3), POSIX-like semantics, and open metadata compared to S3.

Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving

Hopsworks on AWS integrates with EKS and ECR, allowing you to run applications as containers on EKS from Hopsworks. Some services in Hopsworks are offloaded to EKS, such as running Jupyter/JupyterLab notebooks and model serving workloads. Hopsworks users run applications on EKS that can securely call back and use services in Hopsworks. In particular, there is special support for Python programs, where Data Scientists can install Python libraries in their Hopsworks project, and Hopsworks transparently builds the Docker image that is run as a Python job on Kubernetes. Even Jupyter notebooks on Hopsworks can be run and scheduled as jobs on Kubernetes, without users having to write and manage Dockerfiles. An additional benefit of EKS is that the cluster can be set up with autoscaling, allowing your workloads to scale dynamically with demand. Check out Integration with Amazon EKS and Amazon ECR to get started with EKS for Hopsworks.

Feature Engineering and ML Pipelines

Hopsworks supports data-parallel feature engineering jobs in Spark, PySpark, and Flink, as well as pure Python feature engineering with Pandas, Feature Tools, and any Python library. Feature engineering pipelines typically end at the Feature Store, and can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster). Hopsworks has additional support for data validation with Great Expectations and Deequ. After the Feature Store, ML pipelines create train/test datasets, train models, analyze, validate and deploy models. Hopsworks provides experiment tracking, Tensorboard, distributed ML (with open-source Maggy), model analysis with the What-if-Tool. Again, ML training pipelines can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster).

Federated IAM Roles Made Easy

Enterprise Hopsworks now enables fine-grained access to AWS resources using AWS role chaining (federated IAM roles). Administrators can now define a mapping between AWS roles and project-specific user roles in Hopsworks. This enables both fine-grained access and auditing of users when they use AWS services from Hopsworks. Hopsworks users can either set a default role to be assumed by their jobs and notebooks to enable access to their AWS resources, or assume a role manually from within their programs by using the Python and Java/Scala SDKs (guide).

Managed Backups and Upgrades

The enterprise version of Hopsworks supports fully-automated backups and upgrades at the click of a button, making the operation of a Hopsworks cluster a non-issue for your DevOps team.

Active Directory/LDAP Integration

The enterprise version of Hopsworks supports single sign-on to Hopsworks clusters with Active Directory/LDAP integration. This enables your enterprise to manage access to the Feature Store and Hopsworks with your existing enterprise Identity Provider. Hopsworks also supports OAuth-2 as an Identity Provider.

Organizations and User Management

Hopsworks supports billing organizations and user management, enabling your teams to share access to your Hopsworks account.

Coming soon

We are continuously working on improving the Hopsworks experience. Some of the upcoming features are:

  • Dynamic auto scaling of compute nodes
  • Support for AWS Spot Instances/Azure Spot Virtual Machines
  • Support for Azure Kubernetes Service (AKS)
  • Support for GPUs on Azure

Feature Store for MLOps? Feature reuse means JOIN

>
10/23/2020
>
Jim Dowling

TLDR; Many engineers conflate operational Feature Stores with key-value (KV) stores, like DynamoDB and Cassandra. In fact, KV stores are missing a key mechanism needed to make features reusable: JOINs. JOINs enable features to be reused by different models. This blog is about how to scale your ML infrastructure by reusing cached features so that the number of feature pipelines you manage does not grow linearly with the number of models you run in production.

Image from the OSDC talk by Twitter. Twitter evaluates their Feature Store by the number of features that are reused across teams.

The Cost of No JOINs: One Feature Pipeline per Model

If you don’t reuse features across different models, you will need a new feature pipeline for every new model you put in production. In the diagram below, there is a 1:1 mapping between train/test datasets and models. For every new model you put in production, you will write a new feature pipeline from your data stores that transforms and validates the raw data and performs aggregations, materializing train/test data to files. A ML training program (or pipeline) then trains and validates a model with the train/test data, after which it is tested and deployed to production.

It is very difficult to reuse the features in the materialized train/test datasets, as they tend to be stored in a format specific to a ML framework: .tfrecord for TensorFlow, .npy for PyTorch. They cannot easily be combined with features stored in other train/test datasets, often due to the limitations of the file formats: neither TFRecord nor NPY supports projections (selecting just a subset of columns).

If you intend to run hundreds of models in production, running hundreds of pipelines will explode your technical debt.


In online feature stores that do not support JOINs, it is typically the responsibility of the application to perform the JOIN of the cached features to create the feature vector. So, if you build your own online feature store using Cassandra or DynamoDB, you will need to also add logic for joining (and ordering) features in your applications/models. 
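As a minimal sketch of what that application-side logic looks like (the table and feature names are hypothetical, and `kv_get` stands in for your DynamoDB/Cassandra client call):

# The application must fetch, join, and order features itself
def build_feature_vector(customer_id, kv_get):
    profile = kv_get("profile_features", customer_id)    # one lookup per feature group
    activity = kv_get("activity_features", customer_id)
    # Manual "join" and ordering: must match the order used at training time
    return [profile["age"], profile["tenure"], activity["clicks_7d"]]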

Every new feature you add to that model will need an update to the feature pipeline and changes to the application logic - making it costly and potentially cross-team work.

JOINs enable Feature Reuse


A JOIN is a Structured Query Language (SQL) command that combines data from two different database tables. JOINs are used to create a view over the data that originates from one or more tables. If we assume that we store features normalized in database tables, then we are able to create sets of features (training datasets) by selecting different features from different tables and joining them together using a common join key. The join key(s) identify the common entity these feature values represent.

If we now assume that features are stored as columns in tables, then what is a train/test dataset? It is a set of features along with a target column. Assuming the features are already present in existing tables, we can just join those features (columns) together to create a view: this view is the train/test dataset. The order of features (in a view) that makes up a train/test dataset is significant: a train/test dataset is a permutation of features, not a combination. Views (enabled by JOINs) enable a massive number of train/test datasets to be defined over a small number of shared features (stored in tables).

In Hopsworks, we store features in tables that we call “Feature Groups”. Feature Groups introduce a level of indirection between the raw input data and the train/test datasets used to train models, see below. They store a cached copy of the features, computed using the same feature pipeline from earlier, but this time the feature pipeline writes to one or both of the stores that make up a feature store: (1) a scalable store for train/test features (the offline feature store) and (2) a low latency, high throughput store for features used in serving (the online feature store).

In Hopsworks, we use Apache Hive as a scalable database for the offline store, and MySQL Cluster (NDB) as the online store. Our version of Hive stores its metadata in NDB and its data files in HopsFS/S3. In Hopsworks, FeatureGroups are database tables in Hive and MySQL Cluster, along with feature metadata (also tables in the same NDB database). 

As we can see in the diagram below, if we have N features available in the Feature Store, we can create an unlimited number of train/test datasets by simply joining features together from the offline feature store. 

In Hopsworks, we use Spark, with its cost-based optimizer, to perform JOINs. Spark, together with Parquet/Hudi/ORC, helps optimize joins by supporting partitioned data, push-down projections, and SortMergeJoin. Spark selects a join strategy based on:

  • a hint;
  • a join type (inner, left, outer, etc);
  • a join condition (equi-join);
  • an estimation of the input data size.

JOINs in Online Feature Stores

If you use a KV store as your online store, you are back in the same situation as at the start: you need a feature pipeline for every new model you put in production. But with Feature Groups (tables in NDB), we can materialize feature vectors (the individual row of features that is fed directly to the model for scoring) from the different tables by performing a join.

NDB supports sophisticated optimizations for JOINs, and can push-down many JOINs to the database nodes. In practice, in NDB even complex queries can “achieve latency down to 5-10 ms for a transaction that contains around 30 SQL statements”.

Conclusions

Reuse features to save on infrastructure and reduce the number of feature pipelines needed to maintain models in production. JOINs are the method we use in Hopsworks to reuse cached features across different models, both for training and serving. We use Spark as the JOIN engine for the offline feature store, and NDB, with its parallelized push-down JOINs, for low-latency joins in the online feature store.

Manage your own Feature Store on Kubeflow with Hopsworks

>
6/15/2020
>
Jim Dowling

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts, we introduced the feature store, MLOps with a feature store, and how to use the Hopsworks Feature Store in Databricks and AWS Sagemaker. In this blog post, we focus on how to integrate Kubeflow with the Hopsworks Feature Store. Hopsworks is available as an open-source platform, but integrations with external platforms, like Kubeflow, are only available on either our SaaS (www.hopsworks.ai) or Enterprise versions.

Hopsworks is a modular platform that includes a feature store, a compute engine ((Py)Spark, Python, Flink), a data science studio, a file system (HopsFS/S3), and model serving/monitoring support. The Hopsworks Feature Store can be used as a standalone feature store by Kubeflow. As Kubernetes has limited support for Spark (Spark-on-k8s still has problems with shuffle, as of June 2020), Hopsworks is often used both for its feature store and for its Spark and scale-out deep learning capabilities.

Hopsworks offers a centralized platform to manage, govern, discover, and use features. Features can be used:

  • at scale to create train/test datasets, 
  • for model scoring in analytical (batch application) models, 
  • at low latency by operational models to enrich feature vectors.

Get Started

Before you begin, make sure you have started a Hopsworks cluster. If you are on AWS, we recommend using our managed platform Hopsworks.ai. If you are on GCP, Azure or on-premises, as of June 2020, you have to use the hopsworks-installer script to install Hopsworks.

Hopsworks should be installed so that Kubeflow has access to the private IPs of the feature store services: Hive Server 2 and the Hopsworks REST API endpoint. On GCP, this means you should install your Hopsworks cluster in the same Region/Zone as your Kubeflow cluster. Similarly, on AWS, your Hopsworks cluster should be installed in the same VPC/subnet as Kubeflow. And on Azure, your Hopsworks cluster should be installed in the same resource group as your Kubeflow cluster.

API Key

From a Jupyter notebook in Kubeflow, you need to be able to authenticate and interact with the Hopsworks Feature Store. As such, you need to get an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, clicking on Settings, and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should save your API key to a file, with the path API_KEY_FILE, that will be accessible from your Jupyter notebook in Kubeflow. 

Hopsworks-cloud-sdk

With the API key configured correctly, in your KubeFlow Jupyter notebook, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) using PIP:

>>> !pip install hopsworks-cloud-sdk~=1.3

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection


From your Jupyter notebook, the API key should now be readable from the local file system at the path API_KEY_FILE, and the hopsworks-cloud-sdk library should be installed. You should now be able to establish a connection to the feature store, and start using the Hopsworks - Kubeflow integration, with this connect call:

import hops.featurestore as fs
fs.connect(
    'my_instance',          # Private IP/DNS of your Feature Store instance
    'my_project',           # Name of your Hopsworks Feature Store project
    secrets_store='local',  # read the API key from a local file
    api_key_file=API_KEY_FILE
)
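Once connected, the usual feature store calls work from Kubeflow. A minimal sketch (the feature group name below is illustrative):

>>> df = fs.get_featuregroup("teams_features").head(5)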

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a Python kernel in a Jupyter notebook. It will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.

Learn more with our demo!


Follow us on Twitter

Like us on Github

How to Build your own Feature Store

>
5/26/2020
>
Jim Dowling

As of May 2020, Logical Clocks are the only vendor of a Feature Store for machine learning (ML) and the only maker of a fully open-source and cloud-native Feature Store for ML. As such, we have many conversations with companies and organizations who are deciding between building their own feature store and buying one. Given the increasing interest in building feature stores, we thought we would share our experience of building one and motivate some of the decisions and choices we took (and did not take) to help others who are considering following us down the same path. 

8 Benefits of a Feature Store for ML

Here we list some of the reasons for wanting a feature store for ML in the first place.

1. Consistent Feature Engineering for Training Data and Serving 

The feature store can eliminate the need for two different implementations of features: one when training a model and one when serving a model. With a feature store, you can have a single feature pipeline that computes the features and stores them in both an offline and an online store, for use in training models and serving models, respectively. The offline feature store needs to support large volumes of data for model training and for use by batch (analytical) applications for model scoring. The online store needs to support low-latency access to features for models served in production.

2. Encourage Feature Reuse

Features should be reused between different models. At Twitter, they have a “sharing adoption” metric to evaluate the success of their feature store internally. The “sharing adoption” metric measures the number of teams that reuse, in production models, features created by other teams.

3. System support for Serving of Features 

Operational models often need low latency access to features that may be computationally complex or generated from historical data. These types of features are often difficult or impossible to compute inside the applications themselves, either because of the lack of available data or because of the excessive time required to compute the features. The feature store can solve this problem by acting as a low-latency store for precomputed features for operational models (used by online applications). 

4. Exploratory Data Analysis with a Feature Store

Data scientists can discover the available pre-computed features, the types of those features (numerical, categorical), their descriptive statistics, and the distribution of feature values. They can also view a small sample of feature values to help quickly identify any potential issues with using a given feature in a model.

5. Temporal Queries for Features

You would like to augment feature store queries with temporal logic. That is, you want to know the value of a given feature at:

  • an instant in time (for example, when joining features together from different feature groups);
  • a time interval - a length of time (for example, the last 3 months);
  • a time period: an anchored duration of time (for example, training data for years 2012-2018, test data for 2019). 

In relational databases, temporal queries are typically supported by a table that keeps a full history of data changes and allows easy point-in-time analysis, as in SQL Server's temporal tables. In scalable SQL systems, examples of columnar storage formats that support temporal queries are Apache Hudi and Delta Lake, as well as streaming support in Apache Flink. These systems require increased storage (to store updates as well as the current values) in exchange for the ability to query the value of features at points in time, over time intervals, or over time periods.
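As a hedged sketch, a point-in-time read with Apache Hudi on Spark can look like this (assuming a SparkSession `spark`; the `as.of.instant` option follows Hudi's time-travel query API, and the path and timestamp are illustrative):

df = spark.read.format("hudi") \
    .option("as.of.instant", "2019-12-31 23:59:59") \
    .load("s3://bucket/path/to/feature_group")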

6. Security, Governance, and Tracking

The feature store is a central repository for an organization’s features, enabling them to be access controlled, governed, and to have their usage tracked. The feature store also provides common standards for metadata, consistent documentation, and coding standards for features. The repository can maintain popularity counts for features to show which ones are widely used and which ones could potentially be removed, enabling better management of features.

7. Reproducibility for Training Datasets 

The feature store should enable the re-creation of training datasets for given points-in-time to enable the reproducibility of models. An alternative to recreating training datasets is to archive them, but for many industries, such as healthcare and insurance, they will need to be reproducible for regulatory reasons. The ability to recreate training datasets for models is also useful for debugging models, even when you are not required by law to keep the training dataset.

8. Data Drift for Model Serving

The feature store can compute and store statistics over training datasets and make those statistics available to model serving platforms via API calls. In operational models, the training data statistics can then be compared with statistics computed over time windows of live data (last minute, hour, day) sent to the model for predictions. Simple (or complex) statistical tests can identify data drift - when live feature values diverge significantly from the model’s training data. 
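As a minimal sketch of such a test (assuming per-feature training statistics are retrievable via an API call; the numbers and threshold here are illustrative):

import numpy as np

# Compare a live window's mean against training statistics (z-score test)
def drift_score(train_mean, train_std, live_values):
    return abs(np.mean(live_values) - train_mean) / (train_std + 1e-9)

# Alert when the live feature diverges significantly from the training data
if drift_score(train_mean=42.0, train_std=5.0, live_values=[55, 58, 61]) > 3:
    print("Possible data drift detected")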

Feature Store Design Flow Chart

The feature store design flow chart below shows some of the decisions that need to be taken when deciding to roll your own feature store. This set of decisions is obviously not complete, and we omitted systems design issues that are commonly found in any scalable system, such as schema design, API design, language support, and platform support (on-premises, cloud-native).

The first decision point is whether you really want to go ahead and build a data platform to support your ML efforts, knowing the considerable time it will take (at least 6-12 months before you will have anything production-ready) and the future costs of maintaining and updating your platform. If you decide that building a feature store is too much, then give us a shout - mention that Jim sent you and ask for the blog reader discount :)

If you still are determined to build one, you need to take your first big decision: is your feature store a library or a store (a materialized cache of your features)? If the only problem you want to solve is consistent features between training and serving, then the library approach may be suitable. In the library approach, you write your feature encoding functions in versioned libraries that are included in both training and serving pipelines. A downside to this is that both training and serving pipelines need to be implemented in the same (or compatible) programming languages. Another downside is that you may need to wait a long time to backfill training data, as you need to run a job to compute the features. Netflix’s early fact/feature store introduced shared, versioned feature encoders to ensure consistent feature engineering.

Your next decision is whether you want to reuse features across different models or not. Reusing features means you will need to join normalized features together to create training datasets and also when serving features. If you decide you do not want to reuse features, you will still be able to solve the problems of consistent feature engineering and system support for serving features. There are feature stores, like Condé Nast's based on Cassandra, that have a single datastore used for storing both training and serving features.

The next decision is consequential: if you are only working with analytical models, and not online (operational) models, you might decide you only need a single (scalable) database to store your features. Assuming you are a typical user who needs the feature store for both training and serving models, you then need to decide if you need support for time-travel queries. Many online models with windowed features (how many times a user did ‘X’ in the last 15 minutes) often need time-travel support to ensure consistent features for feature serving and for creating training data. Assuming you decide to add time-travel support, you now need to build on a system with, or add application support for, temporal queries.

Your next decision also depends on your choice of data stores for the offline (scalable SQL) and online (low latency feature serving) stores. In Hopsworks, uniquely, we have a unified metadata layer for our file system, HopsFS, our feature serving layer, MySQL Cluster, and our scalable SQL layer, Apache Hive (-on-Hops). We could easily add extended metadata to our unified metadata layer to describe features, their statistics, and tag them. A CDC (change-data-capture) API to our unified metadata layer enabled us to also index feature descriptions in Elasticsearch, which supports free-text search, and searches do not affect the performance of the unified metadata service. The bad news, if you do not have unified metadata for your dual databases, is that you need to design and develop agreement protocols to ensure that the 3 different systems (your offline, online, and feature metadata platforms) are kept in sync, presenting a consistent view of your features. This is a tough distributed systems engineering challenge - good luck!

Finally, you need to decide on a compute engine (maybe internal or external to your platform) for both joining features to create training datasets, but also to compute your features. You may decide on a domain-specific language (like Michelangelo or Zipline) or a more general purpose language or framework like Python (Feast) or Spark (Hopsworks).

Apart from all these issues, you also need to decide on whether you need a UI to discover and manage features (Hopsworks, Michelangelo, Twitter, Zipline) or not (Feast). You also need to decide on whether you need to support multiple feature stores (such as development, production, and sensitive feature stores), and access control to those different feature stores (as in Hopsworks).

Phew! Finally you have navigated some of the decisions that you need to make to tailor your feature store platform for your needs. Although we have not covered the problem of API design to your feature store, we can confide that most feature store platforms have gone through more than one generation of API (we were not immune to that, either). So, good luck, give us a call if you decide to buy rather than build.

Follow us on Twitter

Like us on Github

Hopsworks Feature Store for AWS SageMaker

>
5/18/2020
>
Fabio Buso

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts (Introduction to feature store, MLOps with a feature store, and Hopsworks Feature Store for Databricks) we focused on describing the key concepts and building blocks of the Hopsworks Feature Store. In this blog post we are going to focus on how to integrate AWS SageMaker with Hopsworks. Hopsworks is available on AWS as either a SaaS platform (www.hopsworks.ai) or as a custom Enterprise platform.

While Hopsworks provides all the tools to design and operate pipelines that go from raw data to serving models in production, it is also a modular platform. In particular, the Hopsworks Feature Store can be used as a standalone feature store by data science platforms, such as AWS SageMaker or Databricks. It offers AWS Sagemaker users a centralized platform to manage, discover, and use features - for both creating training datasets and for serving features to operational models. In this blog, we will cover how AWS Sagemaker users can perform exploratory data analysis with the feature store, discover available features, and join features together to create train/test datasets - all from the comfort of their existing SageMaker notebook instance.

Exploratory Data Analysis with a Feature Store

Exploratory data analysis (EDA) is a key component of every data scientists’ job. The Hopsworks Feature Store provides data scientists with a repository of features ready to be used for training models. Data scientists can browse the available features, understand the features by inspecting their metadata, investigate pre-computed feature statistics, and preview sample feature values. These are typical steps a data scientist takes to determine if a feature is a good fit for a specific model. With the Hopsworks AWS SageMaker integration, data scientists can perform these steps in a Jupyter notebook by making feature store API calls in Python.

In Hopsworks, features are organized into groups of related features in what is called a Feature Group. Exploration usually starts at the feature group level, by listing all the available feature groups in the feature store:

>>> featurestore.get_featuregroups()
['games_features_1',
'games_features_on_demand_tour_1',
'games_features_hudi_tour_1',
'season_scores_features_1',
'attendances_features_1',
'players_features_1',
'teams_features_1',
'imported_feature_name_1',
'imported_feature_name_online_1']

The following step allows data scientists to understand which individual features are available in a given feature group, and it returns the first five rows (a data sample):

>>> df = featurestore.get_featuregroup("teams_features").head(5)


The above API call will send a request to the Hopsworks Feature Store and return the result to the user in a Pandas dataframe df.

Individual features are the building blocks of the Hopsworks feature store. From SageMaker, data scientists can join features together and visualize them. As joining features is performed in Spark and SageMaker only provides a Python kernel, the join is executed on the Hopsworks Feature Store and the result returned to the user in a Pandas dataframe df. The complexity of the request is hidden behind the API call.

>>> df = featurestore.get_features(
["team_budget", "average_attendance", "average_player_age"]
).head(5)


Statistics and data visualization help to give an understanding of the data. Hopsworks allows users to compute statistics such as the distribution of feature values, feature correlation within a feature group, and descriptive statistics (Min, Max, Averages, Counts) on the different features.

The statistics are shown in the Hopsworks Feature Store UI, but they are also available from a notebook in SageMaker:

>>> featurestore.visualize_featuregroup_correlations("players_features")

Generate train/test datasets

Once you have explored the feature store and identified which features you need for your model, you can create a training dataset (the train and test data you need to train and evaluate a model, respectively). A training dataset is a materialization of multiple features joined together, potentially coming from different feature groups. The joining of features on-demand enables data scientists to reuse the same features in many different training datasets. Once features have been joined together into a dataframe, they can be stored in an ML-framework-friendly file format on a storage platform of choice, such as S3. For example, if you are training a TensorFlow model, you may decide to store your training dataset in TensorFlow’s native TFRecord file format, in a bucket on S3, here called s3_bucket.

>>> featurestore.create_training_dataset(
    training_dataset="team_position_prediction",
    features=["team_budget", "average_attendance", "average_player_age"],
    training_dataset_version=latest_version + 1,
    data_format="tfrecords",
    sink=s3_bucket
)

In the above example, the feature store joins the list of features together and saves the result as TFRecord files in an S3 bucket. The S3 bucket needs to be defined in a storage connector in the Hopsworks Feature Store. In practice, what happens is that the SageMaker notebook asks the Hopsworks Feature Store to start a Spark job to produce the training dataset. When the job has completed on Hopsworks, you’ll be able to use the training dataset, typically in a different notebook, to train your model.
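
For instance, once the job has finished, a training notebook could read the materialized TFRecord files back from S3. A minimal sketch, assuming a hypothetical bucket and path (reading from S3 may require extra TensorFlow I/O support depending on your TensorFlow version):

import tensorflow as tf

# list the TFRecord files the feature store job wrote to the (hypothetical) bucket
filenames = tf.io.gfile.glob("s3://my-training-bucket/team_position_prediction/*")
dataset = tf.data.TFRecordDataset(filenames)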

Get Started

Before you begin, make sure you have started a Hopsworks cluster using our managed platform Hopsworks.ai. The Hopsworks - SageMaker integration is an enterprise-only feature, and Hopsworks.ai gives you access to it. The first time you use the Hopsworks - SageMaker integration, there are a few simple steps that you need to perform to configure your SageMaker environment.

API Key

From SageMaker you need to be able to authenticate and interact with the Hopsworks Feature Store, so you need an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, clicking on Settings, and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should upload the API key as a secret on the AWS Secrets Manager service. The Hopsworks SageMaker integration also supports reading the API key from the AWS Parameter Store or a local file. The documentation (https://hopsworks.readthedocs.io) covers the setup for all the cases.

To use the AWS Secrets Manager, you should first find the IAM Role of your SageMaker notebook - in this case it is AmazonSageMaker-ExecutionRole-20190511T072435.


Create a new secret called hopsworks/role/[MY_SAGEMAKER_ROLE] where [MY_SAGEMAKER_ROLE] is the same name as the IAM Role you retrieved in the previous step. The key should be api-key and the value should be the API key you copied from Hopsworks in the first step.
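
If you prefer to script this step, the secret can also be created programmatically. A minimal sketch with boto3, using the example role name from above and a placeholder for the key value:

import boto3

client = boto3.client("secretsmanager")
client.create_secret(
    Name="hopsworks/role/AmazonSageMaker-ExecutionRole-20190511T072435",
    SecretString='{"api-key": "<YOUR_HOPSWORKS_API_KEY>"}',  # paste your API key here
)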



Finally, we need to give the IAM role of the SageMaker notebook permission to read the secret we just created. In the AWS Management Console, go to IAM, select Roles and then the role that is used when creating SageMaker notebook instances. Select Add inline policy. Choose Secrets Manager as the service, expand the Read access level and check GetSecretValue. Expand Resources and select Add ARN. Paste the ARN of the secret created in the previous step with the AWS Secrets Manager. Click on Review, give the policy a name and click on Create policy.
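
The same inline policy can be attached programmatically with boto3. A sketch; the policy name is hypothetical and the secret ARN is a placeholder for the ARN from the previous step:

import boto3, json

iam = boto3.client("iam")
iam.put_role_policy(
    RoleName="AmazonSageMaker-ExecutionRole-20190511T072435",
    PolicyName="read-hopsworks-api-key",  # hypothetical policy name
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "secretsmanager:GetSecretValue",
            "Resource": "<ARN_OF_THE_SECRET_CREATED_ABOVE>",
        }],
    }),
)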

After this step, your SageMaker notebook, when run with the above IAM role, will have permission to read the Hopsworks API key from the Secrets Manager service.

Hopsworks-cloud-sdk

With the API key configured correctly, in your AWS Sagemaker Jupyter notebook, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) using PIP:

>>> !pip install hopsworks-cloud-sdk~=1.2

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection

With the API Key configured and the library installed, you should be now able to establish a connection to the feature store, and start using the Hopsworks - AWS SageMaker integration.

import hops.featurestore as fs
fs.connect(
    'my_instance',                    # DNS of your Feature Store instance
    'my_project',                     # Name of your Hopsworks Feature Store project
    secrets_store='secretsmanager')   # Either parameterstore, secretsmanager, or file

Try it out now with Hopsworks.ai

You can now try out the Hopsworks Feature Store and the SageMaker integration by starting a Hopsworks instance on Hopsworks.ai and running this example Jupyter notebook on your SageMaker instance: https://github.com/logicalclocks/hops-examples/blob/master/notebooks/featurestore/aws/SageMakerFeaturestoreTourPython.ipynb.

The Hopsworks Community is also available if you need help with your setup.

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a SageMaker notebook. While we still encourage you to use a Spark environment to engineer complex features with lots of data, it will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.

Hopsworks Feature Store for Databricks

>
4/23/2020
>
Fabio Buso


TLDR; Feature Stores have become the key piece of data infrastructure for machine learning platforms. They manage the whole lifecycle of features: from training different models to providing low-latency access to features by online-applications for model inference. This article introduces the Hopsworks Feature Store for Databricks, and how it can accelerate and govern your model development and operations on Databricks.

What is a Feature Store?

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency for online applications;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 


The Feature Store for ML consists of both an Online and Offline database and Databricks can be used to transform raw data from backend systems into engineered features cached in the online and offline stores. Those features are made available to online and batch applications for inferencing and for creating train/test data for model training.

Engineer Features in Databricks, publish to the Feature Store

The process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines. Feature Pipelines can be developed in Spark or Pandas applications that are run on Databricks. They can be combined with data validation libraries like Deequ to ensure feature data is correct and complete.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). One popular framework for data validation with Spark is AWS Deequ, as it allows you to extend traditional schema-based data validation (e.g., this column should contain integers) with validation rules for numerical or categorical values. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column’s values are unique, not null, or that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are then cached in the feature store, and subsequently used both to train models and for batch/online model inferencing.
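
As an illustration, below is a minimal validation sketch using PyDeequ, the Python API for Deequ; the DataFrame raw_data_df and the columns user_id and amount are hypothetical:

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite

check = Check(spark, CheckLevel.Error, "raw data checks")
result = (VerificationSuite(spark)
          .onData(raw_data_df)
          .addCheck(check.isComplete("user_id")     # no missing values
                         .isUnique("user_id")       # no duplicate keys
                         .isNonNegative("amount"))  # values in the expected range
          .run())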


The snippet below connects to the feature store, engineers features with Spark, and registers them as a feature group:

import hsfs
# "prod" is the production feature store

conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# read raw data and use Spark to engineer (squared) polynomial features
raw_data_df = spark.read.parquet('/parquet_partitioned')
polynomial_features = raw_data_df.select(
    *[(raw_data_df[c] ** 2).alias(c + "_squared") for c in raw_data_df.columns])

# Features computed together in a DataFrame are in the same feature group
fg = featurestore.create_feature_group(name='fg_revenue',
                                       version=1,
                                       type='offline')

fg.create(polynomial_features)
fg.compute_statistics()

In the snippet above, we connect to the Hopsworks Feature Store, read some raw data into a DataFrame from a parquet file, and transform the data into polynomial features. Then, we create a feature group; its version is 1 and it is only stored in the ‘offline’ feature store. Finally, we ingest the new polynomial_features DataFrame into the feature group, and compute statistics over the feature group that are also stored in the Hopsworks Feature Store. Note that Pandas DataFrames are supported as well as Spark DataFrames, and there are both Python and Scala/Java APIs.

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.

You can find an example notebook for feature engineering with PySpark in Databricks and registering features with Hopsworks here.


Model Training Pipelines in Databricks start at the Feature Store 

Model training pipelines in Databricks can read in train/test data either directly as Spark Dataframes from the Hopsworks Feature Store or as train/test files in S3 (in a file format like .tfrecords or .npy or .csv or .petastorm). Notebooks/jobs in Databricks can use the Hopsworks Feature Store to join features together to create such train/test datasets on S3.


Model training with a feature store typically involves at least three stages:

  1. select the features from feature groups and join them together to build a train/test dataset. Here you may also want to filter out data and include an optional timestamp to retrieve features exactly as they were at a point of time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

import hsfs
conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# get feature groups from which you want to create a training dataset
fg1 = featurestore.get_feature_group('fg_revenue', version=1)
fg2 = featurestore.get_feature_group('fg_users', version=2)
# lazily join features
joined_features = fg1.select_all() \
                     .join(fg2.select(['user_id', 'age']), on='user_id')

sink = featurestore.get_storage_connector('S3-training-dataset-bucket')

td = featurestore.create_training_dataset(name='revenue_prediction',
                    version=1,
                    data_format='tfrecords',
                    storage_connector=sink,
                    split={'train': 0.8, 'test': 0.2})

td.seed = 1234
td.create(joined_features)

Data Scientists can rely on the quality and the correctness of the business logic of published features, and can therefore quickly export and create training datasets in their favourite data format.
You can find an example notebook for getting started with creating train/test datasets from Hopsworks in Databricks here.

Deploying the Hopsworks Feature Store for Databricks

The Hopsworks Feature Store is available as a managed platform for AWS (Hopsworks.ai) and as an Enterprise platform for Azure.

Hopsworks.ai for AWS

Hopsworks.ai is our new managed platform for the Hopsworks Feature Store on AWS. In its current version, it will deploy a Hopsworks Feature Store into your AWS account. From Hopsworks.ai, you can stop/start/backup your Hopsworks Feature Store.


The details for how to launch a Hopsworks Feature Store inside an existing VPC/subnet used by Databricks are found in our documentation. The following figures from Hopsworks.ai show how to pick the same Region/VPC/Zone used by your Databricks cluster when launching Hopsworks.


You also need to expose the Feature Store service for use by Databricks, see the figure below. 


For some Enterprises, an alternative to deploying Hopsworks in the same VPC as Databricks is VPC peering. VPC peering requires manual work - contact us for help setting it up.

Enterprise Hopsworks for Databricks Azure

On Azure, by default, Databricks is deployed to a locked resource group with all data plane resources, including a virtual network (VNet) that all clusters will be associated with. However, with VNet injection, you can deploy Azure Databricks into the same virtual network where the Hopsworks Feature Store is deployed. Contact us for more details on how to install and set up VNet injection for Azure with the Hopsworks Feature Store. An alternative to VNet injection is VNet peering - contact us for help setting it up.

Summary

A new key piece of infrastructure for machine learning has now arrived for Databricks users - the Hopsworks Feature Store. It enables you to centralize your features for ML for easier discovery and governance, it enables the reuse of features in different ML projects, and it provides a single pipeline for engineering features for both training and inference. The Hopsworks Feature Store is available today as either a managed platform on AWS, so you can spin up a cluster in just a few minutes, or as an Enterprise platform for either AWS or Azure.


MLOps with a Feature Store

>
2/14/2020
>
Fabio Buso

TLDR; If AI is to become embedded in the DNA of Enterprise computing systems, Enterprises must first re-align their machine learning (ML) development processes to include data engineers, data scientists and ML engineers in a single automated development, integration, testing, and deployment pipeline. This blog introduces platforms and methods for continuous integration (CI), continuous delivery (CD), and continuous training (CT) with machine learning platforms, with details on how to do CI/CD machine learning operations (MLOps) with a Feature Store. We will see how the Feature Store refactors the monolithic end-to-end ML pipeline into a feature engineering and a model training pipeline.

What is MLOps?

MLOps is a recent term that describes how to apply DevOps principles to automating the building, testing, and deployment of ML systems. The Continuous Delivery Foundation’s SIG-MLOps defines MLOps as  “the extension of the DevOps methodology to include Machine Learning and Data Science assets as first class citizens within the DevOps ecology”. MLOps aims to unify ML application development and the operation of ML applications, making it easier for teams to deploy better models more frequently. Martinfowler.com defines MLOps as:
“a software engineering approach in which a cross-functional team produces machine learning applications based on code, data, and models in small and safe increments that can be reproduced and reliably released at any time, in short adaptation cycles.”

Some of the major challenges of MLOps, compared to DevOps, are how to deal with versioned data, not just versioned code, how to manage specialized hardware (graphics processing units (GPUs)), and how to manage data governance and compliance for models.

DevOps vs MLOps

Traditional DevOps CI/CD Workflow triggered by changes to source code.


Git is the world’s most popular source-code version control system. It is used to track changes in source code over time and to support different versions of source code. Support for version control is a prerequisite for automation and continuous integration (CI) solutions, as it enables reproducible provisioning of any environment in a fully automated fashion. That is, we assume the configuration information required to provision the environment is stored in a version control system, as well as the source code for the system we will be testing. Typically, when working with DevOps, every commit to Git triggers the automated creation of packages that can be deployed to any environment using only information in version control.

In most DevOps setups, Jenkins is used together with Git as an automation server that builds, tests and deploys your versioned code in a controlled and predictable way. The typical steps Jenkins follows for a CI/CD pipeline are to: provision testing virtual machines (VMs)/containers, checkout code onto machines, compile the code, run tests, package binaries, and deploy binaries. For Java, this involves running a build tool like maven to compile, test, and package Java binaries before deploying the binaries in some staging or production system. For Docker, this means compiling a Dockerfile and deploying the Docker image to a Docker registry.


High Level MLOps CI/CD Workflow triggered by changes in either source code or data.


Perhaps the most defining characteristic of MLOps is the need to version data as well as code to enable reproducible workflows for training models. Git is not suitable as a platform for versioning data, as it does not scale to store large volumes of data. Luckily, others have been working on alternative platforms in recent years.

However, Git and Jenkins are not enough for MLOps, where the build process involves running a complex distributed workflow and we need both versioned code and versioned data to ensure reproducible automated builds. The workflow is what we call an ML pipeline, a graph of components, where each component takes input parameters and data, and, at the end, a successful workflow run deploys a trained model to production. A standard ML pipeline consists of at least the following components: validate incoming data, compute features on the incoming data, generate train/test data, train the model, validate the model, deploy the model, and monitor the model in production. This simplified pipeline can, in practice, be even more complex, where the model training stage can be broken into smaller components: hyperparameter tuning, ablation studies, and distributed training.

There are already several end-to-end ML frameworks that support orchestration frameworks to run ML pipelines: TensorFlow Extended (TFX) supports Airflow, Beam and Kubeflow pipelines, Hopsworks supports Airflow, MLFlow supports Spark, and Kubeflow supports Kubeflow pipelines. These frameworks enable the automated execution of workflows, the ability to repeat steps, such as re-training a model, with only input parameter changes, the ability to pass data between components, and the ability to specify event-based triggering of workflows (e.g., at a specific time of day, on the arrival of new data, or when model performance degrades below a given level). TFX, MLFlow, and Hopsworks also support distributed processing using Beam and/or Spark, enabling scale-out execution on clusters using very large amounts of data.

MLOps: versioned Code and Data 

Data Versioning, Git-Style

DVC, developed by the affable Dmitry Petrov, provides an open-source tool for versioning files/objects in cloud storage that uses Git to store metadata about files and reflinks (that support transparent copy-on-write for data files) to ensure consistency between git entries and the data files. Similarly, Pachyderm, an ML platform on Kubernetes, also provides a data versioning platform using git-like semantics. However, these git-like approaches just track immutable files; they do not store the differences between files. They cannot handle time-travel queries, such as “give me train/test data for the range between the years 2016-2018” or “give me the value of these features on the 6th September 2018”. Without time-travel, they cannot support incremental feature engineering: computing features only on the data that has changed since the last run (1 hour ago, a day ago, etc).

Data Versioning with Time-Travel Queries and Incremental Pulling

An alternative to git-like data versioning systems is to use a transactional data-lake that provides versioned, structured datasets. A versioned dataset does not just have a version of the schema for its data (schemas may evolve over time), but also updates to the data-lake are executed atomically and identified by a commit. The most well known such platforms are the open-source projects: Delta Lake, Apache Hudi, Apache Iceberg. Here users can perform time-travel queries that return the data at a given point-in-time (commit-id), or the data for a given time-interval, or the changes to the data since a given point in time. They execute time travel queries efficiently using indexes (bloom filters, z-indexes, data-skipping indexes) that massively reduce the amount of data that needs to be read from the file system or object store. Transactional data lakes also enable incremental feature engineering - compute features only for the data that has changed in the last hour or day - by enabling clients to read only those changes in a dataset since a given point in time.
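
As an illustration, an incremental pull with Apache Hudi from PySpark might look like the sketch below; the table path and begin-instant timestamp are hypothetical, and the option names may differ between Hudi versions:

# read only the rows that changed since a given commit time
incremental_df = (spark.read.format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20180906000000")
    .load("/feature_store/fg_revenue"))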

Hopsworks Feature Store 

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 

The Feature Store for ML consists of both an Online and Offline database and transforms raw data from backend systems into engineered features that are made available to online and batch applications for inferencing and to Data Scientists to create train/test data for model development.

Most hyperscale AI companies have built internal feature stores (Uber, Twitter, AirBnb, Google, Facebook, Netflix, Comcast), but there are also two open-source Feature Stores: Hopsworks Feature Store (built on Apache Hudi/Hive, MySQL Cluster and HopsFS) and Feast (built on Big Query, BigTable, and Redis). Other databases used by existing Feature Stores include Cassandra, S3, and Kafka, and custom key-value stores.


End-to-End ML Pipelines with the Hopsworks Feature Store

Both MLOps and DataOps CI/CD pipelines differ from traditional DevOps in that they may be triggered by new data arriving for processing (as well as triggering due to updates to the source code for the data engineering or model training pipelines). DataOps is concerned with automating test and deployment of data processing pipelines (or feature pipelines, in our case), with stages such as data validation and data pipelines. MLOps, on the other hand, is concerned with the automation of training and deploying production ML models, with stages such as model training, model validation, and model deployment.


The Feature Store enables ML workflows to be decomposed into two workflows: (1) a “DataOps” workflow for engineering features and validating incoming data that caches the features in the Feature Store, and (2) a “MLOps” workflow for training models using features from the Feature Store, analyzing and validating those models, deploying them into online model serving infrastructure, and monitoring model performance in production.


Some ML lifecycle frameworks, such as TensorFlow Extended (TFX) and MLFlow, are based around end-to-end ML pipelines that start with raw data and end in production models. However, the first steps of an end-to-end ML pipeline, which take raw data and turn it into training data for models, can be very expensive. According to Airbnb, without a feature store, creating train/test data can take up to 60-80% of data scientists’ time. The feature store enables transformed data (features) to be reused in different models. When you have a feature store, you no longer need end-to-end ML pipelines from raw data to models. You can decompose end-to-end ML pipelines into two separate pipelines that each run at their own cadence: (1) feature pipelines that ingest data from backend systems, validate it, featurize it and cache it in the feature store, and (2) training pipelines that train models from feature data, validate those models, and deploy them to production.

The motivation for introducing the Feature Store for MLOps is that the process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.

ML Pipelines are Stateful 

The best practice when developing data pipelines is for them to be stateless and idempotent, so that they can be safely re-run in case of failures. ML pipelines, however, have state. Before you deploy a model to production, you need some contextual information - does this model perform better than the currently deployed model? This decision requires state about the currently deployed model. Ideally, we also want historical state, so we can observe and evaluate the performance of models over time and the processing time/success-rate of building models over time. Hopsworks, TFX and MLFlow provide a metadata store to enable ML pipelines to make stateful decisions, to log their execution steps, store the artifacts they produce, and store the provenance of the final models. Both TFX and MLFlow are obtrusive - they make developers re-write the code at each of the stages with their component models (with well-defined inputs and outputs to each stage). This way, they can intercept input parameters to components and log them to the metadata store. Hopsworks provides an unobtrusive metadata model, where pipelines read/write to the HopsFS (HDFS) file system and use Hopsworks APIs to interact with its feature store. This way, metadata events, artifacts, executions, and provenance are implicitly stored to the metadata store without the need to rewrite notebooks or python programs, as is needed in TFX or MLFlow.

Feature Pipelines feed the Hopsworks Feature Store


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). Two popular frameworks for data validation are TFX data validation and AWS Deequ, as they allow you to extend traditional schema-based data validation (e.g., this column should contain integers) with rules for checking that numerical or categorical values are as expected. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column’s values are unique, not null, or that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are then cached in the feature store, and subsequently used both to train models and for batch/online model inferencing.


Feature Pipelines belong to the DataOps paradigm, where frameworks like Spark, PySpark, Pandas, and Featuretools are used along with data validation libraries like TFX data validation and Deequ.


Feature pipelines share many of the same DevOps best practices with data pipelines. Some of the types of automated tests for data/features include (a sketch of one such test follows the list):

  • unit test and integration tests for all featurization code (Jenkins can run these tests when code is pushed to Git);
  • test that feature values fall within expected ranges (TFX data validation or Deequ);
  • test the uniqueness, completeness, and distinctness of features (Deequ);
  • test that feature distributions match your expectations (TFX data validation or Deequ);
  • test the relationship between each feature and the label, and the pairwise correlations between individual signals (Deequ);
  • test the cost of each feature (custom tests);
  • test that personally identifiable information is not leaking into features (custom tests).
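
Below is a minimal sketch of a feature-range test of the second kind, written as a plain pytest-style function; the column name and bounds are hypothetical:

# test that a feature's values are complete and fall within an expected range
def test_average_player_age_in_range(features_df):
    ages = features_df["average_player_age"]  # hypothetical Pandas column
    assert ages.notnull().all()               # completeness
    assert ages.between(15, 50).all()         # expected range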

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.

Model Training Pipelines start at the Feature Store

Model training pipelines belong to the MLOps paradigm, where versioned features are read from Apache Hudi in the Hopsworks Feature Store to create train/test data that is used to train models that are then deployed and monitored in production. Provenance of ML artifacts and executions are stored in the Metadata store in Hopsworks, and ML pipelines are orchestrated by Hopsworks.


Model training with a feature store typically involves at least three stages (or programs) in a workflow:

  1. select the features to include, the file format, and the file system (or object store) for the train/test dataset that will be created from features in the feature store. Note that for the Hopsworks Feature Store, a timestamp (corresponding to a Hudi commit-id) can also be supplied to reproduce a train/test dataset exactly as it was at a point of time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

In the Hopsworks platform, these three steps would typically be python programs or Jupyter notebooks and they are executed as part of an Airflow DAG (directed acyclic graph). That is, Airflow orchestrates the execution of the pipeline. Airflow enables DAGs to be scheduled periodically, but it can also be configured to run workflows when new feature data arrives in the feature store or when Git commits are pushed for model training pipeline code.
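
A minimal sketch of such an Airflow DAG follows; the task callables are hypothetical stubs, and the operator import path depends on your Airflow version:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def create_training_dataset(): ...   # stage 1: join features into a train/test dataset
def train_model(): ...               # stage 2: train (and optionally tune) the model
def validate_and_deploy(): ...       # stage 3: validate the model, then deploy it

with DAG("model_training_pipeline",
         start_date=datetime(2020, 1, 1),
         schedule_interval="@daily") as dag:
    td = PythonOperator(task_id="create_training_dataset",
                        python_callable=create_training_dataset)
    train = PythonOperator(task_id="train_model", python_callable=train_model)
    deploy = PythonOperator(task_id="validate_and_deploy",
                            python_callable=validate_and_deploy)
    td >> train >> deploy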

The type of automated tests that are performed during the model validation step include:

  • test how the model performs on different data slices to check for bias,
  • test the robustness of the model to out-of-distribution feature vectors.

Hopsworks supports Google’s What-If Tool for model analysis using Jupyter notebooks. It is useful to investigate counterfactuals (compare a datapoint to the most similar point where your model predicts a different result), making it easier to develop model validation tests that can subsequently be used in production pipelines.

Google’s What-If Tool can be used to analyze a model, asking counterfactuals and testing for bias on different slices of data. Knowledge discovered here can be transferred into model validation tests.

Monitoring Online Models

When a model is deployed to a model server for use by online applications, we need to monitor its performance and its input features. We need to identify if the input features in production are statistically different from the input features used to train the model. In practice, we can do this by comparing statistics computed over the training data (accessible through feature store API calls) with statistics collected from input features at runtime. In Hopsworks, we log all prediction requests sent to models to a topic in Kafka. You can then write a Spark Streaming or Flink application that processes the prediction requests in Kafka, computing statistics in time-based windows and comparing those statistics with the training data statistics from the Feature Store. If the windowed statistics for a given feature diverge significantly from the training statistics, your streaming application can notify ML engineers that input features are not as expected. Your streaming application will typically also compute business-level KPIs for the models and provide a UI to enable operators to visualize model performance. More concretely, the error signals to look for in online monitoring include:

Concept drift

In a model, the target variable is what the model is trying to predict. It could be, for example, whether a financial transaction is suspected as fraud or not fraud. When the statistical properties of the target variable change over time in an unexpected way (for example, a new fraud scheme appears that increases the overall amount of fraud), we have concept drift.

Data drift

If, however, the statistical properties of the input features change over time in an unexpected way, it will negatively impact the model’s performance. For example, if users execute many more financial transactions than normal due to it being a holiday period, but the model was not trained to handle holiday periods, then the model performance may degrade (either missing fraud or flagging up too many transactions as suspicious).

Feature pipeline changes

If there are changes in how a feature is computed in a feature pipeline, and an online model enriches its feature vector with that feature data from the online feature store, then this can negatively impact the model’s performance. For example, if you change how to compute the number of transactions a user carries out, it may negatively impact the model’s performance.
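
Returning to the windowed-statistics approach above, here is a minimal sketch of such a monitoring job with Spark Structured Streaming; the Kafka topic, the amount field, and the broker address are hypothetical:

from pyspark.sql import functions as F

# read prediction requests logged to a (hypothetical) Kafka topic
requests = (spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "model-predictions")
    .load()
    .selectExpr("CAST(value AS STRING) AS json", "timestamp")
    .select(F.get_json_object("json", "$.amount").cast("double").alias("amount"),
            "timestamp"))

# windowed statistics to compare against the training statistics
# fetched from the feature store
windowed = (requests
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "5 minutes"))
    .agg(F.mean("amount").alias("mean"), F.stddev("amount").alias("std")))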

Summary

We have now covered an End-to-End ML pipeline with a Feature Store based on MLOps principles. Updates to pipeline code or newly arrived data enable changes to be continuously tested and models to be continually updated and deployed in production. We showed how the Feature Store enables monolithic end-to-end ML pipelines to be decomposed into feature pipelines and model training pipelines. We also discussed how data versioning is possible with modern data lake frameworks such as Apache Hudi. In the next blog, we will cover ML pipelines and reproducible experiments in Hopsworks in more detail, and how to easily move pipelines from development to production environments. We will also show how to develop both feature pipelines and model training pipelines using Airflow.

Feature Store: The Missing Data Layer in ML Pipelines?

>
12/30/2018
>
Kim Hammar

TLDR; A feature store is a central vault for storing documented, curated, and access-controlled features. In this blog post, we discuss the state-of-the-art in data management for deep learning and present the first open-source feature store, available in Hopsworks.

What is a Feature Store?

The concept of a feature store was introduced by Uber in 2017 [11]. The feature store is a central place to store curated features within an organization. A feature is a measurable property of some data-sample. It could be for example an image-pixel, a word from a piece of text, the age of a person, a coordinate emitted from a sensor, or an aggregate value like the average number of purchases within the last hour. Features can be extracted directly from files and database tables, or can be derived values, computed from one or more data sources.

Features are the fuel for AI systems, as we use them to train machine learning models so that we can make predictions for feature values that we have never seen before.

Figure 1. A feature store is the interface between feature engineering and model development.

The feature store has two interfaces:

Writing to the feature store: The interface for Data Engineers. At the end of the feature engineering pipeline, instead of writing features to a file or a project-specific database, features are written to the feature store.

Data Engineer example:

from hops import featurestore

# read raw data and compute squared (polynomial) features for every column
raw_data = spark.read.parquet(filename)
polynomial_features = raw_data.select(
    *[(raw_data[c] ** 2).alias(c + "_squared") for c in raw_data.columns])

featurestore.insert_into_featuregroup(polynomial_features, "polynomial_featuregroup")

Reading from the feature store: The interface for Data Scientists. To train a model on a set of features, the features can be read from the feature store directly.

Data Scientist example:

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

A feature store is not a simple data storage service; it is also a data transformation service, as it makes feature engineering a first-class construct. Feature engineering is the process of transforming raw data into a format that is understandable for predictive models.

Why You Need a Feature Store

At Logical Clocks we are committed to developing technologies to operate machine learning workflows at large scale and to help organizations distill intelligence from data. Machine learning is an extremely powerful method that has the potential to help us move from a historical understanding of the world to a predictive modeling of the world around us. However, building machine learning systems is hard and requires specialized platforms and tools.

Although ad-hoc feature engineering and training pipelines are a quick way for Data Scientists to experiment with machine learning models, such pipelines have a tendency to become complex over time. As the number of models increases, it quickly becomes a pipeline jungle that is hard to manage. This motivates the usage of standardized methods and tools for the feature engineering process, helping reduce the cost of developing new predictive models. The feature store is a service designed for this purpose.

Technical Debt in Machine Learning Systems

“Machine Learning: The High-Interest  Credit Card of Technical Debt”

Google [3]

Machine learning systems have a tendency to assemble technical debt [1]. Examples of technical debt in machine learning systems are:

  • There is no principled way to access features during model serving.
  • Features cannot easily be re-used between multiple machine learning pipelines.
  • Data science projects work in isolation without collaboration and re-use.
  • Features used for training and serving are inconsistent.
  • When new data arrives, there is no way to pin down exactly which features need to be recomputed; rather, the entire pipeline needs to be run to update features.

Several organizations that we have spoken to struggle to scale their machine learning workflows due to the technical complexity, and some teams are even reluctant to adopt machine learning given its high technical cost. Using a feature store is a best practice that can reduce the technical debt of machine learning workflows.

“Pipeline jungles can only be avoided by thinking holistically about data collection and feature extraction”

Google [1]

Data Engineering is the hardest problem in Machine Learning

“Data is the hardest part of ML and the most important piece to get right. Modelers spend most of their time selecting and transforming features at training time and then building the pipelines to deliver those features to production models. Broken data is the most common cause of problems in production ML systems”

Uber [2]

Delivering machine learning solutions in production and at large-scale is very different from fitting a model to a pre-processed dataset. In practice, a large part of the effort that goes into developing a model is spent on feature engineering and data wrangling.

Figure 2. Model development is just one part of the work that goes into a machine learning project.

There are many different ways to extract features from raw data, but common feature engineering steps include (a short Pandas sketch follows the list):

  • Converting categorical data into numeric data;
  • Normalizing data (to alleviate ill-conditioned optimization when features originate from different distributions);
  • One-hot-encoding/binarization;
  • Feature binning (e.g., convert continuous features into discrete);
  • Feature hashing (e.g., to reduce the memory footprint of one-hot-encoded features);
  • Computing polynomial features;
  • Representation learning (e.g.,  extract features using clustering, embeddings, or generative models);
  • Computing aggregate features (e.g., count, min, max, stdev).
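
A few of these steps in Pandas, as a minimal sketch over hypothetical toy data:

import pandas as pd

df = pd.DataFrame({"city": ["oslo", "lund", "oslo"], "purchases": [3, 1, 7]})

one_hot = pd.get_dummies(df["city"], prefix="city")          # one-hot encoding
normalized = (df["purchases"] - df["purchases"].mean()) \
             / df["purchases"].std()                         # normalization
binned = pd.cut(df["purchases"], bins=[0, 2, 5, 10])         # feature binning
polynomial = df["purchases"] ** 2                            # polynomial feature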

To illustrate the importance of feature engineering, let’s consider a classification task on a dataset with just one feature, x1, that looks like this:

Figure 3. A dataset with a single feature, x1, and two classes (filled circle and non-filled circle) that are not linearly separable.

We are doomed to fail if we try to fit a linear model directly to this dataset as it is not linearly separable. During feature engineering we can extract an additional feature, x2, where the function for deriving x2 from the raw dataset is x2 = (x1)^2. The resulting two-dimensional dataset might look like the one depicted in Figure 4.

Figure 4. A dataset with two features, x1 and x2, and two classes (filled circle and non-filled circle) that are linearly separable (e.g., by the red line).

By adding an extra feature, the dataset becomes linearly separable and can be fitted by our model. This was a simple example, in practice the process of feature engineering can involve much more complex transformations.
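
The derived feature from this example, expressed with NumPy over hypothetical toy values:

import numpy as np

x1 = np.array([-3.0, -2.0, -0.5, 0.5, 2.0, 3.0])
x2 = x1 ** 2  # points with large |x1| now sit high on the x2 axis,
              # so a horizontal line can separate the two classes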

In the case of deep learning, deep models tend to perform better the more data they are trained on (more data samples during training can have a regularizing effect and combat overfitting). Consequently, a trend in machine learning is to train on increasingly larger datasets. This trend further complicates the feature engineering process as Data Engineers must think about scalability and efficiency in addition to the feature engineering logic. With a standardized and scalable feature platform, the complexity of feature engineering can be managed more effectively.

Life Before the Feature Store

Figure 5. A typical machine learning infrastructure without a feature store.

No Feature Store

In Figure 5, feature code is duplicated across training jobs, and some features have different implementations: one for training and one for deployment/inferencing (Model C). Having different implementations for computing features for training and deployment entails non-DRY code and can lead to prediction problems. Moreover, without a feature store, features are typically not reusable as they are embedded in training/serving jobs. This also means that Data Scientists have to write low level code for accessing data stores, requiring data engineering skills. There is also no service to search for feature implementations, and there is no management or governance of features.

Figure 6. A machine learning Infrastructure with a feature store.

With a Feature Store

Data Scientists can now search for features, and with API support, easily use them to build models with minimal data engineering. In addition, features can be cached and reused by other models, reducing model training time and infrastructure costs. Features are now a managed, governed asset in the Enterprise.

Economies of Scale for Machine Learning Organizations

A frequent pitfall for organizations that apply machine learning is to think of data science teams as individual groups that work independently with limited collaboration. Having this mindset results in machine learning workflows where there is no standardized way to share features across different teams and machine learning models. Not being able to share features across models and teams limits Data Scientists’ productivity and makes it harder to build new models. By using a shared feature store, organizations can achieve an economies-of-scale effect. As the feature store is built up with more features, it becomes easier and cheaper to build new models, as they can re-use features that already exist in the feature store.

Figure 7. By centralizing feature storage within the organization, the ramp-up period for new models and machine learning projects is reduced.

Hopsworks Feature Store

With Hopsworks 0.8.0 we are releasing the first open-source feature store service that will be integrated in the HopsML framework [8]. In this section we cover the technical details of the system and how it is used.

The Components of a Feature Store and a Comparison of Existing Feature Stores

During 2018, a number of large companies that are at the forefront of applying machine learning at scale announced the development of proprietary feature stores. Uber, LinkedIn, and Airbnb built their feature stores on Hadoop data lakes, while Comcast built a feature store on an AWS data lake, and GO-JEK built a feature store on Google’s data platform.

These existing feature stores consist of five main components:

  • The feature engineering jobs, i.e., the computation of features; the dominant frameworks for feature computation are Samza (Uber [4]), Spark (Uber [4], Airbnb [5], Comcast [6]), Flink (Airbnb [5], Comcast [6]), and Beam (GO-JEK [7]).
  • The storage layer for storing feature data. Common solutions for storing features are Hive (Uber [4], Airbnb [5]), S3 (Comcast [6]), and BigQuery (GO-JEK [7]).
  • The metadata layer used for storing code to compute features, feature version information, feature analysis data, and feature documentation.
  • The Feature Store API used for reading/writing features from/to the feature store.
  • The feature registry, a user interface (UI) service where Data Scientists can share, discover, and order computation of features.

Before we dive into the feature store API and its usage, let’s have a look at the technology stack that we built our feature store on.

Hopsworks Feature Store Architecture

Figure 8. Architecture of Hopsworks Feature Store

Feature Engineering Frameworks

At Logical Clocks we specialize in Python-first ML pipelines, and for feature engineering we focus our support on Spark, PySpark, Numpy, and Pandas. The motivation for using Spark/PySpark to do feature engineering is that it is the preferred choice for data wrangling among our users that are working with large-scale datasets. However, we have also observed that users working with small datasets prefer to do the feature engineering with frameworks such as Numpy and Pandas, which is why we decided to provide native support for those frameworks as well. Users can submit feature engineering jobs on the Hopsworks platform using notebooks, python files, or .jar files.

The Storage Layer

We have built the storage layer for the feature data on top of Hive/HopsFS with additional abstractions for modeling feature data. The reason for using Hive as the underlying storage layer is two-fold: (1) it is not uncommon that our users are working with datasets in terabyte-scale or larger, demanding scalable solutions that can be deployed on HopsFS (see blog post on HopsFS [9]); and (2) data modeling of features is naturally done in a relational manner, grouping relational features into tables and using SQL to query the feature store. This type of data modelling and access patterns fits well with Hive in combination with columnar storage formats such as Parquet or ORC.

The Metadata Layer

To provide automatic versioning, documentation, feature analysis, and feature sharing we store extended metadata about features in a metadata store. For the metadata store we utilize NDB (MySQL Cluster) which allows us to keep feature metadata that is strongly consistent with other metadata in Hopsworks, such as metadata about feature engineering jobs and datasets.

Feature Data Modeling

We introduce three new concepts to our users for modeling data in the feature store.

  • The feature is an individual versioned and documented data column in the feature store, e.g., the average rating of a customer.
  • The feature group is a documented and versioned group of features stored as a Hive table. The feature group is linked to a specific Spark/Numpy/Pandas job that takes in raw data and outputs the computed features.
  • The training dataset is a versioned and managed dataset of features and labels (potentially from multiple different feature groups). Training datasets are stored in HopsFS as tfrecords, parquet, csv, tsv, hdf5, or .npy files.

Figure 9. A feature group contains a group of features and a training dataset contains a set of features, potentially from many different feature groups.

When designing feature groups, it is a best-practice to let all features that are computed from the same raw dataset to be in the same feature group. It is common that there are several feature groups that share a common column, such as a timestamp or a customer-id, that allows feature groups to be joined together into a training dataset.

The Feature Store API

The feature store has two interfaces: one for writing curated features to the feature store, and one for reading features from the feature store for training or serving.

Creating Features

The feature store is agnostic to the method used for computing the features. The only requirement is that the features can be grouped together in a Pandas, Numpy, or Spark dataframe. The user provides a dataframe with features and associated feature metadata (metadata can also be edited later through the feature registry UI), and the feature store library takes care of creating a new version of the feature group, computing feature statistics, and linking the features to the job that computed them.

Insert Features

from hops import featurestore

# Append the features in "features_df" (a Pandas, Numpy, or Spark dataframe)
# to an existing feature group; both arguments are placeholder variables.
featurestore.insert_into_featuregroup(features_df, featuregroup_name)

Create Feature Group

from hops import featurestore

# Register a new feature group from a dataframe; the arguments are placeholder
# variables for the name, description, the feature engineering job that
# computed the features, and the version of the feature group.
featurestore.create_featuregroup(
   features_df,
   featuregroup_name,
   featuregroup_description,
   feature_engineering_job,
   featuregroup_version
)

Reading From the Feature Store (Query Planner)

To read features from the feature store, users can use either SQL or APIs in Python and Scala. Based on our experience with users on our platform, Data Scientists can have diverse backgrounds. Although some Data Scientists are very comfortable with SQL, others prefer higher-level APIs. This motivated us to develop a query planner to simplify user queries. The query planner enables users to express the bare minimum information needed to fetch features from the feature store. For example, a user can request 100 features that are spread across 20 different feature groups by just providing a list of feature names. The query planner uses the metadata in the feature store to infer where to fetch the features from and how to join them together.

Figure 10. Users query the feature store programmatically or with SQL queries. The output is provided as Pandas, Numpy, or Spark dataframes.

To fetch the features “average_attendance” and “average_player_age” from the feature store, all the user has to write is this.

Example to fetch features

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

The returned “features_df” is a (Pandas, Numpy, or Spark) dataframe that can then be used to generate training datasets for models.
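
For users who prefer SQL over the Python API, the hops library also exposes a SQL entry point. Below is a minimal sketch under the assumption that a feature group is backed by a Hive table named "teams_features_1" (a hypothetical name following a name_version convention).

Example to fetch features with SQL

from hops import featurestore

# Run a raw SQL query against the project's feature store;
# "teams_features_1" is a hypothetical table name.
features_df = featurestore.sql(
    "SELECT average_attendance, average_player_age FROM teams_features_1"
)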

Creating Training Datasets

Organizations typically have many different types of raw datasets that can be used to extract features. For example, in the context of user recommendation there might be one dataset with demographic data of users and another dataset with user activities. Features from the same dataset are naturally grouped into a feature group, and it is common to generate one feature group per dataset. When training a model, you want to include all features that have predictive power for the prediction task; these features can potentially span multiple feature groups. The training dataset abstraction in the Hopsworks Feature Store is used for this purpose. The training dataset allows users to group a set of features with labels for training a model to do a particular prediction task.

Once a user has fetched a set of features from different feature groups in the feature store, the features can be joined with labels (in case of supervised learning) and materialized into a training dataset. By creating a training dataset using the feature store API, the dataset becomes managed by the feature store. Managed training datasets are automatically analyzed for data anomalies, versioned, documented, and shared with the organization.

Figure 11. The life-cycle of data in HopsML. Raw data is transformed into features which are grouped together into training datasets that are used to train models.

To create a managed training dataset, the user supplies a Pandas, Numpy or Spark dataframe with features, labels, and metadata.

Create a managed training dataset

from hops import featurestore

# Fetch the features and materialize them into a managed training dataset;
# the name, description, job, and version arguments are placeholder variables.
features_df = featurestore.get_features(["average_attendance", "average_player_age"])

featurestore.create_training_dataset(
   features_df,
   training_dataset_name,
   training_dataset_description,
   computation_job,
   training_dataset_version,
   data_format="tfrecords"
)

Once the training dataset has been created, the dataset is discoverable in the feature registry and users can use it to train models. Below is an example code snippet for training a model using a training dataset stored in a distributed manner in the tfrecords format on HopsFS.

Training a model using a training dataset

from hops import featurestore
import tensorflow as tf
from tensorflow.keras import layers

# placeholder names and hyperparameters for the example
td_name = "team_position_prediction"  # hypothetical training dataset name
shuffle_buffer_size = 10000
batch_size = 64
num_epochs = 10
spe = 100    # steps per epoch
lr = 0.001   # learning rate

dataset_dir = featurestore.get_training_dataset_path(td_name)

# the tf records are written in a distributed manner using partitions
input_files = tf.gfile.Glob(dataset_dir + "/part-r-*")

# tf record schemas are managed by the feature store
tf_record_schema = featurestore.get_training_dataset_tf_record_schema(td_name)

# tf records are a sequence of *binary* (protobuf-serialized) records that need to be decoded
def decode(example_proto):
    return tf.parse_single_example(example_proto, tf_record_schema)

dataset = (
    tf.data.TFRecordDataset(input_files)
    .map(decode)
    .shuffle(shuffle_buffer_size)
    .batch(batch_size)
    .repeat(num_epochs)
)

# three-layer MLP for regression
model = tf.keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(1)
])

model.compile(optimizer=tf.train.AdamOptimizer(lr), loss="mse")
model.fit(dataset, epochs=num_epochs, steps_per_epoch=spe)

The Feature Registry

The feature registry is the user interface for publishing and discovering features and training datasets. The feature registry also serves as a tool for analyzing feature evolution over time by comparing feature versions. When a new data science project is started, Data Scientists within the project typically begin by scanning the feature registry for available features, and only add new features for their model that do not already exist in the feature store.

Figure 12. Feature Registry on Hopsworks.

The feature registry provides:

  • Keyword search on feature/feature group/training dataset metadata.
  • Create/Update/Delete/View operations on feature/feature group/training dataset metadata.
  • Automatic feature analysis.
  • Feature dependency tracking.
  • Feature job tracking.
  • Feature data preview.

Automatic Feature Analysis

When a feature group or training dataset is updated in the feature store, a data analysis step is performed. In particular, we look at cluster analysis, feature correlation, feature histograms, and descriptive statistics. We have found that these are the types of statistics that our users find most useful in the feature modeling phase. For example, feature correlation information can be used to identify redundant features, feature histograms can be used to monitor feature distributions between different versions of a feature to discover covariate shift, and cluster analysis can be used to spot outliers. Having such statistics accessible in the feature registry helps users decide which features to use.
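
As a rough illustration of the kinds of statistics computed (this is not the feature store's internal implementation), the same quantities can be expressed in a few lines of Pandas.

Sketch of the automatic feature analysis statistics

import pandas as pd

# A toy dataframe standing in for a feature group.
features_df = pd.DataFrame({
    "average_attendance": [32000.0, 18500.0, 41000.0, 27500.0],
    "average_player_age": [26.1, 24.8, 27.3, 25.5],
})

descriptive_stats = features_df.describe()  # count, mean, std, min, quartiles, max
correlation_matrix = features_df.corr()     # pairwise feature correlations
# Binned counts approximate the feature histograms shown in the registry.
attendance_histogram = features_df["average_attendance"].value_counts(bins=2)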

Figure 13. Viewing the feature correlation for a training dataset using the feature registry.


Figure 14. Viewing the distribution of a feature in a feature group using the feature registry.

Feature Dependencies and Automatic Backfilling

When the feature store increases in size, the process of scheduling jobs to recompute features should be automated to avoid a potential management bottleneck. Feature groups and training datasets in the Hopsworks Feature Store are linked to Spark/Numpy/Pandas jobs, which enables the reproduction and recomputation of the features when necessary. Moreover, each feature group and training dataset can have a set of data dependencies. By linking feature groups and training datasets to jobs and data dependencies, the features in the Hopsworks Feature Store can be automatically backfilled using workflow management systems such as Airflow [10].
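
As a hypothetical sketch of such a backfilling workflow: the DAG name, the user-defined compute_team_features function, and the mode argument to insert_into_featuregroup below are assumptions, not a prescribed setup.

Sketch of an Airflow DAG for backfilling a feature group

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def recompute_team_features():
    from hops import featurestore
    # compute_team_features() is a user-defined function (not shown) that
    # re-runs the feature engineering logic on the raw data dependencies.
    features_df = compute_team_features()
    # Overwrite the feature group with the freshly computed features;
    # the mode argument is an assumption about the hops API.
    featurestore.insert_into_featuregroup(features_df, "team_features", mode="overwrite")

dag = DAG(
    dag_id="backfill_team_features",
    start_date=datetime(2019, 1, 1),
    schedule_interval=timedelta(days=1),  # recompute daily
)

backfill_task = PythonOperator(
    task_id="recompute_team_features",
    python_callable=recompute_team_features,
    dag=dag,
)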

Figure 15. Feature dependency tracking.

A Multi-Tenant Feature Store Service

We believe that the biggest benefit of a feature store comes when it is centralized across the entire organization: the more high-quality features available in the feature store, the better. For example, in 2017 Uber reported that they had approximately 10,000 features in their feature store [11].

Despite the benefits of centralizing features, we have identified a need to enforce access control to features. Several organizations that we have talked to work partially with sensitive data that requires specific access rights not granted to everyone in the organization. For example, it might not be feasible to publish features that are extracted from sensitive data to a feature store that is public within the organization.

To solve this problem we utilize the multi-tenancy property built into the architecture of the Hopsworks platform [12]. Feature stores in Hopsworks are by default project-private and can be shared across projects, which means that an organization can combine public and private feature stores. An organization can have a central public feature store that is shared with everyone in the organization as well as private feature stores containing features of a sensitive nature that are only accessible by users with the appropriate permissions.
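
In practice this means a user can read from a shared feature store by naming it explicitly. A minimal sketch, where "org_public_featurestore" is a made-up name and the featurestore keyword argument is an assumption about the hops API:

from hops import featurestore

# Read features from a shared, organization-wide feature store instead
# of the project's private one ("org_public_featurestore" is hypothetical).
features_df = featurestore.get_features(
    ["average_attendance", "average_player_age"],
    featurestore="org_public_featurestore",
)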

Figure 16. Based on the organization's needs, features can be divided into several feature stores to preserve data access control.

Future Work

The feature store covered in this blog post is a so-called batch feature store, meaning that it is a feature store designed for training and non-real-time model serving. In future work, we plan to extend the feature store to meet the real-time guarantees that are required when serving user-facing models. Moreover, we are currently in the process of evaluating the need for a Domain Specific Language (DSL) for feature engineering. With a DSL, users who are not proficient in Spark/Pandas/Numpy can provide an abstract declarative description of how features should be extracted from raw data, and the library translates that description into a Spark job for computing the features. Finally, we are also looking into supporting Petastorm [13] as a data format for training datasets. By storing training datasets in Petastorm we can feed Parquet data directly into machine learning models in an efficient manner. We consider Petastorm a potential replacement for tfrecords that could make it easier to reuse training datasets with ML frameworks other than TensorFlow, such as PyTorch.

Summary

Building successful AI systems is hard. At Logical Clocks we have observed that our users spend a lot of effort on the Data Engineering phase of machine learning. As of version 0.8.0, Hopsworks provides the world's first open-source feature store. A feature store is a data management layer for machine learning that allows Data Scientists and Data Engineers to share and discover features, better understand how features evolve over time, and streamline the machine learning workflow.

References

[1] Hidden Technical Debt in Machine Learning Systems: https://papers.nips.cc/paper/5656-hidden-technical-debt-in-machine-learning-systems.pdf

[2] Scaling Machine Learning at Uber with Michelangelo: https://eng.uber.com/scaling-michelangelo/

[3] Machine Learning: The High-Interest Credit Card of Technical Debt: https://static.googleusercontent.com/media/research.google.com/sv//pubs/archive/43146.pdf

[4] Scaling Machine Learning as a Service (Uber): http://proceedings.mlr.press/v67/li17a/li17a.pdf

[5] Zipline: Airbnb’s Machine Learning Data Management Platform: https://databricks.com/session/zipline-airbnbs-machine-learning-data-management-platform

[6] Operationalizing Machine Learning—Managing Provenance from Raw Data to Predictions: https://databricks.com/session/operationalizing-machine-learning-managing-provenance-from-raw-data-to-predictions

[7] Building a Feature Platform to Scale Machine Learning | DataEngConf BCN '18: https://www.youtube.com/watch?v=0iCXY6VnpCc

[8] HopsML, Python-First ML Pipelines: https://hops.readthedocs.io/en/latest/hopsml/hopsML.html

[9] Fixing the Small Files Problem in HDFS: https://www.logicalclocks.com/fixing-the-small-files-problem-in-hdfs/

[10] Airflow: https://airflow.apache.org/

[11] Meet Michelangelo: Uber’s Machine Learning Platform: https://eng.uber.com/michelangelo/

[12] Introducing Hopsworks: https://www.logicalclocks.com/introducing-hopsworks/

[13] Petastorm: https://github.com/uber/petastorm

[14] Deep learning scaling is predictable, empirically: https://blog.acolyer.org/2018/03/28/deep-learning-scaling-is-predictable-empirically/

[15] Feature Store at LinkedIn: https://www.thestrangeloop.com/2018/democratizing-ai—back-fitting-end-to-end-machine-learning-at-linkedin-scale.html