Data-parallel Julia

Efficient and safe approaches to mutation in data parallelism

As discussed in a quick introduction to data parallelism, data parallel style lets us write fast, portable, and generic parallel programs. One of the main focuses was to unlearn the "sequential idiom" that accumulates the result into mutable state. However, mutable state is sometimes preferred for efficiency. After all, a fast parallel program is typically a composition of fast sequential programs. Furthermore, managing mutable states is sometimes unavoidable for interoperability with libraries preferring or requiring mutation-based API. However, sharing mutable state is almost always a bad idea. Naively doing so likely results in data races and hence programs with undefined behaviors. Although low-level concurrency APIs such as locks and atomics can be used for writing (typically) inefficient but technically correct programs, a better approach is to use single-owner local mutable state [1]. In particular, we will see that unlearning sequential idiom was worth the effort since it points us to what we call ownership-passing style that can be used to construct mutation-based parallel reduction from mutation-free ("purely functional") reduction as an optimization.

This tutorial provides an overview of the mutable object handling in data-parallel Julia programs [2]. It also discusses the effect and analysis of false sharing which is a major performance pitfall when using in-place operations in a parallel program.

[1] Locks and atomics are important building blocks for concurrent programming and non-blocking algorithms and data structures are very useful for high-performance applications. Although these aspects become non-negligible for squeezing out the "last bits" of the performance, here, we focus on how to construct parallel programs independent of how the synchronizations and scheduling are managed. This is the key for writing portable and correct parallel programs. See also: concurrency is not parallelism.
[2] If you are familiar with the approach using threadid and wonder why it is not discussed here, take a look at What is the difference of @reduce and @init to the approach using state[threadid()]? · FAQ · FLoops.

  1. Example: multiplying and adding matrices
    1. Advanced: fusing multiplication and addition in base cases
  2. Categorizing mutation use-cases
  3. Filling outputs
    1. Pitfalls with filling pre-allocated outputs
  4. In-place reductions
    1. Flexible reduction with FLoops.@reduce
      1. @reduce(acc = op(init, input)) example
      2. Ownership-passing style
      3. @reduce() do example
      4. General form of @reduce() do syntax
      5. Ownership-passing style: second argument
    2. Initializing mutable accumulator using Transducers.OnInit
    3. Combining containers
    4. OnlineStats
    5. Pitfalls with mutable reduction states
  5. Mutable temporary objects (private variables)
  6. Accidental mutations
  7. Advanced/Performance: False sharing
    1. Analyzing false sharing using perf c2c
  8. Advanced: adjoining trick

Example: multiplying and adding matrices

💡 Note
This section can be skipped. It is a quick tour on "practical" Julia code that uses parallel loop and manipulates mutable objects. It does not explain all the concepts in order. The explanations in the next sections take more bottom-up approach.

As a starting point, let us consider the following program that computes a sum of products of matrices: [3]

N = 40
M = 1000
As = [randn(N, N) for _ in 1:M]
Bs = [randn(N, N) for _ in 1:M]
sum(A * B for (A, B) in zip(As, Bs))

[3] If you can store the inputs as AbstractArray{T,3}s, it may be better to use higher-level data parallel libraries such as Tullio.jl and TensorOperations.jl.
As explained in the quick introduction, this program can easily be translated to a parallel program, e.g., by using Folds.jl:

using Folds
Folds.sum(A * B for (A, B) in zip(As, Bs))

This program is suboptimal since it allocates temporary arrays for the multiplications and summations. To clarify the source of allocations, let us translate the above code using @floop (see FLoops.@reduce below for how it works):

using FLoops

@floop for (A, B) in zip(As, Bs)
    C = A * B             # allocation for each iteration
    @reduce() do (S = zero(C); C)
        S = S + C         # allocation for each iteration
    end
end

We can eliminate the allocation of A * B by using LinearAlgebra.mul! and the allocation of S + C by using the inplace broadcasting updates S .+= C. However, we cannot allocate arrays C and S outside @floop because then they will be shared across multiple tasks (and causes data races). Fortunately, we can use @init macro for allocating "private" temporary array C and the "init clause" of @reduce macro (i.e., zero(C) in the code below):

using LinearAlgebra: mul!

@floop for (A, B) in zip(As, Bs)
    @init C = similar(A)
    mul!(C, A, B)
    @reduce() do (S = zero(C); C)
        S .+= C
    end
end

