- What abstractions does Dask offer?
- How can I parallelize existing Python code?
- Understand the abstraction of delayed evaluation.
- Use the `visualize` method to create dependency graphs.
Dask is one of many convenient tools available for parallelizing Python code. We have seen a basic example of `dask.array` in a previous episode. Now, we will focus on the `delayed` and `bag` sub-modules. Dask has other useful components that we do not cover in this lesson, such as `dataframe` and `futures`. See an overview below:
| Dask module | Abstraction | Keywords | Covered |
|---|---|---|---|
| `dask.array` | `numpy` | Numerical analysis | ✔️ |
| `dask.bag` | `itertools` | Map-reduce, workflows | ✔️ |
| `dask.delayed` | functions | Anything that doesn't fit the above | ✔️ |
| `dask.dataframe` | `pandas` | Generic data analysis | ❌ |
| `dask.futures` | `concurrent.futures` | Control execution, low-level | ❌ |
Dask Delayed
A lot of the functionality in Dask is based on an important concept known as delayed evaluation. Hence, we go a bit deeper into `dask.delayed`.
`dask.delayed` changes the strategy by which our computation is evaluated. Normally, you expect a computer to run commands when you ask for them, and to accept the next command once the current job is complete. With delayed evaluation we do not wait before formulating the next command. Instead, we create the dependency graph of our complete computation without actually doing any work. Once we have built the full dependency graph, we can see which jobs can be done in parallel and have those scheduled to different workers.
To express a computation in this world, we need to handle future objects as if they are already there. These objects may be referred to as either futures or promises.
Several Python libraries provide slightly different support for working with futures. The main difference between Python futures and Dask delayed objects is that futures are added to an execution queue at the point of definition, while delayed objects stay silent until you ask to compute them. We will refer to such 'live' futures as futures proper, and to 'dead' futures (including Dask's delayed objects) as promises.
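To make the distinction concrete, here is a minimal sketch contrasting an eager future from the standard-library `concurrent.futures` module with a lazy Dask delayed object (the helper `slow_square` is made up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor
from dask import delayed

def slow_square(x):
    return x * x

with ThreadPoolExecutor() as pool:
    future = pool.submit(slow_square, 3)  # work is queued immediately
    print(future.result())                # 9

lazy_p = delayed(slow_square)(3)          # nothing runs yet
print(lazy_p.compute())                   # 9, computed only on request
```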
from dask import delayed
The `delayed` decorator builds a dependency graph from function calls:
@delayed
def add(a, b):
    result = a + b
    print(f"{a} + {b} = {result}")
    return result
A `delayed` function stores the requested function call inside a promise. The function is not actually executed yet; instead, we are promised a value that can be computed later.
x_p = add(1, 2)
We can check that `x_p` is now a `Delayed` value:
type(x_p)
[out]: dask.delayed.Delayed
Note on notation
It is good practice to suffix variables that hold promises with `_p`. That way you keep track of promises versus immediate values.
Only when we ask to evaluate the computation do we get an output:
x_p.compute()
1 + 2 = 3
[out]: 3
From `Delayed` values we can create larger workflows and visualize them (the `visualize` method renders the task graph and requires the `graphviz` package):
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")
Challenge: run the workflow
Given this workflow:
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, -3)
Visualize and compute `y_p` and `z_p` separately. How many times is `x_p` evaluated?
Now change the workflow:
x_p = add(1, 2)
y_p = add(x_p, 3)
z_p = add(x_p, y_p)
z_p.visualize(rankdir="LR")
We pass the not-yet-computed promise `x_p` to both `y_p` and `z_p`. If you only compute `z_p`, how many times do you expect `x_p` to be evaluated? Run the workflow to check your answer.
Solution
z_p.compute()
1 + 2 = 3
3 + 3 = 6
3 + 6 = 9
[out]: 9
The computation of `x_p` (1 + 2) appears only once. This should convince you to postpone calling `compute` for as long as you can.
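If you do need several results, Dask can also evaluate multiple promises in a single pass with the `dask.compute` function, sharing any common intermediates. A minimal sketch using the promises defined above:

```python
import dask

# Evaluates the shared x_p only once while computing both results.
y, z = dask.compute(y_p, z_p)
```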
We can also make a promise by directly calling `delayed`:
N = 10**7
x_p = delayed(calc_pi)(N)
It is now possible to call the `visualize` or `compute` methods on `x_p`.
Decorators
In Python the decorator syntax is equivalent to passing a function through a function adapter, also known as a higher-order function or a functional. This adapter can change the behaviour of the function in many ways. The statement
@delayed
def sqr(x):
    return x*x
is functionally equivalent to:
def sqr(x):
    return x*x

sqr = delayed(sqr)
Variadic arguments
In Python you can define functions that take an arbitrary number of arguments:
def add(*args):
    return sum(args)

add(1, 2, 3, 4)  # => 10
You can then use tuple-unpacking to pass a sequence of arguments:
numbers = [1, 2, 3, 4]
add(*numbers)  # => 10
We can build new primitives from the ground up. An important function that comes up frequently wherever non-standard evaluation strategies are involved is `gather`. We can implement `gather` as follows:
@delayed
def gather(*args):
    return list(args)
Challenge: understand `gather`
Can you describe what the `gather` function does in terms of lists and promises? Hint: suppose I have a list of promises, what does `gather` enable me to do?
Solution
It turns a list of promises into a promise of a list.
This small example shows what `gather` does:
x_p = gather(*(delayed(add)(n, n) for n in range(10)))  # shorthand for gather(add(0, 0), add(1, 1), ...)
x_p.visualize()
Computing the result with
x_p.compute()
gives
[out]: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Challenge: design a `mean` function and calculate \(\pi\)
Write a delayed function that computes the mean of its arguments. Use it to estimate \(\pi\) several times and have it return the mean of the intermediate results.
>>> mean(1, 2, 3, 4).compute()
2.5
Make sure that the entire computation is contained in a single promise.
Solution
from dask import delayed
import random

@delayed
def mean(*args):
    return sum(args) / len(args)

def calc_pi(N):
    """Computes the value of pi using N random samples."""
    M = 0
    for i in range(N):
        # take a sample
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x*x + y*y < 1.:
            M += 1
    return 4 * M / N

N = 10**6
pi_p = mean(*(delayed(calc_pi)(N) for i in range(10)))
pi_p.compute()
You may not see a significant speed-up. This is because `dask.delayed` uses threads by default, and our native Python implementation of `calc_pi` does not circumvent the GIL. You should see a more significant speed-up with the Numba version of `calc_pi`, for example.
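For reference, here is a sketch of what such a Numba version could look like (this assumes Numba is installed; `nogil=True` lets the compiled function release the GIL so threads can actually run in parallel):

```python
import random
import numba

@numba.njit(nogil=True)
def calc_pi_numba(N):
    """Compiled estimate of pi using N random samples; releases the GIL."""
    M = 0
    for i in range(N):
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)
        if x * x + y * y < 1.0:
            M += 1
    return 4 * M / N
```

You can then drop it into the same workflow: `pi_p = mean(*(delayed(calc_pi_numba)(N) for i in range(10)))`.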
In practice, you may not need to use `@delayed` functions frequently, but they do offer ultimate flexibility. You can build complex computational workflows in this manner, sometimes replacing shell scripting, Makefiles, and suchlike; see the sketch below.
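As an illustration of such a workflow, here is a small sketch in which every stage function is a hypothetical placeholder:

```python
from dask import delayed

@delayed
def fetch(source):
    # placeholder: stands in for reading a file or downloading data
    return f"data from {source}"

@delayed
def process(data):
    # placeholder: stands in for some real transformation
    return data.upper()

@delayed
def summarize(*results):
    # gathers all processed results into a single promise of a list
    return list(results)

report_p = summarize(*(process(fetch(s)) for s in ["a", "b", "c"]))
print(report_p.compute())
```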
- We can change the strategy by which a computation is evaluated.
- Nothing is computed until we run `compute()`.
- With delayed evaluation Dask knows which jobs can be run in parallel.
- Call `compute` only once at the end of your program to get the best results.