As discussed in a quick introduction to data parallelism, data parallel style lets us write fast, portable, and generic parallel programs. One of the main focuses was to unlearn the "sequential idiom" that accumulates the result into mutable state. However, mutable state is sometimes preferred for efficiency. After all, a fast parallel program is typically a composition of fast sequential programs. Furthermore, managing mutable states is sometimes unavoidable for interoperability with libraries preferring or requiring mutation-based API. However, sharing mutable state is almost always a bad idea. Naively doing so likely results in data races and hence programs with undefined behaviors. Although low-level concurrency APIs such as locks and atomics can be used for writing (typically) inefficient but technically correct programs, a better approach is to use single-owner local mutable state [1]. In particular, we will see that unlearning sequential idiom was worth the effort since it points us to what we call ownership-passing style that can be used to construct mutation-based parallel reduction from mutation-free ("purely functional") reduction as an optimization.
This tutorial provides an overview of the mutable object handling in data-parallel Julia programs [2]. It also discusses the effect and analysis of false sharing which is a major performance pitfall when using in-place operations in a parallel program.
| [1] | Locks and atomics are important building blocks for concurrent programming and non-blocking algorithms and data structures are very useful for high-performance applications. Although these aspects become non-negligible for squeezing out the "last bits" of the performance, here, we focus on how to construct parallel programs independent of how the synchronizations and scheduling are managed. This is the key for writing portable and correct parallel programs. See also: concurrency is not parallelism. |
| [2] | If you are familiar with the approach using threadid and wonder why it is not discussed here, take a look at What is the difference of @reduce and @init to the approach using state[threadid()]? · FAQ · FLoops. |
As a starting point, let us consider the following program that computes a sum of products of matrices: [3]
N = 40
M = 1000
As = [randn(N, N) for _ in 1:M]
Bs = [randn(N, N) for _ in 1:M]
sum(A * B for (A, B) in zip(As, Bs)) | [3] | If you can store the inputs as AbstractArray{T,3}s, it may be better to use higher-level data parallel libraries such as Tullio.jl and TensorOperations.jl. |
using Folds
Folds.sum(A * B for (A, B) in zip(As, Bs)) This program is suboptimal since it allocates temporary arrays for the multiplications and summations. To clarify the source of allocations, let us translate the above code using @floop (see FLoops.@reduce below for how it works):
using FLoops
@floop for (A, B) in zip(As, Bs)
C = A * B # allocation for each iteration
@reduce() do (S = zero(C); C)
S = S + C # allocation for each iteration
end
end We can eliminate the allocation of A * B by using LinearAlgebra.mul! and the allocation of S + C by using the inplace broadcasting updates S .+= C. However, we cannot allocate arrays C and S outside @floop because then they will be shared across multiple tasks (and causes data races). Fortunately, we can use @init macro for allocating "private" temporary array C and the "init clause" of @reduce macro (i.e., zero(C) in the code below):
using LinearAlgebra: mul!
@floop for (A, B) in zip(As, Bs)
@init C = similar(A)
mul!(C, A, B)
@reduce() do (S = zero(C); C)
S .+= C
end
end In above code, similar(A) and zero(C) are executed only once in each task and their results are re-used. The result S₁ from task 1 and result S₂ from task 2 are combined using the reduction specified by @reduce(); i.e., S₁ .+= S₂.
The previous program provides a decent performance for a straightforward piece of code. However, we can further optimize the program by using fused multiply-add provided by the 5-argument method mul!(C, A, B, α, β). We can use this method for the base cases (where we have matrices A and B) but we need to use .+= when combining the base cases. This can be done by dispatching on the type of the second argument of @reduce:
using FLoops
using LinearAlgebra: mul!
@floop for (A, B) in zip(As, Bs)
C = (A, B)
@reduce() do (S = zero(A); C)
if C isa Tuple # base case
mul!(S, C[1], C[2], 1, 1)
else # combining base cases
S .+= C
end
end
end Let's discuss different kinds of mutability in parallel programs separately:
Filling outputs
In-place reductions
Mutable temporary objects (private variables)
Perhaps the simplest use of mutation in parallel program is filling pre-allocated output.
using FLoops
xs = 1:2:100
ys = similar(xs) # output
@floop ThreadedEx() for (i, x) in pairs(xs)
@inbounds ys[i] = gcd(42, x)
end This loop can also be written as Threads.@threads:
xs = 1:2:100
ys = similar(xs) # output
Threads.@threads for i in eachindex(xs, ys)
@inbounds ys[i] = gcd(42, xs[i])
end A more succinct approach is Folds.map!:
using Folds
xs = 1:2:100
ys = similar(xs) # output
Folds.map!(x -> gcd(42, x), ys, xs) This type of parallel mutation relies on that different tasks mutate disjoint set of memory locations. The correctness of the above code examples rely on that ys is, e.g., an Array. That is to say, updating each element ys[i] only updates data at disjoint memory location for different index i and does not depend on the memory locations updated by other tasks. However, it is not the case for more complex data collections such as
Certain views such as ys = view([0], [1, 1, 1, 1]). Mutating ys[1] mutates ys[2], ys[3], and ys[4].
BitArray: ys[i] and ys[i+1] may be stored in a single UInt64.
SparseMatrixCSC, SparseVector: Assigning a value to a previously zero index requires moving data for other non-zero elements internally.
Dict: inserting a new key-vale pair mutates memory locations shared by other tasks.
These are all examples of data races: there are multiple unsynchronized concurrent accesses and at least one of the accesses is write.
On the other hand, non-Array types can also be used safely. For example, ys = view(::Vector, 1:50) can be used instead of a Vector since ys[i] and ys[j] (i ≠ j) refer to disjoint memory locations.
Many sequential programs compute the result by mutating some states; e.g., appending elements to a vector. This approach is very efficient and is useful as a base case of parallel programs. There are several approaches to safely use such sequential reductions in parallel programs.
FLoops.@reduce@reduce(acc = op(init, input)) exampleFLoops.jl is a package for a flexible set of syntax sugar for constructing parallel loops. In particular, we can use @reduce(acc = op(init, input)) syntax for writing parallel reduction:
using FLoops
@floop for x in 1:10
if isodd(x)
@reduce(odds = append!(Int[], (x,)))
else
@reduce(evens = append!(Int[], (x,)))
end
end Here, we use @reduce with the following syntax
# @reduce($acc = $op( $init, $input))
@reduce(odds = append!(Int[], (x,)))
# ~~~~ ~~~~~~~ ~~~~~ ~~~~
# | | | |
# | | | Input to reduction
# | | |
# | | Initialization of the accumulator
# | |
# | Reducing function (aka binary operator, monoid)
# |
# Accumulator (result of the reduction) The @reduce macro is used for generating two types of code (function). First, it is used for generating the base case code. The base case code is generated by (roughly speaking):
remove @reduce( and the corresponding )
replace $init in the first argument with $acc
put $acc = $init in front of the loop
i.e.,
function basecase(chunk)
odds = Int[] # init
evens = Int[] # init
for x in chunk
if isodd(x)
odds = append!(odds, (x,))
else
evens = append!(evens, (x,))
end
end
return (odds, evens)
end Input collection to @floop for loop is split into chunks first[4]. For example, if julia is started with --threads=2, it is split into two chunks by default:
chunk_left = 1:5
chunk_right = 6:10
@assert vcat(chunk_left, chunk_right) == 1:10 # original input Each chunk is then processed by (a function equivalent to) the basecase function above:
odds_left, evens_left = basecase(chunk_left)
odds_right, evens_right = basecase(chunk_right)
@assert odds_left == 1:2:5
@assert evens_left == 2:2:5
@assert odds_right == 7:2:10
@assert evens_right == 6:2:10 The function append! specified by @reduce is used also for merging these base case results:
odds = append!(odds_left, odds_right)
evens = append!(evens_left, evens_right)
@assert odds == 1:2:10
@assert evens == 2:2:10 When there are more than two chunks, the reduction results are merged pair-wise (default [4]) or sequentially, depending on the executor used.
| [4] | By default, JuliaFolds packages use divide-and-conquer approach for scheduling parallel loops. Roughly speaking, it "fuses" splitting of the collections and scheduling the parallel tasks. It also "fuses" the merges of reduction results and joining of the parallel tasks. This increases parallelism of the entire computation compared to more naive sequential scheduling. However, FLoops.jl itself is just a syntax sugar for defining parallel reduction and completely decoupled from how these reductions are computed. The exact execution strategy can be determined by passing the executor. |
Note that the above parallel reduction does not incur data races because
The first arguments to append! are created for each base case,
append! mutates the first argument and returns it, and
append! is used in such a way that the first argument (more specifically its state at which append! is called) is never used.
Therefore, we can treat append!as if it were a pure function for the purpose of understanding this parallel reduction. In other words, we never observe the side-effect of append! through the argument. It's easy to see that the above program is correct even if we replace append! with its mutation-free "equivalent" [5] function vcat:
| [5] | Note that the equivalence is not quite exact. We replace (x,) with [x] since append! and vcat behave differently when the input is non an array. |
using FLoops
@floop for x in 1:10
if isodd(x)
@reduce(odds = vcat(Int[], [x]))
else
@reduce(evens = vcat(Int[], [x]))
end
end This observation points to a powerful recipe for constructing efficient parallel reduction:
Write down parallel reduction without using mutation.
Re-write the reducing function (the body of @reduce or the binary function op passed to reduce etc.; i.e., monoid) by mutating the first argument.
Make sure that subsequent iterations do not mutate the second argument (See the discussion below)
It can be used for general reducing functions (op) specified via @reduce macro as well as the functions passed to higher-order functions such as reduce in all JuliaFolds packages. Furthermore, this style allows us to replace append! with BangBang.append!! which is very useful for collecting elements when their type cannot be determined or hard to do so a priori. For lack of better words, let us call it ownership-passing style[6] (a non-standard terminology). This is *because the ownership of dest in dest′ = append!(dest, src) is first transferred to append! which then it is transferred back to the caller as the return value dest′.
Note that there is a subtlety when it comes to the ownership of the second argument. See the discussion below.
| [6] | In Scheme Requests for Implementation (SRFI) 1, this is called linear update which in turn is taken from the linear type system. This type of operations is called recycling operations in Practical Common Lisp. |
@reduce() do example@reduce() do syntax can be used for defining a more flexible reduction (see also the example section above). Here is a simple example
@floop for n in 1:10
xs = [n, 2n, 3n]
@reduce() do (ys = zeros(Int, 3); xs)
# ~~~~~~~~~~~~~~~~~~
# initializer
ys .+= xs
# ~~~~~~~~~
# reduce body
end
end
@assert ys == mapreduce(n -> [n, 2n, 3n], .+, 1:10) The base case code is equivalent to the loop transformed by:
remove @reduce() do ($acc₁ = $init₁; $input₁), …, ($accₙ = $initₙ; $inputₙ) and the corresponding end and keep the reduce body
put the initializers $accᵢ = $initᵢ in front of the loop
i.e.,
function basecase(chunk)
ys = zeros(Int, 3) # initializer
for n in chunk
xs = [n, 2n, 3n]
ys .+= xs # reduce body
end
return ys
end Similar to odds-evens example above, the input collection is chunked and then processed in multiple tasks:
chunk_left = 1:5
chunk_right = 6:10
ys_left = basecase(chunk_left)
ys_right = basecase(chunk_right) Finally, the base case results are merged by using the body of the @reduce() do block (here, just .+=):
ys_left .+= ys_right
ys = ys_left
@assert ys == mapreduce(n -> [n, 2n, 3n], .+, 1:10) @reduce() do syntaxIn general, @reduce() do takes the following form:
@reduce() do ($acc₁ = $init₁; $input₁),
($acc₂ = $init₂; $input₂),
…
($accₙ = $initₙ; $inputₙ)
# ~~~~ ~~~~~ ~~~~~~
# | | |
# | | Input to reduction (computed outside `@reduce`)
# | |
# | Initialization of the accumulator
# |
# Accumulator (result of the reduction)
$body
# ~~~~~
# Reducing function (aka binary operator, monoid)
end This expression is used to generate the following function op for merging two set of accᵢs from two tasks
function op(accs_left, accs_right)
($acc₁, $acc₂, …, $accₙ) = accs_left
($input₁, $input₂, …, $inputₙ) = accs_right
$body
return ($acc₁, $acc₂, …, $accₙ)
end which is invoked as
accs = op(accs_left, accs_right)
to merge the results of "left" and "right" tasks. When using @reduce($acc =
$op($init, $input)) syntax, the function $op is used as-is.
Note that the roles of $accᵢs and $inputᵢs are almost "symmetric" in the sense that $body has to be able to handle any value of $accᵢ provided as a $inputᵢ.
The reducing function must be associative; i.e., the results of
op(op(a, b), c)
and
op(a, op(b, c))
must be equivalent in some sense (e.g., isapprox may be enough in some cases; the result of @floop is still deterministic unless a nondeterministic executor is specified or the input collection is unordered).
Furthermore, since the function op has to be executable outside the scope of the sequential loop, it must not use variables whose scope is inside of @floop but outside of @reduce. That is to say, it must only access variables $accᵢs and $inputᵢs or the names defined outside @floop:
using SomeModule: f
function example()
...
x = ...
@floop for ...
y = ...
z = ...
@reduce() do (acc; y)
acc ✅ # accessible (accumulator)
f ✅ # accessible (global)
x ✅ # accessible (defined outside @floop)
y ✅ # accessible (passed as an input)
z ❌ # NOT accessible (not passed as an input)
...
end
end
...
end
These requirements for associativity and variable scoping typically can be achieved by "minimizing" the computation done inside @reduce. The following example is incorrect since the body of @reduce is doing an "extra work":
@floop for n in 1:10
xs = [n, 2n, 3n]
@reduce() do (ys = zeros(Int, 3); xs)
ys .+= 2 .* xs # INCORRECT
end
end
A correct implementation is to move the computation 2 .* xs out of @reduce.
@floop for n in 1:10
xs = [n, 2n, 3n]
zs = 2 .* xs # CORRECT
@reduce() do (ys = zeros(Int, 3); zs)
ys .+= zs
end
end
@assert ys == mapreduce(n -> 2 .* [n, 2n, 3n], .+, 1:10)
The allocation of temporary variables such as zs can be eliminated by using private variables (see below). It is also possible to fuse the computation of 2 .* _ and _ .+= _ if done carefully (See the example above for how to fuse computation only in the base case).
Consider the following program that computes sum of arrays
vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
sum(vectors)
Since the element type of the input is unknown, we can't pre-compute the output array type. It may then be tempting to use the first input as the accumulator:
vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
@floop for xs in vectors
@reduce() do (ys = nothing; xs)
if ys === nothing
ys = xs # ❌ WRONG
else
ys .+= xs
end
end
end
@assert ys === vectors[1] # above loop mutated the input
However, as you can see in the @assert statement above, this loop mutated the first element vectors[1]. This is probably not a desirable outcome in many cases (although it may not be problem in specific use cases). Thus, in general, we should assume that the reducing function does not own second argument when using the ownership-passing style. Therefore, we need to copy the second argument when using it as the accumulator.
vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
@floop for xs in vectors
@reduce() do (ys = nothing; xs)
if ys === nothing
ys = copy(xs) # ✅ CORRECT
else
ys .+= xs
end
end
end
@assert ys !== vectors[1] # input not mutated
Transducers.OnInitTransducers.OnInit(f) can be passed as init argument in many JuliaFolds API. It calls the zero-argument function f that creates the accumulator for each base case:
using Folds
using Transducers: OnInit
ys = Folds.mapreduce(x -> (x, 2x, 3x), .+, 1:10; init = OnInit(() -> [0, 0, 0]))
@assert ys == [sum(1:10), 2sum(1:10), 3sum(1:10)]
When each iteration produce a container, there is usually a "default" way to combine all the containers produced. For basic containers such as vectors, sets and dictionaries, Base already define appropriate functions:
| Container | Pure function | In-place function |
|---|---|---|
| vector | vcat | append! |
| set | union | union! |
| dictionary | merge | merge! |
mergewith(op) | mergewith!(op) |
(These are not the only associative operations on these containers. For example, union works on vectors, too. intersect defines another associative operations on sets.)
The corresponding containers can be constructed in parallel by feeding sub-containers into these "merging" functions:
using Folds
using Transducers: OnInit
ys1 = Folds.mapreduce(x -> [x], append!, 1:10; init = OnInit(() -> Int[]))
ys2 = Folds.mapreduce(x -> Set([x]), union!, 1:10; init = OnInit(Set{Int}))
ys3 = Folds.mapreduce(x -> Dict(x => x^2), merge!, 1:10; init = OnInit(Dict{Int,Int}))
ys4 = Folds.mapreduce(x -> Dict(isodd(x) => 1), mergewith!(+), 1:10; init = OnInit(Dict{Bool,Int}))
@assert ys1 == 1:10
@assert ys2 == Set(1:10)
@assert ys3 == Dict(x => x^2 for x in 1:10)
@assert ys4 == Dict(false => 5, true => 5)
However, it is suboptimal to heap-allocate these singleton containers (containers with single element). We can use special singleton containers from MicroCollections.jl to avoid heap allocations. Another downside of the approach using functions such as append! is that they require specifying element type before the reduction is started. This is sometimes impossible or very tedious. We can avoid it by using the !! functions from BangBang.jl. For example, BangBang.append!!(ys, xs) may return a new array without mutating ys if the element type of ys is not appropriate for storing xs. Thus, in the parallel reduction context, BangBang.jl functions can be used more like pure functions except that the objects passed as the first argument cannot be re-used again.
using Folds
using MicroCollections
using BangBang
ys1 = Folds.mapreduce(x -> SingletonVector((x,)), append!!, 1:10)
ys2 = Folds.mapreduce(x -> SingletonSet((x,)), union!!, 1:10)
ys3 = Folds.mapreduce(x -> SingletonDict(x => x^2), merge!!, 1:10)
ys4 = Folds.mapreduce(x -> SingletonDict(isodd(x) => 1), mergewith!!(+), 1:10)
Since these are common idioms, Folds.jl has shorthands for the first three cases:
using Folds
ys1 = Folds.collect(1:10)
ys2 = Folds.set(1:10)
ys3 = Folds.dict(x => x^2 for x in 1:10)
OnlineStatsOnlineStats.jl is a rich collection of composable single-pass algorithms for computing various statistics. Although OnlineStats.jl itself only provides in-place operations, Transducers.jl defines a compatibility layer (using OnInit etc.) that treat mergeable OnlineStats as monoids. Folds.reduce probably is the easiest API to use:
using OnlineStats
using Folds
Folds.reduce(Mean(), 1:10)
Mean: n=10 | value=5.5
We can also use FLoops.@reduce directly with OnlineStats.jl. The key point is to make the body of @reduce "symmetric in type"; i.e., pass Mean at both arguments:
using FLoops
using OnlineStats
@floop for x in 1:10
y = 2x
m = fit!(Mean(), y)
@reduce() do (acc = Mean(); m)
merge!(acc, m)
end
end
@assert acc == fit!(Mean(), 2:2:20)
It is also possible to get rid of the intermediate Mean object by "fusing" fit! and merge! in the base case. (However, this optimization may not be required since the compiler is likely to optimize away the creation of intermediate Mean object m.)
@floop for x in 1:10
y = 2x
@reduce() do (acc = Mean(); y)
if y isa OnlineStat
merge!(acc, y)
else
fit!(acc, y)
end
end
end
@assert acc == fit!(Mean(), 2:2:20)
When using in-place reductions, mutable accumulators must be specified carefully to avoid sharing them across tasks. For example,
Folds.mapreduce(x -> (x,), append!, xs; init = [])
is not a correct parallel program since it mutates the array [] in multiple tasks. The APIs such as FLoops.@reduce discussed above can be used to avoid this problem when used correctly. However, it's still possible to misuse the API. For example,
using FLoops
shared_acc = [] # WRONG (shared across tasks and mutated concurrently)
@floop for x in 1:10
ys = (x,)
@reduce(acc = append!(shared_acc, ys))
end
has exactly the same problem as init = [].
In addition to pre-allocated objects or mutable accumulators, it is sometimes necessary to have mutable temporary objects. While temporary objects are technically equivalent to accumulators that are ignored after the loop, it is convenient to have a dedicated API. This is why FLoops.jl has @init macro that declares "private variables" for re-using mutable temporary objects within each base case:
using FLoops
@floop for x in 1:10
@init xs = Vector{Int}(undef, 3)
xs .= (x, 2x, 3x)
@reduce() do (ys = zeros(Int, 3); xs)
ys .+= xs
end
end
@assert ys == mapreduce(x -> [x, 2x, 3x], .+, 1:10)
The right hand sides of the assignments in @init is executed only at the first iteration of each base case.
Many parallel APIs in Julia, including @threads and @spawn, require creating closures. Thus, it is a common mistake to accidentally expand the scope of local variables. For example, the scope of y in the following program is larger than @threads for loop. As a result, the update of y is shared across all tasks and this function has a data race:
function f(n = 2^10)
ys = zeros(Int, n)
Threads.@threads for i in 1:n
y = gcd(42, i)
some_function()
ys[i] = y
end
# Suppose that some unrelated code uses the same variable names as the
# temporary variables in the parallel loop:
if ys[1] > 0
y = 1
end
return ys
end
# Some function that Julia does not inline:
@noinline some_function() = _FALSE_[] && error("unreachable")
const _FALSE_ = Ref(false)
The data race is observable if there is a non-inlinable function call (some_function() in the example) between the definition and the use of y. If we run the above function multiple times in a julia process started with multiple worker threads, we can observe that the result is different from the expected value:
f_seq(n = 2^10) = gcd.(42, 1:n)
julia> ys = f_seq()
for i in 1:100
@assert f() == ys
end
ERROR: AssertionError: f() == ys
We can use the local keyword to fix it:
Threads.@threads for i in 1:n
local y = gcd(42, i) # added `local`
some_function()
ys[i] = y
end
Alternatively, if the loop body of Threads.@threads or the thunk of Threads.@spawn is large, it is a good idea to factor it out as a function. It prevents assignments inside the closure and hence the data races. It also makes analyzing base case performance easier.
Using FLoops.jl instead of @threads is useful to prevent this bug. Consider an equivalent parallel loop written with FLoops.jl:
using FLoops
function f_floop(n = 2^10)
ys = zeros(Int, n)
@floop ThreadedEx() for i in 1:n
y = gcd(42, i)
some_function()
ys[i] = y
end
if ys[1] > 0
y = 1
end
return ys
end
FLoops.jl detects that the variable y is now shared across all tasks:
julia> f_floop()
ERROR: HasBoxedVariableError: Closure __##reducing_function#258 (defined in Main) has 1 boxed variable: y
HINT: Consider adding declarations such as `local y` at the narrowest possible scope required.
As the error message indicates, local y = gcd(42, i) also fix the issue in @floop.
Even when parallel programs are correctly written to handle mutable objects, it may not perform well due to false sharing. False sharing can occur when the code running on different CPUs update adjacent (but disjoint) memory locations. The program is still data race-free (not "true" sharing) since it does not simultaneously mutate the same memory locations. However, since the CPU manages the data in a unit (cache line) larger than single bytes, the CPUs have to communicate each other to maintain the coherent view of the data and avoid such simultaneous modification of the "same" memory location (from the point of view of the CPUs). This extra communication slows down the program.
state[threadid()] pattern to create "thread-local storage" for reduction or private variables is more likely to invoke false-sharing unless used carefully (e.g., padding). More importantly, it is very hard to use threadid-based approach correctly since there is no systematic way to locate or restrict yield points. See also What is the difference of @reduce and @init to the approach using state[threadid()]? · FAQ · FLoops.Let us use the following functions [7] [8] to demonstrate the effect of false sharing:
| [7] | The effect of false sharing depends on the actual CPU used. This example showed the effect of false sharing when experimented on several CPUs from different vendors (Intel, AMD, and IBM). However, a simpler example did not show false sharing in some CPUs. |
| [8] | Minor details: these functions use @threads :static scheduling to force distribution of multiple tasks to the worker (OS) threads. Each array in yss includes a space at least as large as a cache line at the end to avoid false sharing as much as possible (although it can be done more parsimoniously).
|
function crowded_inc!(ys, data)
Threads.@threads :static for indices in data
for i in indices
@inbounds ys[i] += 1
end
end
end
function exclusive_inc!(yss, data)
Threads.@threads :static for indices in data
ys = yss[Threads.threadid()]
for i in indices
@inbounds ys[i] += 1
end
end
end
cacheline = try
parse(Int, read("/sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size", String))
catch err
@warn "cannot read cache line size" exception = (err, catch_backtrace())
64
end
ys = zeros(Threads.nthreads() * 2);
partitioned_indices = reshape(eachindex(ys), Threads.nthreads(), :)'
data = [rand(partitioned_indices[:, i], 2^20) for i in 1:Threads.nthreads()]
yss = [zeros(length(ys) + cld(cacheline, sizeof(eltype(ys)))) for _ in 1:Threads.nthreads()];
The functions crowded_inc! and exclusive_inc! perform almost the same computation and use exactly the same access pattern on the output array ys for each task. In crowded_inc!, since multiple tasks (that are likely to be scheduled on different CPUs) try to update nearby memory locations concurrently, it invokes false sharing which can be observed as the slow down compared to exclusive_inc! (see below). This does not occur in exclusive_inc! where each task updates an array dedicated to it (we also made sure that these accesses are at least one cache line apart).
julia> using BenchmarkTools
julia> @btime crowded_inc!(ys, data) setup = fill!(ys, 0);
95.385 ms (43 allocations: 3.70 KiB)
julia> @btime exclusive_inc!(yss, data) setup = foreach(ys -> fill(ys, 0), yss);
1.801 ms (41 allocations: 3.64 KiB)
julia> versioninfo()
Julia Version 1.6.0
Commit f9720dc2eb (2021-03-24 12:55 UTC)
Platform Info:
OS: Linux (x86_64-pc-linux-gnu)
CPU: AMD EPYC 7502 32-Core Processor
WORD_SIZE: 64
LIBM: libopenlibm
LLVM: libLLVM-11.0.1 (ORCJIT, znver2)
julia> Threads.nthreads()
8
perf c2cIf you have a CPU (e.g., Intel) supported by perf
c2c, it can be used to detect false sharing. You can try the following steps or use the script perf_c2c_demo.jl to analyze the functions we defined above:
Create a working directory and cd into it.
Run perf c2c record -- --output=crowded_inc-perf.data (in a different system shell session) while invoking @btime crowded_inc!(ys, data) in the Julia REPL. Terminate it with Ctrl-C when the benchmark is finished.
Similarly, run perf c2c record -- --output=exclusive_inc-perf.data while invoking @btime exclusive_inc!(yss, data) in the Julia REPL. Terminate it with Ctrl-C when the benchmark is finished.
Run, e.g., perf c2c report --input=crowded_inc-perf.data -c tid,iaddr and perf c2c report --input=exclusive_inc-perf.data -c tid,iaddr to analyze the memory accesses.
An example output of perf_c2c_demo.jl can be found at https://gist.github.com/01f4793281dc5edee59c9b6cfb05846b. See C2C - False Sharing Detection in Linux Perf - My Octopress Blog for more information on perf c2c and Perf Wiki for more information on perf in general.
In this example, the command perf c2c report --input=crowded_inc-perf.data -c
tid,iaddr shows the false sharing in this table:
=================================================
Shared Data Cache Line Table
=================================================
#
# ----------- Cacheline ---------- Tot …
# Index Address Node PA cnt Hitm …
# ..... .................. .... ...... ....... …
# …
0 0x7f439f2a1ec0 0 62111 15.50% …
1 0x7f439f2a1e80 0 134824 14.76% …
The addresses corresponding to these two top Hitm (load that hit in a modified cacheline) are in the output array ys (if you use perf_c2c_demo.jl, the addresses are stored in pointers.txt):
julia> lower_bound = 0x00007f439f2a1e60; # pointer(ys, 1)
julia> upper_bound = 0x00007f439f2a1ed8; # pointer(ys, length(ys))
julia> lower_bound <= 0x7f439f2a1ec0 <= upper_bound # from: perf c2c report
true
julia> lower_bound <= 0x7f439f2a1e80 <= upper_bound # from: perf c2c report
true
Furthermore, you can find the threads accessing 0x7f439f2a1ec0 by looking at another table in the output:
=================================================
Shared Cache Line Distribution Pareto
=================================================
#
# ----- HITM ----- -- Store Refs -- ------- CL --------
# RmtHitm LclHitm L1 Hit L1 Miss Off Node PA cnt Tid …
# ....... ....... ....... ....... ..... .... ...... ............. …
#
-------------------------------------------------------------
0 21 21 38634 28457 0x7f439f2a1ec0
-------------------------------------------------------------
28.57% 9.52% 0.00% 0.00% 0x0 0 1 22239:julia …
0.00% 0.00% 26.02% 26.68% 0x0 0 1 22239:julia …
33.33% 47.62% 0.00% 0.00% 0x8 0 1 22240:julia …
0.00% 0.00% 27.23% 27.08% 0x8 0 1 22240:julia …
19.05% 33.33% 0.00% 0.00% 0x10 0 1 22241:julia …
0.00% 0.00% 27.33% 27.14% 0x10 0 1 22241:julia …
19.05% 9.52% 0.00% 0.00% 0x18 0 1 22242:julia …
0.00% 0.00% 19.42% 19.10% 0x18 0 1 22242:julia …
Compare the Tid column above with the list of TIDs of Julia's worker thread (see worker_tids.txt generated by perf_c2c_demo.jl):
$ cat worker_tids.txt
22234
22236
22237
22238
22239
22240
22241
22242
On the other hand, the output of perf c2c report
--input=exclusive_inc-perf.data -c tid,iaddr does not show the sign of false sharing (Hitm is small and the addresses are outside of yss[_][_]):
=================================================
Shared Data Cache Line Table
=================================================
#
# ----------- Cacheline ---------- Tot …
# Index Address Node PA cnt Hitm …
# ..... .................. .... ...... ....... …
#
0 0xffffffff8d546040 0 1 2.99% …
1 0x555ae2321340 0 9 2.99% …
2 0x7f43b1a91cc0 0 1 1.80% …
3 0xffff95a65fc2cc80 0 1 1.20% …
4 0xffffffff8ce44a40 0 1 0.60% …
…
While APIs such as FLoops.@reduce, FLoops.@init, and Transducers.OnInit are useful, not all parallel frameworks support such constructs. Furthermore, it may be desirable to have even finer grained control; e.g., fusing the first iteration and the initial value computation. Fortunately, the basis of the initial value handling is just a plain algebra concept that can be implemented with a couple of lines of code. (See "adjoining" in, e.g., Semigroup - Wikipedia.)
First, let us start from a simple case without mutable accumulator. Given a binary associative function semigroup without a (known) identity element, we can construct a binary function monoid with pre-defined identity element:
struct Init end
function asmonoid(semigroup)
monoid(a, b) = semigroup(a, b)
monoid(::Init, b) = b
monoid(a, ::Init) = a
monoid(::Init, ::Init) = Init() # disambiguation
return monoid
end
let ⊗ = asmonoid(min)
@assert 1 ⊗ 2 == 1
@assert 1 ⊗ Init() == 1
@assert Init() ⊗ 2 == 2
@assert Init() ⊗ Init() == Init()
end
(See also InitialValues.jl as an implementation of this idea used in JuliaFolds.)
asmonod turns the accumulator type into a small Union of types, the actual base case loop can be very efficient if it uses the tail-call "function-barrier" pattern to type-stabilize the accumulator type.The function asmonoid can be slightly modified to support "in-place semigroup" with mutable initial value:
function withinit(f, semigroup!)
monoid!(a, b) = semigroup!(a, b)
monoid!(::Init, b) = semigroup!(f(), b)
monoid!(a, ::Init) = a
monoid!(::Init, ::Init) = Init() # disambiguation
return monoid!
end
let ⊗ = withinit(() -> [], append!)
@assert [1] ⊗ [2] == [1, 2]
@assert [1] ⊗ Init() == [1]
@assert Init() ⊗ [2] == [2]
@assert Init() ⊗ Init() == Init()
end
using Folds
ys = Folds.mapreduce(tuple, withinit(() -> Int[], append!), 1:10; init = Init())
@assert ys == 1:10
Assuming that the parallel mapreduce implementation uses left-to-right iteration (i.e., left fold; foldl) as the base case, the in-place function modnoid! created by withinit initializes the accumulator at the first iteration using the function f and re-uses it for each base case.