Today we are pleased to announce two new RonDB releases.
The source code of RonDB is available at https://github.com/logicalclocks/rondb.
Information about RonDB is available on the RonDB website, http://rondb.com.
Documentation for RonDB is available at http://docs.rondb.com.
RonDB 21.04.1 is a maintenance release of RonDB 21.04 that contains 3 new features and 18 bug fixes. RonDB 21.04 is a long-term support version that will be supported until 2024. RonDB 21.10.1 is a beta release of RonDB 21.10 that is based on RonDB 21.04.1 and contains an additional 4 features and 2 bug fixes. RonDB 21.10.1 improves throughput in the DBT2 benchmark by 70% compared to RonDB 21.04.1 and improves Sysbench benchmarks by about 10%.
Release notes for 21.04.1 are found here.
You can download a binary tarball of RonDB 21.04.1 here.
You can download a binary tarball of RonDB 21.10.1 here.
TLDR: Hopsworks, the data-intensive AI platform with a feature store, brings support for scale-out AI with Copernicus data and the H2020 ExtremeEarth project. Hopsworks is integrated with the Polar and FoodSecurity Thematic Exploitation Platforms (TEPs) on the CREODIAS infrastructure. Two use cases, polar and food security, have been developed using the scale-out distributed deep learning support of Hopsworks and the PBs of data made available by CREODIAS and processed by Hopsworks and the TEPs.
This article is based on the paper “The ExtremeEarth software Architecture for Copernicus Earth Observation Data” included in the Proceedings of the 2021 conference on Big Data from Space (BiDS 2021).
In recent years, unprecedented volumes of data have been generated in various domains. Copernicus, a European Union flagship programme, produces more than three petabytes (PB) of Earth Observation (EO) data annually from Sentinel satellites. However, current AI architectures that use deep learning in remote sensing struggle to scale enough to fully utilize this abundance of data.
ExtremeEarth is an EU-funded project that aims to develop use cases that demonstrate how researchers can apply deep learning to make use of Copernicus data in the various European Space Agency (ESA) TEPs. A main differentiator of ExtremeEarth from other such projects is the use of Hopsworks, a data-intensive AI software platform with a Feature Store and tooling for horizontally scalable learning. Hopsworks has been successfully extended as part of the ExtremeEarth project to bring specialized AI tools to the EO data community, with two use cases already developed on the platform and more to come in the near future.
Bringing together a number of cutting-edge technologies, which range from storing extremely large volumes of data all the way to running scalable machine learning and deep learning algorithms in a distributed manner, and having them operate over the same infrastructure poses some unprecedented challenges. These challenges include, in particular, the integration of ESA TEPs and Data and Information Access Service (DIAS) platforms with a data platform (Hopsworks) that enables scalable data processing, machine learning, and deep learning on Copernicus data, and the development of very large training datasets for deep learning architectures targeting the classification of Sentinel images.
In this blog post, we describe both the software architecture of the ExtremeEarth project, with Hopsworks as its AI platform centerpiece, and the integration of Hopsworks with the other services and platforms of ExtremeEarth that make up a complete AI with EO data experience.
Several components make up the overall architecture; the main ones are the following.
Hopsworks. An open-source data-intensive AI platform with a feature store. Hopsworks can scale to the petabytes of data required by the ExtremeEarth project and provides tooling to build horizontally scalable end-to-end machine learning and deep learning pipelines. Data engineers and data scientists use Hopsworks’ client SDKs, which facilitate AI data management, machine learning experiments, and production serving of machine learning models.
Thematic Exploitation Platforms (TEPs). These are collaborative, virtual work environments providing access to EO data and the tools, processors, and Information and Communication Technology (ICT) resources required to work with various themes, through one coherent interface. TEPs address coastal, forestry, hydrology, geohazards, polar, urban, and food security themes. ExtremeEarth in particular is concerned with the polar and food security TEPs, from which the use cases also stem. These use cases include building machine learning models for sea ice classification to improve maritime traffic, as well as for food crop and irrigation classification.
Data and Information Access Service (DIAS). To facilitate and standardize access to data, the European Commission has funded the deployment of five cloud-based platforms. They provide centralized access to Copernicus data and information, as well as to processing tools. These platforms are known as the DIAS, or Data and Information Access Services. The ExtremeEarth software architecture is built on CREODIAS, a cloud infrastructure platform adapted to the processing of large amounts of EO data, including an EO data storage cluster and a dedicated Infrastructure-as-a-Service (IaaS) cloud infrastructure for the platform’s users. The EO data repository contains Sentinel-1, 2, 3, and 5-P, Landsat-5, 7, 8, Envisat, and many Copernicus Services data.
Figure 1 provides a high-level overview of the integration of the different components with each other. The components can be classified into four main categories (layers):
To provide a coherent environment for AI with EO data to application users and data scientists, the goal of the architecture presented here is to make most components transparent and simplify developer access by using well-defined APIs while making use of commonly used interfaces such as RESTful APIs. As a result, a key part of the overall architecture is how these different components can be integrated to provide a coherent whole. The APIs used for the full integration of the ExtremeEarth components via the inter-layer interfaces of the software platform are described below and are also illustrated in Figure 1.
Application users that interact with the TEPs are effectively the users of the AI products generated by the machine learning and deep learning pipelines developed by the data scientists in Hopsworks. Previously we described the integration of the various components. Figure 3 depicts the flow of events within this architecture.
In this blog, we have shown how Hopsworks has been integrated with various services and platforms in order to extract knowledge from AI and build AI applications using Copernicus and Earth Observation data. Already two use cases, sea ice classification (PolarTEP) and crop type mapping and classification (Food Security TEP), have been developed with the aforementioned architecture, using the PBs of data made available by the Copernicus programme and infrastructure.
Note: This article was previously published by ExtremeEarth.
TLDR: The Hopsworks Feature Store abstracts away the complexity of a dual database system, unifying feature access for online and batch applications. We built a reliable and performant service to materialize features to the online feature store and to guarantee not only low-latency access but also access to the freshest possible feature values at serving time.
Enterprise Machine Learning models are most valuable when they are powering a part of a product by guiding user interaction. Oftentimes these ML models are applied to an entire database of entities, for example users identified by a unique primary key. An example of such an offline application would be predictive Customer Lifetime Value, where a prediction can be precomputed in batches at regular intervals (nightly, weekly) and is then used to select target audiences for marketing campaigns. More advanced AI-powered applications, however, guide user interaction in real time, such as recommender systems. For these online applications, some part of the model input (feature vector) will be available in the application itself, such as the last button clicked on, while other parts of the feature vector rely on historical or contextual data and have to be retrieved from a backend storage, such as the number of times the user clicked on the button in the last hour or whether the button is a popular button.
In this blog, we are going to dive into the details of the requirements of online applications and how the Hopsworks Feature Store abstracts away the complexity of a dual storage system.
While batch applications with (analytical) models are largely similar to the training of the model itself, requiring efficient access to the large volumes of data that will be scored, online applications require low-latency access to the latest feature values for a given primary key (potentially multi-part), which are then sent as a feature vector to the model serving instance for inference.
To the best of our knowledge, there is no single database that accommodates both of these requirements at high performance. Therefore, data teams tended to keep the data for training and batch inference in data lakes, while ML engineers built microservices to replicate the feature engineering logic for online applications.
This, however, introduces unnecessary obstacles for both Data Scientists and ML engineers to iterate quickly and significantly increases the time to production for ML models:
The Hopsworks Feature Store is a dual storage system, consisting of the high-bandwidth (low-cost) offline storage and the low-latency online store. The offline storage is a mix of Apache Hudi tables on our HopsFS file system (backed by S3 or Azure Blob Storage) and external tables (such as Snowflake, Redshift, etc.), together providing access to large volumes of feature data for training or batch scoring. In contrast, the online store is a low-latency key-value database that stores only the latest value of each feature and its primary key. The online feature store thereby acts as a low-latency cache for these feature values.
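The difference between the two stores can be illustrated with a minimal in-memory sketch. It is not the HSFS API: the class `ToyFeatureGroup`, its methods, and the feature name `clicks_last_hour` are hypothetical, chosen only to show that the offline side keeps every written row while the online side keeps just the latest row per primary key.

```python
class ToyFeatureGroup:
    """Hypothetical stand-in for a feature group backed by two stores."""

    def __init__(self, primary_key):
        self.primary_key = primary_key
        self.offline = []  # append-only history, as used for training or batch scoring
        self.online = {}   # primary key -> latest feature values only

    def insert(self, rows):
        for row in rows:
            # The offline store keeps every version of the row.
            self.offline.append(dict(row))
            # The online store overwrites the previous value for the same key.
            self.online[row[self.primary_key]] = dict(row)

    def get_feature_vector(self, key):
        # Online lookup: a single key-value read of the freshest values.
        return self.online[key]


fg = ToyFeatureGroup(primary_key="user_id")
fg.insert([
    {"user_id": 1, "clicks_last_hour": 3},
    {"user_id": 2, "clicks_last_hour": 7},
])
fg.insert([{"user_id": 1, "clicks_last_hour": 5}])

print(len(fg.offline), "rows offline,", len(fg.online), "keys online")
print(fg.get_feature_vector(1))
```

After the second insert the offline history holds three rows, while the online store still holds two keys and returns only the newest values for user 1.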
In order for this system to be valuable for data scientists, to improve the time to production, and to provide a nice experience for the end user, it needs to meet some requirements:
The Hopsworks Online Feature Store is built around four pillars in order to satisfy the requirements while scaling to manage large amounts of data:
We will cover each of these in detail in the following sections and provide some benchmarks for quantitative comparison.
Hopsworks is built from the ground up around distributed scale-out metadata. This helps to ensure consistency and scalability of the services within Hopsworks as well as the annotation and discoverability of data and ML artifacts.
Since the first release, Hopsworks has been using NDB Cluster (a precursor to RonDB) as the online feature store. In 2020, we created RonDB as a managed version of NDB Cluster, optimized for use as an online feature store.
However, in Hopsworks, we use RonDB for more than just the Online Feature Store. RonDB also stores metadata for the whole Feature Store, including schemas, statistics, and commits. RonDB also stores the metadata of the file system, HopsFS, in which offline Hudi tables are stored. Using RonDB as a single metadata database, we use transactions and foreign keys to keep the Feature Store and Hudi metadata consistent with the target files and directories (inodes). Hopsworks is accessible either through a REST API or an intuitive UI (that includes a Feature Catalog), or programmatically through the Hopsworks Feature Store API (HSFS).
With the underlying RonDB and the needed metadata in place, we were able to build a scale-out, high throughput materialization service to perform the updates, deletes, and writes on the online feature store - we simply named it OnlineFS.
OnlineFS is a stateless service using ClusterJ for direct access to the RonDB data nodes. ClusterJ is implemented as a high performance JNI layer on top of the native C++ NDB API, providing low latency and high throughput. We were able to make OnlineFS stateless due to the availability of the metadata in RonDB, such as Avro schemas and feature types. Making the service stateless allows us to scale writes to the online feature store up and down by simply adding or removing instances of the service, thereby increasing or decreasing throughput linearly with the number of instances.
Let’s go through the steps needed to write data to the online feature store, which are numbered in the diagram below.
Each Dataframe updates a table called a feature group (there is a similar table in the offline store). The features in a feature group share the same primary key, which can be a composite primary key. Primary keys are tracked along with the rest of the metadata. The Hopsworks Feature Store has a Dataframe API, which means that the result of your feature engineering should be a regular Spark, Spark Structured Streaming, or Pandas Dataframe that will be written to the Feature Store. The APIs for writing to the Feature Store are almost identical for all three types of Dataframe. With a reference to the feature group object, you insert the Dataframe. The feature group has been configured on creation to store the Dataframe either in both the online and offline stores or in only one of them.
The rows of the Dataframe are encoded using Avro and written to Kafka, running on Hopsworks. Each feature group has its own Kafka topic with a configurable number of partitions and is partitioned by primary key, which is necessary to guarantee the ordering of writes.
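Why partitioning by primary key preserves write ordering can be shown with a small sketch. It assumes a deterministic hash of the key modulo the partition count; Kafka's default partitioner for keyed messages uses a murmur2 hash, and CRC32 is swapped in here only to keep the example within the standard library. The function names and the sample keys are hypothetical.

```python
import zlib


def partition_for(primary_key, num_partitions):
    """Map a primary key to a partition with a deterministic hash (CRC32 stand-in)."""
    return zlib.crc32(str(primary_key).encode("utf-8")) % num_partitions


def route(updates, num_partitions):
    """Append each (key, value) update to the partition chosen by its key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in updates:
        partitions[partition_for(key, num_partitions)].append((key, value))
    return partitions


NUM_PARTITIONS = 4
updates = [("user-1", 1), ("user-2", 10), ("user-1", 2), ("user-3", 7), ("user-1", 3)]
partitions = route(updates, NUM_PARTITIONS)


def values_for(key):
    """Values of one key, in the order a consumer of its partition would see them."""
    partition = partitions[partition_for(key, NUM_PARTITIONS)]
    return [value for k, value in partition if k == key]


print(values_for("user-1"))
```

Because every update for a given key lands in the same partition, and a partition is consumed in order, the consumer sees the updates for that key in the order they were produced. Updates for different keys may interleave freely, which is what allows the topic to be spread over several partitions.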
We use Kafka to buffer writes from Spark feature engineering jobs, as a large Spark cluster that writes directly to RonDB could overload RonDB, due to a lack of backpressure in the existing Spark JDBC driver. OnlineFS reads the buffered messages from Kafka and decodes them. Importantly, OnlineFS decodes only primitive feature types, whereas complex features such as embeddings are stored in binary format in the online feature store.
Now, OnlineFS can perform the actual upsert of the rows to RonDB using the ClusterJ API. Upserts are performed in batches (with a configurable batch size) to improve throughput.
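The batching idea can be sketched as follows. This is not ClusterJ code: a plain dictionary stands in for the online store, and `batched` and `upsert_in_batches` are hypothetical helpers that only illustrate grouping rows so that one commit covers a whole batch.

```python
from itertools import islice


def batched(rows, batch_size):
    """Yield consecutive lists of at most batch_size rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def upsert_in_batches(store, rows, primary_key, batch_size):
    """Upsert rows into store (a dict standing in for the database).

    Returns the number of simulated commits, one per batch.
    """
    commits = 0
    for batch in batched(rows, batch_size):
        for row in batch:
            # Upsert: insert a new key or overwrite the existing value.
            store[row[primary_key]] = row
        commits += 1
    return commits


store = {}
rows = [{"id": i % 1500, "value": i} for i in range(2500)]
commits = upsert_in_batches(store, rows, "id", batch_size=1000)
print(commits, "commits for", len(rows), "rows;", len(store), "distinct keys")
```

With a batch size of 1000, the 2500 rows need three commits instead of 2500, which is where the throughput gain comes from; the cost is that a row may wait for the rest of its batch before it is committed.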
Since all services in the steps of the pipeline have access to the same metadata, we are able to hide all complexity related to encoding and schemas from the user. Furthermore, all services involved are horizontally scalable (Spark, Kafka, OnlineFS) and, due to our streaming-like setup, the process does not create unnecessary copies of the data; that is, there is no write amplification. This highly scalable setup is possible due to the availability of services like a schema registry, an X.509 certificate manager, and Kafka within Hopsworks. At all times, X.509 certificates are used for two-way authentication and TLS is used to encrypt network traffic.
In distributed systems, we often speak about transparency. A distributed system is transparent if it hides network access and implementation-specific knowledge from the developer. In the Hopsworks Feature Store, writing is done transparently through the same APIs: as mentioned before, (1) it does not matter whether the Dataframe is a regular Spark, Spark Streaming, or Pandas Dataframe, and (2) the system is responsible for updating both online and offline storage consistently.
The core abstractions in the HSFS library are the metadata objects representing feature groups, training datasets and features in the feature store. Our goal with HSFS was to enable developers to use their favourite languages and frameworks to engineer features. As we aligned on the Dataframe API, anything that is contained in a Dataframe can be written to the feature store. If you have existing ETL or ELT pipelines, which produce a Dataframe containing the features, you can write that Dataframe to the Feature Store by simply acquiring a reference to its feature group object and invoking `.insert()` with your Dataframe as a parameter. This can be called from a regularly scheduled job, using any orchestrator of your choice; alternatively, Hopsworks comes with Airflow if you want an out-of-the-box orchestrator. But a feature group object can also be updated continuously by writing batches as Dataframes from a Spark Structured Streaming application.
Many existing Feature Stores do not have a representation for models. Hopsworks, however, introduced the Training Dataset abstraction to represent the set of features and the feature values used to train a model. That is, there is a one-to-one mapping between immutable training datasets and models, but a one-to-many relationship from the mutable feature groups to the immutable training datasets. You create a training dataset by joining, selecting and filtering features from feature groups. The training dataset includes metadata for the features, such as which feature group they came from, the commit-id(s) for that feature group, and the ordering of features in the training dataset. All of this information enables HSFS to recreate training datasets at a later point in time and to transparently construct feature vectors at serving time.
The clients of the online feature store are either applications that use ML models or model-serving infrastructure that enriches feature vectors with missing features. Hopsworks provides a JDBC based API to the online store. JDBC has the advantage of offering a high performance protocol, network encryption, authentication of the client, and access control. HSFS provides language level support for Python and Scala/Java. However, you can always fall back on using JDBC directly if your serving application runs in a different programming language or framework.
Sysbench benchmarks for RonDB by Mikael Ronstrom (inventor of NDB Cluster and Head of Data at Logical Clocks, leading the RonDB team) and a comparative performance evaluation against Redis are available. In this section we show the performance of the OnlineFS service, which can handle and sustain high throughput when writing to the online feature store, and we evaluate feature vector lookup latency and throughput on a typical managed RonDB setup within Hopsworks.
In this benchmark, Hopsworks is set up with 3x AWS m5.2xlarge (8 vCPU, 32 GB) instances (1 head, 2 workers). The workers are used by Spark for writing Dataframes to the online store. Additionally, the same workers are reused as clients that perform the read operations on the Online Feature Store for the read benchmark.
RonDB is set up with 1x AWS t3.medium (2 vCPU, 4 GB) instance as management node, 2x AWS r5.2xlarge (8 vCPU, 64 GB) instances as data nodes, and 3x AWS c5.2xlarge (8 vCPU, 16 GB) instances for MySQL servers. This setup allows us to store 64 GB of data in memory in the online feature store with 2x replication. The MySQL servers provide the SQL interface to the online feature store. In this benchmark we did not saturate the RonDB data nodes fully, so one could potentially add more MySQL servers and clients to increase throughput beyond the levels shown here.
We benchmarked the throughput for writing to RonDB in the OnlineFS service. Additionally, we measured the time it takes to process a record from the moment it gets taken from the Kafka topic until it is committed to RonDB. For this benchmark we deployed two OnlineFS services, one on the head node and one on one of the MySQL server nodes.
We ran the experiments by writing 20M rows to the online feature store from a Spark application. After a short warm-up period the throughput of the two service instances stabilizes at ~126K rows/second for 11 features, ~90K rows/second for 51 features and for the largest feature vectors at ~60K rows/second. Due to its design, this can easily be scaled by adding more instances of the service.
Secondly, we report the time it takes to process the feature vectors within the OnlineFS service. This time does not include the time a record waits in Kafka for processing, because the waiting time depends highly on the number of Spark executors writing to Kafka. Instead, you should compare the throughput numbers to your requirements.
The processing times are reported on a per row basis, but parts of the pipeline within OnlineFS are parallelized, for example rows are committed to RonDB in batches of 1000. With this setup we achieve p99 of ~250ms for 11 features with a row size of 948 bytes.
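To relate the reported throughput to your own requirements, a back-of-the-envelope calculation such as the following can help. It uses only figures quoted above (20M rows, the three throughput levels, and the 948-byte row size for 11 features); the variable names are illustrative, and the results are rough estimates that ignore the warm-up period.

```python
TOTAL_ROWS = 20_000_000
ROW_SIZE_BYTES_11_FEATURES = 948

# Approximate sustained throughput of the two OnlineFS instances, in rows/second.
throughput_rows_per_s = {
    "11 features": 126_000,
    "51 features": 90_000,
    "largest feature vectors": 60_000,
}


def seconds_to_write(rows, rows_per_second):
    """Estimated wall-clock time to write the given number of rows."""
    return rows / rows_per_second


durations = {
    name: seconds_to_write(TOTAL_ROWS, rate)
    for name, rate in throughput_rows_per_s.items()
}

# Payload rate for the 11-feature case, in megabytes per second.
mb_per_second_11 = throughput_rows_per_s["11 features"] * ROW_SIZE_BYTES_11_FEATURES / 1e6

for name, seconds in durations.items():
    print(f"{name}: about {seconds:.0f} s for {TOTAL_ROWS:,} rows")
print(f"11 features: about {mb_per_second_11:.0f} MB/s of row data")
```

At these rates the 20M-row write takes a few minutes, and the 11-feature case corresponds to roughly 119 MB/s of row payload across the two instances.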
We benchmarked throughput and latency for different feature vector sizes in relation to an increasing number of clients performing requests in parallel. Note that the clients were split among the two worker nodes (8 vCPUs each).
In this benchmark, every request contains one primary key value lookup (one feature vector). Throughput and latency scale linearly up to 16 clients while sustaining low latencies. With more than 16 clients we observed the hosts on which the clients are being run getting to their maximum CPU and network utilization. Furthermore, we did not see an over-utilization of the RonDB data nodes or the MySQL servers, which means we could further scale linearly by running the clients from larger worker instances or adding more worker hosts to run clients from.
In order to show that RonDB scales to many more key lookups per second, we ran another benchmark in which each client requests feature vectors in batches of 100. As we can see, the number of lookups still scales linearly: lookup throughput increases by 15x, while the latency per request increases only moderately.
Hopsworks comes with managed RonDB that provides a unified metadata store for both Hopsworks and the online feature store. In this blog, we showed that a highly available two-node RonDB cluster (r5.2xlarge VMs) scales linearly to >250k ops/sec with feature vector lookups of 11 features of ~1KB in size and 7.5 ms p99 latency. Thus, Hopsworks provides the highest performance online feature store on the market today.
In our previous blog, we discussed automatic thread configuration in RonDB. In this blog we perform a set of microbenchmarks. In particular, we compare RonDB with ScyllaDB with respect to instruction cache usage when work is separated into threads. Then, we discuss tossing data between threads and latency in thread pipelines.
What we understand from the ScyllaDB description is that it is implemented as an asynchronous engine. ScyllaDB specifically mentioned that they didn’t want to introduce a Thread Pipeline. Instead, they seem to have introduced a Batch Pipeline where they execute one task type at a time. This clearly improves the use of the instruction cache.
The Batch Pipeline approach has the best latency when the system is at low load. When the system is at high load, the batch size increases and so does the latency. With RonDB and its Thread Pipeline, the latency decreases as the load increases, since the likelihood of the thread being awake is higher. Thus RonDB at high load acts as a set of CPUs that interact directly through small messages that instruct the receiver what it is supposed to do. At high load the overhead of switching to a new thread is therefore negligible. There is a small extra cost to handle extra data for the messages, but the messages are small, so this overhead is small compared to the extra latency introduced by having to wait for another batch to execute before a message gets its turn.
Actually, the Batch Pipeline model resembles the first thread model of NDB Cluster, where everything was executed in a single thread. This thread received a batch of messages and executed each of them in FIFO order; the messages sent asynchronous messages to the next part of the code. This had exactly the same effect as seen in the Batch Pipeline model, since all messages followed the same code path. Thus, if 100 messages were received, we first executed 100 messages in the transaction block and then 100 messages in the database blocks.
The RonDB model uses a normal FIFO scheduler in each thread. Each thread executes only a part of the functionality, and the database part executes queries only on parts of the database. Thus we achieve both the benefits of batch processing of similar messages in a pipeline and the division of work across different CPUs.
RonDB can execute a benchmark in exactly the same fashion as the Batch Pipeline model. This happens when the RonDB data node executes using a single thread. Thus we can use RonDB to analyse how Batch Pipelines compare to Thread Pipelines in a microbenchmark.
In this microbenchmark we run a single RonDB data node without any replication and one benchmark program called flexAsynch. This benchmark program sends batches of Inserts, Updates, Reads and Deletes and measures the achieved throughput.
When a read is processed, it is executed in 4 steps. The first step receives the network message. The second step handles the transaction processing, which ensures that we find the data and read or write the correct parts of the data partitions. The third step does the actual read/write of the data. The fourth step sends the data back to the benchmark program.
In RonDB using the single-threaded setup, the receive part will receive a batch of operations, and each of those will be inserted into the job queue in FIFO order.
After executing the receive part, we execute those messages, all of which execute the transaction part. Each of those transaction part messages will generate a new asynchronous message to be executed by the data owning part. Thus, although they are located in the same thread, we still execute all those messages in batches that all execute the same code and thus achieve efficient use of the instruction cache in the CPU.
Next all messages will be executed in the data owning part and each of those messages will put something into the send queue which will be the last step executed before we return to the next set of messages in the receive part. Thus in the single threaded configuration of RonDB we get exactly the behaviour of a Batch Pipeline.
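The way a single FIFO job queue naturally produces Batch Pipeline behaviour can be seen in a small simulation. It is a simplified sketch, not RonDB code: the stage names and the function `run_single_thread` are hypothetical, and receive and send are modelled as per-operation jobs on the same queue.

```python
from collections import deque

# Pipeline stages in the order an operation passes through them.
STAGES = ["receive", "transaction", "data", "send"]


def run_single_thread(num_operations):
    """Run operations through all stages on one FIFO job queue.

    Each executed job enqueues an asynchronous message for the next stage.
    Returns the execution order as a list of (stage, operation) pairs.
    """
    queue = deque(("receive", op) for op in range(num_operations))
    trace = []
    while queue:
        stage, op = queue.popleft()
        trace.append((stage, op))
        next_index = STAGES.index(stage) + 1
        if next_index < len(STAGES):
            # The follow-up message goes to the back of the queue, behind
            # all jobs of the current stage that are still waiting.
            queue.append((STAGES[next_index], op))
    return trace


trace = run_single_thread(3)
stage_order = [stage for stage, _ in trace]
print(stage_order)
```

Because follow-up messages are appended behind the jobs still waiting in the current stage, the thread finishes every receive job before the first transaction job, every transaction job before the first data job, and so on. Each stage's code therefore runs back to back for the whole batch, which is what keeps it hot in the instruction cache.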
The next step is to separate the receive part from the transaction execution part and data owning part. Thus in this case we will have a thread pipeline consisting of two threads.
The third step is to split the thread pipeline into 3 parts: the receive part, the transaction handling part and the data owning part.
The fourth step finally separates the sending part as well into a separate send thread.
All of these steps are easily achieved in RonDB by using a configuration parameter called ThreadConfig.
The expected result here is that the single-threaded variant will get the optimal throughput since it divides the execution based on batches and thus minimizes the instruction cache misses. It is expected that the Thread Pipeline will have the same effect, but spread the work to more than one CPU. In our threaded pipeline we can specify the size of the batches executed before we flush all messages to other threads and before we send them back to the NDB API program.
The results showed that the first step could handle around 750k operations per second. Splitting the receive thread from the other parts improved throughput to around 835k operations per second. This shows that a fairly small part of the processing happens in the receive stage. The third step, dividing the transaction processing part from the data owning part, had a much bigger impact. Here performance increased to 1.5M operations per second.
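The relative gains of each step follow directly from the reported numbers. The short calculation below uses the approximate throughput figures quoted above; the labels are illustrative, and since the inputs are rounded the ratios are approximate as well.

```python
# Approximate throughput per configuration, in operations per second.
throughput_ops = {
    "1 thread (batch pipeline)": 750_000,
    "2 threads (receive split out)": 835_000,
    "3 threads (transaction split from data)": 1_500_000,
}

baseline = throughput_ops["1 thread (batch pipeline)"]

# Speedup of each configuration relative to the single-threaded baseline.
speedups = {name: ops / baseline for name, ops in throughput_ops.items()}

for name, speedup in speedups.items():
    print(f"{name}: {speedup:.2f}x of single-threaded throughput")
```

Splitting out the receive thread gives roughly an 11% gain, whereas separating transaction processing from the data owning part doubles the single-threaded throughput.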
The fourth step was implemented by specifically disallowing the data owning part and the transaction processing part from performing any sends. Thus all sending had to happen from a separate fourth thread. This step had very little impact, which shows that our adaptive send approach, where sending happens locally if the thread isn’t overloaded and is otherwise performed by a less loaded thread, is a good approach. This means that in RonDB there will be send threads, but for the most part the sending will be done by execution threads that are not so busy. Thus we can use send load to ensure that the CPUs are used in a balanced manner.
What we derive from those numbers is that a Thread Pipeline and a Batch Pipeline have equal efficiency. However, the Thread Pipeline provides a lot of benefits. For receive threads it means that the application doesn’t have to find the thread where data resides. This simplifies the NDB API greatly. We have made experiments where the NDB API actually had this possibility and where the transaction processing and data owning parts were colocated in the same thread. This gave a small improvement in latency at low loads, but at higher loads the thread pipeline was superior in both throughput and latency, and thus this idea was discarded.
The Thread Pipeline also improves scalability. As shown here we can have a single thread handling the data owning part and scale this to twice as many operations per second without having to split the data partitions.
We also did an experiment where we used 2 receive threads, 2 transaction processing threads and 2 data owning parts and this scaled nicely to 3M operations per second. What we have noticed is that in larger thread setups it is important that we have sufficient CPU capacity in the receive threads and the transaction processing parts to ensure that the data owning parts execute in an optimal manner. Using 1 receive thread in this setup provides almost the same results, but this architecture gains from having receive threads that can respond quickly such that they can keep the Thread Pipelines busy.
In RonDB there is a configuration parameter called SchedulerResponsiveness. This specifies whether to optimise for latency or for throughput. By increasing the Scheduler Responsiveness, we decrease the batch execution sizes before we flush messages to other threads and other nodes.
Thread Pipelining makes data partitioning transparent to APIs
From a performance perspective the Thread Pipeline is equally efficient to a Batch Pipeline. However the thread pipeline gives greater flexibility to the architecture. This flexibility is used in RonDB to ensure that many database connections share the same TCP/IP socket. This gives great batching improvements.
Since receive threads execute on behalf of all threads in the data nodes, the API doesn’t have to route the messages to a specific receive thread. Thus normal round robin can be used to spread load if there are several connections from the API to the RonDB data node.
What we showed here is that the Thread Pipeline enables us to reach the same performance with 2 database threads compared to the 4 that would have been needed if all functionality had been gathered in a single thread. This means that we can handle the same load with half as many data partitions. Decreasing the number of partitions enables us to execute range scans that don’t use the partition key more efficiently. We also introduced query threads that ensure that one data partition can be read from multiple query threads, thus further decreasing the need to split data into many partitions. Thus RonDB achieves a nice balance between splitting data partitions for higher write throughput and higher parallelism in range scans on the one hand, and efficiency in performing the range scans on the other.
As mentioned above the Thread Pipeline enables us to route messages from the transaction part to the data owning part. But in RonDB we also have query threads that can read data from several data owning parts. This provides the possibility also for routing based on load in the various threads. Thus thread pipelining not only provides a very efficient approach to executing database requests, it also enables us to build a highly scalable data node that can process many millions of concurrent requests with internal load regulation.
So based on this analysis we can conclude that the Thread Pipeline and the Batch Pipeline both provide very similar benefits in CPU efficiency. RonDB has seen IPCs of 1.27 in the data owning parts and ScyllaDB reports IPC of 1.31 in their Batch Pipeline. However the Thread Pipeline at the same time delivers many advantages from a software architecture perspective. It enables transparency of data partitioning, it enables us to decrease the amount of data partitioning required to achieve the performance objectives and it enables us to build very scalable data nodes. RonDB data nodes have shown linear scaling to at least 32 CPUs and it continues to scale to many more CPUs after that and the software is prepared to handle up to 1024 CPUs.
The only data transported between threads is the protocol messages. The actual data in the database is handled by the data owning thread. There is one part where the passing of data between threads matters, and this is for sending the message. Sending network messages has a significant overhead in a distributed database and can also cause extra CPU cache misses. RonDB has an additional design requirement that requires collecting messages from several threads to be sent on one socket. Thus sending network messages is a bit special in RonDB. However often we can avoid the extra cost of involving another thread for sending by sending when we have completed a batch of processing.
There is also an adaptive process that moves the CPU processing of sends to the threads with lowest load. Thus in a highly loaded data node we will avoid the sending in overloaded threads to ensure that we can process all the requests coming in.
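The adaptive send decision can be sketched in a few lines. This is a simplified illustration of the idea described above, not RonDB's implementation; the function name, the load values and the 0.8 overload threshold are all assumptions.

```python
def choose_sender(local, loads, overload=0.8):
    """Return the thread that should perform a send.

    loads maps thread name -> CPU load in [0, 1]. The local thread sends
    itself unless it is overloaded; then the least loaded thread does it.
    The 0.8 threshold is an assumption for illustration only.
    """
    if loads[local] < overload:
        return local
    return min(loads, key=loads.get)

loads = {"ldm": 0.95, "tc": 0.60, "send": 0.10}
print(choose_sender("tc", loads))   # tc is not overloaded, sends locally
print(choose_sender("ldm", loads))  # ldm is overloaded, hands off the send
```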
In addition RonDB uses special handling of large messages, where the large part of the message is transported in a special section. This section is only read by the receiving thread (usually the data owning thread), thus there is no extra cost in passing this part through other threads (usually the transaction processing thread). When received by RonDB from the API this is a simple interpreted program. Messages sent back to the API are sent directly from the data owning part to the API, this message doesn't pass through any Thread Pipeline. Only messages that are part of transaction handling are sent back to the transaction processing part.
A question that one could ask is whether the latency is negatively affected by the thread pipeline. In ScyllaDB they report that the mean execution time for a request is 282 microseconds. This is an eternity when compared to the latency overhead of a thread pipeline. Even if all threads are sleeping when the request arrives, the wakeup latency of a thread is around 5-10 microseconds. So in the absolute worst case there is an added latency of around 15-25 microseconds to execute this thread pipeline.
However in reality the latency at 0% usage is seldom interesting. The latency of interest is the latency that you get when the load approaches its maximum. In this case the extra latency that the thread pipeline adds comes close to zero since most of the threads are awake in this situation.
The total execution time of a key-value lookup in RonDB data nodes is around 2-3 microseconds and writes around 5 microseconds. The thread pipeline means that each thread can execute at maximum speed all the time. An expected latency in a loaded RonDB data node would be on the order of 50 microseconds and thus the overhead of wakeup latencies in this case will be no more than 10% added latency. Given the advantages that the Thread Pipeline provides, this is a price worth paying.
Actually this behaviour is of great use in that RonDB becomes more efficient as load increases. Thus if RonDB is overloaded it will become more efficient since the threads will continuously be busy executing the messages and batch effects will increase as the load increases.
The real performance in a database is not determined by the number of instructions per second. The performance is determined by the number of instructions to execute for an operation divided by the number of instructions executed per cycle. RonDB reaches an IPC of the most critical database code of 1.27. Many other DBMSs have reported numbers in the order of 0.25-0.5. Thus it is clear that the separation of code into a thread pipeline works well. At the same time we have used many other tricks in RonDB to achieve the desired performance.
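To make the relation concrete, the small sketch below computes the time per operation as instructions divided by IPC, converted to time. The instruction count of 10,000 and the 3 GHz clock are assumptions chosen for illustration; only the IPC values come from the text.

```python
def ns_per_operation(instructions, ipc, ghz=3.0):
    """Time for one operation in nanoseconds: cycles = instructions / IPC."""
    cycles = instructions / ipc
    return cycles / ghz

# Hypothetical operation of 10,000 instructions on an assumed 3 GHz core.
for ipc in (0.25, 0.5, 1.27):
    print(f"IPC {ipc}: {ns_per_operation(10_000, ipc):.0f} ns per operation")
```

With the same instruction count, a higher IPC translates directly into a proportionally shorter execution time.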
One such trick is how we designed our protocols. Most internet protocols are heavy CPU consumers. I had a master thesis student design a generic protocol engine that was intended to be used in NDB. This was probably the most important master thesis project I led. It showed exactly how to NOT do it. When the project was ready and I saw the resulting code, I realised that there is no way to get that code to become efficient. Thus NDB protocols are based on messages where the message data are residing in fixed positions. Thus no protocol processing is required, only copying from the message to data structures used by internal operations.
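The idea of fixed-position message data can be illustrated as follows. The layout below is hypothetical and is not the real NDB message format; it only shows that decoding becomes a plain copy from known offsets, with no parsing step.

```python
import struct

# Hypothetical layout: four little-endian 32-bit words at fixed offsets.
# This is NOT the real NDB signal format, only an illustration.
LAYOUT = struct.Struct("<IIII")

def decode(buf):
    """Copy fields straight out of fixed positions; no parsing step."""
    table_id, fragment_id, operation, key = LAYOUT.unpack_from(buf, 0)
    return {"table_id": table_id, "fragment_id": fragment_id,
            "operation": operation, "key": key}

msg = LAYOUT.pack(7, 3, 1, 42)
print(decode(msg))
```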
The code to process those messages can have a lot of instructions, but it is code without any logic, basically just processing fixed data move instructions for the most part. Thus this code is extremely efficient on modern CPUs. In addition the code in RonDB provides hints to the C++ compiler which branches are the normal path through the code. This is important since RonDB is implementing failfast, thus the code is filled with extra assert statements (implemented using a macro called ndbrequire) that will crash the data node immediately if any data is not consistent. All these asserts are also executed in production code to ensure that we don’t continue executing in an incorrect program state. In addition we also constantly generate crash information in the code to be able to understand exactly what happens if there is a failure.
Thus with all the talk about low latency and high throughput, it is important to understand that the highest priority in RonDB is to achieve the highest possible availability.
So will our approach decrease the amount of CPU instructions? The answer is no, there will be extra instructions to process the protocol between threads and at times there will be extra CPU instructions to handle context switches. So how can we gain from executing more CPU instructions?
The answer to this question is a variant of Divide and Conquer. By separating the code into multiple processors each processor will have less code to execute although the total number of instructions and the total amount of code increases. So e.g. if we start out with 10.000 instructions using 40 kB of code and split this into two functional units we will handle e.g. 5.500 instructions and 25 kB of code. Thus since each thread uses its own CPU we actually can increase the instruction speed significantly since our application will fit nicely into a 32kB instruction cache and this will improve performance more than the 10% extra instruction costs.
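The arithmetic of this example can be checked with a short sketch. The numbers are the ones from the paragraph above; the helper name is ours.

```python
ICACHE_KB = 32  # instruction cache size used in the text

def split_cost(instr_before, instr_per_stage, stages):
    """Relative increase in total instructions after splitting."""
    return instr_per_stage * stages / instr_before - 1.0

overhead = split_cost(10_000, 5_500, 2)
print(f"extra instructions: {overhead:.0%}")
print("fits before split:", 40 <= ICACHE_KB)
print("fits after split:", 25 <= ICACHE_KB)
```

The split costs about 10% more instructions in total, but each half now fits in the instruction cache while the unsplit code does not.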
The thread pipeline always starts with the receive thread. This thread will receive the request from the network. By providing this as a separate thread we provide the ability to colocate the operating system network processing with the processing by the receive thread itself. Linux provides the ability to handle the network processing in the same CPU as the recv call is made in. This enables very efficient handling of the receive part of the network processing.
There is another reason why the receive part is required to be a separate thread in RonDB. This is because one socket can be used to carry thousands of simultaneous connections to the data node. Thus the receive part will break the received messages into smaller messages and these smaller messages will be transported on memory channels to the appropriate thread. Thus one TCP/IP receive that receives say 32 kByte of data can easily be broken into 300 separate messages to be executed by different threads.
You can think about this thread pipeline as an example of a programmer using an assembly line. By dividing the tasks into smaller tasks we decrease the size of the program each task will have to execute. The total program grows a little bit, but since each thread will only see the part it is responsible for it will have very good code locality.
If you think of CPUs as workers in the assembly line, it is also important to not switch workers. RonDB therefore uses CPU locking to prevent the OS from suddenly moving the execution to another CPU. Rebuilding the CPU caches in a new CPU can take quite a few microseconds. In our case even most context switches avoid this cost since most of the time there are no other tasks waiting to be executed on this CPU, because we have separated the heavy CPU consumers onto different CPUs. Thus even after returning from a context switch we will have fairly hot CPU caches.
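RonDB handles CPU locking internally, so the following is only a sketch of the underlying operating system facility, here shown for a whole process on Linux. The helper name is an assumption, and on platforms without `sched_setaffinity` the function simply returns None.

```python
import os

def pin_to_cpu(cpu):
    """Pin the calling process to one CPU and return the resulting CPU set.

    Returns None on platforms without sched_setaffinity (non-Linux).
    """
    if not hasattr(os, "sched_setaffinity"):
        return None
    os.sched_setaffinity(0, {cpu})
    return os.sched_getaffinity(0)

if hasattr(os, "sched_getaffinity"):
    first = min(os.sched_getaffinity(0))  # a CPU we are allowed to use
    print(pin_to_cpu(first))
```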
The thread pipeline also splits the data cache usage. The LDM threads that handle database operations have no data stored on the global transaction state and have no knowledge about the global transaction state.
Thus the thread pipeline provides both an improved use of the instruction CPU cache as well as the CPU data caches.
From the micro benchmarks we conclude that splitting up the execution into functional units in a thread pipeline can provide significant benefits. It is all about using the CPU caches, both data and instruction in the most efficient manner. However, one needs to find ways to use the free CPU time that can happen if load is unbalanced. This is currently an active development in RonDB that will bring even greater benefits to RonDB.
If you would like to replicate our benchmarks, there are three ways to use RonDB:
Sysbench is a tool to benchmark and test open source databases. We have integrated Sysbench into the RonDB installation. This makes it extremely easy to run benchmarks with RonDB. This blog will describe the use of these benchmarks in RonDB. These benchmarks were executed with 1 cluster connection per MySQL Server. This limited the scalability per MySQL Server to about 12 VCPUs. Since we executed those benchmarks we have increased the number of cluster connections per MySQL Server to 4 providing scalability to at least 32 VCPUs per MySQL Server.
As preparation to run those benchmarks we have created a RonDB cluster using the Hopsworks framework that is currently used to create RonDB clusters. In these tests all MySQL Servers are using the c5.4xlarge VM instances in AWS (16 VCPUs with 32 GB memory). We have a RonDB management server using the t3a.medium VM instance type. We have tested using two different RonDB clusters, both have 2 data nodes. The first test is using the r5.4xlarge instance type (16 VCPUs and 128 GB memory) and the second test uses the r5n.8xlarge (32 VCPUs and 256 GB memory). It was necessary to use r5n class since we needed more than 10 Gbit/second network bandwidth for the OLTP RW test with 32 VCPUs on data nodes.
In the RonDB documentation you can find more details on how to set up your own RonDB cluster in either our managed version (currently supporting AWS) or using our open source shell scripts to set up a cluster (currently supporting GCP and Azure).
The graph below shows the throughput results from the larger data nodes using 8-12 MySQL Server VMs. Now in order to make sense of these numbers we will explain a bit more about Sysbench and how you can tweak Sysbench to serve your purposes for testing RonDB.
The Sysbench OLTP RW benchmark consists of 20 SQL queries that together form one transaction: it starts with a BEGIN statement and ends with a COMMIT statement. After the BEGIN statement follow 10 SELECT statements that each select one row using the primary key of the table. Next follow 4 SELECT queries that each select 100 rows within a range and use either a plain SELECT, SELECT DISTINCT, SELECT … ORDER BY or SELECT sum(..). Finally there is one INSERT, one DELETE and 2 UPDATE queries.
In pseudo code:
BEGIN
Repeat 10 times: SELECT col(s) from TAB where PK=pk
SELECT col(s) from TAB where key >= start AND key < (start + 100)
SELECT DISTINCT col(s) from TAB where key >= start AND key < (start + 100)
SELECT col(s) from TAB where key >= start AND key < (start + 100) ORDER BY key
SELECT SUM(col) from TAB where key >= start AND key < (start + 100)
INSERT INTO TAB values (....)
UPDATE TAB SET col=val WHERE PK=pk
UPDATE TAB SET col=val WHERE key=key
DELETE FROM TAB WHERE PK=pk
COMMIT
This is the standard OLTP RW benchmark.
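The transaction described above can also be generated programmatically, which makes it easy to count the statements. The sketch below is not Sysbench code; the function name and its parameters are assumptions, and the table and column names are the placeholders from the pseudo code.

```python
import random

def oltp_rw_transaction(point_selects=10, range_size=100, rows=1_000_000):
    """Return the statements of one OLTP RW transaction as strings.

    Table and column names (TAB, PK, key, col) follow the pseudo code.
    """
    stmts = ["BEGIN"]
    for _ in range(point_selects):
        stmts.append(f"SELECT col FROM TAB WHERE PK={random.randint(1, rows)}")
    start = random.randint(1, rows - range_size)
    rng = f"key >= {start} AND key < {start + range_size}"
    pk = random.randint(1, rows)
    stmts += [
        f"SELECT col FROM TAB WHERE {rng}",
        f"SELECT DISTINCT col FROM TAB WHERE {rng}",
        f"SELECT col FROM TAB WHERE {rng} ORDER BY key",
        f"SELECT SUM(col) FROM TAB WHERE {rng}",
        "INSERT INTO TAB VALUES (....)",  # values left elided, as in the text
        f"UPDATE TAB SET col=val WHERE PK={pk}",
        f"UPDATE TAB SET col=val WHERE key={pk}",
        f"DELETE FROM TAB WHERE PK={pk}",
        "COMMIT",
    ]
    return stmts

print(len(oltp_rw_transaction()))  # 20
```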
Now I will describe some changes that the Sysbench installation in RonDB can handle. To understand this we will start by showing the default configuration file for Sysbench.
# Software definition
# Storage definition (empty here)
# MySQL Server definition
# NDB node definitions (empty here)
# Benchmark definition
In this configuration file we provide the pointer to the RonDB binaries, we provide the type of benchmark we want to execute, we provide the password to the MySQL Servers, we provide the number of threads to execute in each step of the benchmark. There is also a list of IP addresses to the MySQL Servers in the cluster and finally we provide the number of instances of Sysbench we want to execute.
This configuration file is created automatically by the managed version of RonDB. This configuration file is available in the API nodes you created when you created the cluster. They are also available in the MySQL Server VMs if you want to test running with a single MySQL Server colocated with the application.
The default setup will run the standard Sysbench OLTP RW benchmark with one sysbench instance per MySQL Server. To execute this benchmark the following steps are done:
Log in to the API node VM where you want to run the benchmark from. The username is ubuntu (in AWS). Thus log in using e.g. ssh ubuntu@IP_address. The IP address is the external IP address that you will find in AWS where your VM instances are listed.
After logging in successfully, switch to the mysql user with the command:
sudo su - mysql
Move to the right directory
Execute the benchmark
bench_run.sh --default-directory /home/mysql/benchmarks/sysbench_multi
As you will discover there are also sysbench_single, dbt2_single and dbt2_multi directories. These are set up for different benchmarks that we will describe in future papers. sysbench_single is the same as sysbench_multi but with only a single MySQL Server. It also exists on the MySQL Server VMs if you want to benchmark from those. Executing a benchmark from the sysbench machine increases latency since it represents a 3-tiered setup, whereas executing sysbench on the MySQL Server VM represents a 2-tiered setup and thus the latency is lower.
If you want to study the benchmark in real-time repeat Step 1, 2 and 3 above and then perform the following commands:
tail -f oltp_rw_0_0.res
This will display the output from the first sysbench instance that will provide latency numbers and throughput of one of the sysbench instances.
When the benchmark has completed the total throughput is found in the file:
The configuration for sysbench_multi is found in:
Thus if you want to modify the benchmark you can edit this file.
To modify this benchmark, you can first decide how many primary key SELECT statements should be issued per transaction. The default is 10. To change this add the following line in autobench.conf:
With this setting, 5 primary key SELECTs will be issued for each transaction instead of 10.
Next, you can decide that you want those primary key SELECTs to retrieve a batch of primary keys. In this case the SELECT will use IN (key1, key2, ..., keyN) in the WHERE clause. To use this, set the number of keys to retrieve per statement in SB_USE_IN_STATEMENT. Thus to set this to 100 add the following line to autobench.conf:
This means that if SB_POINT_SELECTS is set to 5 and SB_USE_IN_STATEMENT is set to 100 there will be 500 key lookups performed per Sysbench OLTP transaction.
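The combined effect of the two settings is a simple product, as this small sketch shows. The parameters mirror SB_POINT_SELECTS and SB_USE_IN_STATEMENT; treating an unset SB_USE_IN_STATEMENT as one key per SELECT is our assumption.

```python
def key_lookups_per_transaction(point_selects=10, in_statement_keys=1):
    """Primary key lookups issued by one Sysbench OLTP transaction."""
    return point_selects * in_statement_keys

print(key_lookups_per_transaction(5, 100))  # 500
```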
Next, it is possible to set the number of range scan SELECTs to perform per transaction. So to e.g. disable all range scans we can add the following lines to autobench.conf.
Now, it is also possible to modify the range scans. As mentioned, each range scan retrieves 100 rows. The number 100 can be changed through the configuration parameter SB_RANGE_SIZE.
The default behaviour is to retrieve all 100 rows and send them back to the application. Thus no filtering. We also have an option to perform filtering in those range scans. In this case only 1 row will be returned, but we will still scan the number of rows specified in SB_RANGE_SIZE. This feature of Sysbench is activated through adding the following line to autobench.conf:
Finally it is possible to remove the use of INSERT, DELETE and UPDATEs. This is done by changing the configuration parameter SYSBENCH_TEST from oltp_rw to oltp_ro.
There are many more ways to change the configuration of how to run Sysbench, but these settings are enough for this paper. For more details see the documentation of dbt2-0.37.50, also see the Sysbench tree
In our benchmarking reported in this paper we used 2 different configurations. Later, we will report more variants of Sysbench testing as well as other benchmark variants.
The first is the standard Sysbench OLTP RW configuration. The second is the standard benchmark with SB_USE_FILTER="yes" added. This was added since the standard benchmark becomes limited by the network bandwidth when using r5.8xlarge instances for the data nodes. This instance type is limited to 10G Ethernet, whereas almost 20 Gb/s of networking capacity is needed with the performance that RonDB delivers. This bandwidth is achievable using the r5n instances.
Each test of Sysbench creates the tables and fills them with data. To have a reasonable execution time of the benchmark each table will be filled with 1M rows. Each sysbench instance will use its own table. It is possible to set the number of rows per table, it is also possible to use multiple tables per sysbench instance. Here we have used the default settings.
The test runs are executed for a fairly short time to be able to test a large variety of test cases. This means that results are expected to be a bit better than in long-running tests. To see how results are affected by running for a long time we also ran a few selected tests where we ran a single benchmark for more than 1 hour. The results are in this case around 10% lower than the numbers of shorter runs. This is mainly due to variance of the throughput that is introduced by the execution of checkpoints in RonDB. Checkpoints consume around 5-10% of the CPU capacity in the RonDB data nodes.
In all tests set up here we have started the RonDB cluster using the Hopsworks infrastructure. In all tests we have used c5.4xlarge as the VM instance type for MySQL Servers. This VM has 16 VCPUs and 32 GB of memory. This means a VM with more or less 8 Intel Xeon CPU cores. In all tests there are 2 RonDB data nodes, we have tested with 2 types of VM instances here, the first is the r5.4xlarge which has 16 VCPUs with 128 GB of memory. The second is the r5n.8xlarge which has 32 VCPUs and 256 GB of memory. In the Standard Sysbench OLTP RW test the network became a bottleneck when using r5.8xlarge. These VMs can use up to 10 Gb/sec, but in reality we could see that some instances could not go beyond 7 Gb/sec, when switching to r5n.8xlarge instead this jumped up to 13Gb/sec immediately, so clearly this bottleneck was due to the AWS infrastructure.
To ensure that the network bottleneck was removed we switched to using r5n.8xlarge instances instead for those benchmarks. These instances are the same as r5.8xlarge except that they can use up to 25 Gb/sec in network bandwidth instead of 10Gb/sec.
The first test we present here is the standard OLTP RW benchmark. When we run this benchmark most of the CPU consumption happens in the MySQL Servers. Each MySQL Server is capable of processing about 4000 TPS for this benchmark. The MySQL Server can process a bit more if the responsiveness of the data node is better, this is likely to be caused by the CPU caches being hotter in that case when the response comes back to the MySQL Server. Two 16 VCPU data nodes can in this case handle the load from 4 MySQL Servers, adding a 5th can increase the performance slightly, but not much. We compared this to 2 data nodes using 32 VCPUs and in principle the load these data nodes could handle was doubled.
The response time was very similar in both cases, at extreme loads the larger data nodes had more latency increases, most likely due to the fact that we got much closer to the limits of what the network could handle.
The top number here was 34870 TPS at 64 threads from 10 MySQL Servers. In this case 95% of the transactions had a latency below 19.7 ms, which means that the time for each SQL query was below 1 millisecond. Almost 700k SQL queries per second were executed. These queries reported back 14.5M rows per second to the application for the larger data nodes, most of them coming from the 4 range scan queries in the benchmark. Each of those rows is a bit larger than 100 bytes, thus around 2 GByte per second of application data is transferred to the application (about 25% of this is aggregated in the MySQL Server when using the SUM range scan).
Given that Sysbench OLTP RW is to a great extent a networking test we also wanted to perform a test that performs a bit more processing, but reports back a smaller number of rows. We achieved this by setting SB_USE_FILTER="yes" in the benchmark configuration file. This means that instead of each range scan SELECT reporting back 100 rows, it will read 100 rows, filter out 99 of them and report only 1 of the 100 rows. This decreases the number of rows to process down to about 1M rows per second. Thus this test is a better evaluator of the CPU efficiency of RonDB, whereas the standard Sysbench OLTP RW is a good evaluator of RonDB's ability to ship tons of rows between the application and the database engine.
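These figures can be cross-checked with a few lines of arithmetic. The sketch uses only numbers from the text; the rows-per-transaction estimate (10 point selects plus 4 range scans of 100 rows) is our own simplification and lands close to, but not exactly on, the reported row rate.

```python
tps = 34_870               # transactions per second from the text
queries_per_tx = 20        # SQL queries per OLTP RW transaction
p95_ms = 19.7              # 95th percentile transaction latency

qps = tps * queries_per_tx
ms_per_query = p95_ms / queries_per_tx
# Rows read per transaction: 10 point selects plus 4 range scans of 100 rows.
rows_per_tx = 10 + 4 * 100
rows_per_sec = tps * rows_per_tx

print(f"{qps:,} queries/s")
print(f"{ms_per_query:.3f} ms per query")
print(f"{rows_per_sec / 1e6:.1f}M rows/s")
```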
At first, we wanted to see the effect the number of MySQL servers had on the throughput in this benchmark. We see the results of this in the image above. We see that there is an increase in throughput going from 8 to 12 MySQL Servers. However the additional effect of each added MySQL Server is diminishing. There is very little to gain going beyond 10 MySQL Servers. The optimal use of computing resources is most likely achieved around 8-9 MySQL Servers.
Adding additional MySQL servers also has an impact on the variability of the latency. So probably the overall best fit here is to use about 2x more CPU resources on the MySQL Servers compared to the CPU resources in the RonDB data nodes. This rule is based on this benchmark and isn’t necessarily true for another use case.
The results with the smaller data nodes, r5.4xlarge, are shown by the red line; that test used 5 MySQL Servers.
The rule definitely changes when using the key-value store APIs that RonDB provides. These are at least 100% more efficient compared to using SQL.
A key-value store needs to be a LATS database (low Latency, high Availability, high Throughput, Scalable storage). In this paper we have focused on showing Throughput and Latency. Above is the graph showing how latency is affected by the number of threads in the MySQL Server.
Many applications have strict requirements on the maximum latency of transactions. So for example if the application requires response time to be smaller than 20 ms then we can see in the graph that we can use around 60 threads towards each MySQL Server. At this number of threads r5.4xlarge delivers 22500 TPS (450k QPS) and r5n.8xlarge delivers twice that number, 45000 TPS (900k QPS).
The base latency in an unloaded cluster is a bit below 6 milliseconds. This number varies a bit depending on exactly where the VMs that get started for you are located. Most of this latency is spent on the networks. Each network jump in AWS has been reported to be around 40-50 microseconds and one transaction performs around 100 of those network jumps in sequence. Thus almost two-thirds of the base latency comes from the latency in getting messages across. At higher loads the queueing time while waiting for the message to be executed becomes dominant. Benchmarks where everything executes on a single computer have a base latency of around 2 milliseconds per Sysbench transaction, which confirms the above calculations.
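The calculation behind this estimate is sketched below, using the numbers quoted above. Rounding the base latency to 6 ms is our simplification.

```python
hops = 100                     # sequential network jumps per transaction
hop_us = (40, 50)              # reported latency per jump in microseconds
base_ms = 6.0                  # base latency, "a bit below 6 ms" in the text

network_ms = tuple(hops * us / 1000 for us in hop_us)
share = tuple(ms / base_ms for ms in network_ms)
print(f"network: {network_ms[0]:.0f}-{network_ms[1]:.0f} ms")
print(f"share of base latency: {share[0]:.0%}-{share[1]:.0%}")
```

At the lower end of the range, the roughly 2 ms that remain after subtracting the network time match the base latency seen when everything runs on a single computer.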
TLDR: Hopsworks is the Data-Intensive AI platform with a Feature Store for building complete end-to-end machine learning pipelines. This tutorial will show an overview of how to work with Jupyter on the platform and train a state-of-the-art ML model using the fastai python library. Hopsworks provides Jupyter as a service in the platform, including kernels for writing PySpark/Spark and pure Python code. With an intuitive service to install Python libraries covered in a previous blog and access to a Jupyter notebook, getting started with your favourite ML library requires little effort in Hopsworks.
Jupyter provides an integrated development environment (IDE) allowing users to seamlessly mix code with visualization and comments, making it not just handy for prototyping, but also for visualization and educational purposes. In recent years, it has become a favourite tool for many, used for data wrangling, data mining, statistical modeling, visualization and machine learning, with the notebooks themselves scheduled to run in production at tech leaders such as Netflix.
The Hopsworks platform ships with Jupyter as one of the integrated components. Since it comes pre-installed and configured to work with Spark and PySpark kernels, in addition to the Python kernel, getting started with writing your notebooks and scheduling them in production on a Hopsworks cluster is straightforward. The Hopsworks installation also includes a Miniconda environment with the most popular libraries you can find in a data scientist's toolkit, such as TensorFlow, PyTorch and scikit-learn.
In this tutorial we will describe how to work with a Jupyter notebook in the Hopsworks platform. As an example, we will demonstrate how to install fastai, a library that provides high-level components for building and training ML models to get state-of-the-art deep learning results, clone a set of notebooks from a git repository and train a model on a P100 GPU.
To follow this tutorial you should have a Hopsworks instance running on https://hopsworks.ai. You can register for free, without providing credit card information, and receive USD 4000 worth of free credits to get started. The only thing you need to do is to connect your cloud account.
The first step in the tutorial is to install the fastai library. To get started, navigate to the Python service to install the fastai library from PyPI as shown in the example below. There are many different approaches to installing the library, but in this instance we install the latest versions of the fastai and nbdev packages from PyPI, which are required to run the first notebook in the fastai course.
In the Jupyter service page there are three different modes that can be configured.
Firstly, there is a Python tab, in which configuration for the Python kernel, such as Memory/Cores/GPUs is set and optionally a git repository can also be configured that should be cloned when Jupyter starts up. This is the kernel that we are going to use in this tutorial.
Secondly, in the Experiments tab the PySpark kernel is configured. Select this tab if you want to enable all the features in the platform regarding experiment tracking, hyperparameter optimization and distributed training. See HopsML for more information on the Machine Learning pipeline.
Thirdly, for general purpose notebooks, select the Spark tab and run with Static or Dynamic Spark Executors on Spark or PySpark.
The image below shows the configuration options set for the Python kernel. As working with larger ML models can be memory intensive make sure you are configuring the Memory for the kernel to be at least 8GB, then set GPUs to 1 to allocate a GPU that should be accessible for the kernel and set the git configuration to clone the fastai git repository https://github.com/fastai/fastai.git to get access to the notebooks.
Once the configuration has been entered for the Python kernel, press the button on the top that says JupyterLab to start the Notebook Server. Keep in mind that it may take some time as resources need to be allocated for the Notebook Server and to clone the git repository. The image below demonstrates the process of starting Jupyter.
The Jupyter Notebook Server will now have been allocated a GPU which you can use in the Python kernel. To check the type and specifications of the GPU, open a new terminal inside Jupyter and run nvidia-smi. We can see that in this instance we have access to a P100 NVIDIA GPU.
Now you’re all set to start following the course material that fastai provides. To make sure the GPU is being utilized you can leave a terminal window open and run nvidia-smi -l 1, which will print out the GPU utilization every second while you are running the training in the notebook.
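The same check can be made from a notebook cell instead of a terminal. The following is a minimal sketch that shells out to nvidia-smi with its CSV query options and parses the result; the function names are ours, and the code simply returns an empty list when nvidia-smi is not on the path.

```python
import shutil
import subprocess

# Fields requested from nvidia-smi, in the order they are parsed below.
QUERY = "name,memory.total,utilization.gpu"


def parse_gpu_csv(text):
    """Parse `nvidia-smi --format=csv,noheader` output into a list of dicts."""
    gpus = []
    for line in text.strip().splitlines():
        name, memory, utilization = [field.strip() for field in line.split(",")]
        gpus.append({"name": name, "memory": memory, "utilization": utilization})
    return gpus


def query_gpus():
    """Return one dict per visible GPU, or an empty list if none can be queried."""
    if shutil.which("nvidia-smi") is None:
        return []
    try:
        result = subprocess.run(
            ["nvidia-smi", f"--query-gpu={QUERY}", "--format=csv,noheader"],
            capture_output=True, text=True, check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        return []
    return parse_gpu_csv(result.stdout)


print(query_gpus())
```

Calling query_gpus() repeatedly while a notebook is training gives a rough equivalent of watching nvidia-smi -l 1 in a terminal.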
In the example below, the first notebook lesson1-pets.ipynb in the fastai course is executed.
Hopsworks is available both on AWS and Azure as a managed platform. Visit hopsworks.ai to try it out.
TLDR; A new class of hierarchical distributed file system with scaleout metadata has taken over at Google, Facebook, and Microsoft that provides a single centralized file system that manages the data for an entire data center, scaling to Exabytes in size. The common architectural feature of these systems is scaleout metadata, so we call them scaleout metadata file systems. Scaleout metadata file systems belie the myth that hierarchical distributed file systems do not scale, so you have to redesign your applications to work with object stores, and their weaker semantics. We have built a scaleout metadata file system, HopsFS, that is open-source, but its primary use case is not Exabyte storage, rather customizable consistent metadata for the Hopsworks Feature Store. Scaleout metadata is also the key technology behind Snowflake, but here we stick to file systems.
Google, Microsoft, and Facebook have been pushing out the state-of-the-art in scalable systems research in the last 15 years. Google has presented systems like MapReduce, GFS, Borg, and Spanner. Microsoft introduced CosmosDB, Azure Blob Storage and federated YARN. Facebook has provided Hive, Haystack, and F4 systems. All of these companies have huge amounts of data (Exabytes) under management, and need to efficiently, securely, and durably store that data in data centers. So, why not unify all storage systems within a single data center to more efficiently manage all of its data? That’s what Google and Facebook have done with Colossus and Tectonic, respectively. The other two scaleout metadata file systems covered here, ADLSv2 and HopsFS, were motivated by similar scalability challenges but, although they could be, they are typically not deployed as data center scale file systems, just as scalable file systems for analytics and machine learning.
First generation hierarchical distributed file systems (like HDFS) were not scalable enough in the cloud, motivating the move to object stores (like S3) as the cloud-native storage service of choice. However, the move to object stores is not without costs. Many applications need to be rewritten as the stronger POSIX-like behaviour of hierarchical file systems (atomic move/rename, consistent read-after-writes) has been replaced by weakened guarantees in object stores. In particular, data analytics frameworks traditionally rely on atomic rename to provide atomic update guarantees for updating columnar data stores. The lack of atomic rename in S3 has been one of the motivations for the introduction of new columnar store frameworks for analytics and ML, such as Delta Lake, Apache Hudi, and Apache Iceberg that provide ACID guarantees for updating tables over object stores. These frameworks add metadata to files in the object store to provide the ACID guarantees, but their performance lags behind systems built on mutable scaleout metadata, underpinning columnar data stores such as Snowflake and BigQuery.
Hierarchical file systems typically provide well-defined behaviour (a POSIX API) for how a client can securely create, read, write, modify, delete, organize, and find files. The data in such file systems is stored in files as blocks or extents. A file is divided up into blocks, and distributed file systems spread and replicate these blocks over many servers for improved performance (you can read many blocks in parallel from different block servers) and high availability (failure of a block server does not cause the file system to go down, as replicas of that block are still available on other block servers).
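To make the block mechanics concrete, here is a toy sketch of splitting a file into fixed-size blocks and placing three replicas of each block on distinct servers. The block size, server names, and rotation-based placement policy are assumptions chosen for illustration; real systems use far more elaborate placement.

```python
def split_into_blocks(data, block_size):
    """Cut a byte string into consecutive blocks of at most block_size bytes."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]


def place_replicas(num_blocks, servers, replication=3):
    """Assign each block to `replication` distinct servers, rotating the start."""
    if replication > len(servers):
        raise ValueError("need at least as many servers as replicas")
    placement = {}
    for block_id in range(num_blocks):
        placement[block_id] = [
            servers[(block_id + r) % len(servers)] for r in range(replication)
        ]
    return placement


def readable_blocks(placement, failed_server):
    """Blocks that still have at least one replica after one server fails."""
    return [block for block, hosts in placement.items()
            if any(host != failed_server for host in hosts)]


blocks = split_into_blocks(b"x" * 1000, block_size=256)
servers = ["s0", "s1", "s2", "s3", "s4"]
placement = place_replicas(len(blocks), servers)
print(len(blocks))                               # 4
print(placement[0])                              # ['s0', 's1', 's2']
print(len(readable_blocks(placement, "s1")))     # 4
```

Because every block lives on three different servers, losing any single server leaves all four blocks readable, and a reader can fetch different blocks from different servers in parallel.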
However, the data about what files, directories, blocks, and file system permissions are in the system have historically been stored in a single server called the metaserver or namenode. We call this data about the file system objects metadata. In file systems like HDFS, the namenode stores its metadata in-memory to improve both latency and throughput in the number of metadata operations it can support per second. Example metadata operations are: create a directory, move or rename a file or directory, change file permissions or ownership. Operations on files and some operations on directories (such as `rm -rf`) require both updates to metadata and to the blocks stored on the block servers.
As the size of data under management by distributed file systems increased, it was quickly discovered that metadata servers became a bottleneck. For example, HDFS could scale to, at a push, a Petabyte, but not handle more than 100K reads/sec and only a few thousand writes/sec.
It has long been desired to re-architect distributed file systems to shard their metadata across many servers to enable them to support (1) larger volumes of metadata and (2) more operations/second. But it is a very hard problem. Read here about the contortions Uber applies to get its HDFS’ namenode to scale instead of re-designing a scaleout metadata layer from scratch.
When sharding the state of the metadata server over many servers, you need to make decisions about how to do it. Google used its existing BigTable key-value store to store Colossus’ metadata. Facebook, similarly, chose the ZippyDB key-value store for Tectonic. Microsoft built their own Replicated State Library - Hekaton Ring Service (RSL-HK) to scale-out ADLS’ metadata. The RSL-HK ring architecture combines Paxos-based metadata with Hekaton (in-memory engine from SQL Server). HopsFS used NDBCluster (now RonDB) to scale out its metadata.
The capabilities of these underlying storage engines are reflected in the semantics provided by the higher level file systems. For example, Tectonic and (probably) Colossus do not support atomic move of files from any directory to any other directory. Their key-value stores do not support agreement protocols across shards (only within a shard). So, at the file system level, you introduce an abstraction like a file system volume (Tectonic calls them tenants), and users then know they can perform atomic rename/move within that volume, but not across volumes. Google solves this problem at a higher layer for structured data with Spanner by implementing two-phase commit transactions to ensure consistency across shards. In contrast, RSL-HK Ring by Microsoft and RonDB by Logical Clocks support cross-shard transactions that enable both ADLSv2 and HopsFS to support atomic rename/move between any two paths in the file system.
To put this in database terms, the consistency models provided by the scaleout metadata file systems are tightly coupled to the capabilities provided by the underlying metadata store. If the store does not support cross-partition transactions - consistent operations across multiple shards - you will not get strongly consistent cross-partition file system operations. For example, if the metadata store is a key-value store, each shard typically maintains strongly consistent key-value data using Paxos. But Paxos does not compose - you cannot run Paxos between two shards that themselves maintain consistency using Paxos. In contrast, RonDB supports 2-phase commit (2PC) across shards, enabling strongly consistent metadata operations both within shards and across shards.
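The idea behind a cross-shard atomic rename can be sketched with a toy two-phase commit. This is not how RonDB or RSL-HK implement transactions: the Shard class and atomic_rename function are hypothetical, the two shards are assumed to be distinct, and durable logging, locking, and coordinator failure handling are all left out.

```python
class Shard:
    """A toy metadata shard: a dict of path -> inode id plus one staged change."""

    def __init__(self, name):
        self.name = name
        self.entries = {}
        self.staged = None
        self.available = True

    def prepare(self, change):
        # Phase 1: vote yes only if the shard is up and the change is valid.
        op, path, _inode = change
        if not self.available:
            return False
        if op == "delete" and path not in self.entries:
            return False
        if op == "insert" and path in self.entries:
            return False
        self.staged = change
        return True

    def commit(self):
        # Phase 2: apply the change staged during prepare.
        op, path, inode = self.staged
        if op == "delete":
            del self.entries[path]
        else:
            self.entries[path] = inode
        self.staged = None

    def abort(self):
        # Discard any staged change without applying it.
        self.staged = None


def atomic_rename(src_shard, dst_shard, src, dst):
    """Move an entry between two shards; either both change or neither does."""
    if src not in src_shard.entries:
        return False
    inode = src_shard.entries[src]
    participants = [(src_shard, ("delete", src, inode)),
                    (dst_shard, ("insert", dst, inode))]
    if all(shard.prepare(change) for shard, change in participants):
        for shard, _ in participants:
            shard.commit()
        return True
    for shard, _ in participants:
        shard.abort()
    return False


a, b = Shard("A"), Shard("B")
a.entries["/tmp/part-0"] = 42
print(atomic_rename(a, b, "/tmp/part-0", "/warehouse/part-0"))   # True
b.available = False
a.entries["/tmp/part-1"] = 43
print(atomic_rename(a, b, "/tmp/part-1", "/warehouse/part-1"))   # False
```

In the second call the destination shard votes no, so the source shard aborts and still holds /tmp/part-1. A store without a cross-shard protocol of this kind can only offer the rename within one shard.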
Once a scaleout metadata storage layer is in place, stateless services can be used to provide access control and implement background maintenance tasks like maintaining the durability and availability of data, disk space balancing, and repairing blocks.
We can see that Hadoop File System APIs are still popular, as they model the contents of a filesystem as a set of paths that are either directories, symbolic links, or files, but address the challenge of scalability by restricting the POSIX-like semantics with append-only writers (there is no support for writing at random offsets in files).
With a scaleout metadata file system, you can have many more concurrent clients, leading to the well-known problem of hotspots - an overload of reads/writes that are handled by a single shard. For example, Tectonic, ADLS, and HopsFS all ensure that objects (files/directories) in a directory are co-located in the same shard for efficient low latency directory listing operations. However, if the directory contains millions of files, such an operation can overload the threads responsible for handling operations on that shard. HopsFS and Tectonic randomly spread independent directories across shards to prevent hotspots, while ADLS supports range partitioning. Another well-known technique from object stores, like S3, is used by ADLS - paged enumeration of directories. This requires clients to perform many iterative operations to list all objects in a large directory, but enables client quotas to kick in and throttle clients before they overload a shard.
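Both ideas can be sketched in a few lines. The first function co-locates all children of a directory by hashing the parent path, and the second lists a large directory one page at a time using a continuation token. The shard count, hash choice, and token format are assumptions for illustration and do not describe any of the systems above.

```python
import bisect
import hashlib
import posixpath

NUM_SHARDS = 4  # assumed shard count for the illustration


def shard_for_path(path):
    """Pick a shard from a stable hash of the parent directory of `path`."""
    parent = posixpath.dirname(path)
    digest = hashlib.sha256(parent.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_SHARDS


def list_page(entries, page_size, continuation=None):
    """Return up to page_size names after `continuation`, plus the next token."""
    names = sorted(entries)
    start = 0
    if continuation is not None:
        start = bisect.bisect_right(names, continuation)
    page = names[start:start + page_size]
    next_token = page[-1] if start + page_size < len(names) else None
    return page, next_token


def list_all(entries, page_size):
    """Enumerate a directory page by page, counting the round trips needed."""
    token, result, calls = None, [], 0
    while True:
        page, token = list_page(entries, page_size, token)
        result.extend(page)
        calls += 1
        if token is None:
            return result, calls


# Siblings always land on the same shard because only the parent is hashed.
print(shard_for_path("/data/logs/a.log") == shard_for_path("/data/logs/b.log"))  # True

names, calls = list_all([f"file-{i:03d}" for i in range(10)], page_size=4)
print(calls)   # 3
```

Each page is a separate request, which is the point at which a service can apply per-client quotas before a single shard is overwhelmed.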
Blocks are a logical unit of storage that hides the complexity of raw data storage and durability from the upper layers of the filesystem. In earlier generations of distributed file systems, such as HDFS, full replicas of blocks were stored at different data nodes to ensure high availability of file blocks. However, object stores and scaleout metadata file systems have eschewed full replicas and instead ensure high availability of file blocks using Reed-Solomon (RS) Coding. RS-encoded blocks provide higher availability guarantees and lower storage overhead, but with the disadvantage of more CPU and network bandwidth required to recover lost blocks. Given the continued growth in network bandwidth and available CPU cycles, this tradeoff is favorable.
There is a general trend towards smaller blocks, enabling faster recovery of failed blocks and faster availability of blocks to readers, but the cost is the need for more metadata storage capacity and higher available throughput in ops/sec at the metadata service.
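The two trade-offs above come down to simple arithmetic. The sketch below compares raw storage per byte of user data for full replication and for Reed-Solomon coding, and counts how many block records the metadata service must track as blocks shrink. The RS(6,3) layout and the 128 MiB and 8 MiB block sizes are assumed example values, not figures published for any of the systems discussed here.

```python
def replication_overhead(replicas):
    """Raw bytes stored per byte of user data with full replication."""
    return float(replicas)


def rs_overhead(data_blocks, parity_blocks):
    """Raw bytes stored per byte of user data with RS(data, parity) coding."""
    return (data_blocks + parity_blocks) / data_blocks


def block_count(file_bytes, block_bytes):
    """Number of block records needed for one file (integer ceiling division)."""
    return -(-file_bytes // block_bytes)


MIB = 2 ** 20
TIB = 2 ** 40

print(replication_overhead(3))          # 3.0, survives the loss of 2 copies
print(rs_overhead(6, 3))                # 1.5, survives the loss of any 3 blocks
print(block_count(TIB, 128 * MIB))      # 8192
print(block_count(TIB, 8 * MIB))        # 131072
```

Under these assumptions the coded layout halves the raw storage while tolerating one more failure, and moving from 128 MiB to 8 MiB blocks multiplies the block metadata for the same file by 16.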
Both Colossus and Tectonic provide rich clients that can customize the types of blocks and RS coding needs, depending on the workload needed by the client. For example, blob storage requires frequent appends and is handled differently from writing tabular data. Although neither Tectonic nor Colossus discussed the block sizes they support, it is safe to assume that they support blocks all the way down to a few MBs in size. ADLSv2 stores its block data in Azure Blob Storage (ABS). HopsFS, the managed service on Hopsworks, also stores its blocks as objects in object storage (S3 on AWS and ABS on Azure). On premises, HopsFS stores its blocks as fixed-size files replicated across data nodes.
When your ambition is to store data for the entire data center, you need to support many different storage technologies with different cost/storage trade-offs. As such, Colossus, HopsFS, ADLSv2, and Tectonic all support storing data in tiers: magnetic disks, SSDs, NVMe, in-memory. Among these systems, HopsFS has unique support for storing small files in the scaleout metadata layer for higher performance operations on small files.
HopsFS takes a different approach to using scaleout metadata. Instead of using it, primarily, to build exascale file systems, it provides a principled architecture for easily extending metadata for files and directories. In particular, this is useful in the domain of machine learning where we have both artifacts (feature data, training data, programs, models, log files) that are typically stored as files and metadata (experiments, hyperparameters, tags, metrics, etc.) that are stored in a metastore (often a relational database). HopsFS unifies the artifact store and metastore, and even enables polyglot storage and querying of metadata in both RonDB (SQL) and Elasticsearch (free-text search). This simplifies operations and provides new free-text search capabilities compared to existing ML metastores (TFX, MLflow). The same approach enabled us to be the first to release an open-source Feature Store for ML based on Hopsworks. When building our Feature Store, instead of needing to build a separate artifact store (file system) and metastore (database) and write complex protocols to ensure the consistency of both stores, we had a single consistent storage system, where artifacts can be easily extended with consistent metadata that can be queried using free-text search. Features can be annotated with statistics and tags in metadata, training data
Some important requirements for extensible file system metadata are that it:
Even though we first heard about Colossus’ architecture in 2009 and its name in 2012, Google has been surprisingly secretive about the lowest layer of their scalable storage and compute architecture. However, after the release of Tectonic (coincidence?) in early 2021, Google released more details on Colossus in May 2021.
Colossus’ metadata storage service is BigTable, which does not support cross-shard transactions. We assume this means that Colossus lacks atomic rename, a hole that is filled for tabular data (at least) by Spanner, which supports cross-shard transactions.
In Colossus, file system clients connect to curators to perform metadata operations, and the curators, in turn, talk to BigTable. Custodians perform file system maintenance operations, and “D” services provide block storage services, where clients read/write blocks directly from/to “D” servers.
Different clients of Colossus can store their data on different volumes (metadata shards). Atomic rename is possible within a volume, but not across volumes.
Tectonic was first announced as a file system at USENIX FAST 2021, and it unifies Facebook’s previous storage services (federated HDFS, Haystack, and others) to provide a data-center scale file system.
Similar to Colossus, Tectonic stores its metadata in a key-value store, but in this case in ZippyDB. As ZippyDB lacks cross-partition transactions, cross-namespace file system operations are not supported. That is, you cannot atomically move a file from one volume (metadata shard) to another. Often, such operations are not needed, as all the data for a given service can fit in a single namespace, and there are no file system operations between different applications. There are separate stateless services to manage the name space, blocks, files, and file system maintenance operations.
Azure Data Lake Storage (ADLS) was first announced at SIGMOD 2017 and it supports Hadoop distributed file system (HDFS) and Cosmos APIs. It has since been redesigned as Azure Data Lake Gen 2 (ADLSv2), which provides multi-protocol access to the same data using the Hadoop File System API, the Azure Data Lake Storage API and the Azure Blob storage API. Unlike Colossus and Tectonic, it is available for use as a service - but only on Azure.
The most recent information about ADLS’ architecture is the original paper describing ADLS from 2017 - no architecture has been published yet for ADLSv2. However, ADLS used RSL-HK to store metadata and it has a key-value store (ring) with shards using state machine replication (Paxos) and with transactions across shards, all in an in-memory engine (“It implements a novel combination of Paxos and a new transactional in-memory block data management design.”).
HopsFS was first announced at USENIX FAST 2017 and provides an HDFS API. HopsFS is a rewrite of HDFS and it supports multiple stateless namenodes (metadata servers), where the leader performs file system maintenance operations, and a pluggable metadata storage layer.
HopsFS provides a DAL API to support different metadata storage engines. Currently the default engine for HopsFS is RonDB (a fork of NDB Cluster, the storage engine for MySQL Cluster), a scalable key-value store with SQL capabilities. RonDB can scale to handle hundreds of millions of transactional reads per second and 10s of millions of transactional writes per second, and it provides both a native key-value API and a SQL API via a MySQL Server. RonDB also provides a CDC (change-data-capture) API to allow us to automatically replicate changes in metadata to Elasticsearch, providing a free-text search API to HopsFS’ metadata (including its extended metadata). Metadata can be queried using any of the 3 APIs: the native key-value API for RonDB, the SQL API, or free-text search in Elasticsearch.
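The change-data-capture idea can be illustrated with a toy in-memory model: an ordered stream of change events from the metadata store is replayed against a search index so that the index follows the store. Neither the RonDB CDC API nor an Elasticsearch client is used here; the SearchIndex class, the event tuples, and the paths are all hypothetical.

```python
from collections import defaultdict


class SearchIndex:
    """A toy inverted index standing in for a free-text search service."""

    def __init__(self):
        self.docs = {}
        self.postings = defaultdict(set)

    def upsert(self, key, text):
        # Remove any previous version first so stale tokens do not linger.
        self.delete(key)
        self.docs[key] = text
        for token in text.lower().split():
            self.postings[token].add(key)

    def delete(self, key):
        old = self.docs.pop(key, None)
        if old is not None:
            for token in old.lower().split():
                self.postings[token].discard(key)

    def search(self, word):
        return sorted(self.postings.get(word.lower(), set()))


def apply_changes(index, events):
    """Replay an ordered stream of (operation, key, text) change events."""
    for op, key, text in events:
        if op == "delete":
            index.delete(key)
        else:  # "insert" and "update" are both upserts on the index side
            index.upsert(key, text)


index = SearchIndex()
apply_changes(index, [
    ("insert", "/models/iris", "iris classifier v1"),
    ("update", "/models/iris", "iris classifier v2 production"),
    ("insert", "/data/train", "iris training data"),
    ("delete", "/data/train", None),
])
print(index.search("iris"))         # ['/models/iris']
print(index.search("v1"))           # []
```

Because the events are applied in commit order, the index reflects the latest committed metadata without the application having to write to two stores itself.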
HopsFS scales the Namespace Layer with RonDB and Stateless Namenodes, while the block layer is cloud object storage.
The journey from a stronger POSIX-like file system to a weaker object storage paradigm and back again has parallels in the journey that databases have made in recent years. Databases made the transition from strongly consistent single-host systems (relational databases) to highly available (HA), eventually consistent distributed systems (NoSQL systems) to handle the massive increases in data managed by databases. However, NoSQL is just too hard for developers, and databases are returning to strongly consistent (but now scalable) NewSQL systems, with databases such as Spanner, CockroachDB, SingleSQL, and NDB Cluster.
The scaleout metadata file systems introduced here show that distributed hierarchical file systems are completing a similar journey, going from strongly consistent POSIX-compliant file systems to object stores (with their weaker consistency models), and back to distributed hierarchical file systems that have solved the scalability problem by redesigning the file system around a mutable, scaleout metadata service.
TLDR; Maggy is an open-source framework that simplifies writing and maintaining distributed machine learning programs. By encapsulating your training logic in a function, the same code can be run unchanged with Python on your laptop or distributed using PySpark for hyperparameter tuning, data-parallel training, or model-parallel training. With the arrival of GPU support in Spark 3.0, PySpark can now be used to orchestrate distributed deep learning applications in TensorFlow and PyTorch. We are pleased to announce we have now added support for Maggy on Databricks, so training machine learning models with many workers should be as easy as running Python programs on your laptop.
Machine Learning is going to be distributed in the era of what Andrew Ng calls Data-centric AI. Data volumes are constantly increasing, and models are known to improve their prediction accuracy predictably with more data, so companies can often know what return they will get in terms of more accurate models by simply investing in collecting more training data. One of the main challenges for developers is to switch from programming for a single machine to programming for a distributed cluster.
That's why we developed Maggy (maggy.ai), an open source Python library that introduces a new unified framework for writing core ML training logic as oblivious training functions. By encapsulating your training logic in a Python function (we call it an oblivious training function), the same code can be run unchanged in Python or PySpark. You don’t need to rewrite your code to do hyperparameter tuning, data-parallel training, or model-parallel training.
Maggy can be extremely useful in the case you want to build a distributed ML solution but you have no prior experience of distributed computing. It is enough to just refactor your training code to wrap it inside a function (this is good development practice, in case you didn’t know) and let Maggy do the distribution magic for you.
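The refactoring step is easiest to see on a tiny example. The sketch below does not use Maggy at all: it shows a training function written so that it knows nothing about where it runs, a single-process driver, and a sequential stand-in for a distributed driver. The names train_fn, run_local, and run_many are ours for illustration.

```python
def train_fn(learning_rate, epochs, data):
    """Fit y = w * x by gradient descent and return the final mean squared error."""
    w = 0.0
    for _ in range(epochs):
        grad = sum(2 * (w * x - y) * x for x, y in data) / len(data)
        w -= learning_rate * grad
    return sum((w * x - y) ** 2 for x, y in data) / len(data)


def run_local(fn, **kwargs):
    """Single-process driver: call the training function once."""
    return fn(**kwargs)


def run_many(fn, configs, **shared):
    """Stand-in for a distributed driver: call the same function per config.

    A real framework would ship `fn` to workers; here the calls run in a loop.
    """
    return [fn(**config, **shared) for config in configs]


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]

# The same function runs unchanged under either driver.
print(run_local(train_fn, learning_rate=0.05, epochs=50, data=data))
print(run_many(train_fn,
               [{"learning_rate": 0.01, "epochs": 50},
                {"learning_rate": 0.05, "epochs": 50}],
               data=data))
```

The training function takes everything it needs as parameters and returns a metric, which is what allows a driver to decide how many times, with which parameters, and on which machine to call it.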
Furthermore, it is not important whether you use Tensorflow or PyTorch as Maggy supports both of them. Scikit-Learn and XGBoost are also supported, and are used, in particular, for parallelizing hyperparameter tuning over many workers.
In the following example, we use Maggy to train a Neural Network on the Iris dataset.
The first thing we have to do is to create a cluster on your Databricks workspace. Maggy has been tested on the Databricks Runtime version 7.4 ML. Choose the number of workers you need and you are good to go.
If you don't know how to create a cluster, please follow the tutorial on this page. If it is your first time on Databricks, make sure to get familiar with the platform and prices.
Once you have created the cluster, you have to install Maggy. To do this, navigate to your cluster and click Libraries, then click Install New, select PyPI, enter 'maggy' as the Package, and click Install.
Once you have installed Maggy, do the same thing with TensorFlow version 2.4 or higher. For example, on the Package field you can write 'tensorflow==2.4'.
Now we can open the Databricks notebook and write our first Maggy program.
To use Maggy in your workflow, you need to do the following:
We are now going to present an example on how to implement this workflow. We are going to distribute the training of a (very) simple machine learning model on the iris dataset.
You can use this notebook as reference.
First of all, we have to wrap our ML model in a class. The class has to be an implementation of tf.keras.Model.
It's important to note that we do not instantiate the class: we pass the class itself to Maggy, not an instance of it.
In this example we define a class called NeuralNetwork. It is a subclass of tf.keras.Sequential, which is an implementation of tf.keras.Model. Make sure that your class implements tf.keras.Model. Finally, we define our ML model in the __init__ function.
First, we need to upload the Iris dataset to Databricks. In your Databricks workspace, click Data in the sidebar, then Create Table, and upload Iris.csv, which can be downloaded here.
In order to use Maggy, we have to pass the training set, test set and optionally a function for data processing to the configuration file.
The training and test sets can be:
We now wrap the code containing the logic of your experiment in a function.
For HPO, we have to define a function that has the HPs to be optimized as parameters. Inside the function we simply put the training logic as if we were training our model on a single machine using TensorFlow. Maggy will run this function multiple times using different parameters for you, as we will see in section 3a.
The training function provides the instructions to run the training and evaluation of your model, given the data passed in the configuration. You just need to wrap the instructions you implemented and any hyperparameters (for example the values to pass in the model constructor). The training function has to return a value or a list of values that corresponds to the evaluation results.
It's important to note that in the HPO function we did not pass the model as a parameter while we did that in our oblivious training function. This is because, when using Maggy for distributed training, the library has to patch some functions of the model class.
The next step is to create a configuration instance for Maggy. Since in this example we are using Maggy for hyperparameter optimization and distributed training using TensorFlow, we will use OptimizationConfig and TfDistributedConfig.
OptimizationConfig contains the information about the hyperparameter optimization.
We need to define a Searchspace class that contains the hyperparameters we want to optimize. In this example we want to search for the optimal number of layers of the neural network from 2 to 8 layers.
OptimizationConfig contains the following parameters:
Our HPO function and configuration class are now ready, so we can go on and run the HPO experiment. In order to do that, we run the lagom function, passing our training function and the configuration object we instantiated during the last step.
If you are wondering what lagom means: lagom is a Swedish word representing cultural aspects of balance and equilibrium. In English it could be translated as "just the right amount" or "less is more".
This function will print the HPO summary. As you can see, several values are returned. We are most interested in the 'best_config' dictionary, which contains the parameters for which the model performed best.
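To show the shape of such a search without relying on Maggy's API, here is a plain-Python random search over the number of layers from 2 to 8. It is a stand-in, not Maggy's implementation: the objective hpo_fn is a toy that peaks at 5 layers, and apart from 'best_config' the keys of the returned summary are our own naming.

```python
import random


def hpo_fn(nlayers):
    """Toy objective standing in for a training run; it peaks at 5 layers."""
    return 1.0 - abs(nlayers - 5) * 0.1


def random_search(fn, searchspace, num_trials, direction="max", seed=0):
    """Sample integer hyperparameters uniformly and keep the best trial."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_trials):
        config = {name: rng.randint(low, high)
                  for name, (low, high) in searchspace.items()}
        trials.append((fn(**config), config))
    pick = max if direction == "max" else min
    best_val, best_config = pick(trials, key=lambda trial: trial[0])
    return {"best_config": best_config,
            "best_val": best_val,
            "num_trials": num_trials}


summary = random_search(hpo_fn, {"nlayers": (2, 8)}, num_trials=20)
print(summary["best_config"])
```

The function under test is called once per trial with a different configuration, and only the best configuration is carried forward to the final training run.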
Now it's time to run the final step of our ML program. Let's initialize the configuration class for the distributed training. First, we need to define our hyperparameters; we want to take the best hyperparameters from the HPO.
TfDistributedConfig class has the following parameters:
hparams: the model and dataset parameters. In this case we also need to provide the 'train_batch_size' and the 'test_batch_size'. These values represent the subset sizes of the sharded dataset. Each value is usually dataset_size/number_workers but can change depending on your needs.
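As a worked example of the dataset_size/number_workers rule, the sketch below splits a dataset over workers as evenly as possible. The 120/30 train/test split of the 150-row Iris dataset and the choice of 4 workers are assumptions for illustration.

```python
def shard_sizes(dataset_size, num_workers):
    """Split dataset_size rows over workers as evenly as possible."""
    base, extra = divmod(dataset_size, num_workers)
    # The first `extra` workers take one additional row each.
    return [base + 1 if i < extra else base for i in range(num_workers)]


num_workers = 4
train_shards = shard_sizes(120, num_workers)
test_shards = shard_sizes(30, num_workers)
print(train_shards)   # [30, 30, 30, 30]
print(test_shards)    # [8, 8, 7, 7]
```

When the dataset does not divide evenly, as with the 30 test rows here, the per-worker size has to be rounded, which is one reason the value may differ from the plain quotient.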
Finally, let's run the distributed training using the lagom function.
Maggy will run the distributed training using the number of workers and resources available to the cluster you defined. Finally, it will print the test results. The training log can be found in the Spark UI on Databricks.
Maggy is open-source and everyone can contribute to the project. Feel free to experiment with the library and contact us for any questions or issues. You can reach out to us via GitHub or the hopsworks community. You can also give us a star on GitHub to let us know you appreciate our work.
In this article we saw how to train a ML model in a distributed fashion without reformatting our code, thanks to Maggy. Maggy is available on hopsworks.ai. If you want to know more about how to develop your ML projects faster, you may want to check out this article.
TLDR: Hopsworks is the Data-Intensive AI platform with a Feature Store for building complete end-to-end machine learning pipelines. This tutorial gives an overview of how to install and manage Python libraries in the platform. Hopsworks provides a Python environment per project that is shared among all the users in the project. All common installation alternatives are supported, such as using the pip and conda package managers, in addition to libraries packaged in a .whl or .egg file and those that reside in a git repository. Furthermore, the service includes automatic conflict/dependency detection, upgrade notifications for platform libraries and easily accessible installation logs.
The Python ecosystem is huge and seemingly ever growing. In a similar fashion, the number of ways to install your favorite package also seems to increase. As such, a data analysis platform needs to support a great variety of these options.
The Hopsworks installation ships with a Miniconda environment that comes preinstalled with the most popular libraries you can find in a data scientist's toolkit, including TensorFlow, PyTorch and scikit-learn. The environment may be managed using the Hopsworks Python service to install or uninstall libraries, which may then be used in Jupyter or the Jobs service in the platform.
In this blog post we will describe how to install a python library, wherever it may reside, in the Hopsworks platform. As an example, this tutorial will demonstrate how to install the Hopsworks Feature Store client library, called hsfs. The library is Apache V2 licensed, available on github and published on PyPi.
To follow this tutorial you should have a Hopsworks instance running on https://hopsworks.ai. You can register for free, without providing credit card information, and receive USD 4000 worth of free credits to get started. The only thing you need to do is to connect your cloud account.
The first step to get started with the platform and install libraries in the python environment is to create a project and then navigate to the Python service.
When a project is created, the python environment is also initialized for the project. An overview of all the libraries and their versions along with the package manager that was used to install them are listed under the Manage Environment tab.
The simplest alternative to install the hsfs library is to enter the name as is done below and click Install as is shown in the example. The installation itself can then be tracked under the Ongoing Operations tab and when the installation is finished the library appears under the Manage Environment tab.
If hsfs were also available in an Anaconda repo, which is currently not the case, we would need to specify the channel where it resides. To search for libraries in Anaconda repos, set Conda as the package location.
If a versioned installation is desired to get the hsfs version compatible with a certain Hopsworks installation, the search functionality shows all the versions that have been published to PyPi in a dropdown. Simply pick a version and press Install. The example found below demonstrates this.
Many popular Python libraries have a great variety of builds for different platforms, architectures and with different build flags enabled. As such it is also important to support directly installing a distribution. To install hsfs as a wheel requires that the .whl file was previously uploaded to a Dataset in the project. After that we need to select the Upload tab, which means that the library we want to install is contained in an uploaded file. Then click the Browse button to navigate in the file selector to the distribution file and click Install as the following example demonstrates.
Installing libraries one by one can be tedious and error prone. In that case it may be easier to use a requirements.txt file to define all the dependencies. This also makes it easier to move your existing Python environment to Hopsworks.
The file may for example look like this.
This dependency list defines that version 2.2.7 of hsfs should be installed, along with version 2.9.0 of imageio and the latest available release for mahotas. The file needs to be uploaded to a dataset as in the previous example and then selected in the UI.
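Reconstructed from the description above, the file would contain the three lines held in REQUIREMENTS_TXT below. The small parser around it is only an illustration of how to read such a file; it handles plain names and == pins and nothing else from the requirements format.

```python
# Contents of requirements.txt as described in the text above.
REQUIREMENTS_TXT = """\
hsfs==2.2.7
imageio==2.9.0
mahotas
"""


def parse_requirements(text):
    """Map package name -> pinned version (None means latest available)."""
    pins = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()   # drop comments and blank lines
        if not line:
            continue
        name, _, version = line.partition("==")
        pins[name.strip()] = version.strip() or None
    return pins


print(parse_requirements(REQUIREMENTS_TXT))
# {'hsfs': '2.2.7', 'imageio': '2.9.0', 'mahotas': None}
```

A line without a version specifier, like mahotas here, leaves the choice of release to the package manager at install time.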
A great many Python libraries are hosted in git repositories, which makes it especially handy to install a library from a git repository during the development phase. The source code for the hsfs package is, as previously mentioned, hosted in a public GitHub repository. This means we only need to supply the URL to the repository and some optional query parameters. The subdirectory=python query parameter indicates that the setup.py file, which is needed to install the package, is in a subdirectory in the repository called python.
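For reference, pip's own convention for git installs puts these parameters after a # in the URL. The sketch below takes a URL in that style apart with the standard library. The repository address and the master ref are assumptions used for illustration, and the exact form the Hopsworks UI expects may differ.

```python
from urllib.parse import parse_qs, urlsplit

# Assumed example URL in pip's VCS style; the ref after "@" is hypothetical.
URL = ("git+https://github.com/logicalclocks/feature-store-api@master"
       "#egg=hsfs&subdirectory=python")


def describe_vcs_url(url):
    """Split a pip-style git URL into repository, ref, and parameters."""
    parts = urlsplit(url)
    repo_path, _, ref = parts.path.partition("@")
    params = {key: values[0] for key, values in parse_qs(parts.fragment).items()}
    return {
        "repository": f"https://{parts.netloc}{repo_path}",
        "ref": ref or None,
        "egg": params.get("egg"),
        "subdirectory": params.get("subdirectory"),
    }


print(describe_vcs_url(URL))
```

The subdirectory value tells the installer where inside the checkout to look for setup.py, which matters for repositories that hold clients for several languages.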
After each installation or uninstall of a library, the environment is analyzed to detect libraries that may not work properly. As hsfs depends on several libraries, it is important to analyze the environment and see if there are any dependency issues. For example, pandas is a dependency of hsfs and if uninstalled, an alert will appear that informs the user that the library is missing. In the following example pandas is uninstalled to demonstrate that.
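A rough version of such a check can be written with the standard library alone. This is not the mechanism Hopsworks uses: the function names are ours, and requirements that carry extras or environment markers are skipped for simplicity.

```python
import re
from importlib import metadata


def requirement_name(requirement):
    """Extract the distribution name from a string such as 'pandas>=1.0'."""
    return re.match(r"[A-Za-z0-9][A-Za-z0-9._-]*", requirement.strip()).group(0)


def missing_dependencies(package):
    """Unconditional dependencies of `package` that are not installed.

    Returns None when `package` itself is not installed.
    """
    try:
        requirements = metadata.requires(package) or []
    except metadata.PackageNotFoundError:
        return None
    missing = []
    for requirement in requirements:
        if ";" in requirement:   # skip extras and marker-dependent requirements
            continue
        name = requirement_name(requirement)
        try:
            metadata.version(name)
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


print(missing_dependencies("hsfs"))
```

In an environment where hsfs is installed but pandas has been removed, pandas would be expected to appear in the returned list.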
When new releases are available for the hsfs library, a notification is shown to make it simple to upgrade to the latest version. Currently, only the hops and hsfs libraries are monitored for new releases as they are utility libraries used to interact with the platform services. By clicking the upgrade text, the library is upgraded to the recommended version automatically.
Speaking from experience, Python library installations can fail for a seemingly endless number of reasons. As such, to find the cause, it is crucial to be able to access the installation logs in a simple way to find a meaningful error. In the following example, an incorrect version of hsfs is being installed, and the cause of failure can be found a few seconds later in the logs.
Hopsworks is available both on AWS and Azure as a managed platform. Visit hopsworks.ai to try it out.
TLDR; The first release of RonDB is now available. RonDB is the best low-latency, high availability, high throughput, and high scalability (LATS) database available today. In addition to new documentation and community support, RonDB 21.04.0 brings automated memory configuration, automated thread configuration, improved network handling, 3x improved performance, bug fixes and new features. It is available as a managed service on the Hopsworks platform and it can also be installed on-premises with the installation scripts or binary tarball.
RonDB 21.04.0 is based on MySQL NDB Cluster 8.0.23. The first release includes the following new features and improvements:
Our new documentation walks through how to use RonDB as a managed service on the Hopsworks platform and to install it on-premises with automated scripts or binary tarball. We have also added new documentation of the RonDB software.
Welcome to the RonDB community! Feel free to ask any question and our developers will try to reply as soon as possible.
NDB has historically required setting a large number of configuration parameters to size its various memory pools. RonDB 21.04.0 introduces automatic memory configuration, which removes the need to set these parameters. By default, RonDB data nodes use all the memory available in the server/VM. You can limit the amount of memory they use with the new configuration parameter TotalMemoryConfig.
For increased performance and stability, RonDB will automate thread configuration. This feature was introduced in NDB 8.0.23, but in RonDB it is the default behaviour. RonDB makes use of all accessible CPUs and also handles CPU locking automatically. Read more about automated thread configuration on our latest blog.
RonDB will perform better under heavy load by employing improved heuristics in thread spinning and in the network stack.
In NDB 8.0.23 and earlier releases of NDB one could only set NoOfReplicas at the initial start of the cluster. The only method to increase or decrease replication level was to perform a backup and restore. In RonDB 21.04.0 we introduce Active and Inactive nodes, making it possible to change the number of replicas without having to perform an initial restart of the cluster.
We have integrated a number of benchmark tools to assess the performance of RonDB. Benchmarking RonDB is now easy as we ship Sysbench, DBT2, flexAsynch and DBT3 benchmarks along with our binary distribution. Support for ClusterJ benchmarks is expected to come in upcoming product releases.
A new addition to the ClusterJ API releases data objects and Session objects to a cache rather than releasing them fully. In addition, garbage collection handling in ClusterJ was improved. Together, these changes led to a 3x improvement in a simple key lookup benchmark.
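The idea behind releasing objects to a cache can be sketched in a few lines. This is not the ClusterJ API, which is a Java API; `SessionPool` and its methods are hypothetical names used only to show why reuse avoids repeated allocation and the garbage collection work that follows from it.

```python
class SessionPool:
    """Keep released sessions for reuse instead of discarding them."""

    def __init__(self, factory):
        self._factory = factory  # called only when no idle session is cached
        self._idle = []
        self.created = 0

    def acquire(self):
        if self._idle:
            return self._idle.pop()  # reuse a cached session
        self.created += 1
        return self._factory()

    def release(self, session):
        self._idle.append(session)  # cache it rather than dropping it


pool = SessionPool(factory=dict)  # a dict stands in for an expensive session object
for _ in range(1000):
    session = pool.acquire()
    session["key"] = "value"  # pretend to perform a key lookup
    session.clear()           # reset state before handing it back
    pool.release(session)

print(pool.created)  # number of sessions actually constructed
```

In this toy loop a single object serves all 1000 lookups, whereas without the cache each iteration would construct and discard its own.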
There are three ways to use RonDB 21.04.0:
TLDR; Maggy is an open source framework that lets you write generic PyTorch training code (as if it were written to run on a single machine) and execute that training distributed across a GPU cluster. Maggy enables you to write and debug PyTorch code on your local machine, and then run the same code at scale without having to change a single line in your program. Going even further, Maggy provides distribution-transparent use of ZeRO, a sharded optimizer recently proposed by Microsoft. You can use ZeRO to improve your memory efficiency with a single change in your Maggy configuration. You can try Maggy in the Hopsworks managed platform for free.
Deep learning has seen a surge in activity with the availability of high level frameworks such as PyTorch to build and train models. A few lines of code in a notebook are sufficient to create powerful classifiers from scratch. However, both the data and model sizes to achieve state-of-the-art performance are ever increasing, so that training on your local GPU becomes a hopeless endeavour.
Enter distributed training. Distributed training allows you to train the same model on multiple GPUs on different shards of your data to speed up training times. In the ideal case, training on 4 GPUs simultaneously should reduce your training time by 75%. In distributed training, each GPU computes a forward and backward pass over its own batch of the data. For the model update, the computed gradients are shared and averaged between the nodes. This way, all models update their parameters with the same combined gradient and stay in sync. This communication step introduces overhead, of course, which is why ideal scaling is never truly achieved.
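The gradient averaging step can be made concrete with a toy example in plain Python. The sketch assumes a one-parameter model y = w * x with a mean squared error loss and equally sized shards; under those assumptions, the average of the per-worker gradients equals the gradient over the full batch. All names are illustrative.

```python
import math


def gradient(w, shard):
    """Gradient of the mean squared error for the model y = w * x on one shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def averaged_gradient(w, shards):
    """The gradient every worker holds after sharing and averaging."""
    grads = [gradient(w, shard) for shard in shards]  # one gradient per worker
    return sum(grads) / len(grads)


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # 8 samples of y = 3x
shards = [data[i::4] for i in range(4)]            # 4 workers, 2 samples each

w = 0.0
full = gradient(w, data)
avg = averaged_gradient(w, shards)
print(full, avg, math.isclose(full, avg))

# Ideal scaling: each of N workers handles 1/N of the data per step.
ideal_time_fraction = 1 / len(shards)
print(f"ideal reduction in training time: {1 - ideal_time_fraction:.0%}")
```

Because every worker applies the same averaged gradient, the model replicas stay identical after each update, which is what keeps them in sync.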
So if distributed training is such a great tool to accelerate training, why is its use still uncommon among normal PyTorch users? Because it is too tedious to use! A dummy example for starting distributed training might look something like this.
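A sketch of the boilerplate that plain PyTorch typically requires, using `torch.distributed` and `DistributedDataParallel`. The `worker_env` helper, the address and port, the `gloo` backend and the toy model and data are illustrative choices made for this example, not taken from the text. The PyTorch imports are placed inside the functions so the module can be loaded without starting any processes.

```python
import os


def worker_env(rank, world_size, master_addr="127.0.0.1", master_port=29500):
    """Environment variables each process needs to find the process group."""
    return {
        "MASTER_ADDR": master_addr,
        "MASTER_PORT": str(master_port),
        "RANK": str(rank),
        "WORLD_SIZE": str(world_size),
    }


def train(rank, world_size):
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data import DataLoader, TensorDataset
    from torch.utils.data.distributed import DistributedSampler

    # 1. Join the process group.
    os.environ.update(worker_env(rank, world_size))
    dist.init_process_group(backend="gloo", rank=rank, world_size=world_size)

    # 2. Wrap the model so gradients are averaged across processes.
    model = DDP(torch.nn.Linear(2, 1))

    # 3. Give every process its own shard of the data.
    dataset = TensorDataset(torch.randn(64, 2), torch.randn(64, 1))
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=8, sampler=sampler)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    for epoch in range(2):
        sampler.set_epoch(epoch)  # reshuffle the shards every epoch
        for inputs, targets in loader:
            optimizer.zero_grad()
            loss = torch.nn.functional.mse_loss(model(inputs), targets)
            loss.backward()
            optimizer.step()

    # 4. Leave the process group cleanly.
    dist.destroy_process_group()


def launch(world_size=2):
    """Start one training process per worker on the local machine."""
    import torch.multiprocessing as mp

    mp.spawn(train, args=(world_size,), nprocs=world_size)
```

Calling `launch()` from a script's main guard starts the processes on one machine. Running across several machines additionally requires starting the script on each node with consistent ranks and a reachable master address.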
Going even further, you would need to launch your code on all of your nodes and take care of graceful shutdowns and collecting the results. This is where Maggy comes in. Maggy allows you to launch your PyTorch training script without any changes on Spark clusters. It takes care of the training processes for each node, the resource isolation and node connections.
Next we will explore what is needed to run distribution transparent training on Maggy as well as the restrictions that still exist with the framework.
First of all, Maggy requires its experiment to be configured for distributed training. In the most common use case this means passing your model, hyperparameters and your training/test set. Configuring is as easy as creating a config object. Hyperparameters, train and test set are optional and can also be directly loaded in the training loop. If your training loop consists of more than one module such as in training GANs with a Generator and Discriminator or Policy gradient methods in RL, you can also pass a list of modules.
Maggy’s API requires the training function to follow a unified signature. Users have to pass their model class, its hyperparameters and the train and test set to the training function.
If you want to load your datasets on each node by yourself, you can also omit passing the datasets in the config. In fact, this is highly recommended when working with larger dataset objects. Additionally, every module used in the training function should be imported within that function. Think of your training function as completely self contained. Last but not least, users should use the PyTorch DataLoader (as is best practice anyway). Alternatively, you can also use Maggy’s custom PetastormDataLoader to load large datasets from Petastorm parquet files. When using the latter, users need to ensure that datasets are even, that is they should have the same number of batches per epoch on all nodes. When using PyTorch’s DataLoader, you do not have to care about this. So to summarize, your training function needs to follow the unified signature, import every module it uses inside the function body, and load its data through the PyTorch DataLoader or Maggy’s PetastormDataLoader.
It’s time to combine all the elements we introduced so far in a complete example of distributed training with Maggy. In this example, we are going to create some arbitrary training data, define a function approximator for scalar fields, write our training loop and launch the distributed training.
In order to not rely on specific datasets, we are going to create our own dataset. For this example, a scalar field should suffice. So first of all we randomly sample x and y and compute some function we want our neural network to approximate. PyTorch’s TensorDataset can then be used to form a proper dataset from this data.
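A minimal sketch of this step. The particular field x² - y², the sampling range and the sample count are arbitrary choices for illustration, and `scalar_field` and `make_dataset` are hypothetical names. The PyTorch imports sit inside the function, in line with the self-contained style described earlier.

```python
def scalar_field(x, y):
    """Target function the network should approximate (arbitrary example)."""
    return x ** 2 - y ** 2


def make_dataset(n_samples=1000, seed=0):
    """Sample points uniformly in [-1, 1) x [-1, 1) and evaluate the field."""
    import torch
    from torch.utils.data import TensorDataset

    generator = torch.Generator().manual_seed(seed)
    xy = torch.rand(n_samples, 2, generator=generator) * 2 - 1   # inputs (x, y)
    z = scalar_field(xy[:, 0], xy[:, 1]).unsqueeze(1)            # targets, shape (n, 1)
    return TensorDataset(xy, z)
```

Calling `make_dataset` twice with different seeds gives separate training and test sets drawn from the same distribution.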
Next up we define our function approximator. For our example a standard neural network with 3 layers suffices, although in real applications you would of course train much larger networks.
At the heart of every PyTorch program lies the training loop. Following the APIs introduced earlier, we define our training function as follows.
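A sketch of what such a function could look like, assuming the signature described above (model class, hyperparameters, train set, test set). The hyperparameter keys `lr`, `batch_size` and `epochs`, the choice of optimizer and the decision to return the test loss are assumptions made for this example rather than Maggy requirements.

```python
def training_function(module, hparams, train_set, test_set):
    # All imports live inside the function so it is fully self contained.
    import torch
    from torch.utils.data import DataLoader

    model = module()  # `module` is the model class, not an instance
    optimizer = torch.optim.Adam(model.parameters(), lr=hparams.get("lr", 1e-3))
    loss_fn = torch.nn.MSELoss()

    batch_size = hparams.get("batch_size", 64)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size)

    # Standard single-machine training loop.
    for _ in range(hparams.get("epochs", 10)):
        model.train()
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            loss = loss_fn(model(inputs), targets)
            loss.backward()
            optimizer.step()

    # Average the test loss over all test samples.
    model.eval()
    total, count = 0.0, 0
    with torch.no_grad():
        for inputs, targets in test_loader:
            total += loss_fn(model(inputs), targets).item() * len(inputs)
            count += len(inputs)
    return total / count
```

The body is ordinary single-machine PyTorch: it contains no process group setup, samplers or model wrappers.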
As you can see, there is no additional code for distributed training. Maggy takes care of all the necessary things.
All that remains now is to configure Maggy and run our training. For this, we have to create the config object and run the lagom function.
After running the training on 4 nodes, we can see that our approximator has converged to a good estimate of our scalar field. Of course, this would also be possible on a single local node. But with more complex models and larger training sets such as the ImageNet dataset, distributed training becomes necessary to handle your workloads.
Maggy is open-source and documentation is available at maggy.ai. Give us a star or get in touch if you have more questions. Maggy is also available for all Hopsworks users in the managed platform on AWS or Azure. You can get started for free (no credit card required).