In above code, similar(A) and zero(C) are executed only once in each task and their results are re-used. The result S₁ from task 1 and result S₂ from task 2 are combined using the reduction specified by @reduce(); i.e., S₁ .+= S₂.

Advanced: fusing multiplication and addition in base cases

The previous program provides a decent performance for a straightforward piece of code. However, we can further optimize the program by using fused multiply-add provided by the 5-argument method mul!(C, A, B, α, β). We can use this method for the base cases (where we have matrices A and B) but we need to use .+= when combining the base cases. This can be done by dispatching on the type of the second argument of @reduce:

using FLoops
using LinearAlgebra: mul!

@floop for (A, B) in zip(As, Bs)
    C = (A, B)
    @reduce() do (S = zero(A); C)
        if C isa Tuple  # base case
            mul!(S, C[1], C[2], 1, 1)
        else            # combining base cases
            S .+= C
        end
    end
end

Categorizing mutation use-cases

Let's discuss different kinds of mutability in parallel programs separately:

  1. Filling outputs

  2. In-place reductions

  3. Mutable temporary objects (private variables)

Filling outputs

Perhaps the simplest use of mutation in parallel program is filling pre-allocated output.

using FLoops

xs = 1:2:100
ys = similar(xs)  # output
@floop ThreadedEx() for (i, x) in pairs(xs)
    @inbounds ys[i] = gcd(42, x)
end

This loop can also be written as Threads.@threads:

xs = 1:2:100
ys = similar(xs)  # output
Threads.@threads for i in eachindex(xs, ys)
    @inbounds ys[i] = gcd(42, xs[i])
end

A more succinct approach is Folds.map!:

using Folds

xs = 1:2:100
ys = similar(xs)  # output
Folds.map!(x -> gcd(42, x), ys, xs)

Pitfalls with filling pre-allocated outputs

This type of parallel mutation relies on that different tasks mutate disjoint set of memory locations. The correctness of the above code examples rely on that ys is, e.g., an Array. That is to say, updating each element ys[i] only updates data at disjoint memory location for different index i and does not depend on the memory locations updated by other tasks. However, it is not the case for more complex data collections such as

These are all examples of data races: there are multiple unsynchronized concurrent accesses and at least one of the accesses is write.

On the other hand, non-Array types can also be used safely. For example, ys = view(::Vector, 1:50) can be used instead of a Vector since ys[i] and ys[j] (i ≠ j) refer to disjoint memory locations.

In-place reductions

Many sequential programs compute the result by mutating some states; e.g., appending elements to a vector. This approach is very efficient and is useful as a base case of parallel programs. There are several approaches to safely use such sequential reductions in parallel programs.

Flexible reduction with FLoops.@reduce

@reduce(acc = op(init, input)) example

FLoops.jl is a package for a flexible set of syntax sugar for constructing parallel loops. In particular, we can use @reduce(acc = op(init, input)) syntax for writing parallel reduction:

using FLoops

@floop for x in 1:10
    if isodd(x)
        @reduce(odds = append!(Int[], (x,)))
    else
        @reduce(evens = append!(Int[], (x,)))
    end
end

Here, we use @reduce with the following syntax

# @reduce($acc = $op(    $init, $input))
  @reduce(odds = append!(Int[], (x,)))
#         ~~~~   ~~~~~~~ ~~~~~  ~~~~
#          |       |      |      |
#          |       |      |     Input to reduction
#          |       |      |
#          |       |   Initialization of the accumulator
#          |       |
#          |      Reducing function (aka binary operator, monoid)
#          |
#   Accumulator (result of the reduction)

The @reduce macro is used for generating two types of code (function). First, it is used for generating the base case code. The base case code is generated by (roughly speaking):

  1. remove @reduce( and the corresponding )

  2. replace $init in the first argument with $acc

  3. put $acc = $init in front of the loop

i.e.,

function basecase(chunk)
    odds = Int[]  # init
    evens = Int[]  # init
    for x in chunk
        if isodd(x)
            odds = append!(odds, (x,))
        else
            evens = append!(evens, (x,))
        end
    end
    return (odds, evens)
end

Input collection to @floop for loop is split into chunks first[4]. For example, if julia is started with --threads=2, it is split into two chunks by default:

chunk_left = 1:5
chunk_right = 6:10
@assert vcat(chunk_left, chunk_right) == 1:10  # original input

Each chunk is then processed by (a function equivalent to) the basecase function above:

odds_left, evens_left = basecase(chunk_left)
odds_right, evens_right = basecase(chunk_right)

