RonDB 21.04.0: Open Source Installation Now Available

4/29/2021
Mikael Ronström

TLDR; The first release of RonDB is now available. RonDB is the best low-latency, high-availability, high-throughput, and high-scalability (LATS) database available today. In addition to new documentation and community support, RonDB 21.04.0 brings automated memory configuration, automated thread configuration, improved network handling, 3x improved performance, bug fixes and new features. It is available as a managed service on the Hopsworks platform and it can also be installed on-premises with the installation scripts or binary tarball.

What’s new in RonDB 21.04.0?

RonDB 21.04.0 is based on MySQL NDB Cluster 8.0.23. The first release includes the following new features and improvements:

  • New documentation of RonDB and its use as a managed service
  • Community support for questions and bug reports
  • Automated memory configuration of the different memory pools based on available memory
  • Automated thread configuration based on the available CPU resources
  • Improved CPU and network handling
  • Configurable number of replicas
  • Integrated benchmarking tools in the RonDB distribution
  • Performance improvements in the ClusterJ API

New documentation

Our new documentation walks through how to use RonDB as a managed service on the Hopsworks platform and how to install it on-premises with automated scripts or a binary tarball. We have also added new documentation of the RonDB software itself.

Community support

Welcome to the RonDB community! Feel free to ask any question and our developers will try to reply as soon as possible. 

Automatic memory configuration

NDB has historically required setting a large number of configuration parameters to size the various memory pools. RonDB 21.04.0 introduces automatic memory configuration, removing the need for all of these configuration properties. RonDB data nodes will use all of the memory available in the server/VM. You can limit the amount of memory they use with a new configuration parameter, TotalMemoryConfig.
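
If you want to see how the automatic memory configuration was applied on a running cluster, the ndbinfo schema exposed through a MySQL server attached to the cluster reports the usage of the main memory pools. The sketch below is illustrative only; the host name, credentials and output formatting are placeholders, not part of the RonDB tooling.

# Illustrative sketch: inspect memory pool usage on a running cluster via the
# ndbinfo schema. Host and credentials are placeholders.
import mysql.connector

conn = mysql.connector.connect(host="mysqld-host", user="admin", password="secret")
cursor = conn.cursor()
cursor.execute("SELECT node_id, memory_type, used, total FROM ndbinfo.memoryusage")
for node_id, memory_type, used, total in cursor.fetchall():
    pct = 100.0 * used / total if total else 0.0
    print(f"node {node_id}: {memory_type}: {used}/{total} bytes ({pct:.1f}%)")
cursor.close()
conn.close()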

Automatic thread configuration

For increased performance and stability, RonDB automates thread configuration. This feature was introduced in NDB 8.0.23, but in RonDB it is the default behaviour. RonDB makes use of all available CPUs and also handles CPU locking automatically. Read more about automated thread configuration in our latest blog post.

Improved CPU and network handling

RonDB will perform better under heavy load by employing improved heuristics in thread spinning and in the network stack.

Configurable number of replicas

In NDB 8.0.23 and earlier releases of NDB, NoOfReplicas could only be set at the initial start of the cluster. The only way to increase or decrease the replication level was to perform a backup and restore. RonDB 21.04.0 introduces Active and Inactive nodes, making it possible to change the number of replicas without having to perform an initial restart of the cluster.

Integrate benchmarking tools in RonDB distribution

We have integrated a number of benchmark tools to assess the performance of RonDB. Benchmarking RonDB is now easy as we ship the Sysbench, DBT2, flexAsynch and DBT3 benchmarks along with our binary distribution. Support for ClusterJ benchmarks is expected in upcoming product releases.

Performance Improvement in ClusterJ API

The ClusterJ API now releases data objects and Session objects to a cache rather than releasing them fully. In addition, garbage collection handling in ClusterJ has been improved. These improvements led to a 3x improvement in a simple key lookup benchmark.

How to get started with RonDB?

There are three ways to use RonDB 21.04.0:

  1. Managed version available on the Enterprise Hopsworks platform. The RonDB cluster is integrated with Hopsworks and can be used for both RonDB applications as well as for Hopsworks applications. Access our full documentation to get started.
  2. Open source automated installation. Use a script that automates the creation of VMs and the installation of the software components required by RonDB. These scripts are available to create RonDB clusters on Azure and GCP. This script can be downloaded from https://repo.hops.works/installer/21.04/rondb-cloud-installer.sh.
  3. Binary tarball installation. Download the RonDB binary tarball and install it on any computers of your choosing. The binary tarball is available here.
Star us on Github
Follow us on Twitter

RonDB, automatic thread configuration

3/24/2021
Mikael Ronström

This blog introduces how RonDB handles automatic thread configuration. It is more technical and dives deeper under the surface of how RonDB operates. RonDB provides a configuration option, ThreadConfig, whereby the user can have full control over the assignment of threads to CPUs, how the CPU locking is to be performed and how the thread should be scheduled.

However, for the vast majority of users this is too advanced; thus, the managed version of RonDB ensures that the thread configuration is based on best practices found over decades of testing. This means that every user of the managed version of RonDB gets access to a thread configuration that is optimised for their particular VM size.

In addition, RonDB makes use of adaptive CPU spinning in a way that limits power usage, but still provides very low latency for all database operations. Adaptive CPU spinning improves latency by up to 50%, and in most cases by more than 10%.

RonDB 21.04 uses automatic thread configuration by default. This means that as a user you don’t have to care about the configuration of threads. RonDB retrieves the number of CPUs available to the RonDB data node process. In the managed version of RonDB, this is the full VM or bare metal server available to the data node. In the open source version of RonDB, one can also limit the number of CPUs available to the RonDB data node process by using taskset or numactl when starting the data node. RonDB retrieves information about CPU cores, CPU sockets, and connections to the L3 caches of the CPUs. All of this information is used to set up the optimal thread configuration.

Thread types in RonDB


LDM threads house the data, query threads handle read committed queries, tc threads handle transaction coordination, receive threads handle incoming network messages, send threads handle the sending of network messages, and main threads handle metadata operations, asynchronous replication and a number of other things.

LDM threads

The LDM thread is a key thread type. The LDM thread is responsible for reading and writing data. It manages the hash indexes, the ordered indexes, the actual data, and a set of triggers performing actions for indexes, foreign keys, full replication and asynchronous replication. This thread type is where most of the CPU processing is done. RonDB has an extremely high number of instructions per cycle compared to any other DBMS engine. The LDM thread often executes 1.25 instructions per cycle, where many other DBMS engines have reported numbers around 0.25 instructions per cycle. This is a key reason why RonDB has such great performance both in terms of throughput and latency. It is the result of the CPU cache aware design of RonDB's data structures and of the functional separation of thread types.

Query threads

The query thread is a new addition that was introduced in NDB Cluster 8.0.23. In NDB it is not used by default; RonDB enables query threads by default in the automatic thread configuration. The query threads run the same code as the LDM threads and handle a subset of the operations that the LDM threads can handle. A normal SELECT query will use read committed reads that can be executed by the query threads. A table partition (sometimes referred to as a table fragment or shard) belongs to a certain LDM thread, thus only this LDM thread can be used for writes and locked reads on rows in this table partition. However, for read committed queries, the query threads can also be used.

To achieve the best performance RonDB uses CPU locking. In Linux, it is quite common that a thread migrates from one CPU to another CPU. If the thread migrates to a CPU belonging to a different CPU core, the thread will suffer a lot of CPU cache misses immediately after being migrated. To avoid this, RonDB locks threads to specific CPU cores. Thus, it is possible to migrate the thread, but only to another CPU in a CPU core that shares the same CPU caches.

Query threads and LDM threads are organised into Round Robin groups. Each Round Robin group consists of between 4 and 8 LDM threads and the same number of query threads. All threads within one Round Robin group share the same CPU L3 cache. This ensures that we retain CPU efficiency even with the introduction of these new query threads. This is important since query threads introduce new mutexes, and the performance of these is greatly improved when the threads sharing them also share CPU caches. The query thread chosen to execute a query must be in the same Round Robin group as the data-owning LDM thread.

Query threads make it possible to decrease the number of partitions in a table. As an example, we are able to process more than 3 times as many transactions per second using a single partition in Sysbench OLTP RW compared to when we only use LDM threads. Most key-value stores have data divided into table partitions by the primary key of the table. Many key-value stores also contain additional indexes on columns that are not used for partitioning. Since the table is partitioned, each table partition will contain each of those additional indexes. When performing a range scan on such an index, each table partition must be scanned. Thus the cost of performing range scans increases as the number of table partitions increases. RonDB can scale the reads in a single partition to many query threads, which makes it possible to decrease the number of table partitions in RonDB. In Sysbench OLTP RW this improves performance by around 20% even in a fairly small 2-node setup of RonDB.

In addition query threads ensure that hotspots in the tables can be handled by many threads, thus avoiding the need to partition even more to handle hotspots.

At the same time, a modest number of table partitions increases the number of writes that we can perform on a table and makes it possible to parallelise range scans, which speeds up complex query execution significantly. Thus in RonDB we have attempted to find a balance between overhead on one hand and improved parallelism and write scalability on the other.

The cost of key lookups is not greatly affected by the number of partitions since those use a hash lookup and thus always go directly to the thread that can execute the key lookup.

RonDB locks LDM threads and query threads in pairs. There is one LDM thread and one query thread in each such LDM group, and we attempt to lock each LDM group to one CPU core. LDM groups are organised into Round Robin groups.

A common choice of scheduling algorithm in an architecture like this would be a simple round robin scheduler. However, such an algorithm is too simple for this model. We have two problems to overcome. The first is that the load on LDM threads is not balanced, since we have decreased the number of table partitions in a table. The second is that writes and locked reads can only be scheduled on an LDM thread. Thus it is important to use the read committed queries to achieve a balanced load. Since LDM threads and query threads are locked onto the same CPU core, it is fine for an LDM thread to be almost idle: the query thread on the same CPU core can still keep that core busy and efficient.


When a query can be scheduled to both an LDM thread and the query threads in the same Round Robin group, the following two-level scheduling algorithm is used.

We gather statistics about CPU usage of threads and we also gather queue lengths in the scheduling queues. Based on this information we prioritise selecting the LDM thread and the query thread in the same LDM group. However, if required to achieve a balanced use of the CPU resources in the Round Robin group we will also schedule read committed queries to any query thread in the Round Robin group of the LDM thread. The gathered CPU usage information affects the load balancer with a delay of around 100 milliseconds. The queue length information makes it possible to adapt to changing load in less than a millisecond.
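
The snippet below is a toy sketch of this idea, not RonDB source code: it prefers the LDM group that owns the partition and only spills read committed work to other query threads in the Round Robin group when the local queues grow too long. The threshold value and data layout are invented for illustration.

# Illustrative sketch only: choose a thread for a read committed query using
# queue lengths, preferring the LDM group that owns the partition and falling
# back to the least loaded query thread in the same Round Robin group.
def pick_thread(owning_ldm_group, round_robin_group, queue_len, overload_threshold=8):
    ldm, query = owning_ldm_group            # (ldm_thread, query_thread) pair
    # prefer the owning LDM group while its queues are short
    local_best = min((ldm, query), key=lambda t: queue_len[t])
    if queue_len[local_best] <= overload_threshold:
        return local_best
    # otherwise balance across all query threads sharing the same L3 cache
    query_threads = [q for (_, q) in round_robin_group]
    return min(query_threads, key=lambda t: queue_len[t])

# toy example: LDM group 0 is overloaded, so the query goes to q1 instead
rr_group = [("ldm0", "q0"), ("ldm1", "q1"), ("ldm2", "q2"), ("ldm3", "q3")]
queues = {"ldm0": 12, "q0": 9, "ldm1": 1, "q1": 0, "ldm2": 3, "q2": 2, "ldm3": 5, "q3": 4}
print(pick_thread(rr_group[0], rr_group, queues))   # -> "q1"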

Given that we use fewer table partitions in RonDB compared to other solutions, there is a risk of imbalanced load on the CPUs. This problem is solved by two things. First, we use a two-level load balancer on LDM and query threads. This ensures that we move work away from overloaded LDM threads towards unused query threads. Second, since LDM and query threads share the same CPU core, a query thread running on the same core as an underutilised LDM thread has spare CPU capacity available to it. Thus, we expect this architecture to achieve a balanced load on the CPU cores in the data node.

LDM and query threads use around 50-60% of the available CPU resources in a data node.

tc threads

The tc threads receive all database operations sent from the NDB API. They take care of coordinating transactions and decide which node should take care of the queries. They use around 20-25% of the CPU resources. The NDB API selects tc threads in a node using a simple round robin scheme.

receive threads

The receive threads take care of a subset of the communication links. Thus, the receive thread load is usually fairly balanced but can be a bit more unbalanced if certain API nodes are more used in querying RonDB. The communication links between data nodes in the same node group are heavily used when performing updates. To ensure that RonDB can scale in this situation these node links use multiple communication links. Receive threads use around 10-15% of the CPU resources.

send threads

The send threads assist in sending network messages to other nodes. The sending of messages can be done by any thread, and there is an adaptive algorithm that assigns more of the send load to threads that are not so busy. The send threads assist to ensure that we have enough capacity to handle all the load. It is not strictly necessary to have send threads; the other threads can handle sending even without them. Send threads use around 0-10% of the CPUs available.

The total cost of sending can be quite substantial in a distributed database engine, thus the adaptive algorithm is important to balance out this load on the various threads in the data node.

main threads

The number of main threads supported can be 0, 1 or 2. These threads handle a lot of the interactions around creating tables, indexes and any other metadata operation. They also handle a lot of the code around recovery and heartbeats. In addition, they handle subscriptions to asynchronous replication events used by replication channels to other RonDB clusters.
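
As a rough back-of-the-envelope illustration (not RonDB's actual algorithm), the sketch below splits the CPUs reported by the operating system across the thread types in proportions similar to those listed above; the real automatic thread configuration also takes CPU cores, sockets and L3 cache topology into account.

# Illustrative sketch only: a naive split of available CPUs across thread types,
# loosely following the proportions discussed in this post.
import os

def naive_thread_config(num_cpus=None):
    cpus = num_cpus or os.cpu_count()
    return {
        "ldm_and_query": max(1, int(cpus * 0.55)),  # ~50-60% of CPU resources
        "tc":            max(1, int(cpus * 0.22)),  # ~20-25%
        "receive":       max(1, int(cpus * 0.12)),  # ~10-15%
        "send":          max(1, int(cpus * 0.05)),  # ~0-10%
        "main":          1,
    }

print(naive_thread_config())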

Analysis of the RonDB thread model

Background

RonDB is based on NDB Cluster. NDB was focused on being a high-availability key-value store from its origin in database research in the 1990s. The thread model in NDB is inherited from a telecom system developed at Ericsson called AXE. Interestingly, in one of my first jobs at Philips I worked on a banking system developed in the 1970s; this system had a model very similar to the original thread model in NDB and in AXE. In operating system development, time-sharing has long been the dominant model. However, the model used in NDB, where the execution thread is programmed as an asynchronous engine and the application is handled as a state machine, has a huge performance advantage when handling many very small tasks. A normal task in RonDB is a key lookup or a small range scan. Each of those small tasks is actually divided even further when performing updates and parallel range scans. This means that the length of a task in RonDB is on the order of 500 ns up to around 10 microseconds.

Traditional thread design for key-value stores

Time-sharing operating systems are not designed to handle context switches at this granularity. NDB was designed with this understanding from the very beginning. Early competitors of NDB used normal operating system threads for each transaction, and even on a real-time operating system this had no chance of competing with the effectiveness of NDB. None of these competitors are still around competing in the key-value store market.

Asynchronous thread model

The first thread model in NDB used a single thread to handle everything: send, receive, database handling and transaction handling. This is version 1 of the thread architecture, which is also the model implemented in the open source version of Redis. With the development of multi-core CPUs it became obvious that more threads were needed. What NDB did here was to introduce both a functional separation of threads and partitioning of the data to achieve a more multi-threaded execution environment. This is version 2 of the thread architecture.

Modern competitors of RonDB have now understood the need to use asynchronous programming to achieve the required performance in a key-value store. We see this in AeroSpike, Redis, ScyllaDB and many other key-value stores. Thus the industry has followed the RonDB road to achieving an efficient key-value store implementation.

Functional separation of threads

Most competitors have opted for only partitioning the data and thus each thread still has to execute all the code for meta data handling, replication handling, send, receive and database operations. Thus RonDB has actually advanced version 2 of the thread architecture further than its competitors.

One might ask, what difference does this make?

All modern CPUs use both a data cache and an instruction cache. When all functions are combined inside one thread, that thread has to execute more code, putting more pressure on the instruction cache. In RonDB the LDM thread only executes the operations that change the data structures, the tc thread only executes code to handle transactions, and the receive thread can focus on the code that executes network receive operations. This makes each thread more efficient. The same is true for the CPU data cache: the LDM thread need not bother with the data structures used for transaction handling and network receive. It can focus the CPU caches on the requirements of database operations, which is challenging enough in a database engine.

A scalable key-value store design

A simple splitting of data into different table partitions makes sense if all operations towards the key-value store are primary key lookups or unique key lookups. However, most key-value stores also require performing general search operations as part of the application. These search operations are implemented as range scans with search conditions, and these do not scale so well with a simple splitting of data.

To handle this, RonDB introduces version 3 of the thread architecture, which uses a compromise: we still split the data, but we introduce query threads to assist the LDM threads in reading the data. Thus RonDB can handle hotspots of data and requires fewer table partitions to achieve the required scalability of the key-value store.

Thoughts on a v4 of the thread architecture have already emerged, so expect this development to continue for a while more. This includes even better handling of the higher latency to persistent memory data structures.

Finally, even if a competitor managed to replicate all of those features of RonDB, RonDB has another ace in the 3-level distributed hashing algorithm that makes use of a CPU cache aware data structure.

Conclusion

All of those things combined make us confident that RonDB will continue to lead the key-value store market in terms of LATS: the lowest Latency, the highest Availability, the highest Throughput and the most Scalable data storage. Thus, it is the best LATS database in the industry.

RonDB: The World's Fastest Key-value Store is now in the Cloud.

2/24/2020
Mikael Ronström

We are pleased to introduce RonDB, the world’s fastest key-value store with SQL capabilities, available now in the cloud. RonDB is an open source distribution of NDB Cluster, thus providing the same core technology and performance as NDB, but as a managed platform in the cloud. RonDB is the best low-latency, high throughput, and high availability database available today. In addition, it also brings large data storage capabilities. RonDB is a critical component of the infrastructure needed to build feature stores, and, in general, real-time applications.

Introducing RonDB

RonDB is an open source distribution of NDB Cluster which has been used as a key-value store for applications for almost 20 years. The name RonDB is a synthesis of its inventor, Mikael Ronström, and its heritage NDB. RonDB is offered as a managed version of NDB Cluster, providing the fastest solution targeting the traditional applications that use NDB Cluster such as those used in telecom, finance, and gaming. RonDB provides a solution as the data layer for operational AI applications. RonDB gives online models access to reliable, low-latency, high-throughput storage for precomputed features.  

RonDB is ideal for developers who require a real-time database that can (1) be quickly deployed (a RonDB cluster can be set up in less than a minute), (2) handle millions of database operations per second, and (3) be elastically scaled up and down to match the current load, reducing operational costs.

What is a key-value store?

A key-value store associates a value with a unique identifier (a key) to enable easy storage, retrieval, searching, and updating of data using simple queries - making key-value stores ideal for organisations with large volumes of data that need to be constantly retrieved and updated.

Distributed key-value stores, such as RonDB, are optimal for organisations with heavy read/write workloads on large data sets, thus being a good fit for businesses operating at scale. For example, they are a critical component of the infrastructure needed to build feature stores and many different types  of interactive and real-time applications.

The value of RonDB

Key-value store applications

RonDB can be used to develop interactive online applications where storing, retrieving and updating data is essential, but there is also the possibility to search for data in the same system. RonDB provides efficient access using a partition key through either native APIs or SQL access. 

RonDB supports distributed transactions enabling applications to perform any type of data modification. RonDB enables concurrent complex queries using SQL as part of the online application. Through its efficient partitioning scheme, RonDB can scale nodes to support hundreds of CPUs per node and then scale to hundreds of data nodes (tens of thousands of CPUs in total).
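
Because RonDB exposes a MySQL Server API, a key lookup and a multi-row transaction can be issued from any MySQL client library. The sketch below is illustrative only; the schema, table, host and credentials are placeholders.

# Illustrative sketch: key-value style access and a distributed transaction
# through RonDB's MySQL Server API. Schema, host and credentials are placeholders.
import mysql.connector

conn = mysql.connector.connect(host="mysqld-host", user="app", password="secret",
                               database="bank", autocommit=True)
cursor = conn.cursor()

# primary-key lookup: effectively a key-value read
cursor.execute("SELECT balance FROM accounts WHERE account_id = %s", (1001,))
print(cursor.fetchone())

# a multi-row transaction, committed atomically across data nodes
conn.start_transaction()
cursor.execute("UPDATE accounts SET balance = balance - 10 WHERE account_id = %s", (1001,))
cursor.execute("UPDATE accounts SET balance = balance + 10 WHERE account_id = %s", (1002,))
conn.commit()

cursor.close()
conn.close()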

SQL applications

RonDB supports writing low latency, high availability SQL applications using its full SQL capabilities, providing a MySQL Server API.

LATS: low Latency, high Availability, high Throughput, and scalable Storage

RonDB is the fastest key-value store currently available and the only one with general purpose query capabilities, through its SQL support. The performance of key-value stores is evaluated based on low Latency, high Availability, high Throughput, and scalable Storage (LATS).

LATS: low Latency, high Availability, high Throughput, scalable Storage.

Low Latency

RonDB has shown the fastest response times: it can respond in 100-200 microseconds on individual requests and in less than a millisecond on batched read requests; it can also perform complex transactions in a highly loaded cluster within 10 milliseconds. Moreover, and perhaps even more importantly, RonDB has predictable latency.
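
One way for an application to benefit from batching is to fetch many keys in a single round trip instead of issuing one request per key. A hedged sketch using the MySQL API (table name, host and credentials are placeholders):

# Illustrative sketch: fetching a batch of keys in one round trip instead of
# one request per key. Table name, host and credentials are placeholders.
import mysql.connector

conn = mysql.connector.connect(host="mysqld-host", user="app", password="secret",
                               database="featurestore")
cursor = conn.cursor()

keys = [17, 42, 314, 2718]
placeholders = ", ".join(["%s"] * len(keys))
cursor.execute(
    f"SELECT customer_id, feature_value FROM features WHERE customer_id IN ({placeholders})",
    keys,
)
rows = dict(cursor.fetchall())   # one network round trip for the whole batch

cursor.close()
conn.close()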

High Availability

NDB Cluster can provide Class 6 Availability; in other words, an NDB system is operational 99.9999% of the time, thus no more than 30 seconds of downtime per year. RonDB brings the same type of availability now to the cloud, thus ensuring that RonDB is always available. RonDB allows you to place replicas within a cloud region on different availability zones, ensuring that failure of availability zones does not result in downtime for RonDB.

High Throughput

RonDB scales up to hundreds of millions of read or write ops/second. A single RonDB instance on a virtual machine can support millions of key-value operations per second and, without downtime, can be scaled up to a large cluster that supports hundreds of millions of key-value operations per second.

Scalable Storage

RonDB offers large dataset capabilities, being able to store up to petabytes of data. Individual RonDB data nodes can store up to 16 TBytes of in-memory data and many tens of TBytes of disk data. Given that RonDB clusters can scale up to 144 data nodes, a RonDB cluster can be scaled to store petabytes of data.

LATS Performance

The combination of predictable low latency, the ability to handle millions of operations per second, and a unique high availability solution makes RonDB the leading LATS database, that is now also available in the cloud.

RonDB as an online feature store

RonDB is the foundation of the Hopsworks Online Feature Store. For those unfamiliar with the concept, a feature store is a data warehouse of features for machine learning. It contains two databases: an online database serving features at low latency to online applications and an offline database storing large volumes of historic feature values.

The online feature store provides the data (features) that underlie the models used in intelligent online applications. For example, it helps inform an intelligent decision about whether a financial transaction is suspected of money laundering or not by providing the real-time information about the user’s credit history, recent card usage, location of the transaction, and so on. Thus, the most important requirements from the feature store are quick and consistent responses (low latency), handling large volumes of key lookups (high throughput), and being always available (high availability). In addition an online feature store should be able to scale as the business grows. RonDB is the only database available in the market that meets all these requirements.

Get Started

We’re not releasing RonDB publicly just yet. Instead, we have made it available as early access. If you think your team could benefit from RonDB, please contact us!

HopsFS: 100x Faster than AWS S3

11/19/2020
Mahmoud Ismail

TLDR; Many developers believe S3 is the "end of file system history". It is impossible to build a file/object storage system on AWS that can compete with S3 on cost. But what if you could build on top of S3 a distributed file system with a HDFS API that gives you POSIX goodness and improved performance? That’s what we have done with a cloud-native release of HopsFS that is highly available across availability zones, has the same cost as S3, but has 100X the performance of S3 for file move/rename operations, and 3.4X the read throughput of S3 (EMRFS) for the DFSIO Benchmark (peer reviewed at ACM Middleware 2020).

HopsFS has lower latency and higher throughput than EMRFS (S3) for metadata operations (Middleware ‘20).

The Dumb Bucket

S3 has become the de-facto platform for storage in AWS due to its scalability, high availability, and low cost. However, S3 provides weaker guarantees and lower performance compared to distributed hierarchical file systems. Despite this, many developers erroneously believe that S3 is the end of file system history - there is no alternative to S3, so just re-write your applications to account for its limitations (such as slow and inconsistent file listings, non-atomic file/dir rename, closed metadata, and limited change data capture (CDC) support). Azure has built an improved file system, Azure Data Lake Storage (ADLS) V2, on top of the Azure Blob Storage (ABS) service. ADLS provides a HDFS API to access data stored in an ABS container, giving improved performance and POSIX-like goodness. But, until today, there has been no equivalent to ADLS for S3. Today, we are launching HopsFS as part of Hopsworks.

Hierarchical File Systems strike back in the Cloud

Hierarchical distributed file systems (like HDFS, CephFS, GlusterFS) were not scalable enough or highly available across availability zones in the cloud, motivating the move to S3 as the scalable storage service of choice. In addition to the technical challenges, AWS has priced virtual machine storage and inter-availability zone network traffic so high that no third party vendor could build a storage system that offers a per-byte storage cost close in price to S3.

However, the move to S3 has not been without costs. Many applications need to be rewritten as the stronger POSIX-like behaviour of hierarchical file systems (atomic move/rename, consistent file listings, consistent read-after-writes) has been replaced by weakened guarantees in S3. Even simple tasks, such as finding out what files you have, cannot easily be done on S3 when you have enough files, so a new service was introduced to enable you to pay extra to get a stale listing of your files. Most analytical applications (e.g., on EMR) use EMRFS instead of S3, which is a metadata layer for S3 that provides slightly stronger guarantees than S3 - such as consistent file listings.

File systems are making the same Journey as Databases

The journey from a stronger POSIX-like file system to a weaker object storage paradigm and back again has parallels in the journey that databases have made in recent years. Databases made the transition from strongly consistent single-host systems (relational databases) to highly available (HA), eventually consistent distributed systems (NoSQL systems) to handle the massive increases in data managed by databases. However, NoSQL is just too hard for developers, and databases are returning to strongly consistent (but now scalable) NewSQL systems, with databases such as Spanner, CockroachDB, SingleStore, and MySQL Cluster.

In this blog, we show that distributed hierarchical file systems are completing a similar journey, going from strongly consistent POSIX-compliant file systems to object stores (with their weaker consistency models, but high availability across data centers), and back to distributed hierarchical file systems that are HA across data centers, without any loss in performance and, crucially, without any increase in cost, as we will use S3 as block storage for our file system.

HopsFS

HopsFS is a distributed hierarchical file system that provides a HDFS API (POSIX-like API), but stores its data in a bucket in S3. We redesigned HopsFS to (1) be highly available across availability zones in the cloud and (2) transparently use S3 to store the file’s blocks without sacrificing the file system’s semantics. The original data nodes in HopsFS have now become stateless workers (part of a standard Hopsworks cluster) that include a new block caching service to leverage faster local VM storage for hot blocks. It is important to note that the cache is a global cache - not the local worker cache found in other vendors’ Spark workers - and it includes secure access control to the cache. In our experiments, we show that HopsFS outperforms EMRFS (S3 with metadata in DynamoDB for improved performance) for IO-bound workloads, delivering up to 20% higher performance and up to 3.4X the aggregated read throughput of EMRFS. Moreover, we demonstrate that metadata operations on HopsFS (such as directory rename or file move) are up to two orders of magnitude faster than on EMRFS. Finally, HopsFS opens up the currently closed metadata in S3, enabling correctly-ordered change notifications with HopsFS’ change data capture (CDC) API and customized extensions to metadata.
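
Since HopsFS exposes a HDFS-compatible API, existing HDFS clients work unchanged. The sketch below uses PyArrow's Hadoop filesystem bindings as one possible client; the namenode host, port and paths are placeholders, and the point is that rename is a metadata-only, atomic operation rather than a copy.

# Illustrative sketch using PyArrow's HDFS bindings as one possible client of
# HopsFS's HDFS API. Namenode host/port and paths are placeholders.
from pyarrow import fs

hdfs = fs.HadoopFileSystem(host="namenode-host", port=8020)

# write a file (blocks end up in S3, metadata in HopsFS)
with hdfs.open_output_stream("/Projects/demo/raw/data.csv") as out:
    out.write(b"id,value\n1,3.14\n")

# atomic, metadata-only rename - the operation S3 cannot do atomically
hdfs.create_dir("/Projects/demo/clean")
hdfs.move("/Projects/demo/raw/data.csv", "/Projects/demo/clean/data.csv")

# consistent listing of the directory
print(hdfs.get_file_info(fs.FileSelector("/Projects/demo/clean")))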

At Logical Clocks, we have leveraged HopsFS’ capabilities to build the industry’s first feature store for machine learning (Hopsworks Feature Store). The Hopsworks Feature Store is built on Hops Hive and customized metadata extensions to HopsFS, ensuring strong consistency between the offline Feature Store, the online Feature Store (NDB Cluster), and data files in HopsFS.

Some of the key advantages of HopsFS/S3 are:

POSIX-Like Semantics with a HDFS API

  • Consistent file listings, consistent read-after-write, atomic rename (files/directories).

Open, Extensible Metadata

  • XAttr API to attach arbitrary metadata to files/directories.

Change Data Capture API

  • Correctly ordered stream of file system mutation events delivered with low latency to downstream clients by ePipe.

Free-Text search API for File System Namespace

  • File system namespace metadata changes can be transparently replicated to Elasticsearch for low-latency free-text search of the namespace and its extended metadata. This service is provided by Hopsworks.

X.509 Certificates for Authentication, TLS for Encryption-in-Transit

  • HopsFS uses X.509 Certificates to identify and authenticate clients, with TLS providing end-to-end encryption-in-transit. 

Faster Metadata Operations

  • File/directory rename/move and file listings, with no 1000-files-at-a-time limit on listings (as in S3).

Faster Read Operations

  • Workers in HopsFS securely cache file blocks on behalf of clients using local VM storage. NameNodes are cache-aware and redirect clients to securely read the cached block from the correct worker.

Highly Available across Availability Zones (AZs)

  • Support for high availability (HA) across AZs through AZ-aware replication protocols.

HopsFS/S3 Performance

We compared the performance of HopsFS against EMRFS rather than S3 directly, as EMRFS provides stronger guarantees than S3 for consistent listing of files and consistent read-after-update for objects. EMRFS uses DynamoDB to store a partial replica of S3’s metadata (such as what files/directories are found in a given directory), enabling faster listing of files/dirs compared to S3 and stronger consistency (consistent file listings and consistent read-after-update, although no atomic rename).

Here are some selected results from our peer-reviewed research paper accepted for publication at ACM/IFIP Middleware 2020. The paper includes more results than shown below; for writes, HopsFS delivers on average about 90% of the performance of EMRFS, as HopsFS has the overhead of first writing to workers, which then write to S3. HopsFS has a global worker cache for faster reads (if a block is cached at any worker, clients retrieve the data directly from that worker), and the HopsFS metadata layer is built on NDB Cluster for faster metadata operations.

*Enhanced DFSIO Benchmark Results with 16 concurrent tasks reading 1GB files. For higher concurrency levels (64 tasks), the performance improvement drops from 3.4X to 1.7X.

**As of November 2020, 3500 ops/sec is the maximum number of PUT/COPY/POST/DELETE requests per second per S3 prefix, while the maximum number of GET/HEAD requests per prefix is 5500 reads/sec. You can increase throughput in S3 by reading/writing in parallel to different prefixes, but this will probably require rewriting your application code and increase the risk of bugs. For HopsFS (without S3), we showed that it can reach 1.6m metadata ops/sec across 3 availability zones.

In our paper published at ICDCS, we measured the throughput of HopsFS when deployed in HA mode over 3 availability zones. Using a workload from Spotify, we compared the performance with CephFS. HopsFS (1.6M ops/sec) reaches 2X the throughput of CephFS (800K ops/sec) when both are deployed in full HA mode. CephFS, however, does not currently support storing its data in S3 buckets.

How do I get started with HopsFS?

HopsFS is available as open source (Apache V2). However, cloud-native HopsFS is currently only available as part of the hopsworks.ai platform. Hopsworks.ai is a platform for the design and operation of AI applications at scale, with support for scalable compute in the form of Spark, Flink, TensorFlow, etc. (comparable to Databricks or AWS EMR). You can also connect Hopsworks.ai to a Kubernetes cluster and launch jobs on Kubernetes that can read/write from HopsFS. You connect your cluster to an S3 bucket in your AWS account or, on Azure, to an Azure Blob Storage bucket. You can dynamically add/remove workers to/from your cluster, and the workers act as part of the HopsFS cluster - using minimal resources, but reading/writing to/from S3 or ABS on behalf of clients, providing access control, and caching blocks for faster retrieval.

Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store

11/17/2020
Theofilos Kakantousis

TLDR; Hopsworks 2.0 is now generally available. Hopsworks is the world's first Enterprise Feature Store along with an advanced end-to-end ML platform. This version includes improvements to the user interface and client APIs, as well as improved integration with Kubernetes. Hopsworks 2.0 also comes with upgrades for popular ML frameworks such as TensorFlow 2.3 and PyTorch 1.6.

The Hopsworks 2.0 platform:

In addition to bug and documentation fixes, Hopsworks 2.0 includes the following new features and improvements:

  • Feature Store: New Rich APIs and Extended Integration with External Data Sources
  • Provenance: Track and Navigate from Models to Training Datasets to Features
  • Python Programs as Schedulable Feature Engineering Jobs
  • Fine-grained User Access Control for Feature Stores
  • GitLab Support 
  • New example programs for feature engineering and usage for training and serving

Detailed release notes are available at the Hopsworks GitHub repository.

Feature Store: New Rich APIs and Extended Integration with External Data Sources

The Feature Store API is now more intuitive, following a Pandas-like programming style for feature engineering and working with abstractions such as feature groups and training datasets. This allows for an improved workflow and addresses the biggest pain point of professional users working with Big Data and AI: being able to organize and discover data in as few steps as possible. An all-new API documentation page is accessible here.

This release brings the ability to enrich feature group abstractions by attaching metadata in the form of tags, and then to run free-text search on these tags, available across the feature store to all users.

More details on the upgraded feature store experience are outlined in the Hopsworks Feature Store API 2.0 blog post and the user guide.

Provenance: Track and Navigate from Models to Training Datasets to Features

Hopsworks is built from the ground up with support for implicit provenance. What that means is that data engineers and data scientists who develop machine learning pipelines - from data ingestion, to feature engineering, to model training and serving - get lineage information for free as the data flows and is transformed through the pipeline. Users can then navigate from a model served in production back to the ML experiment that created the model, along with its training dataset, and finally all the way back to the feature groups and ingestion pipelines the training dataset was created from.

Python Programs as Schedulable Feature Engineering Jobs

Until now, Hopsworks’ Jobs service provided users with the means to run and monitor applications and do feature engineering using popular distributed processing frameworks such as Apache Spark/PySpark and Apache Flink. Hopsworks now further increases its reach to Feature Store users by enabling them to run such feature engineering jobs as pure Python programs, freeing them, for example, from the need to run their code within a PySpark context/cluster. Developers can now do feature engineering with Pandas or their library of choice and still benefit from Jobs capabilities, such as including jobs in Airflow data and ML pipelines, as well as running Python Jupyter notebooks as jobs, simplifying the process of including notebooks in development and production pipelines. The user guide is available here.
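
As a rough sketch of what such a job might look like (the input file, feature group name and aggregations are made up, and the exact HSFS calls may differ between versions), a pure-Python program can simply compute features with Pandas and write them to the Feature Store:

# Hypothetical sketch of a pure-Python feature engineering job: compute
# features with Pandas and write them to an existing feature group with HSFS.
import pandas as pd
import hsfs

fs = hsfs.connection().get_feature_store()   # no arguments needed inside Hopsworks

raw = pd.read_csv("transactions.csv")        # placeholder input data
features = (raw.groupby("customer_id")
               .agg(avg_amount=("amount", "mean"),
                    n_purchases=("amount", "count"))
               .reset_index())

fg = fs.get_feature_group("customer_spend", version=1)
fg.insert(features)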

Fine-Grained User Access Control for Feature Stores

Sharing datasets across projects of a Hopsworks instance is now made simpler by using a fine-grained access control mechanism. Users can now select from a list of options that enable a dataset to be shared with different permissions for each project and for each role users have in the target project (Data Owner, Data Scientist). In addition, these new dataset sharing semantics now make it easier to share feature stores with different permissions across projects. For more details please check the user guide here.

GitLab Support

Hopsworks developers can now use GitLab for code version control, in addition to the existing GitHub support. JupyterLab in Hopsworks ships with the Git plugin, so developers can work with Git directly from within the JupyterLab IDE. Further details are available in the user guide here.

Python Environment Updates and Examples

Hopsworks 2.0 is shipped with the latest and the greatest frameworks of the data engineering and data science Python world. Every Hopsworks instance now comes pre-installed with Pandas version 1.1.2, TensorFlow 2.3.1, PyTorch 1.6.0 and many more. 

A plethora of new feature engineering and machine learning examples are now available at our examples GitHub repository. Data engineers can now follow examples that demonstrate how to work with the Feature Store using Redshift or Snowflake. New end-to-end and parallel experiment examples with deep learning and TensorFlow 2.3 are also available.

Get Started

Much more is to come from Hopsworks with the upcoming releases so make sure to follow us on Twitter and, if you like it, star us on GitHub.

Visit http://hopsworks.ai to get started with a free instance of Hopsworks and explore the new Hopsworks Feature Store and data science support of the Hopsworks 2.0 series and don’t forget to join our growing community!

Hopsworks: World’s Only Cloud Native Feature Store, now Available on AWS and Azure.

11/17/2020
Steffen Grohsschmiedt

TLDR; Hopsworks is now available as a managed platform for Amazon Web Services (AWS) and Microsoft Azure with a comprehensive free tier. Hopsworks is the only currently available cloud-native Enterprise Feature Store and it also includes a Data Science platform for developing and operating machine learning models in production.

The Hopsworks managed platform:

  • Support for AWS and Microsoft Azure;
  • The Feature Store for Databricks (AWS/Azure), SageMaker, Kubeflow, and more;
  • Pay-per-use Compute: Engineer Features at Scale (Spark, Python, Flink);
  • Pay-per-use Compute for Training Machine Learning Models (including GPU support);
  • Scale Storage Independently - data stored on Amazon S3 and Azure Blob Storage;
  • Integrates with Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving;
  • CI/CD ML Pipelines using Airflow, including support for Jupyter-notebooks-as-jobs;
  • Federated IAM Roles Made Easy;
  • Managed Backups and Upgrades;
  • Active Directory/LDAP Integration;
  • Organization and User Management on Hopsworks.

Start Using Hopsworks for Free

You can get started with Hopsworks for free today. We offer free Hops Credits for you to start engineering and managing your features. You have access to all functionality of the Hopsworks Feature Store alongside all main features of the Hopsworks platform, including compute elasticity (add/remove workers when needed), storage on S3/Azure Blob Storage, EKS, ECR and GPUs, as well as integrations with external Data Science and Data Lake/Warehouse platforms. Get started now!

Hopsworks 2.0

Hopsworks is the world's first Enterprise Feature Store for machine learning, including  an end-to-end ML platform for developing and operating AI applications. Hopsworks includes a new and improved Feature Store API, integration with Kubernetes, Sagemaker, Databricks, Redshift, Snowflake, Kubeflow, and many more. Hopsworks also comes with upgrades for popular ML frameworks, such as TensorFlow, PyTorch, and Maggy. 

All details about the 2.0 release can be found in the release blog post: Hopsworks 2.0: The Next Generation Platform for Data-Intensive AI with a Feature Store.

Support for AWS and Microsoft Azure

Hopsworks has a managed platform for AWS and we are pleased to now announce full support for Microsoft Azure. Hopsworks is now cloud-independent, allowing customers to seamlessly move from AWS to Azure or vice-versa, without the fear of being locked in to a single vendor. Enterprises can now manage features for training and serving models at scale on Hopsworks, while maintaining control of their data inside their organisation’s own cloud accounts. Check out how to get started on AWS or Microsoft Azure.

Feature Store for Databricks (AWS/Azure), SageMaker and Kubeflow

As Hopsworks is a modular platform with integrations to third party platforms, organisations can combine our Feature Store with their existing data stores and data science platforms. Hopsworks natively integrates with Databricks on both AWS and Azure, Amazon Sagemaker, Kubeflow and other ML platforms. Check out how to get started with Databricks on AWS and Azure, Amazon Sagemaker, Kubeflow and other Data and ML platforms.

Pay-per-use Compute: Engineer Features and Train Models at Scale

The generally available version of Hopsworks adds support for pay-per-use compute, allowing you to add and remove compute nodes for feature engineering and machine learning jobs as needed, to scale with your demand and minimize cost when capacity is not needed. We support different types of instances, allowing you to add or remove exactly the right amount of resources (CPU, GPU, memory) that you need to run your application. You can, for example, add GPU nodes to run a large-scale distributed training job while at the same time adding or removing non-GPU nodes for feature engineering workloads. Hopsworks supports running jobs on a wide variety of frameworks: Spark, Python, Flink, TensorFlow, PyTorch, Scikit-Learn, XGBoost, and more.

Pay-per-use Compute for Training Machine Learning Models (including GPU support)

Hopsworks on AWS added support for using GPU instances for running ML workloads, scaling to multiple nodes and GPUs. GPU nodes can be added or removed at any point in time to scale with your workload and save cost when they are not needed. In addition, you can utilize EKS for GPU-accelerated workloads in notebooks and Python jobs.

Scale Storage Independently - Amazon S3 and Azure Blob Storage

The Feature Store stores its historical data on HopsFS (a next generation HDFS) that now uses both S3 and Azure Blob storage as its backing store, reducing costs and enabling storage to scale independently of the size of a Hopsworks cluster. As a customer, you only have to provide your bucket and an IAM role/Managed identity with sufficient rights when creating a cluster. All data is kept private and stored inside your cloud account. HopsFS provides both performance improvements (3.4X read throughput vs S3), POSIX-like semantics, and open metadata compared to S3.

Amazon EKS/ECR for Python Jobs, Notebooks, and Model Serving

Hopsworks on AWS integrates with EKS and ECR, allowing you to run applications as containers on EKS from Hopsworks. Some services in Hopsworks are offloaded to EKS, such as running Jupyter/JupyterLab notebooks and model serving workloads. Hopsworks users run applications on EKS that can securely call back and use services in Hopsworks. In particular, there is special support for Python programs, where Data Scientists can install Python libraries in their Hopsworks project, and Hopsworks transparently builds the Docker image that is run as a Python job on Kubernetes. Even Jupyter notebooks on Hopsworks can be run and scheduled as jobs on Kubernetes, without users having to write and manage Dockerfiles. An additional benefit of EKS is that the cluster can be set up with autoscaling, allowing your workloads to scale dynamically with demand. Check out Integration with Amazon EKS and Amazon ECR to get started with EKS for Hopsworks.

Feature Engineering and ML Pipelines

Hopsworks supports data-parallel feature engineering jobs in Spark, PySpark, and Flink, as well as pure Python feature engineering with Pandas, Feature Tools, and any Python library. Feature engineering pipelines typically end at the Feature Store, and can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster). Hopsworks has additional support for data validation with Great Expectations and Deequ. After the Feature Store, ML pipelines create train/test datasets, train models, analyze, validate and deploy models. Hopsworks provides experiment tracking, Tensorboard, distributed ML (with open-source Maggy), model analysis with the What-if-Tool. Again, ML training pipelines can be orchestrated using Airflow on Hopsworks (or an external Airflow cluster).
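
As a minimal illustration (generic Airflow operators, not Hopsworks' own, and with hypothetical task bodies), a DAG that chains a feature engineering step and a training step could look roughly like this:

# Minimal illustrative Airflow DAG: a feature engineering step feeding a model
# training step. Import paths differ between Airflow 1.10 and 2.x; this uses
# the 2.x style. Task bodies are placeholders.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def engineer_features():
    ...  # compute features and write them to the Feature Store (e.g. with HSFS)

def train_model():
    ...  # read a training dataset from the Feature Store and train a model

with DAG("feature_and_training_pipeline",
         start_date=datetime(2020, 11, 1),
         schedule_interval="@daily",
         catchup=False) as dag:
    features = PythonOperator(task_id="engineer_features",
                              python_callable=engineer_features)
    training = PythonOperator(task_id="train_model",
                              python_callable=train_model)
    features >> training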

Federated IAM Roles Made Easy

Enterprise Hopsworks now enables fine-grained access to AWS resources using AWS role chaining (federated IAM roles). Administrators can now define a mapping between AWS roles and project-specific user roles in Hopsworks. This enables both fine-grained access and auditing of users when they use AWS services from Hopsworks. Hopsworks users can either set a default role to be assumed by their jobs and notebooks to enable access to their AWS resources, or assume a role manually from within their programs by using the Python and Java/Scala SDKs (guide).

Managed Backups and Upgrades

The enterprise version of Hopsworks supports fully-automated backups and upgrades at the click of a button, making the operation of a Hopsworks cluster a non-issue for your DevOps team.

Active Directory/LDAP Integration

The enterprise version of Hopsworks supports single sign-on to Hopsworks clusters with Active Directory/LDAP integration. This enables your enterprise to manage access to the Feature Store and Hopsworks with your existing enterprise Identity Provider. Hopsworks also supports OAuth-2 as an Identity Provider.

Organizations and User Management

Hopsworks supports billing organizations and user management, enabling your teams to share access to your Hopsworks account.

Coming soon

We are continuously working on improving the Hopsworks experience. Some of the upcoming features are:

  • Dynamic auto scaling of compute nodes
  • Support for AWS Spot Instances/Azure Spot Virtual Machines
  • Support for Azure Kubernetes Service (AKS)
  • Support for GPUs on Azure

Hopsworks Feature Store API 2.0, a new paradigm.

11/17/2020
Fabio Buso

TLDR; The Hopsworks Feature Store is accessed through the new Python and Scala/Java Software Development Kit (SDK) available in Hopsworks 2.0. The new SDK builds on our extensive experience working with Enterprise customers and users with Enterprise requirements. With this new release, we consolidate multiple libraries into a single one. We named it `HSFS` (HopsworkS Feature Store) and in this blog post we will be looking at some of the improvements and key features that the new SDK brings.

Today, we’re introducing the new Hopsworks Feature Store API. Rebuilt from the ground up, today’s release includes the first set of new endpoints and features we’re launching so developers can get the most out of the Hopsworks Feature Store.

If you can’t wait to check it out, visit Hopsworks.ai to get started. If you can, then read on for more about what we’re building and what’s new in the Feature Store API v2.

Rethinking the Hopsworks Feature Store API

The Hopsworks Feature Store was first released at the end of 2018, and it included a new type of Feature Store API based on the FeatureGroup (DataFrame). When designing the Hopsworks Feature Store, we looked at existing feature stores (Michelangelo by Uber and Zipline by Airbnb) that provided Domain Specific Language (DSL) APIs to their Feature Stores - you declaratively define features, and the DSL is then executed to ingest feature data into the Feature Store. However, we were building a general-purpose Feature Store and we knew that history has not been kind to DSLs - they have their day in the sun, but general purpose frameworks and languages win out in the long run. So, we went with the FeatureGroup (DataFrames in Spark or Pandas) as the way to ingest and export features to/from the Hopsworks Feature Store. Since then, other Feature Stores have followed our approach, such as Feast, which introduced FeatureSets in 0.3 almost one year later and Spark support almost two years later.

However, our API still encouraged developers to think of features as existing in a flat namespace. With great customers, such as PaddyPower, we rethought and redesigned our Feature Store API to consider other practical problems we encountered, such as making breaking changes to FeatureGroup schemas without breaking existing feature pipelines, handling feature naming conflicts, and redesigning a minimal single client library (from 2 libraries previously) that can run in either a (Py)Spark or Python environment. That library is now released as our new Feature Store API and is called HSFS.

Feature Reuse with Pandas-like Joins

HSFS provides a DataFrame API to ingest data into the Hopsworks Feature Store. You can also retrieve feature data in a DataFrame, which can either be used directly to train models or be materialized to file(s) for later use in training models.

The idea of the Feature Store is to have pre-computed features available for both training and serving models. The key functionality required to generate training datasets from reusable features is: feature selection, joins, filters and point-in-time queries. To enable this functionality, we are introducing a new expressive Query abstraction with HSFS that provides these operations and guarantees reproducible creation of training datasets from features in the Feature Store.

The new joining functionality is heavily inspired by the APIs used by Pandas to merge DataFrames. The APIs allow you to specify which features to select from which feature group, how to join them and which features to use in join conditions.

If a data scientist wants to use a new feature that is not yet available in the Feature Store, she can write code to compute the new feature (using existing features or external data) and ingest the new feature values into the Feature Store. If the new feature is based solely on existing feature values in the Feature Store, we call it a derived feature. The same HSFS APIs can be used to compute derived features as well as features using external data sources.

  

# the feature store handle and feature groups are assumed to already exist, e.g.:
#   import hsfs
#   fs = hsfs.connection().get_feature_store()
#   rain_fg = fs.get_feature_group("rain_fg", version=1)  # likewise temperature_fg, location_fg

# create a query
feature_join = (rain_fg.select_all()
                       .join(temperature_fg.select_all(), on=["date", "location_id"])
                       .join(location_fg.select_all()))

td = fs.create_training_dataset("rain_dataset",
                                version=1,
                                label="weekly_rain",
                                data_format="tfrecords")

# materialize query in the specified file format
td.save(feature_join)

# use materialized training dataset for training, possibly in a different environment
td = fs.get_training_dataset("rain_dataset", version=1)

# get TFRecordDataset to use in a TensorFlow model
dataset = td.tf_data().tf_record_dataset(batch_size=32, num_epochs=100)

# reproduce query for online feature store and drop label for inference
jdbc_querystring = td.get_query(online=True, with_label=False)
When using HSFS to create a training dataset, the features, their order, and how the feature groups are joined are also saved as metadata. This metadata is then used at serving time to build a JDBC query that is executed by a client (along with the feature group primary key values) to request a feature vector from the online Feature Store for a specific model.

Features belong in a hierarchical namespace

When we started building the Hopsworks Feature Store we wanted to create a flat namespace for features to make it easier for data scientists to pick the features they wanted. But as soon as our customers started having tens of thousands of features in the feature store, feature naming conflicts became commonplace. 

Features like created_on, customer_id, account_id are often used in many different unrelated Feature Groups. The same user might, for example, have a different account_id for each service the company provides. Imagine an e-commerce use case - most likely you will have multiple `created_on` or `revenue`-named features.

In HSFS we solved this challenge by requiring users to work with the feature group abstraction: users specify which features they need from which feature group. The feature group abstraction also allows the query planner to intelligently identify which feature to use when joining features from different feature groups.
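
As an illustration, here is a minimal sketch, assuming two hypothetical feature group handles, rain_fg and sales_fg, that both contain a created_on column; selecting features explicitly per feature group lets the query planner disambiguate the conflict:

# select features explicitly from each feature group; the feature group handle,
# not the bare column name, identifies which created_on is meant
query = rain_fg.select(["location_id", "created_on", "weekly_rain"]) \
               .join(sales_fg.select(["created_on", "revenue"]), on=["location_id"])

df = query.read()   # execute the query and return a DataFrame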

Time travel as first class citizen

Hopsworks has offered support for Apache Hudi for over one year. Apache Hudi is the key component to make time travel, upserts, and ACID updates possible on feature data. Time travel enables data engineers and data scientists to retrieve a previous snapshot of the feature data for debugging and auditing purposes. 

In HSFS, we added support for complex time travel queries, allowing users to create training datasets by retrieving different feature groups at different points in time. The different times are stored as metadata alongside the training datasets. This enables users to easily reproduce the creation of training datasets with historical feature values.

The new SDK improves support for Apache Hudi by integrating it directly with the feature group and joining APIs, and Apache Hudi is now also available from the Python APIs. The APIs hide the complexity of dealing with Apache Hudi options, and the same high-level APIs will allow us to implement support for additional formats, such as Delta Lake.
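
As a hedged sketch of what a point-in-time read looks like (the read() argument and its timestamp format follow the HSFS documentation of this release and should be verified against your version):

rain_fg = fs.get_feature_group("rain_fg", version=1)

# read the feature data as it looked at an earlier wallclock time,
# served from the corresponding Apache Hudi commit
df_snapshot = rain_fg.read("2020-10-01 00:00:00")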

Provenance and custom metadata

Provenance for feature data is a key new functionality available as part of HSFS. The Hopsworks Feature Store implicitly tracks dependencies between feature groups, training datasets, and models. Provenance allows users to know, at any given moment, which features are the most widely used and which features are no longer used and could potentially be removed from the feature store. Provenance also enables users to traverse from a model to the application used to train it, the input training dataset used, and the features and feature group snapshots used to create the training dataset.

In this new release we also allow users to create and attach user-defined labels to feature groups and training datasets. Labels and tags enable users to build a custom metadata catalog for their Feature Store, that (using Elasticsearch) supports low-latency free-text search over potentially thousands of features, descriptions, labels and tags.
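
For example, a minimal sketch of attaching a tag (the add_tag call and its arguments are assumptions based on the HSFS tagging API; tag names typically have to be predefined by an administrator):

fg = fs.get_feature_group("rain_fg", version=1)
fg.add_tag("owner", "weather-team")              # user-defined key/value metadata

td = fs.get_training_dataset("rain_dataset", version=1)
td.add_tag("use_case", "rainfall forecasting")   # searchable from the Feature Store UI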

ML Framework APIs to Training Datasets

While the Hopsworks Feature Store plays a key role in defining governance around feature data and in improving execution speed when experimenting with, building, and deploying models, the ultimate customers of the feature store are data scientists and machine learning engineers.

To make their interaction with the feature store as smooth as possible, we added support for retrieving training datasets as TensorFlow Datasets using tf.data. This allows data scientists to efficiently read training datasets in their TensorFlow code.
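
Building on the training dataset example above, here is a minimal sketch of feeding the resulting tf.data dataset into a Keras model (the model is a hypothetical placeholder, and the tf_data() arguments for separating the label are assumptions to check against the HSFS docs):

import tensorflow as tf

td = fs.get_training_dataset("rain_dataset", version=1)
# returns batches of (features, label), assuming the label was declared
# when the training dataset was created
dataset = td.tf_data(target_name="weekly_rain").tf_record_dataset(batch_size=32, num_epochs=10)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="relu"),
    tf.keras.layers.Dense(1),
])
model.compile(optimizer="adam", loss="mse")
model.fit(dataset, steps_per_epoch=100, epochs=10)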

Pure Python Feature Engineering

Alongside the existing support for (Py)Spark, this release also brings support for feature engineering pipelines built as pure Python programs. With HSFS you can upload feature data to the feature store from your SageMaker notebook, KubeFlow, a Jupyter notebook, or even your local machine.

This is in addition to the existing capabilities of exploratory data analysis with the feature store from your notebook, and the creation and reading of training datasets.
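
As a hedged sketch (connection arguments omitted, and the create_feature_group signature is an assumption based on the HSFS docs), ingesting a small Pandas DataFrame from a plain Python environment might look like this:

import pandas as pd
import hsfs

connection = hsfs.connection()          # host, project and API key arguments omitted
fs = connection.get_feature_store()

df = pd.DataFrame({"location_id": [1, 2], "weekly_rain": [12.3, 4.5]})

rain_fg = fs.create_feature_group("rain_fg",
                                  version=1,
                                  primary_key=["location_id"],
                                  description="weekly rainfall per location")
rain_fg.save(df)                        # ingests the DataFrame without PySpark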

Get started

You can get started immediately by creating a new cluster on https://hopsworks.ai; the library is pre-installed in your environment, so you can start using it straight away.

The documentation is available, and you can walk through one of the many example notebooks available here.

Hopsworks Feature Store for AWS SageMaker

>
5/18/2020
>
Fabio Buso

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts (Introduction to feature store, MLOps with a feature store, and Hopsworks Feature Store for Databricks) we focused on describing the key concepts and building blocks of the Hopsworks Feature Store. In this blog post we are going to focus on how to integrate AWS SageMaker with Hopsworks. Hopsworks is available on AWS as either a SaaS platform (www.hopsworks.ai) or as a custom Enterprise platform.

While Hopsworks provides all the tools to design and operate pipelines that go from raw data to serving models in production, it is also a modular platform. In particular, the Hopsworks Feature Store can be used as a standalone feature store by data science platforms, such as AWS SageMaker or Databricks. It offers AWS SageMaker users a centralized platform to manage, discover, and use features, both for creating training datasets and for serving features to operational models. In this blog, we will cover how AWS SageMaker users can, from the comfort of their existing Jupyter notebook instance, perform exploratory data analysis with the feature store, discover available features, and join features together to create train/test datasets.

Exploratory Data Analysis with a Feature Store

Exploratory data analysis (EDA) is a key component of every data scientists’ job. The Hopsworks Feature Store provides data scientists with a repository of features ready to be used for training models. Data scientists can browse the available features, understand the features by inspecting their metadata, investigate pre-computed feature statistics, and preview sample feature values. These are typical steps a data scientist takes to determine if a feature is a good fit for a specific model. With the Hopsworks AWS SageMaker integration, data scientists can perform these steps in a Jupyter notebook by making feature store API calls in Python.

In Hopsworks, features are organized into groups of related features in what is called a Feature Group. Exploration usually starts at the feature group level, by listing all the available feature groups in the feature store:

>>> featurestore.get_featuregroups()
['games_features_1',
'games_features_on_demand_tour_1',
'games_features_hudi_tour_1',
'season_scores_features_1',
'attendances_features_1',
'players_features_1',
'teams_features_1',
'imported_feature_name_1',
'imported_feature_name_online_1']

The following step allows data scientists to understand which individual features are available in a given feature group, and it returns the first five rows (a data sample):

>>> df = featurestore.get_featuregroup("teams_features").head(5)


The above API call will send a request to the Hopsworks Feature Store and return the result to the user in a Pandas dataframe df.

Individual features are the building blocks of the Hopsworks feature store. From SageMaker, data scientists can join features together and visualize them. As joining features is performed in Spark and SageMaker only provides a Python kernel, the join is executed on the Hopsworks Feature Store and the result returned to the user in a Pandas dataframe df. The complexity of the request is hidden behind the API call.

>>> df = featurestore.get_features(
["team_budget", "average_attendance", "average_player_age"]
).head(5)


Statistics and data visualization help to give an understanding of the data. Hopsworks allows users to compute statistics such as the distribution of feature values, feature correlations within a feature group, and descriptive statistics (min, max, average, count) on the different features.

The statistics are shown in the Hopsworks Feature Store UI, but they are also available from a notebook in SageMaker:

>>> featurestore.visualize_featuregroup_correlations("players_features")

Generate train/test datasets

Once you have explored the feature store and identified which features you need for your model, you can create a training dataset (the train and test data you need to train and evaluate a model, respectively). A training dataset is a materialization of multiple features joined together, potentially coming from different feature groups. Joining features together on demand enables data scientists to reuse the same features in many different training datasets. Once features have been joined together into a dataframe, they can be stored in an ML-framework-friendly file format on a storage platform of choice, such as S3. For example, if you are training a TensorFlow model, you may decide to store your training dataset in TensorFlow's native TFRecord file format, in a bucket on S3, s3_bucket.

>>> featurestore.create_training_dataset(
    training_dataset = "team_position_prediction",
    features = ["team_budget", "average_attendance", "average_player_age"],
    training_dataset_version = latest_version + 1,
    data_format = 'tfrecords',
    sink = s3_bucket
)

In the above example, the feature store joins the list of features together and saves the result in files in TFRecord format in an S3 bucket. The S3 bucket needs to be defined as a connector in the Hopsworks Feature Store. In practice, what happens is that the SageMaker notebook asks the Hopsworks Feature Store to start a Spark job to produce the training dataset. When the job has completed on Hopsworks, you'll be able to use the training dataset, typically in a different notebook, to train your model.

Get Started

Before you begin, make sure you have started a Hopsworks cluster using our managed platform Hopsworks.ai. The Hopsworks - SageMaker integration is an enterprise only feature and Hopsworks.ai gives you access to it. The first time you use the Hopsworks - SageMaker integration, there are a few simple steps that you need to perform to configure your SageMaker environment.

API Key

From SageMaker you need to be able to authenticate and interact with the Hopsworks Feature Store, so you need to get an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, clicking on Settings, and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should upload the API key as a secret on the AWS Secrets Manager service. The Hopsworks SageMaker integration also supports reading the API key from the AWS Parameter Store or a local file. The documentation (https://hopsworks.readthedocs.io) covers the setup for all the cases.

To use the AWS Secrets Manager, you should first find the IAM Role of your SageMaker notebook - in this case it is AmazonSageMaker-ExecutionRole-20190511T072435.


Create a new secret called hopsworks/role/[MY_SAGEMAKER_ROLE], where [MY_SAGEMAKER_ROLE] is the same name as the IAM role you retrieved in the previous step. The key should be api-key and the value should be the API key you copied from Hopsworks in the first step.
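
Equivalently, as a hedged sketch, the same secret can be created programmatically with boto3 (the role name, region, and key value are placeholders):

import json
import boto3

client = boto3.client("secretsmanager", region_name="us-east-1")
client.create_secret(
    Name="hopsworks/role/AmazonSageMaker-ExecutionRole-20190511T072435",
    SecretString=json.dumps({"api-key": "<YOUR_HOPSWORKS_API_KEY>"}),
)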



Finally, we need to give the IAM role of the SageMaker notebook permission to read the secret we just created. In the AWS Management Console, go to IAM, select Roles, and then select the role that is used when creating SageMaker notebook instances. Select Add inline policy. Choose Secrets Manager as the service, expand the Read access level, and check GetSecretValue. Expand Resources and select Add ARN. Paste the ARN of the secret created in the previous step with the AWS Secrets Manager. Click on Review, give the policy a name, and click on Create policy.
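
The same inline policy can also be attached with boto3 rather than through the console; a hedged sketch (the secret ARN and policy name are placeholders):

import json
import boto3

iam = boto3.client("iam")
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "<ARN_OF_THE_SECRET_CREATED_ABOVE>",
    }],
}
iam.put_role_policy(
    RoleName="AmazonSageMaker-ExecutionRole-20190511T072435",
    PolicyName="HopsworksSecretsRead",
    PolicyDocument=json.dumps(policy),
)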

After this step, your SageMaker notebook, when run as the above IAM role, will have permission to read the Hopsworks API key from the Secrets Manager service.

Hopsworks-cloud-sdk

With the API key configured correctly, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) in your AWS SageMaker Jupyter notebook using pip:

>>> !pip install "hopsworks-cloud-sdk~=1.2"

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection

With the API Key configured and the library installed, you should be now able to establish a connection to the feature store, and start using the Hopsworks - AWS SageMaker integration.

import hops.featurestore as fs

fs.connect(
    'my_instance',                      # DNS of your Feature Store instance
    'my_project',                       # Name of your Hopsworks Feature Store project
    secrets_store='secretsmanager')     # Either parameterstore, secretsmanager, or file

Try it out now with Hopsworks.ai

You can now try out the Hopsworks Feature Store and the SageMaker integration by starting a Hopsworks instance on Hopsworks.ai and running this example Jupyter notebook on your SageMaker instance: https://github.com/logicalclocks/hops-examples/blob/master/notebooks/featurestore/aws/SageMakerFeaturestoreTourPython.ipynb.

The Hopsworks Community is also available if you need help with your setup.

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a SageMaker notebook. While we still encourage you to use a Spark environment to engineer complex features with lots of data, it will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.

Introducing Hopsworks.ai

>
4/27/2020
>
Steffen Grohsschmiedt

Palo Alto, 22nd April 2020 - We are pleased to introduce Hopsworks.ai, a managed cloud service providing Hopsworks and the Feature Store. Hopsworks is a platform for the design and operation of AI applications at scale. Hopsworks.ai allows you to effortlessly launch and manage Hopsworks clusters in your AWS account and integrate them with third party platforms such as Databricks and AWS SageMaker. For users of Databricks and AWS SageMaker, it enables your organisation to manage and share your machine learning data with the Feature Store and gives your Machine Learning teams the ability to develop, train and deploy AI applications at scale following best practices established by industry leaders such as Uber (Michelangelo) and Airbnb (Zipline).

Product overview

Hopsworks.ai offers two product tiers, a free version and an enterprise version. The free version is for individuals or organisations that want to get started with Hopsworks and the Feature Store. The enterprise version provides advanced features and support to organisations building production machine learning applications at scale. The enterprise version is in early access and available upon request.

The free and enterprise versions have the following features, respectively:

Free

  • Industry’s first Feature Store for ML
  • Integration with Sagemaker, Databricks, Kubeflow and other ML platforms
  • Jupyter Notebooks for development
  • End-to-end Machine Learning workflows, orchestrated with AirFlow
  • Parallel Hyperparameter optimisation with Maggy
  • Online Model serving for TensorFlow and Scikit-Learn
  • Advanced security and privacy with Dynamic Role-based Access Control
  • Runs in your AWS account
  • EBS-based storage
  • Single cluster with one node and no GPUs

Enterprise (Early Access)

  • All features of Free
  • Unlimited clusters scaling to any size
  • Elastic sizing of clusters
  • Distributed Deep Learning with GPUs
  • Scalable consistent storage backed by S3
  • Kubernetes-based online model serving
  • Automated backups and upgrades
  • High Availability for mission critical services
  • Pay-per-use
  • First class support included

Getting started with Hopsworks.ai

Sign up for free. If you want to quickly try the platform without connecting an AWS account, you can make use of 30 days of free demo access. To go further and get your own cluster, all you have to do is connect your AWS account; see Getting started with Hopsworks.ai.

If you are interested in the enterprise version of Hopsworks.ai, then reach out to us to apply for the early access program: Contact Logical Clocks.

For technical questions regarding Hopsworks.ai you can reach out to us on Hopsworks Community.

A Feature Store for Databricks and AWS Sagemaker

Hopsworks.ai seamlessly integrates with Amazon SageMaker and Databricks, providing them with a Feature Store, usable directly from notebooks in those platforms.  The Hopsworks Feature Store also offers Python, Scala and Java libraries to support custom integrations. For detailed information, see Feature Store Integrations.

Databricks

To connect from Databricks, we offer both a native PySpark/Spark SDK based on DataFrames and a Python SDK based on Pandas. The native PySpark/Spark SDK is recommended for production workloads, but requires you to establish network connectivity by either setting up VPC peering or placing your Hopsworks cluster in the same VPC and availability zone as the Databricks cluster. See the Databricks Quick Start for documentation on how to connect to the Feature Store from Databricks.

AWS SageMaker

We provide a Python SDK based on Pandas that supports integration with Amazon SageMaker. See SageMaker Quick Start for documentation on how to connect to the Feature Store from Amazon SageMaker.

Custom integrations

The Cloud, Python and Scala/Java SDKs can be used to build custom integrations. See Using the Feature Store from any Python environment (KubeFlow) for documentation on how to connect to the Feature Store. For any technical questions, you can reach out to us on Hopsworks Community.

Introducing the Hopsworks 1.x series!

>
2/17/2020
>
Theofilos Kakantousis

The Hopsworks 1.x series brings many new features and improvements, ranging from services such as the Feature Store and Experiments, to enhanced support for distributed stream processing and analytics with Apache Flink and Apache Beam, to building Deep Learning pipelines with TensorFlow Extended (TFX), to code versioning support for Jupyter notebooks with Git, to all-new provenance/lineage of data across all steps of data engineering and data science. We are also excited that Hopsworks 1.x is the backbone of the all-new managed Hopsworks platform for AWS, Hopsworks.ai (https://www.hopsworks.ai/).

Feature Store

Hopsworks 1.x brings significant Feature Store improvements ranging from updated UI components to connectivity with external systems and feature discovery. Most notably:

  • Being able to store training datasets in external data sinks (such as S3) while still tracking the metadata in Hopsworks
  • Improved online feature store experience 
  • Support Apache Hudi as storage format for feature groups, to allow for upserts and time travel
  • Pluggable storage of feature groups and training datasets by using storage connectors to external systems such as S3, JDBC, HopsFS
  • UI-support for Citizen Data Scientists who can: (1) generate training datasets from the UI by selecting features; (2) generate feature groups using SQL; (3) update feature group and training dataset statistics.
  • On-Demand feature groups: allow the user to define feature groups using SQL that are computed on-demand using an external JDBC connection, without having to cache the data in Hopsworks

Users of Hopsworks Enterprise can now easily connect to the Feature Store from their Databricks notebooks and Amazon Sagemaker. Documentation for connecting with these two platforms can be found at hopsworks.readthedocs.io and a plethora of notebooks are available at our hops-examples GitHub repository.

All-new Experiments UX and Model Registry

Hopsworks 1.x brings an all-new Experiments user experience with a revamped user interface and new functionalities. To make use of the Experiments service, Data Scientists can use the hops Python library, a rich experiment API for Data Scientists to run their Machine Learning code, whether it be TensorFlow, Keras, PyTorch or another framework with a Python API. Experiments also provide features such as automatic versioning of notebooks and the Python environment, parallel hyperparameter tuning algorithms, and managed TensorBoard. Along with the experiments API comes Maggy, an in-house built framework for asynchronous algorithms for parallel hyperparameter tuning and parallel ablation studies.

The new Experiments user interface allows users to easily track experiments and compare them using metrics exposed by the API and defined by the Data Scientists in their programs/notebooks. With one click, Hopsworks users can now launch the TensorBoard of a current or past experiment, view and export the Python anaconda environment that the particular experiment run used, preview within Hopsworks or download the notebook used for the experiment, view the experiment's logs, and view historical information about the underlying Spark framework, such as configuration parameters and execution information.

Hopsworks 1.2 Experiments UI showing a hyperparameter optimization experiment


Last but not least, Data Scientists can now manage their models in the new Model Registry service with UI support. The Model Registry lists all models developed (exported) from different experiment runs and, more importantly, it makes it easy for Data Scientists to discover, search, and compare models developed by other Data Scientists within the current project. Users can also easily compare the performance of different versions of the same model using the metrics supplied in their experiments. Each model provides a link to the experiment that created it, providing provenance to the code and Python environment used to create the model, enabling models to be more easily reproduced.

Hopsworks 1.2 Model Repository. From any model, you can navigate to the experiment 

Tracking of the experiments, models, and feature data used to develop models is managed by Hopsworks using the provenance capabilities of HopsFS. Hopsworks can now track operations on files that are created/deleted and models that are developed from programs that use the Experiments APIs. Effectively, users in Hopsworks can now navigate from the Feature Store to train/test data to experiments (programs and Python environments) to models.

Project-based multi-tenant Elasticsearch

Hopsworks 1.x expands its unique project-based Elasticsearch multi-tenancy. Compared to previous Hopsworks versions, users of the platform now get programmatic access to Elasticsearch indices that are private to their projects (workspaces). By private, we mean that users in a Hopsworks project can only access the indices owned by that project and not any indices belonging to other projects. Programmatic access means that users can use Elasticsearch APIs from within their Hopsworks jobs. For example, Spark dataframes can be securely written to and read directly from Elasticsearch from within a Spark (Scala) or PySpark (Python) notebook. Project-based multi-tenancy is implemented by integrating the open-distro security plugin, open-sourced by Amazon, with Elasticsearch OSS.
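
As a hedged sketch of what this looks like from a PySpark notebook, using the standard elasticsearch-spark connector (the index name and connection options, which Hopsworks normally supplies through its helper APIs, are placeholders):

df = spark.createDataFrame([(1, "rain"), (2, "sun")], ["id", "weather"])

(df.write
   .format("org.elasticsearch.spark.sql")
   .option("es.resource", "myproject_weather")     # index private to the project
   .option("es.nodes", "<elasticsearch-endpoint>") # placeholder endpoint
   .option("es.port", "9200")
   .mode("append")
   .save())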

Enhanced Jobs UI

Users will now notice changes in the Hopsworks Jobs UI, with a sleeker design and more functionalities available from within the Jobs page. It is now possible to quickly navigate through different runs of a job and click-to-view logs in full-screen mode. Further information is available in Hopsworks Jobs user guide.

New Layout of jobs UI in Hopsworks


New notebook services

Hopsworks 1.x adds support for working with JupyterLab as part of the notebook service offering of the platform. Users can now select their favorite notebook IDE between JupyterLab and Jupyter Notebook from within the Hopsworks Jupyter dashboard.

It is now easier than ever to get started with writing Python programs that utilize a GPU. Hopsworks Enterprise integrates with Kubernetes to enable users to allocate GPUs to the container running their Jupyter notebook. Users can now train models using either the Python kernel or the PySpark (sparkmagic) kernel.

In addition, Hopsworks 1.x brings a long-awaited feature, git support for notebooks. Users can now set their GitHub repository in the Hopsworks Jupyter dashboard and then Hopsworks automates the process of cloning the repository, checking out branches and pushing changes to the notebooks back to GitHub. There is also Git plugin support in JupyterLab.

Support for Apache Flink, Apache Beam, and TensorFlow Extended (TFX)

Support for running Apache Flink has been completely re-engineered as part of the Hopsworks 1.x series. Flink is now a first-class citizen in the Hopsworks Jobs service as users can now create a new Flink job (cluster) by setting various parameters and access the Flink dashboard and Flink history server from within the Jobs UI. More information on how to use Flink from the UI or even launch Flink programs programmatically can be found in our Hopsworks-Flink docs.

Flink History Server in Hopsworks


Apache Beam is now also supported in beta. Full-fledged support will be added in the next 1.3 release along with the latest versions of Beam, Flink and TFX. Hopsworks supports developing and running Beam programs with the Beam Portability framework and the Flink runner. To ease development, the hops Python library provides the beam module that automates collecting logs, managing Beam related services, and distributing binaries. Examples can be found in hops-example/flink and extended documentation in Hopsworks-Beam docs.

Building on Beam support, Hopsworks now provides initial support for building ML pipelines with TensorFlow Extended (TFX) components on Beam with the Flink runner. Details on how the integration of Flink, Beam, and TFX is implemented in Hopsworks are presented in our talk at the BigThings conference 2019 (link to video), in our docs, and in our examples.

Hopsworks.ai

Hopsworks 1.x is the engine behind Hopsworks.ai, the platform for Data-Intensive AI in the cloud. Hopsworks.ai enables businesses or individuals to seamlessly deploy Hopsworks with the Feature Store in an AWS account. Visit https://www.hopsworks.ai/ for more information on how to quickly get started.

Release cycle and new support page

The first three releases of this series, 1.0, 1.1 and 1.2, kick off with more than 300 JIRAs including new features, improvements and bug fixes. An important change in the Hopsworks release cycle is the move to timely releases. A new release of Hopsworks will now be issued every ~6 weeks, allowing for faster availability of new features and making upgrades smoother than before. 

Detailed release notes can be found at the Hopsworks GitHub repository and important release notes with any breaking changes and upgrades notes are available in the Hopsworks version upgrades documentation page.

Last but not least, Hopsworks community support has moved under a new roof at https://community.hopsworks.ai where Hopsworks developers answer any questions you may have regarding Hopsworks and Hopsworks.ai platforms!

Welcoming AMD/ROCm to Hopsworks

>
10/14/2019
>
Robin Andersson

With Hopsworks 1.0, we have now added support for AMD GPUs with ROCm. This enables you to take your TensorFlow programs and run them, unchanged, on Hopsworks. RadeonOpenCompute (ROCm) is an open-source framework for GPU computing that supports multi-GPU computing to scale out training and reduce the time needed to train models.

ROCm is significant for data scientists as, until now, they have had little choice in GPU hardware when training models. With the recent upstreaming of ROCm changes to TensorFlow, ROCm is now a first-class citizen in the TensorFlow ecosystem. This enables Enterprise AI platforms, such as Hopsworks, to allow TensorFlow applications to run, unchanged, on AMD GPU hardware. To further enable deep learning on many GPUs in a cluster, Logical Clocks has also added support for resource scheduling of AMD GPUs in Hopsworks clusters, with Hops YARN. With Hopsworks and AMD GPUs, developers can now train deep learning models much faster on frameworks like TensorFlow using tens or hundreds of GPUs in parallel.

To learn more, read our whitepaper ROCm in Hopsworks, see our talk and demo from the Databricks Summit 2019 or the O'Reilly AI Conference 2019.

Hopsworks adds support for AMD GPUs by adding ROCm support to YARN, Hopsworks' resource scheduler.

Goodbye Horovod, Hello CollectiveAllReduce

>
10/22/2018
>
Robin Andersson

tl;dr Distributed Deep Learning is producing state-of-the-art results in problems ranging from NLP to machine translation to image classification. Hopsworks continually pushes the state-of-the-art in enterprise support for distributed deep learning, and we are doing so again by supporting the new CollectiveAllReduceStrategy, part of the Keras/TensorFlow Estimator framework. As a consequence, we are dropping support for Horovod.

I usually tell my students that time in the Deep Learning community is measured in dog years. The rate of advancement is so rapid that methods, frameworks, and algorithms that were state-of-the-art just one year ago (in human years) may already be obsolete. However, one constant among all the change is that deep learning is going distributed.

ICLR 2019 lessons thus far: The deep neural nets have to be BIGGER and they’re hungry for data, memory and compute.

- Prof Nando de Freitas, Oxford University (Oct 2018)

It is starting to look like deep learning workflows of the future feature autotuned architectures running with autotuned compute schedules across arbitrary backends.

- Andrej Karpathy – Head of AI @ Tesla (March 2017)

As we can see from the figure above, if we want to train state-of-the-art (SoTA) models, there are five common approaches. Improving algorithms for regularization and optimization tends to be the focus of academia and hyperscale AI companies. For the remaining three approaches, just going distributed enables you to improve the accuracy of your models by

  1. discovering better hyperparameters (the adam/amsgrad controversy shows their importance),
  2. training with larger amounts of training data (see how Facebook improved the SoTA in ImageNet by adding 2bn more images), and
  3. discovering better neural network architectures with AutoML techniques (see here how Google used genetic algorithms to design a SoTA Deep Neural Network (DNN) for ImageNet).

[Adapted from OpenAI]

Distributed deep learning has been pushing the SoTA in AI for several years. As you can see in the diagram above from OpenAI, there has been a 3.5 month doubling time for the amount of compute (number of FLOPs) required to train state-of-the-art machine learning systems since 2012. That is, AlphaGo Zero required 300,000x more compute in 2017 than AlexNet required in 2012. While Nvidia, Google, and, more recently, AMD have been doing a good job increasing the transistor count for their hardware accelerators, they are not doubling their compute every 3.5 months. What has happened is that training of DNNs has gone distributed. For example, in October 2018, Google introduced a new framework for NLP called BERT, which both heralds new possibilities for transfer learning in NLP and pushes out the SoTA in well known NLP problems. Tim Dettmers’ analysis of BERT training times shows that it would require about 396 days to train on a single GPU, the latest Nvidia RTX 2080 Ti. However, by going distributed (and buying 20 DeepLearning11 servers, priced in Oct 2018 at roughly $12k each when using Nvidia 1080Ti cards), you could reduce training time to just 1-2 days. Although this type of investment is impossible for most developers, there are many enterprises that can afford such an investment in hardware, not just the hyperscale AI companies.

Data Parallel Distributed Training

Much existing enterprise data resides in data lakes (Hadoop and S3). It is currently still a challenge to build scale-out, end-to-end workflows for Deep Learning and Machine Learning (ML) on such data lakes. Hopsworks is a platform designed to help enterprises build a scale-out AI platform around a data lake. In previous blogs, we talked about the importance of a distributed filesystem for an AI platform and how we optimize the use of GPUs in ML workflows. In the rest of this blog, we discuss distributed training, in particular data parallel training with stochastic gradient descent. In data parallel training, we expect that the ML model being trained will fit in the memory of a single GPU. When we add more GPUs to training, we expect to reduce overall training time by dividing each training batch among the available GPUs and having them process data in parallel. Data parallel training is, however, a strong scaling problem, in that communication is required between the GPUs. The batch size limits the amount of parallelism possible in data-parallel training, and therefore it is common to increase the batch size as more GPUs are added. (Also, many follow the linear scaling rule, increasing the learning rate linearly with the batch size, as illustrated below.) When the amount of time required to communicate weight updates between GPUs grows linearly, network I/O can quickly become a bottleneck preventing training from scaling further.
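
As a concrete illustration, the linear scaling rule mentioned above amounts to a proportional bump of the learning rate with the global batch size:

# learning rate tuned for a single-GPU batch size of 256
base_lr = 0.1
base_batch_size = 256

num_gpus = 8
per_gpu_batch_size = 256

global_batch_size = num_gpus * per_gpu_batch_size            # 2048
scaled_lr = base_lr * global_batch_size / base_batch_size    # 0.8

(In practice, the scaled learning rate is usually reached gradually through a warm-up period rather than applied from the first iteration.)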

In data-parallel training, for each training iteration, each Executor/GPU receives a non-overlapping partition of samples from the batch. This is most easily achieved using a shared, distributed filesystem that holds the training data, such as HopsFS.

The first generation of data-parallel distributed training was dominated by the parameter-server architecture. In the parameter-server architecture, one or more parameter servers holds the current model and synchronizes it between a set of worker-nodes for each iteration. The problem with this type of training is that the network links between the parameter server and the workers become a bottleneck. The workers cannot utilize their full bandwidth, while the bandwidth of the parameter server becomes a bottleneck, slowing down training. This problem motivated the Ring-AllReduce technique for training.

Ring-AllReduce

Baidu first introduced the Ring-AllReduce algorithm in Feb 2017 to data parallel training for DNNs, first on the PaddlePaddle platform, then later porting it to TensorFlow’s contrib package. (AllReduce was originally developed for HPC applications.) In Ring-AllReduce, each node corresponds to a hardware accelerator (GPU), see illustration below. During training, the servers work in lockstep processing a large minibatch of training data. Each server computes gradients on its local shard (partition) of the minibatch and each server then both sends and receives gradients to/from their neighbours on the ring, in a bandwidth-optimal manner, utilizing each node’s upload and download capacity. All gradients travel in the same direction on the ring, and when all servers have received all the gradients computed for the minibatch, they update the weights for their local copy of the model using an optimization algorithm such as stochastic gradient descent. Note that all servers will have the same copy of the model after this update step. In effect, the model is replicated at all servers in the system.
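
To make the two phases concrete, here is a toy, single-process simulation of Ring-AllReduce for N simulated workers; real implementations such as NCCL run this across machines on actual gradient tensors, and this sketch only illustrates the reduce-scatter and all-gather phases described above:

import numpy as np

def ring_allreduce(grads):
    """grads: one equal-length 1-D gradient array per simulated worker."""
    n = len(grads)
    # each worker splits its gradient into n chunks
    chunks = [np.array_split(g.astype(float).copy(), n) for g in grads]

    # Phase 1 (reduce-scatter): after n-1 steps, worker i holds the fully
    # summed chunk (i + 1) % n.
    for step in range(n - 1):
        for i in range(n):
            idx = (i - step) % n
            chunks[(i + 1) % n][idx] = chunks[(i + 1) % n][idx] + chunks[i][idx]

    # Phase 2 (all-gather): each fully summed chunk travels around the ring
    # until every worker holds the complete reduced gradient.
    for step in range(n - 1):
        for i in range(n):
            idx = (i + 1 - step) % n
            chunks[(i + 1) % n][idx] = chunks[i][idx].copy()

    return [np.concatenate(c) for c in chunks]

workers = [np.ones(8) * (w + 1) for w in range(4)]   # fake per-worker gradients
reduced = ring_allreduce(workers)
assert all(np.allclose(r, sum(workers)) for r in reduced)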

Ring-AllReduce Algorithm for Data Parallel Training
[From https://preferredresearch.jp/2018/07/10/technologies-behind-distributed-deep-learning-allreduce ]

Ring-AllReduce Algorithm for Data Parallel Training in 5 steps. Workers compute and  share gradients without any Parameter Servers  
[From https://eng.uber.com/horovod/  ]

Horovod

In August 2017, Uber released Horovod as a faster, easier to use library for distributed training of DNNs on TensorFlow. Horovod has since been extended to support Keras and PyTorch. Alexander Sergeev, the main developer, produced a great API for distributed training, and great quality software, that for the first time made distributed training easy to use for Data Scientists (compared to the previous parameter server models by TensorFlow and TensorFlowOnSpark). As can be seen from the table below, the amount of code required to write a “HelloWorld” distributed training example was reduced dramatically.

Horovod was built on stable frameworks – AllReduce is from NCCL2 by Nvidia and inter-process communication is handled by MPI. As can be seen from the benchmark diagram below, Horovod scales linearly for small models up to 256 GPUs, and still has a respectable scaling factor for large models (VGG-16); it massively outperforms the parameter server model for distributed training. Due to this combination of a clean API and performance, Horovod has rightly gained widespread adoption.

[ Horovod Performance, from https://github.com/uber/horovod ]

CollectiveAllReduceStrategy in Keras/TensorFlow

As we saw with the introduction of eager mode in TensorFlow to counter the rapid adoption of PyTorch (with its more Pythonic API), the TensorFlow team are flexible enough to co-opt external innovations into their platform. Ring-AllReduce has shown itself to be, in general, a better algorithm for data parallel training than the original parameter server model. As such, in TensorFlow 1.11 (September 2018), Google introduced a new CollectiveAllReduceStrategy to the DistributedTraining part of the Estimator framework, providing Ring-AllReduce distributed training on both Keras/TensorFlow and TensorFlow. Google called it a distribution strategy that uses collective ops for all-reduce. Google have not yet released any official benchmarks for CollectiveAllReduceStrategy, but we at Logical Clocks were just too curious, and jumped in to perform our own benchmarks. But before we look at the numbers, let us look at how we support programming with CollectiveAllReduceStrategy.

Firstly, we run TensorFlow inside PySpark applications, so applications are written fully in Python. You can see the pseudo-code for distributed training below. In the example below, the distributed_training function will run on an Executor in PySpark and have its own dedicated GPU (managed by Hops-YARN). We can launch PySpark applications with anything from 1 to 100s of Executors, and distributed training will scale to use the GPUs available; the code below will not change when we change the number of executors.
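
Here is a hedged sketch of such a distributed_training function, using TensorFlow 1.11-era Estimator APIs; the hops launcher call at the bottom mirrors the experiment API used elsewhere on this blog, and the exact entry point for collective all-reduce may differ in your Hopsworks version:

def distributed_training():
    import numpy as np
    import tensorflow as tf
    from hops import tensorboard

    def input_fn():
        # toy in-memory data; each worker reads its own shard in a real job
        x = np.random.rand(1024, 10).astype(np.float32)
        y = np.random.rand(1024, 1).astype(np.float32)
        return tf.data.Dataset.from_tensor_slices((x, y)).repeat().batch(32)

    def model_fn(features, labels, mode):
        predictions = tf.layers.dense(features, 1)
        loss = tf.losses.mean_squared_error(labels, predictions)
        train_op = tf.train.GradientDescentOptimizer(0.01).minimize(
            loss, global_step=tf.train.get_or_create_global_step())
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    # Hops builds and exports TF_CONFIG for each worker; the strategy then
    # performs Ring-AllReduce between the workers
    strategy = tf.contrib.distribute.CollectiveAllReduceStrategy()
    config = tf.estimator.RunConfig(
        train_distribute=strategy,
        model_dir=tensorboard.logdir())    # shared directory in HopsFS

    estimator = tf.estimator.Estimator(model_fn=model_fn, config=config)
    estimator.train(input_fn, steps=1000)

from hops import experiment

# one executor per GPU; the same code runs unchanged as executors are added
experiment.launch(spark, distributed_training)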

All of the workers that execute the above code snippet will write to shared directories in our distributed filesystem. This relieves us, the Data Scientists/Engineers, from having to collect and aggregate logs, models, and checkpoints from all workers – as you would have to do when working with the local filesystem.

There are a few gotchas in CollectiveAllReduce, as this feature is still currently alpha in TensorFlow. Firstly, you have to specify the GPU order in the ring statically, using the gpu_indices parameter. Luckily, the hops API will do this for you, transparently. For performance, you may also need to configure the batch size for merging tensors, allreduce_merge_scope. You can set allreduce_merge_scope to ‘1’ for no merging, or set it to ’32’ for higher throughput. Hops sets allreduce_merge_scope to a sensible default of ‘32’, trading off a small increase in latency when aggregating gradients for increased network I/O throughput.

The Estimator framework in TensorFlow relieves the programmer from having to aggregate all the application logs, tensorboard logs, manage checkpoints, and export models - provided you have a distributed filesystem. Due to its HDFS compatibility, HopsFS works seamlessly with the TensorFlow Estimator Framework. HopsML’s experiment API also adds additional information needed to reproduce experiments, such as the version of the IPython notebook and the conda environment used for this experiment.

Benchmarks

Big model => High Network Traffic
Network I/O is the most common bottleneck when training. GPUs idle waiting for gradient updates from their neighbors. This is more likely to happen if you have a large model, too low network bandwidth between hosts, and/or too low memory bus bandwidth.

Small model => Low Network Traffic
The easiest way to prevent network I/O bottlenecks is to make more bandwidth available between GPUs - use 25 Gb/s or better networking and high memory-bandwidth buses, like NVLink or PCIe 4.0.

We benchmarked Horovod against CollectiveAllReduceStrategy for both a small model (GoogLeNet, aka Inception v1) and a big model (VGG19) to see if the additional communication overhead would lead to different results. In each experiment, we tested with 8 GPUs. Note that our results are not yet good enough for peer review, so caveat emptor. For example, we had different versions of TensorFlow (the Horovod experiments were done earlier in the year), and we have not tested on multiple GPU servers yet.

Benchmark results for the big model (VGG19) and the small model (GoogLeNet)

Evaluation

The results surprised us, positively: CollectiveAllReduceStrategy outperformed Horovod. Initially, our results were not good for CollectiveAllReduceStrategy. However, we received help from the TensorFlow mailing list (thanks!), telling us we had to set the following switch, which made a huge difference:

--rewriter_config="scoped_allocator_optimization: ON scoped_allocator_opts {enable_op: 'CollectiveReduce'}"

As Horovod is a thin layer over NCCL2 for AllReduce, we expected that Nvidia's implementation would outperform Google's, as Nvidia has access to the lower level bus bandwidth measurements. One of the optimizations that possibly explains the improvement over Horovod is the use of a ScopedAllocator. In Paul Tucker's words, from the TensorFlow mailing list:

“--allreduce_merge_scope controls a specific case of a general optimization called ScopedAllocator. The idea behind ScopedAllocator is to look for potentially parallel (in the dataflow graph) applications of the same elementwise Op, and where they can be found replace them all by a single Op instance without extra data copying. This is possible if we can arrange for the inputs of those Ops to be computed into adjacent regions of a single previously allocated tensor of the right dimensions. Since CollectiveReduce is elementwise, it fits the model and is the first such Op we’ve applied this optimization to.

The ScopedAllocator optimization is an improvement if the cost of the Op being merged has a significant min cost or is sublinear with tensor size, and if all the inputs of the nodes it merges are available at roughly the same time, so that we don’t stall progress while waiting for the last input to become available. The --allreduce_merge_scope parameter to the benchmark program actually controls the distance in the graph that we’re willing to look for potential nodes to merge. When it’s 1 no merging will be done. When it’s 32 then the gradients subject to all-reduce are batched into groups of 32 based on the order passed into the graph builder and the optimization is limited only to Op instances in the same group. This is obviously crude and non-optimal but seems to work pretty well for now, at least with the benchmark programs. Given time and manpower a more sophisticated approach could be implemented.”

Hopsworks

Distributed Deep Learning Pipeline on Hopsworks

Hopsworks is a full stack Data Science platform that supports distributed deep learning, including hyperparameter optimization and distributed training (see our Spark Summit talk). Data Scientists can write end-to-end machine learning pipelines in PySpark, TensorFlow, Keras, and PyTorch, and orchestrate multi-job pipelines in Airflow (DAGs written in Python); see the docs for details. Hopsworks also supports model serving on Kubernetes, including TensorFlow Serving. In contrast to Google, who promote Apache Beam for the ETL stage of pipelines, we use PySpark in Hopsworks, both for ETL stages and for distributing hyperparameter optimization and distributed training. PySpark works with Hops YARN to allocate GPUs to executors when they are needed for training DNNs. Since late 2017, Hopsworks has supported Horovod for distributed training, despite the fact that Horovod builds on MPI, which is not natively supported by YARN. In Horovod, MPI transparently sets up the distributed infrastructure necessary for workers to communicate with each other, underneath YARN. This required coordination and cleanup by Hopsworks, resulting in complex, hard-to-maintain code with many failure edge cases (e.g., the Spark driver dies, but the MPI processes live on; Hopsworks then needs to clean up, even if Hopsworks itself fails while cleaning up).

In contrast, we were able to integrate TensorFlow CollectiveAllReduce into Hopsworks in a week’s work. We already had the PySpark framework in place, and we had already worked with building and exporting TF_CONFIG for each worker as part of our support for TensorFlowOnSpark (which we are also now deprecating). TensorFlow CollectiveAllReduce will be available in Hopsworks 0.6, due for release in October 2018.

The number of lines of code required to write a MNIST distributed training example in TensorFlow is lower for AllReduce frameworks than for Parameter Server frameworks.

Summary

The deep learning community is moving fast, with new innovations appearing almost weekly. We, at Logical Clocks, are keeping at the edge by being the first platform to support TensorFlow’s new CollectiveAllReduceStrategy. As part of bringing in the new, we’re leaving out the old, which means farewell to Horovod and TensorFlowOnSpark.

Introducing Hopsworks

>
3/21/2018
>
Jim Dowling

tl;dr Hopsworks is a data platform that integrates popular platforms for data processing such as Apache Spark, TensorFlow, Hops Hadoop, Kafka, and many others. All services provided by Hopsworks can be accessed using either a REST API or a User Interface. But the real value add of Hopsworks is that it makes Big Data and AI frameworks easier to use by introducing new concepts for collaborative data science (Projects, Users, and Datasets) and ubiquitous support for TLS certificates, opening the platform for integration with the outside world (IoT/mobile devices and external applications).

Hopsworks makes the most popular data parallel processing platforms easy to use via a UI and REST API.

The Problem – Collaborative Data Science

The erudite Thomas Dinsmore describes the desirable properties of a collaborative data science platform, that we think aptly describe the Hopsworks platform:

“The rise of collaborative data science leads organizations to adopt open data science platforms that do the following:

- Provide a shared platform for all data science contributors
- Facilitate the use of open data science tools (such as Python and R) at scale
- Provide self-service access to data, storage, and compute
- Support a complete pipeline from data to deployment
- Include collaborative development tools
- Ensure asset management and reproducibility”

Introducing Hopsworks

Hopsworks is a data platform that includes the most popular and scalable data storage and processing systems in the Hadoop ecosystem and beyond. In Hopsworks, all services are exposed via a single REST API, and security is managed, end-to-end, using TLS certificates and new abstractions required for multi-tenancy, based on Projects. For humans, Hopsworks is accessed via a user interface that allows users to access data, services, and code through a new project abstraction. A project is a sandbox containing datasets, other users, and code. Users familiar with GitHub will recognize a project as the equivalent of a GitHub repository: users manage membership of the project themselves, as well as what code and data should be in the project. In a project, a user can have the role of “Data Owner” (the administrator) or “Data Scientist”. Data Scientists are restricted to only uploading programs, running programs, and visualizing results. Data Scientists are not allowed to change membership of the project or import/export data from the project. This enables Data Owners to manage the analysis of sensitive datasets within the confines of a project by inviting a Data Scientist into the project to carry out the analysis. In the background, for each project that a user is a member of, we construct a new Hadoop identity and create a new TLS certificate for the “project-specific user” (or projectUser, for short).

Users manage membership of projects themselves. A “Data Owner” can add a “Data Scientist” to a Project.

Share Datasets like in Dropbox

Some datasets will need to be made available in more than one project. Rather than storing a copy of the dataset in more than one project, which is both expensive and error-prone, a data owner can share a dataset from her project with another project, making it available in that project with either read-only or read-write privileges. Datasets can also be made public within an organization, so users can add them to their projects in a self-service manner.

The Data Owner of the “Raw” Project is sharing a dataset with the “Derived” Project.

In the Hopsworks UI, sharing a dataset is like Dropbox. Right-click on the folder to share it.

First Class Python Support with Conda/Pip

An important and unique feature of Hopsworks (unique among Hadoop platforms, in any case) is that each project can have its own conda environment on all hosts in the cluster. If a Data Owner enables conda for her project, a conda environment for that project is provisioned on every host in the Hops cluster. The user can then install the libraries, and the versions of those libraries, that she wants just for her project. PySpark and TensorFlow jobs run in that project will run in the project's conda environment on every host in the cluster. The conda environment can be initialized with the desired version of Python, such as 2.7 or 3.6. Only one conda environment is supported per project.

A user can search for python libraries on either conda channels or with pip, and then install the desired library/version on all hosts in the cluster (for just her project’s environment).

TensorFlow

Thomas Dinsmore again writes:

“I have to run a hundred experiments to find the best model,” he complained, as he showed me his Jupyter notebooks. “That takes time. Every experiment takes a lot of programming, because there are so many different parameters. We cross-check everything manually to make sure there are no mistakes.”

Hops supports GPUs-as-a-Resource, and Hopsworks allows users to start a job asking for GPUs for executors. YARN node labels can be used when there are different types of GPU servers.

As Hops manages GPUs-as-a-Resource, from within Hopsworks, we can start TensorFlow applications with tens or even hundreds of GPUs. Hopsworks supports APIs for massively parallel experimentation (such as hyperparameter optimization) via the Hops Python API. As Hopsworks supports distributed TensorFlow (including Horovod and TensorFlowOnSpark), we make it easier to use by running TensorFlow applications from within PySpark applications.

In practice, this means you put your TensorFlow code inside the wrapper function below:

def wrapper():
    from hops import hdfs
    hdfs.log('Hello Hopsworks')
    # tensorflow code goes here

from hops import experiment

experiment.launch(spark, wrapper)

Hops’ experiment API allows us to easily support hyperparameter optimization across tens or hundreds of GPUs with the following code example:

def wrapper(learning_rate, dropout):
    # tensorflow code goes here
    pass

args_dict = {'learning_rate': [0.1, 0.3], 'dropout': [0.4, 0.7]}

experiment.launch(spark, wrapper, args_dict)

Tensorboard support is also easily added as follows:

from hops import tensorboard

hdfs_path = experiment.launch(spark, wrapper, args_dict)

tensorboard.visualize(spark, hdfs_path)

This will enable the user to debug her application with Tensorboard from the Hopsworks UI:

The Tensorboard UI can be viewed for every TensorFlow application. Results from hyperparameter optimization runs are also aggregated into a single Tensorboard to easily discover the best hyperparameters for that experiment.

Spark

Spark is supported for both batch applications and streaming applications. There is extensive API support for launching Spark Jobs in both the Python API and the Java/Scala API. Hops’ APIs can make programming Spark applications much easier, by hiding information about the location of services (what IP are the Kafka brokers running on?), transparently distributing TLS certificates to executors, and providing configuration files for services (like Kafka):

Hops Python and Java/Scala APIs make it easier to securely use services. In this example, we can replace tens of lines of boilerplate code for setting up a Properties file for Kafka with a single API call to Hopsworks.
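
For instance, a hedged sketch of producing to a project-private topic with a single configuration call (get_kafka_default_config is the hops helper as we recall it; check the hops API docs for the exact name, and note that the returned properties already include the TLS certificate settings):

from hops import kafka
from confluent_kafka import Producer

config = kafka.get_kafka_default_config()   # brokers, security protocol, certs
producer = Producer(config)

producer.produce("weather", value=b"rain in Stockholm")
producer.flush()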

First Class Streaming Support

Now you don’t need to choose between Spark Streaming and Flink when you decide on a data platform. Hopsworks supports both Flink and Spark Streaming, through YARN. We also support Kafka-as-a-Service. Kafka topics (used as channels for producing/consuming messages) are private to projects. Users can create a topic with just a few clicks. Topics can also be shared with other projects enabling real-time communication between projects. Hopsworks also provides an Avro schema registry for Kafka topics that is accessible via Hopsworks’ REST API.

Hopsworks has REST/UI support for creating/deleting/managing Kafka topics and their Avro Schemas.

An example of how to manage a streaming application with Hopsworks is shown below. Data arrives at a Kafka topic as a stream (the external application uses a certificate downloaded from Hopsworks to securely communicate with the topic and to authenticate itself). The Engineering project processes the stream and filters/enriches the data before it is forwarded to different Kafka topics for different projects, as well as sent to a sink, such as Apache Hive, for offline analysis. The FX team below can then process the stream arriving at their FX topic without further help from IT, and manage access to that data themselves.

Teams manage their own historical data (in Hive, SparkSQL) as well as streaming data (in Kafka topics).

Jobs

Hopsworks provides UI and REST support for running Jobs. A job could be a Spark application or a Spark workflow, a TensorFlow application (parallel experimentation or distributed training), or a Flink application. Jobs can be scheduled for periodic execution or run on-demand. Spark Jobs are monitored using a Grafana UI that provides information about Spark, HDFS, and YARN resource consumption. Spark and TensorFlow job logs can be accessed in real-time via a Kibana UI. All jobs (Spark, TensorFlow, Flink) have their logs aggregated by YARN, and they can be read after the job completes from a “Logs” dataset in the project.

Jobs can be scheduled or run from a Jobs UI that also provides access to job logs and the Spark History Server UI, Kibana Logs for the Job, and Grafana logs for the Job. The Jobs UI is used to run workflows. Hopsworks provides a REST API for composing a workflow of Spark Jobs together. The workflow can then be run as a single Spark Job.

Spark Jobs can use log4j to write logs in real-time to Elasticsearch. These logs can be viewed in real-time with Kibana from the Jobs UI. Hopsworks also makes the complete application logs available in a Logs dataset, but they are only available after the Job has completed.

Spark Jobs provide a metrics.properties file to write performance statistics to InfluxDB, and those logs can be viewed in real-time with Grafana from the Jobs UI. This information is useful for performance debugging of Spark apps.

Apache Hive

Each project in Hopsworks can have its own Apache Hive database, by enabling the Hive service for that project. By default, only members of the project can read/write from the Hive database. Hive databases can also be shared with other Projects, similar to how datasets are shared between Projects. Hopsworks supports both Hive-on-Tez and Hive/LLAP.

Kerberos/LDAP Integration

Hopsworks supports integration with Active Directory and LDAP servers. Users authenticate with Hopsworks using a Kerberos keytab or LDAP credentials. The user then receives a JWT token that is used to communicate over TLS with the REST API (or via the browser and the Hopsworks UI). Hopsworks also supports native password-based authentication and two-factor authentication using either smartphones (Google Authenticator) or Yubikey (for more secure environments).

Kerberos and LDAP are supported through authentication realms in Hopsworks.

Jupyter

When a user starts a Jupyter notebook server, she decides how many executors (and how much CPU/memory per executor) and how many GPUs she wants. The notebook will acquire those resources when it is run.

Hopsworks supports Jupyter Notebooks, where each projectUser can launch his/her own Jupyter Notebook server. Jupyter notebooks are stored, by default, in the “Jupyter” dataset in HopsFS, but notebooks can be opened from any dataset within the Project. We support the sparkmagic Jupyter kernel, which we use to run both Spark and TensorFlow applications (Flink is not supported by Jupyter).

Apache Zeppelin

Hopsworks supports Zeppelin as a Notebook. There is a single Zeppelin Server for Hopsworks, and each project can start its own interpreter. Interpreters are shared between all users in a project, making Zeppelin suitable for collaborative work on notebooks between different users. Zeppelin notebooks are stored in HopsFS and available from the “Notebooks” dataset.

Zeppelin on Hopsworks supports Spark-Livy/Hops/Flink interpreters for running jobs.

REST API

Hopsworks provides a REST API for authentication, running Spark jobs, producing to Kafka topics, training and deploying TensorFlow applications, and performing all the functions that are available in the Hopsworks User Interface. This enables Hopsworks to be used as an embedded platform that is included as a component in other systems – truly, Big-Data-and-AI-as-a-Service. External clients can either download a certificate for accessing the REST API or access it using JWT tokens. All communication with the REST API is encrypted over TLS/SSL connections. Hopsworks also has a library for Android devices, allowing them to seamlessly produce data in real-time to Kafka and call inferencing functions on trained TensorFlow models.
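
A hedged sketch of authenticating against the REST API and reusing the returned JWT for later calls. The login path, payload field names, and header handling are assumptions; the exact contract is specified in the REST API documentation.

```python
# Sketch: password-based login and a follow-up call with the returned JWT.
# Paths and field names are hypothetical placeholders.
import requests

HOPSWORKS = "https://hopsworks.example"

resp = requests.post(
    f"{HOPSWORKS}/hopsworks-api/api/auth/login",
    data={"email": "analyst@fx.example", "password": "..."},  # placeholder credentials
    verify="ca_chain.pem",
)
resp.raise_for_status()
jwt = resp.headers.get("Authorization")  # e.g. "Bearer eyJ..."

# Use the token on subsequent calls
projects = requests.get(
    f"{HOPSWORKS}/hopsworks-api/api/project",
    headers={"Authorization": jwt},
    verify="ca_chain.pem",
)
print(projects.json())
```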

Data Governance

In Hopsworks, extended metadata is built into the platform. The Hopsworks UI supports free-text search for data assets using Elasticsearch. Hopsworks also provides a metadata designer tool to design or import a schema with which to describe data assets. Non-technical users can then curate data through our user interface, while the extended metadata is automatically exported to Elasticsearch (using our own ePipe system). For users, Hopsworks provides a simple unified interface for managing data assets. Hopsworks also has built-in auditing capabilities, supported by the default open-source web application server shipped with Hopsworks, Payara.

Metadata schemas can be designed with an intuitive UI.

Metadata can be added to data assets from the UI by attaching metadata schemas to them and editing the metadata in the UI. The data asset can then be discovered using free-text search (using an Elasticsearch backend).
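
To make the search path concrete, here is an illustrative query against the Elasticsearch index that ePipe keeps in sync with the extended metadata, using the elasticsearch Python client. In practice the Hopsworks UI and REST API issue such queries; the host, index, and field names here are assumptions.

```python
# Sketch: free-text search over exported metadata in Elasticsearch (7.x-style API).
from elasticsearch import Elasticsearch

es = Elasticsearch(["https://elastic.hops.example:9200"])

hits = es.search(
    index="projects",  # hypothetical metadata index
    body={"query": {"match": {"description": "fx trades 2018"}}},
    size=10,
)
for hit in hits["hits"]["hits"]:
    print(hit["_source"].get("name"), hit["_score"])
```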

Free-text search for data assets is supported. Public datasets are discoverable from the landing page, but search within a project can be used to discover data assets only within the scope of that Project.

Summary

Hopsworks is a new data platform that makes working with large-scale data processing frameworks easier for both humans, via an integrated UI, and external devices and applications, via an integrated REST API. Hopsworks is built on the new abstractions of Projects, ProjectUsers, and Datasets, and it is enabled by a new distributed metadata architecture in Hops Hadoop.

More Reading

Millions and millions of files for Deep Learning with HopsFS

>
10/3/2018
>
Jim Dowling

Datasets used for deep learning may reach millions of files. The well-known image dataset ImageNet contains 1m images, and its successor, the Open Images dataset, has over 9m images. Google and Facebook have published papers on datasets with 300m and 2bn images, respectively. Typically, developers want to store and access these datasets as image files, stored in a distributed file system. However, according to Uber, there’s a problem:

“multiple round-trips to the filesystem are costly. It is hard to implement at large scale, especially using modern distributed file systems such as HDFS and S3 (these systems are typically optimized for fast reads of large chunks of data).”

Uber’s proposed solution is to pack image files into larger Apache Parquet files. Parquet is a columnar file format, and thousands of individual image files can be packed into a single Parquet file, typically 64-256MB in size. For many image and NLP datasets, however, this introduces costly complexity: existing tools for processing, viewing, and indexing files and text need to be rewritten. An alternative approach is to use HopsFS.

HopsFS solves this problem by transparently integrating NVMe disks into its HDFS-compatible file system; see our peer-reviewed paper to be published at ACM Middleware 2018. HDFS (and S3) are designed around large blocks (optimized to overcome slow random I/O on disks), while new NVMe hardware supports fast random disk I/O (and potentially small block sizes). However, as NVMe disks are still expensive, it would be prohibitively expensive to store tens of terabytes or petabyte-sized datasets on NVMe hardware alone. In Hops, our hybrid solution stores files smaller than a configurable threshold (default: 64KB, but it scales up to around 1MB) on NVMe disks in our metadata layer. On top of this, files under a smaller threshold, typically 1KB, are stored replicated in-memory in the metadata layer (due to their minimal overhead). This design choice was informed by our collaboration with Spotify, where we observed that most of their filesystem operations are on small files (≈64% of file read operations are performed on files smaller than 16 KB). Similar file size distributions have been reported at Yahoo!, Facebook, and others.
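
The tiering rule described above can be summarised with a toy sketch (this is not HopsFS code), assuming the default thresholds: files of at most 1KB live replicated in-memory in the metadata layer, files up to 64KB live on NVMe disks in the metadata layer, and everything larger goes to the regular datanode block storage.

```python
# Toy illustration of the size-based placement rule; thresholds are the defaults
# mentioned in the text and are configurable in practice.
IN_MEMORY_THRESHOLD = 1 * 1024   # bytes
NVME_THRESHOLD = 64 * 1024       # bytes (configurable, up to ~1MB)

def storage_tier(file_size_bytes: int) -> str:
    """Pick where a newly written file's data would live."""
    if file_size_bytes <= IN_MEMORY_THRESHOLD:
        return "metadata layer (replicated in-memory)"
    if file_size_bytes <= NVME_THRESHOLD:
        return "metadata layer (NVMe disks)"
    return "datanodes (large-block storage)"

for size in (512, 16 * 1024, 256 * 1024):
    print(size, "->", storage_tier(size))
```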

The result is that when clients read and write small files, they can do so at an order of magnitude higher throughput (number of files per second) and with massively reduced latency (>90% of file writes on the Spotify workload completed in less than 10ms, compared to >100ms for Apache HDFS). Notably, the small files stored at the metadata layer are not cached and are replicated at more than one host. That is, the scale-out metadata layer in HopsFS can be scaled out like a key-value store and provides file read/write performance for small files comparable with get/put performance for a modern key-value store.

NVMe Disk Performance

As we can see from Google Cloud disk performance figures, NVMe disks support more than two orders of magnitude more IOPS than magnetic disks, and over one order of magnitude more IOPS than standard SATA SSD disks.

Key-Value Store Performance for Small Files

In our Middleware paper, we observed up to 66X throughput improvements for writing small files and up to 4.5X throughput improvements for reading small files, compared to HDFS. For latency, operational latencies on Spotify’s Hadoop workload were 7.39 times lower for writing small files and 3.15 times lower for reading small files. For real-world datasets, like the 9m-image Open Images dataset, we saw 4.5X improvements for reading files and 5.9X improvements for writing files. These figures were generated using only 6 NVMe disks, and we are confident that we can scale to much higher numbers with more NVMe disks.

We also discuss in the paper how we maintained full HDFS compatibility: the changes for handling small files do not break HopsFS’ compatibility with HDFS clients. We also address the problem of migrating data between different storage types: when the size of a small file stored in the metadata layer exceeds the threshold, the file is reliably and safely moved to the HopsFS datanodes.

Running HopsFS (on-premise or in the cloud)

You can already benefit from our small files solution for HopsFS, which has been available since HopsFS 2.8.2, released in 2017. We have been running www.hops.site using small files since October 2017, and we are very happy with its stability in production.

NVMe disks have been available at Google Cloud, AWS, and Azure since early 2018. Logical Clocks can provide support for running HopsFS in the cloud, including running HopsFS in an availability-zone fault-tolerant configuration, available only in Enterprise Hops.

References