Data-parallel Julia

Concurrency patterns for controlled parallelisms

💡 Note
If you are new to parallel programming in Julia, have a look at other tutorials such as A quick introduction to data parallelism in Julia and Efficient and safe approaches to mutation in data parallelism. This tutorial is an introduction to controlling the scheduling details of parallel execution. However, whenever possible, the best approach is to avoid having to think about such low-level issues at all.

High-level data parallelism is the best starting point for writing parallel programs. However, it is sometimes necessary to control the parallelism in a program so that, for example, the usage of bounded resources such as memory can be managed. This is where dealing with concurrency becomes unavoidable. Although there are many concurrency primitives, Channel is the most versatile tool that Julia provides out-of-the-box. In this tutorial, we look at how to implement simple and useful patterns on top of Channel. Some of these patterns are known as task-parallel algorithmic skeletons (or parallel skeletons).
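As a quick refresher, here is a minimal sketch of the Channel API from Base that the rest of this tutorial builds on:

ch = Channel{Int}(2)    # a buffered channel holding up to two items
put!(ch, 1)             # producers `put!` values in...
put!(ch, 2)
@assert take!(ch) == 1  # ...and consumers `take!` them out in FIFO order
close(ch)               # closing ends iteration for any consumers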

  1. Worker pool
    1. Re-distribution hacks
  2. Task farm
  3. Pipeline
  4. Promise (request-response)
    1. Improving the API
    2. Wrapping single-thread API
  5. See also

Worker pool

Useful for: processing a stream of work items with a fixed number of worker tasks, each reusing a resource (e.g., a buffer or a file handle) allocated once per worker.

Pattern:

using Base.Threads: @spawn

function workerpool(work!, allocate, request; ntasks = Threads.nthreads())
    @sync for _ in 1:ntasks
        @spawn allocate() do resource
            for input in request
                work!(input, resource)
            end
        end
    end
end
                           x7 .----->  work!(x1, resource1)
                             /
request                     /   x9
[..., x12, x11, x10] ------+-------->  work!(x6, resource2)
                            \
                             \
                          x8  `----->  work!(x4, resource3)

Note that request must support concurrent iteration; i.e., it must be safe to invoke iterate on it from multiple tasks simultaneously. For example, a Channel can be used in this pattern. The allocate function passed as the second argument is used for allocating and releasing resources.

The following example computes mean(rand(UInt8, 2^15)) using /dev/urandom:

# Prepare inputs to the worker pool
results = Vector{Float64}(undef, 2^5)
works = Channel{typeof(Ref(results, 1))}(Inf)
for i in eachindex(results)
    put!(works, Ref(results, i))
end
close(works)

let buffer_length = 2^10

    # The `allocate(body)` function allocates the resource and passes it to `body`:
    function allocate(body)
        open("/dev/urandom") do file
            buffer = Vector{UInt8}(undef, buffer_length)
            body((file, buffer))
        end
    end

    # The first argument to `workerpool` is a function that takes a work and a
    # resource:
    workerpool(allocate, works) do ref, (file, buffer)
        read!(file, buffer)
        ref[] = sum(buffer; init = 0.0)
    end

    sum(results) / (length(results) * buffer_length)
end
126.79141235351562

Re-distribution hacks

As of version 1.6, Julia's parallel task runtime does not migrate tasks across worker threads once a task is started. Thus, depending on when the worker pool is constructed, the above code may not distribute the tasks across worker threads. If there is no need to allocate a resource for each worker, the best solution is to use Threads.foreach(_, ::Channel), which was added in Julia 1.6.
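For reference, here is a minimal sketch of the Threads.foreach alternative (the input items and the squaring work are illustrative placeholders):

items = Channel{Int}(Inf)
foreach(i -> put!(items, i), 1:100)
close(items)

results = zeros(Int, 100)
Threads.foreach(items) do i
    results[i] = i^2  # any thread-safe work; no per-worker resource needed
end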

If you need to allocate the resource, a simple workaround to this problem is to spawn a new task for each call to work!. This is the strategy used in Threads.foreach.

function workerpool_redist(work!, allocate, request; kwargs...)
    workerpool(allocate, request; kwargs...) do input, resource
        wait(@spawn work!(input, resource))
    end
end

Another (less recommended) approach is to let @threads distribute the tasks:

using Base.Threads: @threads

function workerpool_threads(work!, allocate, request; ntasks = Threads.nthreads())
    @threads for _ in 1:ntasks
        allocate() do resource
            # `wait` keeps `resource` alive until the sticky task finishes
            wait(@async for input in request
                work!(input, resource)
            end)
        end
    end
end

This approach is not recommended because (1) how @threads for schedules the tasks is an implementation detail and (2) the use of @async impedes migration of tasks across OS threads (which is not implemented as of Julia 1.6 but is likely to be implemented in future Julia versions).

Task farm

Useful for: applying a function to a stream of inputs while limiting the number of concurrent/parallel tasks.

Pattern:

ys = Channel() do ys
    @sync for _ in 1:ntasks
        @spawn for x in xs
            put!(ys, f(x))
        end
    end
end
                           x7 .------ f -------.  f(x4)
                             /                  \
                            /  x9         f(x6)  \
[..., x12, x11, x10] ------+--------- f ----------+----------> [f(x1), f(x5), f(x2), ...]
                            \                    /
                             \                  /
                          x8  `------ f -------`  f(x3)

This is an extension of the worker pool pattern. It is useful for limiting the number of concurrent/parallel tasks. However, as the diagram above indicates, it does not preserve the ordering of the input (hence the u for unordered in umap):

umap(f, xs; kwargs...) = umap(f, Any, xs; kwargs...)
function umap(f, TY::Type, xs::Channel; ntasks = Threads.nthreads(), buffersize = ntasks)
    return Channel{TY}(buffersize) do ys
        @sync for _ in 1:ntasks
            @spawn for x in xs
                put!(ys, f(x))
            end
        end
    end
end

Note that the input collection xs must support concurrent iteration. To support arbitrary input collections, we can add a fallback method that automatically wraps the input in a Channel:

function umap(f, TY::Type, xs; kwargs...)
    ch = Channel{eltype(xs)}() do ch
        for x in xs
            put!(ch, x)
        end
    end
    return umap(f, TY, ch; kwargs...)
end

This pattern is called the task farm algorithmic skeleton.

umap can be used like Iterators.map, although the ordering is not preserved:

function slow_square(x)
    sleep(rand(0.01:0.01:0.3))
    return x^2
end

collect(umap(slow_square, 1:10; ntasks = 5))
10-element Vector{Any}:
   1
   9
  16
  49
  25
   4
 100
  36
  64
  81

Pipeline

An interesting use-case is to call umap with ntasks = 1 but with a long chain of calls:

a = umap(f, xs; ntasks = 1)
b = umap(g, a; ntasks = 1)
c = umap(h, b; ntasks = 1)
step    items in a     items in b     items in c
------- -------------- -------------- ------------
  1     a1 = f(x1)
  2     a2 = f(x2);    b1 = g(a1)
  3     a3 = f(x3);    b2 = g(a2);    c1 = h(b1)
  4     a4 = f(x4);    b3 = g(a3);    c2 = h(b2)
  5     a5 = f(x5);    b4 = g(a4);    c3 = h(b3)
  6     a6 = f(x6);    b5 = g(a5);    c4 = h(b4)
  7     a7 = f(x7);    b6 = g(a6);    c5 = h(b5)
  8     a8 = f(x8);    b7 = g(a7);    c6 = h(b6)
  9                    b8 = g(a8);    c7 = h(b7)
 10                                   c8 = h(b8)

If ntasks = 1, the ordering of the input is preserved in the output. This can improve performance as long as buffersize > 0 and the input is long enough: the different stage functions (f, g, and h in the above example) can then be evaluated in parallel. This pattern is called the pipeline algorithmic skeleton.
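For example, using the umap defined above (the stage functions here are illustrative), a three-stage pipeline can be built as follows:

a = umap(x -> x + 1, 1:4; ntasks = 1)  # stage 1: f
b = umap(x -> 10x, a; ntasks = 1)      # stage 2: g
collect(umap(string, b; ntasks = 1))   # stage 3: h
# expected output (order preserved): Any["20", "30", "40", "50"]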

Promise (request-response)

Useful for: request-response style interactions, i.e., relating each input to its corresponding output.

The task farm pattern has an unfortunate restriction that it does not preserve the ordering of the input. Can we make it work when we need to relate each input to its output? One approach is to combine a promise (or future) with the worker pool pattern.

Similar to the worker pool pattern, we still use a channel as the request queue (request) from which the workers take their work. The key trick here is to send another channel (promise) over the request channel, together with the input describing the work. Once a worker finishes the computation, it "returns" the result by putting it into the promise channel.

function raw_service(f; ntasks = Threads.nthreads())
    request = Channel() do request
        @sync for _ in 1:ntasks
            @spawn for (x, promise) in request
                y = f(x)
                put!(promise, y)
            end
        end
    end
    return request
end

function call(request, x)
    promise = Channel(1)
    put!(request, (x, promise))
    return take!(promise)
end

adder = raw_service() do x
    return x + 1
end
try
    @assert call(adder, 0) == 1
    @assert call(adder, 1) == 2
finally
    close(adder)
end

Several variants of this pattern are possible. A particularly useful one is an asynchronous call API:

function async_call(request, x)
    promise = Channel(1)
    put!(request, (x, promise))
    return promise  # no take!
end

The caller can schedule the call and then retrieve the result (using take!) at different locations in the code. It is also useful for scheduling multiple concurrent (and potentially parallel) calls.
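For example, with the adder service from above, two calls can be scheduled before either result is retrieved:

adder = raw_service(x -> x + 1)
try
    p1 = async_call(adder, 1)  # schedule both calls first...
    p2 = async_call(adder, 2)
    @assert take!(p1) == 2     # ...then retrieve the results later
    @assert take!(p2) == 3
finally
    close(adder)
end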

Improving the API

The above API using raw_service and call has the problem that the user must close the channel adder to clean up the resources (tasks) deterministically and to handle errors reliably. It is better to wrap our API so that it can be used with the do-block pattern (cf. open(f, path)) instead of an explicit try-finally:

function provide(body, request)
    endpoint(x) = call(request, x)
    try
        body(endpoint)
    finally
        close(request)
    end
end

function define_service(f; kwargs...)
    open_service(body) = provide(body, raw_service(f; kwargs...))
    return open_service
end

We can use this API in two steps: (1) define the service and then (2) "open" it using another do block:

with_adder = define_service() do x
    return x + 1
end
with_adder() do add
    @assert add(0) == 1
    @assert add(1) == 2
end

Wrapping single-thread API

This pattern is useful also as an ad-hoc interface around an API that is not safe to call from arbitrary OS threads:

function define_single_thread_service(f)
    function open_service(body)
        request = Channel() do request
            for (x, promise) in request
                y = f(x)
                put!(promise, y)
            end
        end
        provide(body, request)
    end
    return open_service
end

with_adder = define_single_thread_service() do x
    return x + 1  # call a "thread-unsafe" API here
end
with_adder() do add
    @assert add(0) == 1
    @assert add(1) == 2
end

This function ensures (as of Julia 1.6) that f is called on the OS thread on which with_adder(body) is called, while add(x) can be called from any task.

Note that the call to the "thread-unsafe" function f becomes the bottleneck when using define_single_thread_service(f), because every caller has to wait for the preceding calls of f to finish. This may still be a reasonable approach when the execution time of f is much smaller than that of the parallelized portion of your program. If that is not the case, it may be better to switch to process-based parallelism (using Distributed.jl, Dagger.jl, etc.) instead of thread-based parallelism.
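For instance, here is a minimal sketch of the process-based approach using Distributed.jl (thread_unsafe_api is a hypothetical stand-in for the API in question):

using Distributed
addprocs(4)  # each worker process gets its own copy of the library state

@everywhere thread_unsafe_api(x) = x + 1  # hypothetical stand-in

results = pmap(thread_unsafe_api, 1:100)  # calls are distributed across worker processes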

See also
