
Distributed ML Experiments on Databricks with Maggy

Riccardo Grigoletto

TLDR; Maggy is an open-source framework that simplifies writing and maintaining distributed machine learning programs. By encapsulating your training logic in a function, the same code can be run unchanged with Python on your laptop or distributed using PySpark for hyperparameter tuning, data-parallel training, or model-parallel training. With the arrival of GPU support in Spark 3.0, PySpark can now be used to orchestrate distributed deep learning applications in TensorFlow and PyTorch. We are pleased to announce that Maggy now supports Databricks, so training machine learning models with many workers should be as easy as running Python programs on your laptop.


Machine learning is becoming distributed, driven by what Andrew Ng calls data-centric AI. Data volumes are constantly increasing, and models are known to improve their prediction accuracy predictably as training data grows, so companies can often estimate what return they will get, in terms of more accurate models, simply by investing in collecting more training data. One of the main challenges for developers is switching from programming for a single machine to programming for a distributed cluster.

That's why we developed Maggy, an open-source Python library that introduces a unified framework for writing core ML training logic as oblivious training functions. By encapsulating your training logic in a Python function (an oblivious training function), the same code can be run unchanged in Python or PySpark. You don’t need to rewrite your code to do hyperparameter tuning, data-parallel training, or model-parallel training.

Maggy is particularly useful if you want to build a distributed ML solution but have no prior experience with distributed computing. It is enough to refactor your training code into a function (which is good development practice anyway) and let Maggy do the distribution magic for you.

Furthermore, it does not matter whether you use TensorFlow or PyTorch, as Maggy supports both. Scikit-Learn and XGBoost are also supported, in particular for parallelizing hyperparameter tuning over many workers.

Maggy on Databricks

In the following example, we use Maggy to train a neural network on the Iris dataset.

The first thing to do is to create a cluster in your Databricks workspace. Maggy has been tested on Databricks Runtime 7.4 ML. Choose the number of workers you need and you are good to go.

If you don't know how to create a cluster, please follow the tutorial on this page. If it is your first time on Databricks, make sure to familiarize yourself with the platform and its pricing.

Once you have created the cluster, install Maggy: navigate to your cluster, click Libraries, then click Install New and select PyPI, enter 'maggy' as the Package, and click Install.

Once Maggy is installed, do the same for TensorFlow version 2.4 or higher. For example, in the Package field you can write 'tensorflow==2.4'.

Now we can open the Databricks notebook and write our first Maggy program.

Maggy workflow

To use Maggy in your workflow, we need to do the following:

  1. Create classes and functions to (a) create your model, (b) create your dataset, and (c) define your model training logic.
  2. Pass the dataset or the dataset path, the data processing function, and the hyperparameters to a Maggy configuration object.
  3. Call Maggy's 'lagom' function, passing the training function and the configuration from the previous step.

We are now going to present an example of how to implement this workflow. We are going to distribute the training of a (very) simple machine learning model on the Iris dataset.

You can use this notebook as a reference.

1a. Model creation

First of all, we have to wrap our ML model in a class that implements tf.keras.Model.

Note that we do not instantiate the class: we pass the class itself to Maggy, not an instance of it.

In this example we define a class called NeuralNetwork. It is a subclass of tf.keras.Sequential, which is itself an implementation of tf.keras.Model. Whatever base class you choose, make sure your class implements tf.keras.Model. Finally, we define the layers of our ML model in the __init__ method.

-- CODE language-python --
from tensorflow.keras.layers import Dense
from tensorflow.keras import Sequential

# You can use keras.Sequential(); you just need to subclass it
# in a custom class and define the layers in __init__()
class NeuralNetwork(Sequential):
    def __init__(self, nl=4):
        super().__init__()
        # input layer: each sample has the 4 Iris features
        self.add(Dense(10, input_shape=(4,), activation='tanh'))
        # nl counts all Dense layers, so add nl - 2 hidden layers
        for _ in range(nl - 2):
            self.add(Dense(8, activation='tanh'))
        # output layer: one probability per Iris class
        self.add(Dense(3, activation='softmax'))

# pass the class itself to Maggy, not an instance
model = NeuralNetwork
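To make the nl argument concrete, the following pure-Python sketch (it does not import TensorFlow; the helpers layer_widths and count_params are our own, hypothetical names) lists the Dense layer widths that NeuralNetwork(nl) builds and counts the trainable parameters with the standard Dense formula, inputs × units weights plus units biases. It assumes nl counts every Dense layer, so nl - 2 hidden layers of width 8 sit between the input and output layers, and that each sample has the four Iris features.

```python
def layer_widths(nl):
    """Widths of the Dense layers in NeuralNetwork(nl): 10, then 8 x (nl - 2), then 3."""
    return [10] + [8] * (nl - 2) + [3]


def count_params(nl, n_features=4):
    """Trainable parameters: each Dense layer has fan_in * units weights plus units biases."""
    total, fan_in = 0, n_features
    for units in layer_widths(nl):
        total += fan_in * units + units
        fan_in = units
    return total


for nl in (2, 4, 8):
    print(nl, layer_widths(nl), count_params(nl))
```

For the search range used later (2 to 8 layers), the model grows from 83 to 525 parameters, so even the deepest candidate trains quickly on a single worker.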

1b. Dataset creation

First, we need to upload the Iris dataset to Databricks: in your Databricks workspace, click Data in the sidebar, then Create Table, and upload Iris.csv, which can be downloaded here.

In order to use Maggy, we have to pass the training set, the test set, and optionally a data processing function to the configuration object.

The training and test sets can be:

  1. A tuple such as train_set = (X_train, y_train). X_train and y_train can be a list, a NumPy array, or a TensorFlow dataset.
  2. The path to the training and test sets. In this case, you also need to provide a data process function containing the instructions to consume and transform the data, as per the following code snippet.

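-- CODE language-python --
def process_data(train_set_path, test_set_path):
    # This example reads a single CSV and splits it, so the
    # path arguments are not used.
    import numpy as np
    from sklearn.preprocessing import LabelEncoder

    dataset_path = "dbfs:/FileStore/tables/Iris.csv"
    # `spark` is the SparkSession predefined in Databricks notebooks
    train_set, test_set = spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load(dataset_path).drop('_c0')\
        .randomSplit([0.80, 0.20], seed=0)
    raw_train_set = train_set.toPandas().values
    raw_test_set = test_set.toPandas().values

    # columns 0-3 are the features, column 4 is the species label
    X_train = raw_train_set[:, 0:4].astype("float32")
    X_test = raw_test_set[:, 0:4].astype("float32")

    # encode the string labels and one-hot them for categorical_crossentropy
    encoder = LabelEncoder().fit(raw_train_set[:, 4])
    n_classes = len(encoder.classes_)
    y_train = np.eye(n_classes)[encoder.transform(raw_train_set[:, 4])]
    y_test = np.eye(n_classes)[encoder.transform(raw_test_set[:, 4])]

    return (X_train, y_train), (X_test, y_test)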
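If the data already fits in memory, the first option above (passing (X, y) tuples) avoids file paths and a processing function entirely. The sketch below shows the shapes we assume such tuples should have for this model: float features of shape (n, 4) and one-hot labels of shape (n, 3), matching the softmax output and categorical_crossentropy. It uses a few made-up rows in place of the real CSV and a NumPy-only 80/20 split; to_tuples is a hypothetical helper, not part of Maggy.

```python
import numpy as np


def to_tuples(features, labels, train_fraction=0.8, seed=0):
    """Split rows into ((X_train, y_train), (X_test, y_test)) with one-hot labels."""
    features = np.asarray(features, dtype="float32")
    classes, encoded = np.unique(labels, return_inverse=True)  # strings -> ints
    one_hot = np.eye(len(classes), dtype="float32")[encoded]   # ints -> one-hot rows

    rng = np.random.default_rng(seed)
    order = rng.permutation(len(features))
    cut = int(train_fraction * len(features))
    train_idx, test_idx = order[:cut], order[cut:]
    return ((features[train_idx], one_hot[train_idx]),
            (features[test_idx], one_hot[test_idx]))


# Made-up rows standing in for Iris.csv
X = [[5.1, 3.5, 1.4, 0.2], [7.0, 3.2, 4.7, 1.4], [6.3, 3.3, 6.0, 2.5],
     [4.9, 3.0, 1.4, 0.2], [6.4, 3.2, 4.5, 1.5]]
y = ["Iris-setosa", "Iris-versicolor", "Iris-virginica", "Iris-setosa", "Iris-versicolor"]

train_set, test_set = to_tuples(X, y)
print(train_set[0].shape, train_set[1].shape, test_set[0].shape)
```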

1c. Create an HPO function

We now wrap the code containing the logic of the experiment in a function.

For hyperparameter optimization (HPO), we have to define a function that takes the hyperparameters (HPs) to be optimized as parameters, plus a reporter argument, which Maggy supplies. Inside the function we simply put the training logic, as if we were training our model on a single machine using TensorFlow. Maggy will run this function multiple times with different parameters for you, as we will see in section 2b.

-- CODE language-python --
from tensorflow.keras.optimizers import Adam

# Load the data once on the driver. hpo_function references these
# arrays, so they are serialized together with it and shipped to the
# workers. (process_data in this example ignores its path arguments.)
train_input, test_input = process_data(None, None)

def hpo_function(number_layers, reporter):
    model = NeuralNetwork(nl=number_layers)
    # fitting the model and predicting
    model.compile(Adam(learning_rate=0.04), 'categorical_crossentropy', metrics=['accuracy'])
    train_batch_size = 75
    test_batch_size = 15
    epochs = 10
    model.fit(x=train_input[0], y=train_input[1],
              batch_size=train_batch_size, epochs=epochs, verbose=1)
    score = model.evaluate(x=test_input[0], y=test_input[1],
                           batch_size=test_batch_size, verbose=1)
    print(f'Test loss: {score[0]}')
    print(f'Test accuracy: {score[1]}')
    return score[1]

1d. Create an oblivious training function

The training function provides the instructions to run the training and evaluation of your model, given the data passed in the configuration. You just need to wrap the instructions you implemented, together with any hyperparameters (for example, the values to pass to the model constructor). The training function has to return a value or a list of values corresponding to the evaluation results.

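-- CODE language-python --
from tensorflow.keras.optimizers import Adam

def training_function(model, train_set, test_set, hparams):
    # Maggy passes the model class; instantiate it with the chosen depth
    model = model(nl=hparams['number_layers'])
    # fitting the model and predicting
    model.compile(Adam(learning_rate=hparams['learning_rate']),
                  'categorical_crossentropy', metrics=['accuracy'])
    model.fit(train_set, epochs=hparams['epochs'])
    # evaluate returns [loss, accuracy] for this compile configuration
    loss, accuracy = model.evaluate(test_set)
    return accuracy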

Note that the HPO function does not take the model as a parameter, whereas the oblivious training function does. This is because, when using Maggy for distributed training, the library has to patch some functions of the model class.

2a. Configure Maggy for HPO

The next step is to create a configuration instance for Maggy. Since in this example we are using Maggy for hyperparameter optimization and distributed training using TensorFlow, we will use OptimizationConfig and TfDistributedConfig. 

OptimizationConfig contains the information about the hyperparameter optimization. 

We need to define a Searchspace object that contains the hyperparameters we want to optimize. In this example, we want to search for the optimal number of layers of the neural network, from 2 to 8.

OptimizationConfig contains the following parameters:

  • num_trials: Controls how many separate runs are conducted during the hp search.
  • optimizer: Optimizer type for searching the hp searchspace.
  • searchspace: A Searchspace object configuring the names, types and ranges of hps.
  • optimization_key: Name of the metric to use for hp search evaluation.
  • direction: Direction of optimization ('max' or 'min').
  • es_interval: Early stopping polling frequency during an experiment run.
  • es_min: Minimum number of experiments to conduct before starting the early stopping mechanism. Useful to establish a baseline for performance estimates.
  • es_policy: Early stopping policy which formulates a rule for triggering aborts.
  • name: Experiment name.
  • description: A description of the experiment.
  • hb_interval: Heartbeat interval with which the server is polling.

-- CODE language-python --
from maggy.experiment_config import OptimizationConfig
from maggy import Searchspace

# search the number of layers in the integer range [2, 8]
sp = Searchspace(number_layers=('INTEGER', [2, 8]))

hpo_config = OptimizationConfig(
    num_trials=4,
    optimizer="randomsearch",
    searchspace=sp,
    direction="max",
    es_interval=1,
    es_min=5,
    name="hp_tuning_test",
)
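The early-stopping parameters are easiest to read as a rule checked at every polling interval. The sketch below illustrates a median stopping rule, which we assume here for illustration (check the Maggy documentation for the es_policy values it actually offers): once at least es_min trials have finished, a running trial is stopped if its latest metric is worse than the median final metric of the finished trials. should_stop is a hypothetical helper, not part of Maggy.

```python
from statistics import median


def should_stop(current_metric, finished_metrics, es_min, direction="max"):
    """Median stopping rule (illustrative): abort a trial that trails the median."""
    if len(finished_metrics) < es_min:
        return False  # not enough finished trials to form a baseline yet
    baseline = median(finished_metrics)
    if direction == "max":
        return current_metric < baseline
    return current_metric > baseline


finished = [0.70, 0.80, 0.90, 0.85, 0.75]  # made-up final accuracies
print(should_stop(0.72, finished, es_min=5))      # below the 0.80 median
print(should_stop(0.72, finished[:4], es_min=5))  # only 4 trials finished so far
```

Under this reading, es_min=5 together with num_trials=4 means no trial can be stopped early, which is a reasonable default for a small demonstration run.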

2b. Run distributed HPO

Our HPO function and configuration object are now ready, so we can run the HPO experiment. To do that, we call the lagom function, passing our HPO function and the configuration object we instantiated in the last step.

If you are wondering what lagom means: lagom is a Swedish word that captures cultural notions of balance and equilibrium; in English it could be translated as "just the right amount" or "less is more".

-- CODE language-python --
from maggy import experiment

result = experiment.lagom(train_fn=hpo_function, config=hpo_config)

This function prints the HPO summary. Several values are returned; we are most interested in the 'best_config' dictionary, which contains the hyperparameters for which the model performed best.
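Conceptually, optimizer="randomsearch" with an INTEGER search space samples num_trials values, runs the training function once per sample, and keeps the configuration with the best returned metric. The sequential pure-Python sketch below (random_search and its "best_metric" key are our own, hypothetical names; Maggy runs trials in parallel on Spark executors and also handles early stopping) shows where a best_config dictionary comes from. The toy objective stands in for hpo_function and omits the reporter argument.

```python
import random


def random_search(train_fn, searchspace, num_trials, direction="max", seed=0):
    """Sequential random search over INTEGER hyperparameters (illustrative only)."""
    rng = random.Random(seed)
    best_config, best_metric = None, None
    for _ in range(num_trials):
        # sample every hyperparameter uniformly from its inclusive [low, high] range
        config = {name: rng.randint(low, high)
                  for name, (_kind, (low, high)) in searchspace.items()}
        metric = train_fn(**config)
        if best_metric is None or (metric > best_metric if direction == "max"
                                   else metric < best_metric):
            best_config, best_metric = config, metric
    return {"best_config": best_config, "best_metric": best_metric}


def toy_objective(number_layers):
    """Stand-in for hpo_function: pretends that 5 layers is optimal."""
    return 1.0 - abs(number_layers - 5) / 10


space = {"number_layers": ("INTEGER", [2, 8])}
summary = random_search(toy_objective, space, num_trials=4)
print(summary["best_config"])
```

With only four trials, random search may miss the optimum; raising num_trials trades more compute for a better chance of finding it.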

3a. Configure distributed training

Now it's time to run the final step of our ML program. Let's initialize the configuration class for the distributed training. First, we define the hyperparameters, taking the best ones found during HPO.

The TfDistributedConfig class has the following parameters:

  • name: the name of the experiment.
  • model: the model class to be trained (defined in the first step of this guide).
  • train_set: the train set as a tuple (x_train, y_train) or the train set path.
  • test_set: the test set as a tuple (x_test, y_test) or the test set path.
  • process_data: the function to process the data (if needed).
  • hparams: the model and dataset parameters. In this case we also need to provide 'train_batch_size' and 'test_batch_size'; these values represent the size of each worker's shard of the dataset. Their value is usually dataset_size / number_workers, but it can change depending on your needs.

-- CODE language-python --
from maggy.experiment_config.tf_distributed import TfDistributedConfig

# process_data reads Iris.csv directly, so these paths only fill the
# train_set/test_set arguments
train_set = test_set = "dbfs:/FileStore/tables/Iris.csv"

# define the constructor parameters of your model
model_params = {
    # train dataset entries / num_workers
    'train_batch_size': 75,
    # test dataset entries / num_workers
    'test_batch_size': 15,
    'learning_rate': 0.04,
    'epochs': 20,
    'number_layers': result['best_config']['number_layers'],
}

training_config = TfDistributedConfig(
    name="tf_test",
    model=model,
    train_set=train_set,
    test_set=test_set,
    process_data=process_data,
    hparams=model_params,
)
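The comments in model_params describe each batch size as dataset entries divided by the number of workers. A minimal sketch of that arithmetic, assuming rows are dealt out round-robin and rounding up so that each worker's shard fits in a single batch (shard_sizes and per_worker_batch_size are hypothetical helpers; Maggy's own sharding may differ):

```python
import math


def shard_sizes(n_rows, num_workers):
    """Rows each worker receives when rows are dealt out round-robin."""
    return [len(range(worker, n_rows, num_workers)) for worker in range(num_workers)]


def per_worker_batch_size(n_rows, num_workers):
    """Batch size that lets every worker consume its whole shard in one step."""
    return math.ceil(n_rows / num_workers)


print(shard_sizes(150, 2), per_worker_batch_size(150, 2))  # [75, 75] 75
print(shard_sizes(30, 2), per_worker_batch_size(30, 2))    # [15, 15] 15
print(shard_sizes(121, 4), per_worker_batch_size(121, 4))  # [31, 30, 30, 30] 31
```

Rounding up matters when the row count is not divisible by the number of workers: the largest shard sets the batch size.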

3b. Run distributed training

Finally, let's run the distributed training using the lagom function.

-- CODE language-python --
experiment.lagom(training_function, training_config)

Maggy will run the distributed training using the number of workers and resources available in the cluster. Finally, it will print the test results. The training log can be found in the Spark UI on Databricks.

Try it now!

Maggy is open-source and everyone can contribute to the project. Feel free to experiment with the library and contact us for any questions or issues. You can reach out to us via GitHub or the Hopsworks community. You can also give us a star on GitHub to let us know you appreciate our work.


In this article we saw how to train an ML model in a distributed fashion without rewriting our code, thanks to Maggy. If you want to know more about how to develop your ML projects faster, you may want to check out this article.

One function is all you need for ML Experiments

Robin Andersson

TLDR; Hopsworks provides support for machine learning (ML) experiments. That is, it can automatically track the artifacts, graphs, performance, logs, metadata, and dependencies of your ML programs. Many of you already know about platforms like MLflow, so why should you read about Hopsworks Experiments? Because you do not have to rewrite your TensorFlow/PyTorch/Scikit-learn programs to get tracking and distributed ML for free, and TensorBoard comes built-in. We discuss how Hopsworks uniquely supports implicit provenance to transparently create metadata, and how it is combined with the oblivious training function to make your training distribution transparent.

Hopsworks Introduction

Hopsworks is a single platform for both data science and data engineering that is available as both an open-source platform and a SaaS platform, including a built-in feature store. You can train models on GPUs at scale, easily install any Python libraries you want using pip/conda, run Jupyter notebooks as jobs, put those jobs in Airflow pipelines, and even write (Py)Spark or Flink applications that run at scale. 

Hopsworks provides a central, collaborative development environment that enables machine learning teams to easily share results and experiments with teammates or generate reports for project stakeholders. All resources have strong security, data governance, backup, and high-availability support in Hopsworks, while assets are stored in a single distributed file system (with data stored on S3 in the cloud).

A Hopsworks ML experiment stores information about your ML training run: logs, images, metrics of interest (accuracy, loss), the program used to train the model, its input training data, and the conda dependencies used. Optional outputs are hyperparameters, a TensorBoard, and a Spark history server.

The logs of each hyperparameter trial are retrieved by clicking on its log, and TensorBoard visualizes the results of the different trials. The TensorBoard HParams plugin is also available to drill down further into the trials.


When you run a Python or PySpark application on the Hopsworks platform, it can create an experiment that includes both the traditional information a program generates (results, logs, errors) as well as ML-specific information to help track, debug, and reproduce your program and its inputs and outputs:

  • hyperparameters: parameters for training runs that are not updated by the ML programs themselves; 
  • metrics: the loss or accuracy of the model(s) trained in this experiment;
  • program artifacts: python/pyspark/airflow programs, and their conda environments;
  • model artifacts: serialized model objects, model schemas, and model checkpoints;
  • executions: information to be able to re-execute the experiment, including parameters, versioned features for input, output files, etc.;
  • versioned features: to be able to reproduce an experiment, we need the exact training/test data from the run and how it was created from the feature store;
  • visualizations: images generated during training and scoring. You can also use TensorBoard to visualize training runs - Hopsworks aggregates results from all workers transparently;
  • logs (for debugging): model weights, gradients, losses, optimizer state;
  • custom metadata: tag experiments and free-text search for them, govern experiments (label as ‘PII’, ‘data-retention-period’, etc), and reproduce training runs.

Experiment Tracking and Distributed ML in One Library

-- CODE language-python --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]),
#                  min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", searchspace=sp, optimizer='randomsearch', direction='max', num_trials=15)

Platforms that support experiment tracking require the user to refactor their training code into a function or some explicit scope (such as “with … as xx:” in MLFlow, see Appendix A) to identify when an experiment begins and when it ends. In Hopsworks, we require the developer to write their training code inside a function.

We call this Python function an oblivious training function because the function is oblivious of whether it is being run on a Python kernel in a Jupyter notebook or on many workers in a cluster; see our blog and Spark+AI Summit talk for details. That is, you write your training code once and reuse the same function when training a small model on your laptop or when performing hyperparameter tuning or distributed training on a large cluster of GPUs or CPUs.
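The idea can be illustrated without Spark. In the sketch below, train is a plain function of its hyperparameters with a made-up toy score. The same, unchanged function is first called directly, as in a notebook, and then mapped over a grid of hyperparameter combinations with Python's concurrent.futures thread pool, which here only stands in for the Spark executors that Maggy actually uses.

```python
from concurrent.futures import ThreadPoolExecutor


def train(max_depth, min_child_weight):
    """Oblivious training function: it does not know where, or how often, it runs."""
    # toy score standing in for real training and evaluation
    accuracy = 1.0 - 0.01 * abs(max_depth - 6) - 0.02 * abs(min_child_weight - 3)
    return {"accuracy": accuracy}


# 1) A single run, as in a notebook on a laptop
single = train(max_depth=4, min_child_weight=2)

# 2) The same, unchanged function fanned out over many trials in parallel
trials = [{"max_depth": d, "min_child_weight": w} for d in range(2, 9) for w in range(2, 9)]
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda hp: train(**hp), trials))

best_hp, best_result = max(zip(trials, results), key=lambda pair: pair[1]["accuracy"])
print(single, best_hp, best_result)
```

Because the function takes its hyperparameters as arguments and returns its results, nothing inside it has to change between the two execution contexts.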

We double down on this “wrapper” Python function by also using it to start/stop experiment tracking. Experiment tracking and distribution transparency in a single function, nice! 

In Hopsworks, the Maggy library runs experiments, as shown in the code snippet above. As you can see, the only code changes a user needs, compared to a best-practice TensorFlow program, are:

  1. factor the training code in a user-defined function (def train(..):);
  2. return a Python dict containing the results, images, and files that the user wants to be tracked for the experiment and accessible later in the Experiments UI; and
  3. invoke the training function using the experiment.lagom function.

The hyperparameters can be fixed for a single execution run or, as shown in the commented-out lines at the end of the code snippet, you can execute the train function as a distributed hyperparameter tuning job across many workers in parallel (with GPUs, if needed).

Hopsworks will automatically:

  • track all parameters of the train function as hyperparameters for this experiment;
  • auto-log using Keras callbacks in model.fit;
  • create a versioned directory in HopsFS, where a copy of the program, its conda environment, and all logs from all workers are aggregated;
  • track all provenance information for this application - input data from HopsFS used in this experiment (train/test datasets from the Feature Store), and all output artifacts (models, model checkpoints, application logs);
  • redirect all print statements executed in workers to the Jupyter notebook cell for easier debugging (see GIF below - each print statement is prefixed by the worker ID).
In Hopsworks, logs from workers can be printed in your Jupyter notebook during training. Take that, Databricks!
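Two of the behaviours listed above can be imitated in a few lines of standard-library Python: recording the arguments of the training function as hyperparameters, and prefixing everything a worker prints with its worker ID. run_tracked is a hypothetical name, and this toy is not how Maggy or Hopsworks implement tracking, which also covers logs, files, and provenance.

```python
import contextlib
import inspect
import io


def run_tracked(train_fn, worker_id=0, **kwargs):
    """Run train_fn, record its arguments as hyperparameters, prefix its prints."""
    # bind the call to the function signature, so defaults are recorded too
    bound = inspect.signature(train_fn).bind(**kwargs)
    bound.apply_defaults()
    hyperparameters = dict(bound.arguments)

    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        outputs = train_fn(**kwargs)
    logs = [f"[worker {worker_id}] {line}" for line in captured.getvalue().splitlines()]
    return {"hyperparameters": hyperparameters, "outputs": outputs, "logs": logs}


def train(max_depth, min_child_weight, estimators=100):
    print("hello world")
    return {"accuracy": 0.9, "loss": 0.3}  # made-up results


record = run_tracked(train, worker_id=3, max_depth=4, min_child_weight=2)
print(record)
```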

TensorBoard support

-- CODE language-python --
def train():
    from maggy import tensorboard
    from tensorflow.keras.callbacks import TensorBoard
    ...
    model.fit(..., callbacks=[TensorBoard(log_dir=tensorboard.logdir(), ..)], ...)

TensorBoard is arguably the most common and powerful tool used to visualize, profile, and debug machine learning experiments. Hopsworks Experiments integrates seamlessly with TensorBoard. Inside the training function, the data scientist can simply import the tensorboard Python module and get the folder location to write all the TensorBoard files to. The content of the folder is then collected from each executor and placed in the experiment directory in HopsFS. As TensorBoard supports showing multiple experiment runs in the same graph, visualizing and comparing multiple hyperparameter combinations becomes as simple as starting the TensorBoard integrated in the Experiments service. By default, TensorBoard is configured with useful plugins such as HParams, Profiler, and Debugger.

Profiling and debugging

Hopsworks 1.4.0 comes with TensorFlow 2.3, which includes the TensorFlow Profiler, a long-awaited feature that finally allows users to profile model training and identify bottlenecks such as slow data loading or poor operation placement in CPU + GPU configurations.

TensorFlow 2.3 also includes Debugger V2, which makes it easier to track down model issues such as NaNs, whose root cause is often hard to find in complex models.

Model Registry

In the training code, models may be exported and saved to HopsFS. Using the model Python module in the hops library, it is easy to version models and attach meaningful metadata that reflects the performance of a given model version.

The Hopsworks Model Registry is a service where all models are listed, along with useful information such as which user created the model, its different versions, time of creation, and evaluation metrics such as accuracy.

The Model Registry provides functionality to filter based on the model name, version number, and the user that exported the model. Furthermore, the evaluation metrics of model versions can be sorted in the UI to find the best version for a given model.

In the Model Registry UI, you can also navigate to the experiment used to train the model, and from there to the train/test data used to train the model, and from there to the features in the feature store used to create the train/test data. Thanks, provenance!

Exporting a model

A model can be exported programmatically using the export function in the model module. Before exporting, the experiment needs to have written the model to a folder or to a path on HopsFS. That path is then supplied to the function, along with the name of the model and the evaluation metrics that should be attached. The export call uploads the contents of the folder to your Models dataset, and the model also appears in the Model Registry, with an incrementing version number for each export.

-- CODE language-python --
import os
from hops import model

# local path to directory containing model (e.g. .pb or .pk)
path = os.getcwd() + "/model_dir"
# uploads path to the model repository, metadata is a dict of metrics
model.export(path, "mnist", metrics={'accuracy': acc})
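The incrementing version number can be pictured as one subdirectory per version under the model's name. The sketch below only illustrates that numbering with a local temporary directory; the layout <registry>/<model_name>/<version> and the helper next_version are assumptions, and it does not call the hops API, which manages versions for you.

```python
import tempfile
from pathlib import Path


def next_version(registry_root, model_name):
    """Next version: one more than the highest existing numeric subdirectory."""
    model_dir = Path(registry_root) / model_name
    if not model_dir.is_dir():
        return 1
    versions = [int(p.name) for p in model_dir.iterdir() if p.is_dir() and p.name.isdigit()]
    return max(versions, default=0) + 1


with tempfile.TemporaryDirectory() as root:
    first = next_version(root, "mnist")  # no versions exported yet
    for v in (1, 2, 5):
        (Path(root) / "mnist" / str(v)).mkdir(parents=True)
    after = next_version(root, "mnist")  # highest existing version is 5
print(first, after)
```

Taking the maximum rather than counting directories keeps numbers unique even if an older version was deleted.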

Get the best model version

When deploying a model to real-time serving infrastructure or loading a model for offline batch inference, applications can query the model repository to find the best version based on metadata attached to the model versions - such as the accuracy of the model. In the following example, the model version for MNIST with the highest accuracy is returned.

-- CODE language-python --
from hops import model
from hops.model import Metric

MODEL_NAME = "mnist"
EVALUATION_METRIC = "accuracy"

best_model = model.get_best_model(MODEL_NAME, EVALUATION_METRIC, Metric.MAX)
print('Model name: ' + best_model['name'])
print('Model version: ' + str(best_model['version']))
print(best_model['metrics'])
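Conceptually, asking for the best version with Metric.MAX or Metric.MIN reduces to a max or min over the metric attached to each version. The pure-Python sketch below uses a made-up list of version records shaped like the dictionary printed above (name, version, metrics); best_version is a hypothetical helper, not the hops implementation.

```python
def best_version(versions, metric, maximize=True):
    """Return the version record whose metrics[metric] is best; skip versions without it."""
    candidates = [v for v in versions if metric in v["metrics"]]
    if not candidates:
        raise ValueError(f"no version reports {metric!r}")
    pick = max if maximize else min
    return pick(candidates, key=lambda v: v["metrics"][metric])


versions = [  # made-up registry entries
    {"name": "mnist", "version": 1, "metrics": {"accuracy": 0.91}},
    {"name": "mnist", "version": 2, "metrics": {"accuracy": 0.97}},
    {"name": "mnist", "version": 3, "metrics": {"accuracy": 0.95, "loss": 0.12}},
]
print(best_version(versions, "accuracy")["version"])
print(best_version(versions, "loss", maximize=False)["version"])
```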

The Devil is in the Details

That was a brief overview of Hopsworks Experiments and the Model Registry. You can now try it out, or install Hopsworks Community or Enterprise on any servers or VMs you can get your hands on. If you want to read more about how we implemented the plumbing, read on.

Transparent Distributed ML with PySpark

Hopsworks uses PySpark to transparently distribute the oblivious training function for execution on workers. If workers use GPUs, Spark allocates the GPUs to them, and dynamic executors ensure that GPUs are released after the training function has returned (read more here). This enables you to keep your notebook open and interactively visualize results from training, without having to worry that you are still paying for the GPUs.

The advantage of the Hopsworks programming model, compared to approaches where training code is supplied as Docker images, such as AWS SageMaker, is that you can write custom training code in place and debug it directly in your notebook. You also don’t need to write Dockerfiles for training code, and Python dependencies are managed by simply installing libraries with pip or Conda from the Hopsworks UI (we compile the Docker images transparently for you).

The oblivious training function can run in different execution contexts: on a Jupyter notebook in a Python kernel (far left), for parallel ML experiments (middle), and for collective allreduce data parallel training (far right). Maggy and Hopsworks take care of complex tasks such as scheduling tasks, collecting results, and generating new hyperparameter trials.
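The right-most mode, collective allreduce data-parallel training, relies on a simple identity: averaging the gradients computed on equal-sized data shards gives the same result as computing the gradient on the full batch. The NumPy sketch below checks this for a linear model with mean-squared-error loss; the model, the random data, and the helper mse_grad are made up for illustration, and a real allreduce exchanges gradients across machines rather than averaging them in one process.

```python
import numpy as np


def mse_grad(w, X, y):
    """Gradient of mean((X @ w - y) ** 2) with respect to w."""
    return 2.0 * X.T @ (X @ w - y) / len(y)


rng = np.random.default_rng(0)
X = rng.normal(size=(8, 3))
y = rng.normal(size=8)
w = np.zeros(3)

full = mse_grad(w, X, y)

# Split the batch across 4 "workers" with 2 rows each, then average (allreduce-mean)
shards = zip(np.split(X, 4), np.split(y, 4))
averaged = np.mean([mse_grad(w, Xs, ys) for Xs, ys in shards], axis=0)

print(np.allclose(full, averaged))
```

The identity needs equal shard sizes; with uneven shards, each worker's gradient would have to be weighted by its number of rows.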

HopsFS stores experiment data and logs generated by workers during training. When an experiment is started through the API, a subfolder is created in the Experiments dataset in HopsFS, and metadata about the experiment is attached to the folder. Hopsworks automatically synchronizes this metadata to Elasticsearch using implicit provenance.

The metadata may include information such as the name of the experiment, the type of the experiment, the exported model, and so on. As the existence of an experiment is tracked by a directory, deleting the folder also deletes the experiment and its associated metadata from the tracking service.

Tracking metadata with Implicit Provenance

Existing systems for tracking the lineage of ML artifacts, such as TensorFlow Extended or MLFlow, require developers to change their application or library code to log tracking events to an external metadata store. 

In Hopsworks, we primarily use implicit provenance to capture metadata, where we instrument our distributed file system, HopsFS, and some libraries to capture changes to ML artifacts, requiring minimal code changes to standard TensorFlow, PyTorch, or Scikit-learn programs (see details in our USENIX OpML’20 paper). 

File system events, such as reading features from a train/test dataset or saving a model to a directory, are implicitly recorded as metadata in HopsFS and then transparently indexed in Elasticsearch. This enables free-text search for ML artifacts, metadata, and experiments in the UI.

Experiments in Hopsworks are the first part of an ML training pipeline that starts at the Feature Store and ends at model serving. ML artifacts (train/test datasets, experiments, models, etc.) can be stored on HopsFS, and they can also have custom metadata attached to them.

The custom metadata is tightly coupled to the artifact (remove the file, and its metadata is automatically cleaned up); this is achieved by storing the metadata in the same scale-out metadata layer used by HopsFS. This custom metadata is also automatically synchronized to Elasticsearch (using a service called ePipe), enabling free-text search for metadata in Hopsworks.
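The key property of implicit provenance is that metadata is produced as a side effect of ordinary file operations and lives and dies with the file. The toy in-memory sketch below imitates that idea: reads and writes through a hypothetical InstrumentedFS class append provenance events without any logging calls in the "training code", and deleting a path drops its custom metadata. It is a conceptual model only; HopsFS, ePipe, and Elasticsearch work very differently, at scale.

```python
class InstrumentedFS:
    """Toy file system that records provenance as a side effect of reads and writes."""

    def __init__(self):
        self.files, self.metadata, self.events = {}, {}, []

    def write(self, path, data, app_id):
        self.files[path] = data
        self.events.append(("write", path, app_id))

    def read(self, path, app_id):
        self.events.append(("read", path, app_id))
        return self.files[path]

    def tag(self, path, **custom):
        self.metadata.setdefault(path, {}).update(custom)

    def delete(self, path):
        # metadata is stored alongside the file, so it disappears with it
        self.files.pop(path, None)
        self.metadata.pop(path, None)


def lineage(fs, app_id):
    """Inputs and outputs of one application, reconstructed from file events."""
    inputs = sorted({p for op, p, a in fs.events if a == app_id and op == "read"})
    outputs = sorted({p for op, p, a in fs.events if a == app_id and op == "write"})
    return inputs, outputs


fs = InstrumentedFS()
fs.write("/Projects/demo/train.csv", b"...", app_id="featurestore_job")
# the "training code" only reads and writes files; no tracking calls
fs.read("/Projects/demo/train.csv", app_id="experiment_1")
fs.write("/Projects/demo/Models/mnist/1/model.pb", b"...", app_id="experiment_1")
fs.tag("/Projects/demo/Models/mnist/1/model.pb", owner="alice")
print(lineage(fs, "experiment_1"))
fs.delete("/Projects/demo/Models/mnist/1/model.pb")
print(fs.metadata)
```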

That’s all for now Folks!

Of all the developer tools for data science, platforms for managing ML experiments have seen the most innovation in recent years. Open-source platforms have appeared, such as MLFlow and our Hopsworks platform, alongside proprietary SaaS offerings such as WandB, Neptune, and Valohai.

What makes Hopsworks Experiments different? You can write clean Python code and get experiment tracking and distributed ML for free with the help of implicit provenance and the oblivious training function, respectively. 

There is growing consensus that platforms should keep track of what goes in and out of ML experiments for both debugging and reproducibility. You can instrument your code to keep track of inputs/outputs, or you can let the framework manage it for you with implicit provenance. 

Hopsworks Experiments are a key component in our mission to reduce the complexity of putting ML in production. Further groundbreaking innovations are coming in the next few months in the areas of real-time feature engineering and monitoring operational models. Stay tuned!

Appendix A

In the code snippet below, we compare how you write a Hopsworks experiment with MLFlow. There are more similarities than differences, but explicit logging to a tracking server is not needed in Hopsworks.

-- CODE language-python --
def train(data_path, max_depth, min_child_weight, estimators):
    X_train, X_test, y_train, y_test = build_data(..)
    ...
    print("hello world")  # monkeypatched - prints in notebook
    ...
    model.fit(X_train, y_train)  # auto-logging
    ...
    hops.export_model(model, "tensorflow", .., model_name)
    ...
    # create local files 'logfile.txt', 'diagram.png'
    return {'accuracy': accuracy, 'loss': loss,
            'logfile': 'logfile.txt', 'diagram': 'diagram.png'}  # track dict

from maggy import experiment
experiment.lagom(train, name="My Experiment", ...)

# To launch as a distributed ML HParam Tuning job:
# sp = Searchspace(max_depth=('INTEGER', [2, 8]),
#                  min_child_weight=('INTEGER', [2, 8]))
# experiment.lagom(train, name="HP", searchspace=sp, optimizer='randomsearch', direction='max', num_trials=15)

-- CODE language-python --
import mlflow

def train(data_path, max_depth, min_child_weight, estimators, model_name):  # distribution external
    X_train, X_test, y_train, y_test = build_data(..)
    # MLflow expects an SQLAlchemy-style database URI, not a JDBC URL
    mlflow.set_tracking_uri("mysql://username:password@host:3306/database")
    mlflow.set_experiment("My Experiment")
    with mlflow.start_run() as run:
        ...
        mlflow.log_param("max_depth", max_depth)
        mlflow.log_param("min_child_weight", min_child_weight)
        mlflow.log_param("estimators", estimators)
        with open("test.txt", "w") as f:
            f.write("hello world!")
        # log_artifact uploads a single file (log_artifacts uploads a directory)
        mlflow.log_artifact("/full/path/to/test.txt")
        ...
        model.fit(X_train, y_train)  # auto-logging
        ...
        mlflow.tensorflow.log_model(model, "tensorflow-model", registered_model_name=model_name)

Like MLFlow, but better?

Appendix B

A pipeline is the program that orchestrates the execution of an end-to-end training and model deployment job. In Hopsworks, you can run Jupyter notebooks as schedulable Jobs, and these jobs can be run as part of an Airflow pipeline (Airflow also comes as part of Hopsworks). After a pipeline runs, data scientists can quickly inspect the training results in the Experiments service.

The typical steps that make up a full training-and-deploy pipeline include:

  • materialization of train/test data by selecting features from a feature store, 
  • model training on the train/test data and export of the model to the Model Registry,
  • evaluation and validation of the model and if it passes robustness, bias, and accuracy tests, model deployment.
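The three steps can be sketched as plain functions chained by a driver that deploys only when validation passes. Everything here is hypothetical (step names, the stubbed return values, the accuracy threshold); in Hopsworks each step would typically be a job, with Airflow handling the ordering.

```python
def materialize_training_data():
    """Step 1 (stub): select features from the feature store into train/test data."""
    return {"train": "train_v1", "test": "test_v1"}


def train_and_export(dataset):
    """Step 2 (stub): train on the train/test data and export the model to the registry."""
    return {"name": "mnist", "version": 1, "accuracy": 0.93, "trained_on": dataset["train"]}


def passes_checks(model, min_accuracy):
    """Step 3a: evaluation and validation gate (only an accuracy check is stubbed here)."""
    return model["accuracy"] >= min_accuracy


def run_pipeline(min_accuracy=0.9):
    dataset = materialize_training_data()
    model = train_and_export(dataset)
    deployed = passes_checks(model, min_accuracy)  # step 3b would deploy only if True
    return {"model": model, "deployed": deployed}


print(run_pipeline()["deployed"], run_pipeline(min_accuracy=0.95)["deployed"])
```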

Unifying Single-host and Distributed Machine Learning with Maggy

Moritz Meister

This blog covers the oblivious training function and the internals of Maggy presented at Spark+AI Summit 2020, on June 26th.

TLDR; Maggy is an open-source framework for distributed machine learning. In this post, we introduce a new unified framework for writing core ML training logic as “oblivious training functions”. Maggy enables you to reuse the same training code whether you are training small models on your laptop or scaling out hyperparameter tuning or distributed deep learning on a cluster. Maggy enables the replacement of the current waterfall development process for distributed ML applications, where code is rewritten at every stage, with an iterative development process.

Most of the publicly available ML source code for training models is not built to scale out on many servers or GPUs. Getting started with deep learning is relatively easy these days, thanks to GitHub and the blogosphere. The hard part for practitioners starts when the code examples found online need to be applied to more challenging domains, with larger and custom datasets, which in turn require a bigger, customized version of the model to fit that dataset. If you use publicly available code as a starting point for model development on clusters, you will end up in a process similar to the one depicted in Figure 1.

Figure 1: A simplified view of the ML model development process, illustrating its iterative nature.

The software development process for ML models is rarely the perfect waterfall development model, as shown in Figure 1 without the green arrows. In the (discredited) waterfall development process, you would start out with requirements, then move on to design, implementation and test. The (current!) equivalent process in ML model development is the following, as shown in Figure 1 with the green arrows. You start out on your local machine with a subset of the data in order to explore and design the model architecture. Then you move to use a cluster of resources (such as GPUs) to more quickly find hyperparameters, run lots of parallel ablation studies (many skip this stage!), and finally scale out the training of the model on the large dataset using lots of resources. Then, you’re done, right? Wrong! You typically iterate through the stages, finding better hyperparameters, adding new features, rewriting for distribution, going from your laptop to the cluster and back again.

We rewrite our model training code for distribution as it offers many benefits – faster training of models using more GPUs, parallelizing hyperparameter tuning over many GPUs, and parallelizing ablation studies to help understand the behaviour and performance of deep neural networks. However, not only will the boilerplate model training code need to be modified, but as you move along the process, distribution will introduce additional obtrusive code artifacts and modifications, depending on the frameworks used. This leads to a mix of infrastructure code and model code, with duplicated training logic, hyperparameters hard-coded into the training loop, additional tracking code to keep a record of your changes, and config files for experiments:

Figure 2: Model development creates a mix of code artefacts duplicating code for every step, making iterative development hard.

With such a code base, iterating becomes near impossible as it requires adapting many copies of redundant code. And finally, imagine handing the code off to an ML engineer to productionize the model.

The Oblivious Training Function

Figure 3: The oblivious training function makes training code reusable among all steps of the process.

We introduce an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark or Distributed TensorFlow programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). We call this new abstraction for ML model development the oblivious training function, as the core model training logic supports distribution transparency, that is, the training code is not aware (oblivious) of whether it is being run on a single host or whether it is being executed on hundreds of devices in parallel. 

What does it mean for training code to be distribution transparent?

Transparency in distributed systems refers to hiding distribution-specific aspects of an application from the developer; for example, a developer invoking a function may not know (or need to know) if the function she is calling is local to her application or on a remote server. In other words, distribution transparency enables developers to write code that is reusable between single-host and distributed instantiations of a program:

Figure 4: Distribution Transparency hides complexities related to distribution from the developer, making the same code executable on a single-host as well as in a large cluster. Transparency leads to DRY training code.
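Here is a minimal sketch in plain Python of what distribution transparency means for a training function. The toy `train` function runs gradient descent on (w - 3)^2. A local thread pool stands in for remote cluster workers. Both are assumptions for illustration, and this is not how Maggy ships functions to Spark executors.

```python
from concurrent.futures import ThreadPoolExecutor


def train(lr, epochs):
    """Toy training function: gradient descent on f(w) = (w - 3)^2, returns the final loss.

    It contains no distribution-specific code, so it neither knows nor cares
    whether it runs in-process or on a worker.
    """
    w = 0.0
    for _ in range(epochs):
        w -= lr * 2 * (w - 3)
    return (w - 3) ** 2


# Single host: call the function directly.
local_loss = train(lr=0.1, epochs=50)

# "Cluster": the same, unchanged function is submitted to a pool of workers.
# Threads are a hypothetical stand-in for remote executors here.
configs = [{"lr": 0.01, "epochs": 50}, {"lr": 0.1, "epochs": 50}]
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(train, **cfg) for cfg in configs]
    remote_losses = [f.result() for f in futures]
```

The function body is identical in both cases; only the caller decides where it runs.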

Building Blocks for Distribution Transparency

How does ML code have to be structured in order to be transparently distributed? Firstly, developers have to follow best practices and, secondly, developers must be aware of the difference between distribution contexts, that is, what characterizes, for example, distributed hyperparameter tuning vs. distributed training.

1. ML Development Best Practices:

The ML community has recently developed some best practices, which are already widely adopted by developers. Taking a look at the new well-illustrated Keras Guides, you will notice a common approach with four techniques.

  • Modularize: By modularizing code into reusable functions, these functions become building blocks, making the code pluggable in order to construct different configurations of the model for hyperparameter optimization or ablation.
  • Parametrize: Instead of hardcoding parameters such as learning rate, regularization penalty or other hyperparameters, developers are encouraged to replace them with variables whenever possible, so that there is a single place where they can be changed.
  • Higher order training functions: Instead of passing instantiated objects, for example for the training dataset, the data input logic can be encapsulated in a function that is called by a higher order training function. This way, the data input pipeline can be parametrized as well. The same holds for the generation of the model, which can be encapsulated in a function returning the model.
  • Usage of callbacks at runtime: In order to intercept and interact with the actual training loop, most ML frameworks such as TensorFlow and PyTorch offer the possibility to use callback functions that are invoked by the framework at certain points during training, such as at the end of every epoch or batch. Callback functions enable runtime monitoring of training and can, for example, also be used to stop training early (important in hyperparameter optimization).
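The four practices can be shown without any ML framework. The following is our own toy sketch, not code from the Keras guides:

- A linear model and its data come from generator functions (modularized).
- Hyperparameters are arguments (parametrized).
- `train` receives the generators rather than instantiated objects (higher order).
- A hand-written `EarlyStopping` callback, loosely modelled on the Keras callback of the same name, can end training.

```python
def get_dataset(n=20):
    """Dataset generator: n points on the line y = 2x + 1."""
    return [(i / n, 2 * (i / n) + 1) for i in range(n)]


def get_model():
    """Model generator: parameters of a linear model y = w * x + b."""
    return {"w": 0.0, "b": 0.0}


def mse(model, data):
    return sum((model["w"] * x + model["b"] - y) ** 2 for x, y in data) / len(data)


class EarlyStopping:
    """Hand-written callback: asks to stop once the loss has not improved
    by more than min_delta for `patience` consecutive epochs."""

    def __init__(self, patience=3, min_delta=1e-9):
        self.patience, self.min_delta = patience, min_delta
        self.best, self.wait = float("inf"), 0

    def on_epoch_end(self, epoch, loss):
        if loss < self.best - self.min_delta:
            self.best, self.wait = loss, 0
        else:
            self.wait += 1
        return self.wait >= self.patience  # True asks the training loop to stop


def train(model_fn, dataset_fn, lr=0.5, epochs=1000, callbacks=()):
    """Higher-order, parametrized training function: no hard-coded model, data or hyperparameters."""
    model, data = model_fn(), dataset_fn()
    n = len(data)
    loss, epoch = mse(model, data), 0
    for epoch in range(epochs):
        # Full-batch gradient descent: residuals are computed once per epoch.
        residuals = [(model["w"] * x + model["b"] - y, x) for x, y in data]
        model["w"] -= lr * sum(2 * r * x for r, x in residuals) / n
        model["b"] -= lr * sum(2 * r for r, _ in residuals) / n
        loss = mse(model, data)
        stop = [cb.on_epoch_end(epoch, loss) for cb in callbacks]  # call every callback
        if any(stop):
            break
    return model, loss, epoch


model, loss, stopped_at = train(get_model, get_dataset, lr=0.5,
                                callbacks=[EarlyStopping(patience=3)])
```

Because `train` only sees generators, arguments and callbacks, a framework can swap any of them without touching the training loop.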

2. Distribution Context

While a single-host environment is self-explanatory, there is a difference between the context of ML experiments, such as hyperparameter optimization or parallel ablation studies, and the distributed training of a single model. Both hyperparameter optimization and parallel ablation studies have weak scaling requirements (also known as embarrassingly parallel), because all workers execute independent pieces of work and have limited communication. For example, hyperparameter tuning involves training independent copies of the model with different hyperparameters or different architectures, in order to find the best performing configuration. Distributed training, however, has strong scaling requirements, as it introduces significant communication and coordination between the workers. As workers are training a single model, they continually exchange gradients, which are computed on independent shards of data (data-parallel training). Many distributed training problems, in fact, become (network or disk) I/O bound as they scale. Figure 5 illustrates the three contexts and the steps in the model development process that they apply to.

Figure 5: Single-host vs. parallel multi-host vs. distributed multi-host context and their applicability to the steps of the process.
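The difference between the contexts shows up in how often workers must talk to each other. In the hypothetical data-parallel sketch below, each of four "workers" computes a gradient on its own shard. `allreduce_mean` is a sequential stand-in for an all-reduce collective, not a real library call, and it averages the gradients before every update. Communication therefore happens once per training step. With equal-sized shards, the averaged gradient equals the full-batch gradient. By contrast, the independent trials of a hyperparameter search would only report a final metric.

```python
def grad(w, shard):
    """Gradient of the mean squared error of the model y = w * x on one data shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def allreduce_mean(values):
    """Sequential stand-in for an all-reduce: every worker ends up with the average."""
    return sum(values) / len(values)


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # noiseless samples of y = 3x
num_workers = 4
shards = [data[i::num_workers] for i in range(num_workers)]  # equal-sized, disjoint shards

w, lr = 0.0, 0.01
for _ in range(100):
    local_grads = [grad(w, shard) for shard in shards]  # computed in parallel on a real cluster
    w -= lr * allreduce_mean(local_grads)  # workers exchange gradients at every step
```

If shard sizes differed, the average would have to be weighted by shard size to match the full-batch gradient.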

Once you are aware of the different contexts and apply popular programming idioms, it becomes apparent what this means for the oblivious training function: it is no longer the developer who instantiates and launches the training function, but the framework, which is aware of the current context and takes care of the distribution-related complexities. For exploration, this means the framework can be used to fix all parameters. For hyperparameter optimization experiments, the framework takes care of generating potentially good hyperparameter combinations and parameterizing the oblivious training function with them, to be launched on different workers. For distributed training, it means setting up the environment for workers to discover each other and wrapping the model code with a distribution strategy.

Figure 6: The oblivious training function as an abstraction allows us to let the system take care of distributed system related complexities.
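To illustrate the idea that the framework, not the developer, invokes the training function, here is a hypothetical `launch` helper in plain Python. The context names, the uniform random search and the synthetic `train_fn` loss are assumptions made for this sketch. They are not Maggy's API or its optimizers.

```python
import random


def train_fn(lr, dropout, epochs):
    """Toy stand-in for an oblivious training function; returns a 'validation loss'.

    A real function would build and fit a model. This synthetic loss is lowest
    near lr=0.05 and dropout=0.3 and shrinks as epochs grow.
    """
    return (lr - 0.05) ** 2 + (dropout - 0.3) ** 2 + 1.0 / epochs


def launch(train_fn, context, params, searchspace=None, trials=20, seed=0):
    """Hypothetical launcher: the framework decides how train_fn is called."""
    if context == "exploration":
        # Every parameter is fixed; the function runs exactly once.
        return params, train_fn(**params)
    if context == "optimization":
        # The framework samples candidate hyperparameters (uniform random search here)
        # and parameterizes the unchanged training function with each candidate.
        rng = random.Random(seed)
        results = []
        for _ in range(trials):
            sampled = {name: rng.uniform(low, high) for name, (low, high) in searchspace.items()}
            config = {**sampled, **params}  # fixed parameters are passed through as-is
            results.append((config, train_fn(**config)))
        return min(results, key=lambda result: result[1])
    raise ValueError(f"unknown context: {context!r}")


explored = launch(train_fn, "exploration", {"lr": 0.5, "dropout": 0.9, "epochs": 10})
best_config, best_loss = launch(train_fn, "optimization", {"epochs": 10},
                                searchspace={"lr": (0.0, 0.2), "dropout": (0.0, 0.6)})
```

On a cluster, the loop over trials is what would be spread across workers. The training function itself stays the same in both contexts.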

Putting it all together

Having the building blocks at hand, how do we write the model training code in Maggy? Let us take a look at the latest best-practices MNIST example that already factors the model configuration, dataset preparation and training logic into functions. Building on this example, we will show the modifications to the code that are needed to construct an oblivious training function in Maggy. It is important to note that all modifications are still vanilla Python code and can, therefore, be run as is in a single-host environment. Let’s start with the boilerplate: the two functions and the training logic:

1. Model Definition

import tensorflow as tf

def get_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, name='conv1', kernel_size=3, activation='relu'),
        tf.keras.layers.Conv2D(64, name='conv2', kernel_size=2, activation='relu'),
        tf.keras.layers.MaxPooling2D(name='pool1', pool_size=2),
        tf.keras.layers.Flatten(),  # feature maps -> vector before the dense layers
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax'),  # one output per MNIST digit
    ])
    return model

2. Dataset generation

def get_dataset():
    batch_size = 32
    num_val_samples = 10000
    # Return the MNIST dataset in the form of a `tf.data.Dataset`.
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    # Preprocess the data (these are Numpy arrays)
    x_train = x_train.reshape(-1, 784).astype("float32") / 255
    x_test = x_test.reshape(-1, 784).astype("float32") / 255
    y_train = y_train.astype("float32")
    y_test = y_test.astype("float32")
    # Reserve num_val_samples samples for validation
    x_val = x_train[-num_val_samples:]
    y_val = y_train[-num_val_samples:]
    x_train = x_train[:-num_val_samples]
    y_train = y_train[:-num_val_samples]
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(50000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_val, y_val))
        .shuffle(10000).repeat().batch(batch_size),
        tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(batch_size),
    )

3. Training logic

model = get_model()
model.compile(optimizer=tf.keras.optimizers.Adam(),  # optimizer choice is illustrative
              loss=tf.keras.losses.SparseCategoricalCrossentropy(),
              metrics=['accuracy', tf.keras.metrics.SparseCategoricalAccuracy()])
# Train the model on all available devices.
train_dataset, val_dataset, test_dataset = get_dataset()
model.fit(train_dataset, epochs=2, steps_per_epoch=1000, validation_data=val_dataset, validation_steps=100)
# Test the model on all available devices.
metrics = model.evaluate(test_dataset, steps=100)
metric_dict = {out: metrics[i] for i, out in enumerate(model.metrics_names)}

1. Model generation

We are parametrizing the model itself, by replacing hyperparameters with arguments.

Parametrizing the model definition
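The parametrized model definition can be sketched without TensorFlow. `gen_model` below is a hypothetical, framework-free stand-in with argument names of our own choosing. It mirrors the convolution, pooling and dense stack of `get_model()` and adds a Flatten layer and a 10-class output. It returns a list of (layer, config) pairs instead of a Keras model. It also uses one `kernel` argument for both convolutions, whereas the original uses 3 and 2. Because the hyperparameters are arguments, different values yield different architectures, and the generator can check that they still fit a 28×28 input.

```python
def gen_model(kernel=3, pool=2, dense=128, num_classes=10, input_size=28):
    """Hypothetical parametrized model generator (framework-free sketch).

    Hyperparameters that were constants in get_model() are arguments here.
    Returns (layer_name, config) pairs rather than a Keras model.
    """
    size = input_size
    layers = [("reshape", {"target_shape": (input_size, input_size, 1)})]
    for name, filters in (("conv1", 32), ("conv2", 64)):
        size = size - kernel + 1  # 'valid' padding, stride 1
        layers.append((name, {"filters": filters, "kernel_size": kernel, "activation": "relu"}))
    if size < pool:
        raise ValueError(f"kernel={kernel} and pool={pool} do not fit a {input_size}x{input_size} input")
    size //= pool  # max pooling with stride equal to pool_size
    layers += [
        ("pool1", {"pool_size": pool}),
        ("flatten", {"output_features": size * size * 64}),
        ("dense", {"units": dense, "activation": "relu"}),
        ("output", {"units": num_classes, "activation": "softmax"}),
    ]
    return layers
```

For example, the defaults give 28 → 26 → 24 → 12, so the flattened vector has 12 × 12 × 64 = 9216 features. With `kernel=5, pool=3`, it has 6 × 6 × 64 = 2304.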

2. Dataset generation

The dataset generation function stays unchanged in this case, but, similar to the model, this function could be parametrized.

3. Training logic

The training logic is wrapped in a parametrized and pluggable function, the oblivious training function. Again, hyperparameters are passed as arguments to the function. Additionally, the dataset and model generation functions are replaced with arguments, in order to be able to let the system, for example, replace the dataset generator with an alternative one - we use this to drop features for ablation studies. Last, but not least, the training function should return its current performance as a metric to be optimized in hyperparameter optimization. This is needed to make Maggy aware of the desired optimization metric.

Adjust Training Logic to be callable with different parameters

Note that up to this point, all modifications are pure Python code and, hence, the training function can still be run in a single host environment by calling it yourself in a Notebook with a fixed set of parameters and by passing the model and dataset generation functions as arguments.
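Below is a minimal sketch of such a function in plain Python. It uses a linear model instead of the Keras CNN so that it runs anywhere. The names follow the text (`train_fun`, a model generator and a dataset generator). The body is our own illustration, not Maggy code. Everything the function needs arrives as an argument, and it returns the validation loss as the metric to optimize.

```python
def get_model():
    """Model generator: parameters of a linear model y = w * x + b."""
    return {"w": 0.0, "b": 0.0}


def get_dataset():
    """Dataset generator: (train, validation) samples of y = 2x + 1."""
    train = [(i / 20, 2 * (i / 20) + 1) for i in range(20)]
    val = [((i + 0.5) / 10, 2 * (i + 0.5) / 10 + 1) for i in range(10)]
    return train, val


def train_fun(model_gen, dataset_gen, lr, epochs):
    """Hypothetical oblivious training function.

    Model, data and hyperparameters all arrive as arguments, and the function
    returns the metric an optimizer would minimize (validation MSE).
    """
    model = model_gen()
    train_data, val_data = dataset_gen()
    for _ in range(epochs):
        for x, y in train_data:  # plain per-sample SGD on the squared error
            error = model["w"] * x + model["b"] - y
            model["w"] -= lr * error * x
            model["b"] -= lr * error
    return sum((model["w"] * x + model["b"] - y) ** 2 for x, y in val_data) / len(val_data)


# Single host, e.g. in a notebook: call it yourself with fixed parameters.
val_loss = train_fun(get_model, get_dataset, lr=0.1, epochs=200)
```

A framework can later call the same `train_fun` with generated `lr` values or an alternative `dataset_gen`, without any change to its body.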

Finally, to execute the function in a different distribution context, Maggy is used:

from maggy import experiment

experiment.set_model_generator(gen_model)

# Hyperparameter optimization
experiment.set_context('optimization', 'randomsearch', searchspace)
result = experiment.lagom(train_fun)
params = result.get('best_hp')

# Distributed Training
experiment.set_context('dist_training', 'MultiWorkerMirroredStrategy', params)
experiment.lagom(train_fun)

# Ablation study
experiment.set_context('ablation', 'loco', ablation_study, params)
experiment.lagom(train_fun)

Maggy requires additional configuration information for hyperparameter optimization, such as a search space definition and the optimization strategy to be used. In the case of distributed training, the distribution strategy is needed as well as a set of parameters to fix the model to. These parameters can either be taken from the previous hyperparameter tuning experiments or input manually. Lagom is the API to launch the function on a Spark cluster. 
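Maggy's `'loco'` ablation context refers to leave-one-component-out ablation: each trial removes one component and retrains. For features, this amounts to generating alternative dataset generators, as described for the training logic above. The helpers `make_dataset_gen` and `loco_trials`, the column names and the rows are hypothetical, chosen for this sketch. They are not Maggy's ablation API.

```python
def make_dataset_gen(rows, features, label):
    """Hypothetical helper: a dataset generator restricted to the given feature columns."""
    def dataset_gen():
        return [([row[name] for name in features], row[label]) for row in rows]
    return dataset_gen


def loco_trials(rows, features, label):
    """Leave-one-component-out over features: a baseline plus one trial per dropped feature."""
    trials = {"baseline": make_dataset_gen(rows, features, label)}
    for dropped in features:
        remaining = [name for name in features if name != dropped]
        trials[f"without_{dropped}"] = make_dataset_gen(rows, remaining, label)
    return trials


rows = [
    {"amount": 10.0, "hour": 3, "country_risk": 0.9, "label": 1},
    {"amount": 2.0, "hour": 14, "country_risk": 0.1, "label": 0},
]
trials = loco_trials(rows, ["amount", "hour", "country_risk"], label="label")
# Each generator would be passed, unchanged, to the same training function,
# e.g. train_fun(model_gen, trials["without_hour"], ...), possibly on different workers.
```

Comparing each trial's metric with the baseline shows how much the model relies on the dropped feature.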

Future Work

You can try out Maggy for hyperparameter optimization or ablation studies now, and keep an eye on Maggy's GitHub repo for the oblivious training function to be released as a pure Spark version, or wait until the next release of Hopsworks, which will include full support. Maggy is still under heavy development, and our mission with Maggy is to provide a new way of writing machine learning applications that spares data scientists from having to become distributed systems experts. By following these best practices, we are able to keep the high-level APIs of frameworks like Keras and PyTorch free of obtrusive distribution code.


In this blog, we introduced a new feature to an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). In a single Jupyter notebook, developers can mix vanilla Python code to develop and test models on their laptop with PySpark-specific cells that can be run when a cluster is available using a PySpark kernel, such as Sparkmagic. This way, iterative development of deep learning models now becomes possible, moving from the laptop to the cluster and back again, with DRY code in the training function – as all phases reuse the same training code.

Watch our demo presented at the Spark+AI Summit 2020


Meister et al. (2020). Towards Distribution Transparency for Supervised ML With Oblivious Training Functions. Published in the MLOps workshop of MLSys’20.

Hello Asynchronous Search for PySpark, published on October 19, 2019.

AI & Deep Learning for Fraud & AML

Jim Dowling

tl;dr Deep learning is now the state-of-the-art technique for identifying financial transactions suspected of money laundering. It delivers fewer false positives and higher accuracy than traditional rule-based approaches. However, even though labelled datasets of historical transactions are available at many financial institutions, supervised machine learning is not a viable approach due to the massive imbalance between the number of “good” and “bad” transactions. The answer is to go unsupervised and welcome with open arms GANs and graph embeddings.

Financial institutions invest huge amounts of resources in both identifying and preventing money laundering and fraud. Many companies have systems that automatically flag financial transactions as ‘suspect’ using a database of static rules that generate alerts if they match a transaction, see Figure below. Flagged transactions can be withheld and/or investigated by human investigators. As anti-money laundering (AML) is a pattern matching problem, and many institutions have terabytes (TBs) of labelled data for historical transactions (transactions are labelled as either ‘good’ or ‘bad’), many banks and companies are investigating deep learning as a potential technology for classifying transactions as "good" or "bad", see Figure below. However, supervised machine learning is not a viable approach due to the typical imbalance between the number of “good” and “bad” transactions, where you may have only 1 “bad” transaction for every million transactions or more.

What are the alternatives? Well, in 2019, unsupervised learning and self-supervised learning took deep learning by storm. Self-supervised solutions are now the state-of-the-art systems for both Natural Language Processing (NLP), with RoBERTa by Facebook, and image classification on ImageNet, with Noisy Student and AdvProp, both by Quoc Le’s team at Google. Self-supervised learning is autonomous supervised learning that does not always require labelled data, as self-supervised learning systems extract and use some form of feature or metadata as a supervisory signal. In contrast, unsupervised learning has a long history of being used for anomaly detection. In this blog, we describe how we have worked on an AML solution based on deep learning, and how we moved from unsupervised, through self-supervised, to semi-supervised learning to arrive at our method of choice: Generative Adversarial Networks (GANs).

Rules-based AML versus Deep-Learning AML using models.

The Class Imbalance Problem for AML

In the figure above, we can see the huge data imbalance between good and bad transactions found in a typical AML transaction dataset. A supervised deep learning system could be used to train a binary classifier that predicts with 99.9999% accuracy whether a transaction was involved in money laundering or not. Of course, it would always predict ‘good’, and only get wrong the very small number of ‘bad’ transactions! This classifier would, of course, be useless. The problem we are trying to solve is to correctly predict ‘bad’ transactions as bad, while minimizing the number of ‘good’ transactions that we predict as bad. In ML terminology, with ‘bad’ as the positive class, we want to maximize true positives (miss no bad transactions) and minimize false positives (good transactions predicted as bad), see the confusion matrix below. False negatives (missed bad transactions) get you in trouble with the regulator and authorities; false positives create unnecessary work and cost for your company.

Confusion matrix of our Binary Fraud Classifier with all possible predictions and their consequences. You could use a variant of the F1 score to evaluate models (precision and recall should not be weighted equally).
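The accuracy paradox and a recall-weighted metric can be checked with a few lines of arithmetic. The counts and the second classifier are invented for illustration. Using the F-beta score with beta = 2 is an assumption: it weights recall above precision, because a missed bad transaction is costlier than a false alert. The text only says the two should not be weighted equally.

```python
def accuracy(tp, fp, fn, tn):
    return (tp + tn) / (tp + fp + fn + tn)


def f_beta(tp, fp, fn, beta=1.0):
    """F-beta score with 'bad' as the positive class; beta > 1 weights recall above precision."""
    if tp == 0:
        return 0.0  # no bad transaction was caught
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    b2 = beta ** 2
    return (1 + b2) * precision * recall / (b2 * precision + recall)


# 1,000,000 transactions, exactly one of them bad.
# Classifier A always predicts "good".
acc_a = accuracy(tp=0, fp=0, fn=1, tn=999_999)   # 0.999999, i.e. 99.9999%
f2_a = f_beta(tp=0, fp=0, fn=1, beta=2)          # 0.0
# Classifier B (hypothetical) catches the bad transaction but raises 99 false alerts.
acc_b = accuracy(tp=1, fp=99, fn=0, tn=999_900)  # 0.999901
f2_b = f_beta(tp=1, fp=99, fn=0, beta=2)         # about 0.048
```

Classifier B has lower accuracy but is the only useful one, and only the F-beta score reflects that.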

Our solution will have to address the class imbalance problem, as you may have thousands or millions of good transactions for every one transaction that is known to be bad. Supervised machine learning algorithms, including deep learning, work best when the number of samples in each class is roughly equal. This is a consequence of the fact that most supervised ML algorithms maximize accuracy and reduce error. Unfortunately, over-sampling the minority class, undersampling the majority class, synthetic sampling, and cost-function approaches will not help us solve the class imbalance problem, due to the sheer scale of typical data imbalance. Other classical techniques such as One-Class support vector machines or Kernel Density Estimators (KDE) do not scale well enough, as they “require manual feature engineering to be effective on high-dimensional data and are limited in their scalability to large datasets”.

AML as Anomaly Detection

Anomaly detection follows quite naturally from a good unsupervised model

- Alex Graves (Deep Mind) at NeurIPS 2018.

This figure from Ruff et al visualizes different possible approaches to Anomaly detection from unsupervised to supervised to semi-supervised.

As AML problems are not easily amenable to supervised machine learning, much ongoing research and development concentrates on treating AML as an Anomaly detection (AD) problem. That is, how can we automatically identify suspected money-laundering transactions as anomalies? The domain of AML is also challenging due to its non-stationarity: new money transfer schemes are constantly being introduced into the market, whole countries may join or leave common payment areas or currencies, and, of course, new money laundering schemes are constantly appearing. AML is a living organism and any techniques developed need to be able to be quickly updated in response to changes in its environment.

Traditional machine learning approaches to solving anomaly detection have involved using unsupervised learning approaches, see Figure above. An unsupervised approach to AML would be to find a “compact” description of the “good” class, with “bad” transactions being anomalies (not part of the “good” class). Examples of unsupervised approaches to AD include k-means clustering and principal component analysis (PCA). 
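Here is a toy version of the "compact description" idea using k-means. The "good" transactions are summarized by cluster centroids, and the anomaly score is the distance to the nearest centroid. The two made-up features, k = 2 and the threshold (the largest score seen on the good data) are assumptions for this sketch. Real transaction data is high-dimensional, which is where such classical methods struggle.

```python
import math
import random


def kmeans(points, k, iters=20, seed=0):
    """Plain Lloyd's algorithm: returns k centroids of the given points."""
    centroids = random.Random(seed).sample(points, k)
    for _ in range(iters):
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda c: math.dist(p, centroids[c]))
            clusters[nearest].append(p)
        # Move each centroid to the mean of its cluster (keep it if the cluster is empty).
        centroids = [tuple(sum(dim) / len(cl) for dim in zip(*cl)) if cl else centroids[i]
                     for i, cl in enumerate(clusters)]
    return centroids


def anomaly_score(p, centroids):
    """Distance to the nearest centroid of the 'good' data; large means anomalous."""
    return min(math.dist(p, c) for c in centroids)


# Made-up two-feature 'good' transactions forming two tight groups.
good = [(cx + dx, cy + dy)
        for cx, cy in [(1.0, 1.0), (10.0, 10.0)]
        for dx in (-0.5, 0.0, 0.5)
        for dy in (-0.5, 0.0, 0.5)]
centroids = kmeans(good, k=2)
# Flag anything farther from the good data than the farthest good training point.
threshold = max(anomaly_score(p, centroids) for p in good)
```

Note that this sketch is fully unsupervised: it never uses the labels of known bad transactions, which is the limitation the next paragraph raises.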

However, AML is not a classical use-case for anomaly detection techniques as we typically have labelled datasets - financial institutions typically know which transactions were “good” and which transactions were “bad”. We should, therefore, exploit that labelled data when training models. That is, we have semi-supervised learning - it is not fully unsupervised learning as there is some labeled training data available.

What are useful Features for AML and how do we manage them?

Before we get into semi-supervised ML, we need to discuss the features that can be used to train your AML model. Firstly, you should only train your AML model on features that will be accessible in your production AML binary classifier. If you have access to features during development that will never be usable in production, don’t even use them out of curiosity. Don’t kill the cat. If you are designing an AML classifier for an online application, then it is important that you know that you can access the features used to train your model at very low latency in production. Typically, this requires an online feature store, such as the Hopsworks Feature Store (see End-to-End figure later). If you are training a classifier for an offline batch application, then you typically do not need to worry about low latency access to feature data, as you will probably have feature engineering code in the batch application (or in the Feature Store) that pre-computes the features for your classifier.

The most obvious features that are available for AML are the features that arrive with the transaction itself: sending customer, receiving customer, sending/receiving bank, the amount of money being transferred, the date, etc. Other examples of useful features include how many transactions the sending/receiving account has executed in the last day, week, or month; whether the transaction date/time is “special” (holiday, weekend, etc); the graph of customers connected to the sender/receiver accounts over different time windows (last hour/day/week/month/year); credit scores for accounts; and so on. Typically, these features are not available in your online application, and you do not want to rewrite your feature engineering code in your online AML classifier application, so you use the Feature Store to look up those features at runtime when the transaction arrives. When you have queried those features from the Online Feature Store, you can join them (in the correct order) with the transaction-supplied features and then send the complete feature vector to the network-hosted model for a prediction of whether the transaction is “good” or “bad”.
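Below is a minimal sketch of the enrichment step. It assumes a plain Python dict as a stand-in for the online feature store (this is not the Hopsworks Feature Store API) and uses hypothetical feature names. Window counts are computed ahead of time, as a feature pipeline would do. At prediction time, they are looked up by sender and joined with the transaction's own features in the fixed order the model was trained with.

```python
from datetime import datetime, timedelta

# Hypothetical feature order used at training time; serving must use the same order.
FEATURE_ORDER = ["amount", "is_weekend", "sender_tx_last_day",
                 "sender_tx_last_week", "sender_credit_score"]


def window_counts(history, account, now):
    """Offline feature engineering: transactions sent by `account` in the last day and week."""
    times = [t for acct, t in history if acct == account and t <= now]
    return {
        "sender_tx_last_day": sum(now - t <= timedelta(days=1) for t in times),
        "sender_tx_last_week": sum(now - t <= timedelta(weeks=1) for t in times),
    }


def build_feature_vector(transaction, online_store):
    """Join transaction-supplied features with precomputed features looked up by sender."""
    features = {
        "amount": transaction["amount"],
        "is_weekend": int(transaction["timestamp"].weekday() >= 5),
    }
    features.update(online_store[transaction["sender"]])  # key lookup; a dict stands in for the store
    return [features[name] for name in FEATURE_ORDER]


now = datetime(2020, 6, 27, 12, 0)  # a Saturday
history = [("acc-1", now - timedelta(hours=2)),
           ("acc-1", now - timedelta(days=3)),
           ("acc-2", now - timedelta(hours=1))]
# Materialized ahead of time by a feature pipeline; the credit score is made up.
online_store = {"acc-1": {**window_counts(history, "acc-1", now), "sender_credit_score": 0.72}}
vector = build_feature_vector({"sender": "acc-1", "amount": 950.0, "timestamp": now}, online_store)
```

Building the vector from a single `FEATURE_ORDER` list is one simple way to guarantee that the serving-time order matches the training-time order.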

Image from Anomaly Detection at Spark/AI EU Summit 2019.

Semi-Supervised Learning for Anomaly detection

Semi-supervised learning is a class of machine learning tasks and techniques that also make use of unlabeled data for training – typically a small amount of labeled data with a large amount of unlabeled data.

- Wikipedia

In semi-supervised learning, we use many of the techniques from unsupervised learning - you train a good model of the data, then you can see how likely something is under that model. In AML, we could use, for example, one-class novelty detection, to train one GAN on the large class of ‘good’ transactions as the novelty detector, and another GAN that supports it by enhancing the inlier samples and distorting the outliers.

Generative Adversarial Networks (GANs) 

Generative models involve building a model of the world that enables you to act under uncertainty. More precisely, they are able to model complex and high-dimensional distributions of real-world data. We use GANs for anomaly detection by modeling the normal behavior of some training data using adversarial training and then detecting anomalies using an anomaly score. To evaluate our performance, we use Precision, Recall and F1-Score metrics.
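As an illustration of what such an anomaly score can look like, one widely cited formulation is AnoGAN (Schlegl et al., 2017). We show it as an example, not as the exact score used in our work. A generator G is trained on normal data only. For a new sample x, gradient descent in latent space searches for a code z* whose generated sample best matches x. The score then combines a residual loss with a discrimination loss computed on an intermediate discriminator feature layer f, weighted by λ:

```latex
\begin{aligned}
\mathcal{L}_R(z) &= \sum \left| x - G(z) \right|, \qquad
\mathcal{L}_D(z) = \sum \left| f(x) - f(G(z)) \right|, \\
z^{*} &\approx \arg\min_{z} \left[ (1-\lambda)\,\mathcal{L}_R(z) + \lambda\,\mathcal{L}_D(z) \right], \\
A(x) &= (1-\lambda)\,\mathcal{L}_R(z^{*}) + \lambda\,\mathcal{L}_D(z^{*}).
\end{aligned}
```

A higher A(x) means x is less well explained by the model of normal behaviour. A threshold on A(x), tuned on held-out data, turns the score into a good/bad decision.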

Training a GAN with Transactions classified as ‘good’ and ‘bad’.

GANs are generally very difficult to train. There is a significant risk of mode collapse, where the generator learns to produce only a narrow range of outputs instead of the full data distribution. There is also a risk that training fails to converge because the generator and discriminator stop learning from one another. You will need to carefully consider the architecture you use. To quote Marc Aurelio Ranzato (Facebook) at NeurIPS 2018, “the Convolutional Neural Network architecture is more important than how you train (GANs)”. You will need to dedicate time and GPU resources to hyperparameter tuning due to the sensitivity of GANs and the risk of mode collapse.

Hyperparameter Tuning for Training GANs in Hopsworks and Maggy

The Maggy framework on Hopsworks uses PySpark to distribute hyperparameter tuning tasks to GPUs running TensorFlow/Keras/PyTorch code. Maggy supports state-of-the-art asynchronous directed search, such as ASHA, as well as custom optimizers.

You need to have the tools and platform to do hyperparameter sweeps at scale with GANs, due to their sensitivity. Hopsworks uses PySpark and the Maggy framework to distribute hyperparameter trials to GPUs running TensorFlow/Keras/PyTorch code. Hopsworks/Maggy supports state-of-the-art asynchronous directed hyperparameter search, which can provide up to 80% more efficient use of GPUs when tuning sensitive GANs, compared to other hyperparameter tuning approaches using PySpark. Read our Maggy blog post to find out more.

Similar to MLFlow, Hopsworks also provides an experiment service where you can manage all hyperparameter trials as experiments. Experiments supports TensorBoard, tabular results for trials, and any images generated during training. This makes training runs and models easier to analyse and reproduce.

Hopsworks manages your hyperparameter tuning experiments, making them easy to analyse and reproduce.

Putting your GAN in production

If you have now successfully trained a GAN that performs well on your test dataset (keep the last year/months of your transaction data as holdout data), you can save the discriminator as a model for use on new transactions. Typically, you want further model validation (the What-If Tool is a good way to evaluate black-box models in Jupyter notebooks on Hopsworks) before you deploy your model from your model registry into production for either online or batch applications.

Save just the Discriminator from your original GAN to classify new transactions as either ‘good’ or ‘bad’.

Will GANs increase True Fraud Detection and reduce False Fraud Detection?

While there is no guarantee that GANs will magically make money laundering disappear overnight, they appear to be a much more powerful tool for correctly identifying fraudulent transactions and for minimizing the number of false alerts that need to be manually investigated. In China, GANs are already being used in production at two banks: “Two commercial banks have reduced losses of about 10 million RMB in twelve weeks and significantly improved their business reputation”, GAN-based telecom fraud detection at the receiving bank. Another example shown below was recently presented at the Spark/AI Summit EU 2019.

There is also the issue of explainability (particularly in the EU), where it is unclear how regulators will handle the use of black-box techniques in making fraud classification decisions. The first step to take is to augment existing systems with extra input from a GAN-based system, and evaluate for a period of time before discussing how to proceed with the regulators.

Expected results from using GANs (Anomaly Detection at Spark/AI EU Summit 2019)

End-to-End Workflow on Hopsworks for GANs

Hopsworks enables automated end-to-end ML workflows. These cover taking historical financial transactions from production backend data systems (data warehouses, data lakes, Kafka), feature engineering with Spark/PySpark, and storing features in the Feature Store. For orchestrating such feature pipelines, Hopsworks includes Airflow as a service. Airflow is also used by Data Scientists to orchestrate Jupyter notebooks for (1) selecting features to create train/test data, (2) finding good hyperparameters, (3) training a model, and (4) evaluating the model before deploying it into production. Finally, online AML detection applications need to enrich their feature vectors with transaction window counts and other features not available locally. The Hopsworks online Feature Store is a low-latency, highly available database, based on MySQL Cluster, that enables online AML applications to access their pre-engineered feature data with very low latency (<10 ms). The Hopsworks Feature Store is modular with a REST API, so all steps 1-5 in the above figure can be run either on the Hopsworks platform itself or on external platforms. You decide.


Deep learning is revolutionizing how we identify money laundering, and it faces unique challenges related to huge data volumes and massive data imbalances. At Logical Clocks, we have worked on a semi-supervised approach based on GANs, enabled by Hopsworks, which scales to process huge datasets. We have also worked on graph embeddings at scale, using Spark. This has produced new state-of-the-art results for detecting money laundering. The next phase is to put this model into production.