@assert odds_left == 1:2:5
@assert evens_left == 2:2:5
@assert odds_right == 7:2:10
@assert evens_right == 6:2:10

The function append! specified by @reduce is used also for merging these base case results:

odds = append!(odds_left, odds_right)
evens = append!(evens_left, evens_right)

@assert odds == 1:2:10
@assert evens == 2:2:10

When there are more than two chunks, the reduction results are merged pair-wise (default [4]) or sequentially, depending on the executor used.

[4] By default, JuliaFolds packages use divide-and-conquer approach for scheduling parallel loops. Roughly speaking, it "fuses" splitting of the collections and scheduling the parallel tasks. It also "fuses" the merges of reduction results and joining of the parallel tasks. This increases parallelism of the entire computation compared to more naive sequential scheduling. However, FLoops.jl itself is just a syntax sugar for defining parallel reduction and completely decoupled from how these reductions are computed. The exact execution strategy can be determined by passing the executor.

Ownership-passing style

Note that the above parallel reduction does not incur data races because

  1. The first arguments to append! are created for each base case,

  2. append! mutates the first argument and returns it, and

  3. append! is used in such a way that the first argument (more specifically its state at which append! is called) is never used.

Therefore, we can treat append!as if it were a pure function for the purpose of understanding this parallel reduction. In other words, we never observe the side-effect of append! through the argument. It's easy to see that the above program is correct even if we replace append! with its mutation-free "equivalent" [5] function vcat:

[5] Note that the equivalence is not quite exact. We replace (x,) with [x] since append! and vcat behave differently when the input is non an array.
using FLoops

@floop for x in 1:10
    if isodd(x)
        @reduce(odds = vcat(Int[], [x]))
    else
        @reduce(evens = vcat(Int[], [x]))
    end
end

This observation points to a powerful recipe for constructing efficient parallel reduction:

  1. Write down parallel reduction without using mutation.

  2. Re-write the reducing function (the body of @reduce or the binary function op passed to reduce etc.; i.e., monoid) by mutating the first argument.

  3. Make sure that subsequent iterations do not mutate the second argument (See the discussion below)

It can be used for general reducing functions (op) specified via @reduce macro as well as the functions passed to higher-order functions such as reduce in all JuliaFolds packages. Furthermore, this style allows us to replace append! with BangBang.append!! which is very useful for collecting elements when their type cannot be determined or hard to do so a priori. For lack of better words, let us call it ownership-passing style[6] (a non-standard terminology). This is *because the ownership of dest in dest′ = append!(dest, src) is first transferred to append! which then it is transferred back to the caller as the return value dest′.

Note that there is a subtlety when it comes to the ownership of the second argument. See the discussion below.

[6] In Scheme Requests for Implementation (SRFI) 1, this is called linear update which in turn is taken from the linear type system. This type of operations is called recycling operations in Practical Common Lisp.

@reduce() do example

@reduce() do syntax can be used for defining a more flexible reduction (see also the example section above). Here is a simple example

@floop for n in 1:10
    xs = [n, 2n, 3n]
    @reduce() do (ys = zeros(Int, 3); xs)
#                 ~~~~~~~~~~~~~~~~~~
#                  initializer
        ys .+= xs
#       ~~~~~~~~~
#       reduce body
    end
end

@assert ys == mapreduce(n -> [n, 2n, 3n], .+, 1:10)

The base case code is equivalent to the loop transformed by:

  1. remove @reduce() do ($acc₁ = $init₁; $input₁), …, ($accₙ = $initₙ; $inputₙ) and the corresponding end and keep the reduce body

  2. put the initializers $accᵢ = $initᵢ in front of the loop

i.e.,

function basecase(chunk)
    ys = zeros(Int, 3)         # initializer
    for n in chunk
        xs = [n, 2n, 3n]
        ys .+= xs              # reduce body
    end
    return ys
end

Similar to odds-evens example above, the input collection is chunked and then processed in multiple tasks:

chunk_left = 1:5
chunk_right = 6:10

ys_left = basecase(chunk_left)
ys_right = basecase(chunk_right)

Finally, the base case results are merged by using the body of the @reduce() do block (here, just .+=):

ys_left .+= ys_right
ys = ys_left

@assert ys == mapreduce(n -> [n, 2n, 3n], .+, 1:10)

General form of @reduce() do syntax

In general, @reduce() do takes the following form:

@reduce() do ($acc₁ = $init₁; $input₁),
             ($acc₂ = $init₂; $input₂),
              …
             ($accₙ = $initₙ; $inputₙ)
