
Sysbench Evaluation of RonDB

RonDB
6/11/2021
Mikael Ronström

Sysbench is a tool for benchmarking and testing open source databases. We have integrated Sysbench into the RonDB installation, which makes it extremely easy to run benchmarks with RonDB. This blog describes how to use these benchmarks with RonDB. The benchmarks reported here were executed with 1 cluster connection per MySQL Server, which limited the scalability per MySQL Server to about 12 VCPUs. Since executing those benchmarks we have increased the number of cluster connections per MySQL Server to 4, providing scalability to at least 32 VCPUs per MySQL Server.

Introduction

As preparation for these benchmarks we created a RonDB cluster using the Hopsworks framework that is currently used to create RonDB clusters. In these tests all MySQL Servers use the c5.4xlarge VM instance type in AWS (16 VCPUs with 32 GB of memory), and the RonDB management server uses the t3a.medium VM instance type. We tested two different RonDB clusters, both with 2 data nodes. The first test uses the r5.4xlarge instance type (16 VCPUs and 128 GB of memory) and the second test uses the r5n.8xlarge (32 VCPUs and 256 GB of memory). It was necessary to use the r5n class since we needed more than 10 Gbit/second of network bandwidth for the OLTP RW test with 32 VCPUs on the data nodes.

In the RonDB documentation you can find more details on how to set up your own RonDB cluster in either our managed version (currently supporting AWS) or using our open source shell scripts to set up a cluster (currently supporting GCP and Azure).

The graph below shows the throughput results from the larger data nodes using 8-12 MySQL Server VMs. To make sense of these numbers, we will explain a bit more about Sysbench and how you can tweak it to serve your purposes when testing RonDB.

Description of Sysbench OLTP RW

The Sysbench OLTP RW benchmark consists of 20 SQL queries executed within a transaction: the transaction starts with a BEGIN statement and ends with a COMMIT statement. After the BEGIN statement follow 10 SELECT statements that each select one row using the primary key of the table. Next follow 4 SELECT queries that select 100 rows within a range, using SELECT DISTINCT, SELECT … ORDER BY, a plain SELECT, and SELECT SUM(..), respectively. Finally there are one INSERT, one DELETE and 2 UPDATE queries.

In pseudo code:

BEGIN
Repeat 10 times: SELECT col(s) from TAB where PK=pk
SELECT col(s) from TAB where key >= start AND key < (start + 100)
SELECT DISTINCT col(s) from TAB where key >= start AND key < (start + 100)
SELECT col(s) from TAB where key >= start AND key < (start + 100) ORDER BY key
SELECT SUM(col) from TAB where key >= start AND key < (start + 100)
INSERT INTO TAB values (....)
UPDATE TAB SET col=val WHERE PK=pk
UPDATE TAB SET col=val WHERE key=key
DELETE FROM TAB WHERE PK=pk
COMMIT

This is the standard OLTP RW benchmark.

Benchmark Execution

Now I will describe some changes that the Sysbench installation in RonDB supports. To understand these, we will start by showing the default configuration file for Sysbench.

#
# Software definition
#
MYSQL_BIN_INSTALL_DIR="/srv/hops/mysql"
BENCHMARK_TO_RUN="sysbench"
#
# Storage definition (empty here)
#
#
# MySQL Server definition
#
SERVER_HOST="172.31.23.248;172.31.31.222"
MYSQL_PASSWORD='3*=13*8@20.*0@7$?=45'
#
# NDB node definitions (empty here)
#
#
# Benchmark definition
#
SYSBENCH_TEST="oltp_rw"
SYSBENCH_INSTANCES="2"
THREAD_COUNTS_TO_RUN="1;2;4;8;12;16;24;32;48;64;96;112;128"
MAX_TIME="30"

In this configuration file we provide the path to the RonDB binaries, the type of benchmark we want to execute, the password for the MySQL Servers, and the thread counts to use in each step of the benchmark. There is also a list of IP addresses of the MySQL Servers in the cluster and, finally, the number of Sysbench instances we want to execute.

This configuration file is created automatically by the managed version of RonDB. It is available on the API nodes that were created together with the cluster. It is also available on the MySQL Server VMs if you want to test running with a single MySQL Server colocated with the application.

The default setup will run the standard Sysbench OLTP RW benchmark with one sysbench instance per MySQL Server. To execute this benchmark, perform the following steps:

Step 1

Log in to the API node VM from which you want to run the benchmark. The username is ubuntu (in AWS), so log in using e.g. ssh ubuntu@IP_address. The IP address is the external IP address listed for your VM instance in AWS.

Step 2

After successfully logging in, you need to switch to the mysql user using the command:

sudo su - mysql

Step 3

Move to the right directory

cd benchmarks

Step 4

Execute the benchmark

bench_run.sh --default-directory /home/mysql/benchmarks/sysbench_multi

Benchmark Results

As you will discover, there are also sysbench_single, dbt2_single and dbt2_multi directories. These are set up for different benchmarks that we will describe in future papers. sysbench_single is the same as sysbench_multi but with only a single MySQL Server. It also exists on the MySQL Server VMs if you want to benchmark from those. Executing a benchmark from the sysbench machine increases latency since it represents a 3-tiered setup, whereas executing sysbench on the MySQL Server represents a 2-tiered setup and thus the latency is lower.
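
For example, to run the single MySQL Server variant instead, the command would look as follows (a sketch assuming the same directory layout as above):

bench_run.sh --default-directory /home/mysql/benchmarks/sysbench_single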

If you want to study the benchmark in real time, repeat Steps 1, 2 and 3 above and then run the following commands:

cd sysbench_multi/sysbench_results

tail -f oltp_rw_0_0.res

This will display the output from the first sysbench instance, providing latency numbers and throughput for that instance.

When the benchmark has completed the total throughput is found in the file:

/home/mysql/benchmarks/sysbench_multi/final_results.txt

Modifying Sysbench benchmark

The configuration for sysbench_multi is found in:

/home/mysql/benchmarks/sysbench_multi/autobench.conf

Thus if you want to modify the benchmark you can edit this file.

To modify this benchmark, you can first decide how many SELECT statements using the primary key should be issued per transaction. The default is 10. To change this, add the following line to autobench.conf:

SB_POINT_SELECTS="5"

With this setting, 5 primary key SELECTs will instead be issued for each transaction.

Next, you can decide that you want those primary key SELECTs to retrieve a batch of primary keys. In this case the SELECT will use IN (key1, key2, ..., keyN) in the WHERE clause. To use this, set the number of keys to retrieve per statement in SB_USE_IN_STATEMENT. Thus, to set this to 100, add the following line to autobench.conf:

SB_USE_IN_STATEMENT="100"

This means that if SB_POINT_SELECTS is set to 5 and SB_USE_IN_STATEMENT is set to 100 there will be 500 key lookups performed per Sysbench OLTP transaction.
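
For example, combining the two settings above, the relevant lines in the benchmark definition of autobench.conf would be:

SB_POINT_SELECTS="5"
SB_USE_IN_STATEMENT="100"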

Next, it is possible to set the number of range scan SELECTs to perform per transaction. For example, to disable all range scans we can add the following lines to autobench.conf:

SB_SIMPLE_RANGES="0"
SB_ORDER_RANGES="0"
SB_DISTINCT_RANGES="0"
SB_SUM_RANGES="0"

It is also possible to modify the range scans themselves. As mentioned, each range scan retrieves 100 rows; this number can be changed through the configuration parameter SB_RANGE_SIZE.
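
For example, to make each range scan retrieve 10 rows instead of 100 (the value 10 here is only an illustration), add the following line to autobench.conf:

SB_RANGE_SIZE="10"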

The default behaviour is to retrieve all 100 rows and send them back to the application, thus no filtering. There is also an option to perform filtering in those range scans. In this case only 1 row will be returned, but we will still scan the number of rows specified in SB_RANGE_SIZE. This feature of Sysbench is activated by adding the following line to autobench.conf:

SB_USE_FILTER="yes"

Finally, it is possible to remove the INSERT, DELETE and UPDATE queries. This is done by changing the configuration parameter SYSBENCH_TEST from oltp_rw to oltp_ro.
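
In autobench.conf this corresponds to:

SYSBENCH_TEST="oltp_ro"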

There are many more ways to change the configuration of how to run Sysbench, but these settings are enough for this paper. For more details see the documentation of dbt2-0.37.50; see also the Sysbench tree.

Benchmark Configurations

In the benchmarking reported in this paper we used 2 different configurations. Later, we will report more Sysbench variants as well as other benchmarks.

The first is the standard Sysbench OLTP RW configuration. The second is the standard benchmark with SB_USE_FILTER="yes" added. This was added since the standard benchmark becomes limited by network bandwidth when using r5.8xlarge instances for the data nodes. This instance type is limited to 10G Ethernet, while almost 20 Gb/s of networking capacity is needed at the performance that RonDB delivers. This bandwidth is achievable using the r5n instances.

Each Sysbench test creates the tables and fills them with data. To keep the execution time of the benchmark reasonable, each table is filled with 1M rows. Each sysbench instance uses its own table. It is possible to set the number of rows per table, and it is also possible to use multiple tables per sysbench instance. Here we have used the default settings.

The test runs are executed for a fairly short time to be able to test a large variety of test cases. This means that results are expected to be a bit better than those of longer runs. To see how results are affected by running for a long time, we also ran a few selected tests where a single benchmark ran for more than 1 hour. The results in this case are around 10% lower than the numbers of the shorter runs. This is mainly due to the variance in throughput introduced by the execution of checkpoints in RonDB. Checkpoints consume around 5-10% of the CPU capacity in the RonDB data nodes.

Benchmark setup

In all tests set up here we started the RonDB cluster using the Hopsworks infrastructure. In all tests we used c5.4xlarge as the VM instance type for the MySQL Servers. This VM has 16 VCPUs and 32 GB of memory, which means more or less 8 Intel Xeon CPU cores. In all tests there are 2 RonDB data nodes, and we tested with 2 types of VM instances: the r5.4xlarge, which has 16 VCPUs with 128 GB of memory, and the r5n.8xlarge, which has 32 VCPUs and 256 GB of memory. In the standard Sysbench OLTP RW test the network became a bottleneck when using r5.8xlarge. These VMs can use up to 10 Gb/sec, but in reality we saw that some instances could not go beyond 7 Gb/sec. When switching to r5n.8xlarge this jumped up to 13 Gb/sec immediately, so clearly this bottleneck was due to the AWS infrastructure.

To ensure that the network bottleneck was removed we switched to using r5n.8xlarge instances instead for those benchmarks. These instances are the same as r5.8xlarge except that they can use up to 25 Gb/sec in network bandwidth instead of 10Gb/sec.

Standard Sysbench OLTP RW

The first test we present here is the standard OLTP RW benchmark. When we run this benchmark, most of the CPU consumption happens in the MySQL Servers. Each MySQL Server is capable of processing about 4000 TPS for this benchmark. The MySQL Server can process a bit more if the responsiveness of the data node is better, likely because the CPU caches are hotter when the response comes back to the MySQL Server. Two 16 VCPU data nodes can in this case handle the load from 4 MySQL Servers; adding a 5th can increase performance slightly, but not much. We compared this to 2 data nodes using 32 VCPUs, and in principle the load these data nodes could handle was doubled.

The response time was very similar in both cases. At extreme loads the larger data nodes showed larger latency increases, most likely because we got much closer to the limits of what the network could handle.

The top number here was 34870 TPS at 64 threads from 10 MySQL Servers. In this case 95% of the transactions had a latency of less than 19.7 ms, which means that the time for each SQL query was below 1 millisecond. This corresponds to almost 700k SQL queries per second. These queries reported back 14.5M rows per second to the application for the larger data nodes, most of them coming from the 4 range scan queries in the benchmark. Each of those rows is a bit larger than 100 bytes, so around 2 GByte per second of application data is transferred to the application (about 25% of this is aggregated in the MySQL Server when using the SUM range scan).
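
As a back-of-envelope check of these numbers, here is a small sketch of the arithmetic (the 410 rows per transaction follow from the benchmark description above; the ~140 bytes per row is an assumption based on "a bit larger than 100 bytes"):

transactions_per_second = 34870          # top result at 64 threads, 10 MySQL Servers
queries_per_transaction = 20             # see the OLTP RW description above
rows_per_transaction = 10 + 4 * 100      # 10 point SELECTs + 4 range scans of 100 rows
assumed_row_size_bytes = 140             # assumption: "a bit larger than 100 bytes"

qps = transactions_per_second * queries_per_transaction           # ~697k queries/second
rows_per_second = transactions_per_second * rows_per_transaction  # ~14.3M rows/second
bytes_per_second = rows_per_second * assumed_row_size_bytes       # ~2.0 GB/second
print(f"{qps:,} QPS, {rows_per_second:,} rows/s, {bytes_per_second / 1e9:.1f} GB/s")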

Sysbench OLTP RW with filtering of scans

Given that Sysbench OLTP RW is to a great extent a networking test, we also wanted to perform a test with a bit more processing but a smaller number of rows reported back. We achieved this by setting SB_USE_FILTER="yes" in the benchmark configuration file. This means that instead of each range scan SELECT reporting back 100 rows, it will read 100 rows, filter out 99 of them and report only 1 of the 100 rows. This decreases the number of rows returned to the application to about 1M rows per second. Thus this test is a better evaluator of the CPU efficiency of RonDB, whereas the standard Sysbench OLTP RW is a good evaluator of RonDB's ability to ship tons of rows between the application and the database engine.


At first, we wanted to see the effect the number of MySQL Servers has on the throughput in this benchmark. We see the results of this in the image above. There is an increase in throughput going from 8 to 12 MySQL Servers, but the additional effect of each added MySQL Server is diminishing; there is very little to gain in going beyond 10 MySQL Servers. The optimal use of computing resources is most likely achieved around 8-9 MySQL Servers.

Adding more MySQL Servers also has an impact on the variability of the latency. So probably the overall best fit here is to use about 2x more CPU resources on the MySQL Servers than on the RonDB data nodes. This rule of thumb is based on this benchmark and isn't necessarily true for other use cases.

The results with the smaller data nodes (r5.4xlarge) are shown as the red line, which used 5 MySQL Servers in the test.

The rule definitely changes when using the key-value store APIs that RonDB provides. These are at least 100% more efficient compared to using SQL.

A key-value store needs to be a LATS database (low Latency, high Availability, high Throughput, Scalable storage). In this paper we have focused on showing Throughput and Latency. Above is the graph showing how latency is affected by the number of threads in the MySQL Server.

Many applications have strict requirements on the maximum latency of transactions. For example, if the application requires the response time to be smaller than 20 ms, then we can see in the graph that we can use around 60 threads towards each MySQL Server. At this number of threads r5.4xlarge delivers 22500 TPS (450k QPS) and r5n.8xlarge delivers twice that number, 45000 TPS (900k QPS).

The base latency in an unloaded cluster is a bit below 6 milliseconds. This number varies a bit based on where exactly the VMs that get started for you are located. Most of this latency is spent on the network. Each network jump in AWS has been reported to be around 40-50 microseconds, and one transaction performs around 100 of those network jumps in sequence. Thus almost two-thirds of the base latency comes from the latency of getting messages across. At higher loads, the time spent queueing while waiting for messages to be executed becomes dominant. Benchmarks where everything executes on a single computer have a base latency of around 2 milliseconds per Sysbench transaction, which confirms the above calculation.

RonDB: The World's Fastest Key-value Store is now in the Cloud.

2/24/2020
Mikael Ronström
Jim Dowling

We are pleased to introduce RonDB, the world’s fastest key-value store with SQL capabilities, available now in the cloud. RonDB is an open source distribution of NDB Cluster, thus providing the same core technology and performance as NDB, but as a managed platform in the cloud. RonDB is the best low-latency, high throughput, and high availability database available today. In addition, it also brings large data storage capabilities. RonDB is a critical component of the infrastructure needed to build feature stores, and, in general, real-time applications.

Introducing RonDB

RonDB is an open source distribution of NDB Cluster which has been used as a key-value store for applications for almost 20 years. The name RonDB is a synthesis of its inventor, Mikael Ronström, and its heritage NDB. RonDB is offered as a managed version of NDB Cluster, providing the fastest solution targeting the traditional applications that use NDB Cluster such as those used in telecom, finance, and gaming. RonDB provides a solution as the data layer for operational AI applications. RonDB gives online models access to reliable, low-latency, high-throughput storage for precomputed features.  

RonDB is ideal for developers who require a real-time database that can (1) be quickly deployed (RonDB provides the possibility to set up a RonDB cluster in less than a minute), (2) handle millions of database operations per second, and (3) be elastically scaled up and down to match the current load, reducing operational costs.

What is a key-value store?

A key-value store associates a value with a unique identifier (a key) to enable easy storage, retrieval, searching, and updating of data using simple queries - making key-value stores ideal for organisations with large volumes of data that need to be constantly retrieved and updated.

Distributed key-value stores, such as RonDB, are optimal for organisations with heavy read/write workloads on large data sets, thus being a good fit for businesses operating at scale. For example, they are a critical component of the infrastructure needed to build feature stores and many different types  of interactive and real-time applications.

The value of RonDB

Key-value store applications

RonDB can be used to develop interactive online applications where storing, retrieving and updating data is essential, but there is also the possibility to search for data in the same system. RonDB provides efficient access using a partition key through either native APIs or SQL access. 

RonDB supports distributed transactions enabling applications to perform any type of data modification. RonDB enables concurrent complex queries using SQL as part of the online application. Through its efficient partitioning scheme, RonDB can scale nodes to support hundreds of CPUs per node and then scale to hundreds of data nodes (tens of thousands of CPUs in total).

SQL applications

RonDB supports writing low latency, high availability SQL applications using its full SQL capabilities, providing a MySQL Server API.

LATS: low Latency, high Availability, high Throughput, and scalable Storage

RonDB is the fastest key-value store currently available and the only one with general purpose query capabilities, through its SQL support. The performance of key-value stores is evaluated based on low Latency, high Availability, high Throughput, and scalable Storage (LATS).

LATS: low Latency, high Availability, high Throughput, scalable Storage.

Low Latency

RonDB has shown the fastest response times: it can respond in 100-200 microseconds on individual requests and in less than a millisecond on batched read requests; it can also perform complex transactions in a highly loaded cluster within 10 milliseconds. Moreover, and perhaps even more importantly, RonDB has predictable latency.

High Availability

NDB Cluster can provide Class 6 Availability; in other words, an NDB system is operational 99.9999% of the time, thus no more than 30 seconds of downtime per year. RonDB brings the same type of availability now to the cloud, thus ensuring that RonDB is always available. RonDB allows you to place replicas within a cloud region on different availability zones, ensuring that failure of availability zones does not result in downtime for RonDB.

High Throughput

RonDB scales to up to hundreds of millions of read or write ops/seconds. A single RonDB instance on a virtual machine can support millions of key value operations per second,  and, without downtime, can be scaled up  to a large cluster that supports hundreds of millions of key value operations per second.

Scalable Storage

RonDB offers large dataset capabilities, being able to store up to petabytes of data. Individual RonDB data nodes can store up to 16 TBytes of in-memory data and many tens of TBytes of disk data. Given that RonDB clusters can scale up to 144 data nodes, a RonDB cluster can be scaled to store petabytes of data.

LATS Performance

The combination of predictable low latency, the ability to handle millions of operations per second, and a unique high availability solution makes RonDB the leading LATS database, that is now also available in the cloud.

RonDB as an online feature store

RonDB is the foundation of the Hopsworks Online Feature Store. For those unfamiliar with the concept, a feature store is a data warehouse of features for machine learning. It contains two databases: an online database serving features at low latency to online applications and an offline database storing large volumes of historic feature values.

The online feature store provides the data (features) that underlie the models used in intelligent online applications. For example, it helps inform an intelligent decision about whether a financial transaction is suspected of money laundering or not by providing the real-time information about the user’s credit history, recent card usage, location of the transaction, and so on. Thus, the most important requirements from the feature store are quick and consistent responses (low latency), handling large volumes of key lookups (high throughput), and being always available (high availability). In addition an online feature store should be able to scale as the business grows. RonDB is the only database available in the market that meets all these requirements.

Get Started

We’re not releasing RonDB publicly just yet. Instead, we have made it available as early access. If you think your team could benefit from RonDB, please contact us!

How to transform Amazon Redshift data into features with Hopsworks Feature Store

2/9/2021
Ermias Gebremeskel
Fabio Buso

TLDR: Hopsworks Feature Store provides an open ecosystem that connects to the largest number of data storage, data pipelines, and data science platforms. You can connect the Feature Store to Amazon Redshift to transform your data into features to train models and make predictions.

Introduction

Amazon Redshift is a popular managed data warehouse on AWS. Companies use Redshift to store structured data for traditional data analytics. This makes Redshift a popular source of raw data for computing features for training machine learning models.

The Hopsworks Feature Store is the leading open-source feature store for machine learning. It is provided as a managed service on AWS and it includes all the tools needed to transform data in Redshift into features that will be used when training and serving ML models.

In this blog post, we show how you can configure the Redshift storage connector to ingest data, transform that data into features (feature engineering) and save the pre-computed features in the Hopsworks Feature Store.

Prerequisites

To follow this tutorial, users should have a Hopsworks Feature Store instance running on https://hopsworks.ai. You can register for free with no credit card and receive 4000 USD of credits to get started.

Users should also have an existing Redshift cluster. If you don't have an existing cluster, you can create one by following the AWS Documentation.

Step 1 - Configure the Redshift Storage Connector

The first step to be able to ingest Redshift data into the feature store is to configure a storage connector. The Redshift connector requires you to specify the following properties. Most of them are available in the properties area of your cluster in the Redshift UI.

  • Cluster identifier: The name of the cluster
  • Database driver: You can use the default JDBC Redshift Driver `com.amazon.redshift.jdbc42.Driver` (More on this later)
  • Database endpoint: The endpoint for the database. Should be in the format of `[UUID].eu-west-1.redshift.amazonaws.com`
  • Database name: The name of the database to query
  • Database port: The port of the cluster. Defaults to 5439

There are two options available for authenticating with the Redshift cluster. The first option is to configure a username and a password. The password is stored in the secret store and made available to all the members of the project.

The second option is to configure an IAM role. With IAM roles, Jobs or notebooks launched on Hopsworks  do not need to explicitly authenticate with Redshift, as the HSFS library will transparently use the IAM role to acquire a temporary credential to authenticate the specified user. 

In Hopsworks, there are two different ways to configure an IAM role: a per-cluster IAM role or a federated IAM role (role chaining). For the per-cluster IAM role, you select an instance profile for your Hopsworks cluster when launching it in hopsworks.ai, and all jobs or notebooks will be run with the selected IAM role.  For the federated IAM role, you create a head IAM role for the cluster that enables Hopsworks to assume a potentially different IAM role in each project. You can even restrict it so that only certain roles within a project (like a data owner) can assume a given role. 

With regards to the database driver, the library to interact with Redshift *is not* included in Hopsworks - you need to upload the driver yourself. First, you need to download the library from here. Select the driver version without the AWS SDK. You then upload the driver files to the “Resources” dataset in your project, see the screenshot below.

Upload the Redshift Driver (jar file) from your local machine to the Resources dataset.

Then, you add the file to your notebook or job before launching it, as shown in the screenshots below.

Before starting the JupyterLab server, add the Redshift driver jar file, so that it becomes available to jobs run in the notebook.

Step 2 - Define an external (on-demand) Feature Group

Hopsworks supports the creation of (a) cached feature groups and (b) external (on-demand) feature groups. For cached feature groups, the features are stored in Hopsworks feature store. For external feature groups, only metadata for features is stored in the feature store - not the actual feature data which is read from the external database/object-store. When the external feature group is accessed from a Spark or Python job, the feature data is read on-demand using a connector from the external store. On AWS, Hopsworks supports the creation of external feature groups from a large number of data stores, including Redshift, RDS, Snowflake, S3, and any JDBC-enabled source. 

In this example, we will define an external feature group for a table in Redshift. External feature groups in Hopsworks support “provenance” in the Hopsworks Web UI: you can track which features are stored on which external systems and how they are computed. Additionally, HSFS (the Python/Scala library used to interact with the feature store) provides the same APIs for external feature groups as for cached feature groups.

An external (on-demand) feature group can be defined as follows:

# We named the storage connector defined in step 1 telco_redshift_cluster
redshift_conn = fs.get_storage_connector("telco_redshift_cluster")

telco_on_dmd = fs.create_on_demand_feature_group(name="telco_redshift",
                                                 version=1,
                                                 query="select * from telco",
                                                 description="On-demand feature group for telecom customer data",
                                                 storage_connector=redshift_conn,
                                                 statistics_config=True)
telco_on_dmd.save()

When running `save()`, the metadata is stored in the feature store and statistics for the data are computed and made available through the Hopsworks UI. Statistics help data scientists in their quest to build better features from raw data.

Step 3 -  Engineer features and save to the Feature Store

On-demand feature groups can be used directly as a source for creating training datasets. This is often the case if a company is migrating to Hopsworks and there are already feature engineering pipelines in production writing data to Redshift.

This flexibility provided by Hopsworks allows users to hit the ground running from day 1, without having to rewrite their pipelines to take advantage of the benefits the Hopsworks feature store provides.

telco_on_dmd.select(['customer_id', 'internet_service', 'phone_service', 'total_charges', 'churn'])

On-demand feature groups can also be joined with cached feature groups in Hopsworks to create training datasets. This helper guide  explains in detail how the HSFS joining APIs work and how they can be used to create training datasets.

If, however, Redshift contains raw data that needs to be feature engineered, you can retrieve a Spark DataFrame backed by the Redshift table using the HSFS API.

spark_df = telco_on_dmd.read()

You can then transform your `spark_df` into features using feature engineering libraries. It is also possible to retrieve the dataframe as a Pandas dataframe and perform the feature engineering steps in Python code. 
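
For example, a minimal sketch of such a transformation (the column names come from the snippet above, except `tenure`, which is a hypothetical column used only for illustration):

from pyspark.sql import functions as F

# Derive an illustrative feature: average charge per month of tenure
# ('tenure' is a hypothetical column used only for this example)
features_df = spark_df.withColumn(
    "avg_monthly_charges", F.col("total_charges") / F.col("tenure")
)

# Alternatively, convert to Pandas and do the feature engineering in Python
pandas_df = spark_df.toPandas()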

You can then save the final dataframe containing the engineered features to the feature store as a cached feature group:

telco_fg = fs.create_feature_group(name="telco_customer_features",
                                   version=1,
                                   description="Telecom customer features",
                                   online_enabled=True,
                                   time_travel_format="HUDI",
                                   primary_key=["customer_id"],
                                   statistics_config=True)
# Save the engineered features (assuming spark_df now holds the engineered features from the previous step)
telco_fg.save(spark_df)

Storing feature groups as cached feature groups within Hopsworks provides several benefits over on-demand feature groups. First it allows users to leverage Hudi for incremental ingestion (with ACID properties, ensuring the integrity of the feature group) and time travel capabilities. As new data is ingested, new commits are tracked by Hopsworks allowing users to see what has changed over time. On each commit, statistics are computed and tracked in Hopsworks, allowing users to understand how the data has changed over time.

Cached feature groups can also be stored in the online feature store (`online_enabled=True`), thus enabling low latency access to the features using the online feature store API.

Get started

Get started today on Hopsworks.ai by configuring your Redshift storage connector and running this notebook.

Elasticsearch is dead, long live Open Distro for Elasticsearch

1/14/2021
Mahmoud Ismail
Alex Ormenisan
Jim Dowling

TLDR: The need for an open-source alternative to Elasticsearch has recently become more evident; platforms that bundle Open Distro for Elasticsearch are able to future-proof open-source support for free-text search and Elasticsearch. In this post, we describe how Hopsworks leverages the authentication and authorization support in Open Distro for Elasticsearch to make free-text search a project-based multi-tenant service in Hopsworks. More concretely, Hopsworks now supports dynamic role-based access control (RBAC) to indexes in Elasticsearch with no performance penalty by building on Open Distro for Elasticsearch (ODES).

Need Open-Source Elasticsearch?
Try Open Distro.

In January 2021, Elastic switched both Elasticsearch and Kibana from the Apache V2 open-source license to a non-open-source license, the Server Side Public License (SSPL).

Hopsworks is an open-source platform that includes Open Distro for Elasticsearch (a fork of Elasticsearch) and Kibana.

In Hopsworks, we use Elasticsearch to provide free-text search for AI assets (features, models, experiments, datasets, etc). We also make Elasticsearch indexes available for use by programs run in Hopsworks. As we interpret it, the latter functionality means we contravene the licensing terms of the SSPL:

“If you make the functionality of the Program or a modified version available to third parties as a service... (license conditions apply)”

Luckily, we recently made the switch from Elasticsearch to Open Distro for Elasticsearch, supported by AWS, which is Apache v2 licensed.

Dynamic RBAC for Elasticsearch

Open Distro for Elasticsearch supports Active Directory and LDAP for authentication and authorization. Using the Security plugin, you can use RBAC to control the actions a user can perform. A role defines the cluster operations and index operations a user can perform, including access to indices, and even fine-grained field and document level access. RBAC allows an administrator to define a single security policy and apply it to all members of a department. But individuals may be members of multiple departments, so a user might be given multiple roles. With dynamic role-based access control you can change the set of roles a user can hold at a given time.

For example, if a user is a member of two departments - one for accessing banking data and another one for accessing trading data, with dynamic RBAC, you could restrict the user to only allow her to hold one of those roles at a given time. The policy for deciding which role the user holds could, for example, depend on what VPN (virtual private network) the user is logged in to or what building the user is present in. In effect, dynamic roles would allow the user to hold only one of the roles at a time and sandbox her inside one of the domains - banking or trading. It would prevent her from cross-linking or copying data between the different trading and banking domains.

Hopsworks implements a dynamic role-based access control model through its project-based multi-tenant security model.  Every Project has an owner with full read-write privileges and zero or more members.  A project owner may invite other users to his/her project as either a Data Scientist (read-only privileges and run jobs privileges) or Data Owner (full privileges). Users can be members of (or own) multiple Projects, but inside each project, each member (user) has a unique identity - we call it a project-user identity.  For example, user Alice in Project A is different from user Alice in Project B - (in fact, the system-wide (project-user) identities are ProjectA__Alice and ProjectB__Alice, respectively).

As such, each project-user identity is effectively a role with the project-level privileges to access data and run programs inside that project. If a user is a member of multiple projects, she has, in effect, multiple possible roles, but only one role can be active at a time when performing an action inside Hopsworks. When a user performs an action (for example, runs a program) it will be executed with the project-user identity. That is, the action will only have the privileges associated with that project. The figure below illustrates how Alice has a different identity for each of the two projects (A and B) that she is a member of. Each project contains its own separate private assets. Alice can use only one identity at a time which guarantees that she can’t access assets from both projects at the same time.

Hopsworks enables you to host sensitive data in a shared cluster using a project-based access control security model (an implementation of dynamic role-based access control). In Hopsworks, a project is a secure sandbox with members, data, code, and services. Similar to GitHub repositories, projects are self-service: users manage membership, roles, and can securely share data assets with other projects. This project-based multi-tenant security model enables users to host both sensitive and shared data in a single Hopsworks cluster - you do not need to manage and pay for separate clusters. 

An important aspect of project-based multi-tenancy is that assets can be shared between projects - sharing does not mean that data is duplicated. The current assets that can be shared between projects are: files/directories in HopsFS, Hive databases, feature stores, and Kafka topics. For example, in the figure below there are three users (User1, User2, and User3)  and two projects (A and B). User1 is a member of project A, while User2 and User3 are members of project B. All three users (User1, User2, User3) can access the assets shared between project A and project B. As sharing does not mean copying, the access control rules for the asset are updated to give users in the other project read or write permissions on the shared asset.


Project-user identity is primarily based on an X.509 certificate issued internally by Hopsworks. Access control policies, however, are implemented by the platform services (HopsFS, Hive, Feature Store, Kafka), and for Elasticsearch Open Distro, permissions are managed using an open-source Hopsworks project-based authorizer plugin.

Using Elastic Index from Spark in Hopsworks

The following PySpark code snippet, available as a notebook when you run the Spark Tour on Hopsworks, shows how to read from an index that is private to a project from PySpark. There is also an equivalent Scala/Spark notebook.

from hops import elasticsearch, hdfs

df = spark.read.option("header", "true").csv(
    "hdfs:///Projects/" + hdfs.project_name() + "/Resources/akc_breed_info.csv")

# Write df to the project's private index called 'newindex'
df.write.format('org.elasticsearch.spark.sql') \
    .options(**elasticsearch.get_elasticsearch_config("newindex")) \
    .mode("Overwrite").save()

# Read from the project's private index called 'newindex'
reader = spark.read.format("org.elasticsearch.spark.sql") \
    .options(**elasticsearch.get_elasticsearch_config("newindex"))
df = reader.load().orderBy("breed")
df.show()

Access Control using JWT and Hopsworks Project Membership

In Hopsworks, we use Public Key Infrastructure (PKI) with X.509 certificates to authenticate and authorize users. Every user and every service in a Hopsworks cluster has a private key and an X.509 certificate. Hopsworks projects also support multi-tenant services that are not currently backed by X.509 certificates, including Elasticsearch. Open Distro for Elasticsearch supports authentication and access control using JSON Web Tokens (JWT).  Similar to application X.509 certificates, Hopsworks’ resource manager (HopsYARN) issues a JWT for each submitted job and propagates it to running containers. Using the JWT, user code can then securely make calls to Elasticsearch indexes owned by the project. The JWT is rotated automatically before it expires and invalidated by HopsYARN once the application has finished.

For every Hopsworks project, a number of private indexes can be created in Elasticsearch: an index for real-time logs of applications in that project (accessible via Kibana), an index for ML experiments in the project, and an index for provenance for the project’s applications and file operations. Elastic indexes are private to the project - they are not accessible by users that are not members of the project. This access control is implemented as follows: when a request is made on Elasticsearch using a JWT token, our authorizer plugin extracts the project-specific username from the JWT token, which is of the form:

ProjectA__Alice 

The index names have the following form:

ProjectA__ElasticIndex 

Our plugin checks if a project-specific user is allowed to read/write an index by checking that the prefix (ProjectA) of both the user and the index match one another. We plan to add support for sharing elasticsearch indexes between projects by storing a list of projects allowed to perform read and write operations, respectively, on the indexes belonging to a project.
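
A minimal sketch of this prefix check (illustrative only; the actual authorizer is implemented as an Open Distro security plugin, and the function below is a hypothetical simplification):

def is_allowed(project_user, index_name):
    # Both identifiers are assumed to be of the form '<Project>__<Suffix>',
    # e.g. 'ProjectA__Alice' and 'ProjectA__ElasticIndex'.
    user_project = project_user.split("__", 1)[0]
    index_project = index_name.split("__", 1)[0]
    return user_project == index_project

# A member of ProjectA may access ProjectA's indexes, but not ProjectB's
assert is_allowed("ProjectA__Alice", "ProjectA__ElasticIndex")
assert not is_allowed("ProjectA__Alice", "ProjectB__ElasticIndex")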

X.509 Service certificates

In Hopsworks, services communicate with each other using their own certificate to authenticate and encrypt all traffic. Each service in Hopsworks, that supports TLS encryption and/or authentication, has its own service-specific X.509 certificate, including all services in the ELK Stack (Elasticsearch, Kibana, and Logstash). Service certificates contain the Fully Qualified Domain Name (FQDN) of the host they are installed on and the login name of the system user that the process runs as. They are generated when a user provisions Hopsworks, and they have a long lifespan. Service certificates can be rotated automatically in configurable intervals or upon request of the administrator. 

Securely accessing Elastic Indexes in Jobs on Kubernetes 


Hopsworks can be integrated with Kubernetes by configuring it to use one of the available authentication mechanisms: API tokens, credentials, certificates, and IAM roles for AWS’ managed EKS offering. Hopsworks can run users’ jobs on Kubernetes that have project-specific security material,  X.509 certificates and JWTs, materialized to the launched Pods so user code can securely access services in Hopsworks, such as Open Distro for Elasticsearch. That is, Kubernetes jobs launched from within a project in Hopsworks are only allowed to access those Elasticsearch indexes that belong to that project.

Summary

In this post, we gave an overview of Hopsworks project-based multi-tenant security model and how we use Hopsworks projects and JWT tokens to make Open Distro for Elasticsearch a multi-tenant service.

Follow us on Twitter

Star us on Github


HopsFS: 100X Faster than AWS S3

11/19/2020
Mahmoud Ismail
Gautier Berthou
Salman Niazi
Jim Dowling
Mikael Ronström

TLDR; Many developers believe S3 is the "end of file system history". It is impossible to build a file/object storage system on AWS that can compete with S3 on cost. But what if you could build on top of S3 a distributed file system with a HDFS API that gives you POSIX goodness and improved performance? That’s what we have done with a cloud-native release of HopsFS that is highly available across availability zones, has the same cost as S3, but has 100X the performance of S3 for file move/rename operations, and 3.4X the read throughput of S3 (EMRFS) for the DFSIO Benchmark (peer reviewed at ACM Middleware 2020).

HopsFS has lower latency and higher throughput than EMRFS (S3) for metadata operations (Middleware ‘20).

The Dumb Bucket

S3 has become the de-facto platform for storage in AWS due to its scalability, high availability, and low cost. However, S3 provides weaker guarantees and lower performance compared to distributed hierarchical file systems. Despite this, many developers erroneously believe that S3 is the end of file system history - there is no alternative to S3, so just re-write your applications to account for its limitations (such as slow and inconsistent file listings, non atomic file/dir rename, closed metadata, and limited change data capture (CDC) support). Azure has built an improved file system, Azure Data Lake Storage (ADLS) V2, on top of Azure Blob Storage (ABS) service. ADLS provides a HDFS API to access data stored in a ABS container, giving improved performance and POSIX-like goodness. But, until today, there has been no equivalent to ADLS for S3. Today, we are launching HopsFS as part of Hopsworks.

Hierarchical File Systems strike back in the Cloud

Hierarchical distributed file systems (like HDFS, CephFS, GlusterFS) were not scalable enough or highly available across availability zones in the cloud, motivating the move to S3 as the scalable storage service of choice. In addition to the technical challenges, AWS have priced virtual machine storage and inter-availability zone network traffic so high that no third party vendor could build a storage system that offers a per-byte storage cost close in price to S3. 

However, the move to S3 has not been without costs. Many applications need to be rewritten as the stronger POSIX-like behaviour of hierarchical file systems (atomic move/rename, consistent file listings, consistent read-after-writes) has been replaced by weakened guarantees in S3. Even simple tasks, such as finding out what files you have, cannot be easily done on S3 when you have enough files, so a new service was introduced to enable you to pay extra to get a stale listing of your files. Most analytical applications (e.g., on EMR) use EMRFS, instead of S3, which is a new metadata layer for S3 that provides slightly stronger guarantees than S3 - such as consistent file listings.

File systems are making the same Journey as Databases

The journey from a stronger POSIX-like file system to a weaker object storage paradigm and back again has parallels in the journey that databases have made in recent years. Databases made the transition from strongly consistent single-host systems (relational databases) to highly available (HA), eventually consistent distributed systems (NoSQL systems) to handle the massive increases in data managed by databases. However, NoSQL is just too hard for developers, and databases are returning to strongly consistent (but now scalable) NewSQL systems, with databases such as Spanner, CockroachDB, SingleStore, and MySQL Cluster.

In this blog, we show that distributed hierarchical file systems are completing a similar journey, going from strongly consistent POSIX-compliant file systems to object stores (with their weaker consistency models, but high availability across data centers), and back to distributed hierarchical file systems that are HA across data centers, without any loss in performance and, crucially, without any increase in cost, as we will use S3 as block storage for our file system.

HopsFS

HopsFS is a distributed hierarchical file system that provides a HDFS API (POSIX-like API), but stores its data in a bucket in S3. We redesigned HopsFS to (1) be highly available across availability zones in the cloud and (2) to transparently use S3 to store the file’s blocks without sacrificing the file system’s semantics. The original data nodes in HopsFS have now become stateless workers (part of a standard Hopsworks cluster) that include a new block caching service to leverage faster local VM storage for hot blocks. It is important to note that the cache is a global cache - not a local worker cache found in other vendor’s Spark workers - that includes secure access control to the cache. In our experiments, we show that HopsFS outperforms EMRFS (S3 with metadata in DynamoDB for improved performance) for IO-bound workloads, with up to 20% higher performance and delivers up to 3.4X the aggregated read throughput of EMRFS. Moreover, we demonstrate that metadata operations on HopsFS (such as directory rename or file move) are up to two orders of magnitude faster than EMRFS. Finally, HopsFS opens up the currently closed metadata in S3, enabling correctly-ordered change notifications with HopsFS’ change data capture (CDC) API and customized extensions to metadata. 

At Logical Clocks, we have leveraged HopsFS’ capabilities to build the industry’s first feature store for machine learning (Hopsworks Feature Store). The Hopsworks Feature Store is built on Hops Hive and customized metadata extensions to HopsFS, ensuring strong consistency between the offline Feature Store, the online Feature Store (NDB Cluster), and data files in HopsFS.

Some of the key advantages of HopsFS/S3 are:

POSIX-Like Semantics with a HDFS API

  • Consistent file listings, consistent read-after-write, atomic rename (files/directories).

Open, Extensible Metadata

  • XAttr API to attach arbitrary metadata to files/directories.

Change Data Capture API

  • Correctly ordered stream of file system mutation events delivered with low latency to downstream clients by ePipe.

Free-Text search API for File System Namespace

  • File system namespace metadata changes can be transparently replicated to Elasticsearch for low-latency free-text search of the namespace and its extended metadata. This service is provided by Hopsworks.

X.509 Certificates for Authentication, TLS for Encryption-in-Transit

  • HopsFS uses X.509 Certificates to identify and authenticate clients, with TLS providing end-to-end encryption-in-transit. 

Faster Metadata Operations

  • File/directory rename/move, file listings - no limit on retrieving 1000 files-at-a-time (as in S3). 

Faster Read Operations

  • Workers in HopsFS securely cache file blocks on behalf of clients using local VM storage. NameNodes are cache-aware and redirect clients to securely read the cached block from the correct worker.

Highly Available across Availability Zones (AZs)

  • Support for high availability (HA) across AZs through AZ-aware replication protocols.

HopsFS/S3 Performance

We compared the performance of HopsFS with EMRFS rather than S3, as EMRFS provides stronger guarantees than S3 for consistent listing of files and consistent read-after-updates for objects. EMRFS uses DynamoDB to store a partial replica of S3's metadata (such as what files/directories are found in a given directory), enabling faster listing of files/dirs compared to S3 and stronger consistency (consistent file listings and consistent read-after-update, although no atomic rename).

Here are some selected results from our peer-reviewed research paper accepted for publication at ACM/IFIP Middleware 2020. The paper includes more results than shown below, and for writes, HopsFS is on average about 90% of the performance of EMRFS, as HopsFS has the overhead of first writing to workers that then write to S3. HopsFS has a global worker cache (if a block is cached at any worker, clients will retrieve the data directly from that worker) for faster reads, and HopsFS' metadata layer is built on NDB Cluster for faster metadata operations.

*Enhanced DFSIO Benchmark Results with 16 concurrent tasks reading 1GB files. For higher concurrency levels (64 tasks), the performance improvement drops from 3.4X to 1.7X.

**As of November 2020, 3500 ops/sec is the maximum number of PUT/COPY/POST/DELETE per second per S3 prefix, while the maximum number of GET/HEAD requests per prefix is 5500 reads/sec. You can increase throughput in S3 by reading/writing in parallel to different prefixes, but this will probably require rewriting your application code and increasing the risk of bugs. For HopsFS (without S3), we showed that it can reach 1.6m metadata ops/sec across 3 availability zones. 

In our paper published at ICDCS, we measured the throughput of HopsFS when deployed in HA mode over 3 availability zones. Using a workload from Spotify, we compared the performance with CephFS. HopsFS (1.6M ops/sec) reaches 2X the throughput of CephFS (800K ops/sec) when both are deployed in full HA mode. CephFS, however, does not currently support storing its data in S3 buckets.

How do I get started with HopsFS?

HopsFS is available as open-source (Apache V2). However, cloud-native HopsFS is currently only available as part of the hopsworks.ai platform. Hopsworks.ai is a platform for the design and operation of AI applications at scale with support for scalable compute in the form of Spark, Flink, TensorFlow, etc. (comparable to Databricks or AWS EMR). You can also connect Hopsworks.ai to a Kubernetes cluster and launch jobs on Kubernetes that can read/write from HopsFS. You connect your cluster to an S3 bucket in your AWS account or, on Azure, to an Azure Blob Storage bucket. You can dynamically add/remove workers to/from your cluster, and the workers act as part of the HopsFS cluster - using minimal resources, but reading/writing to/from S3 or ABS on behalf of clients, providing access control, and caching blocks for faster retrieval.


Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store

11/17/2020
Theofilos Kakantousis
Jim Dowling

TLDR; Hopsworks 2.0 is now generally available. Hopsworks is the world's first Enterprise Feature Store along with an advanced end-to-end ML platform. This version includes improvements to the User Interface, client APIs and improved integration with Kubernetes. Hopsworks 2.0 also comes with upgrades for popular ML frameworks such as TensorFlow 2.3 and PyTorch 1.6. 

The Hopsworks 2.0 platform:

In addition to bug and documentation fixes, Hopsworks 2.0 includes the following new features and improvements:

  • Feature Store: New Rich APIs and Extended Integration with External Data Sources
  • Provenance: Track and Navigate from Models to Training Datasets to Features
  • Python Programs as Schedulable Feature Engineering Jobs
  • Fine-grained User Access Control for Feature Stores
  • GitLab Support 
  • New example programs for feature engineering and usage for training and serving

Detailed release notes are available at the Hopsworks GitHub repository.

Feature Store: New Rich APIs and Extended Integration with External Data Sources

The Feature Store API is now more intuitive, following a Pandas-like programming style for feature engineering and working with abstractions such as feature groups and training datasets. This allows for an improved workflow and solves the biggest pain point of professional users working with Big Data and AI: being able to organize and discover data with as few steps as possible. An all-new API documentation page is accessible here.

This release brings the new ability to enrich Feature Groups abstractions by attaching metadata in the form of tags and then allowing free-text search on these, available across the feature store to all users.

More details on the upgraded feature store experience are outlined in the Hopsworks Feature Store API 2.0 blog post and the user guide.

Provenance: Track and Navigate from Models to Training Datasets to Features

Hopsworks is built from the ground-up with support for implicit provenance. What that means is data engineers and data scientists that develop machine learning pipelines, from data ingestion, to feature engineering, to model training and serving, get lineage information for free as the data flows and is transformed through the pipeline. Users can then navigate from a model served in production back to the ML experiment that created the model along with its training dataset and finally all the way back to the feature groups and ingestion pipelines the training dataset was created from.  

Python Programs as Schedulable Feature Engineering Jobs

Up until now, the Hopsworks Jobs service provided users with the means to run and monitor applications and do feature engineering using popular distributed processing frameworks such as Apache Spark/PySpark and Apache Flink. Hopsworks now further extends its reach to Feature Store users by enabling them to run such feature engineering jobs as pure Python programs, freeing them, for example, from the need to run their code within a PySpark context/cluster. Developers can now do feature engineering with Pandas or their library of choice and still benefit from Jobs capabilities, such as including them in Airflow data and ML pipelines, as well as running Python Jupyter notebooks as jobs, simplifying the process of including notebooks in development and production pipelines. The user guide is available here.

Fine-Grained User Access Control for Feature Stores

Sharing datasets across projects of a Hopsworks instance is now made simpler by using a fine-grained access control mechanism. Users can now select from a list of options that enable a dataset to be shared with different permissions for each project and for each role users have in the target project (Data Owner, Data Scientist). In addition, these new dataset sharing semantics now make it easier to share feature stores with different permissions across projects. For more details please check the user guide here.

GitLab Support

Hopsworks developers can now use GitLab for code version control, in addition to the existing GitHub support. JupyterLab in Hopsworks ships with the Git plugin, so developers can work with Git directly from within the JupyterLab IDE. Further details are available in the user guide here.

Python Environment Updates and Examples

Hopsworks 2.0 is shipped with the latest and the greatest frameworks of the data engineering and data science Python world. Every Hopsworks instance now comes pre-installed with Pandas version 1.1.2, TensorFlow 2.3.1, PyTorch 1.6.0 and many more. 

A plethora of new feature engineering and machine learning examples are now available at our examples GitHub repository. Data engineers can now follow examples that demonstrate how to work from the Feature Store with Redshift  or Snowflake. Also new end-to-end and parallel experiments with deep learning and TensorFlow 2.3 are available.

Get Started

Much more is to come from Hopsworks with the upcoming releases so make sure to follow us on Twitter and, if you like it, star us on GitHub.

Visit http://hopsworks.ai to get started with a free instance of Hopsworks and explore the new Hopsworks Feature Store and data science support of the Hopsworks 2.0 series and don’t forget to join our growing community!

Hopsworks: World’s Only Cloud Native Feature Store, now Available on AWS and Azure.

>
11/17/2020
Steffen Grohsschmiedt
>
Steffen Grohsschmiedt
Jim Dowling

TLDR; Hopsworks is now available as a managed platform for Amazon Web Services (AWS) and Microsoft Azure with a comprehensive free tier. Hopsworks is the only currently available cloud-native Enterprise Feature Store and it also includes a Data Science platform for developing and operating machine learning models in production.

The Hopsworks managed platform:

  • Support for AWS and Microsoft Azure;
  • The Feature Store for Databricks (AWS/Azure), SageMaker, Kubeflow, and more;
  • Pay-per-use Compute: Engineer Features at Scale (Spark, Python, Flink);
  • Pay-per-use Compute for Training Machine Learning Models (including GPU support);
  • Scale Storage Independently - data stored on Amazon S3 and Azure Blob Storage;
  • Integrates with Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving;
  • CI/CD ML Pipelines using Airflow, including support for Jupyter-notebooks-as-jobs;
  • Federated IAM Roles Made Easy;
  • Managed Backups and Upgrades;
  • Active Directory/LDAP Integration;
  • Organization and User Management on Hopsworks.

Start Using Hopsworks for Free

You can get started with Hopsworks for free today. We offer free Hops Credits for you to start engineering and managing your features. You have access to all functionality of the Hopsworks Feature Store alongside all main features of the Hopsworks platform, including compute elasticity (add/remove workers when needed), storage on S3/Azure Blob Storage, EKS, ECR and GPUs, as well as integrations with external Data Science and Data Lake/Warehouse platforms. Get started now!

Hopsworks 2.0

Hopsworks is the world's first Enterprise Feature Store for machine learning, including  an end-to-end ML platform for developing and operating AI applications. Hopsworks includes a new and improved Feature Store API, integration with Kubernetes, Sagemaker, Databricks, Redshift, Snowflake, Kubeflow, and many more. Hopsworks also comes with upgrades for popular ML frameworks, such as TensorFlow, PyTorch, and Maggy. 

All details about the 2.0 release can be found in the release blog post: Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store.

Support for AWS and Microsoft Azure

Hopsworks has a managed platform for AWS and we are pleased to now announce full support for Microsoft Azure. Hopsworks is now cloud-independent, allowing customers to seamlessly move from AWS to Azure or vice-versa, without the fear of being locked in to a single vendor. Enterprises can now manage features for training and serving models at scale on Hopsworks, while maintaining control of their data inside their organisation’s own cloud accounts. Check out how to get started on AWS or Microsoft Azure.

Feature Store for Databricks (AWS/Azure), SageMaker and Kubeflow

As Hopsworks is a modular platform with integrations to third party platforms, organisations can combine our Feature Store with their existing data stores and data science platforms. Hopsworks natively integrates with Databricks on both AWS and Azure, Amazon Sagemaker, Kubeflow and other ML platforms. Check out how to get started with Databricks on AWS and Azure, Amazon Sagemaker, Kubeflow and other Data and ML platforms.

Pay-per-use Compute: Engineer Features and Train Models at Scale

The generally available version of Hopsworks adds support for pay-per-use compute, allowing you to add and remove compute nodes for feature engineering and machine learning jobs as needed, scaling with your demand and minimizing cost when resources are not needed. We support different instance types, allowing you to add or remove exactly the right amount of resources (CPU, GPU, memory) that you need to run your application. You can, for example, add GPU nodes to run a large-scale distributed training job while at the same time adding or removing non-GPU nodes for feature engineering workloads. Hopsworks supports running jobs on a wide variety of frameworks: Spark, Python, Flink, TensorFlow, PyTorch, Scikit-Learn, XGBoost, and more.

Pay-per-use Compute for Training Machine Learning Models (including GPU support)

Hopsworks on AWS added support for using GPU instances for running ML workloads, scaling to multiple nodes and GPUs. GPU nodes can be added or removed at any point in time to scale with your workload and save costs when not needed. In addition, you can utilize EKS for GPU-accelerated workloads in notebooks and Python jobs.

Scale Storage Independently - Amazon S3 and Azure Blob Storage

The Feature Store stores its historical data on HopsFS (a next generation HDFS) that now uses both S3 and Azure Blob Storage as its backing store, reducing costs and enabling storage to scale independently of the size of a Hopsworks cluster. As a customer, you only have to provide your bucket and an IAM role/Managed Identity with sufficient rights when creating a cluster. All data is kept private and stored inside your cloud account. Compared to S3, HopsFS provides performance improvements (3.4X read throughput), POSIX-like semantics, and open metadata.

Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving

Hopsworks on AWS integrates with EKS and ECR, allowing you to run applications as containers on EKS from Hopsworks. Some services in Hopsworks are offloaded to EKS, such as running Jupyter/JupyterLab notebooks and model serving workloads. Hopsworks users run applications on EKS that can securely call back and use services in Hopsworks. In particular, there is special support for Python programs, where Data Scientists can install Python libraries in their Hopsworks project, and Hopsworks transparently builds the Docker image that is run as a Python job on Kubernetes. Even Jupyter notebooks on Hopsworks can be run and scheduled as jobs on Kubernetes, without users having to write and manage Dockerfiles. An additional benefit of EKS is that the cluster can be set up with autoscaling, allowing your workloads to scale dynamically with demand. Check out Integration with Amazon EKS and Amazon ECR to get started with EKS for Hopsworks.

Feature Engineering and ML Pipelines

Hopsworks supports data-parallel feature engineering jobs in Spark, PySpark, and Flink, as well as pure Python feature engineering with Pandas, Feature Tools, and any Python library. Feature engineering pipelines typically end at the Feature Store, and can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster). Hopsworks has additional support for data validation with Great Expectations and Deequ. After the Feature Store, ML pipelines create train/test datasets, train models, analyze, validate and deploy models. Hopsworks provides experiment tracking, Tensorboard, distributed ML (with open-source Maggy), model analysis with the What-if-Tool. Again, ML training pipelines can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster).

Federated IAM Roles Made Easy

Enterprise Hopsworks now enables fine-grained access to AWS resources using AWS role chaining (federated IAM roles). Administrators can now define a mapping between AWS roles and project-specific user roles in Hopsworks. This enables both fine-grained access and auditing of users when they use AWS services from Hopsworks. Hopsworks users can either set a default role to be assumed by their jobs and notebooks to enable access to their AWS resources, or assume a role manually from within their programs by using the Python and Java/Scala SDKs (guide).

Managed Backups and Upgrades

The enterprise version of Hopsworks supports fully-automated backups and upgrades at the click of a button, making the operation of a Hopsworks cluster a non-issue for your DevOps team.

Active Directory/LDAP Integration

The enterprise version of Hopsworks supports single sign-on to Hopsworks clusters with Active Directory/LDAP integration. This enables your enterprise to manage access to the Feature Store and Hopsworks with your existing enterprise Identity Provider. Hopsworks also supports OAuth-2 as an Identity Provider.

Organizations and User Management

Hopsworks supports billing organizations and user management, enabling your teams to share access to your Hopsworks account.

Coming soon

We are continuously working on improving the Hopsworks experience. Some of the upcoming features are:

  • Dynamic auto scaling of compute nodes
  • Support for AWS Spot Instances/Azure Spot Virtual Machines
  • Support for Azure Kubernetes Service (AKS)
  • Support for GPUs on Azure

Hopsworks Feature Store API 2.0, a new paradigm.

>
11/17/2020
Fabio Buso
>
Fabio Buso
Moritz Meister
Jim Dowling
Davit Bzhalava

TLDR; The Hopsworks Feature Store is accessed through the new Python and Scala/Java Software Development Kit (SDK) available in Hopsworks 2.0. The new SDK builds on our extensive experience working with Enterprise customers and users with Enterprise requirements. With this new release, we consolidate multiple libraries into a single one. We named it `HSFS` (HopsworkS Feature Store), and in this blog post we will look at some of the improvements and key features that the new SDK brings.

Today, we’re introducing the new Hopsworks Feature Store API. Rebuilt from the ground up, today’s release includes the first set of new endpoints and features so developers can start building on the new Feature Store.

If you can’t wait to check it out, visit Hopsworks.ai to get started. If you can wait, then read on for more about what we’re building and what’s new in Feature Store API v2.

Rethinking the Hopsworks Feature Store API

The Hopsworks Feature Store was first released at the end of 2018, and it included a new type of Feature Store API based on the FeatureGroup (DataFrame). When designing the Hopsworks Feature Store, we looked at existing feature stores (Michelangelo by Uber and Zipline by AirBnb) that provided Domain Specific Language (DSL) APIs to their Feature Stores - you declaratively define features, and the DSL is then executed to ingest feature data into the Feature Store. However, we were building a general-purpose Feature Store and we knew that history has not been kind to DSLs - they have their day in the sun, but general purpose frameworks and languages win out in the long run. So, we went with the FeatureGroup (DataFrames in Spark or Pandas) as the way to ingest and export features to/from the Hopsworks Feature Store. Since then, other Feature Stores have followed our approach, such as Feast, which introduced FeatureSets in version 0.3 almost a year later and Spark support almost two years later.

However, our API still encouraged developers to think of features as existing in a flat namespace. With great customers, such as PaddyPower, we rethought and redesigned our Feature Store API to consider other practical problems we encountered, such as making breaking changes to FeatureGroup schemas without breaking existing feature pipelines, handling feature naming conflicts, and redesigning a minimal single client library (from 2 libraries previously) that can run in either a (Py)Spark or Python environment. That library is now released as our new Feature Store API and is called HSFS.

Feature Reuse with Pandas-like Joins

HSFS provides a DataFrame API to ingest data into the Hopsworks Feature Store. You can also retrieve feature data as a DataFrame, which can either be used directly to train models or be materialized to file(s) for later use in model training.

The idea of the Feature Store is to have pre-computed features available for both training and serving models. The key functionality required to generate training datasets from reusable features is: feature selection, joins, filters, and point-in-time queries. To enable this functionality, we are introducing a new expressive Query abstraction with HSFS that provides these operations and guarantees reproducible creation of training datasets from features in the Feature Store.

The new joining functionality is heavily inspired by the APIs used by Pandas to merge DataFrames. The APIs allow you to specify which features to select from which feature group, how to join them and which features to use in join conditions.

If a data scientist wants to use a new feature that is not available in the Feature Store, she can write code to compute the new feature (using existing features or external data) and ingest the new feature values into the Feature Store. If the new feature is based solely on existing feature values in the Feature Store, we call it a derived feature. The same HSFS APIs can be used to compute derived features as well as features using external data sources.
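As an illustration, here is a minimal sketch of computing and ingesting a derived feature with HSFS (the feature group names, the weekly-spend aggregation, and the exact method signatures are assumptions and may differ between HSFS versions); the query example that follows shows how such features are then joined into training datasets:

# read existing feature data from the feature store (a Pandas DataFrame in a pure Python environment)
transactions_df = transactions_fg.read()

# compute the derived feature (hypothetical weekly spending aggregate per customer)
weekly_spend_df = (transactions_df
                   .groupby("customer_id")["amount"]
                   .sum()
                   .reset_index()
                   .rename(columns={"amount": "weekly_spend"}))

# create a new feature group for the derived feature and ingest the values
weekly_spend_fg = fs.create_feature_group("weekly_spend",
                                          version=1,
                                          primary_key=["customer_id"])
weekly_spend_fg.save(weekly_spend_df)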

  

# create a query
feature_join = rain_fg.select_all() \
                      .join(temperature_fg.select_all(), on=["date", "location_id"]) \
                      .join(location_fg.select_all())

td = fs.create_training_dataset("rain_dataset",
                                version=1,
                                label="weekly_rain",
                                data_format="tfrecords")

# materialize query in the specified file format
td.save(feature_join)

# use materialized training dataset for training, possibly in a different environment
td = fs.get_training_dataset("rain_dataset", version=1)

# get TFRecordDataset to use in a TensorFlow model
dataset = td.tf_data().tf_record_dataset(batch_size=32, num_epochs=100)

# reproduce query for online feature store and drop label for inference
jdbc_querystring = td.get_query(online=True, with_label=False)

When using HSFS to create a training dataset, the features, their order and how the feature groups are joined is also saved as metadata. This metadata is then used at serving time to build a JDBC query that is executed by a client (along with the feature group primary key values) to request a feature vector from the online Feature Store for a specific model.

Features belong in a hierarchical namespace

When we started building the Hopsworks Feature Store we wanted to create a flat namespace for features to make it easier for data scientists to pick the features they wanted. But as soon as our customers started having tens of thousands of features in the feature store, feature naming conflicts became commonplace. 

Features like created_on, customer_id, and account_id are often used in many different, unrelated Feature Groups. The same user might, for example, have a different account_id for each service the company provides. Imagine an e-commerce use case - most likely you will have multiple `created_on` or `revenue`-named features.

In HSFS we solved this challenge by requiring users to work with the feature group abstraction - users have to specify which features they need from which feature group. The feature group abstraction also allows the query planner to intelligently identify which feature to use when joining features from different feature groups.
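For example, a minimal sketch (the feature group and feature names are hypothetical) of how selecting through feature groups disambiguates two features that share the name created_on:

# both feature groups contain a feature called "created_on"; selecting through
# the feature group makes it unambiguous which one is meant
query = sales_fg.select(["revenue", "created_on"]) \
                .join(accounts_fg.select(["account_id"]))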

Time travel as first class citizen

Hopsworks has offered support for Apache Hudi for over a year. Apache Hudi is the key component to make time travel, upserts, and ACID updates possible on feature data. Time travel enables data engineers and data scientists to retrieve a previous snapshot of the feature data for debugging and auditing purposes.

In HSFS, we added support for complex time travel queries, allowing users to create training datasets by retrieving different feature groups at different points in time. The different times are stored as metadata alongside the training datasets. This enables users to easily reproduce the creation of training datasets with historical feature values.

The new SDK improves support for Apache Hudi by integrating directly with the feature groups and joining APIs. Apache Hudi is also now available for the Python APIs. The APIs hide the complexity of dealing with Apache Hudi options. At the same time the high level APIs will allow us to implement support for additional formats such as Delta Lake.
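As a rough sketch of what a time-travel read can look like (the feature group name and timestamp are placeholders, and the exact parameter name is an assumption that may differ between HSFS versions):

# read a feature group as it was at a given point in time
rain_fg = fs.get_feature_group("rain_fg", version=1)
historical_df = rain_fg.read(wallclock_time="2020-10-01 00:00:00")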

Provenance and custom metadata

Provenance for feature data is a key new functionality available as part of HSFS. The Hopsworks Feature Store implicitly tracks dependencies between feature groups, training datasets, and models. Provenance allows users to know, at any given moment, which features are the most widely used and also which features are not used anymore and could potentially be removed from the feature store. Provenance also enables users to traverse from a model to the application used to train it, the input training dataset used, and the features and feature group snapshots used to create the training dataset.

In this new release we also allow users to create and attach user-defined labels to feature groups and training datasets. Labels and tags enable users to build a custom metadata catalog for their Feature Store, that (using Elasticsearch) supports low-latency free-text search over potentially thousands of features, descriptions, labels and tags.
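A small sketch of what attaching such a tag might look like (the tag name and value are made up, and the exact tagging API may vary between releases):

# attach a free-text searchable tag to a feature group;
# the tag is indexed (via Elasticsearch) and becomes searchable across the feature store
rain_fg.add_tag("owner", "weather-team")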

ML Framework APIs to Training Datasets

While the Hopsworks Feature Store plays a key role in defining governance around feature data and improving execution speed when experimenting with, building, and deploying models, the ultimate customers of the feature store are data scientists and machine learning engineers.

To make their interaction with the feature store as smooth as possible, we added support for retrieving training datasets as TensorFlow Datasets using tf.data. This allows data scientists to efficiently read training datasets in their TensorFlow code.

Pure Python Feature Engineering

Alongside existing support for (Py)Spark, this release also brings support for feature engineering pipelines built using pure Python programs. With HSFS you are able to upload feature data to the feature store from your SageMaker notebook, Kubeflow, Jupyter notebook, or even your local machine.

This is in addition to the existing capabilities of exploratory data analysis with the feature store from your notebook, and the creation and reading of training datasets.
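A minimal sketch of what this looks like from an external Python environment (the host, project name, API key, and feature group are placeholders, and parameter names may differ slightly between HSFS versions):

import hsfs
import pandas as pd

# connect to the Hopsworks Feature Store from e.g. a SageMaker or local notebook
connection = hsfs.connection(host="my-instance.hopsworks.ai",
                             project="weather",
                             api_key_value="<API_KEY>")
fs = connection.get_feature_store()

# engineer features with Pandas and ingest them into a feature group
df = pd.DataFrame({"location_id": [1, 2], "avg_temp": [12.3, 9.8]})
temperature_fg = fs.create_feature_group("temperature_fg",
                                         version=1,
                                         primary_key=["location_id"])
temperature_fg.save(df)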

Get started

You can get started immediately by creating a new cluster on https://hopsworks.ai. The library comes pre-installed in your environment, so you can get started straight away.

The documentation is available, and you can walk through one of the many example notebooks here.

Feature Store for MLOps? Feature reuse means JOIN

>
10/23/2020
Jim Dowling
>
Jim Dowling

TLDR; Many engineers conflate operational Feature Stores with key-value (KV) stores, like DynamoDB and Cassandra. In fact, KV stores are missing a key mechanism needed to make features reusable: JOINs. JOINs enable features to be reused by different models. This blog is about how to scale your ML infrastructure by reusing cached features so that the number of feature pipelines you manage does not grow linearly with the number of models you run in production.

Image from an OSDC talk by Twitter. Twitter evaluates their Feature Store by the number of features that are reused across teams.

The Cost of No JOINs: One Feature Pipeline per Model

If you don’t reuse features across different models, you will need a new feature pipeline for every new model you put in production. In the diagram below, there is a 1:1 mapping between train/test datasets and models. For every new model you put in production, you will write a new feature pipeline from your data stores that transforms and validates the raw data and performs aggregations, materializing train/test data to files. An ML training program (or pipeline) then trains and validates a model with the train/test data, after which it is tested and deployed to production.

It is very difficult to reuse the features in the materialized train/test datasets, as they tend to be stored in a format specific to a ML framework: .tfrecord for TensorFlow, .npy for PyTorch - and they cannot be easily combined with features stored in other train/test datasets. This is often due to the limitations of the file formats: neither TFRecord nor NPY support projections (selecting just a subset of columns).

If you intend to run hundreds of models in production, running hundreds of pipelines will explode your technical debt.


In online feature stores that do not support JOINs, it is typically the responsibility of the application to perform the JOIN of the cached features to create the feature vector. So, if you build your own online feature store using Cassandra or DynamoDB, you will need to also add logic for joining (and ordering) features in your applications/models. 

Every new feature you add to that model will need an update to the feature pipeline and changes to the application logic - making it costly and potentially cross-team work.

JOINs enable Feature Reuse


A JOIN is a Structured Query Language (SQL) command that combines data from two different database tables. JOINs are used to create a view over the data that originates from one or more tables. If we assume that we store features normalized in database tables, then we are able to create sets of features (training datasets) by selecting different features from different tables and joining them together using a common join key. The join key(s) identify the common entity these feature values represent.

If we now assume that features are stored as columns in tables, then what is a train/test dataset? It is a set of features along with a target column. Assuming the features are already present in existing tables, we can just join those features (columns) together to create a view - this view is the train/test dataset. The order of features (in a view) that makes up a train/test dataset is significant - a train/test dataset is a permutation of features, not a combination. Views (enabled by JOINs) enable a massive number of train/test datasets to be defined over a small number of shared features (stored in tables).
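To make this concrete, here is a toy Pandas sketch (the column names are made up) showing that a train/test dataset is simply a view over feature columns joined on a shared key, plus a target column:

import pandas as pd

# two "feature tables" sharing the join key customer_id
purchases = pd.DataFrame({"customer_id": [1, 2, 3],
                          "monthly_spend": [120.0, 43.5, 880.2]})
profiles = pd.DataFrame({"customer_id": [1, 2, 3],
                         "account_age_days": [400, 35, 1200],
                         "churned": [0, 1, 0]})  # target column

# the train/test dataset is a view over the joined features
train_df = purchases.merge(profiles, on="customer_id")
X = train_df[["monthly_spend", "account_age_days"]]
y = train_df["churned"]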

In Hopsworks, we store features in tables that we call “Feature Groups”. Feature Groups introduce a level of indirection between the raw input data and the train/test datasets used to train models, see below. They store a cached copy of the features, computed using the same feature pipeline from earlier, but this time, the feature pipeline writes to one or both of the stores that make up a feature store: (1) a scalable store for train/test features (offline feature store) and (2) a low latency, high throughput store for features for serving (online feature store).

In Hopsworks, we use Apache Hive as a scalable database for the offline store, and MySQL Cluster (NDB) as the online store. Our version of Hive stores its metadata in NDB and its data files in HopsFS/S3. In Hopsworks, FeatureGroups are database tables in Hive and MySQL Cluster, along with feature metadata (also tables in the same NDB database). 

As we can see in the diagram below, if we have N features available in the Feature Store, we can create an unlimited number of train/test datasets by simply joining features together from the offline feature store. 

In Hopsworks, we use Spark, with its cost-based optimizer, to perform JOINs. Spark, together with Parquet/Hudi/ORC, helps optimize joins by supporting partitioned data, push-down projections, and SortMergeJoin. Spark chooses its join strategy based on (see the sketch after this list):

  • a hint;
  • a join type (inner, left, outer, etc);
  • a join condition (equi-join);
  • an estimation of the input data size.
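The sketch below illustrates this with plain PySpark (the file paths and join key are placeholders, not Hopsworks APIs):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# read two feature tables stored as partitioned Parquet files (placeholder paths)
rain_df = spark.read.parquet("hdfs:///path/to/rain_fg")
location_df = spark.read.parquet("hdfs:///path/to/location_fg")

# hint the optimizer to broadcast the smaller table and join on an equi-join condition
joined = rain_df.join(location_df.hint("broadcast"),
                      on="location_id",   # hypothetical join key
                      how="inner")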

JOINs in Online Feature Stores

If you use a KV store as your online store, you are back in the same situation as at the start - you need a feature pipeline for every new model you put in production. But with Feature Groups (tables in NDB), we can materialize feature vectors (the individual row of features that is fed directly to the model for scoring) from the different tables by performing a join.

NDB supports sophisticated optimizations for JOINs, and can push-down many JOINs to the database nodes. In practice, in NDB even complex queries can “achieve latency down to 5-10 ms for a transaction that contains around 30 SQL statements”.

Conclusions

Reuse features to save on infrastructure and the number of feature pipelines needed to maintain models in production. JOINs are the method we use in Hopsworks to reuse cached features across different models - both for training and serving. We use Spark as the JOIN engine for the offline feature store and NDB, with its parallelized, push-down JOINs, for low-latency joins in the online feature store.

ML Engineer Guide: Feature Store vs Data Warehouse

>
10/8/2020
Jim Dowling
>
Jim Dowling

TLDR; The feature store is a data warehouse of features for machine learning (ML). Architecturally, it differs from the traditional data warehouse in that it is a dual-database, with one database (row-oriented) serving features at low latency to online applications and the other database (column-oriented) storing large volumes of features, used by Data Scientists to create train/test datasets and by batch applications doing offline model scoring.

Feature Store: Data Warehouse Déjà Vu

Data warehouses democratized access to Enterprise data by centralizing data in a single platform and then empowering business analysts with visual tools, such as Tableau and Power BI. No longer did they need to know what data resides where and how to query that data in that platform. They could derive historical insights into the business using BI tools. 

Data scientists, in contrast, build predictive models to derive business insights. The feature store is the data warehouse for Data Science - it is a central vault for storing documented, curated, and access-controlled features that can be used across many different models. The feature store ingests data from the Enterprise’s many different sources after transforming, aggregating, and validating the data. 

Feature pipelines need to be written to ensure that data reliably flows from existing sources and is available in a format ready to be consumed by ML training pipelines and models.

Most Data Scientists currently do not have a feature store. They spend most of their time looking for, cleaning, and featurizing data. Hence, the (very real) cliché that 80% of data science is data wrangling. Data Scientists without a feature store work in an era akin to how business analysts worked before the advent of data warehouses, with low individual and organizational productivity.

The Data Warehouse is an input
to the Feature Store 

Both platforms are a central store of curated data used to generate insights into the data. Both platforms have pipelines (ETL and feature pipelines, respectively) to ingest data from one or more disparate sources (operational databases, data lakes, etc).

Both benefit from metadata catalogs to organize data sets and access control to share data with only authorized actors. 

Both platforms can be designed to scale out on commodity hardware and store large volumes of data, although typically a data warehouse stores only data relevant to analysis (modern data lakehouses are designed to store large volumes of data more cost efficiently).

Feature Store as a Dual Database

The main architectural difference between a data warehouse and a feature store is that the data warehouse is typically a single columnar database, while the feature store is typically implemented as two databases:

  • an offline feature store for serving large batches of features to (1) create train/test datasets and (2) batch applications scoring models using those batches of features, and
  • an online feature store for serving a single row of features (a feature vector) to be used as input features for an online model for an individual prediction.

The offline feature store is typically required to efficiently serve and store large amounts of feature data, while the online feature store is required to return feature vectors in very low latency (e.g., < 10ms). Examples of databases used for the offline feature store are Apache Hive and BigQuery and examples of online feature stores include MySQL Cluster, Redis, and DynamoDB. 

Note that if you want to reuse features in different train/test datasets for different models, your database or application will need to join features together. This is true for both the offline and online feature stores. If your feature store does not support joining features, that is, you do not reuse features across different models, you (or some system) will need to create a new ingestion pipeline for every new model you support in production.

Detailed Comparison

In the table below, we see an overview of the main architectural differences between feature stores and data warehouses. Data warehouses are used primarily by business analysts for interactive querying and for generating historical reports/dashboards on the business. Feature stores are used by both data scientists and by the online/batch applications, and they are fed data by feature pipelines, typically written in Python or Scala/Java. 

Data scientists typically use Python programs to create train/test datasets by joining existing features in the feature store together and materializing the train/test datasets in a file format best suited to the framework they are going to train their model in (e.g., TFRecord for TensorFlow, NPY for PyTorch). Data warehouses and SQL currently lack this capability to create train/test datasets in ML file formats.

Feature Data should be Validated
before Ingestion

The table also shows the differences in the types of data stored, as well as how the data is stored, validated, and queried. A data warehouse stores data in tables along with schemas for describing the type of data and constraints for columns. Similarly, the feature store stores typed data (typically in tables), but as features are typically stored as ready-to-consume numerical values or vectors (embeddings) or tensors, there is less need for a richer set of column types compared to a data warehouse.  Foreign key constraints are typically not supported in feature stores, due to the difficulty in enforcing such constraints between online and offline stores.

As model training is very sensitive to bad data (null values, outliers cause numerical instability, missing values), feature data should be validated before ingestion. Data validation frameworks, such as Great Expectations and Deequ, have appeared to help implement feature pipelines that apply predicates (data validation rules) on all the features ingested into the feature store, ensuring high data quality in the feature store. 
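As an illustration, a minimal sketch using the Great Expectations Pandas API to validate a feature DataFrame before ingesting it (the column names and thresholds are made up):

import great_expectations as ge
import pandas as pd

features_df = pd.DataFrame({"customer_id": [1, 2, 3],
                            "monthly_spend": [120.0, 43.5, 880.2]})

# wrap the DataFrame so expectations (data validation rules) can be applied
ge_df = ge.from_pandas(features_df)
ge_df.expect_column_values_to_not_be_null("customer_id")
ge_df.expect_column_values_to_be_between("monthly_spend", min_value=0, max_value=100000)

# only ingest into the feature store if all expectations pass
results = ge_df.validate()
assert results["success"], "feature data failed validation, aborting ingestion"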

Domain specific languages (DSL) are sometimes used to define the feature transformations, aggregations, and data validation rules in feature pipelines, but general purpose languages (Python, Scala) are commonly used when non-trivial feature engineering is required. 

Using the feature store to create train/test data

Data scientists are one of the main users of the feature store. They use a feature repository to perform exploratory data analysis (EDA) - searching/browsing for available features and inspecting feature values/schemas/statistics. Data Scientists mainly use Python to select features to create train/test datasets. This typically involves joining features together to create a  train/test dataset in their file format of choice (.tfrecord, .csv, .npy, .petastorm, etc). Sometimes feature stores support a DSL (domain specific language) to create train/test datasets or other languages such as Scala/Java. 

Online feature store

Online applications use the online feature store to retrieve feature values with low latency to build feature vectors that are sent to models for predictions. In contrast to higher latency data warehouses, feature stores may be required to return feature vectors in single millisecond latency - only really achievable in row-oriented or key-value stores. 

The typical access pattern for retrieving features is a key-value lookup, but if features are to be reused in the online feature store, then joins are again required (either in the database or in the application). In some databases (such as MySQL Cluster), a small number of joins can be performed at very low latency.

Feature statistics to monitor for feature
drift and data drift

Descriptive statistics (e.g., mean, standard deviation) for features are also useful when identifying data drift in online models. Your monitoring infrastructure can calculate statistics on live prediction traffic and compare those statistics with the values in the feature store to identify data drift for the live traffic, potentially requiring retraining of the model.
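A toy sketch of this comparison (the feature values and threshold are made up; in practice the reference statistics would come from the feature store's metadata):

import numpy as np

# reference statistics computed when the feature was ingested into the feature store
reference_mean, reference_std = 52.1, 7.4

# statistics over the feature values seen in recent live prediction traffic
live_values = np.array([49.8, 63.2, 71.5, 68.9, 70.1])
live_mean = live_values.mean()

# flag potential data drift when the live mean moves far from the reference mean
drift_score = abs(live_mean - reference_mean) / reference_std
if drift_score > 2.0:  # hypothetical threshold
    print("possible data drift detected, consider retraining the model")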

Time-Travel 

Temporal databases support time-travel: the ability to query data as it was at a given point in time, or data changes in a given time interval. The “AS OF SYSTEM TIME” syntax was introduced in SQL:2011 to standardize point-in-time queries, while the “VERSIONS BETWEEN SYSTEM TIME ... AND ...” syntax was introduced to identify the versioned changes to data in a time interval. Time-travel is supported in some data warehouses, but does not have universal support across all vendors.

For a feature store, time-travel has several important uses. It is needed when creating train/test data (e.g., training data is data from the years 2010-2018, while test data is data from the range 2019-2020). Time-travel is also useful to make changes to a dataset (e.g., roll back a bad commit of data to the dataset) or to compare metadata (statistics) for features and how they change over time. We rarely require time-travel for features used in serving. Time-travel is also important when performing point-in-time joins, where we ensure that there is no data leakage from the future when we create train/test datasets from historical data.

Feature Pipelines 

Data warehouses typically have timed triggers for running ETL jobs (or data pipelines) to ingest the latest data from operational databases, message queues, and data lakes. Similarly, feature pipelines can use timed triggers to transform and aggregate the latest data from different sources before storing it in both the online and offline feature store for scoring by online and offline applications. However, additional pipelines can also feed features to the feature store.

Predictions made by models can be stored in the feature store along with the outcomes for those predictions. There can be long lags of days, months, or even years before outcomes become available (e.g., a prediction on whether a loan will be repaid or not), but as they arrive, new training data becomes available that can be used to trigger re-training of models.

Conclusion

Data warehouses can be used to store pre-computed features, but they do not provide much more functionality beyond that for ML pipelines. When Data Scientists need to create train/test data using Python or when online features (for serving features to online models) are needed at low latency, you need a feature store. Similarly, if you want to detect feature drift or data drift, you need support for computing feature statistics and identifying drift.


One function is all you need for ML Experiments

>
9/30/2020
Robin Andersson
>
Robin Andersson
Alex Ormenisan
Moritz Meister
Jim Dowling

TLDR; Hopsworks provides support for machine learning (ML) experiments. That is, it can automatically track the artifacts, graphs, performance, logs, metadata, and dependencies of your ML programs. Many of you already know about platforms like MLflow, so why should you read about Hopsworks Experiments? Because you do not have to rewrite your TensorFlow/PyTorch/Scikit-learn programs to get tracking and distributed ML for free, and TensorBoard comes built-in. We discuss how Hopsworks uniquely supports implicit provenance to transparently create metadata and how it is combined with the oblivious training function to make your training distribution transparent.

Hopsworks Introduction

Hopsworks is a single platform for both data science and data engineering that is available as both an open-source platform and a SaaS platform, including a built-in feature store. You can train models on GPUs at scale, easily install any Python libraries you want using pip/conda, run Jupyter notebooks as jobs, put those jobs in Airflow pipelines, and even write (Py)Spark or Flink applications that run at scale. 

As a development environment, Hopsworks provides a central, collaborative development environment that enables machine learning teams to easily share results and experiments with teammates or generate reports for project stakeholders. All resources have strong security, data governance, backup and high availability support in Hopsworks, while assets are stored in a single distributed file system (with data stored on S3 in the cloud).

A Hopsworks ML experiment stores information about your ML training run: logs, images, metrics of interest (accuracy, loss), the program used to train the model, its input training data, and the conda dependencies used. Optional outputs are hyperparameters, a TensorBoard, and a Spark history server.


The logs of each hyperparameter trial are retrieved by clicking on its log, and TensorBoard visualizes the results of the different trials. The TensorBoard HParams plugin is also available to drill down further on the trials.

Tracking

When you run a Python or PySpark application on the Hopsworks platform, it can create an experiment that includes both the traditional information a program generates (results, logs, errors) as well as ML-specific information to help track, debug, and reproduce your program and its inputs and outputs:

  • hyperparameters: parameters for training runs that are not updated by the ML programs themselves; 
  • metrics: the loss or accuracy of the model(s) trained in this experiment;
  • program artifacts: python/pyspark/airflow programs, and their conda environments;
  • model artifacts: serialized model objects, model schemas, and model checkpoints;
  • executions: information to be able to re-execute the experiment, including parameters, versioned features for input, output files,  etc; 
  • versioned features: to be able to reproduce an experiment, we need the exact training/test data from the run and how it was created from the feature store;
  • visualizations: images generated during training and scoring. TensorBoard can also be used to visualize training runs - Hopsworks aggregates results from all workers transparently;
  • logs (for debugging): model weights, gradients, losses, optimizer state;
  • custom metadata: tag experiments and free-text search for them, govern experiments (label as ‘PII’, ‘data-retention-period’, etc), and reproduce training runs.

Experiment Tracking and Distributed ML in One Library


def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]),
#                  min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", optimizer='randomsearch',
#                  direction='max', num_trials=15)

Platforms that support experiment tracking require the user to refactor their training code into a function or some explicit scope (such as “with … as xx:” in MLFlow, see Appendix A) to identify when an experiment begins and when it ends. In Hopsworks, we simply require the developer to write their training code inside a Python function.

We call this Python function an oblivious training function because the function is oblivious of whether it is being run on a Python kernel in a Jupyter notebook or on many workers in a cluster, see our blog and Spark/AI summit talk for details. That is, you write your training code once and reuse the same function when training a small model on your laptop or when performing hyperparameter tuning or distributed training on a large cluster of GPUs or CPUs.

We double down on this “wrapper” Python function by also using it to start/stop experiment tracking. Experiment tracking and distribution transparency in a single function, nice! 

In Hopsworks, the Maggy library runs experiments, see the code snippet above. As you can see, the only code changes a user needs compared to a best-practice TensorFlow program are:

  1. factor the training code in a user-defined function (def train(..):);
  2. return a Python dict containing the results, images, and files that the user wants to be tracked for the experiment and accessible later in the Experiments UI; and
  3. invoke the training function using the experiment.lagom function.

The hyperparameters can be fixed for a single execution run, or as shown in the last 4 lines of the code snippet, you can execute the train function as a distributed hyperparameter tuning job across many workers in parallel (with GPUs, if needed). 

Hopsworks will automatically:

  • track all parameters of the train function as hyperparameters for this experiment, 
  • auto-log using Keras callbacks in model.fit;
  • create a versioned directory in HopsFS, where a copy of the program, its conda environment, and all logs from all workers are aggregated;
  • track all provenance information for this application - input data from HopsFS used in this experiment (train/test datasets from the Feature Store), and all output artifacts (models, model checkpoints, application logs);
  • redirect all print statements executed in workers to the Jupyter notebook cell for easier debugging (see GIF below - each print statement is prefixed by the worker ID).
In Hopsworks, logs from workers can be printed in your Jupyter notebook during training. Take that Databricks!

TensorBoard support

def train():
    from maggy import tensorboard
    ...
    model.fit(..,
              callbacks=[TensorBoard(log_dir=tensorboard.logdir(), ..)],
              ...)

TensorBoard is arguably the most common and powerful tool used to visualize, profile, and debug machine learning experiments. Hopsworks Experiments integrates seamlessly with TensorBoard. Inside the training function, the data scientist can simply import the tensorboard Python module and get the folder location to write all the TensorBoard files. The content of the folder is then collected from each executor and placed in the experiment directory in HopsFS. As TensorBoard supports showing multiple experiment runs in the same graph, visualizing and comparing multiple hyperparameter combinations becomes as simple as starting the TensorBoard integrated in the Experiments service. By default, TensorBoard is configured with useful plugins such as HParams, Profiler, and Debugger.

Profiling and debugging

Hopsworks 1.4.0 comes with TensorFlow 2.3, which includes the TensorFlow profiler, a long-awaited feature that finally allows users to profile model training to identify bottlenecks in the training process, such as slow data loading or poor operation placement in CPU + GPU configurations.

TensorFlow 2.3 also includes Debugger V2, making it easy to find model issues such as NaN values, which are non-trivial to root-cause in complex models.

Model Registry

In the training code, models may be exported and saved to HopsFS. Using the model Python module in the hops library, it is easy to version and attach meaningful metadata to models to reflect the performance of a given model version.

The Hopsworks Model Registry is a service where all models are listed, along with useful information such as which user created the model, its different versions, the time of creation, and evaluation metrics such as accuracy.

The Model Registry provides functionality to filter based on the model name, version number, and the user that exported the model. Furthermore, the evaluation metrics of model versions can be sorted in the UI to find the best version for a given model.

In the Model Registry UI, you can also navigate to the experiment used to train the model, and from there to the train/test data used to train the model, and from there to the features in the feature store used to create the train/test data. Thanks, provenance!

Exporting a model

A model can be exported programmatically by using the export function in the model module. Prior to exporting the model, the experiment needs to have written a model to a folder or to a path on HopsFS. Then that path is supplied to the function along with the name of the model and the evaluation metrics that should be attached. The export call will upload the contents of the folder to your Models dataset and it will also appear in the Model Registry with an incrementing version number for each export.

from hops import model
import os

# local path to directory containing model (e.g. .pb or .pkl)
path = os.getcwd() + "/model_dir"

# uploads path to the model repository, metadata is a dict of metrics
model.export(path, "mnist", metrics={'accuracy': acc})

Get the best model version

When deploying a model to real-time serving infrastructure or loading a model for offline batch inference, applications can query the model repository to find the best version based on metadata attached to the model versions - such as the accuracy of the model. In the following example, the model version for MNIST with the highest accuracy is returned.

from hops import model
from hops.model import Metric

MODEL_NAME = "mnist"
EVALUATION_METRIC = "accuracy"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])

The Devil is in the Details

That was the brief overview of Hopsworks Experiments and the Model Registry. You can now try it out on www.hopsworks.ai or install Hopsworks Community or Enterprise on any servers or VMs you can get your hands on. If you want to read more about how we implemented the plumbing, then read on.

Transparent Distributed ML with PySpark

Hopsworks uses PySpark to transparently distribute the oblivious training function for execution on workers. If GPUs are used by workers, Spark allocates GPUs to workers, and dynamic executors are supported which ensures that GPUs are released after the training function has returned, read more here. This enables you to keep your notebook open and interactively visualize results from training, without having to worry that you are still paying for the GPUs. 

The advantage of the Hopsworks programming model, compared to approaches where training code is supplied as Docker images such as AWS Sagemaker, is that you can write custom training code in place and debug it directly in your notebook. You also don’t need to write Dockerfiles for training code, and Python dependencies are managed by simply installing libraries using PIP or Conda from the Hopsworks UI (we compile the Docker images transparently for you).

The oblivious training function can run in different execution contexts: on a Jupyter notebook in a Python kernel (far left), for parallel ML experiments (middle), and for collective allreduce data parallel training (far right). Maggy and Hopsworks take care of complex tasks such as scheduling tasks, collecting results, and generating new hyperparameter trials.

HopsFS stores experiment data and logs generated by workers during training. When an experiment is started through the API, a subfolder in the Experiments dataset in HopsFS is created and metadata about the experiment is attached to the folder. Hopsworks automatically synchronizes this metadata to Elasticsearch using implicit provenance.

The metadata may include information such as the name of the experiment, type of the experiment, the exported model, and so on. As the existence of an experiment is tracked by a directory, it also means that deleting a folder also deletes the experiment as well as its associated metadata from the tracking service. 

Tracking metadata with Implicit Provenance

Existing systems for tracking the lineage of ML artifacts, such as TensorFlow Extended or MLFlow, require developers to change their application or library code to log tracking events to an external metadata store. 

In Hopsworks, we primarily use implicit provenance to capture metadata, where we instrument our distributed file system, HopsFS, and some libraries to capture changes to ML artifacts, requiring minimal code changes to standard TensorFlow, PyTorch, or Scikit-learn programs (see details in our USENIX OpML’20 paper). 

File system events, such as reading features from a train/test dataset and saving a model to a directory, are implicitly recorded as metadata in HopsFS and then transparently indexed in Elasticsearch. This enables free-text search for ML artifacts, metadata, and experiments in the UI.

Experiments in Hopsworks are the first part of a ML training pipeline that starts at the Feature Store and ends at model serving. ML Artifacts (train/test datasets, experiments, models, etc) can be stored on HopsFS, and they can also have custom metadata attached to them. 

The custom metadata is tightly coupled to the artifact (remove the file, and its metadata is automatically cleaned up) - this is achieved by storing the metadata in the same scaleout metadata layer used by HopsFS. This custom metadata is also automatically synchronized to Elasticsearch (using a service called ePipe), enabling free-text search for metadata in Hopsworks.

That’s all for now Folks!

Of all the developer tools for Data Science, platforms for managing ML experiments have seen the most innovation in recent years. Open-source platforms have appeared, such as MLFlow and our Hopsworks platform, alongside proprietary SaaS offerings such as WandB, Neptune, Comet.ml, and Valohai. 

What makes Hopsworks Experiments different? You can write clean Python code and get experiment tracking and distributed ML for free with the help of implicit provenance and the oblivious training function, respectively. 

There is growing consensus that platforms should keep track of what goes in and out of ML experiments for both debugging and reproducibility. You can instrument your code to keep track of inputs/outputs, or you can let the framework manage it for you with implicit provenance. 

Hopsworks Experiments are a key component in our mission to reduce the complexity of putting ML in production. Further groundbreaking innovations are coming in the next few months in the areas of real-time feature engineering and monitoring operational models. Stay tuned!

Appendix A

In the code snippet below, we compare how you write a Hopsworks experiment with MLFlow. There are more similarities than differences, but explicit logging to a tracking server is not needed in Hopsworks.

def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]),
#                  min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", optimizer='randomsearch',
#                  direction='max', num_trials=15)

def train(data_path, max_depth, min_child_weight, estimators, model_name):
    # distribution external
    X_train, X_test, y_train, y_test = build_data(..)
    mlflow.set_tracking_uri("jdbc:mysql://username:password@host:3306/database")
    mlflow.set_experiment("My Experiment")
    with mlflow.start_run() as run:
        ...
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("min_child_weight", min_child_weight)
        mlflow.log_param("estimators", estimators)
        with open("test.txt", "w") as f:
            f.write("hello world!")
        mlflow.log_artifact("/full/path/to/test.txt")
        ...
        model.fit(X_train, y_train)  # auto-logging
        ...
        mlflow.tensorflow.log_model(model, "tensorflow-model",
                                    registered_model_name=model_name)

Like MLFlow, but better?

Appendix B

Pipelines are the programs that orchestrate the execution of an end-to-end training and model deployment job. In Hopsworks, you can run Jupyter notebooks as schedulable Jobs, and these jobs can be run as part of an Airflow pipeline (Airflow also comes as part of Hopsworks). After a pipeline runs, data scientists can quickly inspect the training results in the Experiments service.

The typical steps that make up a full training-and-deploy pipeline include:

  • materialization of train/test data by selecting features from a feature store, 
  • model training on the train/test data and exporting the model to the Model Registry,
  • evaluation and validation of the model and if it passes robustness, bias, and accuracy tests, model deployment.

Real-Time Predictions with a Feature Store

This video shows how you can leverage the Hopsworks online feature store to compute and ingest features and make them available to operational models making real-time predictions, with low latency and preventing skew between the training and serving features.

Manage Python libraries in Hopsworks

Hopsworks provides a Python environment per project that is shared among all the users in the project. All common installation alternatives are supported, in addition to libraries packaged in a .whl or .egg file and those that reside in a git repository.

Connect Hopsworks to AWS

Connecting Hopsworks to your organisation’s AWS account is the first step towards using the Feature Store: 1. Connect your AWS account, 2. Create an instance profile, 3. Create a S3 bucket, 4. Create a SSH key, 5. Enable permissions for Hopsworks to access

Connect Hopsworks to Azure

Connecting Hopsworks to your organisation’s Azure account is the first step towards using the Feature Store: 1. Connect your Azure account, 2. Create and configure a storage, 3. Add a ssh key to your resource group, 4. Enable permissions for Hopsworks to access

Simplifying Feature Engineering with a Feature Store

A feature store is a data platform that manages and governs your features for machine learning - both for training and serving. You have access to and can reuse previously engineered features available within the entire organisation, avoiding the need to write a feature engineering pipeline for every model put in production.

Hopsworks Feature Store with Microsoft Azure

The Hopsworks Feature Store is available today on Azure as both a managed platform (www.hopsworks.ai) and a custom Enterprise installation. It manages your features for training and serving models in a cluster under your control inside your organisation’s existing cloud account.

Provenance for Machine Learning Pipelines. Guest Lecture at Boston University.

This video goes into depth on the scale-out metadata architecture in Hopsworks and how it is used to enable Implicit Provenance for Machine Learning Pipelines. It was a guest lecture at Boston University in October 2020, in a course taught by John (Ioannis) Liagos.

Solve Fraud Challenges with Graph Network and Deep Learning in Hopsworks

Financial institutions invest huge amounts of resources in both identifying and preventing money laundering and fraud. Most existing systems for identifying money laundering are rules-based and are not capable of detecting ever-changing schemes. Consequently, these systems generate too many false-positive alerts, taking time and money to run down.

Building a Feature Store around Dataframes and Apache Spark

A Feature Store enables machine learning (ML) features to be registered, discovered, and used as part of ML pipelines, thus making it easier to transform and validate the training data that is fed into machine learning systems. Feature stores can also enable consistent engineering of features between training and inference, but to do so, they need a common data processing platform. The first Feature Stores, developed at hyperscale AI companies such as Uber, Airbnb, and Facebook, enabled feature engineering using domain specific languages, providing abstractions tailored to the companies’ feature engineering domains. However, a general purpose Feature Store needs a general purpose feature engineering, feature selection, and feature transformation platform.

From Python to PySpark and Back Again Unifying Single host and Distributed Deep Learning with Maggy

Distributed deep learning offers many benefits – faster training of models using more GPUs, parallelizing hyperparameter tuning over many GPUs, and parallelizing ablation studies to help understand the behaviour and performance of deep neural networks. With Spark 3.0, GPUs are coming to executors in Spark, and distributed deep learning using PySpark is now possible. However, PySpark presents challenges for iterative model development – starting on development machines (laptops) and then re-writing them to run on cluster-based environments.