End-to-end Deep Learning Pipelines with Earth observation Data in Hopsworks

10/8/2021

Tl;dr: Hopsworks enables data scientists to develop scalable end-to-end machine learning pipelines with Earth observation data. In this blog post we demonstrate how to build such a pipeline with real-world data in order to develop an iceberg classification model.

Introduction

In the blog post AI Software Architecture for Copernicus Data with Hopsworks we described how Hopsworks, the data-intensive AI platform with a feature store, brings support for scale-out AI with Earth observation data from the Copernicus programme and the H2020 ExtremeEarth project. This blog post is a continuation of the previous one as we dive into a real-world example where we describe how to build and deploy a machine learning (ML) model using a horizontally scalable deep learning (DL) architecture that identifies if a remotely sensed target is a ship or iceberg.

An extended version of this blog post is available as deliverable “D1.8 Hops data platform support for EO data -version II” of the H2020 ExtremeEarth project published in June 2021.

Pipeline

To develop and put a machine learning model into production, the input data needs to be processed and transformed through a series of stages. Each stage serves a distinct purpose, and together the stages transform the input Earth observation data into an ML model that application clients can use. For the ship/iceberg classification model described in this example, these stages are listed below and described in detail in the following sections:

  • Data ingestion and pre-processing
  • Feature Engineering and Feature Validation
  • Training
  • Model Analysis, Model Serving, Model Monitoring
  • Orchestration
End-to-end ML pipeline stages

Dataset

A requirement for this example is to use a free and publicly available dataset from the Earth observation domain. We therefore opted for the dataset of the “Statoil/C-CORE Iceberg Classifier Challenge - Ship or iceberg, can you decide from space?” [1], hosted by Kaggle, an online community of data scientists and machine learning practitioners, and distributed for free.

The schema for the Statoil dataset is presented in the figure below. The data is in JSON format and contains 1604 images. For each image in the dataset, we have the following information:

  • id - the id of the image.
  • band_1, band_2 - the flattened image data. Each band has 75x75 pixel values in the list, so each list has 5625 elements. Band 1 and Band 2 are signals characterized by radar backscatter produced from the HH (transmit/receive horizontally) and HV (transmit horizontally, receive vertically) polarizations.
  • inc_angle - the incidence angle at which the image was taken.
  • is_iceberg - set to 1 if it is an iceberg, and 0 if it is a ship.
Schema of the Statoil demonstrator dataset

Data ingestion and preprocessing

Hopsworks can ingest data from various external sources and it is up to the users to decide the most efficient approach for their use cases. Such data sources include object stores such as Amazon S3 or Azure Blob Storage, external relational databases that can be accessed via protocols such as JDBC, and of course data that resides in the local filesystem. Another option, which was followed for the purposes of this article, is to upload the input data via the Hopsworks UI, which makes use of the Hopsworks REST API. This way, the data is readily available to applications running in Hopsworks from within the project’s datasets.

Often, data needs to be pre-processed, that is, transformed into data that ML features can be extracted from and eventually used as training/test data. In the Earth observation domain, such preprocessing steps might involve applying algorithms implemented in arbitrary languages and platforms. For example, the European Space Agency (ESA) is developing free open source toolboxes for the scientific exploitation of Earth observation missions. ESA SNAP [2] is a common architecture for all Sentinel Toolboxes. To make it easier for developers to work with SNAP, the toolbox has been containerized and is made available by different organizations, such as mundialis [3], to be run as a Docker container. As of version 2.2.0, Hopsworks supports running Docker containers as jobs, which means users can seamlessly integrate Docker containers as part of their pipelines built in Hopsworks [4].

Feature Engineering and Feature Validation

After having ingested the input data into the platform and applied any preprocessing steps, we proceed by engineering the features required by the deep learning training algorithm. Feature engineering in this example is done within Hopsworks by using Jupyter notebooks and Python. Feature engineering can also be performed by an external service and the curated feature data can then be inserted into the Hopsworks feature store. The latter is the service that allows data scientists to store, organize, discover, audit and share feature data that can be reused across multiple ML models. 

In the iceberg classifier example, we use the band_1 and band_2 features to compute a new feature called band_avg. All the features are then organized into a feature group and inserted into the feature store, as shown in the code snippet below.

Create a new feature band_avg
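A minimal sketch of these two steps with the hsfs Python client is shown below; the DataFrame loading, the per-pixel averaging, and the feature group name are illustrative assumptions, with only the band_avg feature and the feature group insert taken from the example itself.

import hsfs
import pandas as pd

# Connect to the project's feature store (assumes the code runs inside a Hopsworks project)
connection = hsfs.connection()
fs = connection.get_feature_store()

# Load the Statoil/C-CORE JSON file into a DataFrame (path is illustrative)
df = pd.read_json("train.json")

# Engineer the new feature: the element-wise average of the two radar bands
df["band_avg"] = df.apply(
    lambda row: [(b1 + b2) / 2 for b1, b2 in zip(row["band_1"], row["band_2"])],
    axis=1)

# Organize the features into a feature group and insert it into the feature store
iceberg_fg = fs.create_feature_group(name="iceberg",
                                     version=1,
                                     description="Ship/iceberg classification features",
                                     primary_key=["id"])
iceberg_fg.save(df)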

Input data often contains noise, for example missing feature values or values of the wrong data type. Since the feature data needs to be ready for use by ML programs, developers can make use of the feature validation API when inserting data into the feature store; the API is part of the feature store Python and Scala SDKs [5]. It provides a plethora of rules that can be applied to data as it is being inserted into the feature store.

In the iceberg feature group example we chose to apply three validation rules:

  • HAS_DATATYPE: Asserts that the id feature of the iceberg feature group does not contain null values. This is done by setting the maximum allowed number of null values to zero. Additionally, the is_iceberg label is expected to contain only numbers, by setting the threshold for required numeric values of is_iceberg to 1.
  • HAS_MAX: Assertion on the maximum allowed value of the is_iceberg label, which is set to 1.
  • HAS_MIN: Assertion on the minimum allowed value of the is_iceberg label, which is set to 0.

These rules are grouped in feature store expectations and can be set during the feature group creation call as shown in the image below.

Feature expectations Python API example
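A hedged sketch of how such an expectation could be attached with the hsfs validation API is shown below. The expectation name and rule levels are assumptions, the HAS_DATATYPE rule is omitted because its extra arguments vary between hsfs versions, and the exact argument names may differ from the screenshot above.

from hsfs.rule import Rule

# Group the min/max rules on the label into an expectation
label_expectation = fs.create_expectation(
    "is_iceberg_range",
    description="is_iceberg must lie between 0 and 1",
    features=["is_iceberg"],
    rules=[Rule(name="HAS_MIN", level="ERROR", min=0),
           Rule(name="HAS_MAX", level="ERROR", max=1)])

# Attach the expectation when creating the feature group (extending the creation call shown earlier)
iceberg_fg = fs.create_feature_group(name="iceberg",
                                     version=1,
                                     primary_key=["id"],
                                     expectations=[label_expectation])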

Training

Hopsworks comes equipped with two Python frameworks, namely experiments [6] and Maggy [7], that enable data scientists to develop machine learning models at scale as well as manage machine learning experiment metadata. In particular, these frameworks enable scalable deep learning with GPUs across multiple machines, distribution-transparent machine learning experiments, ablation studies, and writing core ML training logic as oblivious training functions. Maggy lets you reuse the same training code whether you are training small models on your laptop or scaling out hyperparameter tuning or distributed deep learning on a cluster.

This example uses TensorFlow version 2.4 for developing the model. When launching a machine learning experiment from Hopsworks, the Jupyter service provides users with different options depending on the type of training/experimentation to be done. As seen in the image below, these types are Experiment, Parallel Experiments, and Distributed Training. Experiment is used to conduct a single experiment, while Parallel Experiments can significantly speed up the process of exploring the hyperparameter combinations that work best for the ML model. Distributed Training automates the process of setting up and launching the workers that will develop the model based on the selected distributed training strategy.

For example, the screenshot below shows how to perform hyperparameter optimization with Maggy for the iceberg classification example.

Iceberg hyperparameter optimization with Maggy - launch
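In code, the launch looks roughly like the sketch below; the searchspace, hyperparameter names, and trial count are illustrative, and train_fn is assumed to be a function that builds, trains and evaluates the Keras classifier for one set of hyperparameters and returns the metric to maximize.

from maggy import experiment, Searchspace
from maggy.experiment_config import OptimizationConfig

# Hyperparameters to explore (names and ranges are illustrative)
sp = Searchspace(kernel=('INTEGER', [2, 8]),
                 dropout=('DOUBLE', [0.01, 0.5]))

hpo_config = OptimizationConfig(num_trials=15,
                                optimizer="randomsearch",
                                searchspace=sp,
                                direction="max",
                                name="iceberg_hpo")

# train_fn(kernel, dropout, reporter) returns e.g. the validation accuracy of one trial
result = experiment.lagom(train_fn=train_fn, config=hpo_config)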

Once all trials are executed, a summary of results is printed as the final output.

Iceberg hyper-parameter optimization with Maggy - results

For distributed training, the same model was used as in the previous sections; however, Jupyter was started with the Distributed Training configuration.

Iceberg distributed training function
Iceberg distributed training experiments API launch
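A minimal sketch of the launch through the experiments API is shown below; the exact function name can differ between versions of the hops library, and train_fn is assumed to wrap the full Keras training code for the iceberg model.

from hops import experiment

# Hopsworks starts the workers, wires up the distribution strategy and
# aggregates the logs; train_fn contains the unchanged Keras training code
experiment.mirrored(train_fn, name='iceberg_distributed_training')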

In the context of machine learning, we can define an ablation study as “a scientific examination of a machine learning system by removing its building blocks in order to gain insight on their effects on its overall performance”. With Maggy, performing ablation studies of machine learning or deep learning systems is a fairly simple task that consists of the following steps:

  • Creating an AblationStudy instance,
  • Specifying the components that you want to ablate by including them in your AblationStudy instance,
  • Defining a base model generator function and/or a dataset generator function,
  • Wrapping your TensorFlow/Keras code in a Python function (the training function) that receives two arguments (model_function and dataset_function), and
  • Launching your experiment with Maggy while specifying an ablation policy (a sketch follows the notebook example below).
Maggy ablation studies notebook example - ablations
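A hedged sketch of these steps is shown below; the training dataset name, feature names, and layer name are illustrative assumptions, base_model_generator and training_function are assumed to be the model-building and training functions from the notebook, and LOCO (leave-one-component-out) is the ablation policy used here.

from maggy import experiment
from maggy.ablation import AblationStudy
from maggy.experiment_config import AblationConfig

# 1. Create the study around a training dataset in the feature store
ablation_study = AblationStudy('iceberg_train_dataset',
                               training_dataset_version=1,
                               label_name='is_iceberg')

# 2. Specify the components to ablate: individual features and model layers
ablation_study.features.include('inc_angle', 'band_avg')
ablation_study.model.layers.include('dense_one')

# 3. Register the base model generator (a dataset generator is optional)
ablation_study.model.set_base_model_generator(base_model_generator)

# 4./5. Wrap the training code in training_function and launch with an ablation policy
config = AblationConfig(ablation_study=ablation_study,
                        ablator="loco",
                        name="iceberg_ablation")
result = experiment.lagom(train_fn=training_function, config=config)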


Maggy ablation studies notebook example - results

Model: Analysis, Serving, Monitoring

Data scientists working with Hopsworks can make use of the What-If [8] tool to test performance in hypothetical situations, analyze the importance of different data features, and visualize model behavior across multiple models and subsets of input data, and for different ML fairness metrics. The What-If tool is available out of the box when working within a Hopsworks project.

Below you can see the code snippet used to perform model analysis for the iceberg classification model developed with the demonstrator dataset in this post. Users set the number of data points to be displayed, the location of the test dataset to be used for the analysis, and the features to be used.

Model analysis what-if tool code snippet
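A minimal sketch of such a snippet is given below; test_df (a pandas DataFrame with the test examples, label in the last column) and model (the trained Keras classifier) are placeholders, and the number of data points is illustrative.

import numpy as np
from witwidget.notebook.visualization import WitConfigBuilder, WitWidget

num_datapoints = 500                                  # data points to display in the tool
column_names = list(test_df.columns)
test_examples = test_df.head(num_datapoints).values.tolist()

# Wrap the trained model in a custom prediction function returning per-class scores
def predict_fn(examples):
    features = np.array(examples)[:, :-1]             # drop the label column
    preds = model.predict(features)
    return [[1 - float(p), float(p)] for p in preds]

config_builder = (WitConfigBuilder(test_examples, column_names)
                  .set_custom_predict_fn(predict_fn)
                  .set_label_vocab(['ship', 'iceberg'])
                  .set_target_feature('is_iceberg'))
WitWidget(config_builder, height=800)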

The next screenshot depicts the performance and fairness of the model based on a particular feature of the model.

Performance and Fairness of the model

After a model has been developed and exported by the previous stages in the DL pipeline, it needs to be served so that external clients can use it for inference. As the model is being served, its performance also needs to be monitored in real-time so that users can decide when it is best to trigger the training stage again. For the iceberg classification model, Hopsworks uses TensorFlow Model Server on Kubernetes to serve the model in an elastic and scalable manner, and Spark/Kafka for monitoring and logging the inference requests. Users can then manage the serving instances from the Hopsworks UI and view logs as shown in the screenshot below.

Model serving logs in Kibana
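Once a serving instance is running, clients send TensorFlow Serving-style requests to the Hopsworks inference endpoint. The sketch below uses the Python requests library; the host, project id, API key header, and the preprocessed test_sample input are assumptions that may differ between Hopsworks versions and deployments.

import requests

HOPSWORKS_HOST = "https://<hopsworks-host>"   # illustrative
PROJECT_ID = 119                              # illustrative
API_KEY = "<api-key>"                         # illustrative

# test_sample is a hypothetical preprocessed input (bands and incidence angle)
payload = {"signature_name": "serving_default",
           "instances": [test_sample]}

response = requests.post(
    f"{HOPSWORKS_HOST}/hopsworks-api/api/project/{PROJECT_ID}/inference/models/iceberg:predict",
    headers={"Authorization": f"ApiKey {API_KEY}"},
    json=payload,
    verify=False)

print(response.json())                        # e.g. {"predictions": [[0.93]]}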

Orchestration

The previous sections have demonstrated how to apply transformations and processing steps to data via a deep learning pipeline, in order to go from raw data to an ML model. So far, all steps had to be executed manually in the proper order to produce the output model. Once the process is established, however, it becomes quite repetitive in nature, which decreases the efficiency of data scientists whose primary focus is on improving the accuracy of their models by applying novel techniques and algorithms. Such a repetitive process should therefore be automated and easily managed with the help of software tools.

One such tool is Apache Airflow [9], a platform to programmatically schedule and monitor workflows. Hopsworks provides Airflow as one of the services available in a project. Users can either create an orchestration pipeline with the Hopsworks UI or implement it themselves and then upload it to Hopsworks.
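A hand-written DAG might look roughly like the sketch below; the operator import path comes from the Hopsworks Airflow plugin and, together with the project and job names, is an assumption for illustration.

from datetime import datetime, timedelta
from airflow import DAG
from hopsworks_plugin.operators.hopsworks_operator import HopsworksLaunchOperator

default_args = {"owner": "iceberg_demo", "start_date": datetime(2021, 6, 1)}

with DAG("iceberg_classification_pipeline",
         default_args=default_args,
         schedule_interval=timedelta(days=7)) as dag:

    # Each task launches a job that was previously defined in the Hopsworks project
    feature_engineering = HopsworksLaunchOperator(task_id="feature_engineering",
                                                  project_name="iceberg_demo",
                                                  job_name="iceberg_feature_engineering",
                                                  wait_for_completion=True)
    training = HopsworksLaunchOperator(task_id="training",
                                       project_name="iceberg_demo",
                                       job_name="iceberg_training",
                                       wait_for_completion=True)
    model_analysis = HopsworksLaunchOperator(task_id="model_analysis",
                                             project_name="iceberg_demo",
                                             job_name="iceberg_model_analysis",
                                             wait_for_completion=True)

    feature_engineering >> training >> model_analysis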

Airflow service UI in Hopsworks


Airflow tree-view tasks for the iceberg classification pipeline

Conclusion

In this blog post we presented a real-world example of developing an end-to-end machine learning pipeline for performing iceberg classification with Earth observation (remote sensing) data. The pipeline is developed using tools and services available in Hopsworks and the example’s code is available in the ExtremeEarth project GitHub repository [10]. 

References

  1. https://www.kaggle.com/c/statoil-iceberg-classifier-challenge/overview/description 
  2. https://step.esa.int/main/toolboxes/snap/ 
  3. https://github.com/mundialis/esa-snap
  4. https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jobs.html#docker 
  5. https://docs.hopsworks.ai/latest/generated/feature_validation/ 
  6. https://hopsworks.readthedocs.io/en/stable/hopsml/experiment.html
  7. https://maggy.ai/dev/ 
  8. https://pair-code.github.io/what-if-tool/ 
  9. https://airflow.apache.org/ 
  10. https://github.com/ExtremeEarth-Project/eo-ml-examples/tree/main/D1.8

Note: This blog is a republication from Extreme Earth.


Distributed ML Experiments on Databricks with Maggy

5/13/2021
Riccardo Grigoletto

TLDR; Maggy is an open-source framework that simplifies writing and maintaining distributed machine learning programs. By encapsulating your training logic in a function, the same code can be run unchanged with Python on your laptop or distributed using PySpark for hyperparameter tuning, data-parallel training, or model-parallel training. With the arrival of GPU support in Spark 3.0, PySpark can now be used to orchestrate distributed deep learning applications in TensorFlow and PyTorch. We are pleased to announce that we have now added support for Maggy on Databricks, so training machine learning models with many workers should be as easy as running Python programs on your laptop.

Introduction

Machine learning is becoming distributed and, as Andrew Ng calls it, data-centric. Data volumes are constantly increasing, and models are known to improve their prediction accuracy predictably - so companies can often know what return they will get in terms of more accurate models simply by investing in collecting more training data. One of the main challenges for developers is to switch from programming for a single machine to programming for a distributed cluster.

That's why we developed Maggy (maggy.ai), an open source Python library that introduces a new unified framework for writing core ML training logic as oblivious training functions. By encapsulating your training logic in a Python function (we call it an oblivious training function), the same code can be run unchanged in Python or PySpark. You don’t need to rewrite your code to do hyperparameter tuning, data-parallel training, or model-parallel training.

Maggy can be extremely useful if you want to build a distributed ML solution but have no prior experience of distributed computing. It is enough to refactor your training code to wrap it inside a function (which is good development practice anyway) and let Maggy do the distribution magic for you.

Furthermore, it does not matter whether you use TensorFlow or PyTorch, as Maggy supports both of them. Scikit-Learn and XGBoost are also supported, and are used in particular for parallelizing hyperparameter tuning over many workers.

Maggy on Databricks

In the following example, we use Maggy to train a Neural Network on the Iris dataset.

The first thing to do is to create a cluster in your Databricks workspace. Maggy has been tested on the Databricks Runtime version 7.4 ML. Choose the number of workers you need and you are good to go.


If you don't know how to create a cluster, please follow the tutorial on this page. If it is your first time on Databricks, make sure to get familiar with the platform and prices. 

Once you have created the cluster, you have to install Maggy. To do this, navigate to your cluster, click Libraries, then Install New and PyPI, write 'maggy' in the Package field and click Install.


Once you have installed Maggy, do the same for TensorFlow version 2.4 or higher. For example, in the Package field you can write 'tensorflow==2.4'.

Now we can open the Databricks notebook and write our first Maggy program.

Maggy workflow

In order to use Maggy on your workflow, we need to do the following:

  1. Create classes and functions to (a) create your model, (b) create your dataset, and (c) define your model training logic.
  2. Pass the dataset or the dataset path, the data processing function and the hyperparameters to a Maggy configuration object.
  3. Call Maggy's 'lagom' function, passing the training function and the configuration from the previous step.

We are now going to present an example of how to implement this workflow. We are going to distribute the training of a (very) simple machine learning model on the Iris dataset.

You can use this notebook as reference.

1a. Model creation

First of all, we have to wrap our ML model in a class, and the class has to be an implementation of tf.keras.Model.

It's important to note that we are not instantiating the class: we need to pass the class itself to Maggy, not an instance of it.

In this example we define a class called NeuralNetwork. It is a subclass of tf.keras.Sequential, which is an implementation of tf.keras.Model. Make sure that your class implements tf.keras.Model. Finally, we define our ML model in the __init__ function.

from tensorflow.keras.layers import Dense
from tensorflow.keras import Sequential

# you can use keras.Sequential(), you just need to override it
# on a custom class and define the layers in __init__()
class NeuralNetwork(Sequential):

    def __init__(self, nl=4):
        super().__init__()
        self.add(Dense(10, input_shape=(None, 4), activation='tanh'))
        if nl >= 4:
            for i in range(0, nl - 2):
                self.add(Dense(8, activation='tanh'))
        self.add(Dense(3, activation='softmax'))

model = NeuralNetwork

1b. Dataset creation

First, we need to upload the Iris dataset to Databricks: in your Databricks workspace, click Data on the sidebar, then Create Table, and upload Iris.csv, which can be downloaded here.

In order to use Maggy, we have to pass the training set, the test set and, optionally, a function for data processing to the configuration object.

The training and test sets can be:

  1. A tuple like train_set = (X_train, y_train). X_train and y_train can be a list, a numpy array or a TensorFlow dataset.
  2. The path to the training and test sets. In this case, you also need to provide a data process function containing the instructions to consume and transform the data, as per the following code snippet.

def process_data(train_set_path, test_set_path):
    # The path arguments are unused here: the uploaded Iris.csv is read
    # directly from DBFS and split into train and test sets with Spark
    dataset_path = "dbfs:/FileStore/tables/Iris.csv"
    train_set, test_set = spark.read.format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load(dataset_path).drop('_c0') \
        .randomSplit((0.80, 0.20), seed=0)

    # Convert the Spark DataFrames to numpy arrays
    raw_train_set = train_set.toPandas().values
    raw_test_set = test_set.toPandas().values

    # The first four columns are the features, the last column is the label
    X_train = raw_train_set[:, 0:4]
    y_train = raw_train_set[:, 4:]
    X_test = raw_test_set[:, 0:4]
    y_test = raw_test_set[:, 4:]

    return (X_train, y_train), (X_test, y_test)

1c. Create an HPO function

We now wrap the code containing the logic of your experiment in a function.

For HPO, we have to define a function that has the HPs to be optimized as parameters. Inside the function we simply put the training logic as if we were training our model on a single machine using TensorFlow. Maggy will run this function multiple times using different parameters for you, as we will see in section 2b.

def hpo_function(number_layers, reporter):
    from tensorflow.keras.optimizers import Adam

    model = NeuralNetwork(nl=number_layers)
    model.build()

    # compile the model
    model.compile(Adam(lr=0.04),
                  'categorical_crossentropy',
                  metrics=['accuracy'])

    # process_data reads and splits the uploaded Iris.csv itself,
    # so the path arguments are not used
    train_input, test_input = process_data(None, None)

    # fit the model and evaluate it on the test set
    train_batch_size = 75
    test_batch_size = 15
    epochs = 10
    model.fit(x=train_input[0], y=train_input[1],
              batch_size=train_batch_size,
              epochs=epochs,
              verbose=1)
    score = model.evaluate(x=test_input[0], y=test_input[1],
                           batch_size=test_batch_size,
                           verbose=1)

    print(f'Test loss: {score[0]}')
    print(f'Test accuracy: {score[1]}')
    return score[1]

1d. Create an oblivious training function

The training function provides the instruction to run the training and evaluation of your model, given the data passed in the configuration. You just need to wrap the instructions you implemented and eventual hyperparameters (for example the values to pass in the model constructor). The training function has to return a value or a list of values that corresponds to the evaluation results.

def training_function(model, train_set, test_set, hparams):
    from tensorflow.keras.optimizers import Adam

    model = model()
    model.build()

    # compile the model, then fit and evaluate it
    model.compile(Adam(lr=0.04),
                  'categorical_crossentropy',
                  metrics=['accuracy'])
    model.fit(train_set, epochs=20)
    accuracy = model.evaluate(test_set)
    return accuracy

It's important to note that in the HPO function we did not pass the model as a parameter, whereas we did in our oblivious training function. This is because, when using Maggy for distributed training, the library has to patch some functions of the model class.

2a. Configure Maggy for HPO

The next step is to create a configuration instance for Maggy. Since in this example we are using Maggy for hyperparameter optimization and distributed training using TensorFlow, we will use OptimizationConfig and TfDistributedConfig. 

OptimizationConfig contains the information about the hyperparameter optimization. 

We need to define a Searchspace object that contains the hyperparameters we want to optimize. In this example we want to search for the optimal number of layers of the neural network, between 2 and 8 layers.

OptimizationConfig contains the following parameters:

  • num_trials: Controls how many separate runs are conducted during the hp search.
  • optimizer: Optimizer type for searching the hp searchspace.
  • searchspace: A Searchspace object configuring the names, types and ranges of hps.
  • optimization_key: Name of the metric to use for hp search evaluation.
  • direction: Direction of optimization.
  • es_interval: Early stopping polling frequency during an experiment run.
  • es_min: Minimum number of experiments to conduct before starting the early stopping mechanism. Useful to establish a baseline for performance estimates.
  • es_policy: Early stopping policy which formulates a rule for triggering aborts.
  • name: Experiment name.
  • description: A description of the experiment.
  • hb_interval: Heartbeat interval with which the server is polling.

from maggy.experiment_config import OptimizationConfig
from maggy import Searchspace

sp = Searchspace(number_layers=('INTEGER', [2, 8]))

hpo_config = OptimizationConfig(num_trials=4,
                                optimizer="randomsearch",
                                searchspace=sp,
                                direction="max",
                                es_interval=1,
                                es_min=5,
                                name="hp_tuning_test")

2b. Run distributed HPO

Our HPO function and configuration class are now ready, so we can go ahead and run the HPO experiment. In order to do that, we run the lagom function, passing our training function and the configuration object we instantiated in the last step.

If you are wondering what lagom means: lagom is a Swedish word representing some cultural aspects of balance and equilibrium; in English it could be translated as "just the right amount" or "less is more".

from maggy import experiment

result = experiment.lagom(train_fn=hpo_function, config=hpo_config)



This function will print the HPO summary. As you can see, several values are returned; we are most interested in the 'best_config' dictionary, which contains the parameters for which the model performed best.

3a. Configure distributed training

Now it's time to run the final step of our ML program: let's initialize the configuration class for distributed training. First, we need to define our hyperparameters; we want to take the best hyperparameters from the HPO.

TfDistributedConfig class has the following parameters:

  • name: the name of the experiment.
  • module: the model to be trained (defined in the first step of this guideline).
  • train_set: the train set as a tuple (x_train, y_train) or the train set path.
  • test_set: the test set as a tuple (x_test, y_test) or the test set path.
  • process_data: the function to process the data (if needed).

  • hparams: the model and dataset parameters. In this case we also need to provide 'train_batch_size' and 'test_batch_size'; these values represent the subset sizes of the sharded dataset. Their value is usually dataset_size/number_workers but can change depending on your needs.

from maggy.experiment_config.tf_distributed import TfDistributedConfig

# define the constructor parameters of your model
model_params = {
    # train dataset entries / num_workers
    'train_batch_size': 75,
    # test dataset entries / num_workers
    'test_batch_size': 15,
    'learning_rate': 0.04,
    'epochs': 20,
    'number_layers': result['best_config']['number_layers'],
}

training_config = TfDistributedConfig(name="tf_test",
                                      model=model,
                                      train_set=train_set,
                                      test_set=test_set,
                                      process_data=process_data,
                                      hparams=model_params)

3b. Run distributed training


Finally, let's run the distributed training using the lagom function.

experiment.lagom(training_function, training_config)


Maggy will run the distributed training using the number of workers and resources available to the cluster we defined. Finally, it will print the test results. The training log can be found in the Spark UI on Databricks.

Try it now!

Maggy is open-source and everyone can contribute to the project. Feel free to experiment with the library and contact us with any questions or issues. You can reach out to us via GitHub or the Hopsworks community. You can also give us a star on GitHub to let us know you appreciate our work.

Conclusion

In this article we saw how to train an ML model in a distributed fashion without reformatting our code, thanks to Maggy. Maggy is available on hopsworks.ai. If you want to know more about how to develop your ML projects faster, you may want to check out this article.

How to manage Python libraries in Hopsworks

5/12/2021
Robin Andersson

TLDR: Hopsworks is the Data-Intensive AI platform with a Feature Store for building complete end-to-end machine learning pipelines. This tutorial will show an overview of how to install and manage Python libraries in the platform. Hopsworks provides a Python environment per project that is shared among all the users in the project. All common installation alternatives are supported such as using the pip and conda package managers, in addition to libraries packaged in a .whl or .egg file and those that reside on a git repository. Furthermore, the service includes automatic conflict/dependency detection, upgrade notifications for platform libraries and easily accessible installations logs.


Introduction

The Python ecosystem is huge and seemingly ever growing. In a similar fashion, the number of ways to install your favorite package also seems to increase. As such, a data analysis platform needs to support a great variety of these options.

The Hopsworks installation ships with a Miniconda environment that comes preinstalled with the most popular libraries you can find in a data scientist's toolkit, including TensorFlow, PyTorch and scikit-learn. The environment may be managed using the Hopsworks Python service to install libraries, which may then be used in Jupyter or the Jobs service in the platform.

In this blog post we will describe how to install a Python library, wherever it may reside, in the Hopsworks platform. As an example, this tutorial will demonstrate how to install the Hopsworks Feature Store client library, called hsfs. The library is Apache V2 licensed, available on GitHub and published on PyPI.

Prerequisites

To follow this tutorial you should have a Hopsworks instance running on https://hopsworks.ai. You can register for free, without providing credit card information, and receive USD 4000 worth of free credits to get started. The only thing you need to do is connect your cloud account.

Navigate to Python service

The first step to get started with the platform and install libraries in the python environment is to create a project and then navigate to the Python service.

Inspecting the environment

When a project is created, the python environment is also initialized for the project. An overview of all the libraries and their versions along with the package manager that was used to install them are listed under the Manage Environment tab.

Installing hsfs by name

The simplest alternative for installing the hsfs library is to enter the name and click Install, as shown in the example below. The installation itself can then be tracked under the Ongoing Operations tab, and when the installation is finished the library appears under the Manage Environment tab.

If hsfs were also available on an Anaconda repo, which is currently not the case, we would need to specify the channel where it resides. Searching for libraries on Anaconda repos is done by setting Conda as the package location.

Search and install specific version

If a versioned installation is desired to get the hsfs version compatible with a certain Hopsworks installation, the search functionality shows all the versions that have been published to PyPi in a dropdown. Simply pick a version and press Install. The example found below demonstrates this.

Installing a distribution

Many popular Python libraries have a great variety of builds for different platforms and architectures, and with different build flags enabled. As such, it is also important to support installing a distribution directly. Installing hsfs as a wheel requires that the .whl file has previously been uploaded to a Dataset in the project. After that we need to select the Upload tab, which means that the library we want to install is contained in an uploaded file. Then click the Browse button to navigate in the file selector to the distribution file and click Install, as the following example demonstrates.

Installing a requirements.txt

Installing libraries one by one can be tedious and error prone; in that case it may be easier to use a requirements.txt file to define all the dependencies. This makes it easier to move your existing Python environment to Hopsworks, instead of having to install each library one by one.

The file may for example look like this.
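Based on the dependency list described next, the contents of the file would be along these lines:

hsfs==2.2.7
imageio==2.9.0
mahotas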

This dependency list defines that version 2.2.7 of hsfs should be installed, along with version 2.9.0 of imageio and the latest available release for mahotas. The file needs to be uploaded to a dataset as in the previous example and then selected in the UI. 

Installing from a git repository

A great deal of Python libraries are hosted on git repositories, which makes it especially handy to install a library from a git repository during the development phase. The source code for the hsfs package is, as previously mentioned, hosted on a public GitHub repository, which means we only need to supply the URL to the repository and some optional query parameters. The subdirectory=python query parameter indicates that the setup.py file, which is needed to install the package, is in a subdirectory of the repository called python.
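For reference, the URL entered in the UI would look roughly like the line below; the repository (the public hsfs source) and branch are assumptions.

git+https://github.com/logicalclocks/feature-store-api.git@master#egg=hsfs&subdirectory=python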

Automatic conflict detection

After each installation or uninstallation of a library, the environment is analyzed to detect libraries that may not work properly. As hsfs depends on several libraries, it is important to analyze the environment and see if there are any dependency issues. For example, pandas is a dependency of hsfs and, if uninstalled, an alert will appear informing the user that the library is missing. In the following example pandas is uninstalled to demonstrate this.

Notification on library updates

When new releases are available for the hsfs library, a notification is shown to make it simple to upgrade to the latest version. Currently, only the hops and hsfs libraries are monitored for new releases as they are utility libraries used to interact with the platform services. By clicking the upgrade text, the library is upgraded to the recommended version automatically.

When installations fail

Speaking from experience, Python library installations can fail for a seemingly endless number of reasons. As such, to find the cause, it is crucial to be able to access the installation logs in a simple way and find a meaningful error. In the following example, an incorrect version of hsfs is being installed, and the cause of the failure can be found a few seconds later in the logs.

Get started

Hopsworks is available both on AWS and Azure as a managed platform. Visit hopsworks.ai to try it out.

RonDB 21.04.0: Open Source Installation Now Available

4/29/2021
Mikael Ronström
Antonios Kouzoupis

TLDR; The first release of RonDB is now available. RonDB is currently the best low-latency, high availability, high throughput, and high scalability (LATS) database available today. In addition to new documentation and community support, RonDB 21.04.0 brings automated memory configuration, automated thread configuration, improved networking handling, 3x improved performance, bug fixes and new features. It is available as a managed service on the Hopsworks platform and it can also be installed on-premises with the installation scripts or binary tarball.

What’s new in RonDB 21.04.0?

RonDB 21.04.0 is based on MySQL NDB Cluster 8.0.23. The first release includes the following new features and improvements:

  • New documentation of RonDB and its use as a managed service.
  • Community support for questions and bug reports.
  • Automated memory configuration of different memory pools based on available memory
  • Automated thread configuration based on the available CPU resources
  • Improved CPU and network handling
  • Configurable number of replicas
  • Integrate benchmarking tools in RonDB distribution
  • Performance Improvement in ClusterJ API

New documentation

Our new documentation walks through how to use RonDB as a managed service on the Hopsworks platform and to install it on-premises with automated scripts or binary tarball. We have also added new documentation of the RonDB software.

Community support

Welcome to the RonDB community! Feel free to ask any question and our developers will try to reply as soon as possible. 

Automatic memory configuration

NDB has historically required setting a large number of configuration parameters to set the memory sizes of various memory pools. RonDB 21.04.0 introduces automatic memory configuration, taking away all of these configuration properties. RonDB data nodes will use the entire memory available in the server/VM; you can limit the amount of memory they use with a new configuration parameter, TotalMemoryConfig.
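As a rough illustration, the data node defaults in the cluster configuration shrink to something like the snippet below; the section layout follows the usual NDB config.ini conventions and the TotalMemoryConfig value is only an assumed example.

[ndbd default]
NoOfReplicas=2
# Optional cap on the memory RonDB data nodes will claim; without it the
# entire memory of the server/VM is used (value is an assumed example)
TotalMemoryConfig=16G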

Automatic thread configuration

For increased performance and stability, RonDB automates thread configuration. This feature was introduced in NDB 8.0.23, but in RonDB it is the default behaviour. RonDB makes use of all accessible CPUs and also handles CPU locking automatically. Read more about automated thread configuration in our latest blog.

Improved CPU and network handling

RonDB will perform better under heavy load by employing improved heuristics in thread spinning and in the network stack.

Configurable number of replicas

In NDB 8.0.23 and earlier releases of NDB, NoOfReplicas could only be set at the initial start of the cluster; the only way to increase or decrease the replication level was to perform a backup and restore. In RonDB 21.04.0 we introduce Active and Inactive nodes, making it possible to change the number of replicas without having to perform an initial restart of the cluster.

Integrate benchmarking tools in RonDB distribution

We have integrated a number of benchmark tools to assess the performance of RonDB. Benchmarking RonDB is now easy, as we ship the Sysbench, DBT2, flexAsynch and DBT3 benchmarks along with our binary distribution. Support for ClusterJ benchmarks is expected to come in upcoming product releases.

Performance Improvement in ClusterJ API

A new addition to the ClusterJ API releases data objects and Session objects to a cache rather than releasing them fully. In addition, the garbage collection handling in ClusterJ was improved. These improvements led to a 3x improvement in a simple key lookup benchmark.

How to get started with RonDB?

There are three ways to use RonDB 21.04.0:

  1. Managed version available on the Enterprise Hopsworks platform. The RonDB cluster is integrated with Hopsworks and can be used for both RonDB applications as well as for Hopsworks applications. Access our full documentation to get started.
  2. Open source automated installation. Use a script that automates the creation of VMs and the installation of the software components required by RonDB. These scripts are available to create RonDB clusters on Azure and GCP. This script can be downloaded from https://repo.hops.works/installer/21.04/rondb-cloud-installer.sh.
  3. Binary tarball installation. Download the RonDB binary tarball and install it on any computers of your own liking. The binary tarball is available here.
Star us on Github
Follow us on Twitter

From 100 to ZeRO: PyTorch and DeepSpeed ZeRO on any Spark Cluster with Maggy

4/27/2021
Moritz Meister
Riccardo Grigoletto

TLDR; Maggy is an open source framework that lets you write generic PyTorch training code (as if it is written to run on a single machine) and execute that training distributed across a GPU cluster. Maggy enables you to write and debug PyTorch code on your local machine, and then run the same code at scale without having to change a single line in your program. Going even further, Maggy provides the distribution transparent use of ZeRO, a sharded optimizer recently proposed by Microsoft. You can use ZeRO to improve your memory efficiency with a single change in your Maggy configuration. You can try Maggy in the Hopsworks managed platform for free.

Distributed learning - An introduction

Deep learning has seen a surge in activity with the availability of high level frameworks such as PyTorch to build and train models. A few lines of code in a notebook are sufficient to create powerful classifiers from scratch. However, both the data and model sizes to achieve state-of-the-art performance are ever increasing, so that training on your local GPU becomes a hopeless endeavour. 

Enter distributed training. Distributed training allows you to train the same model on multiple GPUs on different shards of your data to speed up training times. In the ideal case, training on 4 GPUs simultaneously should reduce your training time by 75%. In distributed training, each GPU computes a forward and backward pass over its own batch of the data. For the model update, the computed gradients are shared and averaged between the nodes. This way, all models update their parameters with the same combined gradient and stay in sync. This additional communication step introduces additional overhead of course, which is why ideal scaling is never truly achieved. 

So if distributed training is such a great tool to accelerate training, why is its use still uncommon among normal PyTorch users? Because it is too tedious to use! A dummy example for starting distributed training might look something like this.

import os
import torch
import torch.nn as nn
import torch.distributed as dist
import torch.multiprocessing as mp

def train(gpu, args):
    # global rank of this process
    rank = args.nr * args.gpus + gpu
    dist.init_process_group(backend='nccl',
                            init_method='env://',
                            world_size=args.world_size,
                            rank=rank)
    torch.manual_seed(0)
    model = args.Module()
    torch.cuda.set_device(gpu)
    model = nn.parallel.DistributedDataParallel(
        model.cuda(gpu),
        device_ids=[gpu])
    ...

def main(args):
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = '10.51.45.25'
    os.environ['MASTER_PORT'] = '8888'
    # spawn one training process per GPU on this node
    mp.spawn(train, nprocs=args.gpus, args=(args,))

Going even further, you would need to launch your code on all of your nodes and take care of graceful shutdowns and collecting the results. This is where Maggy comes in. Maggy allows you to launch your PyTorch training script without any changes on Spark clusters. It takes care of the training processes for each node, the resource isolation and node connections.

Next we will explore what is needed to run distribution transparent training on Maggy as well as the restrictions that still exist with the framework.

Building blocks for distribution transparent training

Configuring Maggy

First of all, Maggy requires its experiment to be configured for distributed training. In the most common use case this means passing your model, hyperparameters and your training/test set. Configuring is as easy as creating a config object. Hyperparameters, train and test set are optional and can also be directly loaded in the training loop. If your training loop consists of more than one module such as in training GANs with a Generator and Discriminator or Policy gradient methods in RL, you can also pass a list of modules.

from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=models.resnet50,train_set=train_set, test_set=test_set)

Writing the training function

Maggy’s API requires the training function to follow a unified signature. Users have to pass their model class, its hyperparameters and the train and test set to the training function. 

def train(module, hparams, train_set, test_set):
...

If you want to load your datasets on each node by yourself, you can also omit passing the datasets in the config. In fact, this is highly recommended when working with larger dataset objects. Additionally, every module used in the training function should be imported within that function. Think of your training function as completely self contained. Last but not least, users should use the PyTorch DataLoader (as is best practice anyways). Alternatively, you can also use Maggy’s custom PetastormDataLoader to load large datasets from Petastorm parquet files. When using the latter, users need to ensure that datasets are even, that is they should have the same number of batches per epoch on all nodes. When using PyTorch’s DataLoader, you do not have to care about this. So to summarize, your training function needs to

  • Implement the correct signature
  • Import all used modules inside the function
  • Use the PyTorch DataLoader (or Maggy’s PetastormDataLoader with even Datasets)

Distributed training on Maggy - A complete example

It’s time to combine all the elements we introduced so far in a complete example of distributed training with Maggy. In this example, we are going to create some arbitrary training data, define a function approximator for scalar fields, write our training loop and launch the distributed training.

Generate some training data

In order to not rely on specific datasets, we are going to create our own dataset. For this example, a scalar field should suffice. So first of all we randomly sample x and y and compute some function we want our neural network to approximate. PyTorch’s TensorDataset can then be used to form a proper dataset from this data.

import torch
import torch.nn as nn
import torch.nn.functional as F


coord = torch.rand((10000,2)) * 10 - 5  # Create random x/y coordinates in [-5,5]
z = torch.sin(coord[:,1]) + torch.cos(coord[:,0])  # Calculate scalar field for all points to get a dataset
train_set = torch.utils.data.TensorDataset(coord[:8000,:], z[:8000])
test_set = torch.utils.data.TensorDataset(coord[8000:,:], z[8000:])

Define the approximator

Next up we define our function approximator. For our example a standard neural network with 3 layers suffices, although in real applications you would of course train much larger networks.

class Approximator(torch.nn.Module):

    def __init__(self):
        super().__init__()
        self.l1 = torch.nn.Linear(2,100)
        self.l2 = torch.nn.Linear(100,100)
        self.l3 = torch.nn.Linear(100,1)
        
    def forward(self, x):
        # activation choice is an assumption; the body of forward() was
        # missing from the source
        x = F.relu(self.l1(x))
        x = F.relu(self.l2(x))
        return self.l3(x)

Writing the training loop

At the heart of every PyTorch program lies the training loop. Following the APIs introduced earlier, we define our training function as follows.

def train(module, hparams, train_set, test_set):
    import torch
    model = module()

    n_epochs = 100
    batch_size = 64
    lr = 1e-5    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_criterion = torch.nn.MSELoss()
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size)    
    def eval_model():
        loss = 0
        model.eval()
        for coord, z in test_loader:
            prediction = model(coord).detach()
            loss += loss_criterion(prediction, z)
        return loss

    for epoch in range(n_epochs):
        model.train()
        for coord, z in train_loader:
            optimizer.zero_grad()
            prediction = model(coord)
            loss = loss_criterion(prediction, z)
            loss.backward()
            optimizer.step()
    return eval_model()

As you can see, there is no additional code for distributed training. Maggy takes care of all the necessary things. 

Starting the training

All that remains now is to configure Maggy and run our training. For this, we have to create the config object and run the lagom function.

from maggy import experiment
from maggy.experiment_config import TorchDistributedConfig

config = TorchDistributedConfig(module=Approximator, train_set = train_set, test_set=test_set)
experiment.lagom(train, config)  # Starts the training loop

Evaluating the training

After running the training on 4 nodes, we can see that our approximator has converged to a good estimate of our scalar field. Of course, this would also be possible on a local node. But with more complex models and larger training sets such as the ImageNet dataset, distributed learning becomes necessary to leverage your workloads.

Try it for yourself

Maggy is open-source and documentation is available at maggy.ai. Give us a star or get in touch if you have more questions. Maggy is also available for all Hopsworks users in the managed platform on AWS or Azure. You can get started for free (no credit card required).

Star us on Github
Follow us on Twitter

Beyond Brainless AI with a Feature Store

>
4/27/2021
Jim Dowling
>
Jim Dowling

TLDR; Machine learning models are only as good as the data (features) they are trained on. In Enterprises, data scientists can often train very effective models in the lab - when given a free hand on which data to use. However, many of those data sources are not available in production environments due to disconnected systems and data silos. An AI-powered product that is limited to the data available within its application silo cannot recall historical data about its users or relevant contextual data from external sources. It is like a jellyfish - its autonomic system makes it functional and useful, but it lacks a brain. You can, however, evolve your models from brain-free AI to Total Recall AI with the help of a Feature Store, a centralized platform that can provide models with low latency access to data spanning the whole enterprise.

Autonomic AI

Jellyfish are undoubtedly complex creatures with sophisticated behaviour - they move, mate, and munch. They eat and discard waste from the same opening. Yet they have no brain - their autonomic system suffices for their needs. The biggest breakthroughs in AI in recent years have been enabled by deep learning, which requires large volumes of data and specialized compute hardware (e.g., GPUs). However, just like a jellyfish, recent successes in image processing and NLP with deep learning required no brain - no working memory, history, or context. Much of deep learning today is Jellyfish AI. We have made incredible progress in identifying objects in images and translating natural language, yet such deep learning models typically only require the immediate input - the image or the text - to make their predictions. The input signal is information-rich. These image and NLP models seldom require a 'brain' to augment the input with context or memories. Google Translate doesn’t need to know the historical enmity between the Scots and the Irish to decide whether it’s spelt Whisky or Whiskey. Jellyfish AI is impressive - the input data is information-rich and models can learn fantastically advanced behaviour from labeled examples. All “knowledge” needed to make predictions is embedded in the model. The model does not need working memory (e.g., it doesn’t need to know that the user has clicked 10 times on your website in the last minute).

Now compare using AI for image classification or NLP to building a web application that will use AI to interact with a user browsing a website. The immediate input data your application receives from your web browser are clicks on a mouse or a keyboard. The input signal is information-light - it is difficult to train a useful model using only user clicks. However, large Internet companies collect reams of information about users from many different sources, and transform that user data into features (information-rich signals that are ready to be used for either training models or making predictions with models). Models can then combine the click features with historical features about users and contextual features to build information-rich inputs to models. For example, you could augment the user’s action with everything you know about a user’s history and context to increase the user's engagement with the product. The feature store for machine learning (ML) stores and serves these features to models. We believe that AI-powered products that can easily access historical and contextual features will lead the next wave of AI in the Enterprise, and those products will need a feature store for ML.

Data Scientist and ML Engineer Disconnect

A frequent source of tension in Enterprises is between “naive” data scientists and “street-wise” ML engineers. Motivated by good software engineering practices, many ML engineers believe that ML models should be self-contained, and tension can arise with data scientists who want to include features in their models that are “obviously not available in the production system”.

However, data scientists are tasked with building the best models they can to add to the bottom line - engage more users, increase revenue, reduce costs. They know they can train better models with more data and more diverse sources of data. For example, a data scientist trying to predict whether a financial transaction is suspected of money laundering might discover that a powerful feature is the graph of financial transfers related to this individual in the previous day/week/month. They can reduce false alerts of money laundering by a factor of 100*, reducing the costs of investigating the false alerts and saving the business millions of dollars per year. The data scientist hands the model over the wall to the ML engineer, who dismisses the idea of including the graph-based features in the production environment, and tension arises when communicating what is possible and what is not possible in production. The data scientist is crestfallen - but need not be.

The Feature Store is now the de facto Enterprise platform for storing historical and contextual features for AI-powered products. The Feature Store is, in effect, the brain for AI-powered products, the three-eyed Raven that enables the model to access the history and state of the whole Enterprise, not just the local state in the application. 

Feature Stores enable applications or model serving infrastructure to take information-light inputs (such as a cookie identifying a user or a shopping cart session) and enrich them with features harvested from anywhere in the Enterprise or beyond, to build feature vectors capable of making better predictions. And as we know from Deep Learning, model accuracy improves predictably with more features and data, so there will be an increasing trend towards adding more and more features to models to improve their accuracy. Andrew Ng has recently been advocating this approach, which he calls data-centric development, instead of the more traditional model-centric development. Another noticeable trend in large Enterprises is building faster and more scalable Feature Stores that can supply those features within the time budget available to the AI-powered product. AI is going to revolutionize Enterprise software products, so how do we make sure our AI-enabled products are not just Jellyfish AI?

Enabling AI-Enabled Products with a Feature Store

Anti-Pattern: Re-implementing the “feature engineering” code for the serving layer is not DRY. This introduces the risk of ‘skew’ between the features used to train models and the features served to operational models.

How do we avoid limiting AI-enabled products to only using the input features collected by the application itself? Models will benefit from access to all data that the Enterprise has collected about the user, product, or its context. A potential source of friction here, however, is the dominant architectural preference for microservices and data stove-pipes. Models themselves are being deployed as microservices in model-serving infrastructure, like KFServing or TensorFlow Serving or Nvidia Triton. How can we give these models access to more features?

Anti-Pattern: microservice-based Online Feature Store. Microservices can be used to compute features in real-time from raw input data. When features can be pre-computed, however, microservices are an anti-pattern: this architecture adds latency, must be made highly available, must handle hotspots, and microservices consume resources even when they are not needed. Serverless functions might be acceptable where seconds of warmup latency are tolerable. But the microservices should still be reused to compute the training data - otherwise, there is a risk of training/serving skew.

Without a Feature Store, applications could contact microservices or databases to compute or retrieve the historical and contextual features (data), respectively. Computing the features in the application itself is an anti-pattern as it duplicates the feature engineering code - that code should already exist to generate the training data for the model. Re-implementing feature engineering logic in applications also introduces the risk of skew between the features computed in the application and the features computed for training. If serving and training environments use the same programming language, they could avoid non-DRY code by reusing a versioned library that computes the features. However, even if feature engineering logic is written in Python in both training and serving, it may use PySpark for training and Python for serving or different versions of Python. Versioned libraries can help, but are not a general solution to the feature skew problem. 

The Feature Store solves the training/serving skew problem by computing the features once in a feature pipeline. The feature pipeline is then reused to (1) create training data, and (2) save those pre-engineered features to the Feature Store. The serving infrastructure can then retrieve those features when needed to make predictions. For example, when an application wants to make a prediction about a user, it would supply the user’s ID, shopping cart ID, session ID, or location to retrieve pre-engineered features from the Feature Store. The features are retrieved as a feature vector, and the feature vector is sent to the model that makes the prediction. The Feature Store service for retrieving feature vectors is commonly known as the Online Feature Store. The logic for retrieving features from the Online Feature Store can also be implemented in model serving infrastructure, not just in applications. The advantage of looking up features in serving infrastructure is that it keeps the application logic cleaner, and the application just sends IDs and real-time features to the model serving infrastructure, that in turn, builds the feature vector, sends it to the model for prediction, and returns the result to the application. Low latency and high throughput are important properties for the online feature store - the faster you can retrieve features and the more features you can include in a given time budget, the more accurate models you should be able to deploy in production. To quote DoorDash:

“Latency on feature stores is a part of model serving, and model serving latencies tend to be in the low milliseconds range. Thus, read latency has to be proportionately lower” 
AI-enabled products use models to make predictions, and those models need an Online Feature Store to provide them with historical and contextual data (features) to make better predictions.
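To make the lookup pattern concrete, here is a minimal, runnable sketch of online feature enrichment at serving time. The dictionary and dummy model are stand-ins for an online feature store client and a deployed model; none of the names below refer to a specific feature store API.

# Minimal sketch of online feature enrichment at serving time (stand-in objects, not a real API).
online_store = {
    "user_42": [0.7, 12.0, 3.0],   # e.g., credit history score, card usage, account age
}

def predict(feature_vector):
    return sum(feature_vector)      # dummy model standing in for a deployed model endpoint

def handle_request(user_id, realtime_features):
    # 1. Enrich the information-light input (an ID plus a few clicks) with
    #    pre-engineered historical features retrieved by primary key.
    historical_features = online_store[user_id]
    # 2. Build the full feature vector: real-time signals + historical context.
    feature_vector = realtime_features + historical_features
    # 3. Send the enriched vector to the model and return the prediction.
    return predict(feature_vector)

print(handle_request("user_42", [1.0, 0.0]))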

Data-Centric AI with the Online Feature Store 

So, to summarize, if you want to give your ML models a brain, connect them up to a feature store. For Enterprises building personalized services, the feature store can enrich their models with a 360-degree, Enterprise-wide view of the customer - not just a product-specific view. The feature store enables more accurate predictions by making more data available for those predictions, and this ultimately enables products with a better user experience, increased engagement, and the product intelligence now expected by users.

* This is based on a true story.


RonDB, automatic thread configuration

RonDB
>
3/24/2021
Mikael Ronström
>
Mikael Ronström

This blog introduces how RonDB handles automatic thread configuration. It is more technical and dives deeper under the surface of how RonDB operates. RonDB provides a configuration option, ThreadConfig, whereby the user can have full control over the assignment of threads to CPUs, how the CPU locking is to be performed and how the thread should be scheduled.

However, for the absolute majority of users this is too advanced, thus the managed version of RonDB ensures that this thread configuration is based on best practices found over decades of testing. This means that every user of the managed version of RonDB will get access to a thread configuration that is optimised for their particular VM size.

In addition, RonDB makes use of adaptive CPU spinning in a way that limits power usage while still providing very low latency for all database operations. Adaptive CPU spinning improves latency by up to 50%, and in most cases by more than 10%.

RonDB 21.04 uses automatic thread configuration by default. This means that as a user you don’t have to care about the configuration of threads. RonDB retrieves the number of CPUs available to the RonDB data node process. In the managed version of RonDB, this is the full VM or bare metal server available to the data node. In the open source version of RonDB, one can also limit the number of CPUs available to the RonDB data node process by using taskset or numactl when starting the data node. RonDB retrieves information about CPU cores, CPU sockets, and the connections to the L3 caches of the CPUs. All of this information is used to set up the optimal thread configuration.

Thread types in RonDB


LDM threads house the data, query threads handle read committed queries, tc threads handle transaction coordination, receive threads handle incoming network messages, send threads handle the sending of network messages, and main threads handle metadata operations, asynchronous replication and a number of other things.

LDM threads

The LDM thread is a key thread type. The LDM thread is responsible for reading and writing data. It manages the hash indexes, the ordered indexes, the actual data, and a set of triggers performing actions for indexes, foreign keys, full replication, and asynchronous replication. This thread type is where most of the CPU processing is done. RonDB has an extremely high number of instructions per cycle compared to any other DBMS engine. The LDM thread often executes 1.25 instructions per cycle, where many other DBMS engines have reported numbers around 0.25 instructions per cycle. This is a key reason why RonDB delivers such great performance, both in terms of throughput and latency. This is the result of the design of data structures in RonDB that are CPU cache aware, and also of the functional separation of thread types.

Query threads

The query thread is a new addition that was introduced in NDB Cluster 8.0.23. In NDB it is not used by default; RonDB enables the use of query threads by default in its automatic thread configuration. The query threads run the same code as the LDM threads and handle a subset of the operations that the LDM thread can handle. A normal SELECT query will use read committed queries that can be executed by the query threads. A table partition (sometimes referred to as a table fragment or shard) belongs to a certain LDM thread, thus only this LDM thread can be used for writes and locked reads on rows in this table partition. However, for read committed queries, the query threads can also be used.

To achieve the best performance RonDB uses CPU locking. In Linux, it is quite common that a thread migrates from one CPU to another CPU. If the thread migrates to a CPU belonging to a different CPU core, the thread will suffer a lot of CPU cache misses immediately after being migrated. To avoid this, RonDB locks threads to specific CPU cores. Thus, it is possible to migrate the thread, but only to another CPU in a CPU core that shares the same CPU caches.

Query threads and LDM threads are organised into Round Robin groups. Each Round Robin group consists of between 4 and 8 LDM threads and the same number of query threads. All threads within one Round Robin group share the same CPU L3 cache. This ensures that we retain the CPU efficiency even with the introduction of these new query threads. This is important since query threads introduce new mutexes, and the performance of these is greatly improved when threads sharing mutexes also share CPU caches. The query thread chosen to execute a query must be in the same Round Robin group as the data-owning LDM thread.

Query threads make it possible to decrease the number of partitions in a table. As an example, we are able to process more than 3 times as many transactions per second using a single partition in Sysbench OLTP RW compared to when we only use LDM threads. Most key-value stores have data divided into table partitions on the primary key of the table. Many key-value stores also contain additional indexes on columns that are not used for partitioning. Since the table is partitioned, each table partition will contain each of those additional indexes. When performing a range scan on such an index, each table partition must be scanned, so the cost of performing range scans increases as the number of table partitions increases. RonDB can scale the reads in a single partition to many query threads, which makes it possible to decrease the number of table partitions in RonDB. In Sysbench OLTP RW this improves performance by around 20% even in a fairly small 2-node setup of RonDB.

In addition, query threads ensure that hotspots in the tables can be handled by many threads, thus avoiding the need to partition even further to handle hotspots.

At the same time, a modest number of table partitions increases the number of writes that we can perform on a table and makes it possible to parallelise range scans, which speeds up complex query execution significantly. Thus in RonDB we have attempted to find a balance between overhead on one side and improved parallelism and write scalability on the other.

The cost of key lookups is not greatly affected by the number of partitions since those use a hash lookup and thus always go directly to the thread that can execute the key lookup.

RonDB locks LDM threads and query threads in pairs. There is one LDM thread and one query thread in each such LDM group, and we attempt to lock each LDM group to one CPU core. LDM groups are organised into Round Robin groups.

A common choice of scheduling algorithm in an architecture like this would be a simple round robin scheduler. However, such an algorithm is too simple for this model. We have two problems to overcome. The first is that the load on LDM threads is not balanced, since we have decreased the number of table partitions in a table. The second is that writes and locked reads can only be scheduled on an LDM thread. Thus it is important to use the read committed queries to achieve a balanced load. Since LDM threads and query threads are locked onto the same CPU core, it is acceptable for an LDM thread to be almost idle; we remain efficient because the query thread on the same CPU core will make full use of it.


When a query can be scheduled to both an LDM thread and the query threads in the same Round Robin group the following two-level scheduling algorithm is used.

We gather statistics about CPU usage of threads and we also gather queue lengths in the scheduling queues. Based on this information we prioritise selecting the LDM thread and the query thread in the same LDM group. However, if required to achieve a balanced use of the CPU resources in the Round Robin group we will also schedule read committed queries to any query thread in the Round Robin group of the LDM thread. The gathered CPU usage information affects the load balancer with a delay of around 100 milliseconds. The queue length information makes it possible to adapt to changing load in less than a millisecond.
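To illustrate the idea, here is a simplified, illustrative Python sketch of the two-level scheduling described above. The plain objects below are stand-ins for exposition only, not RonDB’s actual C++ data structures or internal APIs.

# Simplified sketch of the two-level scheduling idea (illustration, not RonDB internals).
from dataclasses import dataclass, field

@dataclass
class Thread:
    name: str
    queue_length: int = 0
    cpu_usage: float = 0.0   # fraction of CPU used, sampled with ~100 ms delay

@dataclass
class RoundRobinGroup:
    ldm_threads: list = field(default_factory=list)
    query_threads: list = field(default_factory=list)

def schedule_read_committed(owning_ldm, paired_query, rr_group):
    # Level 1: prefer the LDM/query thread pair that owns the partition,
    # picking whichever of the two currently has the shorter scheduling queue.
    preferred = min((owning_ldm, paired_query), key=lambda t: t.queue_length)
    all_threads = rr_group.ldm_threads + rr_group.query_threads
    avg_usage = sum(t.cpu_usage for t in all_threads) / max(1, len(all_threads))
    if preferred.cpu_usage <= avg_usage:
        return preferred
    # Level 2: otherwise spread the read committed query to the least loaded
    # query thread in the same Round Robin group (which shares the same L3 cache).
    return min(rr_group.query_threads, key=lambda t: t.queue_length)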

Given that we use fewer table partitions in RonDB compared to other solutions, there is a risk of an imbalanced load on the CPUs. This problem is solved by two things. First, we use a two-level load balancer on LDM and query threads. This ensures that we move work away from overloaded LDM threads towards unused query threads. Second, since the LDM and query threads share the same CPU core, the query thread sharing a core with an underutilized LDM thread has spare CPU capacity available. Thus, we expect that this architecture will achieve a balanced load on the CPU cores in the data node architecture.

LDM and query threads use around 50-60% of the available CPU resources in a data node.

tc threads

The tc threads receive all database operations sent from the NDB API. They take care of coordinating transactions and decide which node should handle the queries. They use around 20-25% of the CPU resources. The NDB API selects tc threads in a node using a simple round robin scheme.

receive threads

The receive threads take care of a subset of the communication links. Thus, the receive thread load is usually fairly balanced, but can be somewhat unbalanced if certain API nodes query RonDB more heavily than others. The communication links between data nodes in the same node group are heavily used when performing updates. To ensure that RonDB can scale in this situation, these node links use multiple communication links. Receive threads use around 10-15% of the CPU resources.

send threads

The send threads assist in sending network messages to other nodes. The sending of messages can be done by any thread, and an adaptive algorithm assigns more of the send load to threads that are not so busy. The send threads assist to ensure that we have enough capacity to handle all the load; it is not strictly necessary to have send threads, since the other threads can handle sending on their own. Send threads use around 0-10% of the available CPUs.

The total cost of sending can be quite substantial in a distributed database engine, thus the adaptive algorithm is important to balance out this load on the various threads in the data node.

main threads

The number of main threads supported can be 0, 1 or 2. These threads handle much of the work around creating tables, indexes and any other metadata operation. They also handle a lot of the code around recovery and heartbeats, as well as subscriptions to asynchronous replication events used by replication channels to other RonDB clusters.

Analysis of the RonDB thread model

Background

RonDB is based on NDB Cluster. NDB was focused on being a high-availability key-value store from its origin in database research in the 1990s. The thread model in NDB is inherited from a telecom system developed at Ericsson called AXE. Interestingly, in one of my first jobs at Philips I worked on a banking system developed in the 1970s that had a very similar model to the original thread model in NDB and in AXE. In operating system development, time-sharing has been the dominant model for a long time. However, the model used in NDB, where the execution thread is programmed as an asynchronous engine and the application handles a state machine, has a huge performance advantage when handling many very small tasks. A normal task in RonDB is a key lookup or a small range scan. Each of those small tasks is divided even further when performing updates and parallel range scans. This means that the length of a task in RonDB is on the order of 500 ns up to around 10 microseconds.

Traditional thread design for key-value stores

Time-sharing operating systems are not designed to handle context switches of this magnitude. NDB was designed with this understanding from the very beginning. Early competitors of NDB used normal operating system threads for each transaction and even in a real-time operating system this had no chance to compete with the effectiveness of NDB. None of these competitors are still around competing in the key-value store market.

Asynchronous thread model

The first thread model in NDB used a single thread to handle everything: send, receive, database handling and transaction handling. This is version 1 of the thread architecture, which is also implemented in the open source version of Redis. With the development of multi-core CPUs it became obvious that more threads were needed. What NDB did here was introduce both a functional separation of threads and partitioning of the data to achieve a more multi-threaded execution environment. This is version 2 of the thread architecture.

Modern competitors of RonDB have now understood the need to use asynchronous programming to achieve the required performance in a key-value store. We see this in AeroSpike, Redis, ScyllaDB and many other key-value stores. Thus the industry has followed the RonDB road to achieving an efficient key-value store implementation.

Functional separation of threads

Most competitors have opted for only partitioning the data, and thus each thread still has to execute all the code for metadata handling, replication handling, send, receive and database operations. Thus RonDB has actually advanced version 2 of the thread architecture further than its competitors.

One might ask, what difference does this make?

All modern CPUs use both a data cache and an instruction cache. By combining all functions inside one thread, the instruction cache has to hold much more code. In RonDB the LDM thread only executes the operations that change the data structures, the tc thread only executes code to handle transactions, and the receive thread can focus on the code to execute network receive operations. This makes each thread more efficient. The same is true for the CPU data cache: the LDM thread need not bother with the data structures used for transaction handling and network receive. It can focus its CPU caches on the requirements of database operations, which are challenging enough in a database engine.

A scalable key-value store design

A simple splitting of data into different table partitions makes sense if all operations towards the key-value store are primary key lookups or unique key lookups. However, most key-value stores also require general search operations as part of the application. These search operations are implemented as range scans with search conditions, and they do not scale as well with a simple splitting of the data.

To handle this, RonDB introduces version 3 of the thread architecture, which uses a compromise: we still split the data, but we introduce query threads to assist the LDM threads in reading it. Thus RonDB can handle hotspots of data and requires fewer table partitions to achieve the required scalability of the key-value store.

Thoughts on a v4 of the thread architecture have already emerged, so expect this development to continue for a while more. This includes even better handling of the higher latency to persistent memory data structures.

Finally, even if a competitor managed to replicate all of those features of RonDB, RonDB has another ace in the 3-level distributed hashing algorithm that makes use of a CPU cache aware data structure.

Conclusion

All of those things combined make us confident that RonDB will continue to lead the key-value store market in terms of LATS: lowest Latency, highest Availability, highest Throughput and the most Scalable data storage - in short, the best LATS database in the industry.

AI/ML needs a Key-Value store, and Redis is not up to it

RonDB
>
2/26/2021
Mikael Ronström
>
Mikael Ronström
Jim Dowling

Online feature stores are the data layer for operational machine learning models - the models that make online shopping recommendations for you and help identify financial fraud. When you train a machine learning model, you feed it with high signal-to-noise data called features. When the model is used in operation, it needs the same types of features that it was trained on (e.g., how many times you used your credit card during the previous week), but the online feature store should have low latency to keep the end-to-end latency of using a model low. Using a model requires both retrieving the features from the online feature store and then sending them to the model for prediction. 

Hopsworks has been using NDB Cluster as our online feature store from its first release. It has the unique combination of low latency, high availability, high throughput, and scalable storage that we call LATS. However, we knew we could make it even better as an online feature store in the cloud, so we asked one of the world’s leading database developers to do it - the person who invented NDB, Mikael Ronström. Together we have made RonDB, a key-value store with SQL capabilities, that is the world’s most advanced and performant online feature store. Although NDB Cluster is open-source, its adoption has been hampered by an undeserved reputation of being challenging to configure and operate. With RonDB, we overcome this limitation by providing it as a managed service in the cloud on AWS and Azure.

Requirements for an Online Feature Store

The main requirements of a database used as an online feature store are: low latency, high throughput for mixed read/write operations, high availability, and the ability to store large data sets (larger than fits on a single host). We unified these properties in a single muscular term, LATS:

LATS: low Latency, high Availability, high Throughput, scalable Storage. 

RonDB is not without competition as the premier choice of online feature store. To quote Khan and Hassan from DoorDash, it should be a low latency database:

“latency on feature stores is a part of model serving, and model serving latencies tend to be in the low milliseconds range. Thus, read latency has to be proportionately lower.” 

To that end, Redis fits this requirement as it is an in-memory key-value store (without SQL capabilities). Redis is open source (BSD licence), and it enjoys popularity as an online feature store. DoorDash even invested significant resources in increasing Redis’ storage capacity as an online feature store by adding custom serialization and compression schemes. Significantly, similar to RonDB, it provides sub-millisecond latency for single key-value store operations. Other databases have been proposed as online feature stores, but they were not considered in this post as they have significantly higher latency (over one order of magnitude!), such as DynamoDB, BigTable, and SpliceMachine.

As such, we thought it would be informative to compare the performance of RonDB and Redis as an online feature store. The comparison was between Redis open-source and RonDB open-source (the commercial version of Redis does not allow any benchmarks). In addition to our benchmark, we compare the innards of RonDB’s multithreading architecture to the commercial Redis products (since our benchmark identifies CPU scalability bottlenecks in Redis that commercial products claim to overcome).

Benchmark: RonDB vs Redis

In this simple benchmark, I wanted to compare apples with apples, so I compared open-source RonDB to the open-source version of Redis, since the commercial versions disallow reporting any benchmarks. In the benchmark, I deliberately hobble the performance of RonDB by configuring it with only a single database thread, as Redis is “a single-threaded server from the POV of command execution”. I then proceed to describe the historical evolution of RonDB’s multithreaded architecture, consisting of three different generations, and how open-source Redis is still at the first generation, while commercial Redis products are now at generation two.

Firstly, for our single-threaded database benchmark, we performed our experiments on a 32-core Lenovo P620 workstation with 64 GB of RAM. We performed key-value lookups. Our experiments show that a single-threaded RonDB instance reached around 1.1M reads per second, while Redis reached more than 800k reads per second - both with a mean latency of around 25 microseconds. The throughput benchmark performed batch reads with 50 reads per batch and had 16 threads issuing batch requests in parallel. Batching reads/writes improves throughput at the cost of increased latency.


On the same 32-core server, both RonDB and Redis reached around 600k writes per second when performing SET for Redis and INSERT, UPDATE or DELETE operations for RonDB. For high availability, both of those tests were done with a setup using two replicas in both RonDB and in Redis.
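To give a feel for the access pattern used in the throughput test (50 reads per batch, 16 parallel issuers), here is a minimal, runnable Python sketch. It uses a plain in-process dictionary as a stand-in for the key-value store, so it only illustrates the batching pattern; it is not the actual benchmark harness and says nothing about RonDB or Redis performance.

# Illustrative sketch of the batched-read pattern (in-process dict as a stand-in store).
import threading, time, random

store = {i: f"value-{i}" for i in range(100_000)}  # stand-in for the key-value store
BATCH_SIZE, NUM_THREADS, DURATION = 50, 16, 5

def batch_read(keys):
    # Stand-in for a real client's batched read call (e.g., a pipelined multi-get)
    return [store[k] for k in keys]

def worker(counter):
    deadline = time.time() + DURATION
    while time.time() < deadline:
        keys = random.sample(range(100_000), BATCH_SIZE)
        batch_read(keys)
        counter[0] += BATCH_SIZE

counters = [[0] for _ in range(NUM_THREADS)]
threads = [threading.Thread(target=worker, args=(c,)) for c in counters]
for t in threads: t.start()
for t in threads: t.join()
print("approx. reads/sec:", int(sum(c[0] for c in counters) / DURATION))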

Low latency

We expected that the read latency and throughput of RonDB and Redis would be similar since both require two network jumps to read data. In case of updates (and writes/deletes), Redis should have lower latency since an update is only performed on the main replica before returning. That is, Redis only supports asynchronous replication from the main replica to a backup replica, which can result in data loss on failure of the main node. In contrast, RonDB performs an update using a synchronous replication protocol that requires 6 messages (a non-blocking version of two-phase commit). Thus, the expected latency is 3 times higher for RonDB for writes. 

High Throughput

A comparison of latency and throughput shows that RonDB already has a slight advantage with a single-threaded architecture, but with its third-generation multithreaded architecture, described below, RonDB has an even bigger performance advantage over Redis, commercial or open-source. RonDB can be scaled up by adding more CPUs and memory, or scaled out by automatically sharding the database. As early as 2013, we developed a benchmark with NDB Cluster (RonDB’s predecessor) that showed how NDB could handle 200M reads per second in a large cluster of 30 data nodes with 28 cores each.

High Availability

The story on high availability is different. A write in Redis is only written to one replica. The replication to other replicas is then done asynchronously, thus consistency can be seriously affected by failures and data can be lost. An online feature store must accept writes that change the online features constantly in parallel with all the batched key reads. Thus handling node failures in an online feature store must be very smooth.

Given that an online feature store may need to scale to millions of writes per second as part of a normal operation, this means that a failed node can cause millions of writes to be lost, affecting the correctness and quality of any models that it is feeding with data. RonDB has transactional capabilities that ensure that transactions can be retried in the event of such partial failures. Thus, as long as the database cluster is not fully down, no transactions will be lost.

In many cases the data comes from external data sources into the online Feature Store, so a replay of the data is possible, but an inconsistent state of the database can easily lead to extra unavailability in failure situations. Since an online feature store is often used in mission-critical services, this is clearly not desirable.

RonDB updates all replicas synchronously as part of writes. Thus, if a node running the transaction coordinator or a participant fails, the cluster will automatically fail over to the surviving nodes, a new transaction coordinator will be elected (non-blocking), and no committed transactions will be lost. This is a key feature of RonDB and has been tested in the most demanding applications for more than 15 years and tested thousands of times on a daily basis.

Additionally, in a highly available cloud setup, RonDB can read from any replica and still see the latest changes, whereas Redis has to read from the main replica to get a consistent view of the data; in this case that requires communicating across availability zones, which can easily add milliseconds of read latency. RonDB will automatically set up the cluster such that applications using the APIs read replicas located in the same availability zone. Thus, in those setups RonDB can always read the latest version of the data and still deliver it at the lowest possible latency, while Redis setups have to choose between delivering consistent data with higher latency or inconsistent data with low latency.

Scalable Storage

Redis only supports in-memory data - this means that Redis will not be able to support online Feature Stores that store lots of data. In contrast, RonDB can store data both in-memory and on-disk, and with support for up to 144 database nodes in a cluster, it can scale to clusters of up to 1PB in size.

Analysis: Three Generations of Multithread Architectures

For our single-threaded benchmark, we did not expect there to be, nor were there, any major differences in throughput or latency for either read or write operations. The purpose of the benchmark was to show that both databases are similar in how efficiently they use a single CPU. RonDB and Redis are both in-memory databases, but the implementation details of their multithreaded architectures matters for scalability (how efficiently they handle increased resources), as we will see. 

Firstly, “Redis is not designed to benefit from multiple CPU cores. People are supposed to launch several Redis instances to scale out on several cores if needed.” For our use case of online feature stores, it is decidedly non-trivial to partition a feature store across multiple Redis instances. Therefore, commercial vendors encourage users to pay for their distributions that introduce a new multithreaded architecture to Redis. We now chronicle the three different generations of threading architectures underlying RonDB, from its NDB roots, and how they compare to Redis’ journey to date.

Open-source Redis has practically the same thread architecture as the first version of NDB Cluster from the 1990s, when NDB was purely an in-memory database. The commercial Redis distributions have since developed multithreaded architectures very similar to the second thread architecture used by later versions of NDB Cluster. However, RonDB has evolved into a third version of the NDB Cluster thread architecture. Let’s dive into the details of these three generations of threading architectures.

The first generation thread architecture is extremely simple - everything is implemented in one thread. This means that this thread handles receive on the socket, transaction handling, the database operation, and finally sending the response. This works better with fast CPUs with large caches - more Intel, less AMD. Still, handling all of this in one thread is efficient: in my experiments on a 32-core Lenovo P620 workstation, RonDB reached 1.1M reads per second and Redis a bit more than 800k reads per second in a single thread, with a latency of around 25 microseconds. This test used batch reads with 50 reads per batch and had 16 threads issuing batches in parallel. For INSERT, UPDATE or DELETE operations, both RonDB and Redis reached around 600k writes per second, with Redis performing SET operations (it does not have a SQL API). This benchmark was done with a setup using two replicas in both RonDB and Redis.

Around 2008, the trend towards an increased number of CPUs per socket accelerated and today a modern cloud server can have more than 100 CPUs. With NDB, we decided it was necessary to redesign its multithreading architecture to scale with the available number of CPUs. This resulted in our second generation thread architecture, where we partitioned database tables and let each partition (shard) be managed by a separate thread. This takes care of the database operation. However, as NDB is a transactional distributed database, it was also necessary to design a multithreaded architecture for socket receive, socket send, and transaction handling. These are harder to scale-out on multi-core hardware.

Both NDB and the commercial solutions of Redis implement their multithreaded architecture using message passing. However, the commercial Redis solution uses Unix sockets for communication, whereas NDB (and RonDB) send messages through memory in a lock-free communication scheme. RonDB has dedicated threads to handle socket receive, socket send, and transaction handling. To further reduce latency, network send is adaptively integrated into transaction handling threads.

RonDB has, however, taken even more steps in its multithreaded architecture compared to commercial Redis solutions. RonDB now has a new, third generation thread architecture that improves the scalability of the database - it is now possible for more than one thread to perform concurrent reads on a partition. RonDB ensures that all the read-only threads that can read a partition share the same L3 cache, to avoid any scalability costs. This ensures that hotspots can be handled by multiple threads in RonDB. This architectural advancement decreases the need to partition the database into an increasingly larger number of partitions, and prevents problems such as over-provisioning due to hotspots in DynamoDB. The introduction of read-only threads is important for applications that also need to perform range scans on indexes that are not part of the partition key. It also enables the key-value store to handle secondary index scans and complex SQL queries in the background, while concurrently handling primary key operations (reads/writes for the online Feature Store).

Finally, the third generation thread architecture enables fine-grained elasticity; it is easy to stop a database node and bring it up again with a higher or lower number of CPUs, thus making it easy to increase database throughput or decrease its cost of operation. Hotspots can be mitigated with read-only threads without resorting to over-provisioning. RonDB also has coarse-grained elasticity, with the ability to add new database nodes without affecting its operation, known as online add node.

Conclusions

There are numerous advantages of RonDB compared to Redis; the higher availability and the ability to handle much larger data sets are the most obvious ones. Both are open-source, but Redis has the advantage of a reduced operational burden as it is not at its core a distributed system. However, in the recent era of managed databases in the cloud, operational overhead is no longer a deciding factor when choosing your database. With RonDB’s appearance as a managed database in the cloud, the operational advantages of Redis diminish, paving the way for RonDB to gain adoption as the highest performance online feature store in the cloud. 

How to Engineer and Use Features in Azure ML Studio with the Hopsworks Feature Store

Hopsworks
>
2/25/2021
Moritz Meister
>
Moritz Meister

TLDR: The Hopsworks Feature Store is an open platform that connects to the largest number of data stores and data science platforms, with the most comprehensive API support - Python and Spark (Python, Java/Scala). It supports Azure ML Studio Notebooks or Designer for feature engineering and as your data science platform. You can design and ingest features, browse existing features, and create training datasets as either DataFrames or as files on Azure Blob storage.

Introduction

Azure ML is becoming an increasingly popular data science platform to train, deploy, and manage machine learning models. Azure ML also supports AutoML, and enables you to automate and run pipelines at scale. 

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on Azure (and AWS) and it includes all the tools needed to store, retrieve and track features that will be used when training and serving ML models. The Hopsworks Feature Store integrates with many cloud platforms  and storage services, one of which is Azure ML Studio.

In this blog post, we show how you can connect to the Hopsworks Feature Store from Azure ML, how to ingest features from a Pandas dataframe, combine different features in order to create a training dataset, and finally save that training dataset to Azure Blob storage, from where the data can be read in Azure ML to train and validate a machine learning model.

Prerequisites

In order to follow this tutorial, you need:

  1. Hopsworks Feature Store running on https://hopsworks.ai. You can register for free with no credit-card and receive 300 USD of credits to get started. You can deploy a feature store in either your own Azure account or even in an AWS account.
  2. Users should also have an existing ML Studio notebook with an attached compute cluster. If you don't have an existing notebook, you can create one by following the Azure ML documentation.
  3. If you want to follow this tutorial with the same data, make sure to upload these files to your ML Studio environment.
  4. A project created within Hopsworks. If you don’t have one yet, you can simply follow  the Feature Store tour that creates a sample project for you.

Step 1: Configure a Hopsworks API Key

Connecting to the Feature Store from Azure ML requires setting up a Feature Store API key for authentication. In Hopsworks, click on your username in the top-right corner (1) and select Settings to open the user settings. Select API keys. (2) Give the key a name and select the job, featurestore, dataset.create and project scopes before (3) creating the key. Copy the key into your clipboard for the next step.

Step 2: Connect from an Azure Machine Learning Notebook

To access the Feature Store from Azure Machine Learning, open a Python notebook and proceed with the following steps to install the Hopsworks Feature Store client called HSFS:

-- CODE language-bash -- !pip install hsfs[hive]

Note that we are installing the latest version at the time of writing this (2.1.4) - you should always install the latest minor version that corresponds to the version of your Hopsworks Feature Store. So in this case our Hopsworks instance is running version 2.1.

Furthermore, for Python clients (such as Azure ML), it is important to install HSFS with the `[hive]` optional extra. Spark clients do not need this.

After successfully installing HSFS, you should be able to connect to the Feature Store from your Azure ML  notebook (note: you might need to restart the kernel, if you had HSFS previously installed):

-- CODE language-bash -- import hsfs connection = hsfs.connection(host="[UUID].cloud.hopsworks.ai", project="[project-name]", engine="hive", api_key_value="[api-key]") fs = connection.get_feature_store()

Make sure to replace [UUID] with the one from the DNS name of your Hopsworks instance, [project-name] with the Hopsworks project that contains your feature store, and [api-key] with the key created in Step 1. Please note that it’s not good practice to store the API key in your notebook - instead you should store the key safely in a permission-protected file and use the “api_key_file” argument to pass the filename to the connection method.

Once you are connected you can get a handle to the feature store with `connection.get_feature_store()`. If the project you have connected to also contains a shared feature store (it is possible to have a feature store from another project shared with the project you are using), you can also get a handle on the shared feature store using the connection object.
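For example, assuming the shared feature store follows the default naming convention of [project-name]_featurestore, getting a handle on it could look like the line below; the name shown is a placeholder, and you should check the documentation of your HSFS version for the exact argument:

-- CODE language-bash -- shared_fs = connection.get_feature_store(name="shared_project_featurestore")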

Step 3: Ingest data from a Pandas dataframe to the Feature Store

You can simply upload some data in your favourite file format to the Azure ML workspace, or you can configure a Hopsworks Storage Connector to cloud storage or a database. The Storage Connector safely stores endpoints and credentials to external stores or databases, making it easier for Data Scientists to retrieve data from them.

If you opted to upload the data as CSV files, as shown below, simply read it into a pandas dataframe:

-- CODE language-bash -- import pandas as pd import numpy as np sales_csv = pd.read_csv("sales data-set.csv") stores_csv = pd.read_csv("stores data-set.csv")

Now, we can perform some feature engineering based on the pandas dataframe. We would like to predict the weekly sales of a department, so let’s create our target feature by selecting the last week available for each department:

-- CODE language-bash -- sales_csv["date"] = pd.to_datetime(sales_csv["date"]) sales_csv.sort_values(["store", "dept", "date"], inplace=True) target_df = sales_csv.groupby(["store", "dept"]).last().reset_index()

-- CODE language-bash -- target_df


We can create this as a feature group, also containing the `is_holiday` feature; since this information will be available at prediction time, there is no risk of data leakage.

-- CODE language-bash -- fg_target = fs.create_feature_group("weekly_sales_target", version=1, description="containing the latest weekly sales of each store/department", primary_key=["store", "dept"], time_travel_format=None) fg_target.save(target_df)


By clicking the hyperlink in the logs underneath the notebook cell, you can follow the progress of your ingestion job in Hopsworks.


Let’s now create a few simple features based on the historical sales of each department:

-- CODE language-bash -- df = pd.merge(sales_csv, target_df[["store", "dept", "date"]], on=["store", "dept"], how="left") hist_df = df[df["date_x"] != df["date_y"]] hist_df["holiday_flag"] = df['is_holiday'].apply(lambda x: 1 if x else 0) hist_df["non_holiday_flag"] = df['is_holiday'].apply(lambda x: 0 if x else 1) hist_df["holiday_week_sales"] = hist_df["holiday_flag"] * hist_df["weekly_sales"] hist_df["non_holiday_week_sales"] = hist_df["non_holiday_flag"] * \ hist_df["weekly_sales"] total_features = hist_df.groupby(["store", "dept"]).agg( {"weekly_sales": [sum, np.mean], "date_x": pd.Series.nunique, "holiday_week_sales": sum, "non_holiday_week_sales": sum}) total_features.columns = ['_'.join(col).strip() for col in total_features.columns.values] total_features.reset_index(inplace=True)

And again, we finish by creating a feature group with this dataframe and saving it to the feature store:

-- CODE language-bash -- weekly_sales_total = fs.create_feature_group("weekly_sales_total", version=1, description="containing the total historical sales and weekly average of each store/department", primary_key=["store", "dept"], time_travel_format=None) weekly_sales_total.save(total_features)


Note: If you have existing feature engineering notebooks that you would like to reuse with the Hopsworks Feature Store, it should be enough to simply add the two calls (create the Feature Group, and save the dataframe to it) in order to ingest your features to the Feature Store. No other changes are required in your existing programs and you can still use your favourite Python libraries for feature engineering.

With these two feature groups we can move to the next step to create a training dataset. Since we did not disable statistics computation, you can head to the Hopsworks Feature Store and inspect the pre-computed statistics over the newly created feature groups.

Step 4: Create a training dataset in your favorite file format using the Feature Store

HSFS comes with an expressive Join API and Query Planner that allows users to join, filter and explore feature groups in order to create training datasets.

Assuming you start with a new Jupyter notebook, the first commands you need to run are to get handles to the previously created feature groups:

-- CODE language-bash -- target_fg = fs.get_feature_group("weekly_sales_target", version=1) sales_fg = fs.get_feature_group("weekly_sales_total", version=1)

Note that we explicitly supply the (schema) version for the feature group (version=1), so that other developers can update the feature groups safely in higher numbered versions of the feature group.

With our two feature group objects, we would like to join the target feature with our historical features, but only select the departments for our training dataset that have a full history of 142 weeks available:

-- CODE language-bash -- td_query = target_fg.select(["weekly_sales", "is_holiday"]) \ .join(sales_fg.filter(sales_fg.date_x_nunique == 142)) td_query.show(5)

As you can see, feature group joins work similarly to pandas dataframe joins. In this case we can omit the join-key since both feature groups have the same primary key, however, for more advanced joins there is always the possibility to specify the join key from each group as well as the join type (left, inner, right, outer, etc) manually.
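For instance, the same join could be written with explicit join keys and an explicit join type. The keyword arguments below (`on`, `join_type`) follow the HSFS Query API as we understand it, so double-check them against the documentation of the HSFS version you are using:

-- CODE language-bash -- td_query = target_fg.select(["weekly_sales", "is_holiday"]) \ .join(sales_fg.filter(sales_fg.date_x_nunique == 142), on=["store", "dept"], join_type="inner") td_query.show(5)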

Hopsworks Feature Store supports a variety of storage connectors to materialize your training dataset to different cloud storage systems. If you have previously configured an Azure Data Lake Storage connector, you can now use it as the destination for your training dataset:

-- CODE language-bash -- storage = fs.get_storage_connector("azure-blob", "ADLS")

Similar to feature groups, you can now create the training dataset in your favourite file format, matching the machine learning library you are planning to use - for example, choose ‘tfrecord’ for TensorFlow. The Feature Store will make sure to track all metadata related to your training dataset, even if the training dataset is created outside of Hopsworks.

-- CODE language-bash -- td = fs.create_training_dataset("weekly_sales_model", version=1, data_format="tfrecord", splits={"train": 0.8, "test": 0.2}, seed=12, label=["weekly_sales"], storage_connector=storage) td.save(td_query)

To retrieve the training dataset in your training environment you can simply get a handle to the dataset and its location, to pass it subsequently to your reader utilities:

-- CODE language-bash -- td = fs.get_training_dataset("weekly_sales_model", version=1) td.location
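If you prefer to read the data directly into a dataframe rather than going through the file location, the training dataset object also exposes a read call, shown below for the train split; verify the exact arguments against your HSFS version’s documentation:

-- CODE language-bash -- train_df = td.read(split="train")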

Get Started

This tutorial is available as a Jupyter notebook in our GitHub repository. For more information, visit the documentation.

RonDB: The World's Fastest Key-value Store is now in the Cloud.

>
2/24/2020
Mikael Ronström
>
Mikael Ronström
Jim Dowling

We are pleased to introduce RonDB, the world’s fastest key-value store with SQL capabilities, available now in the cloud. RonDB is an open source distribution of NDB Cluster, thus providing the same core technology and performance as NDB, but as a managed platform in the cloud. RonDB is the best low-latency, high throughput, and high availability database available today. In addition, it also brings large data storage capabilities. RonDB is a critical component of the infrastructure needed to build feature stores, and, in general, real-time applications.

Introducing RonDB

RonDB is an open source distribution of NDB Cluster which has been used as a key-value store for applications for almost 20 years. The name RonDB is a synthesis of its inventor, Mikael Ronström, and its heritage NDB. RonDB is offered as a managed version of NDB Cluster, providing the fastest solution targeting the traditional applications that use NDB Cluster such as those used in telecom, finance, and gaming. RonDB provides a solution as the data layer for operational AI applications. RonDB gives online models access to reliable, low-latency, high-throughput storage for precomputed features.  

RonDB is ideal for developers who require a real-time database that can (1) be quickly deployed  (RonDB provides a possibility to set up an RonDB cluster in less than a minute), (2) handle millions of database operations per second, and (3) be elastically scaled up and down to match the current load, reducing operational costs. 

What is a key-value store?

A key-value store associates a value with a unique identifier (a key) to enable easy storage, retrieval, searching, and updating of data using simple queries - making them ideal for organisations with large volumes of data that needs to be constantly retrieved and updated.

Distributed key-value stores, such as RonDB, are optimal for organisations with heavy read/write workloads on large data sets, thus being a good fit for businesses operating at scale. For example, they are a critical component of the infrastructure needed to build feature stores and many different types  of interactive and real-time applications.

The value of RonDB

Key-value store applications

RonDB can be used to develop interactive online applications where storing, retrieving and updating data is essential, but there is also the possibility to search for data in the same system. RonDB provides efficient access using a partition key through either native APIs or SQL access. 

RonDB supports distributed transactions enabling applications to perform any type of data modification. RonDB enables concurrent complex queries using SQL as part of the online application. Through its efficient partitioning scheme, RonDB can scale nodes to support hundreds of CPUs per node and then scale to hundreds of data nodes (tens of thousands of CPUs in total).

SQL applications

RonDB supports writing low latency, high availability SQL applications using its full SQL capabilities, providing a MySQL Server API.
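Because RonDB exposes a MySQL Server API, a SQL application can talk to it like any MySQL endpoint. The snippet below is a minimal sketch using the standard mysql-connector-python driver; the host, credentials, and table are placeholder examples, not a real deployment.

import mysql.connector  # standard MySQL driver; RonDB exposes a MySQL Server API

# Placeholder connection details - replace with your RonDB MySQL server endpoint.
conn = mysql.connector.connect(host="rondb-mysql.example.com", user="app",
                               password="secret", database="features")
cursor = conn.cursor()

# A primary-key lookup, the typical online access pattern (hypothetical example table).
cursor.execute("SELECT avg_spend, num_transactions FROM customer_features WHERE customer_id = %s", (42,))
print(cursor.fetchone())

cursor.close()
conn.close()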

LATS: low Latency, high Availability, high Throughput, and scalable Storage

RonDB is the fastest key-value store currently available and the only one with general-purpose query capabilities, through its SQL support. The performance of key-value stores is evaluated based on low Latency, high Availability, high Throughput, and scalable Storage (LATS).

LATS: low Latency, high Availability, high Throughput, scalable Storage.

Low Latency

RonDB has shown the fastest response times: it can respond in 100-200 microseconds on individual requests and in less than a millisecond on batched read requests; it can also perform complex transactions in a highly loaded cluster within 10 milliseconds. Moreover, and perhaps even more importantly, RonDB has predictable latency.

High Availability

NDB Cluster can provide Class 6 availability; in other words, an NDB system is operational 99.9999% of the time, which amounts to roughly 30 seconds of downtime per year. RonDB now brings the same class of availability to the cloud. RonDB allows you to place replicas within a cloud region on different availability zones, so that the failure of an availability zone does not result in downtime.

High Throughput

RonDB scales to hundreds of millions of read or write operations per second. A single RonDB instance on a virtual machine can support millions of key-value operations per second and, without downtime, can be scaled up to a large cluster that supports hundreds of millions of key-value operations per second.

Scalable Storage

RonDB offers large dataset capabilities, being able to store up to petabytes of data. Individual RonDB data nodes can store up to 16 TB of in-memory data and many tens of TB of disk data. Given that RonDB clusters can scale up to 144 data nodes, a RonDB cluster can store petabytes of data.

LATS Performance

The combination of predictable low latency, the ability to handle millions of operations per second, and a unique high availability solution makes RonDB the leading LATS database, now also available in the cloud.

RonDB as an online feature store

RonDB is the foundation of the Hopsworks Online Feature Store. For those unfamiliar with the concept, a feature store is a data warehouse of features for machine learning. It contains two databases: an online database serving features at low latency to online applications, and an offline database storing large volumes of historic feature values.

The online feature store provides the data (features) that underlie the models used in intelligent online applications. For example, it helps inform an intelligent decision about whether a financial transaction is suspected of money laundering by providing real-time information about the user’s credit history, recent card usage, location of the transaction, and so on. Thus, the most important requirements on the feature store are quick and consistent responses (low latency), handling large volumes of key lookups (high throughput), and being always available (high availability). In addition, an online feature store should be able to scale as the business grows. RonDB is the only database available on the market that meets all these requirements.

Get Started

We’re not releasing RonDB publicly just yet. Instead, we have made it available as early access. If you think your team could benefit from RonDB, please contact us!

How to transform Amazon Redshift data into features with Hopsworks Feature Store

2/9/2021
Ermias Gebremeskel
Fabio Buso

TLDR: Hopsworks Feature Store provides an open ecosystem that connects to the largest number of data storage, data pipelines, and data science platforms. You can connect the Feature Store to Amazon Redshift to transform your data into features to train models and make predictions.

Introduction

Amazon Redshift is a popular managed data warehouse on AWS. Companies use Redshift to store structured data for traditional data analytics. This makes Redshift a popular source of raw data for computing features for training machine learning models.

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on AWS and it includes all the tools needed to transform data in Redshift into features that will be used when training and serving ML models.

In this blog post, we show how you can configure the Redshift storage connector to ingest data, transform that data into features (feature engineering) and save the pre-computed features in the Hopsworks Feature Store.

Prerequisites

To follow this tutorial, users should have a Hopsworks Feature Store instance running on https://hopsworks.ai. You can register for free, with no credit card required, and receive 300 USD of credits to get started.

Users should also have an existing Redshift cluster. If you don't have one, you can create one by following the AWS documentation.

Step 1 - Configure the Redshift Storage Connector

The first step to be able to ingest Redshift data into the feature store is to configure a storage connector. The Redshift connector requires you to specify the following properties. Most of them are available in the properties area of your cluster in the Redshift UI.

  • Cluster identifier: The name of the cluster
  • Database driver: You can use the default JDBC Redshift Driver `com.amazon.redshift.jdbc42.Driver` (More on this later)
  • Database endpoint: The endpoint for the database. Should be in the format of `[UUID].eu-west-1.redshift.amazonaws.com`
  • Database name: The name of the database to query
  • Database port: The port of the cluster. Defaults to 5439

There are two options available for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all the members of the project.

The second option is to configure an IAM role. With IAM roles, Jobs or notebooks launched on Hopsworks  do not need to explicitly authenticate with Redshift, as the HSFS library will transparently use the IAM role to acquire a temporary credential to authenticate the specified user. 

In Hopsworks, there are two different ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs or notebooks will be run with the selected IAM role.  For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (like a data owner) can assume a given role. 

Regarding the database driver: the library to interact with Redshift *is not* included in Hopsworks - you need to upload the driver yourself. First, download the library from here, selecting the driver version without the AWS SDK. Then upload the driver files to the “Resources” dataset in your project, as shown in the screenshot below.

Upload the Redshift Driver (jar file) from your local machine to the Resources dataset.

Then, you add the file to your notebook or job before launching it, as shown in the screenshots below.

Before starting the JupyterLab server, add the Redshift driver jar file, so that it becomes available to jobs run in the notebook.

Step 2 - Define an external (on-demand) Feature Group

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in Hopsworks feature store. For external feature groups, only metadata for features is stored in the feature store - not the actual feature data which is read from the external database/object-store. When the external feature group is accessed from a Spark or Python job, the feature data is read on-demand using a connector from the external store. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source. 

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance”: in the Hopsworks Web UI, you can track which features are stored on which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

-- CODE language-bash --
# We named the storage connector defined in step 1 telco_redshift_cluster
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")

telco_on_dmd = fs.create_on_demand_feature_group(
    name="telco_redshift",
    version=1,
    query="select * from telco",
    description="On-demand feature group for telecom customer data",
    storage_connector=redshift_conn,
    statistics_config=True)

telco_on_dmd.save()

When running `save()`, the metadata is stored in the feature store and statistics for the data are computed and made available through the Hopsworks UI. Statistics help data scientists build better features from raw data.

Step 3 - Engineer features and save to the Feature Store

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.

-- CODE language-bash --
telco_on_dmd\
    .select(['customer_id', 'internet_service', 'phone_service', 'total_charges', 'churn'])\
    .show(5)  # preview the first rows of the selected features (illustrative)

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets, as sketched below. This helper guide explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.
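
As a rough sketch of what such a join can look like with the HSFS query API (the cached feature group and its feature names below are hypothetical), the Redshift-backed on-demand feature group can be joined with a cached feature group and the result materialised as a training dataset:

-- CODE language-bash --
# Hypothetical cached feature group holding additional customer features
demographics_fg = fs.get_feature_group("customer_demographics", version=1)

# Join the on-demand (Redshift-backed) features with the cached ones;
# by default HSFS joins on the matching primary key (customer_id)
query = telco_on_dmd.select(['customer_id', 'internet_service', 'total_charges', 'churn'])\
    .join(demographics_fg.select(['age', 'region']))

# Materialise the joined features as a training dataset
td = fs.create_training_dataset(name="churn_model", version=1, data_format="csv")
td.save(query)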

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

-- CODE language-bash -- spark_df = telco_on_dmd.read()

You can then transform your `spark_df` into features using feature engineering libraries. It is also possible to retrieve the dataframe as a Pandas dataframe and perform the feature engineering steps in Python code. 
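
For instance, a simple feature engineering step on the returned Spark DataFrame could look like the sketch below; the derived feature is made up for illustration and assumes the Redshift table contains `total_charges` and `tenure` columns:

-- CODE language-bash --
from pyspark.sql import functions as F

# Illustrative transformation: average charge per month of tenure
# (guarding against a tenure of zero for new customers)
features_df = spark_df.withColumn(
    "avg_monthly_charges",
    F.col("total_charges") / F.greatest(F.col("tenure"), F.lit(1)))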

You can then save the final dataframe containing the engineered features to the feature store as a cached feature group:

-- CODE language-bash --
telco_fg = fs.create_feature_group(
    name="telco_customer_features",
    version=1,
    description="Telecom customer features",
    online_enabled=True,
    time_travel_format="HUDI",
    primary_key=["customer_id"],
    statistics_config=True)

telco_fg.save(spark_df)

Storing feature groups as cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First, it allows users to leverage Hudi for incremental ingestion (with ACID properties, ensuring the integrity of the feature group) and time travel capabilities. As new data is ingested, new commits are tracked by Hopsworks, allowing users to see what has changed over time. On each commit, statistics are computed and tracked in Hopsworks, so users can understand how the data has evolved.
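
As a hedged sketch, assuming the HSFS time-travel APIs available at the time of writing, the tracked commits and the state of the feature group at an earlier point in time can be inspected as follows (the timestamp is illustrative):

-- CODE language-bash --
# List the Hudi commits tracked for the cached feature group
commits = telco_fg.commit_details()

# Time travel: read the feature group as of a given wallclock time
old_df = telco_fg.read("2021-02-01 00:00:00")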

Cached feature groups can also be stored in the online feature store (`online_enabled=True`), thus enabling low latency access to the features using the online feature store API.
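
A hedged sketch of such access, assuming the HSFS query API: selecting from the online-enabled feature group and reading with `online=True` returns the latest feature values from the online store (single-row lookups for model serving go through the feature store's serving APIs):

-- CODE language-bash --
# Rough sketch: read the latest feature values from the online feature store
online_df = telco_fg.select_all().read(online=True)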

Get started

Get started today on Hopsworks.ai by configuring your Redshift storage connector and running this notebook.