#              ~~~~    ~~~~~   ~~~~~~
#               |       |        |
#               |       |      Input to reduction (computed outside `@reduce`)
#               |       |
#               |   Initialization of the accumulator
#               |
#             Accumulator (result of the reduction)
    $body
#   ~~~~~
#   Reducing function (aka binary operator, monoid)
end

This expression is used to generate the following function op for merging two set of accᵢs from two tasks

function op(accs_left, accs_right)
    ($acc₁, $acc₂, …, $accₙ) = accs_left
    ($input₁, $input₂, …, $inputₙ) = accs_right
    $body
    return ($acc₁, $acc₂, …, $accₙ)
end

which is invoked as

accs = op(accs_left, accs_right)

to merge the results of "left" and "right" tasks. When using @reduce($acc = $op($init, $input)) syntax, the function $op is used as-is.

Note that the roles of $accᵢs and $inputᵢs are almost "symmetric" in the sense that $body has to be able to handle any value of $accᵢ provided as a $inputᵢ.

The reducing function must be associative; i.e., the results of

op(op(a, b), c)

and

op(a, op(b, c))

must be equivalent in some sense (e.g., isapprox may be enough in some cases; the result of @floop is still deterministic unless a nondeterministic executor is specified or the input collection is unordered).

Furthermore, since the function op has to be executable outside the scope of the sequential loop, it must not use variables whose scope is inside of @floop but outside of @reduce. That is to say, it must only access variables $accᵢs and $inputᵢs or the names defined outside @floop:

using SomeModule: f

function example()
    ...
    x = ...
    @floop for ...
        y = ...
        z = ...
        @reduce() do (acc; y)
            acc   ✅ # accessible (accumulator)
            f     ✅ # accessible (global)
            x     ✅ # accessible (defined outside @floop)
            y     ✅ # accessible (passed as an input)
            z     ❌ # NOT accessible (not passed as an input)
            ...
        end
    end
    ...
end

These requirements for associativity and variable scoping typically can be achieved by "minimizing" the computation done inside @reduce. The following example is incorrect since the body of @reduce is doing an "extra work":

@floop for n in 1:10
    xs = [n, 2n, 3n]
    @reduce() do (ys = zeros(Int, 3); xs)
        ys .+= 2 .* xs  # INCORRECT
    end
end

A correct implementation is to move the computation 2 .* xs out of @reduce.

@floop for n in 1:10
    xs = [n, 2n, 3n]
    zs = 2 .* xs  # CORRECT
    @reduce() do (ys = zeros(Int, 3); zs)
        ys .+= zs
    end
end

@assert ys == mapreduce(n -> 2 .* [n, 2n, 3n], .+, 1:10)

The allocation of temporary variables such as zs can be eliminated by using private variables (see below). It is also possible to fuse the computation of 2 .* _ and _ .+= _ if done carefully (See the example above for how to fuse computation only in the base case).

Ownership-passing style: second argument

Consider the following program that computes sum of arrays

vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
sum(vectors)

Since the element type of the input is unknown, we can't pre-compute the output array type. It may then be tempting to use the first input as the accumulator:

vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
@floop for xs in vectors
    @reduce() do (ys = nothing; xs)
        if ys === nothing
            ys = xs  # ❌ WRONG
        else
            ys .+= xs
        end
    end
end

@assert ys === vectors[1]  # above loop mutated the input

However, as you can see in the @assert statement above, this loop mutated the first element vectors[1]. This is probably not a desirable outcome in many cases (although it may not be problem in specific use cases). Thus, in general, we should assume that the reducing function does not own second argument when using the ownership-passing style. Therefore, we need to copy the second argument when using it as the accumulator.

vectors = Any[[n n^2; n^3 n^4] for n in 1:100]
@floop for xs in vectors
    @reduce() do (ys = nothing; xs)
        if ys === nothing
            ys = copy(xs)  # ✅ CORRECT
        else
            ys .+= xs
        end
    end
end

@assert ys !== vectors[1]  # input not mutated

Initializing mutable accumulator using Transducers.OnInit

Transducers.OnInit(f) can be passed as init argument in many JuliaFolds API. It calls the zero-argument function f that creates the accumulator for each base case:

using Folds
using Transducers: OnInit
ys = Folds.mapreduce(x -> (x, 2x, 3x), .+, 1:10; init = OnInit(() -> [0, 0, 0]))

@assert ys == [sum(1:10), 2sum(1:10), 3sum(1:10)]

Combining containers

