
Beyond Brainless AI with a Feature Store

ML Best Practices | 4/27/2021 | Jim Dowling

TLDR; Machine learning models are only as good as the data (features) they are trained on. In Enterprises, data scientists can often train very effective models in the lab - when given a free hand on which data to use. However, many of those data sources are not available in production environments due to disconnected systems and data silos. An AI-powered product that is limited to the data available within its application silo cannot recall historical data about its users or relevant contextual data from external sources. It is like a jellyfish - its autonomic system makes it functional and useful, but it lacks a brain. You can, however, evolve your models from brain-free AI to Total Recall AI with the help of a Feature Store, a centralized platform that can provide models with low latency access to data spanning the whole enterprise.

Autonomic AI

Jellyfish are undoubtedly complex creatures with sophisticated behaviour - they move, mate, and munch. They eat and discard waste from the same opening. Yet they have no brain - their autonomic system suffices for their needs. The biggest breakthroughs in AI in recent years have been enabled by deep learning, which requires large volumes of data and specialized compute hardware (e.g., GPUs). However, just like a jellyfish, recent successes in image processing and NLP with deep learning required no brain - no working memory, history, or context. Much of deep learning today is Jellyfish AI. We have made incredible progress in identifying objects in images and translating natural language, yet such deep learning models typically only require the immediate input - the image or the text - to make their predictions. The input signal is information-rich. These image and NLP models seldom require a 'brain' to augment the input with context or memories. Google Translate doesn’t need to know about the historical enmity between the Scots and the Irish to decide whether it’s spelt Whisky or Whiskey. Jellyfish AI is impressive - the input data is information-rich and models can learn fantastically advanced behaviour from labeled examples. All “knowledge” needed to make predictions is embedded in the model. The model does not need to have working memory (e.g., it doesn’t need to know the user has clicked 10 times on your website in the last minute). 

Now compare using AI for image classification or NLP to building a web application that will use AI to interact with a user browsing a website. The immediate input data your application receives from your web browser are clicks on a mouse or a keyboard. The input signal is information-light - it is difficult to train a useful model using only user clicks. However, large Internet companies collect reams of information about users from many different sources, and transform that user data into features (information-rich signals that are ready to be used for either training models or making predictions with models). Models can then combine the click features with historical features about users and contextual features to build information-rich inputs to models. For example, you could augment the user’s action with everything you know about a user’s history and context to increase the user's engagement with the product. The feature store for machine learning (ML) stores and serves these features to models. We believe that AI-powered products that can easily access historical and contextual features will lead the next wave of AI in the Enterprise, and those products will need a feature store for ML.

Data Scientist and ML Engineer Disconnect

A frequent source of tension in Enterprises is between “naive” data scientists and “street-wise” ML engineers. Motivated by good software engineering practices, many ML engineers believe that ML models should be self-contained and tension can arise with data scientists who want to include features in their models that are “obviously not available in the production system”. 

However, data scientists are tasked with building the best models they can to add to the bottom line - engage more users, increase revenue, reduce costs. They know they can train better models with more data and more diverse sources of data. For example, a data scientist trying to predict if a financial transaction is suspected of money laundering or not might discover that a powerful feature is the graph of financial transfers related to this individual in the previous day/week/month. They can reduce false alerts of money laundering by a factor of 100*, reducing the costs of investigating the false alerts, saving the business millions of dollars per year. The data scientist hands the model over the wall to the ML engineer, who dismisses the idea of including the graph-based features in the production environment, and tension arises when communicating what is possible and what is not possible in production. The data scientist is crestfallen - but need not be.

The Feature Store is now the de facto Enterprise platform for storing historical and contextual features for AI-powered products. The Feature Store is, in effect, the brain for AI-powered products, the three-eyed Raven that enables the model to access the history and state of the whole Enterprise, not just the local state in the application. 

Feature Stores enable applications or model serving infrastructure to take information-light inputs (such as a cookie identifying a user or a shopping cart session) and enrich them with features harvested from anywhere in the Enterprise or beyond to build feature vectors capable of making better predictions. And as we know from Deep Learning, model accuracy improves predictably with more features and data, so there will be an increasing trend towards adding more and more features to models to improve their accuracy. Andrew Ng has recently been advocating this approach, which he calls data-centric development, instead of the more traditional model-centric development. Another noticeable trend in large Enterprises is building faster and more scalable Feature Stores that can supply those features within the time budget available to the AI-powered product. But AI is going to revolutionize Enterprise software products, so how do we make sure our AI-enabled products are not just Jellyfish AI?

Enabling AI-Enabled Products with a Feature Store

Anti-Pattern: Re-implementing the “feature engineering” code for the serving layer is not DRY. This introduces the risk of ‘skew’ between the features used to train models and the features served to operational models.

How do we avoid limiting AI-enabled products to only using the input features collected by the application itself? Models will benefit from access to all data that the Enterprise has collected about the user, product, or its context. A potential source of friction here, however, is the dominant architectural preference for microservices and data stove-pipes. Models themselves are being deployed as microservices in model-serving infrastructure, like KFServing or TensorFlow Serving or Nvidia Triton. How can we give these models access to more features?

Anti-Pattern: microservice-based Online Feature Store. Microservices can be used to compute features in real-time from raw input data, but when features can be pre-computed, using microservices is an anti-pattern. This architecture adds latency, must be made highly available, must handle hotspots, and consumes resources even when the microservices are not needed. Serverless functions might be acceptable where seconds of warmup latency is tolerable. But the microservices should still be reused to compute the training data - otherwise, there is a risk of training/serving skew. 

Without a Feature Store, applications could contact microservices or databases to compute or retrieve the historical and contextual features (data), respectively. Computing the features in the application itself is an anti-pattern as it duplicates the feature engineering code - that code should already exist to generate the training data for the model. Re-implementing feature engineering logic in applications also introduces the risk of skew between the features computed in the application and the features computed for training. If serving and training environments use the same programming language, they could avoid non-DRY code by reusing a versioned library that computes the features. However, even if feature engineering logic is written in Python in both training and serving, it may use PySpark for training and Python for serving or different versions of Python. Versioned libraries can help, but are not a general solution to the feature skew problem. 

The Feature Store solves the training/serving skew problem by computing the features once in a feature pipeline. The feature pipeline is then reused to (1) create training data, and (2) save those pre-engineered features to the Feature Store. The serving infrastructure can then retrieve those features when needed to make predictions. For example, when an application wants to make a prediction about a user, it would supply the user’s ID, shopping cart ID, session ID, or location to retrieve pre-engineered features from the Feature Store. The features are retrieved as a feature vector, and the feature vector is sent to the model that makes the prediction. The Feature Store service for retrieving feature vectors is commonly known as the Online Feature Store. The logic for retrieving features from the Online Feature Store can also be implemented in model serving infrastructure, not just in applications. The advantage of looking up features in serving infrastructure is that it keeps the application logic cleaner, and the application just sends IDs and real-time features to the model serving infrastructure, that in turn, builds the feature vector, sends it to the model for prediction, and returns the result to the application. Low latency and high throughput are important properties for the online feature store - the faster you can retrieve features and the more features you can include in a given time budget, the more accurate models you should be able to deploy in production. To quote DoorDash:

“Latency on feature stores is a part of model serving, and model serving latencies tend to be in the low milliseconds range. Thus, read latency has to be proportionately lower” 
AI-enabled products use models to make predictions, and those models need an Online Feature Store to provide them with historical and contextual data (features) to make better predictions.
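To make the lookup-then-predict pattern described above concrete, here is a minimal sketch. The OnlineFeatureStore class, the feature names, and the commented-out model.predict call are hypothetical stand-ins (a plain dict plays the role of the low-latency store); this is not the API of any particular feature store.

-- CODE language-python --
# Minimal sketch of online feature vector retrieval (hypothetical names throughout).
from typing import Dict, List

class OnlineFeatureStore:
    """Stands in for a low-latency online feature store (effectively a key-value lookup)."""
    def __init__(self, rows: Dict[str, Dict[str, float]], feature_order: List[str]):
        self.rows = rows                     # precomputed features keyed by entity ID
        self.feature_order = feature_order   # must match the order used at training time

    def get_feature_vector(self, user_id: str, realtime_features: Dict[str, float]) -> List[float]:
        # Merge precomputed historical/contextual features with real-time signals.
        features = {**self.rows.get(user_id, {}), **realtime_features}
        return [features[name] for name in self.feature_order]

# Precomputed features, written earlier by the feature pipeline.
store = OnlineFeatureStore(
    rows={"user_42": {"purchases_30d": 7.0, "avg_basket_value": 31.5}},
    feature_order=["purchases_30d", "avg_basket_value", "clicks_last_minute"],
)

# At request time, the application only knows the user ID and a real-time signal.
vector = store.get_feature_vector("user_42", {"clicks_last_minute": 3.0})
# prediction = model.predict([vector])   # hypothetical model call
print(vector)  # [7.0, 31.5, 3.0]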

Data-Centric AI with the Online Feature Store 

So, to summarize, if you want to give your ML models a brain, connect them up to a feature store. For Enterprises building personalized services, the feature store can enrich their models with a 360-degree, Enterprise-wide view of the customer - not just a product-specific view of the customer. The feature store enables more accurate predictions through more data being available to make those predictions, and this ultimately enables products with better user experience, increased engagement, and the product intelligence now expected by users.

* This is based on a true story.


Elasticsearch is dead, long live Open Distro for Elasticsearch

ML Best Practices | 1/14/2021 | Mahmoud Ismail

TLDR: The need for an open-source alternative to Elasticsearch has recently become more evident; platforms that bundle Open Distro for Elasticsearch are able to future-proof open-source support for free-text search and Elasticsearch. In this post, we describe how Hopsworks leverages the authentication and authorization support in Open Distro for Elasticsearch to make free-text search a project-based multi-tenant service in Hopsworks. More concretely, Hopsworks now supports dynamic role-based access control (RBAC) to indexes in Elasticsearch with no performance penalty by building on Open Distro for Elasticsearch (ODES).

Need Open-Source Elasticsearch?
Try Open Distro.

In January 2021, Elastic switched both Elasticsearch and Kibana from the Apache v2 open-source license to a non-open-source license, the Server Side Public License (SSPL).

Hopsworks is an open-source platform that includes Open Distro for Elasticsearch (a fork of Elasticsearch) and Kibana.

In Hopsworks, we use Elasticsearch to provide free-text search for AI assets (features, models, experiments, datasets, etc). We also make Elasticsearch indexes available for use by programs run in Hopsworks. As we interpret it, the latter functionality means we contravene the licensing terms of the SSPL:

“If you make the functionality of the Program or a modified version available to third parties as a service... (license conditions apply)”

Luckily, we recently made the switch from Elasticsearch to Open Distro for Elasticsearch, supported by AWS, which is Apache v2 licensed.

Dynamic RBAC for Elasticsearch

Open Distro for Elasticsearch supports Active Directory and LDAP for authentication and authorization. Using the Security plugin, you can use RBAC to control the actions a user can perform. A role defines the cluster operations and index operations a user can perform, including access to indices, and even fine-grained field and document level access. RBAC allows an administrator to define a single security policy and apply it to all members of a department. But individuals may be members of multiple departments, so a user might be given multiple roles. With dynamic role-based access control you can change the set of roles a user can hold at a given time.

For example, if a user is a member of two departments - one for accessing banking data and another one for accessing trading data, with dynamic RBAC, you could restrict the user to only allow her to hold one of those roles at a given time. The policy for deciding which role the user holds could, for example, depend on what VPN (virtual private network) the user is logged in to or what building the user is present in. In effect, dynamic roles would allow the user to hold only one of the roles at a time and sandbox her inside one of the domains - banking or trading. It would prevent her from cross-linking or copying data between the different trading and banking domains.

Hopsworks implements a dynamic role-based access control model through its project-based multi-tenant security model.  Every Project has an owner with full read-write privileges and zero or more members.  A project owner may invite other users to his/her project as either a Data Scientist (read-only privileges and run jobs privileges) or Data Owner (full privileges). Users can be members of (or own) multiple Projects, but inside each project, each member (user) has a unique identity - we call it a project-user identity.  For example, user Alice in Project A is different from user Alice in Project B - (in fact, the system-wide (project-user) identities are ProjectA__Alice and ProjectB__Alice, respectively).

As such, each project-user identity is effectively a role with the project-level privileges to access data and run programs inside that project. If a user is a member of multiple projects, she has, in effect, multiple possible roles, but only one role can be active at a time when performing an action inside Hopsworks. When a user performs an action (for example, runs a program) it will be executed with the project-user identity. That is, the action will only have the privileges associated with that project. The figure below illustrates how Alice has a different identity for each of the two projects (A and B) that she is a member of. Each project contains its own separate private assets. Alice can use only one identity at a time which guarantees that she can’t access assets from both projects at the same time.

Hopsworks enables you to host sensitive data in a shared cluster using a project-based access control security model (an implementation of dynamic role-based access control). In Hopsworks, a project is a secure sandbox with members, data, code, and services. Similar to GitHub repositories, projects are self-service: users manage membership, roles, and can securely share data assets with other projects. This project-based multi-tenant security model enables users to host both sensitive and shared data in a single Hopsworks cluster - you do not need to manage and pay for separate clusters. 

An important aspect of project-based multi-tenancy is that assets can be shared between projects - sharing does not mean that data is duplicated. The current assets that can be shared between projects are: files/directories in HopsFS, Hive databases, feature stores, and Kafka topics. For example, in the figure below there are three users (User1, User2, and User3)  and two projects (A and B). User1 is a member of project A, while User2 and User3 are members of project B. All three users (User1, User2, User3) can access the assets shared between project A and project B. As sharing does not mean copying, the access control rules for the asset are updated to give users in the other project read or write permissions on the shared asset.


Project-user identity is primarily based on an X.509 certificate issued internally by Hopsworks. Access control policies, however, are implemented by the platform services (HopsFS, Hive, Feature Store, Kafka), and for Open Distro for Elasticsearch, permissions are managed using an open-source Hopsworks project-based authorizer plugin.

Using Elastic Index from Spark in Hopsworks

The following PySpark code snippet, available as a notebook when you run the Spark Tour on Hopsworks, shows how to read from an index that is private to a project from PySpark. There is also an equivalent Scala/Spark notebook.

-- CODE language-python --
from hops import elasticsearch, hdfs

df = spark.read.option("header", "true").csv(
    "hdfs:///Projects/" + hdfs.project_name() + "/Resources/akc_breed_info.csv")

# Write df to the project's private index called 'newindex'
df.write.format('org.elasticsearch.spark.sql') \
    .options(**elasticsearch.get_elasticsearch_config("newindex")) \
    .mode("Overwrite").save()

# Read from the project's private index called 'newindex'
reader = spark.read.format("org.elasticsearch.spark.sql") \
    .options(**elasticsearch.get_elasticsearch_config("newindex"))
df = reader.load().orderBy("breed")
df.show()

Access Control using JWT and Hopsworks Project Membership

In Hopsworks, we use Public Key Infrastructure (PKI) with X.509 certificates to authenticate and authorize users. Every user and every service in a Hopsworks cluster has a private key and an X.509 certificate. Hopsworks projects also support multi-tenant services that are not currently backed by X.509 certificates, including Elasticsearch. Open Distro for Elasticsearch supports authentication and access control using JSON Web Tokens (JWT).  Similar to application X.509 certificates, Hopsworks’ resource manager (HopsYARN) issues a JWT for each submitted job and propagates it to running containers. Using the JWT, user code can then securely make calls to Elasticsearch indexes owned by the project. The JWT is rotated automatically before it expires and invalidated by HopsYARN once the application has finished.
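As a rough illustration of how a JWT can be presented as a bearer token to Open Distro's REST API, the sketch below uses the requests library; the endpoint, index name, and token variable are placeholders, not Hopsworks internals, and the exact header configuration depends on how the security plugin is set up.

-- CODE language-python --
# Hypothetical sketch: querying a project-private index with a JWT bearer token.
import requests

ELASTIC_URL = "https://elasticsearch.example.com:9200"   # placeholder endpoint
INDEX = "ProjectA__logs"                                  # placeholder index name
jwt_token = "<token materialized for the running job>"    # placeholder

response = requests.get(
    f"{ELASTIC_URL}/{INDEX}/_search",
    headers={"Authorization": f"Bearer {jwt_token}"},
    json={"query": {"match_all": {}}},
    verify=False,  # in practice, verify against the cluster's CA certificate
)
print(response.json())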

For every Hopsworks project, a number of private indexes can be created in Elasticsearch: an index for real-time logs of applications in that project (accessible via Kibana), an index for ML experiments in the project, and an index for provenance for the project’s applications and file operations. Elastic indexes are private to the project - they are not accessible by users that are not members of the project. This access control is implemented as follows: when a request is made on Elasticsearch using a JWT token, our authorizer plugin extracts the project-specific username from the JWT token, which is of the form:

ProjectA__Alice 

The index names have the following form:

ProjectA__ElasticIndex 

Our plugin checks if a project-specific user is allowed to read/write an index by checking that the prefix (ProjectA) of both the user and the index match one another. We plan to add support for sharing elasticsearch indexes between projects by storing a list of projects allowed to perform read and write operations, respectively, on the indexes belonging to a project.
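The prefix check itself is simple. A minimal Python sketch of the rule described above (the names are illustrative; the real plugin is implemented against Open Distro's security APIs) could look like this:

-- CODE language-python --
def is_authorized(project_user: str, index_name: str) -> bool:
    """Allow access only if the user's project prefix matches the index's project prefix."""
    user_project = project_user.split("__", 1)[0]   # e.g. "ProjectA__Alice" -> "ProjectA"
    index_project = index_name.split("__", 1)[0]    # e.g. "ProjectA__ElasticIndex" -> "ProjectA"
    return user_project == index_project

assert is_authorized("ProjectA__Alice", "ProjectA__ElasticIndex")
assert not is_authorized("ProjectB__Alice", "ProjectA__ElasticIndex")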

X.509 Service certificates

In Hopsworks, services communicate with each other using their own certificate to authenticate and encrypt all traffic. Each service in Hopsworks, that supports TLS encryption and/or authentication, has its own service-specific X.509 certificate, including all services in the ELK Stack (Elasticsearch, Kibana, and Logstash). Service certificates contain the Fully Qualified Domain Name (FQDN) of the host they are installed on and the login name of the system user that the process runs as. They are generated when a user provisions Hopsworks, and they have a long lifespan. Service certificates can be rotated automatically in configurable intervals or upon request of the administrator. 

Securely accessing Elastic Indexes in Jobs on Kubernetes 


Hopsworks can be integrated with Kubernetes by configuring it to use one of the available authentication mechanisms: API tokens, credentials, certificates, and IAM roles for AWS’ managed EKS offering. Hopsworks can run users’ jobs on Kubernetes with project-specific security material (X.509 certificates and JWTs) materialized to the launched Pods, so user code can securely access services in Hopsworks, such as Open Distro for Elasticsearch. That is, Kubernetes jobs launched from within a project in Hopsworks are only allowed to access those Elasticsearch indexes that belong to that project.

Summary

In this post, we gave an overview of Hopsworks project-based multi-tenant security model and how we use Hopsworks projects and JWT tokens to make Open Distro for Elasticsearch a multi-tenant service.



Feature Store for MLOps? Feature reuse means JOIN

ML Best Practices | 10/23/2020 | Jim Dowling

TLDR; Many engineers conflate operational Feature Stores with key-value (KV) stores, like DynamoDB and Cassandra. In fact, KV stores are missing a key mechanism needed to make features reusable: JOINs. JOINs enable features to be reused by different models. This blog is about how to scale your ML infrastructure by reusing cached features so that the number of feature pipelines you manage does not grow linearly with the number of models you run in production.

Image from an OSDC talk by Twitter. Twitter evaluates its Feature Store by the number of features that are reused across teams.

The Cost of No JOINs: One Feature Pipeline per Model

If you don’t reuse features across different models, you will need a new feature pipeline for every new model you put in production. In the diagram below, there is a 1:1 mapping between train/test datasets and models. For every new model you put in production, you will write a new feature pipeline from your data stores that transforms and validates the raw data and performs aggregations, materializing train/test data to files. An ML training program (or pipeline) then trains and validates a model with the train/test data, after which it is tested and deployed to production.

It is very difficult to reuse the features in the materialized train/test datasets, as they tend to be stored in a format specific to an ML framework: .tfrecord for TensorFlow, .npy for PyTorch - and they cannot be easily combined with features stored in other train/test datasets. This is often due to the limitations of the file formats: neither TFRecord nor NPY support projections (selecting just a subset of columns).

If you intend to run hundreds of models in production, running hundreds of pipelines will explode your technical debt.


In online feature stores that do not support JOINs, it is typically the responsibility of the application to perform the JOIN of the cached features to create the feature vector. So, if you build your own online feature store using Cassandra or DynamoDB, you will need to also add logic for joining (and ordering) features in your applications/models. 

Every new feature you add to that model will need an update to the feature pipeline and changes to the application logic - making it costly and potentially cross-team work.

JOINs enable Feature Reuse


A JOIN is a Structured Query Language (SQL) command that combines data from two different database tables. JOINs are used to create a view over the data that originates from one or more tables. If we assume that we store features normalized in database tables, then we are able to create sets of features (training datasets) by selecting different features from different tables and joining them together using a common join key. The join key(s) identify the common entity these feature values represent. 

If we now assume that features are stored as columns in tables, then what is a train/test dataset? It is a set of features along with a target column. Assuming the features are already present in existing tables, we can just join those features (columns) together to create a view - this view is the train/test dataset. The order of features (in a view) that makes up a train/test dataset is significant - a train/test dataset is a permutation of features, not a combination. Views (enabled by JOINs) enable a massive number of train/test datasets to be defined over a small number of shared features (stored in tables).
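To make this concrete, here is a small PySpark sketch that builds a train/test dataset as a join over two feature tables sharing a common key; the table and column names are invented for illustration and are not Hopsworks Feature Group APIs.

-- CODE language-python --
# Illustrative only: joining two feature tables on a common key to form training data.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("join-features").getOrCreate()

# Two "feature groups" sharing the join key customer_id (hypothetical data).
purchases = spark.createDataFrame(
    [(1, 7, 31.5), (2, 2, 12.0)], ["customer_id", "purchases_30d", "avg_basket_value"])
web_activity = spark.createDataFrame(
    [(1, 120, 0.8), (2, 15, 0.1)], ["customer_id", "page_views_7d", "churn_label"])

# The train/test dataset is just a view over the joined features plus the target column.
train_df = purchases.join(web_activity, on="customer_id", how="inner") \
    .select("purchases_30d", "avg_basket_value", "page_views_7d", "churn_label")
train_df.show()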

In Hopsworks, we store features in tables that we call “Feature Groups”. Feature Groups introduce a level of indirection between the raw input data and the train/test datasets used to train models, see below. They store a cached copy of the features, computed using the same feature pipeline from earlier, but this time, the feature pipeline writes to one or both of the stores that make up a feature store: (1) a scalable store for train/test features (offline feature store) and (2) a low latency, high throughput store for features for serving (online feature store). 

In Hopsworks, we use Apache Hive as a scalable database for the offline store, and MySQL Cluster (NDB) as the online store. Our version of Hive stores its metadata in NDB and its data files in HopsFS/S3. In Hopsworks, FeatureGroups are database tables in Hive and MySQL Cluster, along with feature metadata (also tables in the same NDB database). 

As we can see in the diagram below, if we have N features available in the Feature Store, we can create an unlimited number of train/test datasets by simply joining features together from the offline feature store. 

In Hopsworks, we use Spark, with its cost-based optimizer, to perform JOINs. Spark, together with Parquet/Hudi/ORC, helps optimize joins by supporting partitioned data, push-down projections, and SortMergeJoin. Spark selects a join strategy based on:

  • a hint;
  • the join type (inner, left, outer, etc.);
  • the join condition (e.g., equi-join);
  • an estimate of the input data size.

JOINs in Online Feature Stores

If you use a KV store as your online store, you are back in the same situation as at the start - you need a feature pipeline for every new model you put in production. But with Feature Groups (tables in NDB), we can materialize feature vectors (the individual row of features that is fed directly to the model for scoring) from the different tables by performing a join.

NDB supports sophisticated optimizations for JOINs, and can push-down many JOINs to the database nodes. In practice, in NDB even complex queries can “achieve latency down to 5-10 ms for a transaction that contains around 30 SQL statements”.
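As a rough sketch of what such a serving-time join looks like against MySQL Cluster (NDB), the snippet below joins two feature group tables on the entity key to materialize a single feature vector. The connection details, table names, and column names are placeholders, not the Hopsworks online feature store schema.

-- CODE language-python --
# Placeholder example: joining two feature tables in the online store to build one feature vector.
import mysql.connector  # assumes the mysql-connector-python package is installed

conn = mysql.connector.connect(
    host="onlinefs.example.com", user="serving_user", password="...", database="online_fs")
cursor = conn.cursor()
cursor.execute(
    """
    SELECT p.purchases_30d, p.avg_basket_value, w.page_views_7d
    FROM customer_purchases_1 AS p
    JOIN customer_web_activity_1 AS w ON p.customer_id = w.customer_id
    WHERE p.customer_id = %s
    """,
    (42,),
)
feature_vector = cursor.fetchone()   # one row = the feature vector for customer 42
cursor.close()
conn.close()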

Conclusions

Reuse features to save on infrastructure and reduce the number of feature pipelines needed to maintain models in production. JOINs are the mechanism we use in Hopsworks to reuse cached features across different models - both for training and serving. We use Spark as the JOIN engine for the offline feature store and NDB with its parallelized, push-down JOINs for low-latency joins in the online feature store.

ML Engineer Guide: Feature Store vs Data Warehouse

ML Best Practices | 10/8/2020 | Jim Dowling

TLDR; The feature store is a data warehouse of features for machine learning (ML). Architecturally, it differs from the traditional data warehouse in that it is a dual-database, with one database (row-oriented) serving features at low latency to online applications and the other database (column-oriented) storing large volumes of features, used by Data Scientists to create train/test datasets and by batch applications doing offline model scoring.

Feature Store: Data Warehouse Déjà Vu

Data warehouses democratized access to Enterprise data by centralizing data in a single platform and then empowering business analysts with visual tools, such as Tableau and Power BI. No longer did they need to know what data resides where and how to query that data in that platform. They could derive historical insights into the business using BI tools. 

Data scientists, in contrast, build predictive models to derive business insights. The feature store is the data warehouse for Data Science - it is a central vault for storing documented, curated, and access-controlled features that can be used across many different models. The feature store ingests data from the Enterprise’s many different sources after transforming, aggregating, and validating the data. 

Feature pipelines need to be written to ensure that data reliably flows from existing sources and is available in a format ready to be consumed by ML training pipelines and models.

Most Data Scientists currently do not have a feature store. They spend most of their time looking for, cleaning, and featurizing data. Hence, the (very real) cliché that 80% of data science is data wrangling. Data Scientists without a feature store work in an era akin to how business analysts worked before the advent of data warehouses, with low individual and organizational productivity.

The Data Warehouse is an input to the Feature Store 

Both platforms are a central store of curated data used to generate insights into the data. Both platforms have pipelines (ETL and feature pipelines, respectively) to ingest data from one or more disparate sources (operational databases, data lakes, etc).

Both benefit from metadata catalogs to organize data sets and access control to share data with only authorized actors. 

Both platforms can be designed to scale out on commodity hardware and store large volumes of data, although typically a data warehouse stores only data relevant to analysis (modern data lakehouses are designed to store large volumes of data more cost efficiently).

Feature Store as a Dual Database

The main architectural difference between a data warehouse and a feature store is that the data warehouse is typically a single columnar database, while the feature store is typically implemented as two databases:

  • an offline feature store for serving large batches of features to (1) create train/test datasets and (2) batch applications scoring models using those batches of features, and
  • an online feature store for serving a single row of features (a feature vector) to be used as input features for an online model for an individual prediction.

The offline feature store is typically required to efficiently serve and store large amounts of feature data, while the online feature store is required to return feature vectors in very low latency (e.g., < 10ms). Examples of databases used for the offline feature store are Apache Hive and BigQuery and examples of online feature stores include MySQL Cluster, Redis, and DynamoDB. 

Note that if you want to reuse features in different train/test datasets for different models, your database or application will need to join features together. This is true for both the offline and online feature stores. If your feature store does not support joining features, that is, you do not reuse features across different models, you (or some system) will need to create a new ingestion pipeline for every new model you support in production.

Detailed Comparison

In the table below, we see an overview of the main architectural differences between feature stores and data warehouses. Data warehouses are used primarily by business analysts for interactive querying and for generating historical reports/dashboards on the business. Feature stores are used by both data scientists and by the online/batch applications, and they are fed data by feature pipelines, typically written in Python or Scala/Java. 

Data scientists typically use Python programs to create train/test datasets by joining existing features in the feature store together and materializing the train/test datasets in a file format best suited to the framework they are going to train their model in (e.g., TFRecord for TensorFlow, NPY for PyTorch). Data warehouses and SQL currently lack this capability to create train/test datasets in ML file formats.

Feature Data should be Validated before Ingestion

The table also shows the differences in the types of data stored, as well as how the data is stored, validated, and queried. A data warehouse stores data in tables along with schemas for describing the type of data and constraints for columns. Similarly, the feature store stores typed data (typically in tables), but as features are typically stored as ready-to-consume numerical values or vectors (embeddings) or tensors, there is less need for a richer set of column types compared to a data warehouse.  Foreign key constraints are typically not supported in feature stores, due to the difficulty in enforcing such constraints between online and offline stores.

As model training is very sensitive to bad data (null values, outliers cause numerical instability, missing values), feature data should be validated before ingestion. Data validation frameworks, such as Great Expectations and Deequ, have appeared to help implement feature pipelines that apply predicates (data validation rules) on all the features ingested into the feature store, ensuring high data quality in the feature store. 
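As a small illustration of the kind of predicate a feature pipeline might apply before ingestion, the sketch below uses Great Expectations' classic pandas API; the column names and thresholds are invented for the example.

-- CODE language-python --
# Illustrative data validation before writing features to the feature store.
import pandas as pd
import great_expectations as ge

features = ge.from_pandas(pd.DataFrame({
    "customer_id": [1, 2, 3],
    "purchases_30d": [7, 2, 0],
    "avg_basket_value": [31.5, 12.0, 0.0],
}))

# Predicates (data validation rules) on the features.
features.expect_column_values_to_not_be_null("customer_id")
features.expect_column_values_to_be_between("purchases_30d", min_value=0, max_value=10_000)

results = features.validate()
if not results["success"]:
    raise ValueError("Feature validation failed - refusing to ingest into the feature store")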

Domain specific languages (DSL) are sometimes used to define the feature transformations, aggregations, and data validation rules in feature pipelines, but general purpose languages (Python, Scala) are commonly used when non-trivial feature engineering is required. 

Using the feature store to create train/test data

Data scientists are one of the main users of the feature store. They use a feature repository to perform exploratory data analysis (EDA) - searching/browsing for available features and inspecting feature values/schemas/statistics. Data Scientists mainly use Python to select features to create train/test datasets. This typically involves joining features together to create a  train/test dataset in their file format of choice (.tfrecord, .csv, .npy, .petastorm, etc). Sometimes feature stores support a DSL (domain specific language) to create train/test datasets or other languages such as Scala/Java. 

Online feature store

Online applications use the online feature store to retrieve feature values with low latency to build feature vectors that are sent to models for predictions. In contrast to higher latency data warehouses, feature stores may be required to return feature vectors in single millisecond latency - only really achievable in row-oriented or key-value stores. 

The typical access pattern for retrieving features is a key-value lookup, but if features are to be reused in the online feature store, then joins are again required (either in the database or in the application). In some databases (such as MySQL Cluster), a small number of joins can be performed at very low latency.

Feature statistics to monitor for feature drift and data drift

Descriptive statistics (e.g., mean, standard deviation) for features are also useful when identifying data drift in online models. Your monitoring infrastructure can calculate statistics on live prediction traffic, and compare those statistics with the values in the feature store to identify data drift for the live traffic, potentially requiring retraining of the model.
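A minimal sketch of such a check, comparing live-traffic statistics against the training-time statistics stored with the feature, might look as follows (the feature names, reference values, and threshold are invented):

-- CODE language-python --
# Toy drift check: compare live feature statistics against training-time statistics.
import numpy as np

# Statistics recorded in the feature store when the model was trained (hypothetical values).
training_stats = {"purchases_30d": {"mean": 5.2, "std": 3.1}}

def drift_detected(feature_name: str, live_values: np.ndarray, threshold: float = 2.0) -> bool:
    """Flag drift if the live mean is more than `threshold` training std-devs from the training mean."""
    ref = training_stats[feature_name]
    return abs(live_values.mean() - ref["mean"]) > threshold * ref["std"]

live_traffic = np.array([14.0, 16.5, 12.0, 18.2])      # recent prediction requests
print(drift_detected("purchases_30d", live_traffic))   # True -> consider retraining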

Time-Travel 

Temporal databases support time-travel: the ability to query data as it was at a given point-in-time or data changes in a given time-interval. The “AS OF SYSTEM TIME” syntax was introduced in SQL:2011 to standardize point-in-time queries, while the “VERSIONS BETWEEN SYSTEM TIME ... AND ... “ syntax was introduced to identify the versioned changes to data in a time interval. Time-travel is supported in some data warehouses, but does not have universal support across all vendors.

For a feature store time-travel has several important uses: when creating train/test data (e.g., training data is data from the years 2010-2018, while test data is data from the range 2019-2020). Time-travel is also useful to make changes to a dataset (e.g., rollback a bad commit of data to the dataset) or to compare metadata (statistics) for features and how they change over time. We rarely require time-travel for features used in serving. Time-travel is also important when performing point-in-time joins, where we ensure that there is no data leakage from the future when we create train/test datasets from historical data.
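As an illustration of a point-in-time join, pandas' merge_asof picks, for each labelled event, the most recent feature value at or before the event time, which prevents leakage from the future. The data and column names below are invented for the example.

-- CODE language-python --
# Point-in-time join sketch: for each label event, take the latest feature value known at that time.
import pandas as pd

features = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "feature_time": pd.to_datetime(["2020-01-01", "2020-02-01", "2020-01-15"]),
    "purchases_30d": [3, 7, 2],
})
labels = pd.DataFrame({
    "customer_id": [1, 2],
    "event_time": pd.to_datetime(["2020-02-15", "2020-01-20"]),
    "churned": [0, 1],
})

train_df = pd.merge_asof(
    labels.sort_values("event_time"),
    features.sort_values("feature_time"),
    left_on="event_time",
    right_on="feature_time",
    by="customer_id",
    direction="backward",   # only feature values from the past are joined in
)
print(train_df)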

Feature Pipelines 

Data warehouses typically have timed triggers for running ETL jobs (or data pipelines) to ingest the latest data from operational databases, message queues, and data lakes. Similarly, feature pipelines can run on timed triggers to transform and aggregate the latest data from different sources before storing it in both the online and offline feature store for scoring by online and offline applications. However, additional pipelines can also feed features to the feature store. 
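For instance, a feature pipeline could be scheduled with Airflow roughly as follows; the DAG name, schedule, and run_feature_pipeline function are placeholders, and Airflow 2-style imports are assumed.

-- CODE language-python --
# Placeholder sketch of a scheduled feature pipeline in Airflow.
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def run_feature_pipeline():
    # Hypothetical: read raw data, transform/aggregate/validate, write to online + offline stores.
    pass

with DAG(
    dag_id="nightly_feature_pipeline",
    start_date=datetime(2020, 10, 1),
    schedule_interval="0 2 * * *",   # every night at 02:00
    catchup=False,
) as dag:
    compute_features = PythonOperator(
        task_id="compute_features", python_callable=run_feature_pipeline)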

Predictions made by models can be stored in the feature store along with the outcomes for those predictions. There can be long lags of days, months, or even years before outcomes become available (e.g., for a prediction of whether a loan will be repaid or not), but as they arrive, new training data becomes available that can be used to trigger re-training of models.

Conclusion

Data warehouses can be used to store pre-computed features, but they do not provide much more functionality beyond that for ML pipelines. When Data Scientists need to create train/test data using Python or when online features (for serving features to online models) are needed at low latency, you need a feature store. Similarly, if you want to detect feature drift or data drift, you need support for computing feature statistics and identifying drift.


One function is all you need for ML Experiments

ML Best Practices | 9/30/2020 | Robin Andersson

TLDR; Hopsworks provides support for machine learning (ML) experiments. That is, it can automatically track the artifacts, graphs, performance, logs, metadata, and dependencies of your ML programs. Many of you already know about platforms like MLflow, so why should you read about Hopsworks Experiments? Because you do not have to rewrite your TensorFlow/PyTorch/Scikit-learn programs to get tracking and distributed ML for free, and TensorBoard comes built-in. We discuss how Hopsworks uniquely supports implicit provenance to transparently create metadata and how it is combined with the oblivious training function to make the distribution of your training transparent. 

Hopsworks Introduction

Hopsworks is a single platform for both data science and data engineering that is available as both an open-source platform and a SaaS platform, including a built-in feature store. You can train models on GPUs at scale, easily install any Python libraries you want using pip/conda, run Jupyter notebooks as jobs, put those jobs in Airflow pipelines, and even write (Py)Spark or Flink applications that run at scale. 

As a development environment, Hopsworks provides a central, collaborative development environment that enables machine learning teams to easily share results and experiments with teammates or generate reports for project stakeholders. All resources have strong security, data governance, backup and high availability support in Hopsworks, while assets are stored in a single distributed file system (with data stored on S3 in the cloud).

A Hopsworks ML experiment stores information about your ML training run: logs, images, metrics of interest (accuracy, loss), the program used to train the model, its input training data, and the conda dependencies used. Optional outputs are hyperparameters, a TensorBoard, and a Spark history server.


The logs of each hyperparameter trial are retrieved by clicking on its log, and TensorBoard visualizes the different trials’ results. The TensorBoard HParams plugin is also available to drill down further on the trials.

Tracking

When you run a Python or PySpark application on the Hopsworks platform, it can create an experiment that includes both the traditional information a program generates (results, logs, errors) as well as ML-specific information to help track, debug, and reproduce your program and its inputs and outputs:

  • hyperparameters: parameters for training runs that are not updated by the ML programs themselves; 
  • metrics: the loss or accuracy of the model(s) trained in this experiment;
  • program artifacts: python/pyspark/airflow programs, and their conda environments;
  • model artifacts: serialized model objects, model schemas, and model checkpoints;
  • executions: information to be able to re-execute the experiment, including parameters, versioned features for input, output files,  etc; 
  • versioned features: to be able to reproduce an experiment, we need the exact training/test data from the run and how it was created from the feature store;
  • visualizations: images generated during training and scoring. TensorBoard can also be used to visualize training runs - Hopsworks aggregates results from all workers transparently;
  • logs (for debugging): model weights, gradients, losses, optimizer state;
  • custom metadata: tag experiments and free-text search for them, govern experiments (label as ‘PII’, ‘data-retention-period’, etc), and reproduce training runs.

Experiment Tracking and Distributed ML in One Library


-- CODE language-python --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]), min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", searchspace=sp, optimizer='randomsearch',
#                  direction='max', num_trials=15)

Platforms that support experiment tracking require the user to refactor their training code in a function or some explicit scope (such as “with … as xx:” in MLFlow, see Appendix A) to identify when an experiment begins and when an experiment ends. In Hopsworks, we require the developer to write their training code inside a function. 

We call this Python function an oblivious training function because the function is oblivious of whether it is being run on a Python kernel in a Jupyter notebook or on many workers in a cluster, see our blog and Spark/AI summit talk for details. That is, you write your training code once and reuse the same function when training a small model on your laptop or when performing hyperparameter tuning or distributed training on a large cluster of GPUs or CPUs.

We double down on this “wrapper” Python function by also using it to start/stop experiment tracking. Experiment tracking and distribution transparency in a single function, nice! 

In Hopsworks, the Maggy library runs experiments, see code snippet above. As you can see, the only code changes a user needed compared to a best-practice TensorFlow program are: 

  1. factor the training code in a user-defined function (def train(..):);
  2. return a Python dict containing the results, images, and files that the user wants to be tracked for the experiment and accessible later in the Experiments UI; and
  3. invoke the training function using the experiment.lagom function.

The hyperparameters can be fixed for a single execution run, or as shown in the last 4 lines of the code snippet, you can execute the train function as a distributed hyperparameter tuning job across many workers in parallel (with GPUs, if needed). 

Hopsworks will automatically:

  • track all parameters of the train function as hyperparameters for this experiment, 
  • auto-log using Keras callbacks in model.fit;
  • create a versioned directory in HopsFS, where a copy of the program, its conda environment, and all logs from all workers are aggregated;
  • track all provenance information for this application - input data from HopsFS used in this experiment (train/test datasets from the Feature Store), and all output artifacts (models, model checkpoints, application logs);
  • redirect all print statements executed in workers to the Jupyter notebook cell for easier debugging (see GIF below - each print statement is prefixed by the worker ID).
In Hopsworks, logs from workers can be printed in your Jupyter notebook during training. Take that Databricks!

TensorBoard support

-- CODE language-python --
def train():
    from maggy import tensorboard
    ...
    model.fit(.., callbacks=[TensorBoard(log_dir=tensorboard.logdir(), ..)], ...)

TensorBoard is arguably the most common and powerful tool used to visualize, profile and debug machine learning experiments. Hopsworks Experiments integrates seamlessly with TensorBoard. Inside the training function, the data scientist can simply import the tensorboard python module and get the folder location to write all the TensorBoard files. The content of the folder is then collected from each Executor and placed in the experiment directory in HopsFS. As TensorBoard supports showing multiple experiment runs in the same graph, visualizing and comparing multiple hyperparameter combinations becomes as simple as starting the TensorBoard integrated in the Experiments service. By default, Tensorboard is configured with useful plugins such as HParam, Profiler, and Debugging. 

Profiling and debugging

Hopsworks 1.4.0 comes with TensorFlow 2.3, which includes the TensorFlow profiler - a long-awaited feature that finally allows users to profile model training to identify bottlenecks in the training process, such as slow data loading or poor operation placement in CPU + GPU configurations. 

TensorFlow 2.3 also includes Debugger V2, making it easy to find model issues such as NaN which are non-trivial to find the root cause of in complex models.

Model Registry

In the training code, models may be exported and saved to HopsFS. Using the model Python module in the hops library, it is easy to version and attach meaningful metadata to models to reflect the performance of a given model version. 

The Hopsworks Model Registry is a service where all models are listed, together with useful information such as which user created the model, its different versions, the time of creation, and evaluation metrics such as accuracy. 

The Model Registry provides functionality to filter based on the model name, version number, and the user that exported the model. Furthermore, the evaluation metrics of model versions can be sorted in the UI to find the best version for a given model. 

In the Model Registry UI, you can also navigate to the experiment used to train the model, and from there to the train/test data used to train the model, and from there to the features in the feature store used to create the train/test data. Thanks, provenance!

Exporting a model

A model can be exported programmatically by using the export function in the model module. Prior to exporting the model, the experiment needs to have written a model to a folder or to a path on HopsFS. Then that path is supplied to the function along with the name of the model and the evaluation metrics that should be attached. The export call will upload the contents of the folder to your Models dataset and it will also appear in the Model Registry with an incrementing version number for each export.

-- CODE language-python --
import os
from hops import model

# local path to directory containing model (e.g. .pb or .pk)
path = os.getcwd() + "/model_dir"

# uploads path to the model repository, metadata is a dict of metrics
model.export(path, "mnist", metrics={'accuracy': acc})

Get the best model version

When deploying a model to real-time serving infrastructure or loading a model for offline batch inference, applications can query the model repository to find the best version based on metadata attached to the model versions - such as the accuracy of the model. In the following example, the model version for MNIST with the highest accuracy is returned.

-- CODE language-python --
from hops import model
from hops.model import Metric

MODEL_NAME = "mnist"
EVALUATION_METRIC = "accuracy"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)

print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])

The Devil is in the Details

That was the brief overview of Hopsworks Experiments and the Model Registry. You can now try it out on www.hopsworks.ai or install Hopsworks Community or Enterprise on any servers or VMs you can get your hands on. If you want to read more about how we implemented the plumbing, then read on.

Transparent Distributed ML with PySpark

Hopsworks uses PySpark to transparently distribute the oblivious training function for execution on workers. If GPUs are used by workers, Spark allocates GPUs to workers, and dynamic executors are supported which ensures that GPUs are released after the training function has returned, read more here. This enables you to keep your notebook open and interactively visualize results from training, without having to worry that you are still paying for the GPUs. 

The advantage of the Hopsworks programming model, compared to approaches where training code is supplied as Docker images such as AWS Sagemaker, is that you can write custom training code in place and debug it directly in your notebook. You also don’t need to write Dockerfiles for training code, and Python dependencies are managed by simply installing libraries using PIP or Conda from the Hopsworks UI (we compile the Docker images transparently for you).

The oblivious training function can run in different execution contexts: on a Jupyter notebook in a Python kernel (far left), for parallel ML experiments (middle), and for collective allreduce data parallel training (far right). Maggy and Hopsworks take care of complex tasks such as scheduling tasks, collecting results, and generating new hyperparameter trials.

HopsFS stores experiment data and logs generated by workers during training. When an experiment is started through the API, a subfolder in the Experiments dataset in HopsFS is created and metadata about the experiment is attached to the folder. Hopsworks automatically synchronizes this metadata to elasticsearch using implicit provenance. 

The metadata may include information such as the name of the experiment, type of the experiment, the exported model, and so on. As the existence of an experiment is tracked by a directory, it also means that deleting a folder also deletes the experiment as well as its associated metadata from the tracking service. 

Tracking metadata with Implicit Provenance

Existing systems for tracking the lineage of ML artifacts, such as TensorFlow Extended or MLFlow, require developers to change their application or library code to log tracking events to an external metadata store. 

In Hopsworks, we primarily use implicit provenance to capture metadata, where we instrument our distributed file system, HopsFS, and some libraries to capture changes to ML artifacts, requiring minimal code changes to standard TensorFlow, PyTorch, or Scikit-learn programs (see details in our USENIX OpML’20 paper). 

File system events, such as reading features from a train/test dataset and saving a model to a directory, are implicitly recorded as metadata in HopsFS and then transparently indexed in Elasticsearch. This enables free-text search for ML artifacts, metadata, and experiments in the UI.

Experiments in Hopsworks are the first part of a ML training pipeline that starts at the Feature Store and ends at model serving. ML Artifacts (train/test datasets, experiments, models, etc) can be stored on HopsFS, and they can also have custom metadata attached to them. 

The custom metadata is tightly coupled to the artifact (remove the file, and its metadata is automatically cleaned up) - this is achieved by storing the metadata in the same scaleout metadata layer used by HopsFS. This custom metadata is also automatically synchronized to Elasticsearch (using a service called ePipe), enabling free-text search for metadata in Hopsworks.

That’s all for now Folks!

Of all the developer tools for Data Science, platforms for managing ML experiments have seen the most innovation in recent years. Open-source platforms have appeared, such as MLFlow and our Hopsworks platform, alongside proprietary SaaS offerings such as WandB, Neptune, Comet.ml, and Valohai. 

What makes Hopsworks Experiments different? You can write clean Python code and get experiment tracking and distributed ML for free with the help of implicit provenance and the oblivious training function, respectively. 

There is growing consensus that platforms should keep track of what goes in and out of ML experiments for both debugging and reproducibility. You can instrument your code to keep track of inputs/outputs, or you can let the framework manage it for you with implicit provenance. 

Hopsworks Experiments are a key component in our mission to reduce the complexity of putting ML in production. Further groundbreaking innovations are coming in the next few months in the areas of real-time feature engineering and monitoring operational models. Stay tuned!

Appendix A

In the code snippet below, we compare how you write a Hopsworks experiment with MLFlow. There are more similarities than differences, but explicit logging to a tracking server is not needed in Hopsworks.

-- CODE language-python --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]), min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", searchspace=sp, optimizer='randomsearch',
#                  direction='max', num_trials=15)

-- CODE language-python --
def train(data_path, max_depth, min_child_weight, estimators, model_name):
    # distribution external
    X_train, X_test, y_train, y_test = build_data(..)
    mlflow.set_tracking_uri("jdbc:mysql://username:password@host:3306/database")
    mlflow.set_experiment("My Experiment")
    with mlflow.start_run() as run:
        ...
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("min_child_weight", min_child_weight)
        mlflow.log_param("estimators", estimators)
        with open("test.txt", "w") as f:
            f.write("hello world!")
        mlflow.log_artifacts("/full/path/to/test.txt")
        ...
        model.fit(X_train, y_train)  # auto-logging
        ...
        mlflow.tensorflow.log_model(model, "tensorflow-model",
                                    registered_model_name=model_name)

Like MLFlow, but better?

Appendix B

Pipelines are the program that orchestrates the execution of an end-to-end training and model deployment job. In Hopsworks, you can run Jupyter notebooks as schedulable Jobs in Hopsworks, and these jobs can be run as part of an Airflow pipeline (Airflow also comes as part of Hopsworks). After pipeline runs, data scientists can quickly inspect the training results in the Experiments service. 

The typical steps that make up a full training-and-deploy pipeline include:

  • materialization of train/test data by selecting features from a feature store, 
  • model training on the train/test data and export the model to the Model Registry,
  • evaluation and validation of the model and, if it passes robustness, bias, and accuracy tests, deployment of the model.
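
A minimal sketch of such a pipeline as an Airflow DAG is shown below. It uses plain PythonOperators with placeholder task bodies rather than the Hopsworks Airflow operators, so the DAG, task, and function names are illustrative assumptions, not the actual Hopsworks API.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def materialize_training_data():
    """Select features from the feature store and write train/test data."""
    ...

def train_and_export_model():
    """Train on the train/test data and export the model to the Model Registry."""
    ...

def validate_and_deploy_model():
    """Run robustness, bias, and accuracy tests; deploy the model if they pass."""
    ...

with DAG("train_and_deploy", start_date=datetime(2021, 1, 1),
         schedule_interval="@daily", catchup=False) as dag:
    materialize = PythonOperator(task_id="materialize_data",
                                 python_callable=materialize_training_data)
    train = PythonOperator(task_id="train_model",
                           python_callable=train_and_export_model)
    deploy = PythonOperator(task_id="validate_and_deploy",
                            python_callable=validate_and_deploy_model)
    materialize >> train >> deploy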

How we secure your data with Hopsworks

>
ML Best Practices
>
7/16/2020
>
Antonios Kouzoupis

TLDR; Feature stores are the new cool kids in the neighbourhood of data engineering and AI (artificial intelligence). Hyperscale AI companies (such as Uber and Netflix) have built their own feature stores to solve the problems of reusing, governing, and securing access to features (data for AI) in a shared platform. Hopsworks is a modular open-source platform, developed by Logical Clocks, for managing data for AI (a standalone Feature Store), computing features (Spark, Python), and training models. In this post, we introduce the project-based multi-tenancy security model in Hopsworks for users, data, and programs. We describe how our project-based multi-tenant security model is, in effect, a form of dynamic role-based access control with zero performance overhead.

Cloud-Native Security and More

Hopsworks.ai is a SaaS version of Hopsworks, currently available on AWS. Hopsworks clusters can be run with an IAM profile, providing them with an identity in AWS with permission policies that capture what operations the Hopsworks cluster is authorized to perform in AWS - such as read/write data in S3 buckets. Hopsworks can also be used from Data Science and Feature Engineering platforms (Databricks, Sagemaker, KubeFlow, EMR) using API keys exported from Hopsworks.

This post is concerned primarily with the internal security model in Hopsworks that enables you to host sensitive data in a shared cluster, providing powerful access control and self-service capabilities. The benefit of Hopsworks project-based multi-tenancy model is that you can host many users and feature stores (and other projects) on a single cluster, with self-service access to different feature stores. The advantage of our security model is that you can host production, staging, and development feature stores in a single cluster - you do not need to manage and pay for separate clusters. 

Project-based Multi-tenancy

Role-based access control (RBAC) is a well-known security model that enables administrators to give a group of users the same access rights to selected resources. With roles, an administrator at a company could define a single security policy and apply it to all members of a department. But individuals may be members of multiple departments, so a user might be given multiple roles. Dynamic role-based access control means that, based on some other policy, you can change the set of roles a user can hold at a given time. For example, if a user has two different roles - one for accessing banking data and another for accessing trading data - dynamic RBAC could restrict the user to holding only one of those roles at a given time. The policy for deciding which role the user holds could, for example, depend on which VPN (virtual private network) the user is logged in to or which building the user is present in. In effect, dynamic roles would allow her to hold only one of the roles at a time, sandboxing her inside one of the domains - banking or trading - and preventing her from cross-linking or copying data between the trading and banking domains.

Hopsworks implements a dynamic role-based access control model through a project-based multi-tenant security model. Inspired by GDPR, a Project in Hopsworks is a sandboxed set of users, data, and programs (where data can be shared in a controlled manner between projects). Every Project has an owner with full read-write privileges and zero or more members.

A project owner may invite other users to his/her project as either a Data Scientist (read-only privileges and run jobs privileges) or Data Owner (full privileges). Users can be members of (or own) multiple Projects, but inside each project, each member (user) has a unique identity - we call it a project-user identity.  For example, user Alice in Project A is different from user Alice in Project B - (in fact, the system-wide (project-user) identities are ProjectA__Alice and ProjectB__Alice, respectively). As such, each project-user identity is effectively a role with the project-level privileges to access data and run programs inside that project. If a user is a member of multiple projects, she has, in effect, multiple possible roles, but only one role can be active at a time when performing an action inside Hopsworks. When a user performs an action (for example, runs a program) it will be executed with the project-user identity. That is, the action will only have the privileges associated with that project. The figure below illustrates how Alice has a different identity for each of the two projects (A and B) that she is a member of. Each project contains its own separate private assets. Alice can use only one identity at a time which guarantees that she can’t access assets from both projects at the same time.

An important aspect of Project based multi-tenancy is that assets can be shared between projects - sharing does not mean that data is duplicated. The current assets that can be shared between projects are: files/directories in HopsFS, Hive databases, feature stores, and Kafka topics. For example, in the figure below there are three users (User1, User2, and User3)  and two projects (A and B). User1 is a member of project A, while User2 and User3 are members of project B. All three users (User1, User2, User3) can access the assets shared between project A and project B. As sharing does not mean copying, the access control rules for the asset are updated to give users in the other project read or write permissions on the shared asset.

As we will see later on, a project-user identity is based on an X.509 certificate issued internally by Hopsworks. Access control policies, however, are implemented by the platform services themselves, as described in the following sections.

User Identity in Hopsworks

When a user authenticates with Hopsworks, they are logged into the platform with a Hopsworks user identity. This user identity is needed to construct the project-user identity - it is the “user” part of the project-user identity. In Hopsworks, a user identity is mapped to a global Hopsworks role (independent of project membership): a normal user or an administrator. A normal user can search for assets, update her profile, generate API keys, and switch between projects. Administrators have access to user, project, storage, and application management pages, as well as system monitoring and maintenance services. They can activate or block users, delete Projects, manage Project quotas, promote normal users to administrators, and so on. It is important to mention here that a Hopsworks administrator cannot view the data inside a project - even though they are allowed to delete a project.

Multi-Tenant Services in Hopsworks

A user interacts with Hopsworks through the web application and they don’t necessarily realize that the web application is a facade to a modular distributed system. In the background we run HopsFS - our next-generation HDFS-on-S3 filesystem, HopsYARN - a cluster management system and scheduler, Apache Hive, Elasticsearch, (optionally Kafka and Airflow), and other logging and monitoring services.

A fundamental principle in every distributed system is that processes exchange messages over the network or through shared state (such as a filesystem or database). When communication is performed by message passing, it is imperative that we protect the messages from adversaries reading or modifying their content and that we validate the identity of the caller. Traditionally, Hadoop uses Kerberos and GSSAPI to authenticate and authorize users and to encrypt data in transit. While Kerberos (Active Directory) is widely adopted by big organizations, the administration of users and services is a painful process and the system does not scale. On top of that, the Kerberos APIs are complicated enough that programming against them can be really challenging.

To avoid the pain of Kerberos (and be able to natively integrate with platforms like Kubernetes), we completely re-designed the security model for HopsFS and YARN to use certificates. We replaced Kerberos with Public Key Infrastructure (PKI) with X.509 certificates to authenticate and authorize users. Certificates enabled us to also use the well established TLS protocol to provide confidentiality and data integrity. Every user and every service in a Hopsworks cluster has a private key and an X.509 certificate.


TLS-Based Multi-Tenant Services

Hopsworks supports a number of stateful and compute services that use X.509 certificates to authenticate users, applications, and services: HopsFS, HiveServer2, Kafka, and YARN. These services all provide their own authorization schemes. We unified HopsFS and Hive’s authorization models by providing 2-way TLS in HiveServer2 and storage-based authorization for the Hive metastore (which we ported to Hive 3.X), delegating access control decisions for Hive to HopsFS. In Hive, tables and databases store their data files inside directories on HopsFS, so HopsFS ACLs (access control lists) authorize file system operations by Hive (read from tables, write to tables). The easy-to-understand ACLs that we expose in Hopsworks (for Hive, and for datasets in HopsFS) are captured in two roles: Data Scientists can read, Data Owners can read/write. From version 1.4, HopsFS ACLs can be customized directly in Hopsworks.

For Kafka, we developed a Hopsworks Authorizer plugin that authorizes operations on Kafka topics by extracting the project-user identity from the client supplied X.509 certificate. The Hopsworks Kafka Authorizer then validates that the user is a member of the project that has permissions to perform the requested action on the Kafka topic. 

HopsYARN uses X.509 certificates to identify users. HopsYARN also creates and manages (including renewal) application certificates for YARN applications (such as Spark jobs). Application certificates in HopsYARN are a key feature missing currently in Kubernetes - because each application has an identity, you can track and log its actions (reading/writing files/topics/databases/etc). This is incredibly valuable for machine learning pipelines, where it enables Hopsworks to automatically gather provenance information for models, notebooks, and train/test datasets (see our USENIX OpML ‘20 paper for more details) - enabling easy reproduction of models.

Non TLS-Based Multi-Tenant Services

Hopsworks projects also support two other multi-tenant services that are not currently backed by X.509 certificates: Elasticsearch and the Online Feature Store (MySQL Cluster (NDB)). 

Hopsworks includes the Open Distro for Elasticsearch that supports authentication and access control using JSON Web Tokens (JWT). For every Hopsworks project, a number of private indexes are created in Elasticsearch: an index for real-time logs of applications in that project (accessible via Kibana), an index for ML experiments in the project, and an index for provenance for the project’s applications and file operations. Currently, we do not support sharing elastic indexes across projects - they are private to the project. MySQL Cluster is the online feature store in Hopsworks, and details on how we provide multi-tenant access to the online feature store using user credentials is provided later in this post.

Public Key Infrastructure

As we saw in the previous section, most multi-tenant services in Hopsworks are built on X.509 certificates. Hopsworks comes with its own root Certificate Authority (CA) for signing certificates internally. Hopsworks root CA does not directly sign requests, instead it uses an intermediate CA to do so. To protect against a security breach, more than one intermediate CA can be made responsible for a specific domain. As shown in the above figure, there is one intermediate CA for creating API and Kublet certificates that can be used to access an external Kubernetes cluster (more details on integration with Kubernetes are provided later in this post) and another Hopsworks intermediate CA for user, application, and service certificates inside Hopsworks. 

Hopsworks intermediate CA supports three kinds of certificates: User, Application and Service certificates. They are all signed by the same Hopsworks intermediate CA but have different lifespan and attributes. In the following sections we are going to dig deeper on how they are issued, their lifecycle and how they’re used.

User certificates

When a user in Hopsworks becomes a member of a project (either when they create their own project or are added as a member to a project), a new user is created behind the scenes. This new user uniquely identifies a user belonging to a specific Project, a project-user ID. The username of this project-user is in the form of PROJECTNAME__USERNAME.

For each project-user, we automatically generate an X.509 certificate and a private key. The certificate carries the project-user’s username in the Common Name field of the X.509 Subject, which is used to authenticate the user.
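
For illustration only - this is not Hopsworks' internal implementation - the snippet below shows how a project-user name could be placed in the Common Name field of a certificate signing request using the Python cryptography package (the key size and username are made up):

from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa

key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
csr = (
    x509.CertificateSigningRequestBuilder()
    .subject_name(x509.Name([
        # The project-user identity goes in the Common Name.
        x509.NameAttribute(NameOID.COMMON_NAME, u"ProjectA__alice"),
    ]))
    .sign(key, hashes.SHA256())   # the CSR would then be signed by the intermediate CA
)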

Certificates and private keys are stored in the database and pulled when needed to perform RPCs or REST calls (a caching mechanism is in place to avoid round-trips to the database). All private keys are stored encrypted in the database. User certificates are used for all operations that are not executed within the context of an application but still access data managed by Hopsworks - for example, when a user previews a dataset in the UI or submits an application to the scheduler.

Application certificates

In most cases, users perform actions within the context of an application. A user will launch a job which will read/write data from/to HopsFS, negotiate for more resources in the cluster, write to the feature store, or produce to a Kafka topic. To support fine-grained access control, lineage, and provenance, we issue a new set of cryptographic material for every application. The application certificate contains the username of the user who submitted the job and a unique identifier for the current application. With this information we are now able to log information about who created or modified a specific file and with what application, or in machine learning, we can infer which application read/wrote this feature data.

Application certificates are transparent to the user. HopsYARN is responsible for their lifecycle. When a new application is submitted to the cluster, a new X.509 certificate is generated and signed by Hopsworks intermediate CA. The cryptographic material is shipped automatically to all containers and revoked once the application has finished. To minimize the attack vector, HopsYARN will periodically rotate them. It will generate a new pair, ship them to already running containers, and revoke the previous set.

Service certificates

Finally, the last type of Hopsworks certificate is the one used by the services themselves. Services will communicate with each other using their own certificate to authenticate and encrypt all traffic. Each service in Hopsworks, that supports TLS encryption and/or authentication, has its own service-specific certificate. Service certificates contain the Fully Qualified Domain Name (FQDN) of the host they are installed on and the login name of the system user who’s supposed to run the process. They are generated when a user provisions Hopsworks - either with a fresh on-prem installation or by spinning up an instance on hopsworks.ai - and in general they have a long lifespan. Service certificates can be rotated automatically in configurable intervals or upon request of the administrator. The services in Hopsworks that have their own X.509 certificates for encrypting their network traffic:

  • Hopsworks
  • MySQL Server
  • Consul
  • ELK Stack (Elasticsearch, Kibana, and Logstash)
  • History servers (Spark, MapReduce, Flink)

The services in Hopsworks that support two-way X.509 certificates for both encrypting their network traffic and client-side authentication:

  • Hive Server2 
  • Kafka
  • HopsFS (Namenodes and Datanodes)
  • HopsYARN (ResourceManager and Nodemanagers)

Encryption-in-transit

So, now that we’ve outlined the building blocks for our security model, we will see how we actually protect all data in transit. All backend services exchange messages with each other. Both servers and clients require all communication to be done over TLS with 2-way authentication, that is, both entities must present a certificate.

The server will check if the client’s certificate is still valid and trusted. It will also validate the peer’s certificate against a Certificate Revocation List and, if the certificate has been revoked, it will drop the connection. As a last layer of protection for project-user and application certificates, the username that is encoded in the message - which is the effective user performing an action - is validated against the Common Name field of the X.509 certificate. That way a rogue user cannot impersonate other users. In server-to-server communication, service certificates are used. Remember the FQDN in the certificate? The receiver will do a DNS lookup (the result is cached) and the answer should match the incoming IP address, so an adversary can’t join a rogue node to the cluster. Finally, the Locality (L) field of a Service X.509 certificate should also match the username of an incoming RPC.
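
A minimal sketch (ours, not Hopsworks code) of that Common Name check, using the Python cryptography package:

from cryptography import x509
from cryptography.x509.oid import NameOID

def caller_is_valid(peer_cert_pem: bytes, effective_user: str) -> bool:
    # Parse the client's certificate and extract its Common Name.
    cert = x509.load_pem_x509_certificate(peer_cert_pem)
    cn = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0].value
    # The username encoded in the RPC must match the certificate's CN,
    # e.g. "ProjectA__alice".
    return cn == effective_user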

Hopsworks Certificate Authority keeps a list of revoked certificates. Each time a project is deleted, or a user is removed from a project or a job finishes or when a certificate is rotated, Hopsworks updates its CRL and digitally signs it. All backend services periodically fetch the CRL and refresh their internal data structures so that connections with revoked certificates will be dropped.

Encryption-at-Rest

In Hopsworks.ai, HopsFS data can be stored in a bucket in S3. S3 has the advantage of lower cost, and it also provides encryption-at-rest for files stored in buckets. For on-premises HopsFS, data can be stored encrypted at rest using ZFS native encryption support. Hopsworks can centrally manage encryption keys for ZFS pools (volumes), so that encryption keys are not stored on the storage hosts. As services in Hopsworks run in user mode in Linux, the platform can be configured to ensure that administrators are not able to read file data.

Application JSON Web Token

Hopsworks and Elasticsearch (Open Distro) use JWT for authentication. Similar to application X.509 certificates, HopsYARN issues a token for each submitted job and propagates it to running containers. User code can then securely make REST calls to Hopsworks API or to Elasticsearch indexes owned by the project. The JWT is rotated automatically before it expires and invalidated once the application has finished.

A general overview of the security architecture of Hopsworks with all the artifacts we’ve discussed so far is depicted in the figure below.

Online Feature Store Credentials

When an online feature store is enabled in a project in Hopsworks, a new database is created in MySQL Cluster NDB to store the online features. The credentials for accessing that database are created by Hopsworks and securely stored in the database, encrypted by a master key. Clients of the online feature store (such as external online applications that make queries on machine learning models) use a REST request to retrieve their feature store credentials with which they can read data directly from MySQL Cluster using a JDBC/TLS connection. Credentials can be rotated in MySQL Cluster.
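
As a hedged sketch of what such a client might look like (the host, database, table, and credential values are placeholders, not Hopsworks defaults):

import mysql.connector

conn = mysql.connector.connect(
    host="onlinefs.example.com",           # online feature store endpoint
    database="project_a_featurestore",     # per-project online feature store database
    user="retrieved_username",             # credentials returned over REST
    password="retrieved_password",
    ssl_ca="/path/to/ca.pem",              # enforce a TLS connection
)
cur = conn.cursor()
cur.execute("SELECT * FROM teams_features WHERE team_id = %s", (42,))
feature_vector = cur.fetchone()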

Integration with External Security Models

While Hopsworks has its own security model, it needs to integrate with the security models of the environments in which it is used.

AWS IAM Integration

In our Hopsworks SaaS platform, www.hopsworks.ai, a Hopsworks cluster can be given an IAM role - an identity in AWS with permission policies that capture what operations the Hopsworks cluster is authorized to perform in AWS. In Hopsworks.ai, you can start a cluster by selecting an instance profile for it in the UI - the instance profile is a container for the IAM role. Users can create their own instance profile which suits their needs and security requirements. It is also possible to use AWS keys in Hopsworks.ai to define connectors to external AWS services. With either the instance profile or AWS keys, applications (Python, PySpark, Spark, and Flink) can natively read/write to services in AWS, such as S3. Similar to Hopsworks.ai SaaS platform, Enterprise Hopsworks on AWS can also be configured to run clusters with an instance profile.

Active Directory (Kerberos) and OAuth2

Hopsworks Enterprise supports single sign-on for Kerberos, LDAP, and OAuth2 identity providers. If you deploy Hopsworks in an environment where users use Active Directory (Kerberos), LDAP, or OAuth2 for single sign-on, the authentication plugins available in Enterprise Hopsworks (SPNEGO for Kerberos, and OpenID for OAuth2) will automatically log users in when they navigate to the Hopsworks UI.

Kubernetes Integration

Hopsworks can be integrated with Kubernetes by configuring it to use one of the available authentication mechanisms: API tokens, credentials, certificates, and IAM roles for AWS’ managed EKS offering. Hopsworks can offload to Kubernetes some of its microservices (Jupyter notebook instances, model serving) and run users’ jobs on Kubernetes. Project specific security material such as X.509 certificates and JWTs will be propagated to launched Pods so user code can access services in Hopsworks: HopsFS, Hive, Elasticsearch, and the Hopsworks REST API.

The Hopsworks installer gives you the option to also install the open-source Kubernetes distribution alongside Hopsworks. This is convenient for environments with no existing Kubernetes installation or for testing and development. In this case, our KubernetesCA intermediate CA will issue all Kubernetes internal and etcd certificates.

API-Based access to Hopsworks

Programs can also authenticate to the Hopsworks REST API using JWTs or API keys. The former expire after a short period of time, and the latter can be issued with limited scope, e.g., access to only the Feature Store API. As such, API keys are typically used for external access to the Hopsworks Feature Store by platforms such as Databricks, AWS Sagemaker, KubeFlow, and AWS EMR.
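
A sketch of programmatic access with an API key is shown below; the endpoint path and the ApiKey authorization scheme used here are assumptions for illustration, not a documented contract.

import requests

API_KEY = open("api_key.txt").read().strip()
resp = requests.get(
    "https://hopsworks.example.com/hopsworks-api/api/project",   # illustrative endpoint
    headers={"Authorization": f"ApiKey {API_KEY}"},
)
resp.raise_for_status()
print(resp.json())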

Summary

In this post, we gave an overview of how Hopsworks tackles information security, and we introduced our novel project-based multi-tenant security model and the multi-tenant services supported in Hopsworks. Our security model is based primarily on X.509 certificates, which come in three types: user (project-user) certificates, application certificates, and service certificates. We also described how we address encryption in transit with TLS, the authentication methods for our web front-end based on JWTs and API keys, and integration with third-party services.


Unifying Single-host and Distributed Machine Learning with Maggy

>
ML Best Practices
>
6/26/2020
>
Moritz Meister

This blog covers the oblivious training function and the internals of Maggy presented at Spark+AI Summit 2020, on June 26th.

TLDR; Maggy is an open-source framework for distributed machine learning. In this post, we introduce a new unified framework for writing core ML training logic as “oblivious training functions”. Maggy enables you to reuse the same training code whether training small models on your laptop or reusing the same code to scale out hyperparameter tuning or distributed deep learning on a cluster. Maggy enables the replacement of the current waterfall development process for distributed ML applications, where code is rewritten at every stage, with an iterative development process.

Most of the publicly available ML source code for training models is not built to scale-out on many servers or GPUs. Getting started with deep learning is relatively easy these days, thanks to fast.ai, GitHub, and the blogosphere. The hard part for practitioners starts when the code examples found online need to be applied to more challenging domains, with larger and custom datasets, which in turn will require a bigger customized version of the model to fit that dataset. Using publicly available code as a starting point for model development on clusters, you will end up in a process similar to the one depicted in Figure 1.

Figure 1: A simplified view of the ML model development process, illustrating its iterative nature.

The software development process for ML models is rarely the perfect waterfall development model, as shown in Figure 1 without the green arrows. In the (discredited) waterfall development process, you would start out with requirements, then move on to design, implementation and test. The (current!) equivalent process in ML model development is the following, as shown in Figure 1 with the green arrows. You start out on your local machine with a subset of the data in order to explore and design the model architecture. Then you move to use a cluster of resources (such as GPUs) to more quickly find hyperparameters, run lots of parallel ablation studies (many skip this stage!), and finally scale out the training of the model on the large dataset using lots of resources. Then, you’re done, right? Wrong! You typically iterate through the stages, finding better hyperparameters, adding new features, rewriting for distribution, going from your laptop to the cluster and back again.

We rewrite our model training code for distribution as it offers many benefits – faster training of models using more GPUs, parallelizing hyperparameter tuning over many GPUs, and parallelizing ablation studies to help understand the behaviour and performance of deep neural networks. However, not only will the boilerplate model training code need to be modified; as you move along the process, distribution will introduce additional obtrusive code artifacts and modifications, depending on the frameworks used. This will lead to a mix of infrastructure code and model code, with duplicated training logic, hyperparameters hard-coded into the training loop, additional tracking code to keep a record of your changes, and config files for experiments:

Figure 2: Model development creates a mix of code artefacts duplicating code for every step, making iterative development hard.

With such a code base, iterating becomes nearly impossible as it requires adapting many copies of redundant code. And finally, imagine handing the code off to an ML engineer to productionize the model.

The Oblivious Training Function

Figure 3: The oblivious training function makes training code reusable among all steps of the process.

We introduce an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark or Distributed TensorFlow programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). We call this new abstraction for ML model development the oblivious training function, as the core model training logic supports distribution transparency, that is, the training code is not aware (oblivious) of whether it is being run on a single host or whether it is being executed on hundreds of devices in parallel. 

What does it mean for training code to be distribution transparent?

Transparency in distributed systems refers to hiding distribution-specific aspects of an application from the developer - for example, a developer invoking a function may not know (or need to know) if the function she is calling is local to her application or on a remote server. This means, distribution transparency enables developers to write code that is reusable between single-host and distributed instantiations of a program:

Figure 4: Distribution Transparency hides complexities related to distribution from the developer, making the same code executable on a single-host as well as in a large cluster. Transparency leads to DRY training code.

Building Blocks for Distribution Transparency

How does ML code have to be structured in order to be transparently distributed? Firstly, developers have to follow best practices and, secondly, developers must be aware of the difference between distribution contexts, that is, what characterizes, for example, distributed hyperparameter tuning vs. distributed training.

1. ML Development Best Practices:

The ML community has recently developed some best practices, which are already widespread among developers. Taking a look at the new, well-illustrated Keras Guides, you will notice a common approach built on four techniques:

  • Modularize: By modularizing code into reusable functions, these functions become building blocks, making the code pluggable in order to construct different configurations of the model for hyperparameter optimization or ablation.
  • Parametrize: Instead of hardcoding parameters such as learning rate, regularization penalty or other hyperparameters, developers are encouraged to replace this with variables whenever possible, to have a single place for them to be changed.
  • Higher-order training functions: instead of using instantiated objects, for example for the training dataset, the input logic related to the data can be encapsulated in a function that is passed to a higher-order function. By doing so, the data input pipeline can also be parametrized. The same holds for the generation of the model, which can be encapsulated in a function returning the model.
  • Usage of callbacks at runtime: in order to be able to intercept and interact with the actual training loop, most ML frameworks, such as TensorFlow and PyTorch, offer the possibility to use callback functions that are invoked by the framework at certain points during training, such as at the end of every epoch or batch. Callback functions enable runtime monitoring of training and can, for example, also be used to stop the training early (important in hyperparameter optimization). A toy illustration of these idioms follows this list.
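
The snippet below is our own toy illustration of these four idioms; the function names and hyperparameters are ours, not a Keras or Maggy convention.

import tensorflow as tf

def gen_model(dropout=0.1):                  # modularized and parametrized
    return tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dropout(dropout),
        tf.keras.layers.Dense(10),
    ])

def gen_dataset(batch_size=32):              # data input logic wrapped in a function
    (x, y), _ = tf.keras.datasets.mnist.load_data()
    x = x.reshape(-1, 784).astype("float32") / 255
    return tf.data.Dataset.from_tensor_slices((x, y)).batch(batch_size)

def train(model_fn, dataset_fn, dropout, lr):   # higher-order training function
    model = model_fn(dropout=dropout)
    model.compile(optimizer=tf.keras.optimizers.Adam(lr),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))
    stop_early = tf.keras.callbacks.EarlyStopping(monitor="loss", patience=1)  # runtime callback
    model.fit(dataset_fn(), epochs=2, callbacks=[stop_early])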

2. Distribution Context

While a single-host environment is self-explanatory, there is a difference between the context of ML experiments, such as hyperparameter optimization or parallel ablation studies, and the distributed training of a single model. Both hyperparameter optimization and parallel ablation studies have weak scaling requirements (also known as embarrassingly parallel), because all workers execute independent pieces of work and have limited communication. For example, hyperparameter tuning involves training independent copies of the model with different hyperparameters or different architectures, in order to find the best performing configuration. Distributed training, however, is strong scaling, as it introduces significant communication and coordination between the workers. As workers are training a single model, they continually exchange gradients, which are computed on independent shards of data (data-parallel training). Many distributed training problems, in fact, become (network or disk) I/O bound as they scale. Figure 5 illustrates the three contexts and the steps in the model development process that they are applicable to.

Figure 5: Single-host vs. parallel multi-host vs. distributed multi-host context and their applicability to the steps of the process.

With these different contexts in mind, and with the programming idioms above applied, it becomes apparent what the oblivious training function entails. It is no longer the developer herself who instantiates and launches the training function; instead, the framework invokes the training function, since it is aware of the current context, and it takes care of the distribution-related complexities. That means that, for exploration, the framework can be used to fix all parameters. For hyperparameter optimization experiments, the framework will take care of generating potentially good hyperparameter combinations and parameterizing the oblivious training function with them, to be launched on different workers. For distributed training, it means setting up the environment for workers to discover each other and wrapping the model code with a distribution strategy.

Figure 6: The oblivious training function as an abstraction allows us to let the system take care of distributed system related complexities.

Putting it all together

Having the building blocks at hand, how do we write the model training code in Maggy? Let us take a look at the latest best-practices MNIST example, which already factors the model configuration, dataset preparation, and training logic into functions. Building on this example, we will show the modifications to the code that are needed to construct an oblivious training function in Maggy. It is important to note that all modifications are still vanilla Python code and can, therefore, be run as-is in a single-host environment. Let’s start with the boilerplate: the two functions and the training logic.

1. Model Definition

import tensorflow as tf

def get_model():
   model = tf.keras.Sequential([
       tf.keras.Input(shape=(784,)),
       tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(32, name='conv1', kernel_size=3, activation='relu'),
       tf.keras.layers.MaxPooling2D(pool_size=2),
       tf.keras.layers.Conv2D(64, name='conv2', kernel_size=2, activation='relu'),
       tf.keras.layers.MaxPooling2D(name='pool1', pool_size=2),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dropout(0.1),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)
   ])
   return model

2. Data set generation

def get_dataset():
   batch_size = 32
   num_val_samples = 10000
   # Return the MNIST dataset in the form of a `tf.data.Dataset`.
   (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
   # Preprocess the data (these are Numpy arrays)
   x_train = x_train.reshape(-1, 784).astype("float32") / 255
   x_test = x_test.reshape(-1, 784).astype("float32") / 255
   y_train = y_train.astype("float32")
   y_test = y_test.astype("float32")
   # Reserve num_val_samples samples for validation
   x_val = x_train[-num_val_samples:]
   y_val = y_train[-num_val_samples:]
   x_train = x_train[:-num_val_samples]
   y_train = y_train[:-num_val_samples]
   return (
       tf.data.Dataset.from_tensor_slices((x_train, y_train))
              .shuffle(50000).repeat().batch(batch_size),
       tf.data.Dataset.from_tensor_slices((x_val, y_val))
              .shuffle(10000).repeat().batch(batch_size),
       tf.data.Dataset.from_tensor_slices((x_test, y_test))
              .shuffle(10000).repeat().batch(batch_size),
   )

3. Training logic

model = get_model()
model.compile(
   optimizer=tf.keras.optimizers.Adam(),
   loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
   metrics=['accuracy', tf.keras.metrics.SparseCategoricalAccuracy()],
)

# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, steps_per_epoch=1000,
          validation_data=val_dataset, validation_steps=100)

# Test the model on all available devices.
metrics = model.evaluate(test_dataset, steps=100)
metric_dict = {out: metrics[i] for i, out in enumerate(model.metrics_names)}

1. Model generation

We are parametrizing the model itself, by replacing hyperparameters with arguments.

Parametrizing the model definition
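
The original post embeds the parametrized code as a snippet; the reconstruction below is based on the get_model() above, and the choice of which hyperparameters to expose (kernel size, pool size, dropout rate) is our assumption.

def get_model(kernel=3, pool=2, dropout=0.1):
   model = tf.keras.Sequential([
       tf.keras.Input(shape=(784,)),
       tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
       tf.keras.layers.Conv2D(32, name='conv1', kernel_size=kernel, activation='relu'),
       tf.keras.layers.MaxPooling2D(pool_size=pool),
       tf.keras.layers.Conv2D(64, name='conv2', kernel_size=kernel, activation='relu'),
       tf.keras.layers.MaxPooling2D(name='pool1', pool_size=pool),
       tf.keras.layers.Flatten(),
       tf.keras.layers.Dropout(dropout),
       tf.keras.layers.Dense(128, activation='relu'),
       tf.keras.layers.Dense(10)
   ])
   return model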

2. Dataset generation

The dataset generation function stays unchanged in this case (it is identical to the get_dataset() listing above), but, similar to the model, this function could also be parametrized.

3. Training logic

The training logic is wrapped in a parametrized and pluggable function, the oblivious training function. Again, hyperparameters are passed as arguments to the function. Additionally, the dataset and model generation functions are replaced with arguments, so that the system can, for example, replace the dataset generator with an alternative one - we use this to drop features for ablation studies. Last, but not least, the training function should return its current performance as a metric to be optimized in hyperparameter optimization. This is needed to make Maggy aware of the desired optimization metric.

Adjust Training Logic to be callable with different parameters
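
The original post embeds the adjusted training function as a snippet; the sketch below reconstructs it from the training logic above. The exact signature - which hyperparameters are exposed and how the model and dataset generator functions are passed in - is our assumption.

def train_fun(model_function, dataset_function, kernel, pool, dropout):
   model = model_function(kernel=kernel, pool=pool, dropout=dropout)
   model.compile(
       optimizer=tf.keras.optimizers.Adam(),
       loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
       metrics=['accuracy'],
   )
   train_dataset, val_dataset, test_dataset = dataset_function()
   model.fit(train_dataset, epochs=2, steps_per_epoch=1000,
             validation_data=val_dataset, validation_steps=100)
   metrics = model.evaluate(test_dataset, steps=100)
   return metrics[1]   # return the metric (accuracy) to be optimized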

Note that up to this point, all modifications are pure Python code and, hence, the training function can still be run in a single host environment by calling it yourself in a Notebook with a fixed set of parameters and by passing the model and dataset generation functions as arguments.

Finally, to execute the function in a different distribution context, Maggy is used:

from maggy import experiment

experiment.set_dataset_generator(gen_dataset)
experiment.set_model_generator(gen_model)

# Hyperparameter optimization
experiment.set_context('optimization', 'randomsearch', searchspace)
result = experiment.lagom(train_fun)
params = result.get('best_hp')

# Distributed Training
experiment.set_context('dist_training', 'MultiWorkerMirroredStrategy', params)
experiment.lagom(train_fun)

# Ablation study
experiment.set_context('ablation', 'loco', ablation_study, params)
experiment.lagom(train_fun)

Maggy requires additional configuration information for hyperparameter optimization, such as a search space definition and the optimization strategy to be used. In the case of distributed training, the distribution strategy is needed as well as a set of parameters to fix the model to. These parameters can either be taken from the previous hyperparameter tuning experiments or input manually. Lagom is the API to launch the function on a Spark cluster. 
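
For completeness, one way the searchspace referenced above could be defined is sketched below; the parameter names match our parametrized get_model() and the types and ranges are illustrative. An ablation_study object would be defined analogously with Maggy's ablation API (not shown here).

from maggy import Searchspace

searchspace = Searchspace(
    kernel=('INTEGER', [2, 8]),
    pool=('INTEGER', [2, 8]),
    dropout=('DOUBLE', [0.01, 0.99]),
)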

Future Work

You can try out Maggy for hyperparameter optimization or ablation studies now on Hopsworks.ai. Keep an eye on Maggy's GitHub repo for the oblivious training function to be released as a pure Spark version, or wait for the next release of Hopsworks, which will include full support. Maggy is still a project under heavy development, and our mission with Maggy is to provide a new way of writing machine learning applications that relieves Data Scientists of the burden of becoming distributed systems experts. By following the best practices, we are able to keep the high-level APIs of frameworks like Keras and PyTorch free of obtrusive distribution code.

Summary

In this blog, we introduced a new feature to an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). In a single Jupyter notebook, developers can mix vanilla Python code to develop and test models on their laptop with PySpark-specific cells that can be run when a cluster is available using a PySpark kernel, such as Sparkmagic. This way, iterative development of deep learning models now becomes possible, moving from the laptop to the cluster and back again, with DRY code in the training function – as all phases reuse the same training code.

Watch our demo presented at the Spark+AI Summit 2020

References

Meister et al. (2020). Towards Distribution Transparency for Supervised ML With Oblivious Training Functions. Published in the MLOps workshop of MLSys’20.

Hello Asynchronous Search for PySpark, published on October 19, 2019.

Manage your own Feature Store on Kubeflow with Hopsworks

>
ML Best Practices
>
6/15/2020
>
Jim Dowling

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts, we introduced the feature store, MLOps with a feature store, and how to use the Hopsworks Feature Store in Databricks and AWS Sagemaker. In this blog post, we focus on how to integrate Kubeflow with the Hopsworks Feature Store. Hopsworks is available as an open-source platform, but integrations with external platforms, like Kubeflow, are only available on either our SaaS (www.hopsworks.ai) or Enterprise versions.

Hopsworks is a modular platform that includes a feature store, a compute engine ((Py)Spark, Python, Flink), a data science studio, a file system (HopsFS/S3), and model serving/monitoring support. The Hopsworks Feature Store can be used as a standalone feature store by Kubeflow. As Kubernetes has limited support for Spark (Spark-on-k8s still has problems with shuffle, as of June 2020), Hopsworks is often used both for its feature store and for its Spark and scale-out deep learning capabilities.

Hopsworks offers a centralized platform to manage, govern, discover, and use features. Features can be used:

  • at scale to create train/test datasets, 
  • for model scoring in analytical (batch application) models, 
  • at low latency by operational models to enrich feature vectors.

Get Started

Before you begin, make sure you have started a Hopsworks cluster. If you are on AWS, we recommend using our managed platform Hopsworks.ai. If you are on GCP, Azure or on-premises, as of June 2020, you have to use the hopsworks-installer script to install Hopsworks.

Hopsworks should be installed so that Kubeflow has access to the private IPs of the feature store services: Hive Server 2 and the Hopsworks REST API endpoint. On GCP, this means you should install your Hopsworks cluster in the same Region/Zone as your Kubeflow cluster. Similarly, on AWS, your Hopsworks cluster should be installed in the same VPC/subnet as Kubeflow. And on Azure, your Hopsworks cluster should be installed in the same resource group as your Kubeflow cluster.

API Key

From a Jupyter notebook in Kubeflow, you need to be able to authenticate and interact with the Hopsworks Feature Store, so you need to get an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, clicking on Settings, and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should save your API key to a file, with the path API_KEY_FILE, that will be accessible from your Jupyter notebook in Kubeflow. 

Hopsworks-cloud-sdk

With the API key configured correctly, in your KubeFlow Jupyter notebook, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) using PIP:

>>> !pip install hopsworks-cloud-sdk~=1.3

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection


From your Jupyter notebook, the API key should now be readable from the local file system at the path API_KEY_FILE, and the hopsworks-cloud-sdk library should be installed. You should now be able to establish a connection to the feature store and start using the Hopsworks-Kubeflow integration with the following connect call:

import hops.featurestore as fs

fs.connect(
    'my_instance',            # Private IP/DNS of your Feature Store instance
    'my_project',             # Name of your Hopsworks Feature Store project
    secrets_store='file',     # Where the API key is read from
    api_key_file=API_KEY_FILE
)
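
Once connected, you can use the same feature store API calls as from any other client - for example, to preview a feature group or a set of joined features as a Pandas dataframe (the feature and feature group names below are illustrative):

# Preview the first rows of a feature group
df = fs.get_featuregroup("teams_features").head(5)

# Join selected features together; the join runs on the Hopsworks Feature Store
df = fs.get_features(["team_budget", "average_attendance", "average_player_age"]).head(5)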

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a python-kernel in a Jupyter notebook. It will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.

Learn more with our demo!



How to Build your own Feature Store

>
ML Best Practices
>
5/26/2020
>
Jim Dowling

As of May 2020, Logical Clocks are the only vendor of a Feature Store for machine learning (ML) and the only maker of a fully open-source and cloud-native Feature Store for ML. As such, we have many conversations with companies and organizations who are deciding between building their own feature store and buying one. Given the increasing interest in building feature stores, we thought we would share our experience of building one and motivate some of the decisions and choices we took (and did not take) to help others who are considering following us down the same path. 

8 Benefits of a Feature Store for ML

Here we list some of the reasons for wanting a feature store for ML in the first place.

1. Consistent Feature Engineering for Training Data and Serving 

The feature store can eliminate the need for two different implementations of features: one for training a model and one for serving a model. With a feature store, you can have a single feature pipeline that computes the features and stores them in both an offline and an online store, for use in training models and serving models, respectively. The offline feature store needs to support large volumes of data for model training and for use by batch (analytical) applications for model scoring. The online store needs to support low-latency access to features for models served in production.

2. Encourage Feature Reuse

Features should be reused between different models. Twitter uses a “sharing adoption” metric to evaluate the success of their internal feature store. The “sharing adoption” metric measures the number of teams that, in their production models, reuse features created by other teams.

3. System support for Serving of Features 

Operational models often need low latency access to features that may be computationally complex or generated from historical data. These types of features are often difficult or impossible to compute inside the applications themselves, either because of the lack of available data or because of the excessive time required to compute the features. The feature store can solve this problem by acting as a low-latency store for precomputed features for operational models (used by online applications). 

4. Exploratory Data Analysis with a Feature Store

Data scientists can discover the available pre-computed features, the types of those features (numerical, categorical), their descriptive statistics, and the distribution of feature values. They can also view a small sample of feature values to help quickly identify any potential issues with using a given feature in a model.

5. Temporal Queries for Features

You would like to augment feature store queries with temporal logic. That is, you want to know the value of a given feature at:

  • an instant in time (for example, when joining features together from different feature groups);
  • a time interval - a length of time (for example, the last 3 months);
  • a time period: an anchored duration of time (for example, training data for years 2012-2018, test data for 2019). 

In relational databases, temporal queries are typically supported by tables that keep a full history of data changes and allow easy point-in-time analysis (for example, temporal tables in SQL Server). In scalable SQL systems, examples of columnar storage formats that support temporal queries are Apache Hudi and Delta Lake, as well as streaming support in Apache Flink. These systems require increased storage (to store updates as well as the current values) in exchange for the ability to query the value of features at points in time, over time intervals, or within time periods.
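
As a hedged illustration of a point-in-time query, the PySpark snippet below reads a Delta Lake table as of a given timestamp (the table path is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()   # assumes Delta Lake is configured on the cluster

# The value of the features as they were at the end of 2018 (a point-in-time read)
df_2018 = (spark.read.format("delta")
           .option("timestampAsOf", "2018-12-31")
           .load("s3://my-bucket/feature-groups/transactions"))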

6. Security, Governance, and Tracking

The feature store is a central repository for an organization’s features, enabling them to be access controlled, governed, and to have their usage tracked. The feature store also provides common standards for metadata, consistent documentation, and coding standards for features. The repository can maintain popularity counts for features to show which ones are widely used and which could potentially be removed, enabling better management of features.

7. Reproducibility for Training Datasets 

The feature store should enable the re-creation of training datasets for given points-in-time to enable the reproducibility of models. An alternative to recreating training datasets is to archive them, but for many industries, such as healthcare and insurance, they will need to be reproducible for regulatory reasons. The ability to recreate training datasets for models is also useful for debugging models, even when you are not required by law to keep the training dataset.

8. Data Drift for Model Serving

The feature store can compute and store statistics over training datasets and make those statistics available to model serving platforms via API calls. In operational models, the training data statistics can then be compared with statistics computed over time windows of live data (last minute, hour, day) sent to the model for predictions. Simple (or complex) statistical tests can identify data drift - when live feature values diverge significantly from the model’s training data. 
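
For example, a simple drift check (our sketch, not a Hopsworks API) could compare a feature's training distribution with a recent window of live values using a two-sample Kolmogorov-Smirnov test:

import numpy as np
from scipy.stats import ks_2samp

def drifted(train_values, live_window, alpha=0.01):
    # Reject the null hypothesis "same distribution" => likely data drift
    _, p_value = ks_2samp(train_values, live_window)
    return p_value < alpha

rng = np.random.default_rng(0)
print(drifted(rng.normal(0, 1, 10_000), rng.normal(0.5, 1, 1_000)))   # True: the live window has drifted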

Feature Store Design Flow Chart

The feature store design flow chart below shows some of the decisions that need to be taken when deciding to roll your own feature store. This set of decisions is obviously not complete, and we omitted systems design issues that are commonly found in any scalable system, such as schema design, API design, language support, and platform support (on-premises, cloud-native).

The first decision point is whether you really want to go ahead and build a data platform to support your ML efforts, knowing the considerable time it will take (at least 6-12 months before you will have anything production-ready) and the future costs of maintaining and updating your platform. If you decide that building a feature store is too much, then give us a shout - mention that Jim sent you and ask for the blog reader discount :)

If you still are determined to build one, you need to take your first big decision: is your feature store a library or a store (a materialized cache of your features)? If the only problem you want to solve is consistent features between training and serving, then the library approach may be suitable. In the library approach, you write your feature encoding functions in versioned libraries that are included in both training and serving pipelines. A downside to this is that both training and serving pipelines need to be implemented in the same (or compatible) programming languages. Another downside is that you may need to wait a long time to backfill training data, as you need to run a job to compute the features. Netflix’s early fact/feature store introduced shared, versioned feature encoders to ensure consistent feature engineering.
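
As a minimal sketch of the library approach (the module and function names are ours), a versioned feature-encoding function is imported by both the training pipeline and the serving application:

# my_features/encoders.py - released and pinned as, e.g., my-features==1.2.0
def encode_age(age_in_years: float) -> float:
    """v1.2.0: clip age to [0, 100] and scale to [0, 1]."""
    return min(max(age_in_years, 0.0), 100.0) / 100.0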

Your next decision is whether you want to reuse features across different models or not. Reusing features means you will need to join normalized features together when creating training datasets and also when serving features. If you decide you do not want to reuse features, you will still be able to solve the problems of consistent feature engineering and system support for serving features. There are feature stores, like Condé Nast’s (based on Cassandra), that use a single datastore for storing both training and serving features.

The next decision is consequential: if you are only working with analytical models and not online (operational) models, you might decide you only need a single (scalable) database to store your features. Assuming you are a typical user who needs the feature store for both training and serving models, you then need to decide if you need support for time-travel queries. Many online models with windowed features (how many times a user did ‘X’ in the last 15 minutes) often need time-travel support to ensure consistent features for feature serving and for creating training data. Assuming you decide to add time-travel support, you now need to build on a system with, or add application support for, temporal queries.

Your next decision also depends on your choice of data stores for the offline (scalable SQL) and online (low-latency feature serving) stores. In Hopsworks, uniquely, we have a unified metadata layer for our file system, HopsFS, our feature serving layer, MySQL Cluster, and our scalable SQL layer, Apache Hive (-on-Hops). We could easily add extended metadata to our unified metadata layer to describe features, their statistics, and tags. A CDC (change-data-capture) API to our unified metadata layer enabled us to also index feature descriptions in Elasticsearch, which supports free-text search, and searches do not affect the performance of the unified metadata service. The bad news, if you do not have unified metadata for your dual databases, is that you need to design and develop agreement protocols to ensure that the three different systems - your offline store, online store, and feature metadata platform - are kept in sync, presenting a consistent view of your features. This is a tough distributed systems engineering challenge - good luck!

Finally, you need to decide on a compute engine (maybe internal or external to your platform) for both joining features to create training datasets, but also to compute your features. You may decide on a domain-specific language (like Michelangelo or Zipline) or a more general purpose language or framework like Python (Feast) or Spark (Hopsworks).

Apart from all these issues, you also need to decide on whether you need a UI to discover and manage features (Hopsworks, Michelangelo, Twitter, Zipline) or not (Feast). You also need to decide on whether you need to support multiple feature stores (such as development, production, and sensitive feature stores), and access control to those different feature stores (as in Hopsworks).

Phew! Finally you have navigated some of the decisions that you need to make to tailor your feature store platform for your needs. Although we have not covered the problem of API design to your feature store, we can confide that most feature store platforms have gone through more than one generation of API (we were not immune to that, either). So, good luck, give us a call if you decide to buy rather than build.


Hopsworks Feature Store for AWS SageMaker

>
ML Best Practices
>
5/18/2020
>
Fabio Buso

Feature stores are key components in enterprises’ machine learning/artificial intelligence architectures. In previous blog posts (Introduction to feature store, MLOps with a feature store, and Hopsworks Feature Store for Databricks) we focused on describing the key concepts and building blocks of the Hopsworks Feature Store. In this blog post we are going to focus on how to integrate AWS SageMaker with Hopsworks. Hopsworks is available on AWS as either a SaaS platform (www.hopsworks.ai) or as a custom Enterprise platform.

While Hopsworks provides all the tools to design and operate pipelines that go from raw data to serving models in production, it is also a modular platform. In particular, the Hopsworks Feature Store can be used as a standalone feature store by data science platforms, such as AWS SageMaker or Databricks. It offers AWS Sagemaker users a centralized platform to manage, discover, and use features - both for creating training datasets and for serving features to operational models. In this blog, we will cover how AWS Sagemaker users can perform exploratory data analysis with the feature store - discovering available features and joining features together to create train/test datasets - all from the comfort of their existing SageMaker notebook instance.

Exploratory Data Analysis with a Feature Store

Exploratory data analysis (EDA) is a key component of every data scientist's job. The Hopsworks Feature Store provides data scientists with a repository of features ready to be used for training models. Data scientists can browse the available features, understand the features by inspecting their metadata, investigate pre-computed feature statistics, and preview sample feature values. These are typical steps a data scientist takes to determine if a feature is a good fit for a specific model. With the Hopsworks AWS SageMaker integration, data scientists can perform these steps in a Jupyter notebook by making feature store API calls in Python.

In Hopsworks, features are organized into groups of related features in what is called a Feature Group. Exploration usually starts at the feature group level, by listing all the available feature groups in the feature store:

>>> featurestore.get_featuregroups()
['games_features_1',
'games_features_on_demand_tour_1',
'games_features_hudi_tour_1',
'season_scores_features_1',
'attendances_features_1',
'players_features_1',
'teams_features_1',
'imported_feature_name_1',
'imported_feature_name_online_1']

The following step allows data scientists to understand which individual features are available in a given feature group, and it returns the first five rows (a data sample):

>>> df = featurestore.get_featuregroup("teams_features").head(5)


The above API call will send a request to the Hopsworks Feature Store and return the result to the user in a Pandas dataframe df.

Individual features are the building blocks of the Hopsworks feature store. From SageMaker, data scientists can join features together and visualize them. As joining features is performed in Spark and SageMaker only provides a Python kernel, the join is executed on the Hopsworks Feature Store and the result returned to the user in a Pandas dataframe df. The complexity of the request is hidden behind the API call.

>>> df = featurestore.get_features(
["team_budget", "average_attendance", "average_player_age"]
).head(5)


Statistics and data visualization help to give an understanding of the data. Hopsworks allows users to compute statistics such as the distribution of feature values, feature correlation within a feature group, and descriptive statistics (min, max, averages, counts) on the different features.

The statistics are shown in the Hopsworks Feature Store UI, but they are also available from a notebook in SageMaker:

>>> featurestore.visualize_featuregroup_correlations("players_features")

Generate train/test datasets

Once you have explored the feature store and identified which features you need for your model, you can create a training dataset (the train and test data you need to train and evaluate a model, respectively). A training dataset is a materialization of multiple features joined together, potentially coming from different feature groups. Joining features together on demand enables data scientists to reuse the same features in many different training datasets. Once features have been joined into a dataframe, they can be stored in an ML-framework-friendly file format on a storage platform of choice, such as S3. For example, if you are training a TensorFlow model, you may decide to store your training dataset in TensorFlow's native TFRecord file format, in a bucket on S3, s3_bucket.

>>> featurestore.create_training_dataset(
    training_dataset = "team_position_prediction",
    features = ["team_budget", "average_attendance", "average_player_age"],
    training_dataset_version = latest_version + 1,
    data_format = 'tfrecords',
    sink = s3_bucket
)

In the above example, the feature store joins the list of features together and saves the result in files in TFRecord format in a S3 bucket. The S3 bucket needs to be defined inside a connector in the Hopsworks Feature Store.  In practice, what happens is that the SageMaker notebook asks the Hopsworks Feature Store to start a Spark job to produce the training dataset. When the job has completed on Hopsworks, you’ll be able to use the training dataset, typically in a different notebook, to train your model.

Get Started

Before you begin, make sure you have started a Hopsworks cluster using our managed platform Hopsworks.ai. The Hopsworks - SageMaker integration is an enterprise-only feature, and Hopsworks.ai gives you access to it. The first time you use the Hopsworks - SageMaker integration, there are a few simple steps that you need to perform to configure your SageMaker environment.

API Key

From SageMaker you need to be able to authenticate and interact with the Hopsworks Feature Store. As such, you need to get an API key from Hopsworks. You can generate an API key by clicking on your username in the top right of the window, clicking on Settings, and selecting API KEY.


You need to choose the featurestore, jobs, and project scopes when creating your API key. You should upload the API key as a secret on the AWS Secrets Manager service. The Hopsworks SageMaker integration also supports reading the API key from the AWS Parameter Store or a local file. The documentation (https://hopsworks.readthedocs.io) covers the setup for all the cases.

To use the AWS Secrets Manager, you should first find the IAM Role of your SageMaker notebook - in this case it is AmazonSageMaker-ExecutionRole-20190511T072435.


Create a new secret called hopsworks/role/[MY_SAGEMAKER_ROLE], where [MY_SAGEMAKER_ROLE] is the name of the IAM Role you retrieved in the previous step. The key should be api-key and the value should be the API key you copied from Hopsworks in the first step.
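If you prefer to create the secret programmatically rather than through the console, a minimal boto3 sketch might look like the following. The role name, region, and API key value are placeholders; the secret layout - an api-key entry under hopsworks/role/[MY_SAGEMAKER_ROLE] - follows the convention described above.

import json
import boto3

# Hypothetical values - replace with your own IAM role, region and API key
sagemaker_role = "AmazonSageMaker-ExecutionRole-20190511T072435"
secret_name = "hopsworks/role/" + sagemaker_role
client = boto3.client("secretsmanager", region_name="us-east-1")

# Store the Hopsworks API key under the 'api-key' key
client.create_secret(
    Name=secret_name,
    SecretString=json.dumps({"api-key": "<API_KEY_COPIED_FROM_HOPSWORKS>"})
)

# Sanity check: read the secret back and verify the 'api-key' entry is present
value = json.loads(client.get_secret_value(SecretId=secret_name)["SecretString"])
print("api-key" in value)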



Finally, we need to give the IAM role of the SageMaker notebook permission to read the secret we just created. In the AWS Management Console, go to IAM, select Roles and then the role that is used when creating SageMaker notebook instances. Select Add inline policy. Choose Secrets Manager as the service, expand the Read access level and check GetSecretValue. Expand Resources and select Add ARN. Paste the ARN of the secret created in the previous step with the AWS Secrets Manager. Click on Review, give the policy a name and click on Create policy.

After this step, your SageMaker notebook, when run with the above IAM Role, will have permission to read the Hopsworks API key from the Secrets Manager service.

Hopsworks-cloud-sdk

With the API key configured correctly, you should be able to install the hopsworks-cloud-sdk library (https://pypi.org/project/hopsworks-cloud-sdk/) in your AWS SageMaker Jupyter notebook using pip:

>>> !pip install hopsworks-cloud-sdk~=1.2

Make sure that the hopsworks-cloud-sdk library version matches the installed version of Hopsworks.

Establish the first connection

With the API Key configured and the library installed, you should be now able to establish a connection to the feature store, and start using the Hopsworks - AWS SageMaker integration.

import hops.featurestore as fs
fs.connect(
    'my_instance',                        # DNS of your Feature Store instance
    'my_project',                         # Name of your Hopsworks Feature Store project
    secrets_store='secretsmanager')       # Either 'parameterstore', 'secretsmanager', or 'file'

Try it out now with Hopsworks.ai

You can now try out the Hopsworks Feature Store and the SageMaker integration by starting a Hopsworks instance on Hopsworks.ai and running this example Jupyter notebook on your SageMaker instance: https://github.com/logicalclocks/hops-examples/blob/master/notebooks/featurestore/aws/SageMakerFeaturestoreTourPython.ipynb.

The Hopsworks Community is also available if you need help with your setup.

Upcoming improvements

Several exciting improvements are coming to the Hopsworks feature store APIs in the next couple of weeks. The most important one is a more expressive API for joining features together. The new API is heavily inspired by Pandas dataframe joining and should make life easier for data scientists. Moreover, we are adding the capability to register a small Pandas dataframe as a feature group directly from a SageMaker notebook. While we still encourage you to use a Spark environment to engineer complex features with lots of data, it will also be possible to ingest Pandas dataframes as feature groups without the need for PySpark.



Hopsworks Feature Store for Databricks

>
ML Best Practices
>
4/23/2020
>
Fabio Buso


TLDR; Feature Stores have become the key piece of data infrastructure for machine learning platforms. They manage the whole lifecycle of features: from training different models to providing low-latency access to features by online-applications for model inference. This article introduces the Hopsworks Feature Store for Databricks, and how it can accelerate and govern your model development and operations on Databricks.

What is a Feature Store?

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency for online applications;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 


The Feature Store for ML consists of both an Online and Offline database and Databricks can be used to transform raw data from backend systems into engineered features cached in the online and offline stores. Those features are made available to online and batch applications for inferencing and for creating train/test data for model training.

Engineer Features in Databricks, publish to the Feature Store

The process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines. Feature Pipelines can be developed in Spark or Pandas applications that are run on Databricks. They can be combined with data validation libraries like Deequ to ensure feature data is correct and complete.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). One popular framework for data validation with Spark is AWS Deequ, as it allows you to extend traditional schema-based support for validating data (e.g., this column should contain integers) with data validation rules for numerical or categorical values. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column's values are unique or not null, and that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are cached in the feature store, and subsequently used both to train models and for batch/online model inferencing.


import hsfs
from pyspark.sql import functions as F

# "prod" is the production feature store
conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# read raw data and use Spark to engineer features
raw_data_df = spark.read.parquet('/parquet_partitioned')
# square each column to create polynomial features
polynomial_features = raw_data_df.select(
    [(F.col(c) ** 2).alias(c) for c in raw_data_df.columns])

# Features computed together in a DataFrame are in the same feature group
fg = featurestore.create_feature_group(name='fg_revenue',
                                       version=1,
                                       type='offline')

fg.create(polynomial_features)
fg.compute_statistics()

In this code snippet, we connect to the Hopsworks Feature Store, read some raw data into a DataFrame from a parquet file, and transform the data into polynomial features. Then, we create a feature group with version '1' that is only to be stored in the 'offline' feature store. Finally, we ingest the new polynomial_features DataFrame into the feature group, and compute statistics over the feature group that are also stored in the Hopsworks Feature Store. Note that Pandas DataFrames are supported as well as Spark DataFrames, and there are both Python and Scala/Java APIs.

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.
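To make the commit/upsert mechanics concrete, the sketch below shows roughly what an upsert into a Hudi dataset looks like in plain Spark, assuming a Spark dataframe feature_df of engineered features; the path, table name, and key/precombine columns are hypothetical, and inside Hopsworks the feature store API performs this step for you when you insert into a feature group.

# Upsert engineered features into a Hudi dataset; each run creates a new commit
(feature_df.write
    .format("hudi")
    .option("hoodie.table.name", "fg_revenue")
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.recordkey.field", "user_id")    # entity key
    .option("hoodie.datasource.write.precombine.field", "event_ts")  # latest value wins
    .mode("append")
    .save("s3://feature-store/fg_revenue"))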

You can find an example notebook for feature engineering with PySpark in Databricks and registering features with Hopsworks here.


Model Training Pipelines in Databricks start at the Feature Store 

Model training pipelines in Databricks can read in train/test data either directly as Spark Dataframes from the Hopsworks Feature Store or as train/test files in S3 (in a file format like .tfrecords or .npy or .csv or .petastorm). Notebooks/jobs in Databricks can use the Hopsworks Feature Store to join features together to create such train/test datasets on S3.


Model training with a feature store typically involves at least three stages:

  1. select the features from feature groups and join them together to build a train/test dataset. You may also want to filter out data here and include an optional timestamp to retrieve features exactly as they were at a point in time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

import hsfs
conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
featurestore = conn.get_feature_store()

# get feature groups from which you want to create a training dataset
fg1 = featurestore.get_feature_group('fg_revenue', version=1)
fg2 = featurestore.get_feature_group('fg_users', version=2)
# lazily join features

joined_features = fg1.select_all() \
                  .join(fg2.select(['user_id', 'age']), on='user_id')

sink = featurestore.get_storage_connector('S3-training-dataset-bucket')

td = featurestore.create_training_dataset(name='revenue_prediction',
                    version=1,
                    data_format='tfrecords',
                    storage_connector=sink,
                    split={'train': 0.8, 'test': 0.2})

td.seed = 1234
td.create(joined_features)

Data Scientists are able to rely on the quality and business-logic correctness of published features and can therefore quickly export and create training datasets in their favourite data format.
You can find an example notebook for getting started with creating train/test datasets from Hopsworks in Databricks here.

Deploying the Hopsworks Feature Store for Databricks

The Hopsworks Feature Store is available as a managed platform for AWS (Hopsworks.ai) and as an Enterprise platform for Azure.

Hopsworks.ai for AWS

Hopsworks.ai is our new managed platform for the Hopsworks Feature Store on AWS. In its current version, it will deploy a Hopsworks Feature Store into your AWS account. From Hopsworks.ai, you can stop/start/backup your Hopsworks Feature Store.


The details for how to launch a Hopsworks Feature Store inside an existing VPC/subnet used by Databricks are found in our documentation. The following figures from Hopsworks.ai show how to pick the same Region/VPC/Zone used by your Databricks cluster when launching Hopsworks.


You also need to expose the Feature Store service for use by Databricks, see the figure below. 


For some Enterprises, an alternative to deploying Hopsworks in the same VPC as Databricks is VPC peering. VPC peering requires some manual work, and you can contact us for help in setting it up.

Enterprise Hopsworks for Databricks Azure

On Azure, by default, Databricks is deployed to a locked resource group with all data plane resources, including a virtual network (VNet) that all clusters will be associated with. However, with VNet injection, you can deploy Azure Databricks into the same virtual network where the Hopsworks Feature Store is deployed. Contact us for more details on how to install and set up VNet injection for Azure with the Hopsworks Feature Store. An alternative to VNet injection is peering the two virtual networks, and you can contact us for help in setting up peering.


Summary

A new key piece of infrastructure for machine learning has now arrived for Databricks users - the Hopsworks Feature Store. It enables you to centralize your features for ML for easier discovery and governance, it enables the reuse of features in different ML projects, and it provides a single pipeline for engineering features for both training and inference. The Hopsworks Feature Store is available today either as a managed platform on AWS, so you can spin up a cluster in just a few minutes, or as an Enterprise platform for either AWS or Azure.


ExtremeEarth scales AI to the Earth Observation Community with Hopsworks

>
ML Best Practices
>
4/15/2020
>
Theofilos Kakantousis

This article was originally published at Extreme Earth.

In recent years, unprecedented volumes of data have been generated in various domains. Copernicus, a European Union flagship programme, produces more than three petabytes (PB) of Earth Observation (EO) data annually from Sentinel satellites [1]. This data is made readily available to researchers, who use it, among other things, to develop Artificial Intelligence (AI) algorithms, in particular Deep Learning (DL) techniques suited to Big Data. One of the greatest challenges that researchers face, however, is the lack of tools that can help them unlock the potential of this data deluge and develop predictive and classification AI models.

ExtremeEarth is an EU-funded project that aims to develop use cases that demonstrate how researchers can apply Deep Learning in order to make use of Copernicus data in the various EU Thematic Exploitation Platforms (TEPs). A main differentiator of ExtremeEarth from other such projects is the use of Hopsworks, a Data-Intensive AI software platform for scalable Deep Learning. Hopsworks is being extended as part of ExtremeEarth to bring specialized AI tools for EO data and the EO data community in general.


Hopsworks, Earth Observation Data and AI in one Platform

Hopsworks is a Data-Intensive AI platform which brings a collaborative data science environment to researchers who need a horizontally scalable solution for developing AI models using Deep Learning. Collaborative means that users of the platform get access to different workspaces, called projects, where they can share data and programs with their colleagues, hence improving collaboration and increasing productivity. The Python programming language has become the lingua franca amongst data scientists and Hopsworks is a Python-first platform, as it provides all the tools needed to get started programming with Python and Big Data. Hopsworks integrates with Apache Spark and PySpark, a popular distributed processing framework.

Hopsworks brings to the Copernicus program and the EO data community essential features required for developing AI applications at scale, such as distributed Deep Learning with Graphics Processing Units (GPUs) on multiple servers, as demanded by the Copernicus volumes of data. Hopsworks provides services that facilitate conducting Deep Learning experiments, all the way from doing feature engineering with the Feature Store [2], to developing Deep Learning models with the Experiments and Models services, which allow users to manage and monitor Deep Learning artifacts such as experiments, models, automated code versioning, and much more [3]. Hopsworks' storage and metadata layer is built on top of HopsFS, the award-winning highly scalable distributed file system [4], which enables Hopsworks to meet the extreme storage and computational demands of the ExtremeEarth project.

Hopsworks brings horizontally scalable Deep Learning for EO data close to where the data lives, as it can be deployed on Data and Information Access Services (DIAS) [5]. DIAS provides centralised access to Copernicus data and information; combined with the AI capabilities for EO data that Hopsworks brings, this makes an unparalleled data science environment available to researchers and data scientists of the EO data community.


Challenges of Deep Learning with EO Data


Recent years have witnessed the performance leaps of Deep Learning (DL) models thanks to the availability of big datasets (e.g. ImageNet) and the improvement of computation capabilities (e.g., GPUs and cloud environments). Hence, with the massive amount of data coming from earth observation satellites such as the Sentinel constellation, DL models can be used for a variety of EO-related tasks. Examples of these tasks are sea-ice classification, monitoring of water flows, and calculating vegetation indices.

However, together with the performance gains come many challenges for applying DL to EO tasks, including, but not limited to:


Labeled datasets for training; 

While collecting raw Synthetic-Aperture Radar (SAR) images from the satellites is one thing, labeling those images to make them suitable for supervised DL is a time-consuming task in itself. Should we seek help from unsupervised or semi-supervised learning approaches to eliminate the need for labeled datasets? Or should we start building tools to make annotating the datasets easier?

Interpretable and human-understandable models for EO tasks;

Given enough labeled data, we can probably build a model with satisfactory performance. But how can we justify the reasons behind why the model makes certain predictions given certain inputs? While we can extract the intermediate predictions for given outputs, can we reach interpretations that can be better understood by humans?

Management of very large datasets;

Managing terabytes (TB) of data that can still fit into a single machine is one thing, but managing petabytes (PB) of data that requires distributed storage and provides a good service for the DL algorithms so as not to slow down the training and serving process is a totally different challenge. To further complicate the management, what about partial failures in the distributed file system? How shall we handle them? 

Heterogeneous data sources and modalities (e.g., SAR images from satellites, sensor readings from ground weather stations); 

How can we build models that effectively use multi-modalities? For example, how can we utilize the geo-location information in an image classification model?

DL architectures and learning algorithms for spectral, spatial, and temporal data;

While we might be able to perform preprocessing and design model architectures for RGB image classification, how do these apply to SAR images? Can we use the same model architectures? How to extract useful information from multi-spectral images? 

Training and fine-tuning (hyperparameter optimizations) of DL models;

Hyperparameters are those parameters of the training process (e.g., the learning rate of the optimizer, or the size of the convolution windows) that should be manually set before training. How can we effectively train models and tune the hyperparameters? Should we change the code manually? Or can we use frameworks to provide some kind of automation?

The real time requirements for serving DL models;

Once the training is done, we want to use our trained model to predict outcomes based on the newly observed data. Often these predictions have to be made in real-time or near-real-time to make quick decisions. For example, we want to update the ice charts of the shipping routes every hour. How to serve our DL models online to meet these real-time requirements?


Deep Learning Pipelines for EO Data with Hopsworks

A Data Science application in the domain of Big Data typically consists of a set of stages that form a Deep Learning pipeline. This pipeline is responsible for managing the lifecycle of data that comes into the platform and is to be used for developing machine learning models. In the EO data domain in particular, these pipelines need to scale to the petabyte-scale data that is available within the Copernicus program. Hopsworks provides data scientists with all the required tools to build and orchestrate each stage of the pipeline, depicted in the following diagram.

 In detail, a typical Deep Learning pipeline would consist of:

  • Data Ingestion: The first step is to collect and insert data into the AI platform where the pipeline is to be run. A great variety of data sources can be used such as Internet of Things (IoT) devices, web-service APIs etc. In ExtremeEarth, the data typically resides on the DIAS which can be directly accessed from Hopsworks.
  • Data Validation: Tools such as Apache Spark that can cope with Big Data are typically employed to validate incoming data that is to be used in later stages. For example, data might need to be parsed and cleaned of duplicate or missing values, or a simple transformation of an alphanumeric field to a numeric one might be needed.
  • Feature Engineering: Before making use of the validated data to develop DL models, the features that will be used to develop such models need to be defined, computed and persisted. Hopsworks Feature Store is the service that data engineers and data scientists use for such tasks, as it provides rich APIs, scalability and elasticity to cope with varying data volumes and complex data types and relations. For example, users can create groups of features or compute new features such as aggregations of existing ones.
  • Model development (Training): Data scientists can greatly benefit from a rich experiment API provided by Hopsworks to run their machine learning code, whether it be TensorFlow, Keras, PyTorch or another framework with a Python API. In addition, Hopsworks manages GPU allocation across the entire cluster and facilitates distributed training, which involves making use of multiple machines with multiple GPUs per machine in order to train bigger models faster [9].
  • Model Serving & Monitoring: Typically the output of the previous stage is a DL model. To make use of it, users can submit inference requests using the Hopsworks built-in elastic model serving infrastructure for TensorFlow and scikit-learn, two popular machine learning frameworks (a generic example of such an inference request is sketched after this list). Models can also be exported in the previous stage and downloaded from Hopsworks directly to be embedded into external applications, such as iceberg detection and water availability detection in food crops. Hopsworks also provides infrastructure for model monitoring: it continuously monitors the requests submitted to a model and the model's responses, and users can then apply their own business logic to decide which actions to take depending on how the monitoring metrics change over time.
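As a generic illustration of the serving stage, an inference request against a TensorFlow Serving endpoint is just an HTTP POST with a JSON payload. The host, port, and model name below are hypothetical, and the exact URL exposed by Hopsworks model serving differs (see the Hopsworks documentation); this sketch only shows the general shape of such a request.

import json
import numpy as np
import requests

# Hypothetical TensorFlow Serving endpoint and model name
url = "http://model-server:8501/v1/models/iceberg_classifier:predict"

# One zero-filled 75x75 SAR patch with 2 bands as dummy input
dummy_patch = np.zeros((1, 75, 75, 2)).tolist()
payload = {"instances": dummy_patch}

response = requests.post(url, data=json.dumps(payload))
print(response.json()["predictions"])   # e.g. [[0.93]] - the predicted probability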



Example Use Case: Iceberg Classification with Hopsworks


Drifting icebergs pose major threats to the safety of navigation in areas where icebergs might appear, e.g., the Northern Sea Route and North-West Passage. Currently, domain experts manually conduct what is known as an “ice chart” on a daily basis, and send it to ships and vessels. This is a time-consuming and repetitive task, and automating it using DL models for iceberg classification would result in generation of more accurate and more frequent ice charts, which in turn leads to safer navigation in concerned routes.

Iceberg classification is concerned with telling whether a given SAR image patch contains an iceberg or not. Details of the classification depend on the dataset that will be used. For example, given the Statoil/C-CORE Iceberg Classifier Challenge dataset [6], the main task is to train a DL model that can predict whether an image contains a ship or an iceberg (binary classification).

Satellite radar images from the Statoil/C-CORE Iceberg Classifier Challenge [6]


The steps we took to develop and serve the model were the following:

  • First step is preprocessing. We read the data which is stored in JSON format and create a new feature which is the average of the satellite image bands.
  • Second step is inserting the data into the Feature Store which provides APIs for managing feature groups and creating training and test datasets. In this case, we created the training and test datasets in TFRecord format after scaling the images as we are using TensorFlow for training. 
  • Third step is building and training our DL model on Hopsworks. Since the dataset is not very complicated and we have a binary classification task, using a DL model that is very similar to LeNet-5 [7] yields 87% accuracy on the validation set after 20 epochs of training, which takes 60 seconds on an Nvidia GTX 1080 (a minimal sketch of such a model is shown after this list). This step also includes hyperparameter tuning. Ablation studies, in which we remove different components (e.g., different convolutional layers, or dataset features), can also be employed to gain more insights about the model. Hopsworks provides efficient and easy support for hyperparameter tuning and ablation studies through a Python-based framework called Maggy [8]. Finally, to further increase the training speed, the distributed training strategy provided in Hopsworks can be used.
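For illustration, a minimal Keras sketch of a LeNet-5-style binary classifier for this task might look as follows. The 75x75x2 input shape corresponds to the two radar bands of the Statoil/C-CORE image patches; the layer sizes and hyperparameters here are illustrative and not the exact model used in the project.

import tensorflow as tf

def build_model(input_shape=(75, 75, 2)):
    # LeNet-5-style CNN for binary iceberg-vs-ship classification
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(16, (5, 5), activation="relu", input_shape=input_shape),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Conv2D(32, (5, 5), activation="relu"),
        tf.keras.layers.MaxPooling2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(120, activation="relu"),
        tf.keras.layers.Dense(84, activation="relu"),
        tf.keras.layers.Dense(1, activation="sigmoid"),   # P(iceberg)
    ])
    model.compile(optimizer="adam",
                  loss="binary_crossentropy",
                  metrics=["accuracy"])
    return model

model = build_model()
model.summary()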

The final step is to export and serve the model. The model is exported and saved into the Hopsworks "Models" dataset. Then we use the Hopsworks elastic model serving infrastructure to host TensorFlow Serving, which can scale with the number of inference requests.

Conclusion

In this blog post we described how the ExtremeEarth project brings new tools and capabilities with Hopsworks to the EO data community and the Copernicus program. We also showed how we have developed a practical use case by using Copernicus data and Hopsworks. We keep developing Hopsworks to make it even more akin to the tools and processes used by researchers across the entire EO community and we continue development of our use cases with more sophisticated models using even more advanced distributed Deep Learning training techniques. 


References

  1. https://workshop.copernicus.eu/sites/default/files/content/attachments/ajax/copernicus_overview.pdf 
  2. https://www.logicalclocks.com/blog/feature-store-the-missing-data-layer-in-ml-pipelines
  3. https://hopsworks.readthedocs.io/en/latest/
  4. https://www.arcos.inf.uc3m.es/ccgrid2017/calls/10th-ieee-international-scalable-computing-challenge-scale-2017/
  5. https://www.copernicus.eu/en/access-data/dias
  6. https://www.kaggle.com/c/statoil-iceberg-classifier-challenge/data 
  7. LeCun, Yann. "LeNet-5, convolutional neural networks." http://yann.lecun.com/exdb/lenet (2015).
  8. https://maggy.readthedocs.io/en/latest/
  9. https://www.logicalclocks.com/blog/when-deep-learning-with-gpus-use-a-cluster-manager

MLOps with a Feature Store

>
ML Best Practices
>
2/14/2020
>
Fabio Buso

TLDR; If AI is to become embedded in the DNA of Enterprise computing systems, Enterprises must first re-align their machine learning (ML) development processes to include data engineers, data scientists and ML engineers in a single automated development, integration, testing, and deployment pipeline. This blog introduces platforms and methods for continuous integration (CI), continuous delivery (CD), and continuous training (CT) with machine learning platforms, with details on how to do CI/CD machine learning operations (MLOps) with a Feature Store. We will see how the Feature Store refactors the monolithic end-to-end ML pipeline into a feature engineering and a model training pipeline.

What is MLOps?

MLOps is a recent term that describes how to apply DevOps principles to automating the building, testing, and deployment of ML systems. The Continuous Delivery Foundation’s SIG-MLOps defines MLOps as  “the extension of the DevOps methodology to include Machine Learning and Data Science assets as first class citizens within the DevOps ecology”. MLOps aims to unify ML application development and the operation of ML applications, making it easier for teams to deploy better models more frequently. Martinfowler.com defines MLOps as:
“a software engineering approach in which a cross-functional team produces machine learning applications based on code, data, and models in small and safe increments that can be reproduced and reliably released at any time, in short adaptation cycles.”

Some of the major challenges of MLOps, compared to DevOps, are how to deal with versioned data (not just versioned code), how to manage specialized hardware (graphics processing units (GPUs)), and how to manage data governance and compliance for models.

DevOps vs MLOps

Traditional DevOps CI/CD Workflow triggered by changes to source code.


Git is the world’s most popular source-code version control system. It is used to track changes in source code over time and to support different versions of source code. Support for version control is a prerequisite for automation and continuous integration (CI) solutions, as it enables reproducible provisioning of any environment in a fully automated fashion. That is, we assume the configuration information required to provision the environment is stored in a version control system, as well as the source code for the system we will be testing. Typically, when working with DevOps, every commit to Git triggers the automated creation of packages that can be deployed to any environment using only information in version control.

In most DevOps setups, Jenkins is used together with Git as an automation server that builds, tests and deploys your versioned code in a controlled and predictable way. The typical steps Jenkins follows for a CI/CD pipeline are to: provision testing virtual machines (VMs)/containers, check out code onto the machines, compile the code, run tests, package binaries, and deploy binaries. For Java, this involves running a build tool like Maven to compile, test, and package Java binaries before deploying the binaries in some staging or production system. For Docker, this means compiling a Dockerfile and deploying the Docker image to a Docker registry.


High Level MLOps CI/CD Workflow triggered by changes in either source code or data.


Perhaps the most defining characteristic of MLOps is the need to version data as well as code to enable reproducible workflows for training models. Git is not suitable as a platform for versioning data, as it does not scale to store large volumes of data. Luckily, others have been working on alternative platforms in recent years.

However, Git and Jenkins are not enough for MLOps, where the build process involves running a complex distributed workflow and we need both versioned code and versioned data to ensure reproducible automated builds. The workflow is what we call a ML pipeline, a graph of components, where each component takes input parameters and data, and at the end, a successful workflow run deploys a trained model to production.  A standard ML pipeline consists of at least the following components: validate incoming data, compute features on the incoming data, generate train/test data, train the model, validate the model, deploy the model, and monitor the model in production. This simplified pipeline can, in practice, be even more complex, where the model training stage can be broken into smaller components: hyperparameter tuning, ablation studies, and distributed training. 

There are already several end-to-end ML frameworks that support orchestration frameworks to run ML pipelines: TensorFlow Extended (TFX) supports Airflow, Beam and Kubeflow pipelines, Hopsworks supports Airflow, MLFlow supports Spark, and Kubeflow supports Kubeflow pipelines. These frameworks enable the automated execution of workflows, the ability to repeat steps, such as re-training a model, with only input parameter changes, the ability to pass data between components, and the ability to specify event-based triggering of workflows (e.g., at a specific time of day, on the arrival of new data, or when model performance degrades below a given level). TFX, MLFlow, and Hopsworks also support distributed processing using Beam and/or Spark, enabling scale-out execution on clusters using very large amounts of data.

MLOps: versioned Code and Data 

Data Versioning, Git-Style

DVC, developed by the affable Dmitry Petrov, provides an open-source tool for versioning files/objects in cloud storage that uses Git to store metadata about files and reflinks (that support transparent copy-on-write for data files) to ensure consistency between git entries and the data files. Similarly, Pachyderm, a ML platform on Kubernetes, also provides a data versioning platform using git-like semantics. However, these git-like approaches just track immutable files; they do not store the differences between files. They cannot handle time-travel queries, such as “give me the train/test data for the range between the years 2016-2018” or “give me the value of these features on the 6th September 2018”. Without time-travel, they cannot support incremental feature engineering: computing features only on the data that has changed since the last run (1 hour ago, a day ago, etc.).
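For context, this is roughly what git-style data access looks like with DVC's Python API: you can read a file exactly as it was at a given Git revision, but you cannot ask for only the rows that changed since that revision. The repo URL, path, and tag below are hypothetical.

import dvc.api

# Read a dataset exactly as it was at Git tag "v1.0" of a (hypothetical) repo
with dvc.api.open(
        path="data/transactions.csv",
        repo="https://github.com/acme/feature-repo",
        rev="v1.0") as f:
    for line in f:
        pass  # whole-file access only - no time-travel or incremental queries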

Data Versioning with Time-Travel Queries and Incremental Pulling

An alternative to git-like data versioning systems is to use a transactional data-lake that provides versioned, structured datasets. A versioned dataset does not just have a version of the schema for its data (schemas may evolve over time), but also updates to the data-lake are executed atomically and identified by a commit. The most well known such platforms are the open-source projects: Delta Lake, Apache Hudi, Apache Iceberg. Here users can perform time-travel queries that return the data at a given point-in-time (commit-id), or the data for a given time-interval, or the changes to the data since a given point in time. They execute time travel queries efficiently using indexes (bloom filters, z-indexes, data-skipping indexes) that massively reduce the amount of data that needs to be read from the file system or object store. Transactional data lakes also enable incremental feature engineering - compute features only for the data that has changed in the last hour or day - by enabling clients to read only those changes in a dataset since a given point in time.
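As a concrete illustration, the sketch below shows a Hudi incremental query in Spark that pulls only the rows that changed between two commits - exactly what incremental feature engineering needs. The path and commit timestamps are hypothetical, and the option names follow recent Hudi releases.

# Incremental pull: read only the rows that changed between two Hudi commits
incremental_df = (spark.read
    .format("hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", "20200401000000")  # after this commit
    .option("hoodie.datasource.read.end.instanttime", "20200402000000")    # up to this commit
    .load("s3://feature-store/fg_revenue"))

# Only these changed rows need to be re-featurized downstream
incremental_df.createOrReplaceTempView("fg_revenue_changes")
spark.sql("SELECT COUNT(*) FROM fg_revenue_changes").show()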

Hopsworks Feature Store 

The Feature Store for machine learning is a feature computation and storage service that enables features to be registered, discovered, and used both as part of ML pipelines as well as by online applications for model inferencing. Feature Stores are typically required to store both large volumes of feature data and provide low latency access to features for online applications. As such, they are typically implemented as a dual-database system: a low latency online feature store (typically a key-value store or real-time database) and a scale-out SQL database to store large volumes of feature data for training and batch applications. The online feature store enables online applications to enrich feature vectors with near real-time feature data before performing inference requests.  The offline feature store can store large volumes of feature data that is used to create train/test data for model development or by batch applications for model scoring. The Feature Store solves the following problems in ML pipelines: 

  • reuse of feature pipelines by sharing features between teams/projects;
  • enables the serving of features at scale and with low latency;
  • ensures the consistency of features between training and serving - features are engineered once and can be cached in both the Online and Offline Feature Stores;
  • ensures point-in-time correctness for features - when a prediction was made and an outcome arrives later, we need to be able to query the values of different features at a given point in time in the past. 
The Feature Store for ML consists of both an Online and Offline database and transforms raw data from backend systems into engineered features that are made available to online and batch applications for inferencing and to Data Scientists to create train/test data for model development.

Most hyperscale AI companies have built internal feature stores (Uber, Twitter, AirBnb, Google, Facebook, Netflix, Comcast), but there are also two open-source Feature Stores: Hopsworks Feature Store (built on Apache Hudi/Hive, MySQL Cluster and HopsFS) and Feast (built on Big Query, BigTable, and Redis). Other databases used by existing Feature Stores include Cassandra, S3, and Kafka, and custom key-value stores.


End-to-End ML Pipelines with the Hopsworks Feature Store

Both MLOps and DataOps CI/CD pipelines differ from traditional DevOps in that they may be triggered by new data arriving for processing (as well as triggering due to updates to the source code for the data engineering or model training pipelines). DataOps is concerned with automating test and deployment of data processing pipelines (or feature pipelines, in our case), with stages such as data validation and data pipelines. MLOps, on the other hand, is concerned with the automation of training and deploying production ML models, with stages such as model training, model validation, and model deployment.


The Feature Store enables ML workflows to be decomposed into two workflows: (1) a “DataOps” workflow for engineering features and validating incoming data that caches the features in the Feature Store, and (2) a “MLOps” workflow for training models using features from the Feature Store, analyzing and validating those models, deploying them into online model serving infrastructure, and monitoring model performance in production.


Some ML lifecycle frameworks, such as TensorFlow Extended (TFX) and MLFlow, are based around end-to-end ML pipelines that start with raw data and end in production models. However, the first steps of an end-to-end ML pipeline - taking raw data and turning it into training data for models - can be very expensive. According to Airbnb, without a feature store, creating train/test data can take up to 60-80% of data scientists' time. The feature store enables transformed data (features) to be reused in different models. When you have a feature store, you no longer need end-to-end ML pipelines from raw data to models. You can decompose end-to-end ML pipelines into two separate pipelines that each run at their own cadence: (1) feature pipelines that ingest data from backend systems, validate it, featurize it and cache it in the feature store, and (2) training pipelines that train models from feature data, validate those models, and deploy them to production.

The motivation for introducing the Feature Store for MLOps is that the process for ingesting and featurizing new data is separate from the process for training models using features that come from potentially many different sources. That is, there are often differences in the cadence for feature engineering compared to the cadence for model training. Some features may be updated every few seconds, while others are updated every few months. Models, on the other hand, can be trained on demand, regularly (every day or every week, for example), or when monitoring shows a model’s performance has degraded. Feature engineering pipelines are typically triggered at regular intervals when new data arrives or on-demand when source code is pushed to git because changes were made in how features are engineered.

ML Pipelines are Stateful 

The best practice when developing data pipelines is for them to be stateless and idempotent, so that they can be safely re-run in case of failures. ML pipelines, however, have state. Before you deploy a model to production, you need some contextual information - does this model perform better than the currently deployed model? This decision requires state about the currently deployed model. Ideally, we also want historical state, so we can observe and evaluate the performance of models over time and the processing time/success rate of building models over time. Hopsworks, TFX and MLFlow provide a metadata store to enable ML pipelines to make stateful decisions, to log their execution steps, store the artifacts they produce, and store the provenance of the final models. Both TFX and MLFlow are obtrusive - they make developers re-write the code at each of the stages with their component models (with well-defined inputs and outputs to each stage). This way, they can intercept input parameters to components and log them to the metadata store. Hopsworks provides an unobtrusive metadata model, where pipelines read/write to the HopsFS (HDFS) file system and use Hopsworks APIs to interact with its feature store. This way, metadata events, artifacts, executions, and provenance are implicitly stored to the metadata store without the need to rewrite notebooks or python programs, as is needed in TFX or MLFlow.

Feature Pipelines feed the Hopsworks Feature Store


Feature pipelines have a natural cadence for each data source, and the cached features can be reused by many downstream model training pipelines.


The feature store enables feature pipelines to cache feature data for use by many downstream model training pipelines, reducing the time to create/backfill features. Groups of features are often computed together and have their own natural ingestion cadence, see figure above. Real-time features may be updated in the online feature store every few seconds using a streaming application, while batch features could be updated hourly, daily, weekly, or monthly.

In practice, feature pipelines are data pipelines, where the output is cleaned, validated, featurized data. As there are typically no guarantees on the correctness of the incoming data, input data must be validated and any missing values must be handled (often by either imputing them or ignoring them). Two popular frameworks for data validation are TFX data validation and AWS Deequ, as they allow you to extend traditional schema-based support for validating data (e.g., this column should contain integers) with data validation rules for checking if numerical or categorical values are as expected. For example, while a schema ensures that a numerical feature is of type float, additional validation rules are needed to ensure those floats lie within an expected range. You can also check that a column's values are unique or not null, and that its descriptive statistics are within certain ranges. Validated data is then transformed into numeric and categorical features that are then cached in the feature store, and subsequently used both to train models and for batch/online model inferencing.
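As a sketch of what such rules look like with PyDeequ (the Python wrapper around AWS Deequ), the snippet below validates completeness, uniqueness, and value ranges on a hypothetical feature dataframe before it is cached in the feature store; the column names and thresholds are illustrative.

from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationResult, VerificationSuite

# Hypothetical feature dataframe with a numeric 'revenue' column and a 'user_id' key
check = (Check(spark, CheckLevel.Error, "feature validation")
         .isComplete("revenue")                   # no missing values
         .isUnique("user_id")                     # no duplicate keys
         .hasMin("revenue", lambda v: v >= 0.0)   # values lie in the expected range
         .hasMax("revenue", lambda v: v <= 1e9))

result = (VerificationSuite(spark)
          .onData(raw_data_df)
          .addCheck(check)
          .run())

# Inspect which constraints failed before writing features to the feature store
VerificationResult.checkResultsAsDataFrame(spark, result).show()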


Feature Pipelines belong to the DataOps paradigm, where frameworks like Spark, PySpark, Pandas, and Featuretools are used along with data validation libraries like TFX data validation and Deequ.


Feature pipelines share many of the same best-practice DevOps practices with data pipelines. Some of the types of automated tests for data/features, include:

  • unit test and integration tests for all featurization code (Jenkins can run these tests when code is pushed to Git);
  • test that feature values fall within expected ranges (TFX data validation or Deequ);
  • test the uniqueness, completeness, and distinctness of features (Deequ);
  • test that feature distributions match your expectations (TFX data validation or Deequ);
  • test the relationship between each feature and the label, and the pairwise correlations between individual signals (Deequ);
  • test the cost of each feature (custom tests);
  • test that personally identifiable information is not leaking into features (custom tests).

When a feature store is available, the output of feature pipelines is cached feature data, stored in the feature store. Ideally, the destination data sink will have support for versioned data, such as in Apache Hudi in Hopsworks Feature Store. In Hopsworks, feature pipelines upsert (insert or update) data into existing feature groups, where a  feature group is a set of features computed together (typically because they come from the same backend system and are related by some entity or key). Every time a feature pipeline runs for a feature group, it creates a new commit in the sink Hudi dataset. This way, we can track and query different commits to feature groups in the Feature Store, and monitor changes to statistics of ingested data over time.

Model Training Pipelines start at the Feature Store

Model training pipelines belong to the MLOps paradigm, where versioned features are read from Apache Hudi in the Hopsworks Feature Store to create train/test data that is used to train models that are then deployed and monitored in production. Provenance of ML artifacts and executions are stored in the Metadata store in Hopsworks, and ML pipelines are orchestrated by Hopsworks.


Model training with a feature store typically involves at least three stages (or programs) in a workflow:

  1. select the features to include, the file format, and the file system (or object store) for the train/test dataset that will be created from features in the feature store. Note that for the Hopsworks Feature Store, a timestamp (corresponding to Hudi commit-ids) can also be supplied to reproduce a train/test dataset exactly as it was at a point of time in the past;
  2. train the model using the training dataset created in step 1 (training can be further decomposed into the following steps: hyperparameter optimization, ablation study, and model training);
  3. validate the model using automated tests and deploy it to a model registry for batch applications and/or an online model server for online applications.

In the Hopsworks platform, these three steps would typically be python programs or Jupyter notebooks and they are executed as part of an Airflow DAG (directed acyclic graph). That is, Airflow orchestrates the execution of the pipeline. Airflow enables DAGs to be scheduled periodically, but it can also be configured to run workflows when new feature data arrives in the feature store or when Git commits are pushed for model training pipeline code.
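A minimal Airflow DAG for these three stages might look like the sketch below (Airflow 2.x import paths). The Python callables are hypothetical placeholders for the notebooks/jobs submitted to Hopsworks; in practice you would use operators that launch jobs on the Hopsworks cluster.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Placeholder callables - in practice these would submit Hopsworks jobs/notebooks
def create_training_dataset(**context): ...
def train_model(**context): ...
def validate_and_deploy_model(**context): ...

with DAG(dag_id="model_training_pipeline",
         start_date=datetime(2020, 1, 1),
         schedule_interval="@daily",     # or trigger on new feature data / git commits
         catchup=False) as dag:

    t1 = PythonOperator(task_id="create_training_dataset",
                        python_callable=create_training_dataset)
    t2 = PythonOperator(task_id="train_model",
                        python_callable=train_model)
    t3 = PythonOperator(task_id="validate_and_deploy",
                        python_callable=validate_and_deploy_model)

    t1 >> t2 >> t3   # run the three stages in order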

The type of automated tests that are performed during the model validation step include:

  • test how the model performs on different data slices to check for bias,
  • test the robustness of the model to out-of-distribution feature vectors.

Hopsworks supports Google’s What-If Tool for model analysis using Jupyter notebooks. It is useful to investigate counterfactuals (compare a datapoint to the most similar point where your model predicts a different result), making it easier to develop model validation tests that can subsequently be used in production pipelines.

Google’s What-If Tool can be used to analyze a model, asking counterfactuals and testing for bias on different slices of data. Knowledge discovered here can be transferred into model validation tests.

Monitoring Online Models

When a model is deployed to a model server for use by online applications, we need to monitor its performance and its input features. We need to identify if the input features in production are statistically different from the input features used to train the model. In practice, we can do this by comparing statistics computed over the training data (accessible through feature store API calls) with statistics collected from input features at runtime. In Hopsworks, we log all prediction requests sent to models to a topic in Kafka. You can then write a Spark Streaming or Flink application that processes the prediction requests in Kafka, computing statistics in time-based windows and comparing those statistics with the training data statistics from the Feature Store. If the time-based window statistics for a given feature diverge significantly from the training statistics, your streaming application can notify ML engineers that input features are not as expected. Your streaming application will typically also compute business-level KPIs for the models and provide a UI to enable operators to visualize model performance. More concretely, the error signals to look for in online monitoring include:
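A simplified Structured Streaming sketch of this monitoring loop is shown below. The Kafka topic name, the monitored feature field, and the training-time mean/standard deviation (which would normally be fetched from the Feature Store statistics API) are all hypothetical.

from pyspark.sql import functions as F

# Training-time statistics for the monitored feature (normally fetched from the Feature Store)
TRAIN_MEAN, TRAIN_STD = 42.0, 5.0

# Read prediction requests logged by the model server to Kafka
requests = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "model-prediction-requests")   # hypothetical topic name
    .load())

# Parse one feature value from the JSON payload and keep the event timestamp
parsed = (requests
    .selectExpr("CAST(value AS STRING) AS json", "timestamp")
    .select(F.get_json_object("json", "$.avg_attendance").cast("double")
             .alias("avg_attendance"),
            "timestamp"))

# Compute the feature's mean over 5-minute windows
windowed = (parsed
    .withWatermark("timestamp", "10 minutes")
    .groupBy(F.window("timestamp", "5 minutes"))
    .agg(F.mean("avg_attendance").alias("live_mean")))

# Flag windows whose mean drifts more than 3 standard deviations from the training mean
alerts = windowed.filter(F.abs(F.col("live_mean") - TRAIN_MEAN) > 3 * TRAIN_STD)

query = alerts.writeStream.format("console").outputMode("update").start()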

Concept drift

In a model, the target variable is what the model is trying to predict. It could be, for example, whether a financial transaction is suspected as fraud or not fraud. When the statistical properties of the target variable change over time in an unexpected way (for example, a new fraud scheme appears that increases the overall amount of fraud), we have concept drift.

Data drift

If, however, the statistical properties of the input features change over time in an unexpected way, it will negatively impact the model’s performance. For example, if users execute many more financial transactions than normal due to it being a holiday period, but the model was not trained to handle holiday periods, then the model performance may degrade (either missing fraud or flagging up too many transactions as suspicious).

Feature pipeline changes

If there are changes in how a feature is computed in a feature pipeline, and an online model enriches its feature vector with that feature data from the online feature store, then this can negatively impact the model’s performance. For example, if you change how to compute the number of transactions a user carries out, it may negatively impact the model’s performance.

Summary

We have now covered an end-to-end ML pipeline with a Feature Store based on MLOps principles. Updates to pipeline code or newly arrived data enable changes to be continuously tested and models to be continually updated and deployed in production. We showed how the Feature Store enables monolithic end-to-end ML pipelines to be decomposed into feature pipelines and model training pipelines. We also discussed how data versioning is possible with modern data lake frameworks such as Apache Hudi. In the next blog, we will cover ML pipelines and reproducible experiments in Hopsworks in more detail, and how to easily move pipelines from development to production environments. We will also show how to develop both feature pipelines and model training pipelines using Airflow.

AI & Deep Learning for Fraud & AML

>
ML Best Practices
>
11/27/2019
>
Jim Dowling

tl;dr Deep learning is now the state-of-the-art technique for identifying financial transactions suspected of money laundering. It delivers fewer false positives and higher accuracy than traditional rule-based approaches. However, even though labelled datasets of historical transactions are available at many financial institutions, supervised machine learning is not a viable approach due to the massive imbalance between the number of “good” and “bad” transactions. The answer is to go unsupervised and welcome with open arms GANs and graph embeddings.

Financial institutions invest huge amounts of resources in both identifying and preventing money laundering and fraud. Many companies have systems that automatically flag financial transactions as ‘suspect’ using a database of static rules that generate alerts when they match a transaction, see the Figure below. Flagged transactions can be withheld and/or investigated by human investigators. As anti-money laundering (AML) is a pattern matching problem, and many institutions have terabytes (TBs) of labelled data for historical transactions (transactions are labelled as either ‘good’ or ‘bad’), many banks and companies are investigating deep learning as a potential technology for classifying transactions as "good" or "bad", see the Figure below. However, supervised machine learning is not a viable approach due to the typical imbalance between the number of “good” and “bad” transactions, where you may have only 1 “bad” transaction for every million transactions or more.

What are the alternatives? Well, in 2019, unsupervised learning and self-supervised learning have taken deep learning by storm, with self-supervised solutions now the state-of-the-art systems for both Natural Language Processing (NLP), with RoBERTa by Facebook, and image classification on ImageNet, with Noisy Student and AdvProp, both by Quoc Le’s team at Google. Self-supervised learning is autonomous supervised learning that doesn’t require manually labelled data - self-supervised systems extract and use some form of feature or metadata as the supervisory signal. In contrast, unsupervised learning has a long history of being used for anomaly detection, and in this blog we describe how we have worked on an AML solution based on deep learning, and how we moved through unsupervised and self-supervised to semi-supervised learning to arrive at our method of choice - Generative Adversarial Networks (GANs).

Rules-based AML versus Deep-Learning AML using models.


The Class Imbalance Problem for AML

In the figure above, we can see the huge data imbalance between good and bad transactions found in a typical AML transaction dataset. A supervised deep learning system could be used to train a binary classifier that predicts with 99.9999% accuracy whether a transaction was involved in money laundering or not. Of course, it would achieve this by always predicting ‘good’, and only getting wrong the very small number of ‘bad’ transactions! This classifier would, of course, be useless. The problem we are trying to solve is to predict correctly when ‘bad’ transactions are bad, and minimize the number of ‘good’ transactions that we predict as bad. In ML terminology, we say we want to maximize true positives (miss no bad transactions) and minimize false positives (good transactions predicted as bad), see the confusion matrix below. False negatives (missed bad transactions) get you in trouble with the regulator and authorities; false positives create unnecessary work and cost for your company.

Confusion matrix of our Binary Fraud Classifier with all possible predictions and their consequences. You could use a variant of the F1 score to evaluate models (precision and recall should not be weighted equally).

Our solution will have to address the class imbalance problem, as you may have thousands or millions of good transactions for every one transaction that is known to be bad. Supervised machine learning algorithms, including deep learning, work best when the number of samples in each class is roughly equal. This is a consequence of the fact that most supervised ML algorithms maximize accuracy and reduce error. Unfortunately, over-sampling the minority class, under-sampling the majority class, synthetic sampling, and cost-function approaches will not help us solve the class imbalance problem, due to the sheer scale of the typical data imbalance. Other classical techniques such as One-Class support vector machines or Kernel Density Estimators (KDE) will not scale either, as they “require manual feature engineering to be effective on high-dimensional data and are limited in their scalability to large datasets”.
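As the confusion matrix caption above suggests, precision and recall should not be weighted equally when evaluating an AML classifier. A small scikit-learn sketch of an F-beta evaluation follows (the labels are illustrative; beta > 1 weights recall - catching bad transactions - more heavily than precision).

    from sklearn.metrics import fbeta_score, precision_score, recall_score

    # 1 = 'bad' (suspected money laundering), 0 = 'good'; tiny illustrative arrays.
    y_true = [0, 0, 0, 0, 1, 1, 0, 1]
    y_pred = [0, 0, 1, 0, 1, 0, 0, 1]

    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    # beta=2 weights recall twice as heavily as precision, reflecting that a missed
    # 'bad' transaction (false negative) is costlier than a false alert.
    f2 = fbeta_score(y_true, y_pred, beta=2)
    print(f"precision={precision:.2f} recall={recall:.2f} F2={f2:.2f}")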

AML as Anomaly Detection

Anomaly detection follows quite naturally from a good unsupervised model

- Alex Graves (Deep Mind) at NeurIPS 2018.


This figure from Ruff et al visualizes different possible approaches to Anomaly detection from unsupervised to supervised to semi-supervised.

As AML problems are not easily amenable to supervised machine learning, much ongoing research and development concentrates on treating AML as an Anomaly Detection (AD) problem. That is, how can we automatically identify suspected money-laundering transactions as anomalies? The domain of AML is also challenging due to its non-stationarity - new money transfer schemes are constantly being introduced into the market, whole countries may join or leave common payment areas or currencies, and, of course, new money laundering schemes are constantly appearing. AML is a living organism and any techniques developed need to be quick to update in response to changes in its environment.

Traditional machine learning approaches to solving anomaly detection have involved using unsupervised learning approaches, see Figure above. An unsupervised approach to AML would be to find a “compact” description of the “good” class, with “bad” transactions being anomalies (not part of the “good” class). Examples of unsupervised approaches to AD include k-means clustering and principal component analysis (PCA). 
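A minimal sketch of the unsupervised approach mentioned above: fit PCA on known ‘good’ transactions only, and flag new transactions with a high reconstruction error as anomalies. The feature values and threshold below are illustrative.

    import numpy as np
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler

    # X_good: feature vectors for known 'good' transactions (random stand-in data).
    rng = np.random.default_rng(0)
    X_good = rng.normal(size=(10000, 20))

    scaler = StandardScaler().fit(X_good)
    pca = PCA(n_components=5).fit(scaler.transform(X_good))

    def anomaly_score(X):
        """Reconstruction error of X under the PCA model of 'good' transactions."""
        Z = scaler.transform(X)
        reconstructed = pca.inverse_transform(pca.transform(Z))
        return np.mean((Z - reconstructed) ** 2, axis=1)

    # Flag transactions whose score exceeds, e.g., the 99.9th percentile of the
    # scores observed on the 'good' training data.
    threshold = np.percentile(anomaly_score(X_good), 99.9)
    X_new = rng.normal(size=(5, 20))
    is_suspect = anomaly_score(X_new) > threshold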

However, AML is not a classical use-case for anomaly detection techniques as we typically have labelled datasets - financial institutions typically know which transactions were “good” and which transactions were “bad”. We should, therefore, exploit that labelled data when training models. That is, we have semi-supervised learning - it is not fully unsupervised learning as there is some labeled training data available.

What are useful Features for AML and how do we manage them?

Before we get into semi-supervised ML, we need to discuss the features that can be used to train your AML model. Firstly, you should only train your AML model on features that will be accessible to your production AML binary classifier. If you have access to features during development that will never be usable in production, don’t even use them out of curiosity. Don’t kill the cat. If you are designing an AML classifier for an online application, then it is important that you know that you can access the features used to train your model at very low latency in production. Typically, this requires an online feature store, such as the Hopsworks Feature Store (see the End-to-End figure later). If you are training a classifier for an offline batch application, then you typically do not need to worry about low latency access to feature data, as you will probably have feature engineering code in the batch application (or in the Feature Store) that pre-computes the features for your classifier.

The most obvious features available for AML are the features that arrive with the transaction itself (sending customer, receiving customer, sending/receiving bank, the amount of money being transferred, the date, etc.). Other examples of useful features include how many transactions the sending/receiving account has executed in the last day, week, or month; whether the transaction date/time is “special” (holiday, weekend, etc.); the graph of customers connected to the sender/receiver accounts over different time windows (last hour/day/week/month/year); credit scores for accounts, and so on. Typically, these features are not available in your online application and you do not want to rewrite your feature engineering code in your online AML classifier application, so you use the Feature Store to look up those features at runtime when the transaction arrives. When you have queried those features from the Online Feature Store, you join them (in the correct order) with the transaction-supplied features and then send the complete feature vector to the network-hosted model for a prediction of whether the transaction is “good” or “bad”.
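A hedged sketch of this enrichment flow for an online classifier. The feature lookup client and its get_features call are hypothetical placeholders (not the actual Hopsworks API), and the request payload assumes a TensorFlow Serving-style REST endpoint.

    import requests  # assumes a REST endpoint in front of the model server

    def classify_transaction(txn, feature_store_client, model_url):
        """Enrich an incoming transaction with precomputed features and score it."""
        # Features that arrive with the transaction itself.
        online_part = [txn["amount"], txn["hour_of_day"]]

        # Hypothetical lookup of precomputed features from the online feature store,
        # keyed by the sending account (this client API is illustrative only).
        historical_part = feature_store_client.get_features(
            keys={"account_id": txn["sender_account"]},
            features=["txns_last_24h", "txns_last_week", "credit_score"],
        )

        # Join in the same order the model was trained on, then call the model server.
        feature_vector = online_part + historical_part
        response = requests.post(model_url, json={"instances": [feature_vector]})
        return response.json()["predictions"][0]  # probability the transaction is 'bad'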

Image from Anomaly Detection at Spark/AI EU Summit 2019.

Semi-Supervised Learning for Anomaly detection

Semi-supervised learning is a class of machine learning tasks and techniques that also make use of unlabeled data for training – typically a small amount of labeled data with a large amount of unlabeled data.

- Wikipedia

In semi-supervised learning, we use many of the techniques from unsupervised learning - you train a good model of the data, and then you can see how likely something is under that model. In AML, we could use, for example, one-class novelty detection: train one GAN on the large class of ‘good’ transactions as the novelty detector, and another GAN that supports it by enhancing the inlier samples and distorting the outliers.

Generative Adversarial Networks (GANs) 

Generative models involve building a model of the world that enables you to act under uncertainty. More precisely, they are able to model complex and high dimensional distributions of real-world data. We use GANs for anomaly detection by modeling the normal behavior of some training data using adversarial training and then detecting anomalies using an anomaly score. To evaluate our performance, we use Precision, Recall and F1-Score metrics.


Training a GAN with Transactions classified as ‘good’ and ‘bad’.

GANs are generally very difficult to train - there is a significant risk of mode collapse, where the generator collapses to producing only a narrow range of samples and stops learning useful structure from the discriminator. You will need to carefully consider the architecture you use. To quote Marc’Aurelio Ranzato (Facebook) at NeurIPS 2018, “the Convolutional Neural Network architecture is more important than how you train (GANs)”. You will need to dedicate time and GPU resources to hyperparameter tuning due to the sensitivity of GANs and the risk of mode collapse.

Hyperparameter Tuning for Training GANs in Hopsworks and Maggy

The Maggy framework on Hopsworks uses PySpark to distribute hyperparameter tuning tasks to GPUs running TensorFlow/Keras/PyTorch code. Maggy supports state-of-the-art asynchronous directed search, such as ASHA, as well as custom optimizers.

You need to have the tools and platform to do hyperparameter sweeps at scale with GANs, due to their sensitivity. Hopsworks uses PySpark and the Maggy framework to distribute hyperparameter trials to GPUs running TensorFlow/Keras/PyTorch code. Hopsworks/Maggy supports state-of-the-art asynchronous directed hyperparameter search, which can provide up to 80% more efficient use of GPUs when tuning sensitive GANs, compared to other hyperparameter tuning approaches using PySpark. Read our Maggy blog post to find out more.

Similar to MLFlow, Hopsworks also provides an experiment service where you can manage all hyperparameter trials as experiments. The experiment service supports TensorBoard, tabular results for trials, and any images generated during training. This makes training runs and models easier to analyse and reproduce.

Hopsworks manages your hyperparameter tuning experiments, making them easy to analyse and reproduce.

Putting your GAN in production

If you have now successfully trained a GAN that performs well on your test dataset (keep the last year/months of your transaction data as holdout data), you can save the discriminator as a model for use on new transactions. Typically, you want further model validation - the What-If Tool is a good way to evaluate black-box models in Jupyter notebooks on Hopsworks - before you deploy your model from your model registry into production for either online applications or batch applications.

Save just the Discriminator from your original GAN to classify new transactions as either ‘good’ or ‘bad’.
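If the GAN is built with Keras, exporting only the discriminator might look like the sketch below (the model architecture and file paths are illustrative; a trivial model is constructed only to keep the snippet self-contained).

    import tensorflow as tf

    # Stand-in for the discriminator half of the trained GAN.
    discriminator = tf.keras.Sequential([
        tf.keras.layers.Dense(64, activation="relu", input_shape=(20,)),
        tf.keras.layers.Dense(1, activation="sigmoid"),
    ])

    # Export only the discriminator for scoring new transactions.
    discriminator.save("aml_discriminator.h5")                 # Keras HDF5 format
    tf.saved_model.save(discriminator, "aml_discriminator/1")  # SavedModel (.pb) for TF Serving

    # Later, in the scoring application:
    scorer = tf.keras.models.load_model("aml_discriminator.h5")
    scores = scorer.predict(tf.random.uniform((3, 20)))  # probability each transaction is 'bad'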


Will GANs increase True Fraud Detection and reduce False Fraud Detection?

While there is no guarantee that GANs will magically make money laundering disappear overnight, they appear to be a much more powerful tool for correctly identifying fraudulent transactions and for minimizing the number of false alerts that need to be manually investigated. In China, GANs are already being used in production at two banks: “Two commercial banks have reduced losses of about 10 million RMB in twelve weeks and significantly improved their business reputation”, from work on GAN-based telecom fraud detection at the receiving bank. Another example, shown below, was recently presented at the Spark/AI Summit EU 2019.

There is also the issue of explainability (particularly in the EU), where it is unclear how regulators will handle the use of black-box techniques in making fraud classification decisions. The first step to take is to augment existing systems with extra input from a GAN-based system, and evaluate for a period of time before discussing how to proceed with the regulators.

Expected results from using GANs (Anomaly Detection at Spark/AI EU Summit 2019)

End-to-End Workflow on Hopsworks for GANs

Hopsworks enables automated end-to-end ML workflows that take historical financial transactions from production backend data systems (data warehouses, data lakes, Kafka), perform feature engineering with Spark/PySpark, and store features in the Feature Store. For orchestrating such feature pipelines, Hopsworks includes Airflow as a service. Airflow is also used by Data Scientists to orchestrate Jupyter notebooks for (1) selecting features to create train/test data, (2) finding good hyperparameters, (3) training a model, and (4) evaluating the model before deploying it into production. Finally, online AML detection applications need to enrich their feature vectors with transaction window counts and other features not available locally. Hopsworks’ online Feature Store is a low-latency, highly available database, based on MySQL Cluster, that enables online AML applications to access their pre-engineered feature data with very low latency (<10 ms). The Hopsworks Feature Store is modular with a REST API, so all steps 1-5 in the above figure can either be run on the Hopsworks platform itself or on external platforms. You decide.
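A hedged sketch of what such an Airflow orchestration could look like. The operators simply shell out to hypothetical scripts here; Hopsworks ships its own operators for launching jobs and notebooks, which are not shown.

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator  # Airflow 1.x import path

    with DAG("aml_training_pipeline",
             schedule_interval="@daily",
             start_date=datetime(2019, 11, 1),
             catchup=False) as dag:

        # Each step runs a (hypothetical) script or parameterized notebook.
        create_train_test = BashOperator(task_id="create_train_test",
                                         bash_command="python create_train_test.py")
        tune_hyperparams = BashOperator(task_id="tune_hyperparams",
                                        bash_command="python tune_gan.py")
        train_model = BashOperator(task_id="train_model",
                                   bash_command="python train_gan.py")
        evaluate_and_deploy = BashOperator(task_id="evaluate_and_deploy",
                                           bash_command="python evaluate_and_deploy.py")

        create_train_test >> tune_hyperparams >> train_model >> evaluate_and_deploy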

Summary

Deep learning is revolutionizing how we identify money laundering, and it has unique challenges related to huge data volumes and massive data imbalances. At Logical Clocks, we have worked on a semi-supervised approach based on GANs, enabled by Hopsworks, that scales to process huge datasets and compute graph embeddings at scale using Spark. The result is state-of-the-art performance in identifying money laundering. The next phase is to put this model into production.

Guide to File Formats for Machine Learning: Columnar, Training, and Inferencing

>
ML Best Practices
>
10/25/2019
>
Jim Dowling

TLDR; Most machine learning models are trained using data from files. This post is a guide to the popular file formats used in open source frameworks for machine learning in Python, including TensorFlow/Keras, PyTorch, Scikit-Learn, and PySpark. We will also describe how a Feature Store can make the Data Scientist’s life easier by generating training/test data in a file format of choice on a file system of choice.

A file format defines the structure and encoding of the data stored in it and it is typically identified by its file extension – for example, a filename ending in .txt indicates the file is a text file. However, even though files are used to store much of the world’s data, most of that data is not in a format that can be used directly to train ML models. 

This post is mostly concerned with file formats for structured data, and we will discuss how the Hopsworks Feature Store enables the easy creation of training data in popular file formats for ML, such as .tfrecords, .csv, .npy, and .petastorm, as well as the file formats used to store models, such as .pb and .pkl. We will not cover other well-known file formats, such as image file formats (e.g., .png, .jpeg), video file formats (e.g., .mp4, .mkv, etc.), archive file formats (e.g., .zip, .gz, .tar, .bzip2), document file formats (e.g., .docx, .pdf, .txt) or web file formats (e.g., .html).

Data sources

Files are stored on file systems and, in the cloud, increasingly on object stores. File systems come in different flavors. A local file system (POSIX) usually stores its data on one or more disks (magnetic hard drives, SSDs (solid-state storage), or NVMe drives). Such file systems can be accessible over the network (e.g., NFS), or, if large amounts of capacity are needed, distributed file systems can be used that scale to store hundreds of Petabytes of data across thousands of servers, like HDFS, HopsFS, and CephFS. In the cloud, object stores are the cheapest file storage option where the files can still be read/written with reasonable performance by applications.

File formats are linked to their file system. For example, if you ran a TensorFlow/PyTorch application to train ImageNet on images stored in S3 (an object store) on an Nvidia V100 GPU, you might be able to process 100 images/sec, as that is what a single client can read from S3. However, the V100 can potentially process 1000 images/sec - you are bottlenecking on I/O to the filesystem. New file formats have appeared to get around just this type of problem. For example, the petastorm file format was developed by Uber to store PBs of data for self-driving vehicles on HDFS. Petastorm files are large, splittable, and compressed, with readers for TensorFlow and PyTorch, enabling them to feed lots of V100 GPUs in parallel and ensuring they do not bottleneck on file I/O - as they would have if they worked with traditional multimedia file formats on slower network file systems. An alternative, but much more expensive, solution would be to work with the traditional file formats on storage devices made up of hundreds or thousands of NVMe disks.

Machine learning frameworks want to consume training data as a sequence of samples, so file formats for training ML models should have easily consumable layouts with no impedance mismatch with the storage platform or the language used to read/write the files. Additionally, distributed training (training ML models on many GPUs at the same time to make training go faster) requires files to be splittable and accessible over a distributed file system or object store, so that different GPUs can read different shards (partitions) of the data in parallel from different servers.

Big Data – from Binary to Structured Data

Machine learning has produced technological breakthroughs in a number of domains, such as image classification, voice recognition, voice synthesis, natural language processing, and neural machine translation. The types of file formats used in these systems are typically compressed binary or plaintext data formats. 

Machine learning has extended its impact beyond these first domains and is now being applied to Enterprise data to solve business problems that can be modelled as supervised machine learning problems.  However, much of Enterprise data is available as structured data – originating from Data Warehouses, databases, document repositories, and Data Lakes. Structured Enterprise data can also be used in a variety of text-based and binary file formats. In general, if you have large amounts of data, then you should consider using a binary file format (instead of a traditional text-based format like CSV), as binary file formats may significantly improve the throughput of your import pipeline, helping reduce model training time. Binary file formats require less space on disk and take less time to read from disk. Binary file formats are significant as the other main trend in machine learning is the increasing use of deep learning. Deep learning is ravenous for data - it just keeps getting better the more data it is trained on – and with increased volumes of data, efficient, compressed file formats have a role to play in the deep learning wars to come.


File Formats in Machine Learning Frameworks

Older file formats (e.g., .csv) may not be compressed, may not be splittable (e.g., HDF5 and netCDF) so that they work seamlessly when training with many workers in parallel, and may make it difficult to combine multiple datasets. However, if the framework you use for machine learning, such as TensorFlow, PyTorch, or Scikit-Learn, does not provide data import and preprocessing functionality that is integrated seamlessly with those file format features and data sources, then you may not get the benefit of better file formats. For example, the TFRecord file format was designed for TensorFlow and has full support in tf.data, while PyTorch’s DataLoader was designed, first-and-foremost, around Numpy files and then extended to other file formats. Similarly, Scikit-Learn was first designed to work with CSV and Pandas and then extended to work with other file formats. So, while it is clear you should be using more modern file formats for machine learning, you still have the challenge of converting your data to those formats, and there may be limited documentation available on how to do that. Fortunately, the Feature Store (introduced later) enables you to easily convert your data into the most important file formats for machine learning with no effort.

File Formats

This section introduces the most widely used file formats for ML, grouping them into well-known classes of file formats: columnar, tabular, nested, array-based, and hierarchical. There are also newer file formats designed for model serving, which are described below.

Columnar Data File Formats

Apart from possibly Excel, the most common location for Enterprise data is the Data Warehouse or Data Lake. This data is typically accessed using Structured Query Language (SQL) and the data may be stored in either row-oriented format (typically OLTP databases that provide low latency access and high write throughput) or, more commonly, column-oriented format (OLAP/columnar databases that can scale from TBs to PBs and provide faster queries and aggregations). In Data Lakes, structured data may be stored as files (.parquet and .orc) that are still accessible via SQL using scalable SQL engines such as SparkSQL, Hive, and Presto. These columnar data files and backend databases are often the source of training data for Enterprise ML models, and feature engineering on them often requires data-parallel processing frameworks (Spark, Beam, and Flink) to enable feature engineering to scale out over many servers. This scalable processing is possible because the path to data in parquet/orc/petastorm is actually a directory - not a file. The directory contains many files that can be processed in parallel. So, typically you supply the base path (directory) when you want to read data in a columnar file format, and the processing engine figures out which file(s) to read. If you only want to read a subset of columns from a table, files containing excluded columns will not be read from disk, and if you perform a range scan, statistics (max/min values for columns in files) enable data skipping - skip this file if your query asks for columns with values outside the range of values stored in the file.
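A short PySpark sketch of that behaviour: only the selected columns are read from the Parquet directory, and the filter can be pushed down so that files whose min/max statistics exclude the predicate are skipped (the path and column names are illustrative).

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.appName("columnar-read").getOrCreate()

    # The path is a directory of Parquet files, not a single file.
    transactions = spark.read.parquet("hdfs:///Projects/demo/transactions")

    # Column pruning: only 'amount' and 'country' are read from disk.
    # Predicate pushdown plus min/max statistics let files that cannot
    # contain rows with amount > 10000 be skipped entirely.
    large_txns = (transactions
        .select("amount", "country")
        .filter(F.col("amount") > 10000))

    large_txns.explain()  # the physical plan shows PushedFilters and ReadSchema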

While both parquet and orc have similar properties, Petastorm is uniquely designed to support ML data - it is the only columnar file format that natively supports multi-dimensional data. Columnar file formats typically assume 2-dimensional relational data, but tensors can have much higher dimensionality than 1-d vector or 2-d relational data sources. Petastorm provides multi-dimensional data capability by extending Parquet with its own Unischema, designed explicitly for machine learning use-cases. The Unischema enables petastorm files to store multi-dimensional tensors natively in Parquet. The Unischema is also compatible with PyTorch and TensorFlow, so you can convert the petastorm schema directly to a TensorFlow or PyTorch schema, which enables native TensorFlow/PyTorch readers for petastorm.

Columnar file formats are designed for use on distributed file systems (HDFS, HopsFS) and object stores (S3, GCS, ADL) where workers can read the different files in parallel.

  • File formats: .parquet, .orc, .petastorm.
  • Feature Engineering: PySpark, Beam, Flink.
  • Training:.petastorm has native readers in TensorFlow and PyTorch;
                    .orc, .parquet have native readers in Spark;
                    JDBC/Hive sources supported by Spark
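A sketch of the native petastorm readers mentioned above, based on petastorm's reader utilities (the dataset path is illustrative and module paths may differ between petastorm versions).

    from petastorm import make_reader
    from petastorm.tf_utils import make_petastorm_dataset  # TensorFlow integration
    from petastorm.pytorch import DataLoader               # PyTorch integration

    dataset_url = "hdfs:///Projects/demo/train_petastorm"  # illustrative path

    # TensorFlow: wrap the petastorm reader in a tf.data.Dataset.
    with make_reader(dataset_url) as reader:
        tf_dataset = make_petastorm_dataset(reader)
        # ...feed tf_dataset into model.fit() or a custom training loop...

    # PyTorch: iterate over the same data with petastorm's DataLoader.
    loader = DataLoader(make_reader(dataset_url), batch_size=256)
    for batch in loader:
        pass  # ...training step...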

Tabular Text-based File Formats

Tabular data for machine learning is typically found in .csv files. Csv files are text-based files containing comma-separated values. Csv files are popular for ML as they are easy to view/debug and easy to read/write from programs (no compression/indexing). However, they have no support for column types, there is no distinction between text and numeric columns, and they have poor performance, which becomes more noticeable as the volume of data grows to GBs or more - they are not splittable, they do not have indexes, and they do not support column filtering. .csv files can be compressed using GZIP to save space. Other popular tabular formats that are not typically used in ML are spreadsheet file formats (e.g., .xlsx and .xls) and unstructured text formats (.txt).

  • File formats: .csv, .xlsx
  • Feature Engineering: Pandas, Scikit-Learn, PySpark, Beam, and lots more
  • Training: .csv has native readers in TensorFlow, PyTorch, Scikit-Learn, Spark
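Since .csv carries no column types or compression of its own, it helps to be explicit when reading it; a small pandas sketch (the file name and dtypes are illustrative).

    import pandas as pd

    # Gzip-compressed CSV: pandas infers compression from the .gz extension,
    # and explicit dtypes avoid every column being parsed as a generic object.
    df = pd.read_csv(
        "transactions.csv.gz",
        dtype={"account_id": "int64", "amount": "float64", "country": "category"},
        parse_dates=["transaction_date"],
    )
    print(df.dtypes)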

Nested File Formats

Nested file formats store their records (entries) in an n-level hierarchical format and have a schema to describe their structure. A hierarchical format means that a record could have one parent (or be the root, with no parent) but it could also have children. Nested file format schemas can be extended (adding attributes while maintaining backwards compatibility) and the order of attributes is typically not significant. The .json and .xml file formats are the best known plain-text nested file formats, while binary nested file formats include protocol buffers (.pb) and Avro (.avro).

TFRecords is a sequence of binary records, typically a protobuf with either a schema “Example” or “SequenceExample”. The developer decides whether to store samples as either an “Example” or a “SequenceExample”. Choose a SequenceExample if your features are lists of identically typed data.  A TFRecords file can be a directory (containing lots of .tfrecords files), and it supports compression with Gzip. More details on how to use TFRecords can be found in the official documentation, and this good blog post.

  • File formats: .tfrecords, .json, .xml, .avro
  • Feature Engineering: Pandas, Scikit-Learn, PySpark, Beam, and lots more
  • Training: .tfrecords is the native file format for TensorFlow;
                    .json has native readers in TensorFlow, PyTorch, Scikit-Learn, Spark
                    .avro files can be used as training data in TensorFlow with LinkedIn’s library
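A minimal sketch of writing and reading a .tfrecords file with the “Example” schema described above (TensorFlow 2-style API; the feature names are illustrative).

    import tensorflow as tf

    def to_example(amount, label):
        """Serialize one sample as a tf.train.Example protobuf."""
        return tf.train.Example(features=tf.train.Features(feature={
            "amount": tf.train.Feature(float_list=tf.train.FloatList(value=[amount])),
            "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
        })).SerializeToString()

    # Write a few records (optionally GZIP-compressed via tf.io.TFRecordOptions).
    with tf.io.TFRecordWriter("train.tfrecords") as writer:
        for amount, label in [(12.5, 0), (9999.0, 1)]:
            writer.write(to_example(amount, label))

    # Read them back with tf.data.
    feature_spec = {"amount": tf.io.FixedLenFeature([], tf.float32),
                    "label": tf.io.FixedLenFeature([], tf.int64)}
    dataset = (tf.data.TFRecordDataset("train.tfrecords")
               .map(lambda record: tf.io.parse_single_example(record, feature_spec)))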

Array-Based Formats

Numpy is an abbreviation of Numerical Python and it is a massively popular library for scientific computing and data analysis. Through support for vectorization, Numpy (.npy) is also a high performance file format. A Numpy array is a densely packed array with elements of the same type. The file format, .npy, is a binary file format that stores a single NumPy array (including nested record arrays and object arrays).

  • File formats: .npy
  • Feature Engineering: PyTorch, Numpy, Scikit-Learn, TensorFlow;
  • Training: .npy has native readers in PyTorch, TensorFlow, Scikit-Learn.
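Reading and writing .npy is a one-liner in each direction; a small sketch.

    import numpy as np

    features = np.random.rand(1000, 20).astype(np.float32)

    np.save("train_features.npy", features)    # writes a single densely packed array
    restored = np.load("train_features.npy")   # use mmap_mode="r" to memory-map large files

    assert restored.shape == (1000, 20)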

Hierarchical Data Formats 

HDF5 (.h5 or .hdf5) and NetCDF (.nc) are popular hierarchical data file formats (HDF) that are designed to support large, heterogeneous, and complex datasets. In particular, HDF formats are suitable for high dimensional data that does not map well to columnar formats like parquet (although petastorm is both columnar and supports high dimensional data). Lots of medical device data is stored in HDF files or related file formats, such as BAM, VCF for genomic data. Internally, HDF5 and NetCDF store data in a compressed layout. NetCDF is popular in domains such as climate science and astronomy. HDF5 is popular in domains such as GIS systems.  They are not splittable, so not suitable for distributed processing (with engines like Spark). 

  • File formats: .h5 (HDF5), .nc (NetCDF)
  • Feature Engineering: Pandas, Dask, XArray;
  • Training: .h5 has no native readers in TensorFlow or PyTorch;
                    .nc has no native readers we are aware of.

Model File Formats

In supervised machine learning, the artefact created after training that is used to make predictions on new data is called a model. For example, after training a deep neural network (DNN), the trained model is basically a file containing the layers and weights of the DNN. Often, models can be saved in a file that can potentially be compressed, so model files typically have a binary file format. TensorFlow saves models as protocol buffer files, with a .pb file extension. Keras saves models natively as .h5 files. Scikit-Learn saves models as pickled Python objects, with a .pkl file extension. An older format for model serving based on XML, the predictive model markup language (.pmml), is still usable in some frameworks, such as Scikit-Learn.
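A sketch of saving models in the formats just mentioned, assuming a trained Keras model and a trained Scikit-Learn estimator (the tiny models here exist only to keep the snippet self-contained).

    import pickle
    import tensorflow as tf
    from sklearn.linear_model import LogisticRegression

    # Keras: HDF5 (.h5) or a SavedModel directory containing a .pb graph.
    keras_model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
    keras_model.compile(optimizer="sgd", loss="mse")
    keras_model.save("model.h5")
    tf.saved_model.save(keras_model, "saved_model/1")

    # Scikit-Learn: a pickled Python object (.pkl).
    sk_model = LogisticRegression().fit([[0.0, 0.0], [1.0, 1.0]], [0, 1])
    with open("model.pkl", "wb") as f:
        pickle.dump(sk_model, f)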

Model files are used to make predictions on new data by either (1) batch applications that typically read the model in as a file or (2) a real-time model serving server (such as TensorFlow Serving Server) that reads the model into memory and may even keep multiple versions of a model in memory for A/B testing.

Other model file formats in use include SparkML models, which can be saved in the MLeap file format and served in real-time using an MLeap model server (files are packaged in .zip format). Apple developed the .mlmodel file format to store models embedded in iOS applications as part of its Core ML framework (which has superior support for the Objective-C and Swift languages). Applications trained in TensorFlow, Scikit-Learn, and other frameworks need to convert their model files to the .mlmodel file format for use on iOS, with tools like coremltools and the TensorFlow converter available to help with file format conversion. ONNX is an ML-framework-independent file format, supported by Microsoft, Facebook, and Amazon. In theory, any ML framework should be able to export its models in .onnx file format, so it offers great promise in unifying model serving across the different frameworks. However, as of late 2019, ONNX does not support all operations for the most popular ML frameworks (TensorFlow, PyTorch, Scikit-Learn), so ONNX is not yet practical for those frameworks. In PyTorch, the recommended way to serve models is to use Torch Script to trace and save a model as a .pt file and serve it from a C++ application.
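A sketch of the Torch Script route: trace a model with an example input and save the resulting .pt file, which a C++ application can load with libtorch (the model here is illustrative).

    import torch

    model = torch.nn.Sequential(
        torch.nn.Linear(20, 64),
        torch.nn.ReLU(),
        torch.nn.Linear(64, 1),
    )
    model.eval()

    # Trace the model with an example input and save the TorchScript program.
    example_input = torch.rand(1, 20)
    traced = torch.jit.trace(model, example_input)
    traced.save("model.pt")

    # The .pt file can then be loaded from C++ via torch::jit::load("model.pt").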

One final file format to mention here is YAML that is used to package models as part of the MLFlow framework for ML pipelines on Spark. MLFlow stores a YAML file that describes the files it packages for model serving, so that deployment tools can understand the model file format and know what files to deploy.

  • File formats: .pb, .onnx, .pkl, .mlmodel, .zip, .pmml, .pt
  • Inference: .pb files are served by TensorFlowServing Server;
                       .onnx files are served by Microsoft’s commercial model serving platform;
                      .pkl files are served for Scikit-Learn models, often on Flask servers;
                      .mlmodel files are served by iOS platforms;
                      .zip files are used to package up MLeap files that are served on the MLeap runtime;
                       .pt files are used to package PyTorch models that can be served inside C++ applications.

ML Data File Formats Summary

A summary of the different file formats for the different ML pipeline stages (feature engineering / dataprep, training, and serving) is shown in the tables below:


And the winner is...

The most feature-complete, language-independent, and scalable of the file formats for deep learning training data is petastorm. Not only does it support high-dimensional data and have native readers in TensorFlow and PyTorch, it also scales to parallel workers, supports push-down index scans (only reading the requested columns from disk and even skipping files whose stored value ranges fall outside the query), and scales to store many TBs of data.

For model serving, we cannot really find any file format superior to the others. The easiest model serving solution to deploy and operate is protocol buffers and the TensorFlow Serving Server. While both ONNX and Torch Script have potential, the open-source model serving servers are not there yet for them. Similarly, serving .pkl files on a Flask server, while used heavily in production, still requires a lot more work on behalf of the ML operations engineer - you have to write your own Python serving program and manage security, load balancing, and much more.

Hopsworks and ML File Formats

In this section, we discuss support for converting ML feature data into the file format of your choice using the Hopsworks Feature Store, and the support for the main ML frameworks file formats in Hopsworks using the HopsFS file system.

Feature Store for File Format Conversion 

Hopsworks Feature Store can create train/test data in your file format of choice, on your data store of choice (S3, HDFS, HopsFS).


The Hopsworks Feature Store is a warehouse for storing reusable features for ML. It is designed to act as a cache for feature data. Feature data is the output of feature engineering, which transforms raw data from backend systems into features that can be used directly to train ML models. Data scientists interact with the feature store by browsing the available features, and, then, on finding the features they need to build their predictive model, they generate train/test data in the file format of their choice, on the storage platform of their choice. In the example shown in the Figure above, we can see we have 3 features and a target variable from the Titanic dataset, and we can then pick a file format (from the 10+ available file formats) and a target file system where the train/test data will be created. The Feature Store enables teams to easily collaborate on solving a problem by easily trying out solutions in different frameworks using different approaches (for example, deep learning on TensorFlow and PyTorch, and decision trees on Scikit-Learn) without the effort of converting the train/test data into the most efficient, easy-to-use file format for the respective framework.

File Formats Examples in Hopsworks 

Hopsworks provides HopsFS (a next-generation HDFS file system) as its default distributed file system. HopsFS is a great distributed file system for machine learning due to its high throughput and low latency as well as widespread native HDFS readers in popular frameworks for ML: Spark, TensorFlow, Pandas (Scikit-Learn), and PyTorch (through petastorm). 

Some examples of Notebooks that use different file formats with HopsFS in Hopsworks are included here, followed by specific examples from popular frameworks for ML in Python (PySpark, TensorFlow/Keras, PyTorch, Scikit-Learn):

In the next sections we give a brief overview of the recommended file formats for the major python ML frameworks: PySpark, TensorFlow/Keras, PyTorch, and Scikit-Learn, along with an example code snippet and a link to a Python notebook from Hopsworks.

PySpark

File formats: .csv, .parquet, .orc, .json, .avro, .petastorm

Data sources: local filesystem, HDFS , S3
Model serving  file formats: .zip (MLeap)

Columnar file formats work better with PySpark (.parquet, .orc, .petastorm) as they compress better, are splittable, and support selective reading of columns (only those columns specified will be read from files on disk). Avro files are frequently used when you need fast writes with PySpark, as they are row-oriented and splittable. PySpark can read files from the local filesystem, HDFS, and S3 data sources.

Open Example PySpark Notebook


Pandas/Scikit-Learn

File formats: .csv, .npy, .parquet, .h5, .json, .xlsx
Data sources: local filesystem, HDFS , S3
Model serving file formats: .pkl

Pandas can read files natively in .csv, .parquet, .hdf5, .json, .xlsx, and also from SQL sources. Pandas can read files from the local filesystem, HDFS, S3, http, and ftp data sources. In Hopsworks, you can read files in HopsFS using Pandas’ native HDFS reader with a helper class:

Open Example Pandas Notebook

TensorFlow/Keras

File formats: .csv, .npy, .tfrecords, .petastorm

Data sources: local filesystem, HDFS , S3
Model serving file formats: .pb

The native file format for TensorFlow is .tfrecords, and if your dataset is not huge and you do not need to read only a subset of columns from the dataset, then it is our recommended file format. If, however, you only want to read a subset of columns (a projection in database terminology) from the dataset, then .petastorm is the file format to use. TensorFlow can read files from the local filesystem, HDFS, and S3 data sources.

Open Example TensorFlow Notebook

PyTorch 

Training File formats: .csv, .npy, .petastorm

Data sources: local filesystem, HDFS ( petastorm), S3
Model serving file formats: .pt

PyTorch is tightly integrated with Numpy, and .npy is the native file format for PyTorch. However, np.load() does not work with HopsFS natively, so you have to use a wrapper function that we include in our library that first materializes the data from HopsFS to the local filesystem before reading it in as a numpy array.  If you have large datasets, we recommend using .petastorm, which works natively with PyTorch through its own reader.

Open Example PyTorch Notebook


Numpy/Scikit-Learn

File formats: .npy

Data sources: local filesystem
Model serving file formats: .pkl

The native data format for Scikit-Learn is numeric data, and is most commonly stored as numpy arrays or pandas DataFrames (convertible to numpy arrays). PyTorch also builds directly on numpy arrays. As such, .npy is a popular file format for training data with Scikit-Learn. In Hopsworks, we provide a numpy_helper to read .npy files from HopsFS.

Open Example Scikit-Learn Notebook

References

[hdf5] https://www.neonscience.org/about-hdf5

[numpy] https://towardsdatascience.com/why-you-should-start-using-npy-file-more-often-df2a13cc0161

[tftransform] https://www.tensorflow.org/tfx/transform/get_started

[featurestore] https://www.logicalclocks.com/featurestore 

[tfrecords] https://medium.com/mostly-ai/tensorflow-records-what-they-are-and-how-to-use-them-c46bc4bbb564 

[bigdataFileFormats] https://luminousmen.com/post/big-data-file-formats

[petastorm] https://qcon.ai/system/files/presentation-slides/yevgeni_-_petastorm_16th_apr_2019_.pdf

Hello Asynchronous Search for PySpark

>
ML Best Practices
>
10/14/2019
>
Moritz Meister

TLDR; Hopsworks uses PySpark to parallelize machine learning applications across lots of containers, containing GPUs if you need them. PySpark's stage-based execution model works well with the state-of-the-art method for distributed training, Collective-AllReduce, but less well with hyperparameter optimization, where state-of-the-art methods involve asynchronous directed search. Previously, to avail of state-of-the-art hyperparameter optimization, researchers have used frameworks like Ray, but in this blog post, we introduce this capability to Spark with our new framework, Maggy, which provides asynchronous directed search on PySpark for blackbox hyperparameter optimization.

“Methods that scale with computation  are the future of AI” [1],
“The two (general purpose) methods that .. scale ...are search and learning.”
[2]

Prof Rich Sutton, Father of Reinforcement Learning in “The Bitter Lesson”

Spark/AI Summit  EU Video

Deep learning is a powerful supervised learning method for building predictive models when there are sufficient amounts of high-quality labelled training data available. When an organization chooses a platform for deep learning, it needs to cater for as many potential use cases as possible, and one use case that is appearing more and more frequently is the use of larger models with more training data [3]. Empirical studies in different domains such as neural machine translation and image classification have shown that predictive models have a logarithmic improvement in their accuracy for every order of magnitude more training data that becomes available. AutoML, the automating of building machine learning models, has been shown to outperform humans in designing model architectures, and the most accurate models for the ImageNet classification challenge have been designed by search algorithms. The only conclusion we can draw is that, long term, more data and more compute will yield better models, and the companies with better models will have a competitive advantage in their field of business.

Horizontally Scaling Machine Learning

In Figure 1, we can see the challenges in scaling out deep learning. In the Inner Loop, the current best practice for reducing the time required to train models by adding lots of GPUs is data-parallel, synchronous stochastic gradient descent. To find good hyperparameters (before we train the model with lots of GPUs), directed search algorithms (such as genetic algorithms, Bayesian optimization, HyperOpt, ASHA) are considered the state-of-the-art.

In Hopsworks, we use PySpark to scale out both the inner loop and the outer loop for Machine Learning, see Figure 1 (below). The inner loop is where we train models - scaling out here means adding more GPUs to make training go faster (data-parallel training). The outer loop is where we run many experiments to establish good hyperparameters for the model we are going to train. We typically run many experiments as we need to search for good hyperparameters - hyperparameters are not updated during training and hyperparameter space is typically not smooth, so gradient-based approaches often do not work - undirected and directed search typically work better.

Spark provides a bulk-synchronous parallel computation framework for distributed computing that maps well to the Inner Loop (synchronous stochastic Gradient Descent is the state of the art for distributed training), but Spark does not scale efficiently for directed search for hyperparameter tuning. To this end, we later introduce a new framework, Maggy, and describe Maggy’s new asynchronous task-based compute model for more efficient hyperparameter tuning. But most importantly, we show how, with just a few lines of code, you can update your single-host Python program to become a monster job that can have its hyperparameters tuned using hundreds of GPUs in parallel or be trained in parallel on hundreds of GPUs. We show how you can do it all in a Jupyter notebook - even though the GPUs will be in the cluster, and you will be able to view the logs from the parallel training tasks in real-time from the comfort of your Jupyter notebook cell.

Figure 1: Scaling out training ML/DL requires scaling out both the Inner Loop (data-parallel training with synchronous stochastic gradient descent) and the Outer Loop (hyperparameter search - directed or undirected). Scaling out the Outer Loop enables you to run lots of experiments in parallel to find good models with good hyperparameters. Scaling out the Inner Loop will reduce the time required to train a model, with more gains seen when training large models on large volumes of data.

Horizontally Scalable ML Pipelines with Hopsworks

Hopsworks is an open-source platform for the development and operation of data-intensive AI applications. Hopsworks leverages PySpark to provide cluster support for Python. With PySpark, a TensorFlow/Keras/PyTorch program can be made to run hyperparameter optimization or distributed training with up to hundreds of GPUs in the cluster, with a few lines of code changes. As can be seen in Figure 2, by parallelizing ML/DL across many GPUs, we can both speed up our ML pipeline and make Data Scientists massively more productive.

Figure 2: Hopsworks supports building horizontally scalable ML pipelines. Machine Learning stages can be scaled out for both ML Experiments and Training (Data Parallel Training), massively increasing Data Scientist productivity.

Synchronous Hyperparameter Tuning on PySpark

Hopsworks has supported hyperparameter tuning using PySpark since version 0.4 (released in Oct’18). In Figure 3, we can see how Executors in PySpark (workers) run hyperparameter trials in parallel, where each Executor may also have one or more GPUs to run the training function. In hyperparameter tuning, a trial is an experiment (a training run) with a given set of hyperparameters that returns its result as a metric. In synchronous hyperparameter tuning (see Figure 4), the results of trials (metrics) are written to HopsFS, where the Driver reads the results and can then issue new jobs with new trials as Spark tasks to Executors, iterating until hyperparameter optimization is finished.

Figure 3: In the example GridSearch code shown on the right, six trials are run with all different possible combinations of learning_rate (lr) and dropout. Executors will run these trials in parallel, so if you run this GridSearch code with 6 Executors, it can be expected to complete 6 times faster than running the trials sequentially. HopsFS is used to store the results of the trials, logs, any models trained, and any visualization data for TensorBoard.

While the synchronous approach works well for undirected search algorithms, such as grid search and random search, it is less efficient for directed search algorithms (population-based methods, Bayesian optimization, etc.). In Figure 4, we can see 3 Spark stages (i.e., 3 barriers) with N Spark tasks each, being executed on separate Spark Executors. In the bulk synchronous execution model of Spark, once a job is started and Spark creates the directed acyclic graph of tasks and stages to be executed, there is no possibility to dynamically stop, add or remove tasks from stages. Furthermore, a new stage can only begin once all tasks in the previous stage have finished. Since we evaluate one trial per task, this introduces inefficiencies - early-stopping will not free up the Executor to take new trials; it can only receive a new trial at the start of the next stage. If any Executor is slow (a straggler) or fails, it will slow down all other Executors. Trials will naturally have varying training times due to the differing hyperparameter settings. Together with the stage-based execution, this is suboptimal for directed search algorithms such as Bayesian optimization. A Bayesian optimizer is able to incorporate feedback - the final performance metric of a trial - as soon as the trial finishes, in order to produce better new samples. However, the meta-level Bayesian optimizer will only be updated with all the metrics of a stage once that stage finishes. Therefore, we would like to update our knowledge as soon as a trial finishes.

Figure 4: A Spark job consisting of 3 stages with N tasks in each stage. Each task is a trial, evaluating some combination of hyperparameters. The trials for a stage are only finished when all the tasks in the stage have completed. That is, the driver reads the results of all trials from the shared filesystem, and then can issue new trials as tasks to Executors. If a trial is performing poorly, early-stopping will not help, as the Executor will have to wait until the end of the stage anyway before it can receive a new trial.

Early-Stopping with Wasted GPUs

In hyperparameter tuning, some trials will perform poorly, and early in a trial’s execution it can become clear that it should be stopped because its performance is very poor relative to the other executing trials. In published experiments with ResNet-50 and Hyperband, savings of up to 96.5% in cost were reported with early-stopping. However, to get those savings, early-stopping requires sharing the current performance of trials between Executors, to know if a trial’s relative performance is poor. In Figure 5, we introduce early-stopping to Spark - the red arrows indicate trials that are stopped early. Here, we can see that there is a significant amount of wasted compute (GPUs), where the Executor idles until the end of the stage after its trial has been stopped. In this example, early-stopping decisions are not optimal as they are taken locally by the Executor - the Executor has no knowledge of the performance of other Executors’ trials. In Spark, tasks are independent units of work, without communication between them. One could potentially free up resources by blacklisting the idle executor to make the resources available to other jobs, but on a multi-tenant platform it will be hard to reacquire those resources.

Figure 5: Hyperparameter Search with early-stopping leads to lots of wasted GPU cycles. Executors in PySpark have to wait until the next stage (Barrier) is reached before a new Trial can be executed. For early-stopping to work well, Executors should continually share the current performance of their trial, so the worst relative performers can be stopped early.

Asynchronous Hyperparameter Search with Maggy

Maggy is a framework for asynchronous trials and early-stopping with global knowledge, guided by an Optimizer. Developers can use an existing Optimizer, such as asynchronous successive halving (ASHA), or provide their own. The basic approach we followed, see Figure 6, was to add support for the Driver and Executors to communicate via RPCs. The Optimizer that guides hyperparameter search is located on the Driver and it assigns trials to Executors. Executors periodically send the current performance of their trial back to the Driver, and the Optimizer can decide to early-stop an ongoing trial, followed by sending the Executor a new trial. Because of the impedance mismatch between trials and the stage-/task-based execution model of Spark, we keep Executors busy with long-running tasks that run multiple trials per task. In this way, Executors are always kept busy running trials, and the global information needed for efficient early-stopping is aggregated in the Optimizer.

Figure 6: Directed Asynchronous Search using Maggy. Executors run a single long-running task and receive commands from the Driver (Optimizer) for trials to execute. Executors also periodically send metrics to the Driver to enable the Optimizer to take global early-stopping decisions.

Maggy provides two high-level APIs: one for black-box optimization experiments (hyperparameter tuning) and one for parallel ablation studies. As Maggy has a general-purpose RPC framework for collecting events from Executors at the Driver, we also collect the training logs generated by Executors, so that those logs can subsequently be displayed in real-time in a Jupyter notebook. That way, users can debug and follow hyperparameter tuning directly from their notebook.
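A sketch of what launching a Maggy hyperparameter tuning experiment from a notebook might look like. The Searchspace/lagom API and argument names shown here follow Maggy's early releases and are best treated as illustrative; they may differ in your version.

    from maggy import experiment, Searchspace

    # Define the hyperparameter search space (names and bounds are illustrative).
    sp = Searchspace(learning_rate=('DOUBLE', [1e-5, 1e-2]),
                     dropout=('DOUBLE', [0.1, 0.7]))

    def train_fn(learning_rate, dropout, reporter):
        # ...build and train the model with these hyperparameters...
        # The reporter periodically sends the current metric to the Optimizer on
        # the Driver, enabling global early-stopping decisions.
        final_metric = 0.0  # placeholder for e.g. validation accuracy
        return final_metric

    result = experiment.lagom(train_fn,
                              searchspace=sp,
                              optimizer='asha',      # asynchronous successive halving
                              direction='max',
                              num_trials=50,
                              name='hyperparam_search')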

Distributed Training with CollectiveAllReduce

Once good hyperparameters and a good model architecture have been designed, a model can be trained on the full dataset. If training is slow, it can be sped up by adding more GPUs to train in parallel, in what is known as data-parallel training, where each Worker (Executor) trains on a different shard of the training data. This type of distributed training benefits hugely from having a distributed file system (HopsFS in Hopsworks), where Workers can read the same training data and write to the same directories containing logs for all the workers, checkpoints for recovery if training crashes for some reason, TensorBoard logs, and any models that are produced at the end of training.

Synchronous stochastic gradient descent is the current state-of-the-art algorithm for updating the weights of Deep Learning models, and it maps well to Spark’s stage-based execution model. CollectiveAllReduce is the current state-of-the-art implementation of synchronous stochastic gradient descent, as it is bandwidth optimal (using both the upload and download bandwidth of all Workers) compared to the Parameter Server model, which can be I/O bound at the Parameter Server(s).

In CollectiveAllReduce, within a stage, each worker will read its shard of the mini-batch, then send its gradients (changes to its weights as a result of the learning algorithm) to its successor on the ring, while receiving gradients from its predecessor on the ring in parallel. Assuming all Workers train on similar batch sizes per iteration and there are no stragglers, GPUs will be used very efficiently. The code changes required to make a TensorFlow/Keras program distributed are minimal, as can be seen in Figure 7.

Figure 7: Distributed Training using Ring-AllReduce (CollectiveAllReduce). Hopsworks ensures that the shared TF_CONFIG environment variables, used to build the ring, are distributed to the Spark Executors (or Workers in TensorFlow terminology) by the Driver. The code snippet shown here remains unchanged whether you run with 1 or 1000 workers. The train function will be run on the Workers, and each Worker will read its shard of the mini-batches from HopsFS during training.
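Since the code referenced in the figure is not reproduced here, below is a hedged sketch of the kind of minimal change involved, using TensorFlow's MultiWorkerMirroredStrategy (which implements CollectiveAllReduce); in a real Hopsworks job, TF_CONFIG is set on each Executor and each worker reads its own shard from HopsFS rather than the random data used here.

    import numpy as np
    import tensorflow as tf

    # TF_CONFIG (which workers form the ring) is assumed to be set before this runs.
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    with strategy.scope():
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(64, activation="relu", input_shape=(20,)),
            tf.keras.layers.Dense(1, activation="sigmoid"),
        ])
        model.compile(optimizer="adam", loss="binary_crossentropy")

    # Random data keeps the snippet self-contained; in practice each worker
    # reads its shard of the training data from the distributed file system.
    x = np.random.rand(1024, 20).astype(np.float32)
    y = np.random.randint(0, 2, size=(1024, 1))
    model.fit(x, y, epochs=1, batch_size=128)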

Notebooks to Get Started

We have a number of example notebooks to help you get started with distributed deep learning on Hopsworks using Keras/TensorFlow:

For PyTorch, we have hyperparameter optimization examples:

Takeaway

With PySpark, we can scale ML programs from single-host to clustered applications, enabling us to perform faster experimentation, train models faster and on more data, and be all-round more productive ML engineers. Where PySpark has limitations in supporting asynchronous search, GPU utilization can suffer, so we introduced Maggy, a framework that adds asynchronous tasks to Spark, enabling new, more efficient directed hyperparameter search algorithms. Hopsworks also provides framework support for easy distributed training, using PySpark and CollectiveAllReduce from TensorFlow, enabling models to be trained faster and on more GPUs, yielding more accurate models that can drive the business forward.

[1] https://www.youtube.com/watch?v=EeMCEQa85tw

[2] http://www.incompleteideas.net/IncIdeas/BitterLesson.html

Optimizing GPU utilization in Hops

>
ML Best Practices
>
9/28/2018
>
Robin Andersson

TLDR; This article describes how we use dynamic executors in PySpark to ensure GPUs are allocated to executors only when they are training neural networks. When training is finished, GPUs are immediately freed up for use by other applications in the cluster. We also make hyperparameter optimization fault-tolerant using Spark’s executor blacklisting capability.

Click here to see a Spark Summit talk on PySpark, TensorFlow, and Hops.

When developers use TensorFlow/Keras/PyTorch to train deep neural networks in Hops, Hops uses PySpark to transparently distribute the Python code to containers with GPUs. Hops enables AutoML - automated search for good neural network architectures and hyperparameters - by running parallel experiments on different combinations of hyperparameters and model architectures. In PySpark, Hops runs a different experiment on each executor - not all of the experiments will finish at the same time. Some experiments may finish early, some later. And GPUs cannot currently be shared (multiplexed) by concurrent applications. Population-based approaches to AutoML typically proceed in stages or iterations, meaning experiments wait for other experiments to finish, resulting in idle GPU time. That is, GPUs lie idle waiting for other experiments to finish.

As such, we have the problem of how to free up GPUs as soon as their experiments finish. Hops leverages dynamic executors in PySpark/YARN to free up the GPU(s) attached to an executor immediately if it sits idle waiting for other experiments to finish, ensuring that (expensive) GPUs are held no longer than needed.
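The Spark properties behind that dynamic-executor behaviour look roughly like the following sketch (the values are illustrative and Hopsworks configures them for you; dynamic allocation also requires the external shuffle service or shuffle tracking to be enabled).

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
        .appName("hops-experiment")
        # Executors (and their GPUs) are requested on demand and released when idle.
        .config("spark.dynamicAllocation.enabled", "true")
        .config("spark.dynamicAllocation.minExecutors", "0")
        .config("spark.dynamicAllocation.maxExecutors", "6")
        # Release an idle executor (and its GPU) shortly after its experiment finishes.
        .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
        .getOrCreate())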

Spark/TensorFlow on Hops

Each Spark executor runs a local TensorFlow process. Hops also supports cluster-wide Conda for managing python library dependencies. Hops supports the creation of projects, and each project has its own conda environment, replicated at all hosts in the cluster. When you launch a PySpark job, it uses the local conda environment for that project. This way, users can install whatever libraries they like using conda and pip, and then use them directly inside Spark Executors. It makes programming PySpark one step closer to the single-host experience of programming Python. Hops also supports Jupyter and the SparkMagic kernel for running PySpark jobs.

GPUs are a ‘special resource’

YARN, Kubernetes, and Mesos cannot ensure that GPU resources allocated to applications will be highly utilized. This is not a problem for traditional resources like CPU and memory. For example, if a workload of CPU-intensive applications hogs CPU resources but overprovisions memory, the problem can be addressed by the resource scheduler over-provisioning the amount of memory available at a host. That is, if the host has 256GB of RAM, you can tell YARN the host has 350GB of memory available for applications. You should, of course, ensure the operating system has enough virtual memory available to handle the configured level of over-provisioning, and, like an airline selling more seats than it has available, ensure that disk swapping almost never happens. Another alternative is to use a system like Dr Elephant that monitors applications’ resource utilization and warns applications when they overprovision memory, so that applications can be right-sized. This, of course, works best when workloads are predictable for a cluster, which is not always the case.

But GPUs are a special resource. We can’t overprovision them, because we can’t run multiple applications on GPUs at the same time. An application is given a GPU by a resource scheduler, and the resource scheduler trusts that the application will fully utilize the GPU while it has it. But as the Germans say, ‘trust is good, but control is better’. In Hops, we use Spark’s Dynamic Executors to ensure that GPUs are released immediately, when no longer needed.
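
To make this concrete, the sketch below shows the standard Spark dynamic-allocation settings that this mechanism builds on; the property values are illustrative assumptions, not Hopsworks defaults, and in Hopsworks the Spark session is configured for you.

from pyspark.sql import SparkSession

# Illustrative dynamic allocation settings (values are assumptions, not Hopsworks defaults).
spark = (SparkSession.builder
         .appName("gpu-experiments")
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.shuffle.service.enabled", "true")
         # an executor idle for this long is released, freeing its GPU(s)
         .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
         .config("spark.dynamicAllocation.minExecutors", "0")
         .config("spark.dynamicAllocation.maxExecutors", "6")
         .getOrCreate())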

For deep neural networks (Tensorflow/Keras/PyTorch), users wrap their code in a function like this:

   from hops import experiment

   def train_fn(learning_rate, dropout):
       # tensorflow training code goes here
       ...

   args_dict = {'learning_rate': [0.001, 0.0005, 0.0001], 'dropout': [0.45, 0.7]}
   experiment.grid_search(train_fn, args_dict)

The above code is an example of hyperparameter optimization for TensorFlow. The program will run 6 instances of train_fn, each with a different combination of the learning_rate and dropout parameters (the six combinations are enumerated in the sketch below). If our PySpark application has allocated 6 executors, then each executor will run one of the hyperparameter combinations. It is also possible that fewer than 6 executors were allocated to the application. In that case, hyperparameter tasks (or trials) are placed in a queue, and executors pick them from the queue until all tasks are finished.
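
For illustration, the six trials that grid search generates from args_dict can be enumerated as the Cartesian product of the value lists (a minimal sketch; the experiment API does this enumeration for you):

from itertools import product

args_dict = {'learning_rate': [0.001, 0.0005, 0.0001], 'dropout': [0.45, 0.7]}

# 3 learning rates x 2 dropout values = 6 trials
trials = [dict(zip(args_dict.keys(), values)) for values in product(*args_dict.values())]
for trial in trials:
    print(trial)
# {'learning_rate': 0.001, 'dropout': 0.45} ... {'learning_rate': 0.0001, 'dropout': 0.7}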

In the above example, we use grid_search as an exploration algorithm (to explore the performance of different combinations of hyperparameters). Another approach supported in Hops is genetic algorithms to perform population-based search for good combinations of hyperparameters (aka AutoML or neural architecture search). Other potential approaches are random walk, bayesian optimization, and hyperband.

Fault-tolerant hyperparameter optimization

If a task fails on a PySpark executor, the task is typically retried up to N times on that same executor. If the task does not succeed after these N attempts, the whole Spark job fails. Put that in the perspective of a large hyperparameter search that may run for days. Assume that, while the PySpark executors are running the experiments, one of the servers experiences a hardware malfunction, for example a disk breaks. The executors running on that machine will fail all their tasks and the job as a whole will fail, due to this hardware failure. We need a smarter mechanism that reruns the specific experiments that failed on another machine.

Spark 2.1.0 added support for blacklisting problematic executors and even entire nodes. We take advantage of this mechanism to ensure hyperparameter optimization jobs are fault-tolerant. The behaviour we configure is that an executor is blacklisted after an experiment fails on it for whatever reason, and its allocated CPU/Memory/GPU(s) are reclaimed by YARN. The second time the same experiment fails, the machine the executor is running on is blacklisted, which means that no more experiments will be run on that machine for the entire duration of the hyperparameter optimization job; the assumption here is that the failure may be hardware related. The experiment that failed will now run on a new machine, and if it fails a third time the job fails as a whole, since the problem is most likely an application error rather than a hardware failure. In practice, we have seen that this mechanism can also help remedy code errors, e.g., code that fails occasionally is rerun up to three times.
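
The behaviour described above maps onto Spark's standard blacklisting properties. A minimal sketch, assuming Spark >= 2.1 (the exact values shown are illustrative, not necessarily the Hopsworks defaults):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("fault-tolerant-hyperparameter-search")
         .config("spark.blacklist.enabled", "true")
         # first failure blacklists the executor for that task
         .config("spark.blacklist.task.maxTaskAttemptsPerExecutor", "1")
         # second failure blacklists the node for that task
         .config("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
         # a third failure anywhere fails the job
         .config("spark.task.maxFailures", "3")
         .getOrCreate())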

GPU Support in Spark/YARN

In Hops, we have already added support for GPUs in a fork of Spark. This work is ongoing in the Spark community, with YARN/GPU support being developed in SPARK-2473.


Why you need a Distributed Filesystem for Deep Learning

>
ML Best Practices
>
10/17/2018
>
Jim Dowling

tl;dr When you train deep learning models with lots of high quality training data, you can beat state-of-the-art prediction models in a wide array of domains (image classification, voice recognition, and machine translation). Distributed filesystems are becoming increasingly indispensable as a central store for training data, logs, model serving, and checkpoints. HopsFS is a great choice, as it has native support for the main Python frameworks for Data Science: Pandas, TensorFlow/Keras, PySpark, and Arrow.

Prediction Performance Improves Predictably with Dataset Size

Baidu showed that the improvement in prediction accuracy (or reduction in generalization error) for deep learning models was predictable based on the amount of training data. The decrease in generalization error with increasing training dataset size follows a power-law distribution (as seen by the straight lines in the log-log graph below). This astonishing result came from a large-scale study in the different application domains of machine translation, language modeling, image classification, and speech recognition. Given that this result holds true in vastly different application domains, there is a good chance the same result holds true for your particular application domain. This result is important for companies considering investing in deep learning – if it costs $X to collect or generate a new GB of high quality training data, you can predict the improvement of prediction accuracy for your model, given the slope, Y, of the log-log graph you have observed while training.

[Baidu Research http://research.baidu.com/deep-learning-scaling-predictable-empirically/ ]

Predictable ROI in the Power-Law Region

This predictable return on investment (ROI) for collecting/generating more training data is slightly more complex than described above. You first need to collect enough training data to get beyond the “Small Data Region” in the diagram below. That is, you can only make predictions if you have enough data to be in the “Power-Law Region”.

[Baidu18 https://arxiv.org/pdf/1712.00409.pdf ]

You can determine this by graphing the reduction in your generalization error as a function of your training data size on a log-log scale. Once you observe a straight line, calculate the exponent of your power-law curve (the slope of the line). Baidu’s empirically-collected learning curves showed exponents in the range [-0.35, -0.07], suggesting models learn real-world data more slowly than theory suggests (theoretical models indicate a power-law exponent of -0.5).

Still, if you observe the power-law region, increasing your training data set size will give you a predictable decrease in generalization error. For example, if you are training an image classifier for a self-driving vehicle, the number of hours your cars have driven autonomously determines your training data size. So, going from 2m hours to 6m hours of autonomous driving should reduce errors in your image classifier by a predictable amount. This is important in giving businesses a level of certainty in the improvements they can expect when making large investments in new data collection or generation.
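
A back-of-the-envelope sketch of that prediction, using the exponent range Baidu reported (the specific numbers are only illustrative):

def error_ratio(n_old, n_new, beta):
    """Factor by which generalization error shrinks when training data grows from n_old to n_new."""
    return (n_new / n_old) ** beta

# growing from 2M to 6M hours of driving data (a 3x increase)
print(error_ratio(2e6, 6e6, beta=-0.35))  # ~0.68, roughly a 32% reduction in error
print(error_ratio(2e6, 6e6, beta=-0.07))  # ~0.93, roughly a 7% reduction in error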

Need for a Distributed Filesystem

The TensorFlow team say a distributed filesystem is a must for deep learning. Datasets are getting larger, GPUs are disaggregated from storage, workers with GPUs need to coordinate for model checkpointing, hyperparameter optimization, and model-architecture search. Your system may grow beyond a single server, or you may have different servers for serving your models from the servers you have for training your models. A distributed filesystem is the glue that holds together the different stages of your machine learning workflows, and it enables teams to share both GPU hardware and data. What is important is that the distributed filesystem works with your choice of programming language and deep learning framework(s).

A distributed filesystem is needed for managing logs, tensorboard, coordinating GPUs for experiments, storing checkpoints during training, and storing/serving models.

HopsFS is a great choice as a distributed filesystem, due to it being a drop-in replacement for HDFS. HopsFS/HDFS are supported in major Python frameworks: Pandas, PySpark DataFrames, TensorFlow Data, and so on. In Hopsworks, we provide built-in HopsFS/HDFS support with the pydoop library. HopsFS has one additional feature that is aimed at machine learning workloads: improved throughput and lower latency reading/writing for small files. In a peer reviewed paper at Middleware 2018, we showed throughput improvements of up to 66X compared to HDFS for small files.

Python Support in Distributed Filesystems

As we can see from the table below, the choice of distributed filesystem will affect what you can do.

Python Support in HopsFS

We now give some simple examples of how to write Python code to use datasets in HopsFS. Complete notebooks can be found here.

Pandas with HopsFS

import pandas as pd
import hops.hdfs as hdfs

cols = ["Age", "Occupation", "Sex", …, "Country"]
h = hdfs.get_fs()
with h.open_file(hdfs.project_path() + "/TestJob/data/census/adult.data", "r") as f:
    train_data = pd.read_csv(f, names=cols, sep=r'\s*,\s*', engine='python', na_values="?")

In Pandas, the only change we need to make to our code, compared to a local filesystem, is to replace open(..) with h.open_file(..), where h is a file handle to HDFS/HopsFS.

PySpark with HopsFS

from mmlspark import ImageTransformer

IMAGE_PATH = "/Projects/myProj/Resources/imgs"
images = spark.readImages(IMAGE_PATH, recursive=True, sampleRatio=0.1).cache()
tr = (ImageTransformer().setOutputCol("transformed")
      .resize(height=200, width=200)
      .crop(0, 0, height=180, width=180))

smallImgs = tr.transform(images).select("transformed")
smallImgs.write.save("/Projects/myProj/Resources/small_imgs", format="parquet")

TensorFlow Datasets with HopsFS

import tensorflow as tf

def input_fn(batch_sz):
    files = tf.data.Dataset.list_files(IMAGE_PATH)

    def tfrecord_dataset(f):
        return tf.data.TFRecordDataset(f, num_parallel_reads=32, buffer_size=8 * 1024 * 1024)

    # parallel_interleave lives under tf.data.experimental in later TF 1.x releases
    dataset = files.apply(tf.data.experimental.parallel_interleave(tfrecord_dataset, cycle_length=32))
    dataset = dataset.batch(batch_sz)   # batch using the requested batch size
    dataset = dataset.prefetch(4)
    return dataset


When Deep Learning with GPUs, use a Cluster Manager

>
ML Best Practices
>
12/18/2018
>
Jim Dowling

TLDR; If you are employing a team of Data Scientists or working with Deep Learning, a cluster manager that can share GPUs between your team will maximize utilization of your GPUs and make your Data Scientists happier.

Anti-Patterns for Sharing GPUs

At Logical Clocks, we talk to lots of customers about how they share GPUs between their teams, and surprisingly many companies still resort to using Google calendars or fixed schedules to share GPUs. Many companies do not even share GPUs across lines of business, which is even worse. It goes without saying that these approaches are bad for getting the most out of your investments in GPUs and bad for developers – as they may not be able to use all available GPUs when they need them.

GPUs-as-a-Resource

Resource managers are used to manage the compute and GPU resources available in a data center or organization. Developers can run an application using a Resource Manager by submitting an application request to the cluster: please start my application with X containers, where each container has Y CPUs and Z GB of memory. The Resource Manager then schedules the application for execution when it can allocate those resources to the application. For Deep Learning, we need GPUs, and some modern Resource Managers support GPUs-as-a-Resource, where you can also request that a container has N GPUs attached to it. Deep Learning applications can make special demands on Resource Managers. For distributed training (with >1 GPU), applications request all of their GPUs at the same time – so-called Gang Scheduling. However, for hyperparameter optimization, applications can start work with just 1 GPU and make use of more GPUs as they are incrementally allocated by the Resource Manager. For both Gang Scheduling and Incremental Allocation to work correctly, support is needed in both the application software and the Resource Manager.

  • Distributed Training needs Gang Scheduling support from the Resource Manager so that it gets all of its GPUs at the same time. If Gang Scheduling is not supported, or only partly supported, Distributed Training can either be starved indefinitely or deadlock the Resource Manager.
  • Hyperparameter Optimization can use Incremental Allocation of GPUs by the Resource Manager. It can make progress with just 1 GPU or go faster with more available GPUs.

Machine Learning Workflows

When machine learning migrates from R&D into production, model training typically becomes one stage in a longer machine learning workflow that involves (1) collecting and preparing data for training, (2) training/validating the models, and (3) deploying the models for serving. If data volumes are large, stage (1) may require many containers with lots of CPUs for ETL and/or feature engineering. Spark/PySpark is a popular framework for this stage. For stage (2), training can be done with frameworks like PyTorch or Keras/TensorFlow. Distributed training can be done with the help of frameworks like HopsML. Finally, stage (3) involves deploying models to production for serving. This may be done on a different cluster or the same cluster. Kubernetes is a popular framework for model serving, due to its support for load-balancing and elasticity.

A Machine Learning workflow, consisting of a DataPrep phase, a training phase and a model serving phase will require different sets of resources from the cluster. DataPrep typically requires CPUs, training requires GPUs, and serving requires CPUs (and maybe GPUs for low latency model serving).

YARN, Mesos, Slurm, Kubernetes

There are a number of Data Center Resource Managers that support GPUs-as-a-Resource:

  • YARN
  • Kubernetes
  • Mesos (DC/OS)
  • Slurm

YARN is the dominant resource scheduler for on-premise data lakes, and since release 3.1 of Hadoop, it has full support for GPUs-as-a-Resource. Hops’ YARN is a fork of Hadoop that has supported GPUs-as-a-Resource since October 2017. Neither version supports gang scheduling, but Hops layers gang-scheduling semantics over YARN using PySpark and the HopsML API. Essentially, training applications are run in a single map operation that is gang scheduled by HopsML on PySpark Executors. In Spark 2.4, there is a new barrier execution mode that also supports Gang Scheduling for distributed training.

Mesos does not support gang scheduling, but, similar to how HopsML adds gang scheduling on top of YARN using Spark, Uber has added support for gang scheduling for distributed training using Spark in a platform called Peloton. Unfortunately, Peloton is currently not open source, and Uber is talking about migrating Peloton to Kubernetes. Kubernetes is currently working on supporting gang scheduling (or co-scheduling, as they call it), and it can be expected to be included in frameworks like KubeFlow some time later in 2019. Slurm is traditionally used for HPC clusters and is not widely adopted in the cloud or for data lake clusters, but it supports gang scheduling natively.

We now discuss how Resource Managers are used by Data Scientists in two open-source frameworks: KubeFlow on Kubernetes and Hopsworks on Hops YARN.

Data Scientist experience with KubeFlow

Kubernetes provides support for creating clusters with GPUs using a cluster specification in YAML.

An example: 

apiVersion: v1
kind: Pod
metadata:
  name: cuda-vector-add
spec:
  restartPolicy: OnFailure
  containers:
    - name: cuda-vector-add
      # https://github.com/kubernetes/kubernetes/blob/v1.7.11/test/images/nvidia-cuda/Dockerfile
      image: "k8s.gcr.io/cuda-vector-add:v0.1"
      resources:
        limits:
          nvidia.com/gpu: 1 # requesting 1 GPU

Data Scientists typically do not use Kubernetes directly, as this involves too much devops: YAML specifications, Dockerfiles to install Python libraries and other packages. Instead, Kubeflow is typically used to provision clusters and train deep neural networks using GPUs from the command-line.

First, Data Scientists can use the command line to check the availability of GPUs in the cluster, using commands such as:

$ kubectl describe nodes | grep -B 3 gpu
Capacity:
cpu:             8
memory:          32879772Ki
nvidia.com/gpu:  2

Allocatable:
cpu:             8
memory:          32777372Ki
nvidia.com/gpu:  2

Then, assuming you have installed Kubeflow, such as using this tutorial, you can train deep neural networks using Kubeflow on GPUs using commands such as the following.

Kubeflow commands:

ks generate tf-job mnist --name=mnist --namespace=mykubeflow
# examine, then set the cluster configuration parameters

ks param list

COMPONENT    PARAM         VALUE
=========    =====         =====
mnist        args          "null"
mnist        image         "null"
mnist        image_gpu     "null"
mnist        name          "mnist"
mnist        namespace     "mykubeflow"
mnist        num_gpus      0
mnist        num_masters   1
mnist        num_ps        0
mnist        num_workers   0

IMAGE=docker.io/raddaoui/tfjob_mnist_image:2.0
ks param set mnist image ${IMAGE}
ks param set mnist num_ps 2
ks param set mnist num_workers 3
ks param set mnist num_masters 0
ks param set mnist args -- python,/opt/mnist_replica.py

# start training
ks apply default -c

Data Scientist experience with Hopsworks

Hopsworks is our scale-out platform for machine learning and data analytics, based on a next-generation distribution of Hadoop called Hops. In the Hopsworks UI, a Data Scientist can quickly see the number of available GPUs in the cluster:

Launching a cluster with a number of GPUs is as simple as deciding on the number of GPUs and the amount of memory to allocate to the Application Master and the Executor:

Finally, a Data Scientist can install Python libraries using pip and conda (there is no need to write a Dockerfile):


Machine Learning Workflows in Hopsworks

In Hopsworks, we support YARN for both (1) DataPrep and (2) Training stages, and (3) Kubernetes for model serving, see Figure below. Typically, DataPrep is done on PySpark or Spark and the output of that stage is that training data is written to our distributed filesystem, HopsFS. Training is typically done by launching PyTorch or TensorFlow/Keras applications using PySpark, and trained models are stored on HopsFS. Finally, models are served in Kubernetes by reading them from HopsFS.

A Machine Learning workflow in HopsML can run the DataPrep stage on PySpark, (Distributed) Training on TensorFlow/PyTorch, and Model Serving on Kubernetes. A distributed filesystem, HopsFS, is used to integrate the different stages, and YARN is used to allocate both CPUs for the PySpark stage and GPUs for the training phase.

Is a Cluster Manager enough?

In our talk at the Spark Summit Europe 2018, we argued that a cluster manager by itself is not enough to make the most efficient use of your GPUs. Data Scientists may write Python programs in Jupyter notebooks where they both train and visualize using the same resources. For example, a developer can write a cell in Jupyter to train a network on one or more GPUs, then write a subsequent cell to evaluate the performance of that model, and then add one or more cells to visualize or validate the trained model. While the Data Scientist is visually analyzing the trained model, she is unnecessarily holding on to valuable GPU resources. GPUs should be freed immediately after training/evaluation has completed, regardless of how you implement gang scheduling. You can make sure GPUs are freed up immediately by using (1) discipline and a distributed filesystem or (2) HopsML. For (1), developers write their trained models and evaluation dataset to a distributed filesystem and shut down the containers used to train the model when training has finished. They then open a new Python notebook with access to the same distributed filesystem to visually inspect the trained model. For (2), in HopsML, developers put their training code in a Python function which is run on a GPU, and when the function returns, its associated GPU is released after a few seconds of inactivity. HopsML implements this behaviour using Dynamic Executors in PySpark – for more details read this blog post.

An example of how to structure your code in HopsML is shown below:

from hops import experiment

def train_fn():
    # training code goes here
    ...

experiment.launch(train_fn)

Summary

A cluster manager, such as YARN for Hopsworks, will help you get the most value from your GPUs and keep your Data Scientists happy by enabling them to be more productive with both distributed training of models and hyperparameter optimization.


Feature Store: The Missing Data Layer in ML Pipelines?

>
ML Best Practices
>
12/30/2018
>
Kim Hammar

TLDR; A feature store is a central vault for storing documented, curated, and access-controlled features. In this blog post, we discuss the state-of-the-art in data management for deep learning and present the first open-source feature store, available in Hopsworks.

What is a Feature Store?

The concept of a feature store was introduced by Uber in 2017 [11]. The feature store is a central place to store curated features within an organization. A feature is a measurable property of some data-sample. It could be for example an image-pixel, a word from a piece of text, the age of a person, a coordinate emitted from a sensor, or an aggregate value like the average number of purchases within the last hour. Features can be extracted directly from files and database tables, or can be derived values, computed from one or more data sources.

Features are the fuel for AI systems, as we use them to train machine learning models so that we can make predictions for feature values that we have never seen before.

Figure 1. A feature store is the interface between feature engineering and model development.

The feature store has two interfaces:

Writing to the feature store: The interface for Data Engineers. At the end of the feature engineering pipeline, instead of writing features to a file or a project-specific database, features are written to the feature store.

Data Engineer example:

from hops import featurestore

raw_data = spark.read.parquet(filename)

# derive polynomial (squared) features from each column of the raw data
polynomial_features = raw_data.select([(raw_data[c] ** 2).alias(c + "_squared")
                                        for c in raw_data.columns])

featurestore.insert_into_featuregroup(polynomial_features, "polynomial_featuregroup")

Reading from the feature store: The interface for Data Scientists. To train a model on a set of features, the features can be read from the feature store directly.

Data Scientist example:

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

A feature store is not a simple data storage service, it is also a data transformation service as it makes feature engineering a first-class construct. Feature engineering is the process of transforming raw data into a format that is understandable for predictive models.

Why You Need a Feature Store

At Logical Clocks we are committed to developing technologies to operate machine learning workflows at large scale and to help organizations distill intelligence from data. Machine learning is an extremely powerful method that has the potential to help us move from a historical understanding of the world to a predictive modeling of the world around us. However, building machine learning systems is hard and requires specialized platforms and tools.

Although ad-hoc feature engineering and training pipelines are a quick way for Data Scientists to experiment with machine learning models, such pipelines have a tendency to become complex over time. As the number of models increases, it quickly becomes a pipeline jungle that is hard to manage. This motivates the use of standardized methods and tools for the feature engineering process, helping reduce the cost of developing new predictive models. The feature store is a service designed for this purpose.

Technical Debt in Machine Learning Systems

“Machine Learning: The High-Interest Credit Card of Technical Debt”

Google [3]

Machine learning systems have a tendency to assemble technical debt [1]. Examples of technical debt in machine learning systems are:

  • There is no principled way to access features during model serving.
  • Features cannot easily be re-used between multiple machine learning pipelines.
  • Data science projects work in isolation without collaboration and re-use.
  • Features used for training and serving are inconsistent.
  • When new data arrives, there is no way to pin down exactly which features need to be recomputed; instead, the entire pipeline needs to be run to update the features.

Several organizations that we have spoken to struggle to scale their machine learning workflows due to the technical complexity, and some teams are even reluctant to adopt machine learning given its high technical cost. Using a feature store is a best practice that can reduce the technical debt of machine learning workflows.

“Pipeline jungles can only be avoided by thinking holistically about data collection and feature extraction”

Google [1]

Data Engineering is the hardest problem in Machine Learning

“Data is the hardest part of ML and the most important piece to get right. Modelers spend most of their time selecting and transforming features at training time and then building the pipelines to deliver those features to production models. Broken data is the most common cause of problems in production ML systems”

Uber [2]

Delivering machine learning solutions in production and at large-scale is very different from fitting a model to a pre-processed dataset. In practice, a large part of the effort that goes into developing a model is spent on feature engineering and data wrangling.

Figure 2. Model development is just one part of the work that goes into a machine learning project.

There are many different ways to extract features from raw data, but common feature engineering steps include the following (two of the steps are sketched in code after the list):

  • Converting categorical data into numeric data;
  • Normalizing data (to alleviate ill-conditioned optimization when features originate from different distributions);
  • One-hot-encoding/binarization;
  • Feature binning (e.g., convert continuous features into discrete);
  • Feature hashing (e.g., to reduce the memory footprint of one-hot-encoded features);
  • Computing polynomial features;
  • Representation learning (e.g.,  extract features using clustering, embeddings, or generative models);
  • Computing aggregate features (e.g., count, min, max, stdev).
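
A minimal sketch of two of the steps above (one-hot encoding and normalization), using pandas and hypothetical column names:

import pandas as pd

df = pd.DataFrame({"occupation": ["engineer", "teacher", "engineer"],
                   "age": [25, 47, 38]})

# one-hot encode the categorical column
one_hot = pd.get_dummies(df["occupation"], prefix="occupation")

# z-score normalize the numeric column
df["age_normalized"] = (df["age"] - df["age"].mean()) / df["age"].std()

features_df = pd.concat([df[["age_normalized"]], one_hot], axis=1)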

To illustrate the importance of feature engineering, let’s consider a classification task on a dataset with just one feature, x1, that looks like this:

Figure 3. A dataset with a single feature, x1, that has two classes (filled circle and non-filled circle) that are not linearly separable.

We are doomed to fail if we try to fit a linear model directly to this dataset, as it is not linearly separable. During feature engineering we can extract an additional feature, x2, where the function for deriving x2 from the raw dataset is x2 = (x1)^2. The resulting two-dimensional dataset might look like the one depicted in Figure 4.

Figure 4. A dataset with two features, x1 and x2, that has two classes (filled circle and non-filled circle) that are linearly separable (e.g., by the red line).

By adding an extra feature, the dataset becomes linearly separable and can be fitted by our model. This was a simple example; in practice, the process of feature engineering can involve much more complex transformations.
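
A small numpy sketch of the derived feature (the class assignment below is made up purely for illustration):

import numpy as np

x1 = np.array([-2.0, -1.5, -0.5, 0.0, 0.5, 1.5, 2.0])
x2 = x1 ** 2                              # the engineered feature x2 = (x1)^2
labels = (np.abs(x1) > 1).astype(int)     # assumed class assignment for the example
# in (x1, x2) space the two classes can now be separated by the horizontal line x2 = 1
print(np.column_stack([x1, x2, labels]))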

In the case of deep learning, deep models tend to perform better the more data they are trained on (more data samples during training can have a regularizing effect and combat overfitting). Consequently, a trend in machine learning is to train on increasingly larger datasets. This trend further complicates the feature engineering process as Data Engineers must think about scalability and efficiency in addition to the feature engineering logic. With a standardized and scalable feature platform, the complexity of feature engineering can be managed more effectively.

Life Before the Feature Store

Figure 5. A typical machine learning infrastructure without a feature store.

No Feature Store

In Figure 5, feature code is duplicated across training jobs, and there are also features that have different implementations: one for training and one for deployment (inference) (Model C). Having different implementations for computing features for training and deployment entails non-DRY code and can lead to prediction problems. Moreover, without a feature store, features are typically not reusable, as they are embedded in training/serving jobs. This also means that Data Scientists have to write low-level code for accessing data stores, requiring data engineering skills. There is also no service to search for feature implementations, and there is no management or governance of features.

Figure 6. A machine learning Infrastructure with a feature store.

With a Feature Store

Data Scientists can now search for features, and with API support, easily use them to build models with minimal data engineering. In addition, features can be cached and reused by other models, reducing model training time and infrastructure costs. Features are now a managed, governed asset in the Enterprise.

Economies of Scale for Machine Learning Organizations

A frequent pitfall for organizations that apply machine learning is to think of data science teams as individual groups that work independently with limited collaboration. Having this mindset results in machine learning workflows where there is no standardized way to share features across different teams and machine learning models. Not being able to share features across models and teams limits Data Scientists' productivity and makes it harder to build new models. By using a shared feature store, organizations can achieve an economies-of-scale effect. As the feature store is built up with more features, it becomes easier and cheaper to build new models, since the new models can re-use features that already exist in the feature store.

Figure 7. By centralizing feature storage within the organization, the ramp-up period for new models and machine learning projects is reduced.

Hopsworks Feature Store

With Hopsworks 0.8.0 we are releasing the first open-source feature store service that will be integrated in the HopsML framework [8]. In this section we cover the technical details of the system and how it is used.

The Components of a Feature Store and a Comparison of Existing Feature Stores

During 2018, a number of large companies that are at the forefront of applying machine learning at scale announced the development of proprietary feature stores. Uber, LinkedIn, and Airbnb built their feature stores on Hadoop data lakes, while Comcast built a feature store on an AWS data lake, and GO-JEK built a feature store on Google’s data platform.

These existing feature stores consist of five main components:

  • The feature engineering jobs: the computation of features. The dominant frameworks for feature computation are Samza (Uber [4]), Spark (Uber [4], Airbnb [5], Comcast [6]), Flink (Airbnb [5], Comcast [6]), and Beam (GO-JEK [7]).
  • The storage layer for storing feature data. Common solutions for storing features are Hive (Uber [4], Airbnb [5]), S3 (Comcast [6]), and BigQuery (GO-JEK [7]).
  • The metadata layer used for storing code to compute features, feature version information, feature analysis data, and feature documentation.
  • The Feature Store API used for reading/writing features from/to the feature store.
  • The feature registry, a user interface (UI) service where Data Scientists can share, discover, and order computation of features.

Before we dive into the feature store API and its usage, let’s have a look at the technology stack that we built our feature store on.

Hopsworks Feature Store Architecture

Figure 8. Architecture of Hopsworks Feature Store

Feature Engineering Frameworks

At Logical Clocks we specialize in Python-first ML pipelines, and for feature engineering we focus our support on Spark, PySpark, Numpy, and Pandas. The motivation for using Spark/PySpark to do feature engineering is that it is the preferred choice for data wrangling among our users that are working with large-scale datasets. However, we have also observed that users working with small datasets prefer to do the feature engineering with frameworks such as Numpy and Pandas, which is why we decided to provide native support for those frameworks as well. Users can submit feature engineering jobs on the Hopsworks platform using notebooks, python files, or .jar files.

The Storage Layer

We have built the storage layer for the feature data on top of Hive/HopsFS with additional abstractions for modeling feature data. The reason for using Hive as the underlying storage layer is two-fold: (1) it is not uncommon that our users are working with datasets in terabyte-scale or larger, demanding scalable solutions that can be deployed on HopsFS (see blog post on HopsFS [9]); and (2) data modeling of features is naturally done in a relational manner, grouping relational features into tables and using SQL to query the feature store. This type of data modelling and access patterns fits well with Hive in combination with columnar storage formats such as Parquet or ORC.
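
Because feature groups are stored as Hive tables, they can also be queried directly with SQL from Spark. A hedged sketch, with hypothetical database and table names:

players_features = spark.sql("""
    SELECT average_attendance, average_player_age
    FROM demo_featurestore.players_features_1
""")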

The Metadata Layer

To provide automatic versioning, documentation, feature analysis, and feature sharing we store extended metadata about features in a metadata store. For the metadata store we utilize NDB (MySQL Cluster) which allows us to keep feature metadata that is strongly consistent with other metadata in Hopsworks, such as metadata about feature engineering jobs and datasets.

Feature Data Modeling

We introduce three new concepts to our users for modeling data in the feature store.

  • The feature is an individual versioned and documented data column in the feature store, e.g., the average rating of a customer.
  • The feature group is a documented and versioned group of features stored as a Hive table. The feature group is linked to a specific Spark/Numpy/Pandas job that takes in raw data and outputs the computed features.
  • The training dataset is a versioned and managed dataset of features and labels (potentially from multiple different feature groups). Training datasets are stored in HopsFS as tfrecords, parquet, csv, tsv, hdf5, or .npy files.

Figure 9. A feature group contains a group of features and a training dataset contains a set of features, potentially from many different feature groups.

When designing feature groups, it is best practice to put all features that are computed from the same raw dataset in the same feature group. It is common for several feature groups to share a common column, such as a timestamp or a customer id, which allows feature groups to be joined together into a training dataset (a sketch of such a join is shown below).
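
A sketch of such a join, with hypothetical feature group names and a get_featuregroup helper assumed to behave like the get_features call shown earlier:

from hops import featurestore

demographics_df = featurestore.get_featuregroup("customer_demographics")  # assumed helper and names
activity_df = featurestore.get_featuregroup("customer_activity")          # assumed helper and names

# both feature groups share the customer_id column, so they can be joined into one dataframe
training_df = demographics_df.join(activity_df, on="customer_id")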

The Feature Store API

The feature store has two interfaces: one for writing curated features to the feature store and one for reading features from the feature store for use in training or serving.

Creating Features

The feature store is agnostic to the method for computing the features. The only requirement is that the features can be grouped together in a Pandas, Numpy, or Spark dataframe. The user provides a dataframe with features and associated feature metadata (metadata can also be edited later through the feature registry UI) and the feature store library takes care of creating a new version of the feature group, computing feature statistics, and linking the features to the job to compute them.

Insert Features

from hops import featurestore

featurestore.insert_into_featuregroup(features_df, featuregroup_name)

Create Feature Group

from hops import featurestore

featurestore.create_featuregroup(
   features_df,
   featuregroup_name,
   featuregroup_description,
   feature_engineering_job,
   featuregroup_version
)

Reading From the Feature Store (Query Planner)

To read features from the feature store, users can use either SQL or APIs in Python and Scala. Based on our experience with users on our platform, Data Scientists can have diverse backgrounds. Although some Data Scientists are very comfortable with SQL, others prefer higher level APIs. This motivated us to develop a query-planner to simplify user queries. The query-planner enables users to express the bare minimum information to fetch features from the feature store. For example, a user can request 100 features that are spread across 20 different feature groups by just providing a list of feature names. The query-planner uses the metadata in the feature store to infer where to fetch the features from and how to join them together.

Figure 10. Users query the feature store programmatically or with SQL queries. The output is provided as Pandas, Numpy, or Spark dataframes.

To fetch the features “average_attendance” and “average_player_age” from the feature store, all the user has to write is this.

Example to fetch features

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

The returned “features_df” is a (Pandas, Numpy, or Spark) dataframe that can then be used to generate training datasets for models.

Creating Training Datasets

Organizations typically have many different types of raw datasets that can be used to extract features. For example, in the context of user recommendation there might be one dataset with demographic data of users and another dataset with user activities. Features from the same dataset are naturally grouped into a feature group, and it is common to generate one feature group per dataset. When training a model, you want to include all features that have predictive power for the prediction task; these features can potentially span multiple feature groups. The training dataset abstraction in the Hopsworks Feature Store is used for this purpose. The training dataset allows users to group a set of features with labels for training a model to do a particular prediction task.

Once a user has fetched a set of features from different feature groups in the feature store, the features can be joined with labels (in case of supervised learning) and materialized into a training dataset. By creating a training dataset using the feature store API, the dataset becomes managed by the feature store. Managed training datasets are automatically analyzed for data anomalies, versioned, documented, and shared with the organization.

Figure 11. The life-cycle of data in HopsML. Raw data is transformed into features which are grouped together into training datasets that are used to train models.

To create a managed training dataset, the user supplies a Pandas, Numpy or Spark dataframe with features, labels, and metadata.

Create a managed training dataset

from hops import featurestore

features_df = featurestore.get_features(["average_attendance", "average_player_age"])

featurestore.create_training_dataset(
   features_df,
   training_dataset_name,
   training_dataset_description,
   computation_job,
   training_dataset_version,
   data_format="tfrecords"
)

Once the training dataset has been created, the dataset is discoverable in the feature registry and users can use it to train models. Below is an example code snippet for training a model using a training dataset stored in a distributed manner in the tfrecords format on HopsFS.

Training a model using a training dataset

from hops import featurestore
import tensorflow as tf
from tensorflow.keras import layers

dataset_dir = featurestore.get_training_dataset_path(td_name)

# the tf records are written in a distributed manner using partitions
input_files = tf.gfile.Glob(dataset_dir + "/part-r-*")

# tf record schemas are managed by the feature store
tf_record_schema = featurestore.get_training_dataset_tf_record_schema(td_name)

# tf records are a sequence of *binary* (protobuf-serialized) records that need to be decoded
def decode(example_proto):
    return tf.parse_single_example(example_proto, tf_record_schema)

dataset = (tf.data.TFRecordDataset(input_files)
           .map(decode)
           .shuffle(shuffle_buffer_size)
           .batch(batch_size)
           .repeat(num_epochs))

# three-layer MLP for regression
model = tf.keras.Sequential([
    layers.Dense(64, activation="relu"),
    layers.Dense(64, activation="relu"),
    layers.Dense(1)
])

model.compile(optimizer=tf.train.AdamOptimizer(lr), loss="mse")

model.fit(dataset, epochs=num_epochs, steps_per_epoch=spe)

The Feature Registry

The feature registry is the user interface for publishing and discovering features and training datasets. The feature registry also serves as a tool for analyzing feature evolution over time by comparing feature versions. When a new data science project is started, Data Scientists within the project typically begin by scanning the feature registry for available features, and only add new features for their model that do not already exist in the feature store.

Figure 12. Feature Registry on Hopsworks.

The feature registry provides:

  • Keyword search on feature/feature group/training dataset metadata.
  • Create/Update/Delete/View operations on feature/feature group/training dataset metadata.
  • Automatic feature analysis.
  • Feature dependency tracking.
  • Feature job tracking.
  • Feature data preview.

Automatic Feature Analysis

When a feature group or training dataset is updated in the feature store, a data analysis step is performed. In particular, we look at cluster analysis, feature correlation, feature histograms, and descriptive statistics. We have found that these are the most common types of statistics that our users find useful in the feature modeling phase. For example, feature correlation information can be used to identify redundant features, feature histograms can be used to monitor feature distributions between different versions of a feature to discover covariate shift, and cluster analysis can be used to spot outliers. Having such statistics accessible in the feature registry helps users decide which features to use.
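
A minimal sketch of the kind of statistics involved, using pandas (Hopsworks computes these automatically when a feature group or training dataset is updated; this is not the actual implementation):

import pandas as pd

def analyze_features(features_df: pd.DataFrame) -> dict:
    return {
        "descriptive": features_df.describe(),   # count, mean, std, min, max, quartiles
        "correlation": features_df.corr(),       # pairwise feature correlation matrix
        "histograms": {c: features_df[c].value_counts(bins=10, sort=False)
                       for c in features_df.select_dtypes("number").columns},
    }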

Figure 13. Viewing the feature correlation for a training dataset using the feature registry.


Figure 14. Viewing the distribution of a feature in a feature group using the feature registry.

Feature Dependencies and Automatic Backfilling

When the feature store grows in size, the process of scheduling jobs to recompute features should be automated to avoid a potential management bottleneck. Feature groups and training datasets in the Hopsworks feature store are linked to Spark/Numpy/Pandas jobs, which enables the reproduction and recomputation of the features when necessary. Moreover, each feature group and training dataset can have a set of data dependencies. By linking feature groups and training datasets to jobs and data dependencies, the features in the Hopsworks feature store can be automatically backfilled using workflow management systems such as Airflow [10].
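
A hedged sketch of what such backfilling could look like with Airflow; the job scripts and DAG structure here are hypothetical, the point is only that recomputation can be ordered according to tracked dependencies:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("backfill_player_features", start_date=datetime(2018, 12, 1),
          schedule_interval="@daily")

recompute_featuregroup = BashOperator(
    task_id="recompute_players_featuregroup",
    bash_command="python compute_players_features.py",  # hypothetical feature engineering job
    dag=dag)

rebuild_training_dataset = BashOperator(
    task_id="rebuild_training_dataset",
    bash_command="python build_training_dataset.py",    # hypothetical downstream job
    dag=dag)

# the training dataset depends on the feature group, so rebuild it afterwards
recompute_featuregroup >> rebuild_training_dataset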

Figure 15. Feature dependency tracking.

A Multi-Tenant Feature Store Service

We believe that the biggest benefit of a feature store comes when it is centralized across the entire organization. The more high-quality features available in the feature store the better. For example, in 2017 Uber reported that they had approximately 10000 features in their feature store [11].

Despite the benefits of centralizing features, we have identified a need to enforce access control to features. Several organizations that we have talked to work partially with sensitive data that requires specific access rights that are not granted to everyone in the organization. For example, it might not be feasible to publish features that are extracted from sensitive data to a feature store that is public within the organization.

To solve this problem we utilize the multi-tenancy property built-in to the architecture of the Hopsworks platform [12]. Feature stores in Hopsworks are by default project-private and can be shared across projects, which means that an organization can combine public and private feature stores. An organization can have a central public feature store that is shared with everyone in the organization as well as private feature stores containing features of sensitive nature that are only accessible by users with the appropriate permissions.

Figure 16. Based on the organization need, features can be divided into several feature stores to preserve data access control.

Future Work

The feature store covered in this blog post is a so-called batch feature store, meaning that it is designed for training and non-real-time model serving. In future work, we plan to extend the feature store to meet the real-time guarantees required when serving user-facing models. Moreover, we are currently evaluating the need for a Domain Specific Language (DSL) for feature engineering. With a DSL, users who are not proficient in Spark/Pandas/Numpy can provide an abstract declarative description of how features should be extracted from raw data, and the library then translates that description into a Spark job for computing the features. Finally, we are also looking into supporting Petastorm [13] as a data format for training datasets. By storing training datasets in Petastorm we can feed Parquet data directly into machine learning models in an efficient manner. We consider Petastorm a potential replacement for tfrecords that could make it easier to re-use training datasets with ML frameworks other than TensorFlow, such as PyTorch.

Summary

Building successful AI systems is hard. At Logical Clocks we have observed that our users spend a lot of effort on the data engineering phase of machine learning. From the release of version 0.8.0, Hopsworks provides the world’s first open-source feature store. A feature store is a data management layer for machine learning that allows Data Scientists and Data Engineers to share and discover features, better understand how features evolve over time, and streamline the machine learning workflow.

References

[1] Hidden Technical Debt in Machine Learning Systems: https://papers.nips.cc/paper/5656-hidden-technical-debt-in-machine-learning-systems.pdf

[2] Scaling Machine Learning at Uber with Michelangelo: https://eng.uber.com/scaling-michelangelo/

[3] Machine Learning: The High-Interest Credit Card of Technical Debt: https://static.googleusercontent.com/media/research.google.com/sv//pubs/archive/43146.pdf

[4] Scaling Machine Learning as a Service (Uber): http://proceedings.mlr.press/v67/li17a/li17a.pdf

[5] Zipline: Airbnb’s Machine Learning Data Management Platform: https://databricks.com/session/zipline-airbnbs-machine-learning-data-management-platform

[6] Operationalizing Machine Learning—Managing Provenance from Raw Data to Predictions: https://databricks.com/session/operationalizing-machine-learning-managing-provenance-from-raw-data-to-predictions

[7] Building a Feature Platform to Scale Machine Learning | DataEngConf BCN '18: https://www.youtube.com/watch?v=0iCXY6VnpCc

[8] HopsML, Python-First ML Pipelines: https://hops.readthedocs.io/en/latest/hopsml/hopsML.html

[9] Fixing the Small Files Problem in HDFS: https://www.logicalclocks.com/fixing-the-small-files-problem-in-hdfs/

[10] Airflow: https://airflow.apache.org/

[11] Meet Michelangelo: Uber’s Machine Learning Platform: https://eng.uber.com/michelangelo/

[12] Introducing Hopsworks: https://www.logicalclocks.com/introducing-hopsworks/

[13] Petastorm: https://github.com/uber/petastorm

[14] Deep learning scaling is predictable, empirically: https://blog.acolyer.org/2018/03/28/deep-learning-scaling-is-predictable-empirically/

[15] Feature Store at LinkedIn: https://www.thestrangeloop.com/2018/democratizing-ai—back-fitting-end-to-end-machine-learning-at-linkedin-scale.html