# Parallel processing tutorial

## Quick examples

### Sequential processing

using Transducers

xs = randn(10_000_000)
foldl(+, Map(sin), xs)
-2122.9415394848356

Just replace foldl with foldxt, to make use of multiple cores:

foldxt(+, Map(sin), xs)
-2122.9415394848356

(In my laptop (4 core machine) I start seeing some speedup around length(xs) ≥ 100_000 for this transducer and reducing function.)

### Process-based parallelism

using Distributed

foldxd(+, Map(sin), xs)
-2122.941539484855

(Note: there is likely no speedup for light-weight computation and large input data like this, when using foldxd.)

### Parallel processing with iterator comprehensions

You can also use parallel processing functions in Transducers.jl such as foldxt, foldxd, tcollect, dcollect, tcopy and dcopy with iterator comprehensions:

foldxt(+, (sin(x) for x in xs if abs(x) < 1); basesize = 500_000)
-2709.9215854671284
foldxt(+, (x * y for x in 1:3, y in 1:3))
36
tcollect(sin(x) for x in xs if abs(x) < 1)
6826957-element Vector{Float64}:
0.8083648068107231
0.8148131866918079
-0.06530310134381154
-0.4532030074249769
0.6593385615314898
-0.2423080638801703
-0.6906331868077892
0.5256500911943072
-0.10440552230971814
0.6863217796085403
⋮
-0.4784403274943886
-0.46761144097083196
0.566325738864871
-0.5925200365275034
0.5708141670411829
0.3413017229426536
0.39979088181796324
0.7737403935869056
0.7607067950159715
using StructArrays: StructVector
table = StructVector(a = [1, 2, 3], b = [5, 6, 7])

tcopy((A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a))
2-element StructArray(::Vector{Int64}, ::Vector{Int64}) with eltype NamedTuple{(:A, :B), Tuple{Int64, Int64}}:
(A = 2, B = 4)
(A = 4, B = 6)

## When can I use foldxt and foldxd?

### Requirement 1: Associative reducing step function

Parallel reductions such as foldxt and foldxd requires associative reducing step function. Recall that associativity means that the grouping of evaluations of binary operator does not matter:

op = +  # for example
a, b, c = 1, 2, 3  # for example

@assert op(op(a, b), c) == op(a, op(b, c))

Given this property, computations like a + b + c + d can be done with different "groupings":

  a + b + c + d

= ((a + b) + c) + d                +
/ \
+   d
/ \
+   c           foldl-like grouping
/ \
a   b

= (a + b) + (c + d)                +
/ \
/   \
/     \        reduce-like grouping
+       +
/ \     / \
a   b   c   d

Notice that, in the last grouping, computation of c + d does not have to wait for the result of a + b. This is why we need associativity for parallel execution.

Note

Do not confuse associativity with commutativity op(a, b) = op(b, a). For example, matrix multiplication *(::Matrix, ::Matrix) is associative but not commutative in general. However, since foldxt only requires associativity, it is valid to use foldxt(*, xf, matrices).

As reducing function + is associative, it can be used with foldxt (and foldxd):

foldxt(+, Map(identity), 1:10; init = 0, basesize = 1)
55

and the result is the same as the sequential version:

foldl(+, Map(identity), 1:10; init = 0)
55

Note: basesize is for forcing foldxt to avoid falling back to foldl for small length container such as 1:10.

On the other hand, binary function - is not associative. Thus, foldxt cannot be used instead of foldl (they produce different result):

foldxt(-, Map(identity), 1:10; init = 0, basesize = 1)
-5
foldl(+, Map(identity), 1:10; init = 0)
55

### Requirement 2: stateless transducers

Parallel reduction only work with stateless transducers Map, Filter, Cat, etc. and you will get an error when using stateful transducers such as Scan with foldxt or foldxd:

foldxt(+, Scan(+), 1:10; basesize = 1)
ERROR: Stateful transducer Scan(+) does not support combine

Stateful transducers cannot be used with foldxt because it is impossible to start processing input collection from the middle when the transducers need to know all previous elements (= stateful).

ScanEmit is a stateful transducer but it is assumed that it is used in a context that outputs can be treated as stateless (see: Splitting a string into words and counting them in parallel).

## Example: parallel collect

Note

This section explains the implementation ideas of parallel collect. Pre-defined functions such as tcopy and dcopy should cover many use-cases.