When each iteration produce a container, there is usually a "default" way to combine all the containers produced. For basic containers such as vectors, sets and dictionaries, Base already define appropriate functions:

ContainerPure functionIn-place function
vectorvcatappend!
setunionunion!
dictionarymergemerge!
mergewith(op)mergewith!(op)

(These are not the only associative operations on these containers. For example, union works on vectors, too. intersect defines another associative operations on sets.)

The corresponding containers can be constructed in parallel by feeding sub-containers into these "merging" functions:

using Folds
using Transducers: OnInit
ys1 = Folds.mapreduce(x -> [x], append!, 1:10; init = OnInit(() -> Int[]))
ys2 = Folds.mapreduce(x -> Set([x]), union!, 1:10; init = OnInit(Set{Int}))
ys3 = Folds.mapreduce(x -> Dict(x => x^2), merge!, 1:10; init = OnInit(Dict{Int,Int}))
ys4 = Folds.mapreduce(x -> Dict(isodd(x) => 1), mergewith!(+), 1:10; init = OnInit(Dict{Bool,Int}))

@assert ys1 == 1:10
@assert ys2 == Set(1:10)
@assert ys3 == Dict(x => x^2 for x in 1:10)
@assert ys4 == Dict(false => 5, true => 5)

However, it is suboptimal to heap-allocate these singleton containers (containers with single element). We can use special singleton containers from MicroCollections.jl to avoid heap allocations. Another downside of the approach using functions such as append! is that they require specifying element type before the reduction is started. This is sometimes impossible or very tedious. We can avoid it by using the !! functions from BangBang.jl. For example, BangBang.append!!(ys, xs) may return a new array without mutating ys if the element type of ys is not appropriate for storing xs. Thus, in the parallel reduction context, BangBang.jl functions can be used more like pure functions except that the objects passed as the first argument cannot be re-used again.

using Folds
using MicroCollections
using BangBang
ys1 = Folds.mapreduce(x -> SingletonVector((x,)), append!!, 1:10)
ys2 = Folds.mapreduce(x -> SingletonSet((x,)), union!!, 1:10)
ys3 = Folds.mapreduce(x -> SingletonDict(x => x^2), merge!!, 1:10)
ys4 = Folds.mapreduce(x -> SingletonDict(isodd(x) => 1), mergewith!!(+), 1:10)

Since these are common idioms, Folds.jl has shorthands for the first three cases:

using Folds
ys1 = Folds.collect(1:10)
ys2 = Folds.set(1:10)
ys3 = Folds.dict(x => x^2 for x in 1:10)

OnlineStats

OnlineStats.jl is a rich collection of composable single-pass algorithms for computing various statistics. Although OnlineStats.jl itself only provides in-place operations, Transducers.jl defines a compatibility layer (using OnInit etc.) that treat mergeable OnlineStats as monoids. Folds.reduce probably is the easiest API to use:

using OnlineStats
using Folds
Folds.reduce(Mean(), 1:10)
Mean: n=10 | value=5.5

We can also use FLoops.@reduce directly with OnlineStats.jl. The key point is to make the body of @reduce "symmetric in type"; i.e., pass Mean at both arguments:

using FLoops
using OnlineStats

@floop for x in 1:10
    y = 2x
    m = fit!(Mean(), y)
    @reduce() do (acc = Mean(); m)
        merge!(acc, m)
    end
end

@assert acc == fit!(Mean(), 2:2:20)

It is also possible to get rid of the intermediate Mean object by "fusing" fit! and merge! in the base case. (However, this optimization may not be required since the compiler is likely to optimize away the creation of intermediate Mean object m.)

@floop for x in 1:10
    y = 2x
    @reduce() do (acc = Mean(); y)
        if y isa OnlineStat
            merge!(acc, y)
        else
            fit!(acc, y)
        end
    end
end

@assert acc == fit!(Mean(), 2:2:20)

Pitfalls with mutable reduction states

When using in-place reductions, mutable accumulators must be specified carefully to avoid sharing them across tasks. For example,

Folds.mapreduce(x -> (x,), append!, xs; init = [])

is not a correct parallel program since it mutates the array [] in multiple tasks. The APIs such as FLoops.@reduce discussed above can be used to avoid this problem when used correctly. However, it's still possible to misuse the API. For example,

using FLoops

shared_acc = []  # WRONG (shared across tasks and mutated concurrently)
@floop for x in 1:10
    ys = (x,)
    @reduce(acc = append!(shared_acc, ys))
end

has exactly the same problem as init = [].

