Transducers and Transducible processes

Transducible processes

Transducers.foldxlFunction
foldxl(step, xf::Transducer, reducible; init, simd) :: T
foldxl(step, reducible; init, simd) :: T
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T
transduce(xf, step, init, reducible, [executor]; simd) :: Union{T, Reduced{T}}

eXtended left fold. Compose transducer xf with reducing step function step and reduce itr using it.

Note

transduce differs from foldxl as Reduced{T} is returned if the transducer xf or step aborts the reduction and step is not automatically wrapped by Completing.

This API is modeled after transduce in Clojure.

For parallel versions, see foldxt and foldxd.

See also: Empty result handling.

Arguments

  • xf::Transducer: A transducer.
  • step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" 1-argument form (i.e., complete protocol).
  • reducible: A reducible object (array, dictionary, any iterator, etc.).
  • executor: Specify an executor. See SequentialEx.
  • init: An initial value fed to the first argument to reducing step function step. This argument can be omitted for well know binary operations like + or *. Supported binary operations are listed in InitialValues.jl documentation. When Init (not the result of Init, such as Init(*)) is given, it is automatically "instantiated" as Init(step) (where step is appropriately unwrapped if step is a Completing). See Empty result handling in the manual for more information.
  • simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use @simd ivdep for ... end variant. Read Julia manual of Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducer like Scan. If false (default), Base.@simd is not used.

Examples

julia> using Transducers

julia> foldl(Filter(isodd), 1:4, init=0.0) do state, input
           @show state, input
           state + input
       end
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
4.0

julia> foldxl(+, 1:5 |> Filter(isodd))
9

julia> 1:5 |> Filter(isodd) |> foldxl(+)
9

Since TeeRF requires the extended fold protocol, foldl(TeeRF(min, max), [5, 2, 6, 8, 3]) does not work while it works with foldxl:

julia> foldxl(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)

The unary method of foldlx is useful when combined with |>:

julia> (1:5, 4:-1:1) |> Cat() |> Filter(isodd) |> Enumerate() |> foldxl() do a, b
           a[2] < b[2] ? b : a
       end
(3, 5)
source
Base.foldlFunction
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T

See foldxl.

source
Base.foreachFunction
foreach(eff, xf::Transducer, reducible; simd)
foreach(eff, ed::Eduction; simd)

Feed the results of xf processing items in reducible into a unary function eff. This is useful when the primary computation at the bottom is the side-effect. It is also equivalent to foreach(eff, eduction(xf, coll)). Note that

foreach(eduction(xf, coll)) do x
    ...
end

can be more efficient than

for x in eduction(xf, coll)
    ...
end

as the former does not have to translate the transducer protocol to the iterator protocol.

foreach supports all constructs in the native for loop as well as the enhancements [julia_issue_22891] to break with a value (break D(x) below) and append the else clause (E(x) below).

This native for loop

ans = for x in xs
    A(x)
    B(x) && break
    C(x) && break D(x)
else
    E(x)
end

can be written as

ans = foreach(Map(identity), xs) do x
    A(x)
    B(x) && return reduced()
    C(x) && return reduced(D(x))
    x  # required for passing `x` to `E(x)` below
end |> ifunreduced() do x
    E(x)
end

See: mapfoldl, reduced, ifunreduced.

Transducers.jl 0.3

foreach is changed to return what the do block (eff function) returns as-is in version 0.3. This was required for supporting "for-else" (|> ifunreduced). Previously, it only supported break-with-value and always applied unreduced before it returns.

Examples

julia> using Transducers

julia> foreach(eduction(Filter(isodd), 1:4)) do input
           @show input
       end
input = 1
input = 3
3

julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
           @show input
           if iseven(input)
               return reduced()
           end
       end
input = 1
input = 2
Reduced(nothing)

It is often useful to append |> unreduced to unwrap Reduced in the final result (note that |> here is the standard function application, not the transducer composition).

julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
           reduced("got $input")
       end |> unreduced
"got 1"

Combination of break-with-value and for-else is useful for triggering action after (e.g.) some kind of membership testing failed:

julia> has2(xs) = foreach(Filter(!ismissing), xs) do input
           input == 2 && reduced(true)
       end |> ifunreduced() do input
           @show input
           false
       end;

julia> has2([1, missing, 2, 3])
true

julia> has2([1, missing])
input = false
false

However, note the output input = false in the last example. This is because how && works in Julia

julia> false && "otherwise"
false

Thus, pure membership testing functions like has2 above can be written in a more concise manner:

julia> simpler_has2(xs) = foreach(Filter(!ismissing), xs) do input
           input == 2 && reduced(true)
       end |> unreduced;

julia> simpler_has2([1, missing, 2, 3])
true

julia> simpler_has2([1, missing])
false
source
Transducers.foldxtFunction
foldxt(step, xf, reducible; [init, simd, basesize, stoppable, nestlevel]) :: T

eXtended threaded fold (reduce). This is a multi-threaded reduce based on extended fold protocol defined in Transducers.jl.

The "bottom" reduction function step(::T, ::T) :: T must be associative and init must be its identity element.

Transducers composing xf must be stateless (e.g., Map, Filter, Cat, etc.) except for ScanEmit. Note that Scan is not supported (although possible in theory). Early termination requires Julia ≥ 1.3.

Use tcollect or tcopy to collect results into a container.

See also: Parallel processing tutorial, foldxl, foldxd.

Keyword Arguments

  • basesize::Integer = amount(reducible) ÷ nthreads(): A size of chunk in reducible that is processed by each worker. A smaller size may be required when:

    • computation time for processing each item fluctuates a lot
    • computation can be terminated by reduced or transducers using it, such as ReduceIf
  • stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode used for optimizing reduction with reduced has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.

  • nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers to be multi-threaded (using TCat). It must be a positive integer, Val of positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that Cat transducer should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.

  • For other keyword arguments, see foldl.

Transducers.jl 0.4.23

Keyword option stoppable requires at least Transducers.jl 0.4.23.

Examples

julia> using Transducers

julia> foldxt(+, 1:3 |> Map(exp) |> Map(log))
6.0

julia> using BangBang: append!!

julia> foldxt(append!!, Map(x -> 1:x), 1:2; basesize=1, init=Union{}[])
3-element Vector{Int64}:
 1
 1
 2

julia> 1:5 |> Filter(isodd) |> foldxt(+)
9

