Robin Andersson
Software engineer
Jim Dowling
CEO
TL;DR: This article describes how we use dynamic executors in PySpark to ensure that GPUs are allocated to executors only while they are training neural networks. When training finishes, the GPUs are immediately freed for use by other applications in the cluster. We also make hyperparameter optimization fault-tolerant using Spark’s executor blacklisting capability.
Click here to see a Spark Summit talk on PySpark, TensorFlow, and Hops.
When developers use TensorFlow/Keras/PyTorch to train deep neural networks in Hops, Hops uses PySpark to transparently distribute the Python code to containers with GPUs. Hops enables AutoML, the automated search for good neural network architectures and hyperparameters, by running parallel experiments on different combinations of hyperparameters and model architectures. In PySpark, Hops runs a different experiment on each executor, and not all of the experiments finish at the same time: some finish early, some later. GPUs cannot currently be shared (multiplexed) by concurrent applications. Population-based approaches to AutoML typically proceed in stages or iterations, so every experiment in a stage must wait for the slowest one to finish, and the GPUs held by the early finishers lie idle in the meantime.
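To make the cost of waiting concrete, here is a minimal sketch that computes the idle GPU time in one stage when every GPU is held until the slowest experiment completes. The durations are made-up numbers chosen for illustration, not measurements from Hops.

```python
# Hypothetical training times (in minutes) for four experiments in one stage.
# Each experiment is assumed to hold exactly one GPU.
durations_min = [30, 45, 50, 90]


def idle_gpu_minutes(durations):
    """GPU-minutes spent waiting if every GPU is held until the slowest experiment ends."""
    stage_length = max(durations)
    return sum(stage_length - d for d in durations)


def utilization(durations):
    """Fraction of the held GPU time that was actually spent training."""
    held = max(durations) * len(durations)
    return sum(durations) / held


print(f"idle GPU-minutes: {idle_gpu_minutes(durations_min)}")
print(f"utilization: {utilization(durations_min):.1%}")
```

With these assumed numbers, 145 of the 360 GPU-minutes held during the stage are spent waiting, which is the time that releasing GPUs early can hand back to the cluster.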
The problem, then, is how to free up an executor’s GPUs as soon as its experiment is finished. Hops leverages dynamic executors in PySpark/YARN to release the GPU(s) attached to an executor as soon as it sits idle waiting for other experiments to finish, ensuring that (expensive) GPUs are held no longer than needed.
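As a rough illustration, the sketch below collects the standard Spark dynamic-allocation properties and renders them as spark-submit arguments. The property names are Spark's own; the values, and the helper to_submit_args, are assumptions for this example and not the settings Hops actually ships with.

```python
# Standard Spark dynamic-allocation properties; the values are illustrative assumptions.
dynamic_allocation_conf = {
    "spark.dynamicAllocation.enabled": "true",
    # In Spark 2.x, dynamic allocation on YARN relies on the external shuffle service.
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "0",
    "spark.dynamicAllocation.maxExecutors": "6",
    # Release an executor (and the resources attached to it) shortly after it goes idle.
    "spark.dynamicAllocation.executorIdleTimeout": "10s",
}


def to_submit_args(conf):
    """Render a config dict as a flat list of spark-submit --conf arguments."""
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(dynamic_allocation_conf)))
```

A short executorIdleTimeout is the knob that matters most here: the lower it is, the sooner an executor that has run out of experiments is handed back to YARN.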
Each Spark executor runs a local TensorFlow process. Hops also supports cluster-wide Conda for managing Python library dependencies. Hops supports the creation of projects, and each project has its own Conda environment, replicated on all hosts in the cluster. When you launch a PySpark job, it uses the local Conda environment for that project. This way, users can install whatever libraries they like using conda and pip, and then use them directly inside Spark executors. This brings programming PySpark one step closer to the single-host experience of programming Python. Hops also supports Jupyter and the SparkMagic kernel for running PySpark jobs.
Resource schedulers such as YARN, Kubernetes, and Mesos cannot ensure that GPU resources allocated to applications will be highly utilized. This is not a problem for traditional resources like CPU and memory. For example, if a workload of CPU-intensive applications hogs CPU resources but requests more memory than it uses, the resource scheduler can compensate by over-provisioning the amount of memory available at a host. That is, if the host has 256GB of RAM, you can tell YARN the host has 350GB of memory available for applications. You should, of course, ensure the operating system has enough virtual memory available to handle the configured level of over-provisioning and, like an airline selling more seats than it has available, ensure that disk swapping almost never happens. An alternative is to use a system like Dr Elephant, which monitors applications’ resource utilization and flags applications that request more memory than they need, so that they can be right-sized. This, of course, works best where the workloads on a cluster are predictable, which is not always the case.
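The arithmetic behind the 256GB/350GB example can be sketched as follows. The only assumption beyond the text is that the advertised figure is set through YARN's yarn.nodemanager.resource.memory-mb property, which is expressed in megabytes.

```python
PHYSICAL_RAM_GB = 256    # RAM actually installed in the host
ADVERTISED_RAM_GB = 350  # memory the scheduler is told it may hand out

# How far the host is overcommitted relative to its physical memory.
overcommit_ratio = ADVERTISED_RAM_GB / PHYSICAL_RAM_GB

# Value to configure, assuming yarn.nodemanager.resource.memory-mb is the setting used.
advertised_mb = ADVERTISED_RAM_GB * 1024

# Worst case: memory that would have to be backed by swap if every
# application used its full allocation at the same time.
swap_backed_gb = ADVERTISED_RAM_GB - PHYSICAL_RAM_GB

print(f"overcommit ratio: {overcommit_ratio:.2f}x")
print(f"yarn.nodemanager.resource.memory-mb = {advertised_mb}")
print(f"worst-case swap-backed memory: {swap_backed_gb} GB")
```

In this example the host is overcommitted by roughly 37%, and up to 94GB would spill to virtual memory in the worst case, which is why swapping has to remain a rare event.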
But GPUs are a special resource. We can’t overprovision them, because we can’t run multiple applications on a GPU at the same time. An application is given a GPU by a resource scheduler, and the resource scheduler trusts that the application will fully utilize the GPU while it has it. But as the Germans say, ‘trust is good, but control is better’. In Hops, we use Spark’s Dynamic Executors to ensure that GPUs are released immediately when they are no longer needed.
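from hops import experiment

def train_fn(learning_rate, dropout):
    # TensorFlow training code goes here
    ...

# Keys are assumed to match train_fn's parameter names; each list holds the values to try.
args_dict = {'learning_rate': [0.001, 0.0005, 0.0001], 'dropout': [0.45, 0.7]}
experiment.grid_search(train_fn, args_dict)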
The above code is an example of hyperparameter optimization for TensorFlow. The program will run 6 instances of train_fn, each with a different combination of the learning_rate and dropout parameters (see the table below). If our PySpark application has been allocated 6 executors, each executor will run one of the hyperparameter combinations. It is also possible that fewer than 6 executors were allocated to the application. In that case, the hyperparameter tasks (or trials) are placed in a queue, and executors pick them from the queue until all tasks are finished.
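The expansion into 6 trials, and the queueing behaviour when fewer executors are available, can be sketched in plain Python. This is a simulation for illustration only, not the Hops implementation: expand_grid and assign_trials are hypothetical helpers, and the round-robin hand-out assumes every trial takes the same time.

```python
from collections import deque
from itertools import product

args_dict = {'learning_rate': [0.001, 0.0005, 0.0001], 'dropout': [0.45, 0.7]}


def expand_grid(args):
    """Return one dict per combination of the listed hyperparameter values."""
    keys = list(args)
    return [dict(zip(keys, values)) for values in product(*(args[k] for k in keys))]


def assign_trials(trials, num_executors):
    """Simulate executors taking trials from a shared queue, one trial at a time."""
    queue = deque(trials)
    assignments = {executor: [] for executor in range(num_executors)}
    while queue:
        for executor in range(num_executors):
            if not queue:
                break
            assignments[executor].append(queue.popleft())
    return assignments


trials = expand_grid(args_dict)
for trial in trials:
    print(trial)

# With only 4 executors, two of them end up running a second trial.
schedule = assign_trials(trials, num_executors=4)
for executor, assigned in schedule.items():
    print(executor, assigned)
```

In a real run trials finish at different times, so whichever executor becomes free first takes the next trial from the queue, and an executor that finds the queue empty simply goes idle.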
In the above example, we use grid_search as the exploration algorithm (to explore the performance of different combinations of hyperparameters). Another approach supported in Hops is genetic algorithms, which perform a population-based search for good combinations of hyperparameters (aka AutoML or neural architecture search). Other potential approaches are random walk, Bayesian optimization, and Hyperband.
When a task fails on a PySpark executor, it is typically retried up to N times on that same executor. If the task does not succeed after these N attempts, the whole Spark job fails. Put that into the perspective of a large hyperparameter search that may run for days. Assume that, while the PySpark executors are running the experiments, one of the servers experiences a hardware malfunction, for example a disk failure. The executors running on this machine will fail all their tasks, and the job as a whole will fail because of this single hardware failure. We need a smarter mechanism that reruns the specific experiments that failed on another machine.
Spark 2.1.0 added support for blacklisting problematic executors and even entire nodes. We take advantage of this mechanism to make hyperparameter optimization jobs fault-tolerant. We configure it so that an executor is blacklisted as soon as an experiment fails on it, for whatever reason, and its allocated CPU/memory/GPU(s) are reclaimed by YARN. The second time the same experiment fails, the machine on which the executor is running is blacklisted. No more experiments will then run on that machine for the entire duration of the hyperparameter optimization job; the assumption is that the failure may be hardware related. The failed experiment is therefore rerun on a new machine. If it fails a third time, the job as a whole fails, since the problem is most likely an application error rather than a hardware failure. In practice, we have seen that this mechanism can also help with flaky code: code that fails only occasionally gets up to three attempts.
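The escalation described above can be modelled as a small state machine. This is a toy simulation of the policy, not Spark's scheduler code and not the Hops configuration; RetryPolicy and its methods are hypothetical names introduced for the example.

```python
from dataclasses import dataclass, field


@dataclass
class RetryPolicy:
    """Toy model: 1st failure blacklists the executor, 2nd the node, 3rd fails the job."""
    max_attempts: int = 3
    blacklisted_executors: set = field(default_factory=set)
    blacklisted_nodes: set = field(default_factory=set)

    def record_failure(self, attempt, executor, node):
        """Register a failed attempt (1-based) and return 'retry' or 'fail_job'."""
        if attempt >= self.max_attempts:
            return "fail_job"
        if attempt == 1:
            self.blacklisted_executors.add(executor)
        else:
            self.blacklisted_nodes.add(node)
        return "retry"

    def can_run_on(self, executor, node):
        """True if neither the executor nor its node has been blacklisted."""
        return (executor not in self.blacklisted_executors
                and node not in self.blacklisted_nodes)


policy = RetryPolicy()
print(policy.record_failure(1, "exec-1", "node-a"))  # executor exec-1 is now excluded
print(policy.record_failure(2, "exec-2", "node-a"))  # all of node-a is now excluded
print(policy.can_run_on("exec-3", "node-a"))         # the retry must go to another machine
print(policy.record_failure(3, "exec-4", "node-b"))  # third failure ends the job
```

In Spark itself this kind of behaviour is driven by configuration, for example spark.blacklist.enabled together with the spark.blacklist.task.* limits and spark.task.maxFailures; the exact values Hops uses are not stated here.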
In Hops, we have already added support for GPUs in a fork of Spark. This work is ongoing in the Spark community, with YARN/GPU support being developed in SPARK-2473.