Mutable temporary objects (private variables)

In addition to pre-allocated objects or mutable accumulators, it is sometimes necessary to have mutable temporary objects. While temporary objects are technically equivalent to accumulators that are ignored after the loop, it is convenient to have a dedicated API. This is why FLoops.jl has @init macro that declares "private variables" for re-using mutable temporary objects within each base case:

using FLoops

@floop for x in 1:10
    @init xs = Vector{Int}(undef, 3)
    xs .= (x, 2x, 3x)
    @reduce() do (ys = zeros(Int, 3); xs)
        ys .+= xs
    end
end

@assert ys == mapreduce(x -> [x, 2x, 3x], .+, 1:10)

The right hand sides of the assignments in @init is executed only at the first iteration of each base case.

Accidental mutations

Many parallel APIs in Julia, including @threads and @spawn, require creating closures. Thus, it is a common mistake to accidentally expand the scope of local variables. For example, the scope of y in the following program is larger than @threads for loop. As a result, the update of y is shared across all tasks and this function has a data race:

function f(n = 2^10)
    ys = zeros(Int, n)
    Threads.@threads for i in 1:n
        y = gcd(42, i)
        some_function()
        ys[i] = y
    end

    # Suppose that some unrelated code uses the same variable names as the
    # temporary variables in the parallel loop:
    if ys[1] > 0
        y = 1
    end

    return ys
end

# Some function that Julia does not inline:
@noinline some_function() = _FALSE_[] && error("unreachable")
const _FALSE_ = Ref(false)

The data race is observable if there is a non-inlinable function call (some_function() in the example) between the definition and the use of y. If we run the above function multiple times in a julia process started with multiple worker threads, we can observe that the result is different from the expected value:

f_seq(n = 2^10) = gcd.(42, 1:n)
julia> ys = f_seq()
       for i in 1:100
           @assert f() == ys
       end
ERROR: AssertionError: f() == ys

We can use the local keyword to fix it:

Threads.@threads for i in 1:n
    local y = gcd(42, i)  # added `local`
    some_function()
    ys[i] = y
end

Alternatively, if the loop body of Threads.@threads or the thunk of Threads.@spawn is large, it is a good idea to factor it out as a function. It prevents assignments inside the closure and hence the data races. It also makes analyzing base case performance easier.

Using FLoops.jl instead of @threads is useful to prevent this bug. Consider an equivalent parallel loop written with FLoops.jl:

using FLoops

function f_floop(n = 2^10)
    ys = zeros(Int, n)
    @floop ThreadedEx() for i in 1:n
        y = gcd(42, i)
        some_function()
        ys[i] = y
    end

    if ys[1] > 0
        y = 1
    end

    return ys
end

FLoops.jl detects that the variable y is now shared across all tasks:

julia> f_floop()
ERROR: HasBoxedVariableError: Closure __##reducing_function#258 (defined in Main) has 1 boxed variable: y
HINT: Consider adding declarations such as `local y` at the narrowest possible scope required.

As the error message indicates, local y = gcd(42, i) also fix the issue in @floop.

Advanced/Performance: False sharing

Even when parallel programs are correctly written to handle mutable objects, it may not perform well due to false sharing. False sharing can occur when the code running on different CPUs update adjacent (but disjoint) memory locations. The program is still data race-free (not "true" sharing) since it does not simultaneously mutate the same memory locations. However, since the CPU manages the data in a unit (cache line) larger than single bytes, the CPUs have to communicate each other to maintain the coherent view of the data and avoid such simultaneous modification of the "same" memory location (from the point of view of the CPUs). This extra communication slows down the program.

💡 Note
False sharing is hard to invoke when using the patterns discussed above due to Julia's memory management. On the other hand, the use of state[threadid()] pattern to create "thread-local storage" for reduction or private variables is more likely to invoke false-sharing unless used carefully (e.g., padding). More importantly, it is very hard to use threadid-based approach correctly since there is no systematic way to locate or restrict yield points. See also What is the difference of @reduce and @init to the approach using state[threadid()]? · FAQ · FLoops.

Let us use the following functions [7] [8] to demonstrate the effect of false sharing:

[7] The effect of false sharing depends on the actual CPU used. This example showed the effect of false sharing when experimented on several CPUs from different vendors (Intel, AMD, and IBM). However, a simpler example did not show false sharing in some CPUs.
[8] Minor details: these functions use @threads :static scheduling to force distribution of multiple tasks to the worker (OS) threads. Each array in yss includes a space at least as large as a cache line at the end to avoid false sharing as much as possible (although it can be done more parsimoniously).

