In this chapter, we have bundled most of what we believe you will need to move from your laptop into production. Notebooks and deployments go together, as Dask’s notebook interface greatly simplifies many aspects of using its distributed deployments. While you don’t need to use notebooks to access Dask, and in many cases notebooks have serious drawbacks, for interactive use cases it’s often hard to beat the trade-offs. Interactive, exploratory work has a way of turning into permanent, mission-critical workflows, and we cover the steps necessary to turn exploratory work into production deployments.
You can deploy Dask in many ways, from running it on top of other distributed compute engines like Ray to deploying it on YARN or a raw collection of machines. Once you’ve got your Dask job deployed, you’ll likely need to tune it so you don’t use your company’s entire AWS budget on one job. And finally, before you can walk away from a job, you’ll need to set up monitoring so you know when it breaks.
Note
If you’re just here to learn how to use Dask with notebooks, feel free to skip ahead to that section. If you want to learn more about deploying Dask, congratulations and condolences on exceeding the scale you can handle on a single computer.
In this chapter, we will cover some (but not all) of the deployment options for Dask and their trade-offs. You will learn how to integrate notebooks into the most common deployment environments. You’ll see how to use these notebooks to track the progress of your Dask tasks and access the Dask UI when running remotely. We will finish by covering some options for deploying your scheduled tasks, so you can take a vacation without lining up someone to press run on your notebook every day.
Note
This chapter covers Dask’s distributed deployments, but if your Dask program is happy in local mode, don’t feel the need to deploy a cluster just for the sake of it.[1]
When you are choosing how to deploy Dask, there are many different factors to consider, but often the biggest one is what tools your organization is already using. Most of the deployment options map to different types of cluster managers (CMs). CMs manage sets of computers and provide some isolation between users and jobs. Isolation can be incredibly important—for example, if one user eats all of the candy (or CPU), then another user won’t have any candy. Most cluster managers provide CPU and memory isolation, and some also isolate other resources (like disks and GPUs). Most clouds (AWS, GCP, etc.) offer both Kubernetes and YARN cluster managers, which can dynamically scale the number of nodes up and down. Dask does not need a CM to run, but without one, auto-scaling and other important features are not available.
When choosing a deployment mechanism, with or without a CM, some important factors to consider are the ability to scale up and down, multi-tenancy, dependency management, and whether the deployment method supports heterogeneous workers.
The ability to scale up and down (dynamic scaling) is important in many situations, as computers cost money. Heterogeneous or mixed worker types are important for workloads that take advantage of accelerators (like GPUs), so that non-accelerated work can be scheduled on less-expensive nodes. Support for heterogeneous workers goes well with dynamic scaling, as the expensive accelerated workers can be added when needed and released when they are idle.
Multi-tenancy can reduce wasted compute resources for systems that cannot scale up and down.
Dependency management allows you to control, at runtime or in advance, what software is on the workers. This is critical in Dask; if the workers and the client do not have the same libraries, your code may not function. Additionally, some libraries can be slow to install at runtime, so the ability to pre-install or share an environment can be beneficial for some use cases, especially those in deep learning.
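A quick way to catch mismatched libraries is to ask Dask to compare package versions across the client, scheduler, and workers. The sketch below uses Client.get_versions against a throwaway in-process LocalCluster so it runs on a laptop; on a real deployment, you would connect Client to your scheduler’s address instead. Checking numpy is only an illustrative choice of extra package.

```python
from dask.distributed import Client, LocalCluster

# In-process cluster (threads, no subprocesses) so the sketch runs anywhere.
# On a real deployment, use Client("tcp://<scheduler-host>:8786") instead.
with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
    # check=True asks Dask to compare the core packages (python, dask,
    # distributed, msgpack, ...) and complain about mismatches;
    # `packages` adds extra libraries to the report.
    versions = client.get_versions(check=True, packages=["numpy"])

    client_numpy = versions["client"]["packages"].get("numpy")
    worker_numpy = {
        address: info["packages"].get("numpy")
        for address, info in versions["workers"].items()
    }

print("client numpy:", client_numpy)
print("worker numpy:", worker_numpy)
```

On a real cluster, a missing (None) or differing version in the worker dictionary is the signal to fix the worker environment or image before debugging anything else.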
Deployment option comparisons summarizes the trade-offs of several Dask deployment options.
Deployment method | Dynamic scale | Recommended use case[2] | Dependency management | Notebook deployed inside[3] | Mixed worker types |
---|---|---|---|---|---|
localhost | No | Testing, solo dev, GPU-only acceleration | Yes (runtime or pre-install) | Yes | No |
ssh | No | Solo lab, testing, but generally not recommended (k8s instead) | Runtime only | Yes | Yes (manual) |
Slurm + GW | Yes | Existing high-performance computing/Slurm environments | Yes (runtime or pre-install) | Separate project | Varies |
Dask "Cloud" | Yes | Not recommended; use Dask + K8s or YARN on cloud provider | Runtime only | Medium effort[4] | No* |
Dask + K8s | Yes | Cloud environments, existing K8s deployments | Runtime or pre-install (but more effort) | Separate project, medium effort | Yes |
Dask + YARN | Yes | Existing big data deployments | Runtime or pre-install (but more effort) | Separate project that has not been updated since 2019 | Yes* |
Dask + Ray + [CM] | Depends on CM | Existing Ray deployments, multi-tool (TF, etc.), or actor systems | Depends on CM (always at least runtime) | Depends on CM | Yes |
Coiled | Yes | New cloud deployments | Yes, including magic "auto-sync" | No | Yes |
There are two main ways to deploy Dask on Kubernetes:[5] KubeCluster and HelmCluster. Helm is a popular tool for managing deployments on Kubernetes, with the deployments being specified in Helm charts. Since Helm is the newer recommended way of managing deployments on Kubernetes, we will cover that one here.
The Helm documentation offers an excellent starting point on the different ways to install Helm, but for those in a hurry, running curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash will do it for you.[6]
Note
The Dask on Kubernetes Helm chart deploys what is called an operator. Currently, installing operators requires the ability to install Custom Resource Definitions (CRDs) and may require administrator privileges. If you can’t get the permissions (or someone with the permissions), you can still use the "vanilla" or "classic" deployment mode.
Since GPU resources are costly, it is typical to want to allocate only as many of them as needed. Some cluster manager interfaces, including Dask’s Kubernetes plug-in, allow you to configure multiple types of workers so that it will allocate GPU workers only when needed. On our Kubernetes cluster, we deploy the Dask operator as shown in Deploying the Dask operator with Helm.
link:./examples/dask/kubernetes/install-dask-helm.sh[role=include]
Now you can use the Dask operator either by creating YAML files (likely not your favorite) or with the KubeCluster API, as shown in Using the Dask operator, where we create a cluster and then add additional worker types, allowing Dask to create two different kinds of workers.[7]
link:./examples/dask/kubernetes/connect.py[role=include]
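For orientation, here is a minimal sketch of the same idea, assuming the operator-based KubeCluster from dask-kubernetes (dask_kubernetes.operator) and an operator already installed with Helm. The cluster name, both images, and the GPU resource spec are hypothetical placeholders, and the import lives inside the function so the file can be loaded without dask-kubernetes or cluster access.

```python
# Hypothetical resource spec for a GPU worker group. "nvidia.com/gpu" is the
# resource name commonly exposed by the NVIDIA Kubernetes device plugin.
GPU_WORKER_RESOURCES = {
    "requests": {"memory": "8Gi"},
    "limits": {"memory": "16Gi", "nvidia.com/gpu": 1},
}


def start_mixed_cluster():
    """Create a CPU-only default worker group plus an extra GPU worker group."""
    # Imported lazily: needs dask-kubernetes and a reachable Kubernetes
    # cluster with the Dask operator installed.
    from dask_kubernetes.operator import KubeCluster

    cluster = KubeCluster(
        name="mixed-workers",              # hypothetical cluster name
        image="ghcr.io/dask/dask:latest",  # CPU image for the default group
        n_workers=2,
    )
    # Second worker type: start with no GPU workers and scale on demand.
    cluster.add_worker_group(
        name="gpu",
        n_workers=0,
        image="my-registry/dask-gpu:latest",  # hypothetical CUDA-enabled image
        resources=GPU_WORKER_RESOURCES,
    )
    return cluster
```

To use it, call start_mixed_cluster() and then cluster.scale(1, worker_group="gpu") when GPU work arrives; the default CPU group keeps scaling independently with cluster.scale(n).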
In 2020 Dask added a DaskHub Helm chart, which combines the deployment of JupyterHub with the Dask Gateway.
Deploying Dask on Ray is a little different from all of the other options, in that it changes not only how Dask workers and tasks are scheduled but also how Dask objects are stored. This can reduce the number of copies of the same object that need to be stored, allowing you to use your cluster memory more efficiently.
If you have a Ray deployment available to you, enabling Dask can be incredibly straightforward, as shown in Dask on Ray.
link:./examples/dask/Dask-ChN-spark_to_ray_to_dask.py[role=include]
However, if you don’t have an existing Ray cluster, you will still need to deploy Ray somewhere, with the same considerations as Dask. Deploying Ray is beyond the scope of this book. Ray’s production guide has details on deploying Ray, as does Scaling Python with Ray.
YARN is a popular cluster manager from the big data world that is available in open source as well as in commercial on-premises (e.g., Cloudera) and cloud (e.g., Amazon Elastic MapReduce, or EMR) offerings. There are two ways to run Dask on a YARN cluster: one is with Dask-Yarn, and the other is with Dask-Gateway. While the two methods are similar, Dask-Gateway is potentially a bit more involved, as it adds a centrally managed server that manages Dask clusters, but it offers more fine-grained security and administrative controls.
Depending on the cluster, your workers might be more transient than on other cluster types, and their IP addresses might change when they are spun up again. You should put worker/scheduler service discovery in place for your own cluster setup. It could be as simple as a shared file that they read from, or a more resilient broker. If no additional arguments are given, Dask workers use the DASK_SCHEDULER_ADDRESS environment variable to connect.
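As an illustration of the shared-file approach, the hypothetical helper below (not a Dask API) prefers the DASK_SCHEDULER_ADDRESS environment variable and falls back to a JSON scheduler file on shared storage, reading its "address" field, which is where Dask’s own scheduler files store the address.

```python
import json
import os


def resolve_scheduler_address(scheduler_file=None, env=None):
    """Hypothetical helper: find the scheduler address for a worker or client.

    Checks DASK_SCHEDULER_ADDRESS first, then falls back to a JSON scheduler
    file on shared storage (Dask's scheduler files keep it under "address").
    """
    env = os.environ if env is None else env
    address = env.get("DASK_SCHEDULER_ADDRESS")
    if address:
        return address
    if scheduler_file and os.path.exists(scheduler_file):
        with open(scheduler_file) as f:
            return json.load(f)["address"]
    raise LookupError("No scheduler address found; has the scheduler started yet?")
```

Dask can also do this on its own: start the scheduler with --scheduler-file pointing at shared storage, start the workers with the same --scheduler-file option, and connect with Client(scheduler_file=...). A hand-rolled helper like this one is mainly useful when you need custom fallbacks.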
Deploying Dask on YARN with a custom conda environment expands on the YARN deployment example from an earlier chapter, adding a custom conda environment and a logging framework.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
Alternatively, you can run the cluster with the CLI that Dask-Yarn exposes. You would first launch the cluster on YARN from the shell script of your choosing (with dask-yarn submit), which then runs the Python file you want to execute. Within the Python file, you reference the deployed YARN cluster, as shown in Deploying Dask on YARN with CLI interface. This can be an easier way to chain your jobs and to inspect and pull logs. Note that the CLI is supported only in Python versions above 2.7.6.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
Dask has gained a big academic and scientific user base. This comes in part from how you can use existing high-performance computing (HPC) clusters with Dask to make scalable scientific computing readily available without rewriting all of your code.[8]
You can turn your HPC account into a high-performance Dask environment that you can connect to from Jupyter on your local machine. Dask uses its Dask-jobqueue library to support many HPC cluster types, including HTCondor, LSF, Moab, OAR, PBS, SGE, TORQUE, DRMAA, and Slurm. A separate library, Dask-MPI, supports MPI clusters. In an earlier chapter, we showed a sample of how to use Dask on Slurm, and in the following section, we’ll build on that example.
The first step for using Dask on your cluster is to set up your own Python and IPython environments on the cluster. The exact way to do that depends on your cluster administrator’s preferences. Generally, users install libraries at the user level with virtualenv or Miniconda. Miniconda makes it easier to use not just your own libraries but also your own version of Python. Once that is done, make sure your python command points to the Python binary in your user space, either by running which python or by installing and importing a library that is not available in the system Python.
The Dask-jobqueue library converts your Dask settings and configurations into a job script that is submitted to the HPC cluster. The following example starts a cluster with Slurm workers, and the semantics are similar for the other HPC APIs. Dask-MPI uses a slightly different pattern, so be sure to refer to its documentation for details. job_directives_skip is an optional parameter that lists directives to leave out of the auto-generated job script, which avoids errors when the script would otherwise include commands your particular cluster does not recognize. job_script_prologue is also an optional parameter; it specifies shell commands to run each time a worker is spawned. This is a good place to activate the right Python environment or to run a cluster-specific setup script.
Tip
Make sure HPC cluster specs for worker memory and cores are correctly matched in
HPC systems often add a high-performance network interface (such as InfiniBand) on top of a standard Ethernet network, and using it has become a crucial way to speed up data movement. You can pass the optional interface parameter, as shown in Deploying Dask on an HPC cluster by hand, to instruct Dask to use the higher-bandwidth network. If you are not sure which interfaces are available, run ifconfig in your terminal; on InfiniBand-equipped nodes, it will list an InfiniBand interface, often named ib0, among the available network interfaces.
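Finally, cores and memory describe the resources of each job; with the default of one worker process per job, they are effectively per-worker resources (with more processes per job, each job’s cores and memory are split among its worker processes). n_workers specifies how many workers to start initially. You can scale and add more workers after the fact with the cluster.scale() command, as we do in Deploying Dask on an HPC cluster by hand.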
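A minimal sketch of these parameters together, assuming dask-jobqueue 0.8 or later (where job_script_prologue and job_directives_skip exist). The partition name, resource sizes, conda path, and skipped directive are hypothetical; the import sits inside the function so the settings can be reviewed on a machine without Slurm.

```python
# Per-job settings for dask-jobqueue's SLURMCluster (all values hypothetical).
SLURM_JOB_SPEC = {
    "queue": "normal",                 # Slurm partition name
    "cores": 8,                        # cores per job
    "memory": "32GiB",                 # memory per job, split across its processes
    "processes": 2,                    # worker processes per job -> 16 GiB each
    "walltime": "01:00:00",
    "interface": "ib0",                # use the InfiniBand network, not Ethernet
    "job_script_prologue": ["source ~/miniconda3/bin/activate dask-env"],
    "job_directives_skip": ["--mem"],  # drop a directive this cluster rejects
}


def start_slurm_cluster(initial_workers=4, target_workers=8):
    """Start a Slurm-backed cluster, show its job script, then scale it up."""
    from dask_jobqueue import SLURMCluster  # needs dask-jobqueue and Slurm access

    cluster = SLURMCluster(n_workers=initial_workers, **SLURM_JOB_SPEC)
    print(cluster.job_script())    # review the generated #SBATCH script
    cluster.scale(target_workers)  # counts worker processes; scale(jobs=n) counts jobs
    return cluster
```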
Tip
Some HPC systems use GB when they mean 1,024-based units. Dask-jobqueue sticks with the proper notation of GiB. 1 GB is 1,000³ bytes, and 1 GiB is 1,024³ bytes. Academic settings often use binary measurements, while commercial settings usually opt for SI units, hence the discrepancy.
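Dask’s own size parser, dask.utils.parse_bytes, follows the same convention, which makes the gap easy to check. This sketch uses only parse_bytes and format_bytes from dask.utils; the 30 GB request is an arbitrary example.

```python
from dask.utils import format_bytes, parse_bytes

gb = parse_bytes("1GB")    # SI unit: 1,000**3 bytes
gib = parse_bytes("1GiB")  # binary unit: 1,024**3 bytes
print(gb, gib, f"{gib / gb - 1:.1%} larger")

# Asking for "30GB" when you meant 30 GiB leaves you about 2 GiB short.
requested = parse_bytes("30GB")
print(format_bytes(requested))  # format_bytes reports binary units
```

The roughly 7% difference per unit adds up quickly on large memory requests.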
Before running Dask in a new environment, you should check the job script that Dask-jobqueue auto-generates for unsupported commands. While Dask’s jobqueue library tries to work with many HPC systems, it may not know all of the quirks of your institution’s setup. If you are familiar with the cluster’s capabilities, you can spot unsupported commands by calling print(cluster.job_script()). You can also try running a small version of your job first, with a limited number of workers, and see where it fails. If you find any issues with the script, use the job_directives_skip parameter to skip the unsupported components, as outlined in Deploying Dask on an HPC cluster by hand.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
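Eyeballing the job script works, but the check can also be scripted. The helpers below are hypothetical (not part of Dask-jobqueue): they pull the option names out of the #SBATCH lines of a script string, such as the one returned by cluster.job_script(), and report any that are not on a list your site is known to accept. The example script and the supported set are illustrative only.

```python
def sbatch_options(job_script):
    """Return option names (e.g. '--mem', '-t') from #SBATCH lines, in order."""
    options = []
    for line in job_script.splitlines():
        line = line.strip()
        if not line.startswith("#SBATCH"):
            continue
        tokens = line[len("#SBATCH"):].split()
        if tokens:
            # '--mem=30G' -> '--mem'; '-t 00:30:00' -> '-t'
            options.append(tokens[0].split("=", 1)[0])
    return options


def unsupported_options(job_script, supported):
    """Options present in the script that are not in the site's supported set."""
    return [opt for opt in sbatch_options(job_script) if opt not in supported]


# Illustrative script text; in practice, pass cluster.job_script().
example_script = """#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=30G
#SBATCH -t 00:30:00
"""
print(unsupported_options(example_script, supported={"-J", "-n", "--cpus-per-task", "-t"}))
```

Anything this reports is a candidate for job_directives_skip, which removes directive lines containing the strings you list.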
In Deploying Dask using job-queue over Slurm with Dask futures, we tie together many of the concepts we’ve introduced. Here we use Dask delayed to run a set of asynchronous tasks on a Slurm cluster. The example also combines several of the logging strategies we’ve mentioned, such as printing the underlying HPC job script and providing a progress bar that users can track in a notebook or in their CLI of choice.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
Tip
Always ensure your walltime requests don’t run afoul of HPC resource managers. For example, Slurm has a backfill scheduler that applies its own logic, and if you request too long a walltime, your request for compute resources can get stuck in the queue, unable to spin up on time. In such a case, the Dask client may error out with a non-descriptive message, such as “Failed to start worker process. Restarting.” At the time of writing, there aren’t many ways to surface specific deployment issues without some logging code from the user end.
On the more advanced end, you can control your cluster configuration by updating the Dask-jobqueue YAML file, which is generated on first run and stored at ~/.config/dask/jobqueue.yaml. The jobqueue configuration file contains, commented out, default configurations for many different types of clusters. To get started editing the file, uncomment the cluster type you are using (e.g., Slurm), and then change the values to meet your specific needs. The jobqueue configuration file allows you to configure additional parameters not available through the Python constructor.
If Dask starts to run out of memory, by default it will start writing data out to disk (called spill-to-disk). This is normally great, since we tend to have more disk than memory, and while it is slower, it’s not that much slower. However, in HPC environments, the default location Dask writes to may be a network storage drive, which makes spilling as slow as transferring the data over the network. You should make sure Dask writes to local storage. You can ask your cluster administrator for the local scratch directory or use df -h to see where different storage devices are mapped. If you don’t have local storage available, or if it’s too small, you can also turn off spill-to-disk. Both disabling spill-to-disk and changing its location can be configured in the ~/.config/dask/distributed.yaml file (also created on first run).
Tip
Adaptive scaling is a great way to scale your job up and down while your application is running, especially on busy shared machines like HPC systems. However, each HPC system is unique, and sometimes the way Dask-jobqueue handles adaptive scaling creates issues. We have encountered such a problem when running Dask adaptive scaling on Slurm using jobqueue but with some effort were able to configure it properly.
Dask also uses files for locking, which can cause issues on a shared network drive, as is common in HPC clusters. When multiple workers run simultaneously, Dask coordinates them with lock files that keep other processes from accessing a file while it is in use. Some issues on HPC can come down to an incomplete locking transaction, or to an inability to write a file to disk due to administrative restrictions. You can disable this behavior in the worker configuration.
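The settings from the last few paragraphs map to Dask configuration keys. This sketch shows them as dictionaries for dask.config.set; the scratch path is hypothetical, and the two spilling options are alternatives, not a combination. Keep in mind that dask.config.set changes only the current Python process: workers launched as separate HPC jobs read their own configuration, so on a real cluster you would put the same keys in ~/.config/dask/distributed.yaml on the shared filesystem.

```python
import dask

# Option 1: keep spilling, but send it to node-local scratch instead of a
# network drive. The path is hypothetical; ask your admin for the real one.
SPILL_TO_LOCAL_SCRATCH = {"temporary-directory": "/local/scratch/my-user"}

# Option 2: no usable local disk, so turn spilling off entirely
# (both target and spill must be False).
NO_SPILL = {
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
}

# Lock files on a shared network drive can misbehave; this disables them.
NO_FILE_LOCKS = {"distributed.worker.use-file-locking": False}

# Applies only to this process and anything it starts in-process.
with dask.config.set({**NO_SPILL, **NO_FILE_LOCKS}):
    spill_setting = dask.config.get("distributed.worker.memory.spill")
    locking_setting = dask.config.get("distributed.worker.use-file-locking")

print(spill_setting, locking_setting)
```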
Tip
Cluster parameters, such as memory allocation and the number of jobs, workers, processes, and CPUs per task, are quite sensitive to user input and can be difficult to grasp at the beginning. For example, if you launch an HPC cluster with multiple processes, each process gets a fraction of the total allocated memory. Ten processes with 30 GB of memory means each process gets 3 GB. By default, a worker pauses new work once its process memory reaches 80% of its limit (2.4 GB in our example), and it is terminated at 95% (2.85 GB) due to memory overflow risks, potentially resulting in a failed task. For more on memory management, refer to Worker Memory Management.
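The arithmetic is easy to get wrong, so here is a small, hypothetical helper that splits a job’s memory across processes and applies Dask’s default worker memory thresholds (target 60%, spill 70%, pause 80%, terminate 95%, configured under distributed.worker.memory).

```python
# Dask's default thresholds, as fractions of each worker's memory limit
# (distributed.worker.memory.target / spill / pause / terminate).
DEFAULT_THRESHOLDS = {"target": 0.60, "spill": 0.70, "pause": 0.80, "terminate": 0.95}


def per_process_memory(total_gb, processes, thresholds=DEFAULT_THRESHOLDS):
    """Split a job's memory across processes; return (limit, thresholds) in GB."""
    limit = total_gb / processes
    return limit, {name: round(limit * frac, 2) for name, frac in thresholds.items()}


limit, marks = per_process_memory(total_gb=30, processes=10)
print(limit)  # 3.0
print(marks)  # {'target': 1.8, 'spill': 2.1, 'pause': 2.4, 'terminate': 2.85}
```

If your peak per-task memory sits anywhere near the pause mark, use fewer processes per job (or more memory) rather than hoping the spill logic keeps up.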
For HPC users, most processes you launch will have a limited amount of walltime that the job is allowed to stay on. You can stagger the creation of workers in such a way that you will have at least one worker running at all times, creating an infinite worker loop. Alternatively, you can also stagger the creation and end of the workers, so that you avoid all of the workers ending simultaneously. Dask worker management through adaptive scaling shows you how.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
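A minimal sketch of the staggered-lifetime approach, assuming dask-jobqueue’s SLURMCluster and a release recent enough to call the worker-options parameter worker_extra_args (older releases named it extra). Each worker retires gracefully after --lifetime, --lifetime-stagger randomizes those retirements so they don’t all happen at once, and adaptive scaling with a job minimum submits replacement jobs. The durations and resources are hypothetical; pick a lifetime comfortably below your walltime.

```python
def staggered_job_spec(walltime="01:00:00", lifetime="55m", stagger="4m"):
    """Job settings whose workers retire before Slurm's walltime kills them."""
    return {
        "cores": 8,          # hypothetical per-job resources
        "memory": "32GiB",
        "walltime": walltime,
        # dask worker CLI flags: retire after `lifetime`, +/- a random `stagger`
        "worker_extra_args": ["--lifetime", lifetime, "--lifetime-stagger", stagger],
    }


def start_self_renewing_cluster(minimum_jobs=2, maximum_jobs=10):
    from dask_jobqueue import SLURMCluster  # needs dask-jobqueue and Slurm access

    cluster = SLURMCluster(**staggered_job_spec())
    # Adaptive scaling keeps at least `minimum_jobs` jobs alive, so workers
    # that retire are replaced by freshly submitted jobs.
    cluster.adapt(minimum_jobs=minimum_jobs, maximum_jobs=maximum_jobs)
    return cluster
```

Replacement jobs still wait in the Slurm queue, so a generous minimum helps keep capacity available while new workers start.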
Tip
Different workers can have different startup times, as well as contain different amounts of data, which impacts the cost of fault recovery.
While Dask has good tools to monitor its own behavior, sometimes the integration between Dask and your HPC cluster (or other cluster) can break. If you suspect jobqueue isn’t sending the right worker commands for your particular cluster, you can inspect the ~/.config/dask/jobqueue.yaml file directly, or inspect the loaded configuration at runtime or in your Jupyter notebook by running dask.config.get('jobqueue').
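To see what Dask actually loaded, rather than what you think the YAML says, you can query the configuration from Python. This sketch uses dask.config.PATH, dask.config.paths, dask.config.refresh, and dask.config.get; importing dask_jobqueue is optional and only needed for the jobqueue defaults to show up.

```python
import dask

try:
    import dask_jobqueue  # noqa: F401  (importing it registers the jobqueue defaults)
except ImportError:
    dask_jobqueue = None

print(dask.config.PATH)   # directory Dask writes default YAML files to
print(dask.config.paths)  # every directory searched for YAML config

# Re-read the YAML files after editing them, then look at the Slurm section.
dask.config.refresh()
slurm_config = dask.config.get("jobqueue.slurm", default=None)
print(slurm_config)
```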
Part of running Dask remotely is being able to connect to the server to run your tasks. If you want to connect your client to a remote cluster, run Jupyter remotely, or just access the UI on a cluster, you’ll need to be able to connect to some ports on the remote machine.
Warning
Another option is to have Dask bind to a public IP address, but without careful firewall rules, this means that anyone can access your Dask cluster, which is likely not your intention.
In HPC environments you often already connect using SSH, so SSH port forwarding is often the easiest way to connect. SSH port forwarding allows you to map a port on another computer to one on your local computer.[9] The default Dask monitoring port is 8787, but if that port is busy (or you configure a different one), Dask may bind to a different port. The Dask server prints out which ports it is bound to at start time. To forward port 8787 on a remote machine to the same local port, you could run ssh -L localhost:8787:my-awesome-hpc-node.hpc.fake:8787 followed by the login host you normally SSH into. You can use the same technique (with different port numbers) for a remote JupyterLab, or to connect a Dask client to a remote scheduler.
Tip
If you want to leave a process running remotely (like JupyterLab), the screen command can be a great way of having a process last beyond a single session.
With the immense popularity of notebooks, some HPC clusters have special tools to make it even easier to launch Jupyter notebooks. We recommend looking for your cluster administrator’s documentation on how to launch Jupyter notebooks, as you may accidentally create security issues if you don’t do it correctly.
You can run Dask in Jupyter like any other library, but Dask’s JupyterLab extensions make it easier to understand the status of your Dask job while it’s running.
Dask’s lab extensions require nodejs, which can be installed with conda install -c conda-forge nodejs. If you are not using conda, it can also be installed with brew install node on macOS or sudo apt install nodejs on Ubuntu.
Dask’s lab extensions package is available as dask-labextension.
Once you’ve installed the lab extension, it will show up with the Dask logo on the left side, as shown in A successfully deployed Dask instance on JupyterLab (digital, color version).
From there you can launch your cluster. By default, the extension launches a local cluster, but you can configure it to use different deployment options, including Kubernetes, by editing ~/.config/dask.
If you are using Dask’s JupyterLab extension (see Dask web UI inside JupyterHub using the JupyterLab extension (digital, color version)), it provides a link to the cluster UI as well as the ability to drag individual components into the Jupyter interface.
The JupyterLab extension links to the Dask web UI, and you can also get a link through the cluster’s repr. If the cluster link is not accessible, you can try installing the jupyter-server-proxy extension so you can use the notebook host as a jump host.
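When the notebook runs on JupyterHub with jupyter-server-proxy installed, you can tell Dask to hand out proxied dashboard links instead of raw host:port URLs. A sketch using the distributed.dashboard.link setting; the template relies on the JUPYTERHUB_SERVICE_PREFIX environment variable that JupyterHub sets for single-user servers, and the exact prefix your deployment needs may differ.

```python
import dask

# Dask fills in {port} (and environment variables such as
# JUPYTERHUB_SERVICE_PREFIX) when it formats the dashboard link, so this
# template assumes that variable is set in the notebook's environment.
dask.config.set(
    {"distributed.dashboard.link": "{JUPYTERHUB_SERVICE_PREFIX}proxy/{port}/status"}
)

# Clusters and clients created after this point report the proxied link, e.g.:
# from dask.distributed import Client
# client = Client()
# print(client.dashboard_link)
```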
Dask jobs tend to take a long time to run; otherwise we would not be putting in the effort to parallelize them. You can use Dask’s progress function from dask.distributed to track your futures' progress inside your notebook itself (see Real-time Dask progress monitoring in JupyterHub (digital, color version)).
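A minimal sketch of progress on a batch of futures; the in-process LocalCluster and the slow_square function are stand-ins for your real cluster and work.

```python
import time

from dask.distributed import Client, LocalCluster, progress


def slow_square(x):
    time.sleep(0.1)  # stand-in for real work
    return x * x


with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
    futures = client.map(slow_square, range(20))
    # In Jupyter this renders a live widget; in a terminal it prints a text
    # progress bar and blocks until the futures finish.
    progress(futures)
    results = client.gather(futures)

print(sum(results))  # 2470
```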
Tuning your Dask program involves understanding the intersection of many components. You will need to understand your code’s behavior and how it interacts with the given data and the machines. You can use Dask metrics to gain insight into much of this, but it’s important to look at the program as well, especially if you did not write it.
Distributed computing requires constantly weighing the costs and benefits of distributing a workload against running the work locally. Most of that low-level decision making is delegated to Dask’s internals. You should still monitor the runtime characteristics and modify the code and configuration if needed.
Dask automatically tracks relevant compute and runtime metrics. You can use this to help you decide how to store your data, as well as inform where to focus your time on optimizing your code.
Of course, the cost of computation is more than just the compute time. Users should also consider the time spent transferring data over the network, the memory footprint within the workers, the GPU/CPU utilization rate, and disk I/O costs. These in turn give you higher-level insights into data movement and computation flow, such as how much of a worker’s memory is taken up by previous results that haven’t yet been passed on to the next computation, or which function is taking the most time. Monitoring these can help tune your cluster and code, and it can also help identify emergent computation patterns or logical bottlenecks that you can change.
Dask’s dashboard provides a lot of statistics and graphs to answer these questions. The dashboard is a web page tied to your Dask cluster at runtime. You can access it on your local machine, or on the remote machine it is running on, through the methods we discussed earlier in this chapter. Here, we will cover a few of the ways you can get insights from the performance metrics and tune Dask accordingly to achieve better results.
Dask’s dashboard contains many different pages, each of which can help with understanding different parts of your program.
The Task Stream dashboard gives a high-level view of each worker and its behavior. Exact methods invoked are color-coded, and you can inspect them by zooming in and out. Each row represents one worker. The custom-colored bars are user-induced tasks, and there are four preset colors to indicate common worker tasks: data transfer between workers, disk read and writes, serialization and deserialization times, and failed tasks. A task stream with well-balanced workers (digital, color version) shows a compute workload that is distributed over 10 workers and is well balanced, with no one worker finishing late, evenly distributed compute time, and minimal network IO overhead.
On the other hand, A task stream with too many small data chunks (digital, color version) shows a situation in which compute is uneven. You can see that there is a lot of whitespace between computation, meaning the workers are blocked and are not actually computing during that time. Additionally, you see some workers start earlier and others finish later, hinting that there are some issues with distributing the job. This could be due to the inherent dependency of your code or suboptimal tuning. Changing the DataFrame or array chunk sizes might make these less fragmented. You do see that when the job starts on each worker, they take roughly the same amount of work, meaning the work itself is still balanced fairly well and distributing the workload is giving you good returns. This is a fairly contrived illustrative example, so this task only took a few seconds, but the same idea applies to longer and bulkier workloads as well.
You can monitor the memory usage, sometimes referred to as memory pressure,[10] of each worker on the Bytes Stored portion (see Memory usage by worker in the monitoring UI (digital, color version)). These are by default color-coded to signify memory pressure within limits, approaching limit, and spilled to disk. Even if memory usage is within the limits, as it increases beyond 60% to 70%, you are likely to encounter performance slowdowns. Since memory usage is rising, internals of Python and Dask are going to run costlier garbage collection and memory optimization tasks in the background to keep it from rising.
You can see the aggregated view of task completion in the progress bar in Progress monitoring by task, summed over all workers (digital, color version). The order of execution is from top to bottom, although that does not always mean it’s completely sequential. The colors of the bars are particularly information-rich for tuning. The solid gray on the far right for sum() and random_sample() in Progress monitoring by task, summed over all workers (digital, color version) means tasks that are ready to run, with dependent data ready but not yet assigned to a worker. The bold non-gray colors mean tasks are finished, with result data that is waiting for the next sequence of tasks to take it. The fainter non-gray color blocks signify tasks that are done, with result data handed off and purged from memory. Your goal is to keep the solid-color blocks within a manageable size, to make sure you utilize most of your allocated memory.
Similar information is available on the Task Graph page (see A task graph showing the color-coded status of each task and its preceding and succeeding tasks (digital, color version)), from the view of individual tasks. You may be familiar with these types of MapReduce-like directed acyclic graphs. Order of computation is shown from left to right, with your tasks originating from many parallel workloads, distributed among your workers, and ending up with an outcome that is distributed among 10 workers, in this case. This graph is also an accurate low-level depiction of task dependencies. The color coding also highlights where each task and its data currently sit in the computational life cycle. By looking at this, you can get a sense of which tasks are bottlenecks and thus are potentially good places to start optimizing your code.
The Workers tab allows you to see real-time CPU, memory, and disk IO, among other things (see Worker monitoring for a Dask cluster with 10 workers (digital, color version)). Monitoring this tab can be useful if you suspect that your worker is running out of memory or disk space. Some of the remedies for that can include allocating more memory to the workers or choosing a different chunking size or method for the data.
Worker event monitoring for a Dask cluster (digital, color version) shows worker event monitoring. Dask’s distributed scheduler runs on an event loop, which manages all of the tasks to be scheduled and the workers, handling the execution, communication, and status of the computation. The event_loop_interval metric measures the average time between iterations of this loop for each worker. A shorter time means the scheduler spent less time on its management tasks for this worker. If this value goes up, it could mean a number of things, including suboptimal network configuration, resource contention, or high communication overhead. If it remains high, you might want to look into whether you have enough resources for the compute, and either allocate larger resources per worker or rechunk the data into smaller portions.
The System tab allows you to track the CPU, memory, network bandwidth, and file descriptors. CPU and memory are easy to understand. An HPC user would be keen to track network bandwidth if their job requires heavy amounts of data to be moved around. File descriptors here track the number of input and output resources the system has open at the same time. This includes actual files open for read/write, but also network sockets that communicate between machines. There is a limit to how many of these descriptors a system can have open at the same time, so a very complicated job or a leaky workload that opens a lot of connections, gets stuck, and does not close can create trouble. Similar to leaky memory, this can lead to performance issues as time goes on.
The Profile tab allows you to see the amount of time spent executing code, down to the exact function call, on an aggregate level. This can be helpful in identifying tasks that create a bottleneck. Task duration histogram for a Dask job (digital, color version) shows a task duration histogram, which gives a fine-grained view of each task, all of the subroutines it calls, and their runtimes. This can help quickly identify a task that consistently takes longer than others.
Tip
You can change the logging intervals with the
You can monitor Dask in real time with the dashboard, but the dashboard will disappear once you close out your cluster. You can save the HTML page, as well as export the metrics as a DataFrame, and write out custom code for metrics (see Generating and saving the Dask dashboard to file).
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
You can generate a performance report manually for any block of computation, without having to save the entire runtime report, with the performance_report context manager from dask.distributed. Any computation that you run within performance_report(filename="dask-report.html") will be saved to that file. Note that under the hood, this requires Bokeh to be installed.
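A minimal sketch of performance_report around a single computation; the LocalCluster, array sizes, and output filename are illustrative, and Bokeh must be installed.

```python
import os

import dask.array as da
from dask.distributed import Client, LocalCluster, performance_report

with LocalCluster(n_workers=2, processes=False) as cluster, Client(cluster) as client:
    x = da.random.random((2000, 2000), chunks=(500, 500))
    # Only the work inside this block is captured in the HTML report.
    with performance_report(filename="dask-report.html"):
        result = (x + x.T).mean().compute()

print(result)
print(os.path.abspath("dask-report.html"))  # open this file in a browser
```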
For much more heavy-duty usage, you can use Dask with Prometheus, the popular open source metrics and alerting tool. This requires you to have Prometheus deployed; Dask exposes its metrics on the /metrics endpoint of its dashboard when the prometheus_client Python package is installed. Then through Prometheus, you can hook up other tools, like Grafana for visualization or PagerDuty for alerts.
Dask’s distributed scheduler provides the metrics info as a task stream object without using the UI itself. You can access the information in the Task Stream UI tab from Dask directly, restricted to just the lines of code you want to profile. Generating and computing Dask’s runtime statistics with task stream demonstrates how to use a task stream and then extract some statistics out of it into a small pandas DataFrame for further analysis and sharing.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
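Since the example file is not reproduced here, the following is a hedged sketch of the same idea. It assumes a recent distributed release in which each record in ts.data carries a "startstops" list of dicts with "action", "start", and "stop" keys (older releases used tuples). The prefix/action grouping is our own choice of summary.

```python
import pandas as pd
import dask.array as da
from dask.distributed import Client, LocalCluster, get_task_stream
from dask.utils import key_split

cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

x = da.random.random((1_000, 1_000), chunks=(250, 250))

# Record every task executed inside this block; plot=False means no Bokeh needed.
with get_task_stream() as ts:
    (x @ x.T).sum().compute()

# Flatten each task's start/stop intervals into one row per action.
rows = [
    {
        "prefix": key_split(task["key"]),  # e.g. "sum-aggregate"
        "worker": task.get("worker"),
        "action": span["action"],          # "compute", "transfer", ...
        "duration_s": span["stop"] - span["start"],
    }
    for task in ts.data
    for span in task["startstops"]
]
stats = pd.DataFrame(rows)

# Count, total, and mean time per task prefix and action.
summary = stats.groupby(["prefix", "action"])["duration_s"].agg(["count", "sum", "mean"])
print(summary)

client.close()
cluster.close()
```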
You can insert custom metrics using the dask.distributed.diagnostics module. One of the tools it provides is the MemorySampler context manager. When you run your Dask code within ms.sample() (where ms is a MemorySampler instance), it records detailed memory usage across the cluster. Inserting a memory sampler for your code, while contrived, shows how you would run the same computation on two different cluster configurations and then plot the results to compare the two environments.
link:./examples/dask/Dask-Ch14-Tuning.py[role=include]
Here, we discuss some of the commonly identified issues and overlooked considerations when running your code in distributed cluster settings.
If your cluster manager supports it, you can scale the number of workers up or down by calling scale on the cluster object with the desired number of workers. You can also tell the Dask scheduler to wait until the requested number of workers has been allocated before proceeding with computation by using the client.wait_for_workers(n_workers) command. This can be useful for training certain ML models.
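A short sketch of manual scaling followed by a blocking wait, using an in-process LocalCluster as a stand-in for a real cluster manager such as KubeCluster. The worker count of 3 and the 60-second timeout are illustrative.

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)

# Ask the cluster manager (here, LocalCluster) for three workers in total.
cluster.scale(3)

# Block until the scheduler actually sees all three workers, e.g. before
# starting a training job that expects a fixed set of workers.
client.wait_for_workers(n_workers=3, timeout=60)
print(len(client.nthreads()), "workers connected")
```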
We briefly touched on adaptive scaling in previous chapters. You can enable auto/adaptive scaling by calling adapt() on your Dask cluster object (not on the client). The scheduler analyzes the computation and invokes the scale command to add or remove workers. The Dask cluster types (KubeCluster, PBSCluster, LocalCluster, etc.) are the cluster classes that handle the actual requests and the scaling up and down of workers. If you see issues with adaptive scaling, ensure that Dask is correctly asking for resources from the cluster manager. Of course, for auto-scaling in Dask to work, you have to be able to scale your own resource allocations within the cluster that you are running your job on, be it HPC, managed cloud, etc. We already introduced adaptive scaling in Inserting a memory sampler for your code; refer to that example for code snippets.
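For a self-contained reference point, here is a minimal sketch of enabling adaptive scaling on a LocalCluster. The bounds of 1 and 4 workers are illustrative; with KubeCluster or PBSCluster the call looks the same but the cluster manager must actually grant the resources.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)

# adapt() lives on the cluster object. The returned Adaptive object periodically
# asks the scheduler how many workers it wants and scales within these bounds.
adaptive = cluster.adapt(minimum=1, maximum=4)

x = da.ones((4_000, 4_000), chunks=(500, 500))
total = x.sum().compute()
print(total)

client.close()
cluster.close()
```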
Some intermediate results are needed further down in the code execution, but not immediately after they are computed. In these cases, Dask might delete the data, not realizing it will be needed later, and you will end up paying for another round of costly computation. If you identify this pattern, you can use the .persist() method to keep the result in cluster memory. When you do, you should also use Python’s built-in del statement to ensure that the data is released when it’s no longer needed.
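A minimal sketch of the persist-then-del pattern. The matrix product stands in for any costly intermediate that two later, separate computations reuse; the shapes and chunk sizes are arbitrary.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster, wait

cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

x = da.random.random((4_000, 4_000), chunks=(1_000, 1_000))

# Costly intermediate, kept in cluster memory instead of being recomputed.
intermediate = (x @ x.T).persist()
wait(intermediate)  # optional: block until it is fully materialized

# Two separate downstream computations both reuse the persisted chunks.
trace = da.diag(intermediate).sum().compute()
col_means = intermediate.mean(axis=0).compute()

# Drop the last reference so the workers can release the persisted data.
del intermediate

client.close()
cluster.close()
```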
Dask Nanny is a process that manages workers. Its job is to prevent workers from exceeding their resource limits and ending up in an unrecoverable machine state. It constantly monitors the worker’s CPU and memory usage and triggers memory clean-up and compaction. If the worker reaches a bad state, the nanny automatically restarts it and tries to recover the previous state.
Complications may arise if a worker holding a large chunk of data that was computationally expensive to produce is lost for some reason. The nanny will restart the worker and try to redo the work that led to that point. During that time, other workers will also hold on to the data they were working on, leading to a spike in memory usage. Strategies to remedy this vary, from disabling the nanny to modifying chunk sizes, worker size, and so on. If this happens often, you should consider persisting that data or writing it to disk.[11]
If you see error messages such as “Worker exceeded 95% memory budget. Restarting,” the nanny is likely where they came from. It’s the class responsible for starting, monitoring, terminating, and restarting workers. This memory fraction, as well as the spillover location, can be set in the distributed.yaml configuration file. HPC users can turn off the nanny’s memory monitoring if the system has its own memory management strategies. If the system also restarts killed jobs, you could turn off the nanny entirely with the --no-nanny option.
By default, when a worker’s memory is around 60% full, it starts spilling some data to disk. At 80%, it pauses and stops running new tasks. At 95%, the worker is terminated preemptively to avoid running out of memory. This means that once your worker’s memory is more than 60% full, you can expect performance degradation, so it’s usually best practice to keep memory pressure lower.
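Besides editing distributed.yaml, the same thresholds can be set programmatically before the workers start. This is a sketch; the lower fractions below are illustrative, not recommendations. The defaults are 0.60 (target), 0.70 (spill), 0.80 (pause), and 0.95 (terminate), and the terminate threshold is only enforced when workers run under a nanny, which the in-process cluster used here does not have.

```python
import dask
from dask.distributed import Client, LocalCluster

# Fractions of each worker's memory_limit (illustrative values).
dask.config.set({
    "distributed.worker.memory.target": 0.50,     # start spilling managed data to disk
    "distributed.worker.memory.spill": 0.60,      # spill based on process memory
    "distributed.worker.memory.pause": 0.75,      # pause: stop running new tasks
    "distributed.worker.memory.terminate": 0.90,  # nanny restarts the worker
})

# Workers read these settings when they start, so set them before creating the cluster.
cluster = LocalCluster(n_workers=2, threads_per_worker=1,
                       memory_limit="2GiB", processes=False)
client = Client(cluster)
print(dask.config.get("distributed.worker.memory.pause"))
```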
Advanced users can use the Active Memory Manager, a daemon that optimizes workers’ memory usage based on a holistic view of the cluster. You give this manager a particular goal to optimize for, such as reducing replication of the same data within the cluster; retire_workers, a special advanced use case in which data is transferred from a worker to other workers while it is being retired; or other custom policies. In some cases, the Active Memory Manager has been shown to decrease memory usage by up to 20% for the same task.[12]
Auto/adaptive scaling takes care of the question of "how many" workers but not of "how big" each worker should be. That said, here are some general rules of thumb:
- Use smaller workers when debugging, unless you suspect that the bug is caused by the large number of workers.
- Size worker memory allocation according to the input data size and the number of workers you are using.
- The number of chunks in the data should roughly match the number of workers. Fewer workers than chunks means some chunks won’t be worked on until the first round of computation is over, leading to a larger memory footprint for intermediate data. Conversely, having more workers than chunks will result in idle workers.
- If you have the option of a higher worker count with less memory per worker (versus fewer workers with more memory each), analyze your data’s chunk sizes. Each chunk must fit in one worker with some room left for computation, which sets the minimum memory needed per worker.
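The last two rules of thumb can be turned into a quick back-of-the-envelope check. This is a sketch: min_worker_memory_bytes and its headroom factor of 3 (room for intermediates produced while computing on a chunk) are hypothetical, so tune the factor to your own workload.

```python
import math

import dask.array as da


def min_worker_memory_bytes(arr, headroom=3.0):
    """Hypothetical rule of thumb: largest chunk size times a headroom factor."""
    largest_chunk_elems = math.prod(max(dim_chunks) for dim_chunks in arr.chunks)
    return largest_chunk_elems * arr.dtype.itemsize * headroom


x = da.zeros((20_000, 20_000), chunks=(5_000, 5_000))  # float64
n_chunks = math.prod(x.numblocks)

print(f"{n_chunks} chunks: roughly {n_chunks} workers keep every chunk busy")
print(f"minimum memory per worker: about {min_worker_memory_bytes(x) / 2**30:.2f} GiB")
```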
Fine-tuning your machine sizes can become a never-ending exercise, so it’s important to know what "good enough" is for your purposes.
We have briefly covered chunks and chunk sizes in earlier chapters. Now we expand on this to cluster scale. Chunk size and worker sizes are integral to how Dask functions, as it uses a block-level view of computation and data within its task graph–based execution model. It’s the essential parameter that determines how the distributed computation will work. While using Dask and other distributed systems, we find this is one of the essential ideas to keep in mind as we turn the knobs and dials of such large machines.
For any given worker hardware configuration and computation you are doing, there will be a sweet spot for the chunk sizes, and the user’s job is to set this size. Finding the exact number might not be useful, but finding roughly what type of configuration is likely to give you the best result can make a huge difference.
The key idea of chunking is to load-balance computation and storage, at the cost of communication overhead. At one extreme, you have single-machine data engineering, with your data in one pandas DataFrame or a single Dask DataFrame with no partitioning. There is little communication cost, as all communication happens between your RAM and your GPU or CPU, with data moving through a single computer’s motherboard. As your data grows, this monolithic block stops working: you run into out-of-memory errors and lose all your previous in-memory computation. Hence, you would use a distributed system like Dask.
At the other extreme, a very fragmented dataset with small chunks spread over multiple machines connected by Ethernet cables will be slower to work with as the communication overhead grows, and may even overrun the scheduler’s capacity to handle communication, gathering, and coordination. Maintaining a happy balance between the two extremes, and knowing which problem requires which tools, is an essential job of modern distributed data engineering.
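One cheap way to see the fragmented extreme coming is to count the tasks in the graph before computing anything. This sketch builds the same computation with three chunk sizes (chosen arbitrarily) and records the graph size. Every task is something the scheduler must track and coordinate, so the count grows roughly with the number of chunks.

```python
import dask.array as da

task_counts = {}
for chunk in (10_000, 1_000, 100):
    x = da.ones((10_000, 10_000), chunks=chunk)
    y = (x + 1).sum()
    # Each chunk contributes at least one task per operation.
    task_counts[chunk] = len(y.__dask_graph__())
    chunk_mib = chunk * chunk * x.dtype.itemsize / 2**20
    print(f"chunk {chunk:>6}: {x.numblocks} blocks, "
          f"{chunk_mib:8.2f} MiB each, {task_counts[chunk]} tasks")
```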
When pipelining multiple data streams into a job, you might have two datasets with different chunk sizes, even if their data dimensions match. At runtime, Dask will have to rechunk one of the datasets to match the chunk sizes of the other. Doing so at runtime can be costly and memory inefficient. If you spot this, consider having a separate job that does the rechunking before the data is ingested into your job.
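A small sketch of the mismatch and an explicit fix. The two arrays have the same shape but different chunking; rechunking one to match the other makes the cost visible instead of leaving Dask to unify the chunks inside the expression. In a real pipeline, the separate job could write the rechunked array to storage (for example with Array.to_zarr, which needs the zarr package) for later jobs to read.

```python
import dask.array as da

a = da.ones((4_000, 4_000), chunks=(1_000, 1_000))  # square chunks
b = da.ones((4_000, 4_000), chunks=(4_000, 250))    # same shape, tall thin chunks

# Align explicitly; this is the step worth moving into a separate upstream job.
b_aligned = b.rechunk(a.chunks)

total = (a + b_aligned).sum().compute()
print(b_aligned.chunks == a.chunks, total)
```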
There are many different systems to make your jobs run on a schedule. This schedule can range from being periodic and time-based to being triggered by upstream events (like data becoming available). Popular tools to schedule jobs include Apache Airflow, Flyte, Argo, GitHub Actions, and Kubeflow.[13] Airflow and Flyte have built-in support for Dask, which can simplify running your scheduled task, so we think that they are both excellent options for scheduled Dask jobs. The built-in operators make it easier to track failure, which is important, as taking actions on stale data can be as bad as taking actions on wrong data.
We also often see people use Unix crontabs and schtasks, but we advise against that, as they run on a single machine and require substantial additional work.
Tip
|
For scheduled jobs on Kubernetes, you can also have your scheduler create a DaskJob resource, which will run your Dask program inside the cluster. |
In Appendix A, you will learn details about testing and validation, which are especially important for scheduled and automated jobs, where there is no time for manual checking.
Like many other distributed libraries, Dask provides logs, and you can configure Dask logs to be sent to a storage system. The method will vary depending on the deployment environment and whether you are using Jupyter.
One generic way to get the worker and scheduler logs is through the get_worker_logs() and get_scheduler_logs() methods on the Dask client. You can also log to, and read from, specific topics so that you only deal with the relevant ones. Refer to [ex_basic_logging_ch09_1685536244456] for more information.
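A minimal sketch of both approaches on an in-process cluster. The topic name "etl-progress" and the message contents are hypothetical; log_event and get_events are the client-side calls for writing to and reading from a topic.

```python
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Recent log lines as (level, message) pairs; worker logs are keyed by worker address.
scheduler_logs = client.get_scheduler_logs(n=20)
worker_logs = client.get_worker_logs(n=20)

# Topic-based events: write to a named topic, then read back only that topic.
client.log_event("etl-progress", {"stage": "ingest", "rows": 1_000})
events = client.get_events("etl-progress")  # sequence of (timestamp, message)
for timestamp, message in events:
    print(timestamp, message)

client.close()
cluster.close()
```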
You are not limited to logging strings; you can instead log structured events. This can be especially useful for performance analysis, or for anything where the log messages might be visualized rather than read individually by a human. In Structured logging on workers, we do this with a distributed softmax function, logging the events and retrieving them on the client.
link:./examples/dask/Dask-Ch10_porting.py[role=include]
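As the example file is not reproduced here, this is our own sketch of the same pattern, not the chapter's code. A row-wise softmax runs on each block via map_blocks, and each block logs a dict to the hypothetical "softmax" topic through get_worker().log_event. Worker events reach the scheduler asynchronously, so the client polls briefly before reading them.

```python
import time

import numpy as np
import dask.array as da
from dask.distributed import Client, LocalCluster, get_worker


def softmax_block(block):
    """Row-wise softmax on one block; logs a structured event from the worker."""
    start = time.perf_counter()
    shifted = block - block.max(axis=1, keepdims=True)  # numerical stability
    exp = np.exp(shifted)
    out = exp / exp.sum(axis=1, keepdims=True)
    get_worker().log_event(
        "softmax", {"rows": int(block.shape[0]), "seconds": time.perf_counter() - start}
    )
    return out


cluster = LocalCluster(n_workers=2, threads_per_worker=1, processes=False)
client = Client(cluster)

# Chunk only along rows so every block holds complete rows.
x = da.random.random((1_000, 10), chunks=(250, 10))
probs = x.map_blocks(softmax_block, meta=np.empty((0, 0), dtype=x.dtype)).compute()

# Poll for up to 5 seconds until one event per block has arrived.
expected = x.numblocks[0]
deadline = time.monotonic() + 5
events = client.get_events("softmax")
while len(events) < expected and time.monotonic() < deadline:
    time.sleep(0.1)
    events = client.get_events("softmax")

for timestamp, event in events:
    print(event["rows"], f"{event['seconds'] * 1e3:.2f} ms")

client.close()
cluster.close()
```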
In this chapter, you’ve learned about the various deployment options for Dask distributed, from commodity cloud to HPC infrastructures. You’ve also learned Jupyter magics that simplify access to information from remote deployments. In our experience, Dask on Kubernetes and Dask on Ray on Kubernetes offer the flexibility we need. Your own decision about how to deploy Dask may be different, especially if you are working in a larger institution with existing cluster deployments. Most of the deployment options are covered in detail in the "Deploy Dask Clusters" guide, with the notable exception of Dask on Ray, which is covered in the Ray documentation.
You’ve also learned about runtime considerations and metrics to track when running distributed work, and the various tools in Dask’s dashboard for doing so, extended with more advanced, user-defined metrics. Using these metrics, you’ve learned the conceptual basis for tuning and troubleshooting your Dask distributed clusters, and how this relates to the fundamental design principles of Dask and distributed computing.
distributed.scheduler.worker-saturation
for more information.