Suppose (pretend) there is a compute-heavy transducer:

xf_compute = opcompose(Filter(!ismissing), Map(x -> x^2))

Transducers.jl supports applying this to an input container and then collecting the results into another container. It can be done sequentially (collect, copy, etc.) and in parallel using threads (tcollect, tcopy) or using multiple processes (dcollect, dcopy). For example:

xs = [abs(x) > 1 ? missing : x for x in randn(10_000)]
y1 = collect(xf_compute, xs)

Doing this in parallel is as easy as using tcollect or dcollect. However, it is easy to do this manually, too:

using BangBang: append!!

singleton_vector(x) = [x]
y2 = foldxt(append!!, xs |> xf_compute |> Map(singleton_vector))
@assert y1 == y2

This code illustrates the common pattern in parallel processing:

1. Put a result from the transducer in a "singleton solution". Here, it is [x].

2. Then "merge" the (singleton) solution into the exsiting one. This is done by append!! in the above example.

To illustrate how foldxt(append!!, xs |> ... |> Map(singleton_vector)) works, let's create a reducing function that records the arguments and returned values of append!!:

chan = Channel(Inf)

function append_and_log!!(a, b)

As arguments and output may be mutated later, we use copy to record the snapshots of their values at this moment:

    a0 = copy(a)
b0 = copy(b)
c = append!!(a, b)
put!(chan, (a0, b0) => copy(c))
return c
end

This function can be used instead of append!!. Let's try simpler and shorter example. This is equivalent to collect(1:4):

foldxt(append_and_log!!, Map(singleton_vector), 1:4; basesize = 1, init = Union{}[])
4-element Vector{Int64}:
1
2
3
4

(See below for why we are using init = Union{}[] here.)

Here is the list of arguments and returned value of append!! in this reduction:

records = Pair[]
push!(records, take!(chan))
end
records
7-element Vector{Pair}:
(Union{}[], [1]) => [1]
(Union{}[], [3]) => [3]
(Union{}[], [2]) => [2]
(Union{}[], [4]) => [4]
([1], [2]) => [1, 2]
([3], [4]) => [3, 4]
([1, 2], [3, 4]) => [1, 2, 3, 4]

This recorded inputs and outputs of append!! show that its "call tree" is:

          [1,2,3,4] <------------- append!!([1,2], [3,4]) == [1,2,3,4]
/         \
[1,2]           [3,4] <------- append!!([3], [4]) == [3, 4]
/     \         /     \
[1]     [2]     [3]     [4] <---- append!!([], [4]) == [4]
/ \     / \     / \     / \
[] [1]  [] [2]  [] [3]  [] [4]

Compare this to the example a + b + c + d above.

### Optimization and generic container handling

Above usage of foldxt is not quite efficient as singleton_vector allocates small objects in the heap. Thus, it makes sense to use immutable objects for the singleton solutions so that Julia compiler can eliminate allocation of the intermediate singleton solutions. Here, this can be done by simply using SVectorinstead of singleton_vector:

using StaticArrays: SVector

foldxt(append!!, Map(SVector), 1:4)
4-element StaticArrays.SVector{4, Int64} with indices SOneTo(4):
1
2
3
4

However, notice that the return value is a static vector. This is not ideal when the input collection is large. The output collection type can be specified by init. We can simply use init = Union{}[] in this case:

foldxt(append!!, Map(SVector), 1:4; init = Union{}[])
4-element Vector{Int64}:
1
2
3
4

Note that passing Vector to init of foldxt is usually a wrong choice as it would mean that the same object is simultaneously mutated by different threads. However, since Vector{Union{}} cannot have any element (as there is no object of type Union{}), using Union{}[] for init is an exception and it is a good way to indicate that output vector should use the "smallest" eltype required. That is to say, append!! widens the vector "just enough" to fit the resulting elements.

For generic containers (e.g., various table types), use BangBang.Empty as the empty initial value. This is useful for creating a table object such as DataFrame as the result of parallel processing:

using BangBang: Empty
using DataFrames: DataFrame

foldxt(append!!, Map(x -> SVector((a = x,))), 1:4; init = Empty(DataFrame))
4×1 DataFrame
Row │ a
│ Int64
─────┼───────
1 │     1
2 │     2
3 │     3
4 │     4

It is slightly more tricky to make this approach work with other table types such as StructArrays and TypedTables. Use tcopy or dcopy to work with generic containers.