function crowded_inc!(ys, data)
    Threads.@threads :static for indices in data
        for i in indices
            @inbounds ys[i] += 1
        end
    end
end

function exclusive_inc!(yss, data)
    Threads.@threads :static for indices in data
        ys = yss[Threads.threadid()]
        for i in indices
            @inbounds ys[i] += 1
        end
    end
end

cacheline = try
    parse(Int, read("/sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size", String))
catch err
    @warn "cannot read cache line size" exception = (err, catch_backtrace())
    64
end

ys = zeros(Threads.nthreads() * 2);
partitioned_indices = reshape(eachindex(ys), Threads.nthreads(), :)'
data = [rand(partitioned_indices[:, i], 2^20) for i in 1:Threads.nthreads()]
yss = [zeros(length(ys) + cld(cacheline, sizeof(eltype(ys)))) for _ in 1:Threads.nthreads()];

The functions crowded_inc! and exclusive_inc! perform almost the same computation and use exactly the same access pattern on the output array ys for each task. In crowded_inc!, since multiple tasks (that are likely to be scheduled on different CPUs) try to update nearby memory locations concurrently, it invokes false sharing which can be observed as the slow down compared to exclusive_inc! (see below). This does not occur in exclusive_inc! where each task updates an array dedicated to it (we also made sure that these accesses are at least one cache line apart).

julia> using BenchmarkTools

julia> @btime crowded_inc!(ys, data) setup = fill!(ys, 0);
  95.385 ms (43 allocations: 3.70 KiB)

julia> @btime exclusive_inc!(yss, data) setup = foreach(ys -> fill(ys, 0), yss);
  1.801 ms (41 allocations: 3.64 KiB)

