
Show me the code; how we linked notebooks to features

Product Updates · 11/23/2021

TLDR; This blog introduces a new feature in the Hopsworks UI - feature code preview - the ability to view the notebook used to create a Feature Group or Training Dataset. The new feature works for both notebooks run on Hopsworks and notebooks run on Databricks. This new functionality enables Data Scientists to easily inspect the code used to compute features and training data.

Introduction

A key capability of feature stores is that they enable users to understand their features: where features come from, both the source of their data and the code used to create them. Since Hopsworks version 1.2, users have been able to see which compute jobs run on the Hopsworks platform create which Feature Groups. However, an increasing number of users run their feature engineering code in Jupyter notebooks and on external platforms, such as Databricks. To this end, we have introduced new functionality in the Feature Store UI: you can now view the notebook code used to create features or training data directly in the Feature Registry.

This new quick feature code view will help data scientists become more productive when they are in the EDA (exploratory data analysis) phase of their work. They will not have to navigate to a source code repository or Jobs UI to understand how a feature is computed. This new capability is part of our ongoing effort to make the Hopsworks Feature Store the most open and modular feature store on the market. Not only do we support the widest number of platforms for feature engineering (Python, Spark, SQL), but we also support the widest number of data sources, where you can store features computed from data stored in many different data platforms, including data warehouses, databases, object stores, graph databases, message buses, and free-text search databases. You do not have to bring your data for AI to a single data warehouse to be able to use our feature store. Keep the data in place, and centralize the features computed from it.

How you compute features for Hopsworks

Hopsworks supports general-purpose programming languages and frameworks for computing features, including Python/Pandas, Apache Spark (PySpark, Scala/Java, SQL), and Apache Flink. You can work with your existing workflow orchestrator tool (Hopsworks comes with Airflow built-in, but you can use an external Airflow, Dagster, Azure Data Factory, Jenkins and many more).

Hopsworks does not force you to define your features in a domain specific language (DSL). We meet you where you like to work - write your feature pipeline as a program in any Python/Spark/SQL environment. Extracting the feature computation code from a DSL is a straightforward task. However, as we support general purpose compute frameworks for feature computation, we had to solve the more general problem of storing and attaching the code for the notebook to the features in the feature store.

A Python code snippet is shown below that uses the hsfs API (Hopsworks Feature Store) to write a dataframe to a feature group. First, we read data into a Pandas dataframe from an S3 bucket, then we perform feature engineering on the dataframe (elided for brevity), and finally the dataframe is saved to the feature group - an ACID update on the Hudi table backing the offline feature store.
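The original snippet was shown as an image; below is a minimal sketch of the same flow, assuming hsfs 2.x and using hypothetical names for the cluster, bucket and feature group.

import hsfs
import pandas as pd

# Connect to the feature store (host, project and API key file are placeholders)
connection = hsfs.connection(
    host="my-cluster.hopsworks.ai", project="demo_fs", api_key_file="api_key.txt"
)
fs = connection.get_feature_store()

# 1. Read raw data from an S3 bucket into a Pandas dataframe (hypothetical path)
df = pd.read_csv("s3://my-bucket/raw/transactions.csv")

# 2. Feature engineering on the dataframe (elided for brevity)
# df = engineer_features(df)

# 3. Save the dataframe to a feature group - an ACID update on the offline Hudi table
fg = fs.create_feature_group(
    name="transactions_features",
    version=1,
    primary_key=["customer_id"],
    description="Engineered transaction features",
    time_travel_format="HUDI",
)
fg.save(df)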

Notebook Snapshots and Time-Travel in Hudi-enabled Feature Groups

The feature group shown in the code above stores its historical feature values in an offline feature store - a Hudi table. Hudi is a file format that extends Parquet to give it ACID properties and time-travel capabilities. Our new feature code preview functionality works as follows. Every time we update a feature group, we snapshot the notebook (Jupyter in Hopsworks or Databricks notebook), and store it alongside the Hudi commit. 
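Because each update is a Hudi commit, the feature data itself can also be inspected as of an earlier point in time. A rough sketch with hsfs, reusing the feature group from the snippet above (method names may vary slightly between versions):

# List the Hudi commits on the feature group; in the UI, each commit now has the
# notebook snapshot that produced it attached.
for commit_id, details in fg.commit_details().items():
    print(commit_id, details)

# Time-travel: read the feature data as it looked at a given wallclock time
df_then = fg.select_all().as_of("2021-11-01 00:00:00").read()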

Feature Code Snapshot Capabilities

The ability to easily view the code used to compute a given snapshot for a Feature Group enables new capabilities in Hopsworks:

  • Debugging: if you discover a bug in your features, you can easily check the code used to create those features. For example, if you discovered a particular data ingestion run that polluted a feature group, it is now trivial to inspect the code that performed that ingestion.
  • Reproducibility: you can reproduce the data used for a given update to a feature group by inspecting the program used to make that update along with the source data;
  • Auditing and governance: you can travel back in time to see what code was used to create feature data in a given time period.

The new feature code preview functionality will be available in Hopsworks 2.5, due for release before the end of 2021.


Receiving Alerts in Slack/Email/PagerDuty from Hopsworks - Support for managing your feature store

Hopsworks · 11/12/2021 · Ermias Gebremeskel, Theofilos Kakantousis

TLDR; Hopsworks is a Data-Intensive AI platform that manages the full AI lifecycle for MLOps, built around the industry-leading Feature Store. Data ingestion tasks, such as feature validation and ingestion jobs, tend to be long-lived and typically run as part of a larger orchestrated data pipeline. It is therefore necessary to have a mechanism where alerts can be customized and sent for the different events triggered as part of the ingestion pipeline. This tutorial goes through the steps needed to set up alerts in the Hopsworks Feature Store for feature validation and ingestion.

Introduction

Hopsworks brings new alerting capabilities that enable users to monitor jobs and feature group validations. As alerting is relatively new to Hopsworks, we are continuously adding alerting support for other services. Currently the scope of alerts in Hopsworks is twofold: notify users about changes in the status of jobs, and notify users about the feature validation status of data being inserted into a feature group of the feature store, or even of feature validations performed post-insertion.

There are two ways alerts can be configured for jobs and feature group validations:

  1. By creating alerts for all jobs or feature groups in a project
  2. By creating alerts for a specific job or feature group.

In this blog post we will go through the steps necessary to trigger alerts for jobs and feature group validations. Alerts can be sent via Slack, email, and PagerDuty. For the purposes of this blog Slack will be used as the alert receiver, but the steps described here apply to any of the methods previously mentioned.

Alerts can be set up at the project level or at the cluster level. Cluster-wide alerts can only be configured by a platform administrator, that is, a user with the HOPS_ADMIN role assigned, whereas project-level alerts can be configured by any member of a project.

Prerequisites

To follow this tutorial you should have an instance of Hopsworks version 2.4 or above running on https://hopsworks.ai. You can register for free, without providing credit card information, and receive USD 300 worth of free credits to get started. The only thing you need to do is to connect your cloud account.

Walkthrough guide

Below we provide a step-by-step guide showing how to set up Hopsworks to trigger alerts for a PySpark feature engineering job that, prior to inserting data into the feature group, uses the feature validation SDK in hsfs to ensure the correctness of the newly arrived data. Both the feature validation and the execution of the job will trigger alerts that are subsequently sent to two different engineering groups: one responsible for monitoring jobs and another responsible for the feature data itself.

In particular, by the end of this example the following events will have occurred: 

  1. The feature dataset is automatically generated inside the example program, which runs as a job.
  2. The feature engineering job will validate and then insert the synthetic data into a feature group.
  3. Two alerts will be sent to two different Slack channels, one when the feature validation fails due to incorrect data and one when the feature engineering job completes.

Step 1 - Configure the Alert manager 

To send alerts via Slack you first need to configure the global Slack webhook from the cluster settings page, as shown in the animation below. 


In addition to the webhook, you can add global receivers (channels). Global receivers will be available to all users of  the cluster.

To send alerts via email or PagerDuty you will need to add their respective configurations. If you prefer, it is also possible to add configurations and receivers by directly editing the alert manager configuration as shown in the image below.


Detailed information on how to configure the alert manager global settings is available here.   

Step 2 - Create receivers

After configuring the global Slack webhook you will be able to add project-specific receivers by going to the alerts section of your project settings. Give the receiver a name that identifies the team that will receive the alert. In this example we will call it ml-team and will send alerts to the #ml-team channel and the user @admin in Slack.


We also created a receiver called op-team that will receive alerts about jobs and feature group validations. 

Step 3 - Create alerts

Once a receiver is created we can go ahead and create the alerts that will be triggered when feature validation fails and when the validation job finishes.

  1. Create validation alert - project-wide

The validation alert will be created in the project settings and will be triggered on any validation event in the project - in this demo, a failed validation. When creating an alert we need to specify the trigger, receiver and severity. Here we will choose the data validation failure trigger, and the receiver will be the one created in the previous section (severity can be set to any of the values info, warning or critical).

  2. Create job alert

Now that we have created the validation alert, we are ready to create the job that will create the feature group and populate it with fresh validated data. When creating the job we choose advanced to enable us to add alerts for this particular job. We want an alert to be triggered when the job finishes, sent to the op-team with severity info.

The job’s code is based on the Data Validation with Python notebook from examples.hopsworks.ai.
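That notebook contains the full details; as a rough, hypothetical sketch of the shape of such a job (the feature group name and schema are invented here, and the validation rules are assumed to already be attached to the feature group), it boils down to generating data and inserting it:

import hsfs
import numpy as np
import pandas as pd

# Inside a Hopsworks job the connection parameters are resolved automatically
connection = hsfs.connection()
fs = connection.get_feature_store()

# Generate a synthetic feature dataset (hypothetical schema)
df = pd.DataFrame({
    "id": np.arange(1000),
    "amount": np.random.normal(100.0, 25.0, 1000),
    "label": np.random.randint(0, 2, 1000),
})

# Inserting runs the expectations attached to the feature group: a failed
# validation fires the project-wide validation alert (ml-team), and the job
# finishing fires the job alert (op-team) configured above.
fg = fs.get_feature_group("demo_features", version=1)
fg.insert(df)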


Finally, run the job to see whether alerts are sent to both teams. If everything works as expected, #ml-team will receive an alert on data validation failure and #op-team will receive an alert when the job finishes (on job completion). The video below walks through the scenario described above.

End-to-end Deep Learning Pipelines with Earth observation Data in Hopsworks

10/8/2021 · Theofilos Kakantousis

Tl;dr: Hopsworks enables data scientists to develop scalable end-to-end machine learning pipelines with Earth observation data. In this blog post we demonstrate how to build such a pipeline with real-world data in order to develop an iceberg classification model.

Introduction

In the blog post AI Software Architecture for Copernicus Data with Hopsworks we described how Hopsworks, the data-intensive AI platform with a feature store, brings support for scale-out AI with Earth observation data from the Copernicus programme and the H2020 ExtremeEarth project. This blog post is a continuation of the previous one as we dive into a real-world example where we describe how to build and deploy a machine learning (ML) model using a horizontally scalable deep learning (DL) architecture that identifies if a remotely sensed target is a ship or iceberg.

An extended version of this blog post is available as deliverable “D1.8 Hops data platform support for EO data -version II” of the H2020 ExtremeEarth project published in June 2021.

Pipeline

In order to develop and put in production a machine learning model, the input data needs to be processed and transformed through a series of stages. Each stage serves a distinct purpose and all the stages chained together transform the input Earth observation data into an ML model that application clients can use. For the ship/iceberg classification model described in this example, these stages are listed below and described in detail in the following sections:

  • Data ingestion and pre-processing
  • Feature Engineering and Feature Validation
  • Training
  • Model Analysis, Model Serving, Model Monitoring
  • Orchestration
End-to-end ML pipeline stages

Dataset

A requirement for this example is to use a free and publicly available dataset in the Earth observation domain. As such, we opted for the “Statoil/C-CORE Iceberg Classifier Challenge - Ship or iceberg, can you decide from space?” [1] hosted on Kaggle, an online community of data scientists and machine learning practitioners; the dataset is distributed for free.

The schema for the Statoil dataset is presented in the figure below. The data is in json format and contains 1604 images. For each image in the dataset, we have the following information:

  • id - the id of the image.
  • band_1, band_2 - the flattened image data. Each band has 75x75 pixel values in the list, so the list has 5625 elements. Band 1 and Band 2 are signals characterized by radar backscatter produced from the HH (transmit/receive horizontally) and HV (transmitted horizontally and received vertically) polarizations.
  • inc_angle - the incidence angle of which the image was taken.
  • is_iceberg - set to 1 if it is an iceberg, and 0 if it is a ship.
Schema of the Statoil demonstrator dataset

Data ingestion and preprocessing

Hopsworks can ingest data from various external sources and it is up to the users to decide the most efficient approach for their use cases. Such data sources include object stores such as Amazon S3 or Azure Blob Storage, external relational databases that can be accessed via protocols such as JDBC, and of course data that resides in the local filesystem. Another option, which is the one followed for the purposes of this article, is to upload the input data via the Hopsworks UI, which makes use of the Hopsworks REST API. This way, data is readily available to applications running in Hopsworks from within the project’s datasets.

Oftentimes data needs to be pre-processed, that is, transformed into a form ready for extracting ML features and eventually for use as training/test data. In the Earth observation domain, such preprocessing steps might involve applying algorithms implemented in arbitrary languages and platforms. For example, the European Space Agency (ESA) is developing free open source toolboxes for the scientific exploitation of Earth Observation missions. ESA SNAP [2] is a common architecture for all Sentinel Toolboxes. To make it easier for developers to work with SNAP, the toolbox has been containerized and is made available by different organizations, such as mundialis [3], to be run as Docker containers. As of version 2.2.0, Hopsworks supports running Docker containers as jobs, which means users can seamlessly integrate Docker containers as part of their pipelines built in Hopsworks [4].

Feature Engineering and Feature Validation

After having ingested the input data into the platform and applied any preprocessing steps, we proceed by engineering the features required by the deep learning training algorithm. Feature engineering in this example is done within Hopsworks by using Jupyter notebooks and Python. Feature engineering can also be performed by an external service and the curated feature data can then be inserted into the Hopsworks feature store. The latter is the service that allows data scientists to store, organize, discover, audit and share feature data that can be reused across multiple ML models. 

In the iceberg classifier example above, we use the band_1 and band_2 features to compute a new feature called band_avg. All the features are then organized into a feature group and inserted into the feature store as shown in the code snippets image below.

Create a new feature band_avg
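The snippet itself was shown as an image; a minimal sketch of the band_avg computation, assuming the Statoil json file has been read into a Pandas dataframe (the path is hypothetical), is given below.

import numpy as np
import pandas as pd

# Load the Statoil dataset from the project's datasets (hypothetical path)
df = pd.read_json("train.json")

# New feature: average radar backscatter over both bands
df["band_avg"] = df.apply(
    lambda row: (np.mean(row["band_1"]) + np.mean(row["band_2"])) / 2.0, axis=1
)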

Input data often contains noise, for example missing feature values or values of the wrong data type. Since the feature data needs to be ready for use by ML programs, when inserting data into the feature store developers can make use of the feature validation API, which is part of the feature store Python and Scala SDKs [5]. This API provides a plethora of rules that can be applied to data as it is being inserted into the feature store.

In the iceberg feature group example we chose to apply three validation rules:

  • HAS_DATATYPE: Asserts that the feature id of the iceberg feature group does not contain null values. This is asserted by setting the max allowed null values to zero. Additionally, the is_iceberg label is also expected to only contain numbers by setting the threshold for required numeric values of is_iceberg to 1.
  • HAS_MAX: Assertion on the maximum allowed value of the is_iceberg label, which is set to 1.
  • HAS_MIN: Assertion on the minimum allowed value of the is_iceberg label, which is set to 0.

These rules are grouped in feature store expectations and can be set during the feature group creation call as shown in the image below.

Feature expectations Python API example
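The image showed the expectations being attached at feature group creation time. The sketch below approximates this with the hsfs expectations API of that era (rule signatures may differ slightly between versions, the HAS_DATATYPE rule on id is configured analogously, and fs is the feature store handle obtained via hsfs.connection().get_feature_store()).

from hsfs.rule import Rule

expectation_label = fs.create_expectation(
    "iceberg_label",
    description="is_iceberg must be a 0/1 label",
    features=["is_iceberg"],
    rules=[
        Rule(name="HAS_MIN", level="ERROR", min=0),  # minimum allowed value of is_iceberg
        Rule(name="HAS_MAX", level="ERROR", max=1),  # maximum allowed value of is_iceberg
    ],
)

fg = fs.create_feature_group(
    name="iceberg",
    version=1,
    primary_key=["id"],
    description="Ship/iceberg features from the Statoil dataset",
    validation_type="STRICT",
    expectations=[expectation_label],
)
fg.save(df)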

Training

Hopsworks comes equipped with two Python frameworks, namely experiments [6] and Maggy [7], that enable data scientists to develop machine learning models at scale as well as manage machine learning experiment metadata. In particular, these frameworks enable scalable deep learning with GPUs across multiple machines, distribution-transparent machine learning experiments, ablation studies, and writing core ML training logic as oblivious training functions. Maggy lets you write your training code once, whether you are training small models on your laptop or scaling out hyperparameter tuning or distributed deep learning on a cluster.

This example uses TensorFlow version 2.4 for developing the model. When launching a machine learning experiment from Hopsworks, the Jupyter service provides users with different options depending on what type of training/experimentation is to be done. As seen in the image below, these types are Experiment, Parallel Experiments, Distributed Training. Experiment is used to conduct a single experiment while Parallel Experiments can significantly speed up the process of exploring hyperparameter combinations that work best for the ML model. Distributed Training automates the process of setting up and launching workers that will develop the model based on the selected distributed training strategy. 

For example the screenshot below shows how to perform hyperparameter optimization with Maggy for the iceberg classification example.

Iceberg hyperparameter optimization with Maggy - launch
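The launch shown in the screenshot boils down to defining a search space and a training function and handing them to Maggy's lagom call. The sketch below is approximate: the hyperparameter names and ranges are illustrative, train_iceberg_model is a hypothetical helper, and the lagom signature may differ between Maggy versions.

from maggy import experiment, Searchspace

# Hyperparameters to explore (illustrative)
sp = Searchspace(kernel=("INTEGER", [3, 9]), dropout=("DOUBLE", [0.01, 0.5]))

def training_fn(kernel, dropout, reporter):
    # Build and train the iceberg CNN with the given hyperparameters (elided),
    # then return the final validation accuracy as the optimization metric.
    accuracy = train_iceberg_model(kernel=kernel, dropout=dropout)  # hypothetical helper
    return accuracy

result = experiment.lagom(
    training_fn,
    searchspace=sp,
    optimizer="randomsearch",
    direction="max",
    num_trials=15,
    name="iceberg_hpo",
)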

Once all trials are executed, a summary of results is printed as the final output.

Iceberg hyper-parameter optimization with Maggy - results

For distributed training, the same model was used as in the previous sections, however, Jupyter was started with the Distributed Training configuration.

Iceberg distributed training function
Iceberg distributed training experiments API launch

In the context of machine learning, we can define an ablation study as “a scientific examination of a machine learning system by removing its building blocks in order to gain insight on their effects on its overall performance”. With Maggy, performing ablation studies of machine learning or deep learning systems is a fairly simple task that consists of the following steps:

  • Creating an AblationStudy instance,
  • Specifying the components that you want to ablate by including them in your AblationStudy instance,
  • Defining a base model generator function and/or a dataset generator function,
  • Wrapping your TensorFlow/Keras code in a Python function (called e.g., the training function) that receives two arguments (model_function and dataset_function), and
  • Launching your experiment with Maggy while specifying an ablation policy.
Maggy ablation studies notebook example - ablations


Maggy ablation studies notebook example - results

Model: Analysis, Serving, Monitoring

Data scientists working with Hopsworks can make use of the What-If [8] tool to test performance in hypothetical situations, analyze the importance of different data features, and visualize model behavior across multiple models and subsets of input data and for different ML fairness metrics. The What-If tool is available out of the box when working within a Hopsworks project.

Below you can see the code snippet used to perform model analysis for the sea iceberg classification model developed with the demonstrator dataset in this example. Users set the number of data points to be displayed, the test dataset location to be used for analysis of the model, and the features to be used.

Model analysis what-if tool code snippet
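The snippet in the image roughly corresponds to the following What-If Tool setup; test_examples (tf.train.Example records built from the test set) and predict_fn (a wrapper around the served iceberg model) are placeholders.

from witwidget.notebook.visualization import WitConfigBuilder, WitWidget

num_datapoints = 500  # number of data points to display

config_builder = (
    WitConfigBuilder(test_examples[:num_datapoints])
    .set_custom_predict_fn(predict_fn)      # placeholder wrapper around the served model
    .set_target_feature("is_iceberg")       # label to analyze performance and fairness for
)
WitWidget(config_builder, height=800)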

The next screenshot depicts the performance and fairness of the model based on a particular feature of the model.

Performance and Fairness of the model

After a model has been developed and exported by the previous stages in the DL pipeline, it needs to be served so that external clients can use it for inference. Also as the model is being served, its performance needs to be monitored in real-time so that users can decide when it would be the best time to trigger the training stage. For the iceberg classification model,  Hopsworks uses TensorFlow Model Server on Kubernetes to serve the model in an elastic and scalable manner and  Spark/Kafka for monitoring and logging the inference requests. Users can then manage the serving instances from the Hopsworks UI and view logs as shown in the screenshot below. 

Model serving logs in Kibana

 Orchestration

All previous sections have demonstrated how to apply transformations and processing steps to data via a Deep Learning pipeline, in order to go from raw data into an ML model. So far all steps had to be manually executed in a proper order to produce the output model. However, once that process is established it can then be quite repetitive in nature. That means it decreases the efficiency of data scientists whose primary focus is on improving the accuracy of the models by applying novel techniques and algorithms. Such a repetitive process should then be automated and managed easily with the help of software tools.

One such tool is Apache Airflow [9], a platform to programmatically schedule and monitor workflows. Hopsworks provides Airflow as one of the services available in a project. Users can either create an orchestration pipeline with the Hopsworks UI or implement it themselves and then upload it to Hopsworks.

Airflow service UI in Hopsworks


Airflow tree-view tasks for the iceberg classification pipeline

Conclusion

In this blog post we presented a real-world example of developing an end-to-end machine learning pipeline for performing iceberg classification with Earth observation (remote sensing) data. The pipeline is developed using tools and services available in Hopsworks and the example’s code is available in the ExtremeEarth project GitHub repository [10]. 

References

  1. https://www.kaggle.com/c/statoil-iceberg-classifier-challenge/overview/description 
  2. https://step.esa.int/main/toolboxes/snap/ 
  3. https://github.com/mundialis/esa-snap
  4. https://hopsworks.readthedocs.io/en/stable/user_guide/hopsworks/jobs.html#docker 
  5. https://docs.hopsworks.ai/latest/generated/feature_validation/ 
  6. https://hopsworks.readthedocs.io/en/stable/hopsml/experiment.html
  7. https://maggy.ai/dev/ 
  8. https://pair-code.github.io/what-if-tool/ 
  9. https://airflow.apache.org/ 
  10. https://github.com/ExtremeEarth-Project/eo-ml-examples/tree/main/D1.8

Note: This blog is a republication from Extreme Earth.

How to Transform Snowflake Data into Features with Hopsworks Feature Store

Feature store · 12/7/2021 · Fabio Buso

Snowflake is a popular cloud-native data warehouse service, and supports scalable feature computation with SQL. However, Snowflake is not viable as an online feature store that serves features to models in production - columnar databases have too high latency compared to OLTP databases or key-value stores.  In this blog post, we show you how to connect Hopsworks to Snowflake and leverage its data and computation capabilities to create features and make them available both offline in Snowflake and online in Hopsworks.

Before you get started, make sure you have created a free account on hopsworks.ai. Hopsworks.ai gives access to a managed cloud based deployment of the Hopsworks Feature Store, with free Hops credits to use.

Storage connector

The first step to integrate Hopsworks with an external Snowflake cluster is to configure the Snowflake storage connector. Hopsworks provides storage connectors to securely centralize and manage connection configurations and credentials to interact with external data stores. In this way users do not have to hardcode passwords and tokens in programs, and can control which users are given access to external data stores.

Hopsworks provides a storage connector and drivers for Snowflake. The storage connector can be configured using the feature store UI as illustrated below:

To configure the connector you need to provide the Connection URL of your cluster. The Snowflake storage connector supports both username and password authentication as well as token-based authentication. Token-based authentication is required when using OAuth to authenticate with Snowflake. To be able to use token-based authentication from Hopsworks, you will have to enable it on your Snowflake cluster, as explained in the Snowflake documentation.

The Hopsworks Snowflake storage connector allows users to specify several additional fields, though only two are mandatory: the database field and the schema field.

The role field can be used to specify which Snowflake security role to assume for the session after the connection is established.

The application field can also be specified to have better observability in Snowflake with regards to which application is running which query. The application field can be a simple string like “Hopsworks” or, for instance, the project name, to track usage and queries from each Hopsworks project.

Additional key/value options can also be specified to control the behaviour of the Snowflake Spark connector. The available options are listed in the Snowflake documentation.

On-Demand Feature Groups (External Tables)

Once the storage connector is configured, users can start defining on-demand feature groups. On-demand feature groups are external tables that can be defined on external SQL databases or tables stored on object stores (such as Parquet or Delta lake tables on S3 object storage). The data for an on-demand feature group is not copied into Hopsworks. Instead, it is stored in-place in the external data store, and is only read from the external store “on-demand”. For example, when its feature data is used to create training data or for batch inference. While data remains on the external store, the feature group metadata is stored in Hopsworks. More in-depth documentation for on-demand feature groups can be found here.

On-demand feature groups can be used in combination with cached feature groups to either create training datasets or to retrieve large volumes of feature data for batch inference. 

An example of on-demand query definition is given below:
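The definition was shown as an image; the sketch below approximates it. The connector name and the store_sales query are illustrative, fs is the feature store handle obtained via hsfs.connection().get_feature_store(), and the query yields the three features described next.

# Retrieve the Snowflake storage connector configured in the previous section
snowflake = fs.get_storage_connector("snowflake_connector")

query = """
    SELECT ss_store_sk,
           AVG(ss_net_profit)  AS avg_ss_net_profit,
           SUM(ss_net_profit)  AS total_ss_net_profit,
           AVG(ss_list_price)  AS avg_ss_list_price
    FROM store_sales
    GROUP BY ss_store_sk
"""

store_sales_fg = fs.create_on_demand_feature_group(
    name="store_sales_features",
    version=1,
    query=query,
    storage_connector=snowflake,
    description="Store sales aggregates computed in Snowflake",
)
store_sales_fg.save()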

The above code defines avg_ss_net_profit, total_ss_net_profit and avg_ss_list_price as features in the store_sales_features feature group, and registers the feature group in the feature store.

Each time features are read from the on-demand feature group, the above query is pushed down to Snowflake for execution. When creating training data in a Spark program, the query result is returned as a DataFrame using the Snowflake Spark connector, which can then be joined with other features and optionally materialized to a data store as machine-learning-ready files (e.g., in .tfrecord file format).

Users can also compute statistics and define validation rules on the on-demand feature group data. More information can be found in our documentation. For data validation, in the case of an on-demand feature group, as the data is updated externally, a Spark program needs to be periodically run to explicitly validate the data. More details about data validation and alerting will be covered in a future blog post.

Create a Training Dataset

Training datasets are a concept of the Hopsworks Feature Store that allows data scientists to pick features from different feature groups, join them together and get the data in a ML framework friendly file format that can be used to train models in TensorFlow (.tfrecord), Pytorch (.npy), SkLearn (.csv), and others.

The training dataset concept provides additional benefits, such as having a snapshot of the data at a particular point in time, being able to compute statistics on that snapshot and compare them with the statistics on the incoming data being submitted for predictions.

You can create a training dataset either using the HSFS library or using the user interface. In the example below, we join features from two different on-demand feature groups defined over the Snowflake database. It is also possible to join features from cached feature groups.

The training dataset in the example below is stored in TFRecords format ready to be used by a TensorFlow or Keras model. The training dataset itself can be stored on external stores like S3 or ADLS, as explained in the documentation.
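A sketch of the join and training dataset creation with hsfs is shown below; the second feature group and the selected feature names are hypothetical.

store_sales_fg = fs.get_on_demand_feature_group("store_sales_features", version=1)
store_fg = fs.get_on_demand_feature_group("store_features", version=1)  # hypothetical

# Join the two on-demand feature groups into a single query
query = store_sales_fg.select_all().join(
    store_fg.select(["s_floor_space", "s_number_employees"])
)

# Materialize the joined features as a training dataset in TFRecord format
td = fs.create_training_dataset(
    name="sales_model_training",
    version=1,
    data_format="tfrecord",
)
td.save(query)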

Online Serving

After having trained the model and put it into production, users need to leverage the online feature store to provide the features required to make predictions.

Snowflake is a columnar database, not designed to provide low-latency access to data. To satisfy the latency requirements that a production system requires, we make the feature data available online using RonDB. RonDB is the database powering the Hopsworks online feature store.
To make the data available online, we need to create a cached feature group and set the online_enabled flag to True. For online storage, we don’t need to track data statistics. The online feature group will contain only the most recent values of each entity (e.g., the last feature vector available for a given store_id or customer_id).
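A rough sketch, reusing the on-demand feature group from above (names are illustrative):

online_fg = fs.create_feature_group(
    name="store_sales_features_online",
    version=1,
    primary_key=["ss_store_sk"],
    online_enabled=True,       # materialize to RonDB for low-latency serving
    statistics_config=False,   # no statistics needed for the online copy
)

# Read the current snapshot from Snowflake via the on-demand feature group
# and materialize it to the online feature store
df = store_sales_fg.read()
online_fg.save(df)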

The above example makes the data available only at a specific point in time (i.e., the moment the online feature group was created). In Hopsworks, users can define a Spark/Python program that periodically refreshes the data according to the requirements of the specific use case, hence keeping the most recent data available online.

Examples

This blog post is also available as Jupyter notebooks that users can use to get started with the Hopsworks Snowflake connector. The example notebooks are available for Python and Scala.

You can run it for free by registering an account on Hopsworks.ai and receiving the free credits to experiment with the Hopsworks Feature Store.

Using an External Python Kafka Client to Interact with a Hopsworks Cluster

Hopsworks · 9/27/2021 · Ahmad Al-Shishtawy

This tutorial was tested using Hopsworks version 2.2.

Prepare the Environment

We’ll start by preparing the schema, creating a Kafka topic, and downloading security credentials that we’ll need in this tutorial.

Avro Schema

Kafka treats data as blobs of bytes. It is your responsibility to pick a data format per topic and use it consistently across applications interacting with the topic. You are free to choose any format you prefer, such as JSON or Protobuf. However, Avro has become the industry-standard data format to use with Kafka. Avro is an open source data serialization system that is used to exchange data between systems across programming languages.

Avro relies on schemas that are used when writing/reading data. To simplify the management of schemas, Confluent implemented a Schema Registry as a layer on top of Kafka. Hopsworks implements its own schema registry that is compatible with Confluent Schema Registry v5.3.1. Kafka clients can use the schema registry to validate and make sure that the correct data is written to or read from a kafka topic.

In this tutorial, we’ll use temperature sensors as an example. Each sensor has a unique ID and produces a temperature value at a specific timestamp. We’ll create a generic sensor schema that can be used with any sensor that follows a similar pattern. The code below lists the schema used in this tutorial. For more details about declaring Avro schemas and supported data types, check the Avro schemas documentation.

{
  "type": "record",
  "name": "sensor",
  "fields": [
    { "name": "timestamp", "type": "long", "logicalType": "timestamp-millis" },
    { "name": "id", "type": "string" },
    { "name": "value", "type": "double" }
  ]
}

To register the above schema in Hopsworks, open the schemas settings in the Kafka tab and select New Avro Schema.

Avro schema settings page

Then enter a name in the Schema Name field and paste the schema itself in the content field. To check that the syntax of the schema is correct, press the Validate button. If everything is OK, proceed by pressing the Create button.

Caution!

For the schema to work correctly with standard external clients, such as the Confluent Avro Producer/Consumer, the name given in the “Schema Name” field and in the schema declaration must be the same name. Furthermore, if you use a name space in the schema declaration, e.g., "namespace": "se.ri.kafka.tutorial", "name": "sensor", then the “Schema Name” field should contain the full name, i.e., se.ri.kafka.tutorial.sensor.

Registering a new Avro schema

Kafka Topic

Topics are a way to organize related events. A topic is like a buffer between event producers and event consumers. Events are durably stored in a topic and are not deleted after consumption. Events can be read as many times as needed and you define for how long Kafka should retain your events.

For scalability, a topic is divided into a number of partitions that are distributed across servers (called Kafka Brokers). Events are distributed among partitions either uniformly or by event key. Using an event key is recommended to guarantee that events from the same entity, e.g., user or sensor, end up in the same partition and thus processed in the correct order of arrival.

Tip

The number of partitions determines the maximum parallelism for processing (consuming) events by a single application. You can have as many event producers per topic as you want, and there can be as many applications processing (consuming) events from a topic as needed. But within a single application, also known as a consumer group, the maximum parallelism (number of consumers) is defined by the number of partitions in the topic. This restriction is to guarantee the ordered processing of events within a topic.

To create a new Kafka topic, open the topic settings in the Kafka tab and select New Topic.

Kafka topics settings page

Give your topic a name. This will be used later in the code to identify the topic. Enter the desired number of partitions and replication degree. Select a schema and schema version to use with this topic. For this tutorial, use the values shown in the image below.

Note

For testing, it is OK to set the number of partitions and replicas to 1. In a production system, you should always set the number of replicas to larger than 1 (typically 3) to avoid data loss on server failures, and also select an appropriate number of partitions to achieve the desired performance based on the expected number and rate of events.

Creating a new Kafka topic

Security Certificates

Hopsworks provides a secure Kafka-as-a-Service. Connecting your Python producers and consumers from an external server to the cluster provided by Hopsworks requires exporting the project certificates. These are used by the clients to securely connect and authenticate against the Hopsworks Kafka cluster. The certificates are downloaded as a keystore and a truststore. These are designed to be used by Java/Scala programs and need to be converted to .pem format to be used by Python and other non-Java programs.

To export your projects’ certificates, go to Project Settings in the Settings tab and click Export Certificates.

Project settings page

You will be asked to enter your login password before downloading.

Exporting project certificates (1/2)

After successfully entering your password, two certificate files will be downloaded, trustStore.jks and keyStore.jks. The certificate password will be displayed. It’s a long string that is similar to:

MQJNW833YNBR9C0OZYGBGAB09P2PP4H5EHIALGWIT98I2PNSPTIXFCEI72FT0VLE

Important

Store these two files in a safe place as they give remote access to your project and data. Same goes for the password. Copy and save your password in a safe location as we’ll need it later.

Exporting project certificates (2/2)

Next, we’ll convert the JKS keyStore into an intermediate PKCS#12 keyStore, then into PEM file. You will be asked to enter a new password for each of the generated certificates and also the original certificate password you got from the previous step.

$ keytool -importkeystore -srckeystore keyStore.jks \
    -destkeystore keyStore.p12 \
    -srcstoretype jks \
    -deststoretype pkcs12

$ keytool -importkeystore -srckeystore keyStore.jks -destkeystore keyStore.p12 -srcstoretype jks -deststoretype pkcs12
Importing keystore keyStore.jks to keyStore.p12...
Enter destination keystore password:
Re-enter new password:
Enter source keystore password:
Entry for alias kafka_tutorial__meb10000 successfully imported.
Import command completed: 1 entries successfully imported, 0 entries failed or cancelled

$ openssl pkcs12 -in keyStore.p12 -out keyStore.pem

$ openssl pkcs12 -in keyStore.p12 -out keyStore.pem
Enter Import Password:
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
$ ls
keyStore.jks  keyStore.p12  keyStore.pem  trustStore.jks

We repeat the same steps for the trustStore.

$ keytool -importkeystore -srckeystore trustStore.jks \
    -destkeystore trustStore.p12 \
    -srcstoretype jks \
    -deststoretype pkcs12

$ keytool -importkeystore -srckeystore trustStore.jks -destkeystore trustStore.p12 -srcstoretype jks -deststoretype pkcs12
Importing keystore trustStore.jks to trustStore.p12...
Enter destination keystore password:
Re-enter new password:
Enter source keystore password:
Entry for alias hops_root_ca successfully imported.
Import command completed: 1 entries successfully imported, 0 entries failed or cancelled

$ openssl pkcs12 -in trustStore.p12 -out trustStore.pem

$ openssl pkcs12 -in trustStore.p12 -out trustStore.pem
Enter Import Password:
$ ls
keyStore.jks  keyStore.p12  keyStore.pem  trustStore.jks  trustStore.p12  trustStore.pem

Now you should have keyStore.pem and trustStore.pem that we’ll use in the rest of this tutorial. You can safely delete the intermediate .p12 files.

API Key

Hopsworks provides a rich REST API to interact with most of the available services. One of these services is the Schema Registry that we’ll be using in this tutorial. To access the REST API we’ll need an API Key.

To create a new API Key associated with your account, open your user account settings.

Account Settings

Select the API Keys tab. Give your key a name and select the services that the app using this key can access. Then click on Create API Key.

Account Settings - API Keys tab

Copy and store your new key in a safe place as this is the only time it will be displayed. If you lose your API Key you’ll have to delete it and create a new one.

Your API Key will look something like this:

K97n09yskcBuuFyO.scfQegUMhXfHg7v3Tpk8t6HIPUlmIP463BPdbTSdSEKAfo5AB8SIwY8LGgB4924B

Important

Store your API Key in a text file (e.g., apiKeyFile) next to your certificates. We’ll use this file later to configure clients.

Creating an API Key

Project Name and ID

The final piece of information we need is the project name and ID. You will find this in your project settings tab.

Project Name and ID

Avro Client

Now we are ready for some coding. We’ll create a Kafka Producer and Consumer using the standard confluent-kafka library and connect it to a Hopsworks cluster. You can find the source code for all examples at Kafka Hopsworks Examples at GitHub.

You will need a working Python environment and the following packages:

pip install confluent_kafka requests fastavro toml

For plotting you might need the following packages depending on your environment:

pip install matplotlib
pip install pyqt5
sudo apt install qt5-default

Configuration File

We’ll write down all the parameters we prepared in the previous section in a configuration file. This makes it easier to change and also to switch between multiple projects or deployments by switching configuration files.

Go through the parameters and change them to match your project settings, then save the file as config.toml.

[hops]
url = '127.0.0.1'
# for testing only! set this flag to false or path to server certificate file
# needed when testing Hopsworks with a self signed certificate
# otherwise leave this true
verify = false

[project]
name = 'Kafka_Tutorial'
id = '1143'
ca_file = 'cert/trustStore.pem'
certificate_file = 'cert/keyStore.pem'
key_file = 'cert/keyStore.pem'
key_password = 'asdf123'

[kafka]
topic = 'temperature'
schema = 'sensor'
port = '9092'

[kafka.consumer]
group_id = 'TutorialGroup'
auto_offset_reset = 'earliest'

[api]
base = '/hopsworks-api/api'
key = 'K97n09yskcBuuFyO.scfQegUMhXfHg7v3Tpk8t6HIPUlmIP463BPdbTSdSEKAfo5AB8SIwY8LGgB4924B'
key_file = 'cert/apiKeyFile'

Sensor Data

We’ll need some data to test our example. For that we’ll generate a time series with trend, seasonality, and noise. The code can emulate multiple sensors. The generated data looks like the plot below.

Sensor Data Sample

The code below for sensor.py generates the data. The code was inspired by this example. You can test it yourself by executing the file.

$ python sensor.py

# Generates a time series with trend, seasonality, and noise.
# Inspired by code from https://github.com/tensorflow/examples/blob/master/courses/udacity_intro_to_tensorflow_for_deep_learning/l08c01_common_patterns.ipynb

from collections import deque
import math
import random

import matplotlib.pyplot as plt


def trend(time, slope=0):
    return slope * time


def seasonal_pattern(season_time):
    """Just an arbitrary pattern, you can change it if you wish"""
    if season_time < 0.4:
        return math.cos(season_time * 2 * math.pi)
    else:
        return 1 / math.exp(3 * season_time)


def seasonality(time, period, amplitude=1, phase=0):
    """Repeats the same pattern at each period"""
    season_time = ((time + phase) % period) / period
    return amplitude * seasonal_pattern(season_time)


def white_noise(time, noise_level=1, seed=None):
    random.seed(seed)
    return random.normalvariate(0, 1) * noise_level


# Combines the above functions to emulate a sensor.
# Uses Python generator function
def sensor(baseline=0, slope=0, period=20, amplitude=20, phase=0,
           noise_level=1, start=0, end=-1):
    time = start
    while(time < end or end < 0):
        yield baseline + trend(time, slope) \
            + seasonality(time, period, amplitude, phase) \
            + white_noise(time, noise_level)
        time += 1


if __name__ == '__main__':
    # initialize a number of sensors
    sensors = [
        sensor(baseline=10, slope=0.1, period=100, amplitude=40, noise_level=5, end=1000),
        sensor(baseline=10, slope=0.2, period=80, amplitude=30, noise_level=4, end=1000),
        sensor(baseline=20, slope=-0.1, period=100, amplitude=50, noise_level=6, phase=20, end=1000),
        sensor(baseline=10, slope=0.1, period=100, amplitude=40, noise_level=0, end=1000),
    ]

    # a list of buffers to emulate receiving data
    data_buffer = [deque() for x in range(len(sensors))]

    fig, ax = plt.subplots(len(sensors), sharex=True)
    lines = [a.plot([])[0] for a in ax]
    plt.show(block=False)

    for events in zip(*sensors):
        for e, b, l, a in zip(events, data_buffer, lines, ax):
            b.append(e)
            l.set_data(range(len(b)), b)
            a.set_xlim(0, len(b)+10)
            a.set_ylim(min(b)-10, max(b)+10)
        fig.canvas.draw()
        fig.canvas.flush_events()

    # pause execution so you can examine the figure
    input("Press Enter to continue...")

Avro Producer

With all the preparation work out of the way, we are now ready to securely send sensor events into our Hopsworks Kafka topic. Below is the code for avro_producer.py.

The code starts by defining an “Event“ class - the class for the objects we want to push into Kafka. You can change this class to match your application. The “event_to_dict“ helper function returns a dictionary representation of an event object to be used by the Avro serializer. Note that the key names should match the field names defined in the schema, and the value types should match those in the schema.

The “main()“ function loads the configuration file and initializes the schema registry client, the Avro serializer, and the producer. It then initializes a number of sensors to generate data and finally uses the producer to push the generated data into Kafka.

-- CODE language-bash --
# This is a simple example of the SerializingProducer using Avro.
#
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry import record_subject_name_strategy
from datetime import datetime
import toml
import argparse
from sensor import sensor
from time import sleep


class Event(object):
    """
    An object representing a sensor event

    Args:
        id (str): Sensor's id
        timestamp (datetime): timestamp in milliseconds
        value (double): Sensor's reading value
    """
    def __init__(self, id, timestamp, value):
        self.id = id
        self.timestamp = timestamp
        self.value = value


def event_to_dict(event, ctx):
    """
    Returns a dict representation of a sensor Event instance for serialization.

    Args:
        event (Event): Event instance.
        ctx (SerializationContext): Metadata pertaining to the serialization operation.

    Returns:
        dict: Dict populated with sensor event attributes to be serialized.
    """
    return dict(id=event.id,
                timestamp=datetime.timestamp(event.timestamp),
                value=event.value)


def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred on None on success.
        msg (Message): The message that was produced or failed.

    Note:
        In the delivery report callback the Message.key() and Message.value()
        will be the binary format as encoded by any configured Serializers and
        not the same object that was passed to produce().
        If you wish to pass the original object(s) for key and value to delivery
        report callback we recommend a bound callback or lambda where you pass
        the objects along.
    """
    if err is not None:
        print("Delivery failed for sensor Event {}: {}".format(msg.key(), err))
        return
    print('Sensor Event {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main():
    # Parse arguments
    parser = argparse.ArgumentParser(description='Produces time series data from emulated '
                                                 'sensors into a kafka topic hosted at a HopsWorks cluster.')
    parser.add_argument("-c", "--config", default='config.toml',
                        help='Configuration file in toml format.')
    parser.add_argument("-t", "--time", default=0, type=int,
                        help='Start time step for the time series generator. Used to resume '
                             'generating the time series after stopping the program.')
    parser.add_argument("-e", "--events", default=1000, type=int,
                        help='Number of events to generate per sensor. Negative for infinite number.')
    parser.add_argument("-d", "--delay", default=0.5, type=float,
                        help='Delay between events in second. Can be float.')
    args = parser.parse_args()

    # Load HopsWorks Kafka configuration
    conf = toml.load(args.config)

    # Kafka schema that this program supports/expects
    # The schema will be checked against the schema of the Kafka topic
    schema_str = """
    {
        "type": "record",
        "name": "sensor",
        "fields": [
            {
                "name": "timestamp",
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "value",
                "type": "double"
            }
        ]
    }
    """

    # url for the schema registry in HopsWorks REST API services
    registry_url = 'https://' + conf['hops']['url']\
        + conf['api']['base'] + '/project/'+conf['project']['id']+'/kafka'

    # Initialise the Confluent schema registry client
    schema_registry_conf = {'url': registry_url, 'ssl.ca.location': conf['hops']['verify']}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    # Add the API key required by HopsWorks but not configurable through the confluent schema registry client
    headers = {'Authorization': 'ApiKey ' + conf['api']['key']}
    schema_registry_client._rest_client.session.headers.update(headers)

    # Initialize the avro serializer for the value using the schema
    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     event_to_dict,
                                     {'auto.register.schemas': False,
                                      'subject.name.strategy': record_subject_name_strategy})

    # Initialize a simple String serializer for the key
    string_serializer = StringSerializer('utf_8')

    # Initialize the producer
    producer_conf = {'bootstrap.servers': conf['hops']['url']+':'+conf['kafka']['port'],
                     'security.protocol': 'SSL',
                     'ssl.ca.location': conf['project']['ca_file'],
                     'ssl.certificate.location': conf['project']['certificate_file'],
                     'ssl.key.location': conf['project']['key_file'],
                     'ssl.key.password': conf['project']['key_password'],
                     'key.serializer': string_serializer,
                     'value.serializer': avro_serializer}

    producer = SerializingProducer(producer_conf)

    # Initialize a number of sensors
    start = args.time
    end = start + args.events if args.events > 0 else -1
    sensors = [
        sensor(baseline=10, slope=0.1, period=100, amplitude=40, noise_level=5, start=start, end=end),
        sensor(baseline=10, slope=0.2, period=50, amplitude=30, noise_level=4, start=start, end=end),
        sensor(baseline=20, slope=-0.1, period=100, amplitude=50, noise_level=6, phase=20, start=start, end=end),
        sensor(baseline=10, slope=0.1, period=100, amplitude=40, noise_level=0, start=start, end=end),
        sensor(baseline=30, slope=-0.1, period=100, amplitude=40, noise_level=5, start=start, end=end),
        sensor(baseline=40, slope=0, period=200, amplitude=10, noise_level=4, start=start, end=end),
        sensor(baseline=0, slope=0.3, period=100, amplitude=20, noise_level=6, phase=50, start=start, end=end),
        sensor(baseline=-10, slope=0.1, period=100, amplitude=40, noise_level=9, start=start, end=end),
    ]

    # Start producing events
    print("Producing sensor events to topic {}.".format(conf['kafka']['topic']))
    print('Press Ctrl-c to exit.')

    time_step = start  # a counter for the number of time steps generated
    try:
        for data in zip(*sensors):
            timestamp = datetime.now()
            time_step += 1
            for i, d in enumerate(data):
                # Serve on_delivery callbacks from previous calls to produce()
                producer.poll(0.0)
                try:
                    event = Event(id='sensor'+str(i), timestamp=timestamp, value=d)
                    producer.produce(topic=conf['kafka']['topic'], key=event.id, value=event,
                                     on_delivery=delivery_report)
                except KeyboardInterrupt:
                    break
                except ValueError:
                    print("Invalid input, discarding record...")
                    continue
            sleep(args.delay)
    except KeyboardInterrupt:
        print('\nStopping...')

    print("Flushing records...")
    producer.flush()
    print('To continue execution start from event {}'.format(time_step))


if __name__ == '__main__':
    main()

The program takes a number of optional command line arguments to control the execution. You can specify the location of the configuration file using the -c flag. You can use -e to control the number of events generated per sensor and -d to set the delay between events. The -t flag resumes the generation of the time series from the specified time step, which is useful if you want to continue generating more events after the program finishes or is stopped.

-- CODE language-bash -- python avro_producer.py --help

-- CODE language-bash --
$ python avro_producer.py --help
usage: avro_producer.py [-h] [-c CONFIG] [-t TIME] [-e EVENTS] [-d DELAY]

Produces time series data from emulated sensors into a kafka topic hosted at
a HopsWorks cluster.

optional arguments:
  -h, --help            show this help message and exit
  -c CONFIG, --config CONFIG
                        Configuration file in toml format.
  -t TIME, --time TIME  Start time step for the time series generator. Used
                        to resume generating the time series after stopping
                        the program.
  -e EVENTS, --events EVENTS
                        Number of events to generate per sensor. Negative for
                        infinite number.
  -d DELAY, --delay DELAY
                        Delay between events in second. Can be float.

Warning

There is a bug in the HopsWorks REST API implementation for the schema registry that causes an HTTP error code 415 “Unsupported Media Type”.

The reason for this error is a content type mismatch between the client and the server. The Confluent schema registry client sends the correct type, ‘application/vnd.schemaregistry.v1+json’, while the HopsWorks REST API server expects content of type ‘application/json’. The bug has been reported to the HopsWorks team and is expected to be fixed in releases after v2.2.

The easiest workaround is to change the Confluent schema registry client to send content type ‘application/json’. This should be fine if you are using a Python virtualenv, as the change will not affect other applications.

Edit the file schema_registry_client.py in your local Python installation, find the line that sets ‘Content-Type’ (line 165 in confluent-kafka v1.7.0), and change it to: 'Content-Type': "application/json"}.

The location of the file depends on your Python installation. If you are using virtualenv it will look something like: ~/.virtualenvs/myvenv/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py

Now let's generate some events. Below is a sample execution of 5 events with 0.5 seconds delay:

-- CODE language-bash -- python avro_producer.py -e 5 -d 0.5

-- CODE language-bash -- $ python avro_producer.py -e 5 -d 0.5 Producing sensor events to topic temperature. Press Ctrl-c to exit. Sensor Event b'sensor0' successfully produced to temperature [0] at offset 0 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 1 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 2 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 3 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 4 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 5 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 6 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 7 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 0 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 1 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 2 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 3 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 4 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 5 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 6 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 7 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 8 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 9 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 10 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 11 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 8 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 9 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 10 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 11 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 12 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 13 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 14 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 15 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 12 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 13 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 14 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 15 Flushing records... Sensor Event b'sensor4' successfully produced to temperature [1] at offset 16 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 17 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 18 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 19 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 16 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 17 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 18 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 19 To continue execution start from event 5

Let’s generate some more events. Notice the last line in the execution above. It prints the time step that should be used to continue execution. To do that, we add -t 5 to the command:

-- CODE language-bash -- python avro_producer.py -e 5 -d 0.5 -t 5

-- CODE language-bash -- $ python avro_producer.py -e 5 -d 0.5 -t 5 Producing sensor events to topic temperature. Press Ctrl-c to exit. Sensor Event b'sensor0' successfully produced to temperature [0] at offset 20 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 21 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 22 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 23 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 24 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 25 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 26 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 27 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 20 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 21 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 22 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 23 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 24 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 25 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 26 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 27 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 28 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 29 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 30 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 31 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 28 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 29 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 30 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 31 Sensor Event b'sensor0' successfully produced to temperature [0] at offset 32 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 33 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 34 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 35 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 32 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 33 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 34 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 35 Flushing records... Sensor Event b'sensor0' successfully produced to temperature [0] at offset 36 Sensor Event b'sensor1' successfully produced to temperature [0] at offset 37 Sensor Event b'sensor2' successfully produced to temperature [0] at offset 38 Sensor Event b'sensor3' successfully produced to temperature [0] at offset 39 Sensor Event b'sensor4' successfully produced to temperature [1] at offset 36 Sensor Event b'sensor5' successfully produced to temperature [1] at offset 37 Sensor Event b'sensor6' successfully produced to temperature [1] at offset 38 Sensor Event b'sensor7' successfully produced to temperature [1] at offset 39 To continue execution start from event 10

Note

Remember that when we created the ‘temperature’ topic we set the number of partitions to two. In the output sample the partition number is shown in the square brackets after the topic name. For example ‘temperature [0]’. This means that the event was successfully sent to the temperature topic at partition 0.

Notice that events from the same sensor (e.g., sensor5) always end up in the same partition (partition [1] in the case of sensor5). This is how Kafka guarantees ordered processing of events per event source, and it is implemented using the key of the produced event, which in our case is the sensor id. So pay attention to what you choose as the key, depending on your application.
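
To build some intuition for why the key controls placement, here is a toy sketch of key-based partitioning. This is not librdkafka's actual partitioner (real clients use CRC32 or murmur2 hashes of the key), but the principle is the same: the partition index is derived deterministically from the key, so a fixed key always maps to the same partition.

-- CODE language-bash --
# Toy illustration of key-based partitioning (not librdkafka's exact algorithm).
import zlib

def choose_partition(key: bytes, num_partitions: int) -> int:
    # Same key -> same hash -> same partition
    return zlib.crc32(key) % num_partitions

for key in [b'sensor0', b'sensor5', b'sensor5', b'sensor7']:
    print(key, '->', choose_partition(key, 2))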

Avro Consumer

The Avro consumer code is similar to the producer code in the previous section. It starts with the “Event“ class, which is the same as the one in the producer code. The rest is similar but works in the opposite direction: a “dict_to_event“ helper function returns an Event object, and in the “main()“ function we initialize an Avro deserializer and a consumer. Finally, the code loops to poll messages and plot the values.

-- CODE language-bash --
# This is a simple example of the DeserializingConsumer using Avro.
#
from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry import record_subject_name_strategy
from datetime import datetime, timedelta
import toml
import argparse
from collections import deque
import matplotlib.pyplot as plt


class Event(object):
    """
    An object representing a sensor event

    Args:
        id (str): Sensor's id
        timestamp (datetime): timestamp in milliseconds
        value (double): Sensor's reading value
    """
    def __init__(self, id, timestamp, value):
        self.id = id
        self.timestamp = timestamp
        self.value = value


def dict_to_event(obj, ctx):
    """
    Converts object literal(dict) to an Event instance.

    Args:
        obj (dict): Object literal(dict)
        ctx (SerializationContext): Metadata pertaining to the serialization operation.
    """
    if obj is None:
        return None
    return Event(id=obj['id'],
                 timestamp=datetime.fromtimestamp(obj['timestamp']),
                 value=obj['value'])


def main():
    # Parse arguments
    parser = argparse.ArgumentParser(description='Consumes events from kafka topic hosted at a HopsWorks cluster.')
    parser.add_argument("-c", "--config", default='config.toml',
                        help='Configuration file in toml format.')
    parser.add_argument("-s", "--sensors", default=8, type=int,
                        help='The total number of sensors to visualize.')
    args = parser.parse_args()

    # Load HopsWorks Kafka configuration
    conf = toml.load(args.config)

    # Kafka schema that this program supports/expects
    # The schema will be checked against the schema of the Kafka topic
    schema_str = """
    {
        "type": "record",
        "name": "sensor",
        "fields": [
            {
                "name": "timestamp",
                "type": "long",
                "logicalType": "timestamp-millis"
            },
            {
                "name": "id",
                "type": "string"
            },
            {
                "name": "value",
                "type": "double"
            }
        ]
    }
    """

    # url for the schema registry in HopsWorks REST API services
    registry_url = 'https://' + conf['hops']['url']\
        + conf['api']['base'] + '/project/'+conf['project']['id']+'/kafka'

    # Initialise the Confluent schema registry client
    schema_registry_conf = {'url': registry_url, 'ssl.ca.location': conf['hops']['verify']}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    # Add the API key required by HopsWorks but not configurable through the confluent schema registry client
    headers = {'Authorization': 'ApiKey ' + conf['api']['key']}
    schema_registry_client._rest_client.session.headers.update(headers)

    # Initialize the avro deserializer for the value using the schema
    avro_deserializer = AvroDeserializer(schema_registry_client,
                                         schema_str,
                                         dict_to_event)

    # Initialize a simple String deserializer for the key
    string_deserializer = StringDeserializer('utf_8')

    # Initialize the consumer
    consumer_conf = {'bootstrap.servers': conf['hops']['url']+':'+conf['kafka']['port'],
                     'security.protocol': 'SSL',
                     'ssl.ca.location': conf['project']['ca_file'],
                     'ssl.certificate.location': conf['project']['certificate_file'],
                     'ssl.key.location': conf['project']['key_file'],
                     'ssl.key.password': conf['project']['key_password'],
                     'key.deserializer': string_deserializer,
                     'value.deserializer': avro_deserializer,
                     'group.id': conf['kafka']['consumer']['group_id'],
                     'auto.offset.reset': conf['kafka']['consumer']['auto_offset_reset'],
                     }

    consumer = DeserializingConsumer(consumer_conf)

    # Subscribe to a topic
    consumer.subscribe([conf['kafka']['topic']])

    # a list of buffers to store data for plotting
    MAX_BUFFER = 1000  # max events to store for plotting, then graph will scroll
    buffer = [deque(maxlen=MAX_BUFFER) for x in range(args.sensors)]

    # Plotting
    fig, ax = plt.subplots(len(buffer), sharex=True)
    lines = [a.plot([])[0] for a in ax]
    plt.show(block=False)

    def plot():
        # x is shared, so set lim once
        ax[0].set_xlim(0, max(len(b) for b in buffer)+10)
        for b, l, a in zip(buffer, lines, ax):
            if len(b) == 0:
                continue
            l.set_data(range(len(b)), b)
            a.set_ylim(min(b)-10, max(b)+10)
        fig.canvas.draw()
        fig.canvas.flush_events()

    # loop for consuming events
    time = datetime.now()  # time for replotting every delta seconds
    delta = timedelta(seconds=0.5)
    while True:
        try:
            # plot
            if datetime.now() - time > delta:
                time = datetime.now()
                plot()

            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            if msg is None:
                continue

            event = msg.value()
            if event is not None:
                print("Event record {}: id: {}\n"
                      "\ttimestamp: {}\n"
                      "\tvalue: {}\n"
                      .format(msg.key(), event.id, event.timestamp, event.value))
                # store event in buffer for plotting
                id = int(event.id[6:])
                buffer[id].append(event.value)
        except KeyboardInterrupt:
            break

    consumer.close()


if __name__ == '__main__':
    main()


Run avro_consumer.py with the command below. It will start receiving and plotting the 10 events we produced in the previous example. After that the program will wait for more events. Keep it running as we’ll be producing more events soon.

Note

The consumer received the 10 events we generated in the previous section because we set the auto.offset.reset property to 'earliest' in our configuration file. This causes a consumer group, when first created, to get all available events in the Kafka topic. The other common option is 'latest', which causes the consumer group to receive only new events, ignoring old ones. Read more about consumer groups and offset management here.

-- CODE language-bash -- $ python avro_consumer.py

Keep avro_consumer.py running and generate 20 more events with the command below.

-- CODE language-bash -- $ python avro_producer.py -e 20 -d 0.5 -t 10

The producer will start producing more events and you will see the consumer receiving and plotting them. The output should be similar to the figure below.

Kafka example with one consumer

Now try running another avro_consumer.py in a second terminal, leaving the previous one running.

-- CODE language-bash -- $ python avro_consumer.py

Then produce 20 more events:

-- CODE language-bash -- $ python avro_producer.py -e 20 -d 0.5 -t 30

Notice that now the produced events will be split between the two consumers, or to be more precise, the partitions will be split among the available consumers. Since we created two partitions, we can have a maximum of two consumers. The output should look similar to the image below.

Kafka example with two consumers

Note

Kafka remembers the events consumed by a consumer group, so if a consumer is interrupted and then restarted, it will continue from where it stopped. This is achieved by the consumer committing the offsets of the messages it has read. Offset committing can be configured to provide different delivery guarantees; the default is auto-commit, which gives you an at-least-once delivery guarantee. You can read more about this topic here.
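
If you need tighter control over when offsets are committed, you can disable auto-commit and commit explicitly after processing each message. The sketch below is a hypothetical variation of the Avro consumer above, reusing its consumer_conf and configuration file; it is not part of the example code in the repository.

-- CODE language-bash --
# Hypothetical variation: commit offsets manually after processing,
# instead of relying on auto-commit.
consumer_conf.update({'enable.auto.commit': False})
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([conf['kafka']['topic']])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    print(msg.value())  # process the event here
    # Commit the offset of this message synchronously once it has been handled
    consumer.commit(message=msg, asynchronous=False)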

Schema Registry Clients (Optional)

In some cases you might need to programmatically access the schema registry and retrieve the schema associated with a topic or by schema name. In this section we’ll show three different ways to do this. The source code for the examples is available at schema_examples.py.

To run this example you will need hops-util-py, a helper library for HopsWorks that hides some of the configuration and initialization needed to access HopsWorks services. Install it with the following command.

-- CODE language-bash -- $ pip install hops

The code starts by importing required libraries and loading the configuration file.

-- CODE language-bash --
from hops import project
from hops import kafka
from hops import util, constants
from confluent_kafka import avro
from confluent_kafka.schema_registry import SchemaRegistryClient
import requests
import toml
import argparse

### Examples on how to interact with HopsWorks Schema Registry service
### externally from outside the HopsWorks cluster to query the schema

### Hops Configuration

# Parse arguments
parser = argparse.ArgumentParser(description='Examples using different methods to access'
                                             ' HopsWorks Schema Registry externally from outside the cluster.')
parser.add_argument("-c", "--config", default='config.toml',
                    help='Configuration file in toml format.')
args = parser.parse_args()

conf = toml.load(args.config)

The first example uses the HopsWorks REST API directly to query the schema registry. You need to construct a url for your query following the API documentation. In our case that is getTopicSubject. Then use a library, such as requests to send your query and retrieve the response. Note that you need to add your API Key to the request header.

-- CODE language-bash --
### Get schema using HopsWorks REST API
### https://app.swaggerhub.com/apis-docs/logicalclocks/hopsworks-api/2.2.0#/Project%20Service/getTopicSubject

print('Example 1: Using HopsWorks REST API')
print('===================================')
print()

# Security header with the API Key
headers = {'Authorization': 'ApiKey ' + conf['api']['key']}

# list all available schemas for your project
print('list all available schemas for your project')
url = 'https://' + conf['hops']['url'] + conf['api']['base'] + '/project/' + conf['project']['id'] + '/kafka/subjects'
print('url: ' + url)
response = requests.get(url, headers=headers, verify=conf['hops']['verify'])
print('schemas: ' + response.text)
print()

# get the schema associated with a topic using the topic name
print('get the schema associated with a topic using the topic name')
url = 'https://' + conf['hops']['url'] + conf['api']['base'] + '/project/' + conf['project']['id'] \
    + '/kafka/topics/' + conf['kafka']['topic'] + '/subjects'
print('url: ' + url)
response = requests.get(url, headers=headers, verify=conf['hops']['verify'])
schema = response.json()['schema']
print('schema for topic {} using HopsWorks REST API:'.format(conf['kafka']['topic']))
print(schema)
print()

-- CODE language-bash --
Example 1: Using HopsWorks REST API
===================================

list all available schemas for your project
url: https://192.168.1.10/hopsworks-api/api/project/1143/kafka/subjects
schemas: [inferenceschema, sensor]

get the schema associated with a topic using the topic name
url: https://192.168.1.10/hopsworks-api/api/project/1143/kafka/topics/temperature/subjects
schema for topic temperature using HopsWorks REST API:
{"type":"record","name":"sensor","fields":[{"name":"timestamp","type":"long","logicalType":"timestamp-millis"},{"name":"id","type":"string"},{"name":"value","type":"double"}]}

The second example uses the handy hops-util-py library. All you need is to connect to your project using the project name, url, and API Key, and then call kafka.get_schema('topic_name') to get the schema.
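
We do not reproduce the full example here, but a minimal sketch looks roughly like the following. Note that the exact project.connect() arguments can differ between hops-util-py versions, and the conf['project']['name'] key is an assumption about your config.toml.

-- CODE language-bash --
from hops import project, kafka

# Assumption: config.toml also stores the project name; adjust to your setup.
# The exact signature of project.connect() may differ between hops-util-py versions.
project.connect(conf['project']['name'], conf['hops']['url'],
                api_key=conf['api']['key'])

schema = kafka.get_schema(conf['kafka']['topic'])
print('schema for topic {} using hops-util-py:'.format(conf['kafka']['topic']))
print(schema)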

The third example uses the Confluent Schema Registry client. You will need to construct the url for the schema registry of your project then use it to initialize the schema registry client. You will also need to add the API Key to the header.

Now you can use the client to query the schema registry. In this example we use sc.get_latest_version('schema_name') to retrieve the schema.

-- CODE language-bash --
### Get the schema using the Confluent Schema Registry client

print('Example 3: Using the Confluent Schema Registry client')
print('=====================================================')
print()

registry_url = 'https://' + conf['hops']['url']\
    + conf['api']['base'] + '/project/'+conf['project']['id']+'/kafka'
print('registry url: ' + registry_url)

sc = SchemaRegistryClient({'url': registry_url, 'ssl.ca.location': conf['hops']['verify']})

# Add the API key required by HopsWorks but not configurable through the confluent schema registry client
sc._rest_client.session.headers.update(headers)

# here we must use the schema name to look it up as confluent allows multiple schemas per topic
# see: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#subject-name-strategy
print('get the schema using schema name')
schema = sc.get_latest_version(conf['kafka']['schema'])
print('id: {}'.format(schema.schema_id))
print('subject: {}'.format(schema.subject))
print('version: {}'.format(schema.version))
print('schema with confluent schema client:')
print(schema.schema.schema_str)

-- CODE language-bash --
Example 3: Using the Confluent Schema Registry client
=====================================================

registry url: https://109.225.89.144/hopsworks-api/api/project/1143/kafka
get the schema using schema name
id: 1030
subject: sensor
version: 1
schema with confluent schema client:
{"type":"record","name":"sensor","fields":[{"name":"timestamp","type":"long","logicalType":"timestamp-millis"},{"name":"id","type":"string"},{"name":"value","type":"double"}]}

Simple Producer/Consumer (Optional)

For completeness we include the code for simple_producer.py and simple_consumer.py. By simple we mean that they do not use Avro schemas and do not validate any schema: Kafka only sees blobs of bytes, and it is up to you to keep track of what is stored in the topic and how to interpret the value.

-- CODE language-bash --
# A simple Producer example that sends plain strings to a HopsWorks Kafka topic.
# Note: the imports, argument parsing and delivery_report callback follow the
# same pattern as the other examples in this post.
from confluent_kafka import Producer
import toml
import argparse


def delivery_report(err, msg):
    """ Reports the failure or success of a message delivery. """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
        return
    print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


def main():
    # Parse arguments
    parser = argparse.ArgumentParser(description='A simple Producer example to send strings'
                                                 ' to a kafka topic hosted at a HopsWorks cluster.')
    parser.add_argument("-c", "--config", default='config.toml',
                        help='Configuration file in toml format.')
    args = parser.parse_args()

    # Load HopsWorks Kafka configuration
    conf = toml.load(args.config)

    # Initialize the Producer
    producer_conf = {'bootstrap.servers': conf['hops']['url']+':'+conf['kafka']['port'],
                     'security.protocol': 'SSL',
                     'ssl.ca.location': conf['project']['ca_file'],
                     'ssl.certificate.location': conf['project']['certificate_file'],
                     'ssl.key.location': conf['project']['key_file'],
                     'ssl.key.password': conf['project']['key_password']}

    p = Producer(producer_conf)

    for data in "Hello Kafka! I'm a simple client sending in some strings.".split():
        # Trigger any available delivery report callbacks from previous produce() calls
        p.poll(0)

        # Asynchronously produce a message, the delivery report callback
        # will be triggered from poll() above, or flush() below, when the message has
        # been successfully delivered or failed permanently.
        p.produce(conf['kafka']['topic'], data.encode('utf-8'), callback=delivery_report)

    # Wait for any outstanding messages to be delivered and delivery report
    # callbacks to be triggered.
    p.flush()


if __name__ == '__main__':
    main()

-- CODE language-bash --
from confluent_kafka import Consumer
import toml
import argparse


def main():
    # Parse arguments
    parser = argparse.ArgumentParser(description='A simple Consumer example to consume strings'
                                                 ' from a kafka topic hosted at a HopsWorks cluster.')
    parser.add_argument("-c", "--config", default='config.toml',
                        help='Configuration file in toml format.')
    args = parser.parse_args()

    # Load HopsWorks Kafka configuration
    conf = toml.load(args.config)

    # Initialize the Consumer
    consumer_conf = {'bootstrap.servers': conf['hops']['url']+':'+conf['kafka']['port'],
                     'security.protocol': 'SSL',
                     'ssl.ca.location': conf['project']['ca_file'],
                     'ssl.certificate.location': conf['project']['certificate_file'],
                     'ssl.key.location': conf['project']['key_file'],
                     'ssl.key.password': conf['project']['key_password'],
                     'group.id': conf['kafka']['consumer']['group_id'],
                     'auto.offset.reset': conf['kafka']['consumer']['auto_offset_reset'],
                     }

    consumer = Consumer(consumer_conf)

    # Subscribe to topics
    consumer.subscribe([conf['kafka']['topic']])

    while True:
        try:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                print("Consumer error: {}".format(msg.error()))
                continue

            print('Received message: {}'.format(msg.value().decode('utf-8')))
        except KeyboardInterrupt:
            break

    consumer.close()


if __name__ == '__main__':
    main()

Warning

Before running the simple_producer.py example make sure to create a new topic to avoid conflicts with the Avro examples. Also make a copy of your config.toml file, change the topic to your new topic, and use a different group_id from the one used in the Avro examples, as sketched below. When running the example, use the -c flag to point to your new configuration file.
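
As a rough sketch, these are the entries that need to differ in config2.toml (the key names follow the configuration file used throughout this post; the topic and group id values below are just examples):

-- CODE language-bash --
# config2.toml - entries that differ from config.toml (sketch)
[kafka]
topic = "strings"            # the new topic created for the simple examples

[kafka.consumer]
group_id = "simple_group"    # must differ from the group used in the Avro examples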

-- CODE language-bash -- $ python simple_producer.py -c config2.toml

-- CODE language-bash --
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [0]
Message delivered to strings [1]
Message delivered to strings [1]
Message delivered to strings [1]

-- CODE language-bash -- $ python simple_consumer.py -c config2.toml

-- CODE language-bash --
Received message: Hello
Received message: Kafka!
Received message: I'm
Received message: a
Received message: simple
Received message: client
Received message: sending
Received message: in
Received message: some
Received message: strings.

Source Code

All source code is available at Kafka HopsWorks Examples at GitHub.

Comparing RonDB on AWS, Azure and GCP using Sysbench

RonDB
>
9/3/2021
>

RonDB is based on MySQL NDB Cluster optimised for use in modern cloud settings. In RonDB 21.04.0, we have integrated benchmark scripts to execute various benchmarks towards RonDB. We focus our development, testing and optimisations towards executing RonDB in an efficient manner in AWS, Azure and GCP. In this blog, we compare RonDB on AWS, Azure and GCP using Sysbench.

Benchmark Setup

What we have discovered in our benchmarking is that even with very similar HW there are some differences in how RonDB performs on the different clouds. So this report presents the results using very similar setups in AWS, Azure and GCP.


Above we have the benchmark setup used in all the experiments. There are always 2 RonDB data nodes and they are replicas of each other. Thus all write operations are written on both data nodes to ensure that we are always available even in the presence of node failures.

The MySQL Servers are pure clients since data is located on the RonDB data nodes. Thus we can easily scale the benchmark using any number of MySQL Servers. The benchmark application runs on a single VM that sends SQL queries to the MySQL Servers and receives results using a MySQL client written in C. It is sufficient to have a single Sysbench server for these experiments.

In this experiment we will scale RonDB data nodes by using different VM types. It is also possible to scale RonDB by adding more RonDB data nodes. Both of these changes can be performed without any downtime.

It is possible to execute the Sysbench server local to the MySQL Server and let multiple Sysbench servers execute in parallel. This would however be a 2-tiered cluster and we wanted to test a 3-tiered cluster setup since we think this is the most common setup used. Obviously a 2-tiered cluster setup will have lower latency, but it will also be more complex to maintain.

There is also a RonDB management server in the setup, however this is not involved in the benchmark execution and is either located in the Sysbench server or a separate dual CPU VM. 

Availability Zones

AWS, Azure and GCP all use a concept called Availability Zones. These are located in the same city, but can be distant from each other. The latency between Availability Zones can be more than 1 millisecond for each hop. RonDB contains options to optimise for such a setup, but in this test we wanted to test a setup within a single Availability Zone.

Thus, in all setups, we ensured that all VMs participating in the cluster were in the same zone. Even within a zone the variance in latency can be substantial; we see this in that the benchmark numbers can vary between runs even within the same cloud and the same availability zone. Other reports state that network latency is around 40-60 microseconds between VMs in the same availability zone. Our experience is that this latency normally varies at least 10-20 microseconds up or down. In Azure the variance can be even higher, since some availability zones are implemented across multiple buildings. For this case Azure provides a concept called Proximity Placement Groups that can be used to ensure that VMs are located in the same building and not spread between buildings in the same availability zone.

RonDB VM Types

All cloud vendors have VMs that come from different generations of SW and HW. For a latency-sensitive application like RonDB this has serious implications. All the VMs we tested used very similar Intel x86 CPUs. There is some difference in performance between older and newer Intel x86 CPUs, but this difference is usually on the order of 30-40%, so not so drastic.

However an area where innovation has happened at a much higher pace is networking. Cloud vendors have drastically improved the networking latency, bandwidth and efficiency from generation to generation.

What we found is that it is essential to use the latest VM generation for MySQL Servers and RonDB data nodes. The difference between the latest generation and the previous generation was up to 3x in latency and performance. We found that the latest generation of VMs from all cloud vendors have similar performance, but using older versions had a high impact on the benchmark results. All the benchmarking results in this report uses the latest generation of VMs from all vendors.

For AWS this means using their 5th generation VMs. AWS has three main categories of VMs: c5, m5 and r5. c5 VMs are focused on lots of CPU and modest amounts of memory, m5 VMs have twice as much memory with the same amount of CPU, and r5 VMs have 4x more memory than c5 with the same amount of CPU. For RonDB this works perfectly fine. The RonDB data nodes store all the data and thus require as much memory as possible, so we use the r5 category for them. MySQL Servers only act as clients in a RonDB setup and require only a modest amount of memory, so we use the c5 category for them.

The latest generation in Azure is the v4 generation. Azure VMs have two categories, the D and E VMs. The E category has twice as much memory as the D category. The E category is similar to AWS r5 and the D category is similar to the AWS m5 category.

The latest generation in GCP is the n2 generation. They have n2-highcpu that matches AWS c5, n2-standard that matches AWS m5 and n2-highmem that matches AWS r5. GCP also has the ability to extend memory beyond 8 GB per CPU which is obviously interesting for RonDB.

Benchmark Notes on Cloud Vendors

Since we developed the RonDB managed version on AWS we have a bit more experience from benchmarking here. We quickly discovered that the standard Sysbench OLTP RW benchmark actually is not only a CPU benchmark. It is very much a networking benchmark as well. In some benchmarks using 32 VCPUs on the data nodes, we had to send around 20 Gb/sec from the data node VMs. Not all VM types could handle this. In AWS this meant that we had to use a category called r5n. This category uses servers that have 100G Ethernet instead of 25G Ethernet and thus a 32 VCPU VM was provided with bandwidth up to 25G. We didn’t investigate this thoroughly on Azure and GCP.

One quirk we noted was that the supply of Azure v4 VM instances was somewhat limited. In some regions it was difficult to allocate a set of large v4 VM instances. In GCP we had issues with our quotas and got a denial when asking to increase the quota size for n2 VMs, which was a bit surprising. This meant that we did not execute as many configurations on Azure and GCP, so some comparisons are between Azure and AWS only.

Using the latest VM generation, AWS, Azure and GCP all had reasonable performance. There were differences of course, but within 10-30% except in one benchmark. Our conclusion is that AWS, Azure and GCP have used different strategies for handling networking interrupts. AWS reports the lowest networking latency in our tests, which is also seen in other benchmark reports. GCP, however, shows higher throughput but worse latency, both in our benchmarks and in other similar reports. Azure falls in between.

Our conclusion is that it is likely caused by how network interrupts are handled. If the network interrupts are acted upon immediately one gets the best possible latency. But at high loads the performance goes down since interrupt handling costs lots of CPU. If network interrupts are instead handled using polling the latency is worse, but at high loads the cost of interrupts stays low even at extreme loads.

Thus the best latency is achieved by handling interrupts immediately, whereas with polling one gets better performance at high load, at the cost of a longer delay before the network interrupt is handled. Obviously the true answer is a lot more complex than this, but suffice it to say that the cloud vendors have selected different optimisation strategies that work well in different situations.

Benchmark Notes on RonDB

One more thing that affects latency of RonDB to a great extent is the wakeup latency of threads. Based on benchmarks I did while at Oracle I concluded that wakeup latency is about 2x higher on VMs compared to on bare metal. On VMs it can be as high as 25 microseconds, but is likely nowadays to be more like on the order of 10-15 microseconds.

RonDB implements adaptive CPU spinning. This ensures that latency is decreasing when the load increases. This means that we get a latency curve that starts a bit higher, then goes down until the queueing for CPU resources starts to impact latency and after that it follows a normal latency where latency increases as load increases.

Latency variations are very small up to about 50% of the maximum load on RonDB.

In our benchmarks we have measured the latency that 95% of the transactions were below. Thus we didn’t focus so much on single outliers. RonDB is implementing soft real-time, thus it isn’t intended for hard real-time applications where life depends on individual transactions completing in time.

The benchmarks do however report a maximum latency. Most of the time these maximum latencies were as expected. One outlier we saw was on GCP, where transaction latency was a bit above 200 ms when executing benchmarks with up to 8 threads. These outliers disappeared when going towards higher thread counts. The latency was always in the range 200-210 milliseconds, so it seems that GCP VMs have some sort of deep sleep of around 200 ms. In some experiments on Azure we saw even higher maximum latency with similar behaviour as on GCP. So it is likely that most cloud vendors (probably all) can go into deep sleeps that highly affect latency when operations start up again.

Benchmark Configuration

Ok, now on to numbers. We will show results from 4 different setups. All setups use 2 data nodes. The first setup uses 2 MySQL Servers and both RonDB data nodes and MySQL Servers use VMs with 16 VCPUs. This setup mainly tests latency and performance of MySQL Servers in an environment where data nodes are not overloaded. This test compares AWS, Azure and GCP.

The second setup increases the number of MySQL Servers to 4 in the same setup. This makes the data node the bottleneck in the benchmark. This benchmark also compares AWS, Azure and GCP.

The third setup uses 16 VCPUs on the data nodes and 2 MySQL Servers using 32 VCPUs. This test shows performance in a balanced setup where both data nodes and MySQL Servers are close to their limit. This test compares AWS and Azure.

The final setup compares a setup with 32 VCPUs on data nodes and 3 MySQL Servers using 32 VCPUs. This setup mainly focuses on the latency and throughput behaviour of MySQL Servers in an environment where the data nodes are not the bottleneck. The test compares AWS with Azure.

We used 3 different benchmarks. The first is standard Sysbench OLTP RW; this benchmark is a test of both CPU performance and networking performance. The next benchmark is the same OLTP RW using a filter, where the scans return only 1 of the 100 scanned rows instead of all of them. This makes the benchmark more CPU focused.

The final benchmark is a key lookup benchmark that only sends SQL queries using IN statements. This means that each SQL query performs 100 key lookups. This benchmark shows the performance of simple key lookups using RonDB through SQL queries.
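
To make the shape of this workload concrete, the sketch below builds the kind of batched lookup the benchmark issues: one SQL statement that fetches 100 rows by primary key in a single round trip. The table and column names are the usual Sysbench ones and are assumptions here; the real benchmark is driven by Sysbench itself.

-- CODE language-bash --
# Sketch of a batched key lookup; 'sbtest1' and 'id' are the usual Sysbench
# table and column names and are assumptions here.
keys = ", ".join(str(k) for k in range(1, 101))
query = "SELECT * FROM sbtest1 WHERE id IN ({})".format(keys)
print(query[:60] + " ...")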

Conclusions

The results show clearly that AWS has the best latency numbers at low to modest loads. At high loads GCP gives the best results. Azure has similar latency to GCP, but doesn’t provide the same benefits at higher loads. These results are in line with similar benchmark reports comparing AWS, Azure and GCP.

The variations from one benchmark run to another run can be significant when it comes to latency. This is natural since there is a random latency added dependent on how far apart the VMs are within the availability zone. However throughput is usually not affected in the same manner.

In some regions Azure uses several buildings to implement one availability zone, which affects latency and throughput negatively. In those regions it is important to use Proximity Placement Groups in Azure to ensure that all VMs are located in the same building. The effect of this is seen in the last benchmark results in this report.

The limitations on VM networking are a bit different. This played out as a major factor in the key lookup benchmark where one could see that AWS performance was limited due to network bandwidth limitation. Azure VMs had access to a higher networking bandwidth for similar VM types.

AWS provides the r5n VM types, which give 4x more networking bandwidth with the same CPU and memory setup. This proved very useful for benchmarking with RonDB data nodes using 32 VCPUs.

Benchmark Results

2 Data Nodes@16 VCPUs, 2 MySQL Server@16 VCPUs

Standard OLTP RW

In this benchmark we clearly see the distinguishing features of AWS vs GCP. AWS has superior latency at low load: 6.5 milliseconds compared to 9.66 milliseconds. However, GCP reaches higher performance; at 128 threads it reaches 7% higher performance at 7% lower latency. So GCP focuses on performance at high load whereas AWS focuses more on performance at lower loads. Both approaches have obvious benefits; which is best is subjective and depends on the application.

This benchmark is mainly testing the throughput of MySQL Servers. The RonDB data nodes are only loaded to about 60-70% of their potential throughput with 2 MySQL Servers.

Moving to the latency numbers, one can see the same story, but even more clearly. AWS has better latency up to 48 threads, after which the latency of GCP becomes better. In GCP we see that the latency at 1 thread is higher than the latency at 12 threads, and only at 24 threads does the latency start to increase beyond the latency at 1 thread. Thus in GCP the latency is very stable over different loads until the load goes beyond 50% of the possible throughput. We see the same behaviour on Azure, whereas AWS latency slowly starts to increase at lower thread counts.

Standard OLTP RW using filter

The OLTP RW using a filter is more focused on CPU performance. The major difference is seen at higher loads. The latency at low loads is very similar, but at higher loads we get higher throughput at lower latency; thus standard OLTP RW has a steeper march from acceptable latency to high latency. The difference in throughput between cloud vendors is very small, within 10%.

The comparison between AWS and GCP is similar though. The GCP benefit at higher load is slightly larger, and the same applies to latency, while the AWS advantage at lower loads is slightly smaller. Thus GCP has a slight advantage compared to standard OLTP RW, but it is a very small one.

Key Lookups

In the graph below we see the number of key lookups that 2 MySQL Servers can drive. The numbers are very similar for the different cloud vendors. AWS, as usual, has an advantage at lower thread counts, GCP reaches the higher numbers at higher thread counts, and Azure is usually in the middle.

The latency numbers are shown below. These numbers show more clearly the advantage of AWS at lower thread counts. At higher thread counts the latency is mostly the same for all cloud vendors. This benchmark is extremely regular in its use case, so it is mostly the CPU performance that matters, and since this is more or less the same on all cloud vendors we see no major difference.

2 Data Nodes@16 VCPUs, 4 MySQL Server@16 VCPUs

In this benchmark the bottleneck moves to the RonDB data nodes. We now have sufficient amounts of MySQL Servers to make the RonDB data nodes a bottleneck. This means a bottleneck that can be both a CPU bottleneck as well as a networking bottleneck.

Standard OLTP RW


The latency is very stable until we reach 64 threads, where we have around 15k TPS at 20 milliseconds latency. At higher thread counts the data nodes become the bottleneck, and in this case the latency has a much higher variation. We can even see that latency at 128 threads in Azure goes down while throughput goes up. We expect that this is due to interrupt handling being executed on the same CPUs where database processing happens. This is something that we will look into further.


OLTP RW using filter

With the filter, OLTP RW focuses more on CPU performance. This makes it clear that the high variation in throughput and latency in standard OLTP RW comes from handling the gigabytes per second of data sent to the MySQL Servers. In this benchmark the throughput increases in a stable manner and, similarly, the latency goes up in an expected manner.

All cloud vendors are very close to each other except at low thread counts where AWS have an advantage.

Key Lookups

The key lookup benchmark with 4 MySQL Servers, 2 data nodes and 16 VCPUs per node moves the bottleneck to the data nodes. As usual AWS wins on latency at lower thread counts. But at higher thread counts AWS hits a firm wall, most likely a bandwidth limitation on the VMs. This limitation is higher on Azure, so these VMs can go the extra mile and serve 1 million more key lookups per second.

2 Data Nodes@16 VCPUs, 2 MySQL Server@32 VCPUs

This benchmark uses the same amount of CPUs on the MySQL Server side, but instead of being divided across 4 MySQL Servers it uses 2 MySQL Servers. We didn't test GCP in this configuration, and we expect no surprises in throughput and latency if we do.

Standard OLTP RW

In the Standard OLTP RW we see that the throughput is the same as with 4 MySQL Servers. However, the throughput increases in a more regular manner. What we mainly see is that we can achieve a higher throughput using a smaller number of threads in total. This makes the throughput more stable. Thus we conclude that, at least up to 32 VCPUs, it pays off to use larger MySQL Servers if required.

2 Data Nodes@32 VCPUs, 3 MySQL Server@32 VCPUs

In this benchmark we increased the number of CPUs on the RonDB data nodes to 32 VCPUs. Most of the testing in this setup has been performed on AWS. The main reason for including the Azure numbers is because these numbers show the impact of not using Proximity Placement Groups in Azure on large regions. We saw clearly in these benchmarks that the latency in the Azure setup was much higher than in previous benchmarks that were using a smaller region.

However, in the smaller region it was difficult to allocate these larger VMs in any larger numbers. We constantly got failures due to insufficient resources to fulfil our requests.

Standard OLTP RW

In AWS we discovered that the network was a bottleneck when executing this benchmark, so we used r5n.8xlarge instead of r5.8xlarge VMs. These VMs reside in machines with 100G Ethernet connections, and each 32 VCPU VM has access to at least 25 Gb/sec networking. The setup tested here with 3 MySQL Servers doesn't load the RonDB data nodes fully. In other benchmarks we were able to increase throughput to around 35k TPS; however, those benchmarks used a different setup, so the numbers are not relevant for a comparison. What we see is that the throughput in this case is roughly twice the throughput when using 16 VCPUs in the data nodes.

Latency numbers look very good, and it is clear that we haven't really reached the bottleneck in either the MySQL Servers or the RonDB data nodes.

OLTP RW using filter

Similarly, in this experiment we haven't really reached the bottleneck on either the RonDB data nodes or the MySQL Servers. So no real news from this benchmark.

Get Started with RonDB

There are three ways of using RonDB.

·     The first is using the managed version provided by Logical Clocks. This is currently available in AWS and is currently being developed to also support Azure. This is still in limited access mode.

·     The second way is to use a script provided by Logical Clocks that automates the creation of VMs and the installation of the software components required by RonDB. These scripts are available to create RonDB clusters on Azure and GCP (Google Cloud).

·     The third manner to use RonDB is to simply download the RonDB binary tarball and install it on any computers of your own liking.

Start by visiting http://rondb.com. There you will find the download scripts, the tarball to download, and information on how to send an email requesting access to the managed version of RonDB.

MLOps Wars: Versioned Feature Data with a Lakehouse

Feature store
>
8/3/2021
>
Davit Bzhalava
Jim Dowling

The MLOps Wars series of articles chronicles the struggles that you, as a Data Scientist/Engineer, must surmount to bring your machine learning models to production. This first article covers the challenge of working with versioned data.

TLDR: The Feature Store has become the defacto data layer for MLOps. But, the Feature Store should not just provide low cost storage of feature data (data-lake capabilities), it should also provide data warehousing capabilities: SQL support, transactions, time-travel, decoupled object storage, change-data-capture support, and role-based access control. By adding Apache Hudi support to Hopsworks, we have added Lakehouse capabilities to the Hopsworks Feature Store, enabling more complete support for automated feature pipelines and training pipelines, i.e., MLOps.

Introduction

Machine Learning Operations (MLOps) is a process that applies DevOps principles to automate the building, testing, and deployment of machine learning (ML) models through its entire life cycle. However, in contrast to DevOps, MLOps has to deal with versioned data, not just versioned code. When new data becomes available, we may want to trigger automated feature pipelines to compute the new feature data, keeping features as fresh as possible. The availability of new feature data may, in turn, trigger  training pipelines to build and deploy new models that include the new feature data.

Given that feature data for training models is stored in the offline feature store, how can a feature store know about the availability of new feature data for training models? 

Firstly, we will look at how feature data is made available in Hopsworks as Feature Groups (tables containing sets of features). In Hopsworks, we support two sources for this offline feature data: 

  1. external tables from databases like Snowflake, Delta Lake, and JDBC sources, which can be mounted as on-demand Feature Groups, and
  2. cached Feature Groups are stored as Apache Hudi tables (parquet files with additional metadata, stored in S3 or Azure Blob Storage using HopsFS).

Offline features include the historical values of features and are used to (1) create training datasets and (2) score batch applications with analytical models. When you join features together to create training datasets, you can transparently mix features from on-demand Feature Groups and cached Feature Groups. For completeness, online features (the latest feature data values that are served to online models) are stored in RonDB - the world’s fastest key-value store with SQL capabilities

How do you decide between storing offline features as external tables or as cached features? Well, you may already have a feature engineering pipeline in Snowflake that outputs engineered features to a table in Snowflake. You can just mount that table containing the engineered features into Hopsworks as an on-demand Feature Group - only metadata is stored in Hopsworks. Sometimes, however, you will engineer features in either Spark or Python. In that case, you can write your features to the Hopsworks Feature Store directly using the hsfs API, and the offline features are stored in Hopsworks as cached Feature Groups (Hudi tables). Apache Hudi enables reproducible, reliable machine learning pipelines - including support for versioned data, ACID updates/deletes, incremental ingestion, and time-travel. In particular, for MLOps, Hudi enables us to identify when feature data in a cached Feature Group has changed - which you can use to trigger a training pipeline(s).
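
As a rough sketch, mounting an already engineered table as an on-demand Feature Group looks something like the code below. The connector and table names are made up, and the exact hsfs arguments may vary between versions.

-- CODE language-bash --
import hsfs

connection = hsfs.connection()       # connection options depend on where you run this
fs = connection.get_feature_store()

# A storage connector to Snowflake defined in the Hopsworks UI (name is made up)
snowflake = fs.get_storage_connector("snowflake_shop")

# Mount the engineered table; only metadata is stored in Hopsworks
sales_fg = fs.create_on_demand_feature_group(
    name="sales_features",
    version=1,
    query="SELECT * FROM engineered_sales_features",
    storage_connector=snowflake,
    description="Sales features engineered in Snowflake"
)
sales_fg.save()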

Extending the Feature Store for MLOps

For cached Feature Groups, feature pipelines keep features up-to-date by either executing on a schedule or in response to an event such as the arrival of new data. In Hopsworks, you can plug in your own pipeline orchestration tool of choice, or use Airflow that comes packaged with Hopsworks. 

Modern feature pipelines written in Python, Spark, or Flink store their output features in low-cost object storage that supports efficient querying, incremental writes/updates/deletes of data, and ACID updates. The main storage frameworks/formats that support these properties are known as lakehouses: Apache Hudi, Delta Lake, and Apache Iceberg. Delta Lake, supported by Databricks, is perhaps the best known, but the open-source version is not yet feature-complete compared to Databricks' version. As of July 2021, OSS Delta Lake lacks the following proprietary features: auto compaction, key indexing using Bloom Filters, and Z-Ordering for better file pruning at read time. Therefore, we chose our first file format to be Apache Hudi. You can track our work on adding OSS Delta support here.

In the above diagram, we can see how derived Feature Groups are computed from a base Feature Group stored in Hudi. When new data arrives in the Hudi table, a feature pipeline can pull the incremental changes and update the features for all derived Feature Groups.

Lakehouse formats add extra capabilities to Feature Groups that are key for MLOps: version and temporal information about updates to tables in a commit log (containing the row-level changes to tables). In the above diagram, we can see how this enables feature pipelines that compute changes to derived Feature Groups, when their upstream Feature Groups have changed.

Another useful capability is support for time travel queries, that is, retrieving feature data from a Feature Group as of a given point in time, or retrieving all changes in feature data for a given time interval.
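
With hsfs this looks roughly like the sketch below, where fg is any cached (Hudi-backed) Feature Group; the timestamps are made up and method names may vary slightly between hsfs versions.

-- CODE language-bash --
# Point-in-time read: the Feature Group as it looked at the given wallclock time
df_as_of = fg.read(wallclock_time="20210623000500")

# Incremental pull: only the rows that changed in the given time interval,
# e.g. to recompute features in derived Feature Groups
df_changes = fg.read_changes("20210622233000", "20210623001600")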

The Hopsworks Feature Store adds temporal and data-versioning capabilities to Feature Groups using Apache Hudi. Hudi supports Spark, Flink, Presto, and Hive as "compute engines"; as of Hopsworks 2.2, we only support Spark, but support for Flink is coming soon.

Data Versioning with Time-Travel Queries and Incremental Pulling

To illustrate the importance of data versioning and time travel capabilities, let's take a hypothetical example from an e-commerce store. We are tasked with creating Feature Groups for product views and the product catalog that will be used to build recommendation engines and business reports.

The code snippet below shows how to create a Feature Group metadata object and then register it to the Hopsworks Feature Store, along with the corresponding dataframe. We set the time travel format to "HUDI" and provide the column names that will be used as primary keys; Apache Hudi requires primary keys to uniquely identify each record.

The following code snippet can be run in either Python (with a Pandas Dataframe) or in PySpark (with a Spark Dataframe):

-- CODE language-bash --
# create product views fg
product_views_fg = fs.create_feature_group(
    name="product_views",
    description="E-commerce product views Feature Group",
    version=1,
    primary_key=["user_id", "event_timestamp"],
    partition_key=["hour"],
    hudi_precombine_key="user_session",
    time_travel_format="HUDI",
    statistics_config={"enabled": True, "histograms": True, "correlations": True})
product_views_fg.save(product_views_df)

# create products fg
products_fg = fs.create_feature_group(
    name="products_fg",
    description="E-commerce products Feature Group",
    version=1,
    primary_key=["product_id"],
    partition_key=["category"],
    time_travel_format="HUDI",
    statistics_config={"enabled": True, "histograms": True, "correlations": True})
products_fg.save(products_df)

These two Feature Groups are updated at different cadences: the product catalog is updated monthly, while product views receive e-commerce event streams in micro-batches every five minutes. When new data arrives, we can add it to the existing Feature Groups using the insert method:

-- CODE language-bash -- product_views_fg.insert(product_views_insert_df)

Streaming data involves handling duplicate events, data insertions and updates, as well as reinstating late-arriving data. In our hypothetical example, the first batch of product views contains data from 2021-06-22 23:35:34 to 2021-06-22 23:55:00.

The second batch contains data from 2021-06-23 00:05:08 to 2021-06-23 00:15:56.

As you can see, the second micro-batch contains a late-arriving event from 2021-06-22 23:59:00. Because we partitioned our Feature Groups by the hour of the event time, Apache Hudi will simply update the 2021-06-22-23 partition; without time travel enabled Feature Groups, we might have been forced to overwrite the 2021-06-22-23 partition or even the entire dataset. The same would apply if a micro-batch contained a duplicated event.

In time travel enabled Feature Groups, every write operation is automatically versioned. During an insert call, if the combination of primary key and partition key values matches an existing record, the remaining feature values of that record are updated ("UPDATE"); otherwise a new record is inserted ("INSERT"). This combination of "UPDATE" and "INSERT" is called an "UPSERT".
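
As a small, hedged illustration (the rows below are hypothetical), inserting a batch that re-uses an existing primary/partition key combination updates that row, while rows with new keys are inserted, all within one Hudi commit:

-- CODE language-bash --
# Sketch: an UPSERT into the time travel enabled product views Feature Group.
# Assume the first (user_id, event_timestamp) pair already exists -> UPDATE;
# the second pair is new -> INSERT.
import pandas as pd

upsert_df = pd.DataFrame({
    "user_id": [524325294, 99112233],
    "event_timestamp": ["2021-06-23 00:14:10", "2021-06-23 00:15:56"],
    "hour": ["2021-06-23-00", "2021-06-23-00"],
    "user_session": ["s-17", "s-42"],
    "event_type": ["view", "view"],
    "product_id": [1005115, 1004856],
})

product_views_fg.insert(upsert_df)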

When you define a Feature Group using hsfs, you can enable the computation of statistics for that Feature Group. In this case, for every commit, hsfs computes statistics over the data being ingested as well as over all the data in the Feature Group, and stores them together with other commit metadata, such as the number of rows inserted, updated, and/or deleted, as an activity timeline.

In Hopsworks, the Feature Store activity timeline metadata is accessible both from the UI and from hsfs.

From hsfs, users can call the commit_details() method on a Feature Group object to access its commit details:

-- CODE language-bash --
for item in product_views_fg.commit_details().items():
    print(item)

(1624400675000, {'committedOn': '20210622222435', 'rowsUpdated': 0, 'rowsInserted': 6, 'rowsDeleted': 0})
(1624400617000, {'committedOn': '20210622222337', 'rowsUpdated': 0, 'rowsInserted': 4, 'rowsDeleted': 0})
(1624400552000, {'committedOn': '20210622222232', 'rowsUpdated': 0, 'rowsInserted': 4, 'rowsDeleted': 0})
(1624400495000, {'committedOn': '20210622222135', 'rowsUpdated': 0, 'rowsInserted': 5, 'rowsDeleted': 0})

Another type of update operation is to delete one or more records from a Feature Group. This is achieved by calling the delete_commit() method on a Hudi time travel enabled Feature Group, passing a dataframe containing the primary keys of the records to delete. The following code snippet deletes the records of the user with id 524325294 from the product views Feature Group:

-- CODE language-bash --
data_to_delete = product_views_fg.read().where("user_id = 524325294")
product_views_fg.delete_commit(data_to_delete)

Polling for Data-Change Triggers in MLOps

Time-based triggers are always available in orchestration platforms, such as Apache Airflow, where a feature engineering or training pipeline can be scheduled to run at a user-specified interval, e.g., hourly or daily. In contrast, data-change triggers can be used to run pipelines when, for example, the data in a Hudi table changes.

Polling Triggers for Pipelines

A client can easily track changes in Feature Groups by storing the most recently processed commit-id for a Feature Group, and then polling for changes.

If the client itself does not maintain state, this can easily be offloaded to Hopsworks using custom tags. For example, if an Airflow DAG polls to check whether a new training dataset needs to be created, it can get the latest training dataset version and retrieve the commit-ids for the constituent Feature Groups from the training dataset metadata. It can then compare these commit-ids with the current commit-ids of the Feature Groups, and if a commit-id has changed since the training dataset was built, the logic in your DAG can decide whether or not to recompute a new training dataset. (As training datasets are immutable, they do not have commit-ids; instead, they have different versions.) Similarly, a training pipeline could poll and wait for the appearance of a new training dataset before model training is run.
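
As a minimal sketch of this polling check, using only the commit_details() API shown above (where the last processed commit-id is persisted, e.g., a tag, a file, or an Airflow XCom, is left open, and the value shown is hypothetical):

-- CODE language-bash --
# Sketch: detect whether a cached Feature Group has received new commits since we
# last processed it. The stored last_processed_commit value is hypothetical.
def feature_group_has_new_data(fg, last_processed_commit):
    latest_commit = max(fg.commit_details().keys())
    return latest_commit > last_processed_commit

if feature_group_has_new_data(product_views_fg, last_processed_commit=1624400617000):
    # recompute the training dataset and/or trigger the training pipeline
    pass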

The following code snippet shows an Airflow sensor class for Feature Group commits. The commit-id from the previous training DAG run is saved, and if polling identifies a higher commit-id in the current run, the sensor triggers a pipeline that trains a new model on a new version of the training dataset.

-- CODE language-bash --
class FeatureGroupCommitSensor(HopsworksBaseSensor):

    @apply_defaults
    def __init__(...):
        ...
        super(FeatureGroupCommitSensor, self).__init__(...)
        ...

    def poke(self, context):
        # get the latest available commit for the product views feature group
        current_max_commit_time = max([key for key in product_views_fg.commit_details()])
        task_instance = context['task_instance']
        # it will start the training pipeline if there were no previous DAG runs
        _previous_commit_time = 0
        # identify if the previous run succeeded and retrieve the fg commit id from that run
        if task_instance.previous_ti is not None and task_instance.previous_ti.state == 'success':
            _previous_commit_time = \
                task_instance.previous_ti.xcom_pull(self.task_id, key=self.xcom_task_id_key)
        # if the current commit id is greater than the commit id from the previous run then
        # create a new training dataset and trigger the training pipeline
        if current_max_commit_time > _previous_commit_time:
            task_instance.xcom_push(self.xcom_task_id_key, current_max_commit_time)
            return True
        else:
            return False

Exploratory Data Analysis on time travel enabled Feature Groups

Data scientists may want to look into what happened from point in time A to point in time B and perform exploratory data analysis using time travel queries for debugging and analysing model performance. Business analysts may want to create time series reports.

By default, if we don't specify a point in time, the Feature Store returns the latest snapshot of the Feature Group. This is called a Snapshot Query:

-- CODE language-bash -- product_views_fg.read()

Incremental queries provide a way to retrieve data from a given commit or point in time and perform incremental feature engineering. In the Hopsworks Feature Store, incremental reads are performed by calling the read_changes method on the Feature Group metadata object:

-- CODE language-bash --
product_views_fg.read_changes(commit_timestamps[0], commit_timestamps[2]) \
    .select("event_type") \
    .groupBy("event_type") \
    .count() \
    .show()

A Feature Store is about sharing and reusing features, and JOINs enable feature reuse. The Hopsworks Feature Store enables users to join Feature Groups as they were at specific points in time by calling the as_of() method on the Query object. In the following example, we join the products and product views Feature Groups as of 2021-06-23 00:15:00.

-- CODE language-bash -- product_views_fg.select_all().join(products_fg.select_all()).as_of('20210623001500')

It is also possible to join Feature Groups that correspond to different points in time. For example, we may want to join the products Feature Group as of 2021-06-23 00:15:00 with the product views Feature Group as of 2021-06-23 00:00:00. In this case, one can call as_of separately on each Feature Group and then join them.

-- CODE language-bash --
# select features from different points in time
prod_fg_q = products_fg.select_all().as_of(20210623001500)
prod_views_fg_q = product_views_fg.select_all().as_of(20210623000000)

# join Feature Groups from different points in time
prod_fg_q.join(prod_views_fg_q)

Summary 

In this article, we described how cached Feature Groups in the Hopsworks Feature Store support time travel (data versioning) and atomic, incremental upserts using Apache Hudi. These are key enabling properties for a Feature Store as part of an end-to-end MLOps framework: they enable incremental feature engineering, reproducible creation of training datasets, and change-data-capture support for deciding when to recompute derived Feature Groups and training datasets, and when to run batch analytics pipelines.

Get started

Get started today on hopsworks.ai by using Hopsworks Feature Store and performing time travel queries with this notebook.

Announcing RonDB 21.10.1 beta release & 21.04.1 maintenance release

>
7/29/2021
Mikael Ronström
>
Mikael Ronström

Today we are pleased to announce two new RonDB releases.
The source code of RonDB is available at https://github.com/logicalclocks/rondb.
General RonDB information is available on the RonDB website: http://rondb.com.

Documentation of RonDB is found at http://docs.rondb.com.

RonDB 21.04.1 is a maintenance release of RonDB 21.04 that contains 3 new features and 18 bug fixes. RonDB 21.04 is a long-term support version that will be supported until 2024. RonDB 21.10.1 is a beta release of RonDB 21.10 that is based on RonDB 21.04.1 and contains an additional 4 features and 2 bug fixes. RonDB 21.10.1 improves throughput in the DBT2 benchmark by 70% compared to RonDB 21.04.1 and improves Sysbench benchmark results by about 10%.

Release Notes for 21.04.1 are found here.
https://docs.rondb.com/release_notes_21041

Release Notes for 21.10.1 are found here.
https://docs.rondb.com/release_notes_21101

You can download a binary tarball of RonDB 21.04.1 here.
http://repo.hops.works/master/rondb-21.04.1-linux-glibc2.17-x86_64.tar.gz

You can download a binary tarball of RonDB 21.10.1 here.
http://repo.hops.works/master/rondb-21.10.1-linux-glibc2.17-x86_64.tar.gz

AI Software Architecture for Copernicus Data with Hopsworks

Hopsworks
>
7/12/2021
Theofilos Kakantousis
>
Theofilos Kakantousis

TLDR: Hopsworks, the data-intensive AI platform with a feature store, brings support for scale-out AI with Copernicus data and the H2020 ExtremeEarth project. Hopsworks is integrated with the Polar and Food Security Thematic Exploitation Platforms (TEPs) on the CREODIAS infrastructure. Two use cases, polar and food security, have been developed by making use of the scale-out distributed deep learning support of Hopsworks and the PBs of data made available by CREODIAS and processed by Hopsworks and the TEPs.

This article is based on the paper “The ExtremeEarth software Architecture for Copernicus Earth Observation Data” included in the Proceedings of the 2021 conference on Big Data from Space (BiDS 2021) [1].

Introduction

In recent years, unprecedented volumes of data are being generated in various domains. Copernicus, a European Union flagship programme, produces more than three petabytes (PB) of Earth Observation (EO) data annually from Sentinel satellites [2]. However, current AI architectures making use of deep learning in remote sensing are struggling to scale in order to fully utilize the abundance of data.

ExtremeEarth is an EU-funded project that aims to develop use cases that demonstrate how researchers can apply deep learning to Copernicus data in the various European Space Agency (ESA) TEPs. A main differentiator of ExtremeEarth from other such projects is the use of Hopsworks, a data-intensive AI software platform with a Feature Store and tooling for horizontal scale-out learning. Hopsworks has been successfully extended as part of the ExtremeEarth project to bring specialized AI tools to the EO data community, with two use cases already developed on the platform and more to come in the near future.

Bringing together a number of cutting-edge technologies, which range from storing extremely large volumes of data all the way to running scalable machine learning and deep learning algorithms in a distributed manner, and having them operate over the same infrastructure poses some unprecedented challenges. These challenges include, in particular: the integration of ESA TEPs and Data and Information Access Service (DIAS) platforms with a data platform (Hopsworks) that enables scalable data processing, machine learning, and deep learning on Copernicus data; and the development of very large training datasets for deep learning architectures targeting the classification of Sentinel images.

In this blog post, we describe both the software architecture of the ExtremeEarth project with Hopsworks as its AI platform centerpiece and the integration of Hopsworks with the other services and platforms of ExtremeEarth that make up for a complete AI with EO data experience. 

AI with Copernicus data - Software Architecture

There are several components that comprise the overall architecture, the main ones being the following.

Hopsworks. An open-source data-intensive AI platform with a feature store. Hopsworks can scale to the petabytes of data required by the ExtremeEarth project and provides tooling to build horizontally scalable end-to-end machine learning and deep learning pipelines. Data engineers and data scientists utilize Hopsworks’ client SDKs that facilitate AI data management, machine learning experiments, and productionizing serving of machine learning models. 

Thematic Exploitation Platforms (TEPs). These are collaborative, virtual work environments providing access to EO data and the tools, processors, and Information and Communication Technology (ICT) resources required to work with various themes, through one coherent interface. TEPs address the coastal, forestry, hydrology, geohazards, polar, urban, and food security themes. ExtremeEarth in particular is concerned with the Polar and Food Security TEPs, from which its use cases also stem. These use cases include building machine learning models for sea ice classification to improve maritime traffic, as well as for food crop and irrigation classification.

Data and Information Access Service (DIAS). To facilitate and standardize access to data, the European Commission has funded the deployment of five cloud-based platforms. They provide centralized access to Copernicus data and information, as well as to processing tools. These platforms are known as the DIAS, or Data and Information Access Services [2]. The ExtremeEarth software architecture is built on CREODIAS, a cloud infrastructure platform adapted to the processing of large amounts of EO data, including an EO data storage cluster and a dedicated Infrastructure-as-a-Service (IaaS) cloud infrastructure for the platform's users. The EO data repository contains Sentinel-1, 2, 3, and 5-P, Landsat-5, 7, 8, Envisat, and many Copernicus Services data.

Figure 1 provides a high-level overview of the integration of the different components with each other. The components can be classified into four main categories (layers):

  • Product layer. The product layer provides a collaborative virtual work environment, through TEPs, that operates in the cloud and enables access to the products, tools, and relevant EO, and non-EO data. 
  • Processing layer. This layer provides the Hopsworks data-intensive Artificial Intelligence (AI) platform. Hopsworks is installed within the CREODIAS OpenStack cloud infrastructure and operates alongside the TEPs. Also, Hopsworks has direct access to the data layer and the different data products provided by the TEPs.
  • Data layer. The data layer offers a cloud-based platform that facilitates and standardizes access to EO data through a Data and Information Access Service (DIAS). It also provides centralized access to Copernicus data and information, as well as to processing tools. TEPs are installed and run on a DIAS infrastructure, which in the case of ExtremeEarth is the CREODIAS.
  • Physical layer. It contains the cloud environment’s compute, storage, networking resources, and hardware infrastructures.

To provide a coherent environment for AI with EO data to application users and data scientists, the goal of the architecture presented here is to make most components transparent and to simplify developer access through well-defined APIs and commonly used interfaces such as RESTful APIs. As a result, a key part of the overall architecture is how these different components are integrated to provide a coherent whole. The APIs used for the full integration of the ExtremeEarth components via the inter-layer interfaces of the software platform are described below and illustrated in Figure 1.

  1. Raw EO data. DIASes and CREODIAS in particular, provide Copernicus data access which includes the downstream of Copernicus data as it is generated by satellites. At an infrastructure level, this data is persisted at an object store with an S3 object interface, managed by CREODIAS.
  2. Pre-processed data. TEPs provide the developers of the deep learning pipelines with pre-processed EO data which forms the basis for creating training and testing datasets. Developers of the deep learning pipeline can also define and execute their own pre-processing, if the pre-processed data is not already available.
  3. Object storage. CREODIAS provides object storage used for storing data produced and consumed by the TEPs services and applications. In ExtremeEarth, this object store is used primarily for storing training data required by the Polar and Food Security use cases. This training data is provided as input to the deep learning pipelines.
  4. EO Data Hub Catalog. This service is provided and managed by CREODIAS. It provides various protocols including OGC WFS and a REST API as interfaces to the EO data.
  5. TEP-Hopsworks EO data access. Users can directly access raw EO data from their applications running on Hopsworks. Multiple methods, e.g., an object data access API (SWIFT/S3), a filesystem interface, etc., are provided for accessing Copernicus and other EO data available on CREODIAS (see the sketch after this list).
  6. TEP-Hopsworks infrastructure integration. Hopsworks and both the Polar and Food Security TEPs are installed and operated on CREODIAS, whose administrative tools enable the TEPs to spawn and manage virtual machines and storage using CloudFerro [3], which provides an OpenStack-based cloud platform to the TEPs. Hopsworks is installed on this platform and can access compute and storage resources provided by the TEPs.
  7. TEP-Hopsworks API integration. Hopsworks is provided within the TEP environment as a separate application and is mainly used as a development platform for the deep learning pipelines and applications of the Polar and Food Security use cases. These applications are exposed in the form of processors to the TEP users. Practically, a processor is a TEP abstraction that uses machine learning models that have previously been developed by the data scientists of the ExtremeEarth use cases in Hopsworks. 
  8. Hopsworks-TEPs datasets. TEPs provide users with access to data, from various sources, to be served as input to processors. Such sources include the data provided by CREODIAS and external services that the TEP can connect to. The pre-processed data is stored in an object storage provided by CREODIAS and thus made available to Hopsworks users by exchanging authentication information. Hopsworks users can also upload their own data to be used for training or serving. Hopsworks provides a REST API for clients to work with model serving, and authentication is done in the form of API keys managed by Hopsworks on a per-user basis. These keys can therefore be used by external clients to authenticate against the Hopsworks REST API. There are two ways by which the trained model can be served via the TEP: (i) the model can be exported from Hopsworks and embedded into the TEP processor, or (ii) the model can be served online on Hopsworks, and a processor on the TEP submits inference requests to the model serving instance on Hopsworks and returns the results. In method (i), once the machine learning model is developed, it can be transferred from Hopsworks to the Polar TEP by using the Hopsworks REST API and Python SDK. TEP users can integrate the Hopsworks Python SDK into the processor workflow to further automate the machine learning pipeline lifecycle. In method (ii), the TEP submits inference requests to the model being served by the online model serving infrastructure running on Kubernetes and hosted on Hopsworks. Figure 2 illustrates this approach for the Polar use case.
  9. Linked Data tools. Linked data applications are deployed as Hopsworks jobs using Apache Spark. A data scientist can work with big geospatial data using ontologies and submit GeoSPARQL queries using tools developed and extended within ExtremeEarth, namely GeoTriples [4], Strabo2, JedAI-Spatial, and Semagrow.
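
As a hedged illustration of the object-storage access path in item 5, raw EO products can be read with any S3-compatible client; the endpoint, credentials, bucket, and object key below are hypothetical placeholders, not actual CREODIAS values.

-- CODE language-bash --
# Sketch: reading an EO product from an S3-compatible object store (see item 5 above).
# Endpoint, credentials, bucket and key are hypothetical placeholders.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="https://s3.example-dias.eu",
    aws_access_key_id="ACCESS_KEY",
    aws_secret_access_key="SECRET_KEY")

obj = s3.get_object(
    Bucket="eo-data",
    Key="Sentinel-2/tiles/.../B04.jp2")
band_bytes = obj["Body"].read()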

Figure 1: ExtremeEarth software architecture


Figure 2: Online model serving on Hopsworks for the Polar use case


Application users that interact with the TEPs are effectively the users of the AI products generated by the machine learning and deep learning pipelines developed by the data scientists in Hopsworks. Having described the integration of the various components above, Figure 3 depicts the flow of events within this architecture.

  1. EO data scientists log in to Hopsworks.
  2. Read and pre-process raw EO data in Hopsworks, in the TEP, or on the local machine.
  3. Create training datasets based on the intermediate pre-processed data.
  4. Develop deep learning pipelines.
  5. Perform linked data transformation, interlinking, and storage.
  6. Log in to the Polar or Food Security TEP applications. 
  7. Select Hopsworks as the TEP processor. The processor either starts model serving in Hopsworks via the REST API, or downloads the model from Hopsworks via the REST API so that serving is done within the TEP application.
  8. Submit federated queries with Semagrow and use the semantic catalogue built into Hopsworks. 
Figure 3: ExtremeEarth software architecture flow of events

Conclusion

In this blog, we have shown how Hopsworks has been integrated with various services and platforms in order to build AI applications and extract knowledge using Copernicus and Earth Observation data. Two use cases, sea ice classification (Polar TEP) and crop type mapping and classification (Food Security TEP), have already been developed using the aforementioned architecture, making use of the PBs of data made available by the Copernicus programme and infrastructure.

If you would like to try it out yourself, make sure to register an account. You can also follow Hopsworks on GitHub and Twitter.

References

  1. Proceedings of the 2021 conference on Big Data from Space (BiDS 2021), 18-20 May 2021.
  2. https://workshop.copernicus.eu/sites/default/files/content/attachments/ajax/copernicus_overview.pdf
  3. https://www.copernicus.eu/en/access-data/dias
  4. https://github.com/LinkedEOData/GeoTriples 

Note: This article was previously published by ExtremeEarth.

Hopsworks Online Feature Store: Fast Access to Feature Data for AI Applications

Hopsworks
>
6/17/2021
Moritz Meister
>
Moritz Meister
Fabio Buso
Mikael Ronström
Jim Dowling

TLDR: The Hopsworks Feature Store abstracts away the complexity of a dual database system, unifying feature access for online and batch applications. We built a reliable and performant service to materialize features to the online feature store and to guarantee not only low-latency access but also access to the freshest possible feature values at serving time.

Enterprise machine learning models are most valuable when they power part of a product by guiding user interaction. Oftentimes these ML models are applied to an entire database of entities, for example users identified by a unique primary key. An example of such an offline application would be predictive Customer Lifetime Value, where a prediction can be precomputed in batches at regular intervals (nightly, weekly) and then used to select target audiences for marketing campaigns. More advanced AI-powered applications, however, guide user interaction in real time, such as recommender systems. For these online applications, some part of the model input (feature vector) will be available in the application itself, such as the last button clicked on, while other parts of the feature vector rely on historical or contextual data and have to be retrieved from back end storage, such as the number of times the user clicked on the button in the last hour or whether the button is a popular button.

In this blog, we are going to dive into the details of the requirements of online applications and how the Hopsworks Feature Store abstracts away the complexity of a dual storage system.

Machine Learning Models in Production

While batch applications with (analytical) models are largely similar to the training of the model itself, requiring efficient access to the large volumes of data that will be scored, online applications require low-latency access to the latest feature values for a given (potentially multi-part) primary key, which are then sent as a feature vector to the model serving instance for inference.

To the best of our knowledge, there is no single database that accommodates both of these requirements at high performance. Therefore, data teams have tended to keep the data for training and batch inference in data lakes, while ML engineers built microservices that replicate the feature engineering logic for online applications.

This, however, introduces unnecessary obstacles for both Data Scientists and ML engineers to iterate quickly and significantly increases the time to production for ML models:

  1. Data science perspective: Tight coupling of data and infrastructure through microservices, resulting in Data Scientists not being able to move from development to production and not being able to reuse features.
  2. ML engineering perspective: Large engineering effort to guarantee consistent access to data in production as it was seen by the ML model during the training process.

Hopsworks Feature Store: A Transparent Dual Storage System

The Hopsworks Feature Store is a dual storage system, consisting of high-bandwidth (low-cost) offline storage and a low-latency online store. The offline storage is a mix of Apache Hudi tables on our HopsFS file system (backed by S3 or Azure Blob Storage) and external tables (in Snowflake, Redshift, etc.), together providing access to large volumes of feature data for training or batch scoring. In contrast, the online store is a low-latency key-value database that stores only the latest value of each feature and its primary key. The online feature store thereby acts as a low-latency cache for these feature values.

In order for this system to be valuable for data scientists and to improve the time to production, as well as providing a nice experience for the end user, it needs to meet some requirements:

  1. Consistent Features for Training and Serving: In ML it’s of paramount importance to replicate the exact feature engineering logic for features in production as was used to generate the features for model training.
  2. Feature Freshness: A low latency, high throughput online feature store is only beneficial if the data stored in it is kept up-to-date. Feature freshness is defined as the end-to-end latency between an event arriving that triggers a feature recomputation and the recomputed feature being published in the online feature store.
  3. Latency: The online feature store must provide near real-time, low-latency, high-throughput access, so that applications can use as many features as possible within their available SLA.
  4. Accessibility: The data needs to be accessible through intuitive APIs, and just as easily as the extraction of data from the offline feature store for training.

The Hopsworks Online Feature Store is built around four pillars in order to satisfy the requirements while scaling to manage large amounts of data:

  1. HSFS API: The Hopsworks Feature Store library is the main entry point to the feature store for developers, and is available for both Python and Scala/Java. HSFS abstracts away the two storage systems, providing a transparent Dataframe API (Spark, Spark Structured Streaming, Pandas) for writing and reading from online and offline storage.
  2. Metadata: Hopsworks can store large amounts of custom metadata in order for Data Scientists to discover, govern, and reuse features, but also to be able to rely on schemas and data quality when moving models to production.
  3. Engine: The online feature store comes with a scalable and stateless service that ensures your data is written to the online feature store as fast as possible, without write amplification, from either a data stream (Spark Structured Streaming) or from static Spark or Pandas Dataframes. By no write amplification, we mean that you do not have to first materialize your features to storage before ingesting them - you can write directly to the feature store.
  4. RonDB: The database behind the online store is the world's fastest key-value store with SQL capabilities. It not only forms the base for the online feature data, but also handles all metadata generated within Hopsworks.

We will cover each of these in detail in the following sections and provide some benchmarks for quantitative comparison.

RonDB: The Online Feature Store, Foundation of the File System and Metadata

Hopsworks is built from the ground up around distributed scale-out metadata. This helps to ensure the consistency and scalability of the services within Hopsworks, as well as the annotation and discoverability of data and ML artifacts.

Since the first release, Hopsworks has been using NDB Cluster (a precursor to RonDB) as the online feature store. In 2020, we created RonDB as a managed version of NDB Cluster, optimized for use as an online feature store.

However, in Hopsworks, we use RonDB for more than just the Online Feature Store. RonDB also stores metadata for the whole Feature Store, including schemas, statistics, and commits. RonDB also stores the metadata of the file system, HopsFS, in which offline Hudi tables are stored. Using RonDB as a single metadata database, we use transactions and foreign keys to keep the Feature Store and Hudi metadata consistent with the target files and directories (inodes). Hopsworks is accessible either through a REST API or an intuitive UI (that includes a Feature Catalog), or programmatically through the Hopsworks Feature Store API (HSFS).

OnlineFS: The Engine for Scalable Online Feature Materialization

With the underlying RonDB and the needed metadata in place, we were able to build a scale-out, high throughput materialization service to perform the updates, deletes, and writes on the online feature store - we simply named it OnlineFS.

OnlineFS is a stateless service using ClusterJ for direct access to the RonDB data nodes. ClusterJ is implemented as a high performance JNI layer on top of the native C++ NDB API, providing low latency and high throughput. We were able to make OnlineFS stateless due to the availability of the metadata in RonDB, such as avro schemas and feature types. Making the service stateless allows us to scale writes to the online feature store up and down by simply adding or removing instances of the service, thereby increasing or decreasing throughput linearly with the number of instances.

Let’s go through the steps needed to write data to the online feature store, which are numbered in the diagram below.

  1. Features are written to the feature store as Pandas or Spark Dataframes. 

Each Dataframe updates a table called a feature group (there is a similar table in the offline store). The features in a feature group share the same primary key, which can be a composite primary key. Primary keys are tracked along with the rest of the metadata. The Hopsworks Feature Store has a Dataframe API, which means that the result of your feature engineering should be a regular Spark, Spark Structured Streaming, or Pandas Dataframe that will be written to the Feature Store. The APIs for writing to the Feature Store are almost identical for all three types of Dataframe. With a reference to the feature group object, you insert the Dataframe. The feature group is configured on creation to either store the Dataframe to both the online and offline stores or to only one of them.

  2. Encode and produce

The rows of the Dataframe are encoded using avro and written to Kafka, running on Hopsworks. Each feature group has its own Kafka topic with a configurable number of partitions, and partitioning by primary key, which is necessary to guarantee the ordering of writes.
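
The sketch below illustrates this mechanism in principle only (it is not the OnlineFS implementation): rows are Avro-encoded and produced to the feature group's Kafka topic, keyed by the primary key so that all writes for a given key land in the same partition. The topic name, schema, and broker address are hypothetical.

-- CODE language-bash --
# Illustrative only - not the OnlineFS implementation. Topic, schema and broker
# address are hypothetical.
import io, json
import fastavro
from confluent_kafka import Producer

schema = fastavro.parse_schema(json.loads("""
{"type": "record", "name": "store_fg_1",
 "fields": [{"name": "store", "type": "long"},
            {"name": "num_sales", "type": "double"}]}
"""))

producer = Producer({"bootstrap.servers": "broker:9092"})

def produce_row(row):
    buf = io.BytesIO()
    fastavro.schemaless_writer(buf, schema, row)   # Avro-encode the row
    producer.produce("store_fg_1_onlinefs",        # one topic per feature group
                     key=str(row["store"]),        # partition by primary key
                     value=buf.getvalue())

produce_row({"store": 42, "num_sales": 113.0})
producer.flush()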

  3. Consume and decode

We use Kafka to buffer writes from Spark feature engineering jobs, as a large Spark cluster that writes directly to RonDB could overload RonDB, due to a lack of backpressure in the existing Spark JDBC driver. OnlineFS reads the buffered messages from Kafka and decodes them. Importantly, OnlineFS decodes only primitive feature types, whereas complex features such as embeddings are stored in binary format in the online feature store.

  4. Primary-key based upsert

Now, OnlineFS can perform the actual upsert of the rows to RonDB using the ClusterJ API. Upserts are performed in batches (with a configurable batch size) to improve throughput. 

Since all services in the steps of the pipeline have access to the same metadata, we are able to hide all complexity related to encoding and schemas from the user. Furthermore, all services involved are horizontally scalable (Spark, Kafka, OnlineFS) and, due to our streaming-like setup, the process does not create unnecessary copies of the data, that is, there is no write amplification. This highly scalable setup is possible due to the availability of services like a schema registry, an X.509 certificate manager, and Kafka within Hopsworks. At all times, X.509 certificates are used for two-way authentication, and TLS is used to encrypt network traffic.

Accessibility means Transparent APIs

In distributed systems, we often speak about transparency. A distributed system is transparent if it hides network access and implementation-specific knowledge from the developer. In the Hopsworks Feature Store, writing is done transparently through the same APIs, as mentioned before: (1) it does not matter whether it is a regular Spark, Spark Streaming, or Pandas Dataframe, and (2) the system is responsible for consistently updating both online and offline storage.

Insert

The core abstractions in the HSFS library are the metadata objects representing feature groups, training datasets, and features in the feature store. Our goal with HSFS was to enable developers to use their favourite languages and frameworks to engineer features. As we aligned on the Dataframe API, anything that is contained in a Dataframe can be written to the feature store. If you have existing ETL or ELT pipelines that produce a Dataframe containing the features, you can write that Dataframe to the Feature Store by simply acquiring a reference to its feature group object and invoking `.insert()` with your Dataframe as a parameter. This can be called from a regularly scheduled job, using any orchestrator of your choice (alternatively, Hopsworks comes with Airflow if you want an out-of-the-box orchestrator). A feature group object can also be updated continuously by writing batches as Dataframes from a Spark Structured Streaming application.

-- CODE language-bash --
# populate feature group metadata object
store_fg_meta = fs.create_feature_group(
    name="store_fg",
    version=1,
    primary_key=["store"],
    description="Store related features",
    online_enabled=True)

# create the feature group for the first time in the feature store
store_fg_meta.save(dataframe)

# replace .save with .insert for a scheduled batch job
store_fg_meta.insert(dataframe)

# if required, stream data only to the online feature store in a long running Spark
# Structured Streaming application
store_fg_meta.insert_stream(streaming_dataframe)

Get

Many existing Feature Stores do not have a representation for models. Hopsworks, however, introduced the Training Dataset abstraction to represent the set of features and the feature values used to train a model. That is, there is a one-to-one mapping between immutable training datasets and models, but a one-to-many relationship from the mutable feature groups to the immutable training datasets. You create a training dataset by joining, selecting and filtering features from feature groups. The training dataset includes metadata for the features, such as which feature group they came from, the commit-id(s) for that feature group, and the ordering of features in the training dataset. All of this information enables HSFS to recreate training datasets at a later point in time and to transparently construct feature vectors at serving time.

-- CODE language-bash --
# create a query
feature_join = rain_fg.select_all() \
    .join(temperature_fg.select_all(), on=["location_id"]) \
    .join(location_fg.select_all())

td = fs.create_training_dataset("rain_dataset",
                                version=1,
                                label="weekly_rain",
                                data_format="tfrecords")

# materialize the query in the specified file format
td.save(feature_join)

# we can also use the training dataset for serving
# this serving code typically runs in a Python environment
td = fs.get_training_dataset("rain_dataset", version=1)

# get a serving vector
td.get_serving_vector({"location_id": "honolulu"})

The clients of the online feature store are either applications that use ML models or model-serving infrastructure that enriches feature vectors with missing features. Hopsworks provides a JDBC-based API to the online store. JDBC has the advantage of offering a high-performance protocol, network encryption, client authentication, and access control. HSFS provides language-level support for Python and Scala/Java. However, you can always fall back on using JDBC directly if your serving application runs in a different programming language or framework.
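
A hedged sketch of that fallback is shown below; the hostname, database, table name, and credentials are hypothetical placeholders (the actual schema and table naming are managed by Hopsworks), and the point is only that the online store can be queried with plain SQL over a MySQL/JDBC connection.

-- CODE language-bash --
# Sketch: reading the latest feature values for one primary key over the SQL
# interface to the online store. Host, database, table and credentials are hypothetical.
import mysql.connector

conn = mysql.connector.connect(
    host="onlinefs.hopsworks.example.com",
    user="onlinefs_user",
    password="...",
    database="demo_project")

cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM store_fg_1 WHERE store = %s", (42,))
feature_vector = cursor.fetchone()
conn.close()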

Benchmarks

There are sysbench benchmarks for RonDB by Mikael Ronström (inventor of NDB Cluster and Head of Data at Logical Clocks, leading the RonDB team), as well as a comparative performance evaluation against Redis. In this section, we show the performance of the OnlineFS service, which is able to handle and sustain high throughput when writing to the online feature store, as well as an evaluation of feature vector lookup latency and throughput on a typical managed RonDB setup within Hopsworks.

In this benchmark, Hopsworks is set up with 3x AWS m5.2xlarge (8 vCPU, 32 GB) instances (1 head, 2 worker). The workers are used by Spark for writing Dataframes to the online store. Additionally, the same workers are re-used as clients that perform the read operations on the online feature store in the read benchmark.

RonDB is set up with 1x AWS t3.medium (2 vCPU, 4 GB) instance as the management node, 2x r5.2xlarge (8 vCPU, 64 GB) instances as data nodes, and 3x AWS c5.2xlarge (8 vCPU, 16 GB) instances for MySQL servers. This setup allows us to store 64 GB of data in-memory in the online feature store with 2x replication. The MySQL servers provide the SQL interface to the online feature store; in this benchmark we did not saturate the RonDB data nodes fully, so one could potentially add more MySQL servers and clients to increase throughput beyond the levels shown here.

Write Throughput

We benchmarked the throughput for writing to RonDB in the OnlineFS service. Additionally, we measured the time it takes to process a record from the moment it gets taken from the Kafka topic until it is committed to RonDB. For this benchmark we deployed two OnlineFS services, one on the head node and one on one of the MySQL server nodes.

We ran the experiments by writing 20M rows to the online feature store from a Spark application. After a short warm-up period, the throughput of the two service instances stabilizes at ~126K rows/second for 11 features, ~90K rows/second for 51 features, and ~60K rows/second for the largest feature vectors. Due to its design, this can easily be scaled by adding more instances of the service.

This diagram shows the throughput of writing 20M rows from Kafka to RonDB using two instances of the OnlineFS materialization service.

Secondly, we report the time it takes to process the feature vectors within the OnlineFS service. This time does not include the time a record is waiting for processing in Kafka, the reason for that is that the waiting time depends highly on the number of Spark executors writing to Kafka. Instead, you should rely on the throughput numbers to compare them to your requirements.

The processing times are reported on a per row basis, but parts of the pipeline within OnlineFS are parallelized, for example rows are committed to RonDB in batches of 1000. With this setup we achieve p99 of ~250ms for 11 features with a row size of 948 bytes.

This diagram shows the end-to-end processing latency of events starting from when the event is read from Kafka until it has been written to RonDB. The batch size for this experiment was 1000 - you can reduce latency by reducing the batch size, but reducing the batch size may also reduce throughput.

Serving Lookup Throughput and Latency

We benchmarked throughput and latency for different feature vector sizes in relation to an increasing number of clients performing requests in parallel. Note that the clients were split among the two worker nodes (each 8vCPU).

Single vector per request:

In this benchmark, every request contains one primary key value lookup (one feature vector). Throughput and latency scale linearly up to 16 clients while sustaining low latencies. With more than 16 clients we observed the hosts on which the clients are being run getting to their maximum CPU and network utilization. Furthermore, we did not see an over-utilization of the RonDB data nodes or the MySQL servers, which means we could further scale linearly by running the clients from larger worker instances or adding more worker hosts to run clients from.

Batch, 100 vectors per request:

In order to show that RonDB scales to many more key lookups per second, we run another benchmark in which each client requests feature vectors in batches of 100. As we can see the number of lookups still scales linearly, lookup throughput increases by 15x, while the latency per request increases only moderately.

Conclusion

Hopsworks comes with managed RonDB, which provides a unified metadata store for both Hopsworks and the online feature store. In this blog, we showed that a highly available two-node RonDB cluster (r5.2xlarge VMs) scales linearly to more than 250k ops/sec for feature vector lookups of 11 features (~1 KB in size) with 7.5 ms p99 latency. Thus, Hopsworks provides the highest-performance online feature store on the market today.

Designing a Thread Pipeline for optimal database throughput with high IPC and low CPU cache misses

RonDB
>
6/15/2021
Mikael Ronström
>
Mikael Ronström

In our previous blog, we discussed automatic thread configuration in RonDB. In this blog, we present a set of microbenchmarks. In particular, we compare RonDB with ScyllaDB on how separating threads affects the instruction cache. We then discuss tossing data between threads and latency in thread pipelines.

Batch Pipeline vs Thread Pipeline

What we understand from the ScyllaDB description is that it is implemented as an asynchronous engine. ScyllaDB specifically mentioned that they didn’t want to introduce a Thread Pipeline. Instead, they seem to have introduced a Batch Pipeline where they execute one task type at a time. This clearly improves the use of the instruction cache.

The Batch Pipeline approach has the best latency when the system is at low load. When the system is at high load, the batch size increases and the latency increases. With RonDB and its Thread Pipeline, the latency decreases as the load increases, since the likelihood that the next thread is already awake is higher. Thus, at high load, RonDB acts as a set of CPUs that interact directly through small messages instructing the receiver what it is supposed to do. At high load, the overhead of switching to a new thread is negligible; there is a small extra cost to handle the extra data for the messages, but the messages are small, so this overhead is small compared to the extra latency introduced by having to wait for another batch to execute before your turn to execute comes.

Actually, the Batch Pipeline model resembles the first thread model of NDB Cluster, where everything was executed in a single thread. This thread received a batch of messages and executed each of them in FIFO order, and each message sent asynchronous messages to the next part of the code. This had exactly the same effect as the Batch Pipeline model, since all messages followed the same code path. Thus, if 100 messages were received, we first executed 100 messages in the transaction block and then 100 messages in the database blocks.

ScyllaDB also handles complex query processing in the same engine. RonDB does all complex query processing in MySQL in a separate program. This enables RonDB to provide a very efficient API in C++, Java and in JavaScript which translates database requests directly into the efficient NDB protocol.

The RonDB model uses a normal FIFO scheduler in each thread and threads only execute a part of the functionality and the database part only executes queries on parts of the database. Thus we achieve both the benefits from the batch processing of similar messages in a pipeline and the division of work into different CPUs.

Microbenchmarking RonDB

RonDB has the ability to execute a benchmark exactly in the fashion as the Batch Pipeline model. This happens when the RonDB data node executes using a single thread. Thus we can use RonDB to analyse how Batch Pipelines compare to Thread Pipelines in a microbenchmark.

In this microbenchmark, we run a single RonDB data node without any replication and one benchmark program called flexAsynch. This benchmark program sends batches of inserts, updates, reads, and deletes and measures the achieved throughput.

When a read is processed, it is executed in four steps: the first step receives the network message; the second step handles the transaction processing, which ensures that we find the data to read and write the correct parts of the data partitions; the third step does the actual read/write of the data; and the fourth step sends the data back to the benchmark program.

In RonDB, using the single-threaded setup, the receive part receives a batch of operations, each of which is inserted into the job queue in FIFO order.

After executing the receive step, we execute those messages, which all run the transaction part. Each of those transaction-part messages generates a new asynchronous message to be executed by the data owning part. Thus, although they are located in the same thread, we still execute all those messages in batches that all execute the same code, achieving the efficiency of the CPU's instruction cache.

Next, all messages are executed in the data owning part, and each of those messages puts something into the send queue, which is the last step executed before we return to the next set of messages in the receive part. Thus, in the single-threaded configuration of RonDB, we get exactly the behaviour of a Batch Pipeline.

The next step is to separate the receive part from the transaction execution part and data owning part. Thus in this case we will have a thread pipeline consisting of two threads.

The third step is to split the thread pipeline into 3 parts. The receive part, the transaction handling part and the data owning part.

The fourth step finally separates the sending part as well into a separate send thread.
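
To make the thread pipeline idea concrete, here is a toy sketch in Python (RonDB itself is C++; this is only meant to illustrate stages connected by FIFO queues that hand over batches of messages, not actual RonDB code):

-- CODE language-bash --
# Toy illustration of a thread pipeline - not RonDB code. Each stage pulls a batch of
# messages from its input queue, runs the same code for the whole batch (instruction
# cache friendly), and hands the batch to the next stage.
import queue, threading

recv_to_tc, tc_to_ldm, ldm_to_send = queue.Queue(), queue.Queue(), queue.Queue()

def stage(inbox, outbox, work):
    while True:
        batch = inbox.get()              # a batch of messages, FIFO order
        if batch is None:                # shutdown marker
            if outbox is not None:
                outbox.put(None)
            break
        results = [work(m) for m in batch]
        if outbox is not None:
            outbox.put(results)

threads = [
    threading.Thread(target=stage, args=(recv_to_tc, tc_to_ldm, lambda m: m)),       # "tc"
    threading.Thread(target=stage, args=(tc_to_ldm, ldm_to_send, lambda m: m * 2)),  # "ldm"
    threading.Thread(target=stage, args=(ldm_to_send, None, print)),                 # "send"
]
for t in threads:
    t.start()

recv_to_tc.put([1, 2, 3])                # the "receive" part hands over a batch
recv_to_tc.put(None)                     # shut the pipeline down
for t in threads:
    t.join()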

All of these steps are easily achieved in RonDB by using a configuration parameter called ThreadConfig.
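
For illustration, the four-way split described above could be expressed with a ThreadConfig setting along these lines; the counts shown are illustrative only (real deployments typically also bind threads to CPUs), so treat this as a sketch rather than a recommended configuration.

-- CODE language-bash --
# Sketch of a config.ini excerpt for the RonDB data nodes (illustrative counts only)
[ndbd default]
ThreadConfig=recv={count=1},tc={count=1},ldm={count=1},send={count=1}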

The expected result here is that the single-threaded variant will get the optimal throughput since it divides the execution based on batches and thus minimizes the instruction cache misses. It is expected that the Thread Pipeline will have the same effect, but spread the work to more than one CPU. In our threaded pipeline we can specify the size of the batches executed before we flush all messages to other threads and before we send them back to the NDB API program.

The results showed that the first step could handle around 750k operations per second. Splitting the receive thread from the other parts improved throughput to around 835k operations per second. The result shows that a fairly small part of the processing happens in the receive stage. The third step to divide the transaction processing step from the data owning part gave a much bigger impact. Here performance increased to 1.5M operations per second.

The fourth step was implemented by specifically disallowing the data owning part and the transaction processing part from performing any sends; thus, all sending had to happen from a separate fourth thread. This step had very little impact, so our adaptive send approach, where a send happens locally if the thread isn't overloaded and is otherwise performed by a less loaded thread, is a good approach. This means that in RonDB there are send threads, but for the most part the sending is done by execution threads that are not so busy. Thus we can use the send load to ensure that the CPUs are used in a balanced manner.

What we derive from those numbers is that a Thread Pipeline and a Batch Pipeline have equal efficiency. However, the Thread Pipeline provides a lot of benefits. For receive threads, it means that the application doesn't have to find the thread where the data resides, which simplifies the NDB API greatly. We have made experiments where the NDB API actually had this possibility and where the transaction processing and data owning parts were colocated in the same thread. This gave a small improvement in latency at low loads, but at higher loads the thread pipeline was superior in both throughput and latency, so the idea was discarded.

The Thread Pipeline also improves scalability. As shown here we can have a single thread handling the data owning part and scale this to twice as many operations per second without having to split the data partitions.

We also did an experiment with 2 receive threads, 2 transaction processing threads, and 2 data owning parts, and this scaled nicely to 3M operations per second. What we have noticed is that in larger thread setups it is important to have sufficient CPU capacity in the receive threads and the transaction processing parts to ensure that the data owning parts execute in an optimal manner. Using 1 receive thread in this setup provides almost the same results, but the architecture gains from having receive threads that can respond quickly so that they can keep the thread pipelines busy.

In RonDB there is a configuration parameter called SchedulerResponsiveness. This specifies whether to optimise for latency or for throughput. By increasing the SchedulerResponsiveness, we decrease the batch execution sizes before we flush messages to other threads and other nodes.
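
For example, a latency-leaning setting might look like the sketch below; the value shown is illustrative, so consult the RonDB documentation for the exact range and default.

-- CODE language-bash --
# Sketch: favouring latency over throughput (illustrative value)
[ndbd default]
SchedulerResponsiveness=9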

Thread Pipelining makes data partitioning transparent to APIs

From a performance perspective the Thread Pipeline is equally efficient to a Batch Pipeline. However the thread pipeline gives greater flexibility to the architecture. This flexibility is used in RonDB to ensure that many database connections share the same TCP/IP socket. This gives great batching improvements.

Since receive threads execute on behalf of all threads in the data nodes, the API doesn’t have to route the messages to a specific receive thread. Thus normal round robin can be used to spread load if there are several connections from the API to the RonDB data node.

Thread Pipeline decreases the number of data partitions

What we showed here is that the Thread Pipeline enables us to reach the same performance with 2 database threads compared to the 4 that would have been needed if all functionality were gathered in a single thread. This means that we can handle the same load with half as many data partitions. Decreasing the number of partitions enables us to more efficiently perform range scans that don't use the partition key. We also introduced query threads, which allow one data partition to be read from multiple query threads, further decreasing the need to split data into many partitions. Thus RonDB achieves a nice balance between splitting data partitions for higher write throughput and higher parallelism in range scans, and efficiency in performing the range scans.

Thread Pipeline enables Routing of Messages

As mentioned above the Thread Pipeline enables us to route messages from the transaction part to the data owning part. But in RonDB we also have query threads that can read data from several data owning parts. This provides the possibility also for routing based on load in the various threads. Thus thread pipelining not only provides a very efficient approach to executing database requests, it also enables us to build a highly scalable data node that can process many millions of concurrent requests with internal load regulation.

Thread Pipeline versus Batch Pipeline

So based on this analysis we can conclude that the Thread Pipeline and the Batch Pipeline both provide very similar benefits in CPU efficiency. RonDB has seen IPCs of 1.27 in the data owning parts and ScyllaDB reports IPC of 1.31 in their Batch Pipeline. However the Thread Pipeline at the same time delivers many advantages from a software architecture perspective. It enables transparency of data partitioning, it enables us to decrease the amount of data partitioning required to achieve the performance objectives and it enables us to build very scalable data nodes. RonDB data nodes have shown linear scaling to at least 32 CPUs and it continues to scale to many more CPUs after that and the software is prepared to handle up to 1024 CPUs.

Tossing data between threads

The only data transported between threads is the protocol messages. The actual data in the database is handled by the data owning thread. There is one part where the passing of data between threads matters, and this is for sending the message. Sending network messages has a significant overhead in a distributed database and can also cause extra CPU cache misses. RonDB has an additional design requirement that requires collecting messages from several threads to be sent on one socket. Thus sending network messages is a bit special in RonDB. However often we can avoid the extra cost of involving another thread for sending by sending when we have completed a batch of processing.

There is also an adaptive process that moves the CPU processing of sends to the threads with lowest load. Thus in a highly loaded data node we will avoid the sending in overloaded threads to ensure that we can process all the requests coming in.

In addition, RonDB uses special handling of large messages, where the large part of the message is transported in a separate section. This section is only read by the receiving thread (usually the data owning thread), so there is no extra cost in passing it through other threads (usually the transaction processing thread). When such a message is received by RonDB from the API, this section is a simple interpreted program. Messages sent back to the API go directly from the data owning part to the API and do not pass through any Thread Pipeline; only messages that are part of transaction handling are sent back to the transaction processing part.

Latency in the Thread Pipeline

A natural question is whether latency is negatively affected by the thread pipeline. ScyllaDB reports that the mean execution time for a request is 282 microseconds, which is an eternity compared to the latency overhead of a thread pipeline. Even if all threads are sleeping when the request arrives, the wakeup latency of a thread is around 5-10 microseconds, so in the absolute worst case the thread pipeline adds around 15-25 microseconds of latency.

In reality, however, the latency at 0% load is seldom interesting. The latency of interest is the latency you get as the load approaches its peak. In that case the extra latency added by the thread pipeline comes close to zero, since most of the threads are already awake.

The total execution time of a key-value lookup in a RonDB data node is around 2-3 microseconds, and a write around 5 microseconds. The thread pipeline means that each thread can execute at maximum speed all the time. The expected latency in a loaded RonDB data node is on the order of 50 microseconds, so the overhead of wakeup latencies adds no more than 10% to the latency. Compared to the advantages the Thread Pipeline provides, this is a trade-off well worth making.
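
As a rough back-of-the-envelope check using the approximate figures above (an estimate, not a measurement):

\[
\frac{\text{wakeup overhead at high load}}{\text{typical loaded latency}}
\;\approx\;
\frac{\lesssim 5\,\mu s}{\sim 50\,\mu s}
\;\le\; 10\%
\]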

In fact, this behaviour is of great use: RonDB becomes more efficient as load increases, since the threads are continuously busy executing messages and the batching effects grow with the load.

Efficiency of the Thread Pipeline

The real performance of a database is not determined by the number of instructions executed per second. It is determined by the number of cycles each operation needs, that is, the number of instructions per operation divided by the number of instructions executed per cycle (IPC). RonDB reaches an IPC of 1.27 in its most critical database code, while many other DBMSs have reported numbers in the range 0.25-0.5. It is thus clear that the separation of code into a thread pipeline works well. At the same time we have used many other tricks in RonDB to achieve the desired performance.
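
Expressed as a simple formula, writing N for an (unchanged) number of instructions per operation and picking 0.35 as a mid-range value of the 0.25-0.5 reported elsewhere:

\[
\text{cycles per operation} \;=\; \frac{\text{instructions per operation}}{\text{IPC}},
\qquad
\frac{N/0.35}{N/1.27} \;=\; \frac{1.27}{0.35} \;\approx\; 3.6
\]

In other words, at the same instruction count, an IPC of 1.27 versus a mid-range 0.35 means roughly 3.6 times fewer cycles per operation.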

One such trick is how we designed our protocols. Most internet protocols are heavy CPU consumers. I once had a master thesis student design a generic protocol engine intended for use in NDB. It was probably the most important master thesis project I led: it showed exactly how NOT to do it. When the project was finished and I saw the resulting code, I realised there was no way to make that code efficient. NDB protocols are therefore based on messages where the message data resides at fixed positions. No protocol parsing is required, only copying from the message into the data structures used by the internal operations.

The code that processes those messages can contain a lot of instructions, but it is code without much logic, for the most part just fixed data-move instructions, so it is extremely efficient on modern CPUs. In addition, the RonDB code gives the C++ compiler hints about which branches are on the normal path through the code. This matters because RonDB implements failfast: the code is filled with extra assert statements (implemented using a macro called ndbrequire) that crash the data node immediately if any data is inconsistent. These asserts are also executed in production code, to ensure that we never continue executing in an incorrect program state. We also constantly generate crash information in the code so that we can understand exactly what happened if there is a failure.
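
A minimal C++ sketch of these two ideas, a fixed-layout message and a production-enabled failfast check with a branch hint. It is written against GCC/Clang's __builtin_expect and is not the actual NDB source; ReadRequest and REQUIRE are invented names standing in for the real signal definitions and the ndbrequire macro.

#include <cstdint>
#include <cstdlib>

// Sketch of a fixed-layout message: fields live at fixed word positions, so
// "parsing" is just copying words into the internal data structures.
struct ReadRequest {
  uint32_t table_id;     // word 0
  uint32_t fragment_id;  // word 1
  uint32_t key_length;   // word 2
  uint32_t flags;        // word 3
};

// Sketch of a failfast assertion in the spirit of ndbrequire: the check runs
// in production builds too, and the compiler is told the true branch is the
// common path so the failure path stays out of the hot instruction stream.
#define REQUIRE(cond)                                       \
  do {                                                      \
    if (__builtin_expect(!(cond), 0)) {                     \
      std::abort(); /* the real macro dumps crash info */   \
    }                                                       \
  } while (0)

void handle_read(const ReadRequest& req) {
  REQUIRE(req.key_length > 0);   // inconsistent state => crash immediately
  // ... execute the read ...
}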

With all this talk about low latency and high throughput, it is important to understand that the highest priority in RonDB is to achieve the highest possible availability.

Instruction Cache

So will our approach decrease the number of CPU instructions? The answer is no: there will be extra instructions to process the protocol between threads, and at times extra CPU instructions to handle context switches. So how can we gain from executing more CPU instructions?

The answer is a variant of divide and conquer. By separating the code across multiple processors, each processor has less code to execute, even though the total number of instructions and the total amount of code increase. For example, if we start with 10,000 instructions using 40 kB of code and split this into two functional units, each unit might execute around 5,500 instructions using 25 kB of code. Since each thread runs on its own CPU, its code now fits nicely into a 32 kB instruction cache, and the resulting speedup is larger than the roughly 10% extra instruction cost.
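
With the illustrative numbers above (these are examples, not measurements), the trade-off looks like this:

\[
2 \times 5{,}500 = 11{,}000 \approx 1.1 \times 10{,}000 \quad (\approx 10\%\ \text{more instructions}),
\qquad
25\,\text{kB} < 32\,\text{kB (L1 instruction cache)}
\]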

Thread Pipeline

The thread pipeline always starts with the receive thread, which receives the request from the network. By making this a separate thread we can colocate the operating system's network processing with the processing done by the receive thread itself; Linux can handle the network processing on the same CPU where the recv call is made. This enables very efficient handling of the receive part of the network processing.

There is another reason why the receive part needs to be a separate thread in RonDB: one socket can carry thousands of simultaneous connections to the data node. The receive thread therefore breaks the received data into smaller messages, which are transported over memory channels to the appropriate threads. A single TCP/IP receive of, say, 32 kB of data can easily be broken into 300 separate messages to be executed by different threads.
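
The sketch below shows the general shape of such demultiplexing, assuming a simple length-plus-destination header per message; the real NDB signal format differs, so MessageHeader and enqueue_to_thread are purely illustrative.

#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical message header at the start of every message in the byte
// stream; real NDB signals have their own format, this is only illustrative.
struct MessageHeader {
  uint32_t length_in_bytes;     // total message size including the header
  uint32_t destination_thread;  // which thread should execute this message
};

// Provided elsewhere in this sketch: hands one message to a thread's in-memory queue.
void enqueue_to_thread(uint32_t thread_id, const char* msg, std::size_t len);

// One recv() may return e.g. 32 kB carrying hundreds of small messages for
// different threads; the receive thread splits them and routes each one.
void demultiplex(const char* buffer, std::size_t bytes_received) {
  std::size_t offset = 0;
  while (offset + sizeof(MessageHeader) <= bytes_received) {
    MessageHeader header;
    std::memcpy(&header, buffer + offset, sizeof(header));
    if (offset + header.length_in_bytes > bytes_received) break;  // partial message, wait for more data
    enqueue_to_thread(header.destination_thread, buffer + offset, header.length_in_bytes);
    offset += header.length_in_bytes;
  }
}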

You can think of this thread pipeline as a programmer's assembly line. By dividing the work into smaller tasks we decrease the size of the program each task has to execute. The total program grows a little, but since each thread only sees the part it is responsible for, it has very good code locality.

If you think of CPUs as workers on the assembly line, it is also important not to switch workers. To avoid that, RonDB uses CPU locking so that the OS does not suddenly move the execution to another CPU; rebuilding the CPU caches on a new CPU can cost quite a few microseconds. In our case even most context switches avoid this cost, since most of the time there are no other tasks waiting to execute on the CPU, because we have separated the heavy CPU consumers onto different CPUs. Thus even after returning from a context switch we will have fairly hot CPU caches.
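
On Linux, pinning a thread to a CPU can be done with pthread_setaffinity_np; the sketch below shows only the mechanism (assuming Linux and glibc), while RonDB itself configures this through its own thread configuration.

#include <pthread.h>
#include <sched.h>

// Illustrative only: lock the calling thread to one CPU so the OS never
// migrates it and its caches stay warm (Linux / glibc API).
bool pin_current_thread_to_cpu(int cpu) {
  cpu_set_t set;
  CPU_ZERO(&set);
  CPU_SET(cpu, &set);
  return pthread_setaffinity_np(pthread_self(), sizeof(set), &set) == 0;
}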

The thread pipeline also splits the data cache usage: the LDM threads that handle database operations store no global transaction state and have no knowledge of it.

Thus the thread pipeline improves the use of both the CPU instruction cache and the CPU data caches.

Conclusion

From the micro benchmarks we conclude that splitting the execution into functional units in a thread pipeline can provide significant benefits. It is all about using the CPU caches, both data and instruction, in the most efficient manner. However, one needs to find ways to use the free CPU time that can arise when the load is unbalanced. This is an area of active development in RonDB that will bring even greater benefits.

Try RonDB

If you would like to replicate our benchmarks, there are three ways to use RonDB:

  • Managed version available on the Enterprise Hopsworks platform. The RonDB cluster is integrated with Hopsworks and can be used both for RonDB applications and for Hopsworks applications. 
  • Open source automated installation. Use a script that automates the creation of VMs and the installation of the software components required by RonDB. These scripts are available to create RonDB clusters on Azure and GCP.
  • Binary tarball installation. Download the RonDB binary tarball and install it on any computers of your liking.