Updated: Aug 6
In this blog post, we will walk through a case study of using Dask to speed up a particular computation, and of scaling that computation with the help of Coiled.
What is Dask?
Dask is an open-source parallel computing library written in Python. One of its most useful features is that you can use the same code for computations on one machine or on a cluster of distributed machines. Traditionally, making the transition from a single machine to running the same computation on a distributed computing cluster has been very difficult in the PyData ecosystem; Dask removes most of that friction.
The beauty of Dask is that it uses existing APIs from libraries like NumPy, pandas, and scikit-learn to create Dask equivalents of those libraries for scalable computing.
Reading a DataFrame in pandas
```python
import pandas as pd

df = pd.read_csv(filename)
df.head()
```
Reading a DataFrame in Dask
```python
import dask.dataframe as dd

df = dd.read_csv(filename)
df.head()
```
And the latter can scale from your computer to distributed machines on the cloud.
Coiled is a service to scale Dask on the cloud. It's a company started by Matthew Rocklin, who is also the main developer of Dask.
It gives you access to compute resources on the cloud with just a few lines of code. For example, launching a Coiled Dask cluster with n workers is as easy as:
```shell
# Install Coiled
pip install coiled
```
```python
# Launch a cluster with Coiled
import coiled

cluster = coiled.Cluster(
    n_workers=5,
    worker_cpu=4,
    worker_memory="16 GiB",
)

# Connect Dask to your cluster
from dask.distributed import Client

client = Client(cluster)
```
As shown above, you need the coiled PyPI package to interact with Coiled's API. You also need to sign up for a free account on Coiled to be able to spin up a Dask cluster. At the time of writing, Coiled's free account gives 1,000 CPU hours with 100 cores every month.
In large-scale genomics analysis, the samples are often very similar. Because of that, it is useful to reduce the samples into groups of individuals that share common traits. One of the most common techniques for doing this is calculating a distance metric between all individuals. This involves calculating the pairwise distance between each pair of individuals, which is a very time-expensive calculation. Consequently, it is important to utilize all the performant hardware we have at our disposal.
To simplify this a bit, consider a set of vectors in a 2D Matrix:
Our goal here is to calculate the pairwise distance between all vector pairs. The output matrix will look like the following:
The implementation had to satisfy the following requirements:

- Do the computation mentioned above correctly.
- Scale to large datasets, where the array doesn't fit in memory.
- Utilize GPUs, if available, to speed up the computation.
- Be generic enough to accommodate further implementations of various distance metrics, such as Euclidean, Minkowski, cityblock, etc.
- Be benchmarkable on large cloud clusters with access to CPUs and GPUs.
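To make the goal concrete, here is a minimal NumPy sketch of the pairwise Euclidean distance matrix for a small in-memory array. This is only an illustration of the desired output, not the sgkit implementation; the variable names are invented for this example:

```python
import numpy as np

# Small in-memory example: 4 vectors of length 5
rng = np.random.default_rng(0)
x = rng.random((4, 5))

# Naive O(n^2) pairwise Euclidean distance
n = x.shape[0]
out = np.zeros((n, n))
for i in range(n):
    for j in range(n):
        out[i, j] = np.sqrt(np.sum((x[i] - x[j]) ** 2))

# The result is a symmetric matrix with zeros on the diagonal
print(out)
```

The naive double loop makes the O(n²) cost of the problem obvious, and it is exactly this cost that motivates chunking the work across many machines.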
SciPy already provides this operation in the form of the pdist function (scipy.spatial.distance.pdist). It solves the problem for relatively small datasets and is fast enough, but it neither scales well nor utilizes GPUs. Therefore, there was a need for a new implementation.
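For reference, the SciPy baseline looks like this. Note that pdist returns a condensed vector of the upper-triangular distances, which squareform expands into the full symmetric matrix:

```python
import numpy as np
from scipy.spatial.distance import pdist, squareform

rng = np.random.default_rng(0)
x = rng.random((4, 5))

# pdist returns the condensed upper-triangular distances
condensed = pdist(x, metric="euclidean")

# squareform expands them into the full symmetric matrix
full = squareform(condensed)
```

This works well while x fits in memory, which is precisely the limitation the sgkit implementation set out to remove.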
The final implementation is available in the sgkit library and it is exposed as follows:
```python
import dask.array as da
from sgkit.distance.api import pairwise_distance

x = da.random.random((4, 5))
```
Run on CPU
```python
>>> pairwise_distance(x).compute()
array([[0.        , 0.58320543, 0.85297084, 0.67021758],
       [0.58320543, 0.        , 0.86805965, 0.61936337],
       [0.85297084, 0.86805965, 0.        , 0.78851248],
       [0.67021758, 0.61936337, 0.78851248, 0.        ]])
```
Run on GPU
```python
>>> pairwise_distance(x, device="gpu").compute()
array([[0.        , 0.58320543, 0.85297084, 0.67021758],
       [0.58320543, 0.        , 0.86805965, 0.61936337],
       [0.85297084, 0.86805965, 0.        , 0.78851248],
       [0.67021758, 0.61936337, 0.78851248, 0.        ]])
```
To tackle the problem of scaling the computation to large datasets, where the array would not fit in memory, we took a map-reduce approach over the chunks of the array, which can be described as follows:
Consider a small chunk of a large 2D array as follows:
We calculate the map step of the computation on each chunk. To illustrate this, consider Euclidean distance. The Euclidean distance for a pair of vectors V0 and V1 is defined as the square root of the sum of squared element-wise differences:

d(V0, V1) = sqrt(Σᵢ (V0ᵢ − V1ᵢ)²)
Map of Map-Reduce Algorithm
Now, to understand map-reduce on a chunk of a large matrix, let's work through a simple example with two vectors. In the map step of the map-reduce algorithm for Euclidean distance, we calculate, for each chunk, the partial sum of squared differences between a pair of vectors, which is demonstrated in the image below:

In the above image, you can see that the 2D array has three chunks, and we calculate the partial sum of squared differences for each chunk of the array. In practice, a chunk will have more than two vectors, but here we have taken only two vectors for simplicity.
Reduction step of Map-Reduce Algorithm
In the reduction step, we sum the per-chunk partial sums and take the square root of the total. It is demonstrated in the image below:
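The two steps above can be sketched in plain NumPy. This is a simplified illustration of the idea, not sgkit's actual Dask/Numba code: the vectors are split column-wise into chunks, each chunk contributes a partial sum of squared differences (map), and the partials are added and square-rooted at the end (reduce):

```python
import numpy as np

rng = np.random.default_rng(0)
x = rng.random((4, 6))

# Map: for each column-chunk, accumulate the partial sum of
# squared differences for every pair of vectors
chunks = np.split(x, 3, axis=1)  # three chunks of 2 columns each
n = x.shape[0]
partials = np.zeros((len(chunks), n, n))
for k, chunk in enumerate(chunks):
    diff = chunk[:, None, :] - chunk[None, :, :]
    partials[k] = np.sum(diff ** 2, axis=-1)

# Reduce: sum the per-chunk partials, then take the square root
dist = np.sqrt(partials.sum(axis=0))
```

Because the partial sums are additive across chunks, each chunk can be processed independently (and in parallel), which is what makes the approach work on arrays that never fit in memory at once.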
The implementation was done using Dask and Numba. Dask array's da.blockwise and da.reduction were used for