julia> versioninfo()
Julia Version 1.6.0
Commit f9720dc2eb (2021-03-24 12:55 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: AMD EPYC 7502 32-Core Processor
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.1 (ORCJIT, znver2)

julia> Threads.nthreads()
8

Analyzing false sharing using perf c2c

If you have a CPU (e.g., Intel) supported by perf c2c, it can be used to detect false sharing. You can try the following steps or use the script perf_c2c_demo.jl to analyze the functions we defined above:

  1. Create a working directory and cd into it.

  2. Run perf c2c record -- --output=crowded_inc-perf.data (in a different system shell session) while invoking @btime crowded_inc!(ys, data) in the Julia REPL. Terminate it with Ctrl-C when the benchmark is finished.

  3. Similarly, run perf c2c record -- --output=exclusive_inc-perf.data while invoking @btime exclusive_inc!(yss, data) in the Julia REPL. Terminate it with Ctrl-C when the benchmark is finished.

  4. Run, e.g., perf c2c report --input=crowded_inc-perf.data -c tid,iaddr and perf c2c report --input=exclusive_inc-perf.data -c tid,iaddr to analyze the memory accesses.

An example output of perf_c2c_demo.jl can be found at https://gist.github.com/01f4793281dc5edee59c9b6cfb05846b. See C2C - False Sharing Detection in Linux Perf - My Octopress Blog for more information on perf c2c and Perf Wiki for more information on perf in general.

In this example, the command perf c2c report --input=crowded_inc-perf.data -c tid,iaddr shows the false sharing in this table:

=================================================
           Shared Data Cache Line Table
=================================================
#
#        ----------- Cacheline ----------      Tot     …
# Index             Address  Node  PA cnt     Hitm     …
# .....  ..................  ....  ......  .......     …
#                                                      …
      0      0x7f439f2a1ec0     0   62111   15.50%     …
      1      0x7f439f2a1e80     0  134824   14.76%     …

The addresses corresponding to these two top Hitm (load that hit in a modified cacheline) are in the output array ys (if you use perf_c2c_demo.jl, the addresses are stored in pointers.txt):

julia> lower_bound = 0x00007f439f2a1e60;  # pointer(ys, 1)

julia> upper_bound = 0x00007f439f2a1ed8;  # pointer(ys, length(ys))

julia> lower_bound <= 0x7f439f2a1ec0 <= upper_bound  # from: perf c2c report
true

julia> lower_bound <= 0x7f439f2a1e80 <= upper_bound  # from: perf c2c report
true

Furthermore, you can find the threads accessing 0x7f439f2a1ec0 by looking at another table in the output:

=================================================
      Shared Cache Line Distribution Pareto
=================================================
#
# ----- HITM -----  -- Store Refs --  ------- CL --------
# RmtHitm  LclHitm   L1 Hit  L1 Miss    Off  Node  PA cnt            Tid      …
# .......  .......  .......  .......  .....  ....  ......  .............      …
#
  -------------------------------------------------------------
      0       21       21    38634    28457      0x7f439f2a1ec0
  -------------------------------------------------------------
   28.57%    9.52%    0.00%    0.00%    0x0     0       1    22239:julia      …
    0.00%    0.00%   26.02%   26.68%    0x0     0       1    22239:julia      …
   33.33%   47.62%    0.00%    0.00%    0x8     0       1    22240:julia      …
    0.00%    0.00%   27.23%   27.08%    0x8     0       1    22240:julia      …
   19.05%   33.33%    0.00%    0.00%   0x10     0       1    22241:julia      …
    0.00%    0.00%   27.33%   27.14%   0x10     0       1    22241:julia      …
   19.05%    9.52%    0.00%    0.00%   0x18     0       1    22242:julia      …
    0.00%    0.00%   19.42%   19.10%   0x18     0       1    22242:julia      …

Compare the Tid column above with the list of TIDs of Julia's worker thread (see worker_tids.txt generated by perf_c2c_demo.jl):

$ cat worker_tids.txt
22234
22236
22237
22238
22239
22240
22241
22242

On the other hand, the output of perf c2c report --input=exclusive_inc-perf.data -c tid,iaddr does not show the sign of false sharing (Hitm is small and the addresses are outside of yss[_][_]):

=================================================
           Shared Data Cache Line Table
=================================================
#
#        ----------- Cacheline ----------      Tot      …
# Index             Address  Node  PA cnt     Hitm      …
# .....  ..................  ....  ......  .......      …
#
      0  0xffffffff8d546040     0       1    2.99%      …
      1      0x555ae2321340     0       9    2.99%      …
      2      0x7f43b1a91cc0     0       1    1.80%      …
      3  0xffff95a65fc2cc80     0       1    1.20%      …
      4  0xffffffff8ce44a40     0       1    0.60%      …
      …

Advanced: adjoining trick

While APIs such as FLoops.@reduce, FLoops.@init, and Transducers.OnInit are useful, not all parallel frameworks support such constructs. Furthermore, it may be desirable to have even finer grained control; e.g., fusing the first iteration and the initial value computation. Fortunately, the basis of the initial value handling is just a plain algebra concept that can be implemented with a couple of lines of code. (See "adjoining" in, e.g., Semigroup - Wikipedia.)

First, let us start from a simple case without mutable accumulator. Given a binary associative function semigroup without a (known) identity element, we can construct a binary function monoid with pre-defined identity element:

struct Init end

function asmonoid(semigroup)
    monoid(a, b) = semigroup(a, b)
    monoid(::Init, b) = b
    monoid(a, ::Init) = a
    monoid(::Init, ::Init) = Init()  # disambiguation
    return monoid
end

let ⊗ = asmonoid(min)
    @assert 1 ⊗ 2 == 1
    @assert 1 ⊗ Init() == 1
    @assert Init() ⊗ 2 == 2
    @assert Init() ⊗ Init() == Init()
end

(See also InitialValues.jl as an implementation of this idea used in JuliaFolds.)

💡 Note
Even though asmonod turns the accumulator type into a small Union of types, the actual base case loop can be very efficient if it uses the tail-call "function-barrier" pattern to type-stabilize the accumulator type.

The function asmonoid can be slightly modified to support "in-place semigroup" with mutable initial value:

function withinit(f, semigroup!)
    monoid!(a, b) = semigroup!(a, b)
    monoid!(::Init, b) = semigroup!(f(), b)
    monoid!(a, ::Init) = a
    monoid!(::Init, ::Init) = Init()  # disambiguation
    return monoid!
end

let ⊗ = withinit(() -> [], append!)
    @assert [1] ⊗ [2] == [1, 2]
    @assert [1] ⊗ Init() == [1]
    @assert Init() ⊗ [2] == [2]
    @assert Init() ⊗ Init() == Init()
end

using Folds
ys = Folds.mapreduce(tuple, withinit(() -> Int[], append!), 1:10; init = Init())

@assert ys == 1:10

Assuming that the parallel mapreduce implementation uses left-to-right iteration (i.e., left fold; foldl) as the base case, the in-place function modnoid! created by withinit initializes the accumulator at the first iteration using the function f and re-uses it for each base case.

#show test results