julia> foldxt(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
source
Transducers.foldxdFunction
foldxd(step, xform::Transducer, array; [init, simd, basesize, threads_basesize, pool])

eXtended distributed fold (reduce). This is a distributed reduce based on extended fold protocol defined in Transducers.jl.

Unlike foldxt, early termination by reduced is not supported yet.

Use dcollect or dcopy to collect results into a container.

See also: Parallel processing tutorial, foldxl, foldxt.

Transducers.jl 0.4.3

New in version 0.4.3.

Keyword Arguments

  • pool::AbstractWorkerPool: Passed to Distributed.remotecall.

  • basesize::Integer = amount(array) ÷ nworkers(): A size of chunk in array that is processed by each worker. A smaller size may be required when computation time for processing each item can fluctuate a lot.

  • threads_basesize::Integer = basesize ÷ nthreads(): A size of chunk in array that is processed by each task in each worker process. The default setting assumes that the number of threads used in all workers are the same. For heterogeneous setup where each worker process has different number of threads, it may be required to use smaller threads_basesize and basesize to get a good performance.

  • For other keyword arguments, see foldl.

Examples

julia> using Transducers

julia> foldxd(+, 1:3 |> Map(exp) |> Map(log))
6.0

julia> 1:5 |> Filter(isodd) |> foldxd(+)
9

julia> foldxd(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
source
Transducers.eductionFunction
eduction(xf::Transducer, coll)
xf(coll)
coll |> xf

Create a iterable and reducible object.

This API is modeled after eduction in Clojure.

Note

Even though eduction returns an iterable, it is highly recommended to use the foldl-based method provided by Transducers.jl when the performance is important.

Examples

julia> using Transducers

julia> for x in 1:1000 |> Filter(isodd) |> Take(3)  # slow
           @show x
       end
x = 1
x = 3
x = 5

julia> foreach(1:1000 |> Filter(isodd) |> Take(3)) do x  # better
           @show x
       end;
x = 1
x = 3
x = 5
source
eduction(iterator::Iterators.Generator)
eduction(iterator::Iterators.Filter)
eduction(iterator::Iterators.Flatten)

Convert an iterator to an eduction. The iterators that are typically used in the generator comprehensions are supported.

Transducers.jl 0.3

New in version 0.3.

Examples

julia> using Transducers

julia> iter = (y for x in 1:10 if x % 2 == 0 for y in (x, x + 1));

julia> ed = eduction(iter);

julia> collect(iter) == collect(ed)
true
source
Base.map!Function
map!(xf::Transducer, dest, src; simd)

Feed src to transducer xf, storing the result in dest. Collections dest and src must have the same shape. Transducer xf may contain filtering transducers. If some entries src are skipped, the corresponding entries in dest will be unchanged. Transducer xf must not contain any expansive transducers such as MapCat.

See also copy!.

Examples

julia> using Transducers

julia> xs = collect(1:5)
       ys = zero(xs)
       map!(Filter(isodd), ys, xs)
5-element Vector{Int64}:
 1
 0
 3
 0
 5

julia> ans === ys
true
source
Base.copy!Function
copy!(xf::Transducer, dest, src)

Feed src to transducer xf, storing the result in dest. Collections dest and src may have the same shape. Source src must be iterable. Destination dest must implement empty! and push!.

See also map!.

Examples

julia> using Transducers

julia> copy!(opcompose(PartitionBy(x -> x ÷ 3), Map(sum)), Int[], 1:10)
4-element Vector{Int64}:
  3
 12
 21
 19
source
Base.copyFunction
copy(xf::Transducer, T, foldable) :: Union{T, Empty{T}}
copy(xf::Transducer, foldable::T) :: Union{T, Empty{T}}
copy([T,] eduction::Eduction) :: Union{T, Empty{T}}

Process foldable with a transducer xf and then create a container of type T filled with the result. Return BangBang.Empty{T} if the transducer does not produce anything. (This is because there is no consistent interface to create an empty container given its type and not all containers support creating an empty container.)

For parallel versions, see tcopy and dcopy.

Transducers.jl 0.4.4

New in version 0.4.4.

Transducers.jl 0.4.8

copy now accepts eductions.

Examples

julia> using Transducers
       using BangBang: Empty

julia> copy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
  2 => 4

julia> @assert copy(Filter(_ -> false), Set, 1:1) === Empty(Set)

julia> using TypedTables

julia> @assert copy(Map(x -> (a=x, b=x^2)), Table, 1:1) == Table(a=[1], b=[1])

julia> using StructArrays

julia> @assert copy(Map(x -> (a=x, b=x^2)), StructVector, 1:1) == StructVector(a=[1], b=[1])

julia> using DataFrames

julia> @assert copy(
           Map(x -> (A = x.a + 1, B = x.b + 1)),
           DataFrame(a = [1], b = [2]),
       ) == DataFrame(A = [2], B = [3])
source
Transducers.tcopyFunction
tcopy(xf::Transducer, T, reducible; basesize) :: Union{T, Empty{T}}
tcopy(xf::Transducer, reducible::T; basesize) :: Union{T, Empty{T}}
tcopy([T,] itr; basesize) :: Union{T, Empty{T}}

Thread-based parallel version of copy. Keyword arguments are passed to foldxt.

See also: Parallel processing tutorial (especially Example: parallel collect).

Transducers.jl 0.4.5

New in version 0.4.5.

Transducers.jl 0.4.8

tcopy now accepts iterator comprehensions and eductions.

Examples

julia> using Transducers

julia> tcopy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
  2 => 4

julia> using TypedTables

julia> @assert tcopy(Map(x -> (a=x,)), Table, 1:1) == Table(a=[1])

julia> using StructArrays

julia> @assert tcopy(Map(x -> (a=x,)), StructVector, 1:1) == StructVector(a=[1])

tcopy works with iterator comprehensions and eductions (unlike copy, there is no need for manual conversion with eduction):

julia> table = StructVector(a = [1, 2, 3], b = [5, 6, 7]);

julia> @assert tcopy(
           (A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
       ) == StructVector(A = [2, 4], B = [4, 6])

julia> @assert tcopy(
           DataFrame,
           (A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
       ) == DataFrame(A = [2, 4], B = [4, 6])

julia> @assert table |>
           Filter(row -> isodd(row.a)) |> Map(row -> (A = row.a + 1, B = row.b - 1)) |>
           tcopy == StructVector(A = [2, 4], B = [4, 6])

If you have Cat or MapCat at the end of the transducer, consider using foldxt directly:

julia> using Transducers
       using DataFrames

julia> @assert tcopy(
           DataFrame,
           1:2 |> Map(x -> DataFrame(a = [x])) |> MapCat(eachrow);
           basesize = 1,
       ) == DataFrame(a = [1, 2])

julia> using BangBang: Empty, append!!

julia> @assert foldxt(
           append!!,
           Map(x -> DataFrame(a = [x])),
           1:2;
           basesize = 1,
           # init = Empty(DataFrame),
       ) == DataFrame(a = [1, 2])

Note that above snippet assumes that it is OK to mutate the dataframe returned by the transducer. Use init = Empty(DataFrame) if this is not the case.

This approach of using foldxt works with other containers; e.g., with TypedTables.Table:

julia> using TypedTables

julia> @assert foldxt(
           append!!,
           Map(x -> Table(a = [x])),
           1:2;
           basesize = 1,
           # init = Empty(Table),
       ) == Table(a = [1, 2])
source
Transducers.dcopyFunction
dcopy(xf::Transducer, T, reducible; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy(xf::Transducer, reducible::T; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy([T,] itr; [basesize, threads_basesize]) :: Union{T, Empty{T}}

Distributed.jl-based parallel version of copy. Keyword arguments are passed to foldxd. For examples, see tcopy.

See also: Parallel processing tutorial (especially Example: parallel collect).

Transducers.jl 0.4.5

New in version 0.4.5.

Transducers.jl 0.4.8

dcopy now accepts iterator comprehensions and eductions.

source
Base.append!Function
append!(xf::Transducer, dest, src) -> dest

This API is modeled after into in Clojure.

Warning

The performance of append!(dest, src::Eduction) is poor. Use append!! instead if two-argument form is preferred.

Examples

julia> using Transducers

julia> append!(Drop(2), [-1, -2], 1:5)
5-element Vector{Int64}:
 -1
 -2
  3
  4
  5
source
BangBang.append!!Function
BangBang.append!!(xf::Transducer, dest, src) -> dest′
BangBang.append!!(dest, src::Eduction) -> dest′

Mutate-or-widen version of append!.

Transducers.jl 0.4.4

New in version 0.4.4.

Transducers.jl 0.4.37

Performance optimization for append!!(dest, src::Eduction) requires version 0.4.37.

Examples

julia> using Transducers, BangBang

julia> append!!(opcompose(Drop(2), Map(x -> x + 0.0)), [-1, -2], 1:5)
5-element Vector{Float64}:
 -1.0
 -2.0
  3.0
  4.0
  5.0
source
Base.collectFunction
collect(xf::Transducer, itr) :: Vector
collect(ed::Eduction) :: Vector

Process an iterable itr using a transducer xf and collect the result into a Vector.

For parallel versions, see tcollect and dcollect.

Transducers.jl 0.4.8

collect now accepts eductions.

Examples

julia> using Transducers

julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
 1
  missing
 2
  missing
 3
source
Transducers.tcollectFunction
tcollect(xf::Transducer, reducible; basesize) :: Union{Vector, Empty{Vector}}
tcollect(itr; basesize) :: Union{Vector, Empty{Vector}}

Thread-based parallel version of collect. This is just a short-hand notation of tcopy(xf, Vector, reducible). Use tcopy to get a container other than a Vector.

See also: Parallel processing tutorial (especially Example: parallel collect).

Transducers.jl 0.4.5

New in version 0.4.5.

Transducers.jl 0.4.8

tcollect now accepts iterator comprehensions and eductions.

Examples

julia> using Transducers

julia> tcollect(Map(x -> x^2), 1:2)
2-element Vector{Int64}:
 1
 4

julia> tcollect(x^2 for x in 1:2)
2-element Vector{Int64}:
 1
 4
source
Transducers.dcollectFunction
dcollect(xf::Transducer, reducible; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}
dcollect(itr; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}

Distributed.jl-based parallel version of collect. This is just a short-hand notation of dcopy(xf, Vector, reducible). Use dcopy to get a container other than a Vector.

See also: Parallel processing tutorial (especially Example: parallel collect).

Transducers.jl 0.4.5

New in version 0.4.5.

Transducers.jl 0.4.8

dcollect now accepts iterator comprehensions and eductions.

source
Base.ChannelType
Channel(xf::Transducer, itr; kwargs...)
Channel(ed::Eduction; kwargs...)

Pipe items from an iterable itr processed by the transducer xf through a channel. Channel(xf, itr) and Channel(eduction(xf, itr)) are equivalent. Note that itr itself can be a Channel.

Keyword arguments are passed to Channel(function; kwargs...).

Examples

julia> using Transducers

julia> ch1 = Channel(Filter(isodd), 1:5);

julia> ch2 = Channel(Map(x -> 2x - 1), ch1);

julia> ed = eduction(Map(x -> 1:x), ch2);

julia> ch3 = Channel(Cat(), ed);

julia> foreach(PartitionBy(isequal(1)), ch3) do input
           @show input
       end;
input = [1, 1]
input = [2, 3, 4, 5]
input = [1]
input = [2, 3, 4, 5, 6, 7, 8, 9]
source

Experimental transducible processes

Transducers.channel_unorderedFunction
channel_unordered(xf, input; eltype, size, ntasks, basesize) :: Channel{eltype}
channel_unordered(itr; eltype, size, ntasks, basesize) :: Channel{eltype}

Provide elements in input processed by a transducer xf through a Channel.

Unary method channel_unordered(itr) produces a Channel that provides elements in the input iterator itr with possibly different order. Iterator comprehensions and eductions can be passed as the input itr.

Use append_unordered! to send outputs to an existing channel.

Transducers.jl 0.4.8

New in version 0.4.8.

Transducers.jl 0.4.9

Unary method channel_unordered(itr) requires Transducers.jl 0.4.9.

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

Keyword Arguments

  • eltype::Type: element type of returned channel
  • size: The size of Channel. A non-negative Int or Inf.
  • ntasks::Int: Number of concurrent tasks. Default to Threads.nthreads().
  • basesize: The "batch size" of the processing; i.e., the number of elements to be processed in a single task. Default to 1.

Examples

julia> using Transducers: Map, channel_unordered

julia> sort!(collect(channel_unordered(Map(x -> x + 1), 1:3)))
3-element Vector{Any}:
 2
 3
 4

julia> sort!(collect(channel_unordered(x + 1 for x in 1:3 if isodd(x))))
2-element Vector{Any}:
 2
 4
source
Transducers.append_unordered!Function
append_unordered!(output, xf, input; ntasks, basesize)
append_unordered!(output, itr; ntasks, basesize)

Process input elements through a transducer xf and then push! them into output in undefined order.

Binary method append_unordered!(output, itr) is like append!(output, itr) but without order guarantee. Iterator comprehensions and eductions can be passed as the input itr.

output (typically a Channel) must implement thread-safe push!(output, x) method.

See also channel_unordered.

Transducers.jl 0.4.8

New in version 0.4.8.

Transducers.jl 0.4.9

Binary method append_unordered!(output, itr) requires Transducers.jl 0.4.9.

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

Examples

julia> using Transducers: Map, append_unordered!

julia> input = Channel(Map(identity), 1:3);

julia> output = Channel{Int}(0);

julia> task = @async try
           append_unordered!(output, Map(x -> x + 1), input)
       finally
           close(output)
       end;

julia> sort!(collect(output))
3-element Vector{Int64}:
 2
 3
 4

julia> input = Channel(Map(identity), 1:3);

julia> output = Channel{Int}(0);

julia> task = @async try
           append_unordered!(output, (y for x in input if isodd(x) for y in 1:x))
       finally
           close(output)
       end;

julia> sort!(collect(output))
4-element Vector{Int64}:
 1
 1
 2
 3
source

Transducers

Transducers.TransducerType
Transducer

The abstract type for transducers.

A transducer xf can be used as both iterator transformation xf(itr) and reducing function transformation xf'(rf).

See also adjoint for xf'(rf).

Transducers.jl 0.4.39

The call overload xf(rf) requires Transducers.jl 0.4.39 or later.

Note

The call overload xf(rf) requires Julia 1.3 or later. For older Julia versions, use eduction.

Examples

julia> using Transducers

julia> xs = Map(inv)(2:2:4)
2-element StepRange{Int64, Int64} |>
    Map(inv)

julia> collect(xs)
2-element Vector{Float64}:
 0.5
 0.25

julia> rf = Map(inv)'(+)
Reduction(
    Map(inv),
    BottomRF(
        +))

julia> rf(1, 4)  # +(1, inv(4))
1.25
source
Base.:∘Function
f ⨟ g
g ∘ f
opcompose(f, g)
compose(g, f)

Composition of transducers.

Transducers.jl 0.4.39

Transducers.jl 0.4.39 or later is required for composing transducers with and other operators and functions derived from it.

Transducers written as f |> g |> h in previous versions of Transducers.jl can now be written as f ⨟ g ⨟ h (in Julia 1.5 or later) or opcompose(f, g, h).

Note

"op" in opcompose does not stand for operator; it stands for opposite.

source
Base.adjointFunction
xf'

xf'(rf₁) is a shortcut for calling reducingfunction(xf, rf₁).

More precisely, adjoint xf′ of a transducer xf is a reducing function transform rf₁ -> rf₂. That is to say, xf' a function that maps a reducing function rf₁ to another reducing function rf₂.

Examples

julia> using Transducers

julia> y = Map(inv)'(+)(10, 2)
10.5

julia> y == +(10, inv(2))
true
source
Transducers.BroadcastingType
Broadcasting()

Broadcast inner reducing function over elements in the input. Roughly speaking, it transforms the inner reducing function op to op′(a, b) = op.(a, b). However, it has a better memory usage and better initial value handling.

If the input is an array, the array created at the first iteration is reused if it can hold the element types of subsequent iterations. Otherwise, the array type is widen as needed.

If init passed to the fold function is a lazy "initializer" object such as OnInit, it is initialized independently for each item in the first input array. This makes using Broadcasting for (possibly) in-place functions safe.

Transducers.jl 0.4.32

New in version 0.4.32.

Note

Broadcasting transducer is not supported in Julia 1.0.

Examples

julia> using Transducers

julia> foldl(+, Broadcasting(), [[1, 2], [3, 4], 5])
2-element Vector{Int64}:
  9
 11

julia> foldl(+, Broadcasting(), [(0,), [1], [2.0], [3 + 0im]])
1-element Vector{ComplexF64}:
 6.0 + 0.0im

julia> foldl(
           *,
           [[[1], [10, 100]], [[2], [20, 200]], [[3], [30, 300]]] |>
               Broadcasting() |> Broadcasting(),
       )
2-element Vector{Vector{Int64}}:
 [6]
 [6000, 6000000]

When processing nested data structure (e.g., array-of-arrays) and mutating accumulator in-place, be careful with sharing the accumulators with each processing of items in the input. For example, this is a bad example:

julia> add!(a, b) = a .+= b;

julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
             init = Ref([0, 0]))
2-element Vector{Vector{Int64}}:
 [13, 15]
 [13, 15]

julia> ans[1] === ans[2]  # they are the same object
true

Use OnInit to initialize a new array with each item in the input:

julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
             init = OnInit(() -> [0, 0]))
2-element Vector{Vector{Int64}}:
 [5, 6]
 [8, 9]

julia> ans == [
           add!(add!([0, 0], [1]), [4, 5]),
           add!(add!([0, 0], [2, 3]), 6),
       ]
true
source
Transducers.CatType
Cat()

Concatenate/flatten nested iterators.

This API is modeled after cat in Clojure.

Examples

julia> using Transducers

julia> collect(Cat(), [[1, 2], [3], [4, 5]]) == 1:5
true
source
Transducers.ConsecutiveType
Consecutive(size, step = size)
Consecutive(size; step = size)

Sliding window of width size and interval step. Yield tuples.

This transducer is like Partition but feeds tuples to the downstream transducers instead of vectors.

If step == 1, this transducer supports parallel reduction for any collections; i.e., Consecutive(size, 1)'(op) is associative if op is associative.

Warning

Currently, in parallel folds, Consecutive(size, 1) cannot be used with reducing functions that can produce a Reduced.

If step > 1, this transducer can, in principle, support parallel reduction if the input colection allows random access (e.g., arrays). However, this feature is not implemented yet.

Examples

julia> using Transducers

julia> 1:8 |> Consecutive(3) |> collect
2-element Vector{Tuple{Int64, Int64, Int64}}:
 (1, 2, 3)
 (4, 5, 6)

julia> 1:8 |> Consecutive(3; step = 1) |> collect
6-element Vector{Tuple{Int64, Int64, Int64}}:
 (1, 2, 3)
 (2, 3, 4)
 (3, 4, 5)
 (4, 5, 6)
 (5, 6, 7)
 (6, 7, 8)
source
Transducers.CountType
Count([start[, step]])

Generate a sequence start, start + step, start + step + step, and so on.

Note that input is ignored. To use the input in the downstream reduction steps, use Zip.

start defaults to 1 and step defaults to oneunit(start).

See also: Iterators.countfrom. Enumerate

Examples

julia> using Transducers

julia> collect(Zip(Map(identity), Count()), -3:-1)
3-element Vector{Tuple{Int64, Int64}}:
 (-3, 1)
 (-2, 2)
 (-1, 3)

julia> using Dates

julia> 1:3 |> Zip(Map(identity), Count(Day(1))) |> MapSplat(*) |> collect ==
       [Day(1), Day(4), Day(9)]
true
source
Transducers.DedupeType
Dedupe()
Dedupe(eq)

De-duplicate consecutive items. Comparison operator which identifies duplicates can be specified by the eq parameter, which defaults to == (equal).

This API is modeled after dedupe in Clojure.

Examples

julia> using Transducers

julia> collect(Dedupe(), [1, 1, 2, 1, 3, 3, 2])
5-element Vector{Int64}:
 1
 2
 1
 3
 2
source
Transducers.DropType
Drop(n)

Drop first n items.

This API is modeled after drop in Clojure.

Examples

julia> using Transducers

julia> 1:5 |> Drop(3) |> collect
2-element Vector{Int64}:
 4
 5
source
Transducers.DropLastType
DropLast(n)

Drop last n items.

This API is modeled after drop-last in Clojure.

Examples

julia> using Transducers

julia> 1:5 |> DropLast(2) |> collect
3-element Vector{Int64}:
 1
 2
 3

julia> 1:1 |> DropLast(2) |> collect == []
true

julia> 1:0 |> DropLast(2) |> collect == []
true
source
Transducers.DropWhileType
DropWhile(pred)

Drop items while pred returns true consecutively. It becomes a no-op after pred returns a false.

This API is modeled after drop-while in Clojure.

Examples

julia> using Transducers

julia> collect(DropWhile(x -> x < 3), [1:5; 1:2])
5-element Vector{Int64}:
 3
 4
 5
 1
 2
source
Transducers.EnumerateType
Enumerate([start[, step]])

Transducer variant of Base.enumerate. The start and step arguments are optional and have the same meaning as in Count.

Examples

julia> using Transducers

julia> collect(Enumerate(), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
 (1, "A")
 (2, "B")
 (3, "C")

julia> start=2; step=3;

julia> collect(Enumerate(start, step), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
 (2, "A")
 (5, "B")
 (8, "C")
source
Transducers.FilterType
Filter(pred)

Skip items for which pred is evaluated to false.

This API is modeled after filter in Clojure.

Examples

julia> using Transducers

julia> 1:3 |> Filter(iseven) |> collect
1-element Vector{Int64}:
 2
source
Transducers.FlagFirstType
FlagFirst()

Output (isfirst, input) where isfirst::Bool is true only for the first iteration and input is the original input.

See also: IterTools.flagfirst

Examples

julia> using Transducers

julia> collect(FlagFirst(), 1:3)
3-element Vector{Tuple{Bool, Int64}}:
 (1, 1)
 (0, 2)
 (0, 3)
source
Transducers.GroupByType
GroupBy(key, rf, [init])
GroupBy(key, xf::Transducer, [step = right, [init]])

Group the input stream by a function key and then fan-out each group of key-value pairs to the reducing function rf.

For example, if GroupBy is used as in:

xs |> Map(upstream) |> GroupBy(key, rf, init) |> Map(downstream)

then the "function signatures" would be:

upstream(_) :: V
key(::V) :: K
rf(::Y, ::Pair{K, V}) ::Y
downstream(::Dict{K, Y})

That is to say,

  • Ouput of the upstream is fed into the function key that produces the group key (of type K).

  • For each new group key, a new transducible process is started with the initial state init :: Y. Pass OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each group.

  • After one "nested" reducing function rf is called, the intermediate result dictionary (of type Dict{K, Y}) accumulating the current and all preceding results is then fed into the downstream.

See also groupreduce in SplitApplyCombine.jl.

Transducers.jl 0.3

New in version 0.3.

Examples

julia> using Transducers
       using BangBang  # for `push!!`

julia> foldl(right, GroupBy(string, Map(last), push!!), [1, 2, 1, 2, 3])
Transducers.GroupByViewDict{String,Vector{Int64},…}(...):
  "1" => [1, 1]
  "2" => [2, 2]
  "3" => [3]

Note that the reduction stops if one of the group returns a reduced. This can be used, for example, to find if there is a group with a sum grater than 3 and stop the computation as soon as it is find:

julia> result = transduce(
           GroupBy(
               string,
               opcompose(Map(last), Scan(+), ReduceIf(x -> x > 3)),
           ),
           right,
           nothing,
           [1, 2, 1, 2, 3],
       );

julia> result isa Reduced
true

julia> unreduced(result)
Transducers.GroupByViewDict{String,Any,…}(...):
  "1" => 2
  "2" => 4
source
Transducers.InterposeType
Interpose(sep)

Interleave input items with a sep.

This API is modeled after interpose in Clojure.

Examples

julia> using Transducers

julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
 1
  missing
 2
  missing
 3
source
Transducers.IteratedType
Iterated(f, init)

Generate a sequence init, f(init), f(f(init)), f(f(f(init))), and so on.

Note that input is ignored. To use the input in the downstream reduction steps, use Zip.

Pass OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each fold.

See also: Scan, ScanEmit.

The idea is taken from IterTools.iterated

Examples

julia> using Transducers

julia> collect(Iterated(x -> 2x, 1), 1:5)
5-element Vector{Int64}:
  1
  2
  4
  8
 16

julia> collect(Zip(Map(identity), Iterated(x -> 2x, 1)), 1:5)
5-element Vector{Tuple{Int64, Int64}}:
 (1, 1)
 (2, 2)
 (3, 4)
 (4, 8)
 (5, 16)
source
Transducers.KeepSomethingType
KeepSomething(f = identity)

Pass non-nothing output of f to the inner reducing step after possibly unwrapping Some.

This API is modeled after keep in Clojure.

Examples

julia> using Transducers

julia> xf = KeepSomething() do x
           if x == 0
               :zero
           elseif x == 1
               Some(:one)
           elseif x == 2
               Some(nothing)
           end
       end;

julia> collect(xf, 0:3)
3-element Vector{Union{Nothing, Symbol}}:
 :zero
 :one
 nothing

Note that NotA(Nothing) can be used to avoid automatically unwrapping Some:

julia> [Some(1), 2, nothing] |> KeepSomething() |> collect
2-element Vector{Int64}:
 1
 2

julia> [Some(1), 2, nothing] |> NotA(Nothing) |> collect
2-element Vector{Any}:
  Some(1)
 2
source
Transducers.MapType
Map(f)

Apply unary function f to each input and pass the result to the inner reducing step.

This API is modeled after map in Clojure.

Examples

julia> using Transducers

julia> collect(Map(x -> 2x), 1:3)
3-element Vector{Int64}:
 2
 4
 6
source
Transducers.MapCatType
MapCat(f)

Concatenate output of f which is expected to return an iterable.

This API is modeled after mapcat in Clojure.

Examples

julia> using Transducers

julia> collect(MapCat(x -> 1:x), 1:3)
6-element Vector{Int64}:
 1
 1
 2
 1
 2
 3
source
Transducers.MapSplatType
MapSplat(f)

Like Map(f) but calls f(input...) for each input and then pass the result to the inner reducing step.

Examples

julia> using Transducers

julia> collect(MapSplat(*), zip(1:3, 10:10:30))
3-element Vector{Int64}:
 10
 40
 90
source
Transducers.NondeterministicThreadingType
NondeterministicThreading(; basesize, ntasks = nthreads())

Parallelize inner reducing function using ntasks.

Given

#                   ,-- Not parallelized
#            ______/__
  foldxl(rf, xs |> xfo |> NondeterministicThreading() |> xfi)
#        ==                                              ===
#        Parallelized                                    Parallelized

the inner transducer xfi and the reducing function rf are parallelized but the outer transducer xfo and the iteration over data collection xs are not parallelized.

The scheduling of the tasks (hence the call tree of the inner reducing function) is non-deterministic. It means that the result is deterministic only if the inner reducing function is exactly associative. If the inner reducing function is only approximately associative (e.g., addition of floating point numbers), the result of reduction is likely different for each time.

The outer transducers and iterate are processed sequentially. In particular, the data collection does not have to implement SplittablesBase.halve.

Note

Currently, the default basesize is 1. However, it may be changed in the future (e.g. it may be automatically tuned at run-time).

Note

NondeterministicThreading does not work with Julia < 1.3.

Keyword Arguments

  • basesize::Integer: The number of input elements to be accumulated in a buffer before sent to a task.
  • ntasks::Integer: The number of tasks @spawned. The default value is Threads.nthreads(). A number larger than Threads.nthreads() may be useful if the inner reducing function contains I/O and does not consume too much resource (e.g., memory).

Examples

julia> using Transducers

julia> collect(1:4 |> Filter(isodd))
2-element Vector{Int64}:
 1
 3

julia> collect(1:4 |> NondeterministicThreading() |> Filter(isodd))
2-element Vector{Int64}:
 1
 3
source
Transducers.NotAType
NotA(T)

Skip items of type T. Unlike Filter(!ismissing), downstream transducers can have a correct type information for NotA(Missing).

See also: OfType

Examples

julia> using Transducers

julia> [1, missing, 2] |> NotA(Missing) |> collect
2-element Vector{Int64}:
 1
 2
source
Transducers.OfTypeType
OfType(T)

Include only items of type T.

See also: NotA

Examples

julia> using Transducers

julia> [1, missing, 2] |> OfType(Int) |> collect
2-element Vector{Int64}:
 1
 2
source
Transducers.PartitionType
Partition(size, step = size, flush = false)
Partition(size; step = size, flush = false)

Sliding window of width size and interval step. Yield vectors.

Note: step = size is the default. Hence, the default behavior is non-overlapping windows.

For small size, see Consecutive for a similar transducer that yields tuples instead.

Warning

The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.

This API is modeled after partition-all in Clojure.

Examples

julia> using Transducers

julia> 1:8 |> Partition(3) |> Map(copy) |> collect
2-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [4, 5, 6]

julia> 1:8 |> Partition(3; flush=true) |> Map(copy) |> collect
3-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [4, 5, 6]
 [7, 8]

julia> 1:8 |> Partition(3; step=1) |> Map(copy) |> collect
6-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [2, 3, 4]
 [3, 4, 5]
 [4, 5, 6]
 [5, 6, 7]
 [6, 7, 8]
source
Transducers.PartitionByType
PartitionBy(f)

Group input sequence into chunks in which f returns a same value consecutively.

Warning

The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.

This API is modeled after partition-by in Clojure.

Examples

julia> using Transducers

julia> 1:9 |> PartitionBy(x -> (x + 1) ÷ 3) |> Map(copy) |> collect
4-element Vector{UnitRange{Int64}}:
 1:1
 2:4
 5:7
 8:9
source
Transducers.ReduceIfType
ReduceIf(pred)

Stop fold when pred(x) returns true for the output x of the upstream transducer.

Examples

julia> using Transducers

julia> foldl(right, ReduceIf(x -> x == 3), 1:10)
3
source
Transducers.ReducePartitionByType
ReducePartitionBy(f, rf, [init])

Reduce partitions determined by isequal on the output value of f with an associative reducing function rf. Partitions are reduced on-the-fly and no intermediate arrays are allocated.

Examples

Consider the input 1:6 "keyed" by a function x -> x ÷ 3:

julia> map(x -> x ÷ 3, 1:6)
6-element Vector{Int64}:
 0
 0
 1
 1
 1
 2

i.e., there are three partitions with the key values 0, 1, and 2. We can use ReducePartitionBy to compute, e.g., the length and the sum of each partition by:

julia> using Transducers

julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, Map(_ -> 1)'(+)) |> collect
3-element Vector{Int64}:
 2
 3
 1

julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, +) |> collect
3-element Vector{Int64}:
  3
 12
  6
source
Transducers.ReplaceType
Replace(assoc)

Replace each input with the value in the associative container assoc (e.g., a dictionary, array, string) if it matches with a key/index. Otherwise output the input as-is.

This API is modeled after replace in Clojure.

Examples

julia> using Transducers

julia> collect(Replace(Dict('a' => 'A')), "abc")
3-element Vector{Char}:
 'A': ASCII/Unicode U+0041 (category Lu: Letter, uppercase)
 'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
 'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)

julia> collect(Replace([:a, :b, :c]), 0:4)
5-element Vector{Any}:
 0
  :a
  :b
  :c
 4

julia> collect(Replace("abc"), 0:4)
5-element Vector{Any}:
 0
  'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
  'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
  'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
 4
source
Transducers.ScanType
Scan(f, [init = Init])

Accumulate input with binary function f and pass the accumulated result so far to the inner reduction step.

The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ... when the sequence x₁, x₂, x₃, ..., xₙ, ... is fed to Scan(f).

y₁ = f(init, x₁)
y₂ = f(y₁, x₂)
y₃ = f(y₂, x₃)
...
yₙ = f(yₙ₋₁, xₙ)

This is a generalized version of the prefix sum also known as cumulative sum, inclusive scan, or scan.

Note that the associativity of f is not required when the transducer is used in a process that gurantee an order, such as foldl.

Unless f is a function with known identity element such as +, *, min, max, and append!, the initial state init must be provided.

Pass OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each fold.

See also: ScanEmit, Iterated.

Examples

julia> using Transducers

julia> collect(Scan(*), 1:3)
3-element Vector{Int64}:
 1
 2
 6

julia> 1:3 |> Map(x -> x + im) |> Scan(*) |> collect
3-element Vector{Complex{Int64}}:
 1 + 1im
 1 + 3im
 0 + 10im

julia> collect(Scan(*, 10), 1:3)
3-element Vector{Int64}:
 10
 20
 60
source
Transducers.ScanEmitType
ScanEmit(f, init[, onlast])

Accumulate input x with a function f with the call signature (u, x) -> (y, u) and pass the result y to the inner reduction step.

The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ... computed as follows

u₀ = init
y₁, u₁ = f(u₀, x₁)
y₂, u₂ = f(u₁, x₂)
y₃, u₃ = f(u₂, x₃)
...
yₙ, uₙ = f(uₙ₋₁, xₙ)
...
yₒₒ = onlast(uₒₒ)

when the sequence x₁, x₂, x₃, ..., xₙ, ... is fed to ScanEmit(f).

Pass OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each fold.

See also: Scan, Iterated.

Examples

julia> using Transducers

julia> collect(ScanEmit(tuple, 0), 1:3)
3-element Vector{Int64}:
 0
 1
 2
source
Transducers.TCatType
TCat(basesize::Integer)

Threaded version of Cat (concatenate/flatten).

To use this transducer, all the downstream (inner) transducers must be stateless (or of type ScanEmit) and the reducing function must be associative. See also: Parallel processing tutorial.

Note that the upstream (outer) transducers need not to be stateless as long as it is called with non-parallel reduction such as foldl and collect.

Examples

julia> using Transducers

julia> 1:3 |> Map(x -> 1:x) |> TCat(1) |> tcollect
6-element Vector{Int64}:
 1
 1
 2
 1
 2
 3

julia> 1:3 |> Scan(+) |> Map(x -> 1:x) |> TCat(1) |> collect
10-element Vector{Int64}:
 1
 1
 2
 3
 1
 2
 3
 4
 5
 6
source
Transducers.TakeType
Take(n)

Take n items from the input sequence.

This API is modeled after take in Clojure.

Examples

julia> using Transducers

julia> 1:10 |> Take(2) |> collect
2-element Vector{Int64}:
 1
 2

julia> 1:2 |> Take(5) |> collect
2-element Vector{Int64}:
 1
 2

Using a low-level API, it's possible to distinguish if the output is truncated or not:

julia> transduce(Take(3), Completing(push!!), Init, 1:2)
2-element Vector{Int64}:
 1
 2

julia> transduce(Take(3), Completing(push!!), Init, 1:4)
Reduced([1, 2, 3])

julia> transduce(Take(3), Completing(push!!), Init, 1:0)
InitialValue(push!!)

See also transduce, Reduced and Completing.

source
Transducers.TakeLastType
TakeLast(n)

Take last n items from the input sequence.

Examples

julia> using Transducers

julia> 1:10 |> TakeLast(2) |> collect
2-element Vector{Int64}:
  9
 10

julia> 1:2 |> TakeLast(5) |> collect
2-element Vector{Int64}:
 1
 2
source
Transducers.TakeNthType
TakeNth(n)

Output every n item to the inner reducing step.

This API is modeled after take-nth in Clojure.

Examples

julia> using Transducers

julia> 1:9 |> TakeNth(3) |> collect
3-element Vector{Int64}:
 1
 4
 7
source
Transducers.TakeWhileType
TakeWhile(pred)

Take items while pred returns true. Abort the reduction when pred returns false for the first time.

This API is modeled after take-while in Clojure.

Examples

julia> using Transducers

julia> [1, 2, 3, 1, 2] |> TakeWhile(x -> x < 3) |> collect
2-element Vector{Int64}:
 1
 2
source
Transducers.UniqueType
Unique(by = identity)

Pass only unseen item to the inner reducing step.

The item is distinguished by the output of function by when given.

This API is modeled after distinct in Clojure.

Transducers.jl 0.4.2

New in version 0.4.2.

Examples

julia> using Transducers

julia> [1, 1, 2, -1, 3, 3, 2] |> Unique() |> collect
4-element Vector{Int64}:
  1
  2
 -1
  3

julia> [1, 1, 2, -1, 3, 3, 2] |> Unique(x -> x^2) |> collect
3-element Vector{Int64}:
 1
 2
 3
source
Transducers.ZipMethod
Zip(xforms...)

Zip outputs of transducers xforms in a tuple and pass it to the inner reduction step.

Warning

Head transducers drive tail transducers. Be careful when using it with transducers other than Map, especially the contractive ones like PartitionBy and the expansive ones like MapCat.

Examples

julia> using Transducers

julia> collect(Zip(Map(identity), Map(x -> 10x), Map(x -> 100x)), 1:3)
3-element Vector{Tuple{Int64, Int64, Int64}}:
 (1, 10, 100)
 (2, 20, 200)
 (3, 30, 300)
source

Experimental transducers

Transducers.ZipSourceType
ZipSource(xform::Transducer)

Branch input into two "flows", inject one into xform and then merge (zip) the output of xform with the original (source) input.

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

To illustrate how it works, consider the following usage

collection |> xf0 |> ZipSource(xf1) |> xf2

where xf0, xf1, and xf2 are some transducers. Schematically, the output yn from xfn flows as follows:

xf0      xf1                       xf2
---- y0 ------ y1 ---.-- (y0, y1) ----->
      |              |
       `-------------'

Examples

julia> using Transducers
       using Transducers: ZipSource

julia> collect(ZipSource(opcompose(Filter(isodd), Map(x -> x + 1))), 1:5)
3-element Vector{Tuple{Int64, Int64}}:
 (1, 2)
 (3, 4)
 (5, 6)
source
Transducers.GetIndexType
GetIndex(array)
GetIndex{inbounds}(array)

Transform an integer input i to array[i].

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

Examples

julia> using Transducers
       using Transducers: GetIndex

julia> collect(GetIndex('a':'z'), [2, 3, 4])
3-element Vector{Char}:
 'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
 'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
 'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)

julia> collect(GetIndex{true}('a':'z'), [2, 3, 4]) # Inbounds
3-element Vector{Char}:
 'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
 'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
 'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)
source
Transducers.SetIndexType
SetIndex(array)
SetIndex{inbounds}(array)

Perform array[i] = v for each input pair (i, v).

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

Examples

julia> using Transducers
       using Transducers: SetIndex

julia> ys = zeros(3);

julia> foldl(first ∘ tuple, SetIndex(ys), [(1, 11.1), (3, 33.3)], init=nothing)

julia> ys
3-element Vector{Float64}:
 11.1
  0.0
 33.3
source
Transducers.InjectType
Inject(iterator)

Inject the output from iterator to the stream processed by the inner reduction step.

Warning

This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.

Examples

julia> using Transducers
       using Transducers: Inject

julia> collect(Inject(Iterators.cycle("hello")), 1:8)
8-element Vector{Tuple{Int64, Char}}:
 (1, 'h')
 (2, 'e')
 (3, 'l')
 (4, 'l')
 (5, 'o')
 (6, 'h')
 (7, 'e')
 (8, 'l')

julia> collect(Inject(Iterators.repeated([1 2])), 1:4)
4-element Vector{Tuple{Int64, Matrix{Int64}}}:
 (1, [1 2])
 (2, [1 2])
 (3, [1 2])
 (4, [1 2])

julia> collect(Inject(Iterators.product(1:2, 3:5)), 1:100)
6-element Vector{Tuple{Int64, Tuple{Int64, Int64}}}:
 (1, (1, 3))
 (2, (2, 3))
 (3, (1, 4))
 (4, (2, 4))
 (5, (1, 5))
 (6, (2, 5))
source

Other reducing function combinators

Transducers.TeeRFType
TeeRF(reducing_functions::Tuple)
TeeRF(reducing_functions...)

Combine multiple reducing functions into a new reducing function that "multicast" the input to multiple reducing functions.

Roughly speaking, TeeRF(op₁, op₂, ..., opₙ) is equivalent to

((a₁, a₂, ..., aₙ), x) -> (op₁(a₁, x), op₂(a₂, x), ..., opₙ(aₙ, x))

For combine, it behaves like ProductRF.

Transducers.jl 0.4.32

New in version 0.4.32.

Examples

julia> using Transducers

julia> extrema′(xs, xf = Map(identity)) = foldl(TeeRF(min, max), xf, xs);

julia> extrema′([5, 2, 6, 8, 3])
(2, 8)

Note that the input is considered empty unless all reducing functions call their bottom reducing functions. Specify init to obain results even when the input collection is empty or all filtered out.

julia> filtering_max = Filter(isodd)'(max);

julia> foldl(TeeRF(min, filtering_max), Map(identity), [5, 2, 6, 8, 3])
(2, 5)

julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8)
ERROR: EmptyResultError: ...

julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8; init = Init)
(2, InitialValue(max))
source
Transducers.ProductRFType
ProductRF(reducing_functions::Tuple)
ProductRF(reducing_functions...)

Combine N reducing functions into a new reducing function that work on N-tuple. The i-th reducing function recieves the i-th element of the input tuple.

Roughly speaking, ProductRF(op₁, op₂, ..., opₙ) is equivalent to

((a₁, a₂, ..., aₙ), (b₁, b₂, ..., bₙ)) -> (op₁(a₁, b₁), op₂(a₂, b₂), ..., opₙ(aₙ, bₙ))
Transducers.jl 0.4.32

New in version 0.4.32.

Examples

Like TeeRF, ProductRF can be used to drive multiple reducing functions. ProductRF is more "low-level" in the sense that TeeRF can be defined in terms of ProductRF (other direction is much harder):

julia> using Transducers

julia> TeeRF′(fs...) = reducingfunction(
           Map(x -> ntuple(_ -> x, length(fs))),
           ProductRF(fs...),
       );

julia> foldl(TeeRF′(min, max), Map(identity), [5, 2, 6, 8, 3])
(2, 8)

ProductRF may be useful for handling pre-existing stream whose item type is already a tuple:

julia> foldl(ProductRF(&, +), Map(x -> (isodd(x), x)), [5, 2, 6, 8, 3])
(false, 24)

julia> foldl(TeeRF(reducingfunction(Map(isodd), &), +), Map(identity), [5, 2, 6, 8, 3])
(false, 24)
source
Transducers.wheninitFunction
wheninit(oninit, rf) -> rf′
wheninit(oninit) -> rf -> rf′
whenstart(start, rf) -> rf′
whenstart(start) -> rf -> rf′
whencomplete(complete, rf) -> rf′
whencomplete(complete) -> rf -> rf′
whencombine(combine, rf) -> rf′
whencombine(combine) -> rf -> rf′

Add initialization/completion/merging phase to arbitrary reducing function.

The functions passed to those combinators are used as follows in foldl:

init′ = oninit()  # if oninit is given; otherwise standard `init`-preprocessing
acc = start(init′)
for x in collection
    acc += rf(acc, x)
end
result = acc
return complete(result)

In foldxt, a collection is split in multiple parts and then above foldl except for complete is run on them, yielding multiple results which are combined by repeatedly calling combine(result_1, result_2). Note that this allows non-associative function for next while combine must be associative.

See also next, start, complete, and combine.

Arguments

  • rf: reducing function
  • oninit: nullary function that generates an initial value for rf
  • start: unary function that pre-process the initial value for rf
  • complete: unary function that post-process the accumulator
  • combine: (approximately) associative binary function for combining multiple results of rf (before post-processed by complete).

Extended help

Examples

An example for using non-associative reducing function in foldxt:

julia> using Transducers

julia> collector! = push! |> whencombine(append!) |> wheninit(() -> []);

julia> foldxt(collector!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Any}:
 1
 3
 5

More "tightly" typed vector can returned by using BangBang.jl interface:

julia> collector!! = push!! |> whencombine(append!!) |> wheninit(Vector{Union{}});

julia> foldxt(collector!!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Int64}:
 1
 3
 5

Online averaging algorithm can be implemented, e.g., by:

julia> averaging = function add_average((sum, count), x)
           (sum + x, count + 1)
       end |> wheninit() do
           (Init(+), 0)
       end |> whencombine() do (sum1, count1), (sum2, count2)
           (sum1 + sum2), (count1 + count2)
       end |> whencomplete() do (sum, count)
           sum / count
       end;

julia> foldl(averaging, Filter(isodd), 1:5)
3.0

julia> foldxt(averaging, Filter(isodd), 1:50; basesize = 1)
25.0

An alternative implementation is to use Map to construct a singleton solution and then merge it into the accumulated solution:

julia> averaging2 = function merge_average((sum1, count1), (sum2, count2))
           (sum1 + sum2, count1 + count2)
       end |> whencomplete() do (sum, count)
           sum / count
       end |> Map() do x
           (x, 1)
       end';  # `'` here is important

julia> foldl(averaging2, Filter(isodd), 1:5)
3.0

julia> foldxt(averaging2, Filter(isodd), 1:50; basesize = 1)
25.0
source

Early termination

Transducers.ReducedType
Reduced

The type signaling transducible processes to abort.

Note

Call reduced function for aborting the transducible process since reduced makes sure x is not doubly wrapped. Reduced is meant to be used as x isa Reduced for checking if the result from transduce is due to early termination.

See reduced, unreduced.

Examples

julia> using Transducers

julia> function step_demo(y, x)
           if x > 5
               return reduced(y)
           else
               return y + x
           end
       end;

julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:10)
Reduced(15)

julia> result isa Reduced
true

julia> unreduced(result)
15

julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:4)
10

julia> result isa Reduced
false

julia> unreduced(result)
10
source
Transducers.reducedFunction
reduced([x = nothing])

Stop transducible process with the final value x (default: nothing). Return x as-is if it already is a reduced value.

See Reduced, unreduced.

This API is modeled after ensure-reduced in Clojure.

Examples

julia> using Transducers

julia> foldl(Enumerate(), "abcdef"; init=0) do y, (i, x)
           if x == 'd'
               return reduced(y)
           end
           return y + i
       end
6

julia> foreach(Enumerate(), "abc") do (i, x)
           println(i, ' ', x)
           if x == 'b'
               return reduced()
           end
       end;
1 a
2 b
source
Transducers.ifunreducedFunction
ifunreduced(f, [x])

Equivalent to unreduced(x) if x is a Reduced; otherwise run f(x). Return a curried version if x is not provided.

See: foreach.

Examples

julia> using Transducers

julia> 1 |> ifunreduced() do x
           println("called with x = ", x)
       end
called with x = 1

julia> reduced(1) |> ifunreduced() do x
           println("called with x = ", x)
       end
1

Notice that nothing is printed in the last example.

Implementation

ifunreduced(f) = x -> ifunreduced(f, x)
ifunreduced(f, x::Reduced) = unreduced(x)
ifunreduced(f, x) = f(x)
source

Executors

Transducers.SequentialExType
SequentialEx(; simd)

Sequential fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm sequentially.

See also: foldxl, ThreadedEx and DistributedEx.

Keyword Arguments

  • simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use @simd ivdep for ... end variant. Read Julia manual of Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducer like Scan. If false (default), Base.@simd is not used.

Examples

julia> using Folds

julia> Folds.sum(1:3, SequentialEx())
6
source
Transducers.ThreadedExType
ThreadedEx(; basesize, stoppable, nestlevel, simd)

Multi-threaded fold executor. This is the default [1] parallel executor used by Folds.jl and FLoops.jl.

See also: foldxt, SequentialEx and DistributedEx.

Keyword Arguments

  • basesize::Integer = amount(reducible) ÷ nthreads(): A size of chunk in reducible that is processed by each worker. A smaller size may be required when:

    • computation time for processing each item fluctuates a lot
    • computation can be terminated by reduced or transducers using it, such as ReduceIf
  • stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode used for optimizing reduction with reduced has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.

  • nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers to be multi-threaded (using TCat). It must be a positive integer, Val of positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that Cat transducer should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.

  • simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use @simd ivdep for ... end variant. Read Julia manual of Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducer like Scan. If false (default), Base.@simd is not used.

Examples

julia> using Folds

julia> Folds.sum(1:3, ThreadedEx(basesize = 1))
6
source
Transducers.DistributedExType
DistributedEx(; pool, basesize, threads_basesize, simd)

Distributed fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm sequentially.

See also: foldxd, SequentialEx and ThreadedEx.

Keyword Arguments

  • pool::AbstractWorkerPool: Passed to Distributed.remotecall.

  • basesize::Integer = amount(array) ÷ nworkers(): A size of chunk in array that is processed by each worker. A smaller size may be required when computation time for processing each item can fluctuate a lot.

  • threads_basesize::Integer = basesize ÷ nthreads(): A size of chunk in array that is processed by each task in each worker process. The default setting assumes that the number of threads used in all workers are the same. For heterogeneous setup where each worker process has different number of threads, it may be required to use smaller threads_basesize and basesize to get a good performance.

  • simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use @simd ivdep for ... end variant. Read Julia manual of Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with stateful transducer like Scan. If false (default), Base.@simd is not used.

Examples

julia> using Folds

julia> Folds.sum(1:3, DistributedEx())
6
source
Transducers.PreferParallelType
PreferParallel(; simd, basesize)

A "placeholder" executor that indicates preference to parallel execution.

This lets the input data collection decide preferred execution strategy (e.g., CUDAEx for CuArray when FoldsCUDA.jl is available), assuming that the reducing function is associative. The default executor is ThreadedEx. As an optional feature, some input data collections support (e.g., AbstractChannel) automatically demoting the execution strategy to SequentialEx. An error is thrown if the automatic detection fails,

source

Miscellaneous

Transducers.SplitByType
SplitBy(f; [keepend = false,] [keepempty = false,])

Split input collection into chunks delimited by the elements on which f returns true. This can be used to implement parallel and lazy versions of functions like eachline and split.

If keepend is true (or Val(true)), include the "delimiter"/end element at the end of each chunk. If keepempty is true (or Val(true)), include empty chunks. When keepend is true, the value of keepempty is irrelevant since the chunks cannot be empty (i.e., it at least contains the end).

The input collection (xs in SplitBy(...)(xs)) has to support eachindex and view or SubString.

Extended Help

Examples

For demonstration, consider the following input stream and SplitBy(iszero; ...) used with the following options:

input     keepend=false       keepend=false        keepend=true
          keepempty=false     keepempty=true

1           `.                  `.                  `.
2            | y1                | y1                | y1
3           ,'                  ,'                   |
0                                                   ,'
1           `.                  `.                  `.
2            | y2                | y2                | y2
3            |                   |                   |
4           ,'                  ,'                   |
0                               __ y3               ,'
0                                                    ] y3
1           `.                  `.                  `.
2            | y3                | y4                | y4

In the above diagram, yi (i = 1, 2, 3, 4) are the output of SplitBy. This can be checked in the REPL easily as follows. (Note: we are using Map(collect) for cleaner printing; it's not required unless the arrays is mutated in the downstream.)

julia> using Transducers

julia> xs = [1, 2, 3, 0, 1, 2, 3, 4, 0, 0, 1, 2];  # input

julia> xs |> SplitBy(iszero) |> Map(collect) |> collect
3-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [1, 2, 3, 4]
 [1, 2]

julia> xs |> SplitBy(iszero; keepempty = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
 [1, 2, 3]
 [1, 2, 3, 4]
 []
 [1, 2]

julia> xs |> SplitBy(iszero; keepend = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
 [1, 2, 3, 0]
 [1, 2, 3, 4, 0]
 [0]
 [1, 2]
source
Transducers.TransducerMethod
Transducer(iterator::Iterators.Generator)
Transducer(iterator::Iterators.Filter)
Transducer(iterator::Iterators.Flatten)

Extract "processing" part of an iterator as a Transducer. The "data source" iterator (i.e., xs in (f(x) for x in xs)) is ignored and nothing must be used as a place holder (i.e., (f(x) for x in nothing)).

See also eduction.

Transducers.jl 0.3

New in version 0.3.

Examples

julia> using Transducers

julia> xf1 = Transducer(2x for x in nothing if x % 2 == 0);

julia> xf2 = opcompose(Filter(x -> x % 2 == 0), Map(x -> 2x));  # equivalent

julia> xs = 1:10
       collect(xf1, xs) == collect(xf2, xs)
true
source
Transducers.TransducerMethod
Transducer(o::OnlineStat)

Use an OnlineStat as a stateful transducer.

It is implemented as:

opcompose(Scan(fit!, CopyInit(o)), Map(value))

Examples

julia> using Transducers
       using OnlineStats: Mean

julia> collect(Transducer(Mean()), 1:4)
4-element Vector{Float64}:
 1.0
 1.5
 2.0
 2.5
source
Transducers.reducingfunctionFunction
reducingfunction(xf, step; simd)
xf'(step; simd)

Apply transducer xf to the reducing function step to create a new reducing function.

Transducers.jl 0.3

New in version 0.3.

Warning

Be careful using reducingfunction with stateful transducers like Scan with mutable init (e.g., Scan(push!, [])). See more in Examples below.

Arguments

  • xf::Transducer: A transducer.
  • step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" 1-argument form (i.e., complete protocol).

Keyword Arguments

Examples

julia> using Transducers

julia> rf = reducingfunction(Map(x -> x + 1), *);

julia> rf(10, 2) === 10 * (2 + 1)
true

Warning: Be careful when using reducingfunction with stateful transducers

Stateful Transducers themselves in Transducers.jl are not inherently broken with reducingfunction. However, it can produce incorrect results when combined with mutable states:

julia> scan_state = [];

julia> rf_bad = opcompose(Scan(push!, scan_state), Cat())'(string);

julia> transduce(rf_bad, "", 1:3)
"112123"

The first run works. However, observe that the vector scan_state is not empty anymore:

julia> scan_state
3-element Vector{Any}:
 1
 2
 3

Thus, the second run produces an incorrect result:

julia> transduce(rf_bad, "", 1:3)
"123112312123123"

One way to solve this issue is to use CopyInit or OnInit.

julia> scan_state = CopyInit([])
CopyInit(Any[])

julia> rf_good = opcompose(Scan(push!, scan_state), Cat())'(string);

julia> transduce(rf_good, "", 1:3)
"112123"

julia> scan_state
CopyInit(Any[])

julia> transduce(rf_good, "", 1:3)
"112123"
source
reducingfunction([xf::Transducer,] o::OnlineStat; simd)

Convert an OnlineStat to a reducing function. Returned function can be used with foldl, foldxt, and foldxd. Note that input o is only used as a "prototype"; i.e., it's not going to be mutated.

Examples

julia> using Transducers
       using OnlineStats: Mean

julia> foldl(reducingfunction(Mean()), Map(x -> x^2), 1:4)
Mean: n=4 | value=7.5

julia> foldl(Mean(), Map(x -> x^2), 1:4)  # equivalent to above
Mean: n=4 | value=7.5

julia> foldxt(Mean(), Map(x -> x^2), 1:4)  # threaded
Mean: n=4 | value=7.5

julia> foldl(Mean(), eduction(x^2 for x in 1:4))  # ditto
Mean: n=4 | value=7.5

julia> foldxt(Mean(), eduction(x^2 for x in 1:4))  # ditto
Mean: n=4 | value=7.5

foldxd can be used instead of foldxt. However the usual caveats of code availability for Distributed.jl apply.

source
Transducers.InitConstant
Init
Init(op) :: InitialValues.InitialValue

The canonical initializer; i.e., a singleton placeholder usable for init argument of foldl for binary functions with known initial values.

When init = Init is passed to foldl etc., Init(op) is called for the bottom reducing function op during the start phase. Init(op) returns InitialValue(op) which acts as the canonical initial value of op.

Examples

julia> using Transducers

julia> foldl(+, 1:3 |> Filter(isodd); init = Init)
4

julia> foldl(+, 2:2:4 |> Filter(isodd); init = Init)
InitialValue(+)

Extended help

Note that op passed to foldl etc. must be known to InitialValues.jl:

julia> unknown_op(a, b) = a + b;

julia> foldl(unknown_op, 2:2:4 |> Filter(isodd); init = Init)
ERROR: IdentityNotDefinedError: `init = Init` is specified but the identity element `InitialValue(op)` is not defined for
    op = unknown_op
[...]

InitialValues.asmonoid can be used to wrap a binary function to add ("adjoin") the identity value to its domain:

julia> using InitialValues: asmonoid

julia> foldl(asmonoid(unknown_op), 2:2:4 |> Filter(isodd); init = Init)
InitialValue(::InitialValues.AdjoinIdentity{typeof(unknown_op)})

When start(rf, Init) is called with a composite reducing function rf, Init(rf₀) is called for the bottom reducing function rf₀ of rf:

julia> rf = Take(3)'(+);  # `+` is the bottom reducing function

julia> acc = Transducers.start(rf, Init);

julia> Transducers.unwrap(rf, acc)
(3, InitialValue(+))
source
Transducers.OnInitType
OnInit(f)

Call a callable f to create an initial value.

See also CopyInit.

OnInit or CopyInit must be used whenever using in-place reduction with foldxt etc.

Examples

julia> using Transducers

julia> xf1 = Scan(push!, [])
Scan(push!, Any[])

julia> foldl(right, xf1, 1:3)
3-element Vector{Any}:
 1
 2
 3

julia> xf1
Scan(push!, Any[1, 2, 3])

Notice that the array is stored in xf1 and mutated in-place. As a result, second run of foldl contains the results from the first run:

julia> foldl(right, xf1, 10:11)
5-element Vector{Any}:
  1
  2
  3
 10
 11

This may not be desired. To avoid this behavior, create an OnInit object which takes a factory function to create a new initial value.

julia> xf2 = Scan(push!, OnInit(() -> []))
Scan(push!, OnInit(##9#10()))

julia> foldl(right, xf2, 1:3)
3-element Vector{Any}:
 1
 2
 3

julia> foldl(right, xf2, [10.0, 11.0])
2-element Vector{Any}:
 10.0
 11.0

Keyword argument init for transducible processes also accept an OnInit:

julia> foldl(push!, Map(identity), "abc"; init=OnInit(() -> []))
3-element Vector{Any}:
 'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
 'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
 'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)

To create a copy of a mutable object, CopyInit is easier to use.

However, more powerful and generic pattern is to use push!! from BangBang.jl and initialize init with Union{}[] so that it automatically finds the minimal element type.

julia> using BangBang

julia> foldl(push!!, Map(identity), "abc"; init=Union{}[])
3-element Vector{Char}:
 'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
 'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
 'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
source
Transducers.CopyInitType
CopyInit(value)

This is equivalent to OnInit(() -> deepcopy(value)).

Transducers.jl 0.3

New in version 0.3.

Examples

julia> using Transducers

julia> init = CopyInit([]);

julia> foldl(push!, Map(identity), 1:3; init=init)
3-element Vector{Any}:
 1
 2
 3

julia> foldl(push!, Map(identity), 1:3; init=init)  # `init` can be reused
3-element Vector{Any}:
 1
 2
 3
source
Transducers.rightFunction
right([l, ]r) -> r

It is simply defined as

right(l, r) = r
right(r) = r

This function is meant to be used as step argument for foldl etc. for extracting the last output of the transducers.

Transducers.jl 0.3

Initial value must be manually specified. In 0.2, it was automatically set to nothing.

Examples

julia> using Transducers

julia> foldl(right, Take(5), 1:10)
5

julia> foldl(right, Drop(5), 1:3; init=0)  # using `init` as the default value
0
source
Transducers.setinputFunction
setinput(ed::Eduction, coll)

Set input collection of eduction ed to coll.

Transducers.jl 0.3

Previously, setinput combined with eduction was a recommended way to use transducers in a type stable manner. As of v0.3, all the foldl-like functions and eduction are type stable for many cases. This workaround is no more necessary.

Examples

julia> using Transducers

julia> ed = eduction(Map(x -> 2x), Float64[]);

julia> xs = ones(2, 3);

julia> foldl(+, setinput(ed, xs))
12.0
source
Transducers.AdHocFoldableType
AdHocFoldable(foldl, [collection = nothing])

Provide a different way to fold collection without creating a wrapper type.

Arguments

  • foldl::Function: a function that implements __foldl__.
  • collection: a collection passed to the last argument of foldl.

Examples

julia> using Transducers
       using Transducers: @next, complete
       using ArgCheck

julia> function uppertriangle(A::AbstractMatrix)
           @argcheck !Base.has_offset_axes(A)
           return AdHocFoldable(A) do rf, acc, A
               for j in 1:size(A, 2), i in 1:min(j, size(A, 1))
                   acc = @next(rf, acc, @inbounds A[i, j])
               end
               return complete(rf, acc)
           end
       end;

julia> A = reshape(1:6, (3, 2))
3×2 reshape(::UnitRange{Int64}, 3, 2) with eltype Int64:
 1  4
 2  5
 3  6

julia> collect(Map(identity), uppertriangle(A))
3-element Vector{Int64}:
 1
 4
 5

julia> function circularwindows(xs::AbstractVector, h::Integer)
           @argcheck !Base.has_offset_axes(xs)
           @argcheck h >= 0
           @argcheck 2 * h + 1 <= length(xs)
           return AdHocFoldable(xs) do rf, acc, xs
               buffer = similar(xs, 2 * h + 1)
               @inbounds for i in 1:h
                   buffer[1:h - i + 1] .= @view xs[end - h + i:end]
                   buffer[h - i + 2:end] .= @view xs[1:h + i]
                   acc = @next(rf, acc, buffer)
               end
               for i in h + 1:length(xs) - h
                   acc = @next(rf, acc, @inbounds @view xs[i - h:i + h])
               end
               @inbounds for i in 1:h
                   buffer[1:end - i] .= @view xs[end - 2 * h + i:end]
                   buffer[end - i + 1:end] .= @view xs[1:i]
                   acc = @next(rf, acc, buffer)
               end
               return complete(rf, acc)
           end
       end;

julia> collect(Map(collect), circularwindows(1:9, 2))
9-element Vector{Vector{Int64}}:
 [8, 9, 1, 2, 3]
 [9, 1, 2, 3, 4]
 [1, 2, 3, 4, 5]
 [2, 3, 4, 5, 6]
 [3, 4, 5, 6, 7]
 [4, 5, 6, 7, 8]
 [5, 6, 7, 8, 9]
 [6, 7, 8, 9, 1]
 [7, 8, 9, 1, 2]

julia> expressions(str::AbstractString; kwargs...) =
           AdHocFoldable(str) do rf, val, str
               pos = 1
               while true
                   expr, pos = Meta.parse(str, pos;
                                          raise = false,
                                          depwarn = false,
                                          kwargs...)
                   expr === nothing && break
                   val = @next(rf, val, expr)
               end
               return complete(rf, val)
           end;

julia> collect(Map(identity), expressions("""
       x = 1
       y = 2
       """))
2-element Vector{Expr}:
 :(x = 1)
 :(y = 2)

julia> counting = AdHocFoldable() do rf, acc, _
           i = 0
           while true
               i += 1
               acc = @next(rf, acc, i)
           end
       end;

julia> foreach(counting) do i
           @show i;
           i == 3 && return reduced()
       end;
i = 1
i = 2
i = 3
source
Transducers.withprogressFunction
withprogress(foldable) -> foldable′

Wrap a foldable so that progress is shown in logging-based progress meter (e.g., Juno) during foldl, foldxt, foldxd, etc.

For parallel reduction such as foldxt and foldxd, reasonably small basesize and threads_basesize (for foldxd) must be used to ensure that progress information is updated frequently. However, it may slow down the computation if basesize is too small.

Keyword Arguments

  • interval::Real: Minimum interval (in seconds) for how often progress is logged.

Examples

julia> using Transducers

julia> xf = Map() do x
           sleep(0.01)
           x
       end;

julia> foldl(+, xf, withprogress(1:100; interval=1e-3))  # see progress meter
5050

In foldl and foldxt, withprogress can be nested. This is not supported in foldxd.

julia> xf = opcompose(
           MapCat() do x
               withprogress(1:x; interval=1e-3)  # nested progress
           end,
           Map() do x
               sleep(0.5)
               x
           end,
       );

julia> if VERSION >= v"1.3-alpha"
           # Calling `sleep` in thread is safe in Julia 1.3:
           foldxt(+, xf, withprogress(1:10; interval=1e-3); basesize=1)
       else
           foldl(+, xf, withprogress(1:10; interval=1e-3))
       end
220
source

Deprecated

Base.reduceFunction
reduce(rf, [xf,] itr)

Multi-threaded reduce.

Warning

reduce is deprecated. Use foldxt instead.

source
Base.mapfoldlFunction
mapfoldl(xf::Transducer, step, reducible; init, simd)
Warning

mapfoldl(::Transducer, rf, itr) is deprecated. Use foldl(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldl(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.

Like foldl but step is not automatically wrapped by Completing.

Examples

julia> using Transducers

julia> function step_demo(state, input)
           @show state, input
           state + input
       end;

julia> function step_demo(state)
           println("Finishing with state = ", state)
           state
       end;

julia> mapfoldl(Filter(isodd), step_demo, 1:4, init=0.0)
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
Finishing with state = 4.0
4.0
source
Base.mapreduceFunction
mapreduce(xf, step, reducible; init, simd)
Warning

mapreduce(::Transducer, rf, itr) is deprecated. Use foldxt(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldxt(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.

Like foldxt but step is not automatically wrapped by Completing.

source
Transducers.KeepFunction
Keep(f)

Pass non-nothing output of f to the inner reducing step.

Warning

Keep(f) is a deprecated. Use ... |> Map(f) |> NotA(Nothing). If f does not return a Some, KeepSomething can also be used.

Examples

julia> using Transducers

julia> xf = Keep() do x
           if x < 3
               x + 1
           end
       end;

julia> collect(xf, 1:5)
2-element Vector{Int64}:
 2
 3
source