## Example: ad-hoc histogram

Following example counts number of occurrence of each leading digit in a distribution of random numbers. First, let's create "singleton solutions" using transducers:

xs = 1_000_000 * randn(10_000_000)
dicts1 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
y = digits(floor(Int, x))[end]
Dict(y => 1)
end

The singleton solutions can be merged using mergewith!(+, a, b). Conveniently, mergewith!(+) is the curried form (args...) -> mergewith!(+, args...):

using Compat: mergewith!  # not required in Julia >= 1.5
rf! = mergewith!(+)
rf!(Dict(:a => 1, :b => 2), Dict(:b => 3, :c => 4))
Dict{Symbol, Int64} with 3 entries:
:a => 1
:b => 5
:c => 4

This is the form of binary function appropriate for foldl and foldxt.

Note that it is OK to use in-place function mergewith! here because the dictionary passed as a is created by Dict(y => 1) and not shared by anyone. When there is no such guarantee, passing init = OnInit(Dict{Int,Int}) is a good option. Note that passing init = Dict{Int,Int}() to foldxt is not correct as multiple tasks would share and try to mutate the same dictionary this way.

Let's try this with parallel foldxt:

counts1 = foldxt(mergewith!(+), dicts1)

Compare the result with foldl:

counts2 = foldl(mergewith!(+), dicts1)
@assert counts1 == counts2

Hopefully the result is close to the Benford's law - Wikipedia:

let n = sum(values(counts1))
sort!(keys(counts1) .=> values(counts1) ./ n)
end
9-element Vector{Pair{Int64, Float64}}:
1 => 0.3594709594709595
2 => 0.12891542891542893
3 => 0.0865062865062865
4 => 0.08103318103318104
5 => 0.07736777736777736
6 => 0.07341727341727342
7 => 0.06906966906966908
8 => 0.06452776452776453
9 => 0.05969165969165969

Since we are counting only nine elements, it is actually better to use fixed-size container such as a tuple in this case:

dicts2 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
y = digits(floor(Int, x))[end]
ntuple(i -> i == y, 9)
end

counts3 = foldxt(dicts2; init=ntuple(_ -> 0, 9)) do a, b
map(+, a, b)
end
@assert Dict(zip(1:9, counts3)) == counts1

Note that, as tuples are immutable, it is valid to pass it as init of foldxt.

### MicroCollections.jl for efficient singleton solution

When the appropriate "bins" are not known, mergewith!(+)-based strategy is more appropriate. However, it is not ideal to allocate a small container like Dict(y => 1) in the heap for each iteration. MicroCollections.jl provides singleton (and empty) containers that are designed for this usecase. The SingletonDict is "upcast" to the mutable Dict in the first invocation when merged with BangBang.jl functions:

using BangBang: mergewith!!
using MicroCollections: SingletonDict

acc1 = mergewith!!(+, SingletonDict(:a => 1), SingletonDict(:b => 1))
Dict(:a => 1, :b => 1)

This dictionary is reused in the subsequent iterations:

acc2 = mergewith!!(+, acc1, SingletonDict(:b => 1))
Dict(:a => 1, :b => 2)
acc3 = mergewith!!(+, acc2, SingletonDict(:c => 1))
Dict(:a => 1, :b => 2, :c => 1)

The first result is reused across these iterations (within a single thread).

@assert acc1 === acc2 === acc3

Finally, Dicts from different threads are merged using the same function mergewith!!(+):

acc4 = Dict(:a => 5, :c => 3)  # from different thread
acc5 = mergewith!!(+, acc3, acc4)
Dict(:a => 6, :b => 2, :c => 4)

Thus, dicts1 can be optimized simply by replacing Dict(y => 1) with SingletonDict(y => 1):

dicts3 = xs |> Map(abs) |> Filter(x -> x > 1) |> Map() do x
y = digits(floor(Int, x))[end]
SingletonDict(y => 1)
end

counts4 = foldxt(mergewith!!(+), dicts3)
@assert counts1 == counts4

## Example: early termination

Find the first element that is multiple of three:

foldxt(ReduceIf(x -> x % 3 == 0), 1:10; init = nothing, basesize = 1) do _, x
# # Uncomment for demo:
# x == 3 ? sleep(0.1) : @show x  # give other tasks a chance to finish first
return x
end
3

This snippet always returns 3, even though the reductions for c = 6 and c = 9 may finish first.