Transducers and Transducible processes
BangBang.append!!
Base.:∘
Base.adjoint
Base.append!
Base.collect
Base.copy
Base.copy!
Base.foldl
Base.foreach
Base.map!
Base.mapfoldl
Base.mapreduce
Base.reduce
Transducers.Keep
Transducers.Zip
Transducers.append_unordered!
Transducers.channel_unordered
Transducers.dcollect
Transducers.dcopy
Transducers.dreduce
Transducers.dtransduce
Transducers.eduction
Transducers.foldxd
Transducers.foldxl
Transducers.foldxt
Transducers.ifunreduced
Transducers.reduced
Transducers.reducingfunction
Transducers.right
Transducers.setinput
Transducers.tcollect
Transducers.tcopy
Transducers.transduce
Transducers.unreduced
Transducers.whencombine
Transducers.whencomplete
Transducers.wheninit
Transducers.whenstart
Transducers.withprogress
Base.Channel
Transducers.AdHocFoldable
Transducers.Broadcasting
Transducers.Cat
Transducers.Completing
Transducers.Consecutive
Transducers.CopyInit
Transducers.Count
Transducers.Dedupe
Transducers.DistributedEx
Transducers.Drop
Transducers.DropLast
Transducers.DropWhile
Transducers.Enumerate
Transducers.Filter
Transducers.FlagFirst
Transducers.GetIndex
Transducers.GroupBy
Transducers.Inject
Transducers.Interpose
Transducers.Iterated
Transducers.KeepSomething
Transducers.Map
Transducers.MapCat
Transducers.MapSplat
Transducers.NondeterministicThreading
Transducers.NotA
Transducers.OfType
Transducers.OnInit
Transducers.Partition
Transducers.PartitionBy
Transducers.PreferParallel
Transducers.ProductRF
Transducers.ReduceIf
Transducers.ReducePartitionBy
Transducers.Reduced
Transducers.Replace
Transducers.Scan
Transducers.ScanEmit
Transducers.SequentialEx
Transducers.SetIndex
Transducers.SplitBy
Transducers.TCat
Transducers.Take
Transducers.TakeLast
Transducers.TakeNth
Transducers.TakeWhile
Transducers.TeeRF
Transducers.ThreadedEx
Transducers.Transducer
Transducers.Unique
Transducers.ZipSource
Transducible processes
Transducers.foldxl
— Function
foldxl(step, xf::Transducer, reducible; init, simd) :: T
foldxl(step, reducible; init, simd) :: T
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T
transduce(xf, step, init, reducible, [executor]; simd) :: Union{T, Reduced{T}}
eXtended left fold. Compose the transducer xf with the reducing step function step and reduce reducible using the result.
transduce differs from foldxl in two ways: it returns Reduced{T} if the transducer xf or step aborts the reduction, and step is not automatically wrapped in Completing.
This API is modeled after transduce in Clojure.
For parallel versions, see foldxt and foldxd.
See also: Empty result handling.
Arguments
xf::Transducer: A transducer.
step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" the 1-argument form (i.e., the complete protocol).
reducible: A reducible object (array, dictionary, any iterator, etc.).
executor: Specify an executor. See SequentialEx.
init: An initial value fed to the first argument of the reducing step function step. This argument can be omitted for well-known binary operations like + or *. Supported binary operations are listed in the InitialValues.jl documentation. When Init itself (not the result of calling Init, such as Init(*)) is given, it is automatically "instantiated" as Init(step) (where step is appropriately unwrapped if step is a Completing). See Empty result handling in the manual for more information.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with a stateful transducer like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Transducers
julia> foldl(Filter(isodd), 1:4, init=0.0) do state, input
@show state, input
state + input
end
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
4.0
julia> foldxl(+, 1:5 |> Filter(isodd))
9
julia> 1:5 |> Filter(isodd) |> foldxl(+)
9
Since TeeRF requires the extended fold protocol, foldl(TeeRF(min, max), [5, 2, 6, 8, 3]) does not work, whereas the same call works with foldxl:
julia> foldxl(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
The unary method of foldxl is useful when combined with |>:
julia> (1:5, 4:-1:1) |> Cat() |> Filter(isodd) |> Enumerate() |> foldxl() do a, b
a[2] < b[2] ? b : a
end
(3, 5)
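The difference between transduce and foldxl described at the top of this entry can be sketched as follows. This is a minimal illustration rather than an excerpt from the package: step_until_4 is a hypothetical step function that aborts the fold with reduced once an input exceeds 3.

```julia
using Transducers

# Hypothetical step function: stop summing once an input exceeds 3.
step_until_4(acc, x) = x > 3 ? reduced(acc) : acc + x

# transduce does not wrap the step function in Completing, so wrap it here.
# Because the fold is aborted, the result stays wrapped in Reduced.
r = transduce(Map(identity), Completing(step_until_4), 0, 1:10)

# foldxl wraps the step function and unwraps the result automatically.
s = foldxl(step_until_4, Map(identity), 1:10; init = 0)
```

With these definitions, r isa Reduced should hold and unreduced(r) should equal s.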
Transducers.transduce
— Function
transduce(xf, step, init, reducible) :: Union{T, Reduced{T}}
See foldxl.
Base.foldl
— Function
foldl(step, xf::Transducer, reducible; init, simd) :: T
foldl(step, ed::Eduction; init, simd) :: T
See foldxl.
Base.foreach
— Function
foreach(eff, xf::Transducer, reducible; simd)
foreach(eff, ed::Eduction; simd)
Feed the results of xf processing the items in reducible into a unary function eff. This is useful when the primary computation at the bottom is a side effect. It is equivalent to foreach(eff, eduction(xf, coll)). Note that
foreach(eduction(xf, coll)) do x
...
end
can be more efficient than
for x in eduction(xf, coll)
...
end
as the former does not have to translate the transducer protocol to the iterator protocol.
foreach supports all constructs of the native for loop as well as the enhancements [julia_issue_22891] to break with a value (break D(x) below) and to append an else clause (E(x) below).
This native for loop
ans = for x in xs
A(x)
B(x) && break
C(x) && break D(x)
else
E(x)
end
can be written as
ans = foreach(Map(identity), xs) do x
A(x)
B(x) && return reduced()
C(x) && return reduced(D(x))
x # required for passing `x` to `E(x)` below
end |> ifunreduced() do x
E(x)
end
See: mapfoldl, reduced, ifunreduced.
As of version 0.3, foreach returns what the do block (the eff function) returns as-is. This was required for supporting "for-else" (|> ifunreduced). Previously, it only supported break-with-value and always applied unreduced before returning.
Examples
julia> using Transducers
julia> foreach(eduction(Filter(isodd), 1:4)) do input
@show input
end
input = 1
input = 3
3
julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
@show input
if iseven(input)
return reduced()
end
end
input = 1
input = 2
Reduced(nothing)
It is often useful to append |> unreduced to unwrap Reduced in the final result (note that |> here is the standard function application, not the transducer composition).
julia> foreach(Filter(!ismissing), [1, missing, 2, 3]) do input
reduced("got $input")
end |> unreduced
"got 1"
Combining break-with-value and for-else is useful for triggering an action after, e.g., some kind of membership test has failed:
julia> has2(xs) = foreach(Filter(!ismissing), xs) do input
input == 2 && reduced(true)
end |> ifunreduced() do input
@show input
false
end;
julia> has2([1, missing, 2, 3])
true
julia> has2([1, missing])
input = false
false
However, note the output input = false in the last example. This is because of how && works in Julia:
julia> false && "otherwise"
false
Thus, pure membership testing functions like has2 above can be written in a more concise manner:
julia> simpler_has2(xs) = foreach(Filter(!ismissing), xs) do input
input == 2 && reduced(true)
end |> unreduced;
julia> simpler_has2([1, missing, 2, 3])
true
julia> simpler_has2([1, missing])
false
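The equivalence between the transducer form and the eduction form of foreach stated at the top of this entry can be checked with a small sketch. The vector names seen_xf and seen_ed are made up for this illustration.

```julia
using Transducers

seen_xf = Int[]   # filled through foreach(eff, xf, reducible)
seen_ed = Int[]   # filled through foreach(eff, eduction(xf, reducible))

foreach(x -> push!(seen_xf, x), Filter(isodd), 1:5)
foreach(x -> push!(seen_ed, x), eduction(Filter(isodd), 1:5))
```

Both vectors should end up holding the odd inputs in their original order.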
Transducers.foldxt
— Function
foldxt(step, xf, reducible; [init, simd, basesize, stoppable, nestlevel]) :: T
eXtended threaded fold (reduce). This is a multi-threaded reduce based on the extended fold protocol defined in Transducers.jl.
The "bottom" reduction function step(::T, ::T) :: T must be associative and init must be its identity element.
Transducers composing xf must be stateless (e.g., Map, Filter, Cat, etc.) except for ScanEmit. Note that Scan is not supported (although possible in theory). Early termination requires Julia ≥ 1.3.
Use tcollect or tcopy to collect results into a container.
See also: Parallel processing tutorial, foldxl, foldxd.
Keyword Arguments
basesize::Integer = amount(reducible) ÷ nthreads(): The size of the chunk of reducible that is processed by each worker. A smaller size may be required when:
stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode, which is used for optimizing reductions with reduced, has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.
nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers are to be multi-threaded (using TCat). It must be a positive integer, a Val of a positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that the Cat transducers should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.
For other keyword arguments, see foldl.
Keyword option stoppable requires at least Transducers.jl 0.4.23.
Examples
julia> using Transducers
julia> foldxt(+, 1:3 |> Map(exp) |> Map(log))
6.0
julia> using BangBang: append!!
julia> foldxt(append!!, Map(x -> 1:x), 1:2; basesize=1, init=Union{}[])
3-element Vector{Int64}:
1
1
2
julia> 1:5 |> Filter(isodd) |> foldxt(+)
9
julia> foldxt(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
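As a further sketch of the basesize keyword described under Keyword Arguments, the following splits a sum of squares into chunks of at most 10 items. The chunk size is chosen arbitrarily for illustration; the sequential foldxl call is included only as a cross-check.

```julia
using Transducers

# Sum of squares of 1:100, processed in chunks of at most 10 items each.
total = foldxt(+, Map(x -> x^2), 1:100; basesize = 10)

# + is associative, so the sequential fold should give the same answer.
expected = foldxl(+, Map(x -> x^2), 1:100)
```

Since + is associative and Map is stateless, total should equal expected regardless of how many threads are available.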
Transducers.foldxd
— Function
foldxd(step, xform::Transducer, array; [init, simd, basesize, threads_basesize, pool])
eXtended distributed fold (reduce). This is a distributed reduce based on the extended fold protocol defined in Transducers.jl.
Unlike foldxt, early termination by reduced is not supported yet.
Use dcollect or dcopy to collect results into a container.
See also: Parallel processing tutorial, foldxl, foldxt.
New in version 0.4.3.
Keyword Arguments
pool::AbstractWorkerPool: Passed to Distributed.remotecall.
basesize::Integer = amount(array) ÷ nworkers(): The size of the chunk of array that is processed by each worker. A smaller size may be required when the computation time for processing each item can fluctuate a lot.
threads_basesize::Integer = basesize ÷ nthreads(): The size of the chunk of array that is processed by each task in each worker process. The default setting assumes that the number of threads is the same in all workers. For a heterogeneous setup where each worker process has a different number of threads, it may be necessary to use smaller threads_basesize and basesize values to get good performance.
For other keyword arguments, see foldl.
Examples
julia> using Transducers
julia> foldxd(+, 1:3 |> Map(exp) |> Map(log))
6.0
julia> 1:5 |> Filter(isodd) |> foldxd(+)
9
julia> foldxd(TeeRF(min, max), [5, 2, 6, 8, 3])
(2, 8)
Transducers.dtransduce
— Function
dtransduce(xform::Transducer, step, init, array; [simd, basesize, threads_basesize, pool])
Transducers.eduction
— Function
eduction(xf::Transducer, coll)
xf(coll)
coll |> xf
Create an iterable and reducible object.
This API is modeled after eduction in Clojure.
Even though eduction returns an iterable, it is highly recommended to use the foldl-based methods provided by Transducers.jl when performance is important.
Examples
julia> using Transducers
julia> for x in 1:1000 |> Filter(isodd) |> Take(3) # slow
@show x
end
x = 1
x = 3
x = 5
julia> foreach(1:1000 |> Filter(isodd) |> Take(3)) do x # better
@show x
end;
x = 1
x = 3
x = 5
eduction(iterator::Iterators.Generator)
eduction(iterator::Iterators.Filter)
eduction(iterator::Iterators.Flatten)
Convert an iterator to an eduction. The iterators that are typically used in generator comprehensions are supported.
New in version 0.3.
Examples
julia> using Transducers
julia> iter = (y for x in 1:10 if x % 2 == 0 for y in (x, x + 1));
julia> ed = eduction(iter);
julia> collect(iter) == collect(ed)
true
Base.map!
— Function
map!(xf::Transducer, dest, src; simd)
Feed src to transducer xf, storing the result in dest. Collections dest and src must have the same shape. Transducer xf may contain filtering transducers. If some entries of src are skipped, the corresponding entries in dest are left unchanged. Transducer xf must not contain any expansive transducers such as MapCat.
See also copy!.
Examples
julia> using Transducers
julia> xs = collect(1:5)
ys = zero(xs)
map!(Filter(isodd), ys, xs)
5-element Vector{Int64}:
1
0
3
0
5
julia> ans === ys
true
Base.copy!
— Function
copy!(xf::Transducer, dest, src)
Feed src to transducer xf, storing the result in dest. Collections dest and src may, but need not, have the same shape. Source src must be iterable. Destination dest must implement empty! and push!.
See also map!.
Examples
julia> using Transducers
julia> copy!(opcompose(PartitionBy(x -> x ÷ 3), Map(sum)), Int[], 1:10)
4-element Vector{Int64}:
3
12
21
19
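The requirement that dest implement empty! suggests that any existing contents of dest are discarded before the results are stored. A minimal sketch under that assumption, with arbitrary placeholder values in dest:

```julia
using Transducers

dest = [100, 200]                    # pre-existing contents (placeholder values)
copy!(Map(x -> x + 1), dest, 1:3)    # assumed: dest is emptied, then refilled
```

After the call, dest should hold only the transformed items of src.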
Base.copy
— Function
copy(xf::Transducer, T, foldable) :: Union{T, Empty{T}}
copy(xf::Transducer, foldable::T) :: Union{T, Empty{T}}
copy([T,] eduction::Eduction) :: Union{T, Empty{T}}
Process foldable with a transducer xf and then create a container of type T filled with the result. Return BangBang.Empty{T} if the transducer does not produce anything. (This is because there is no consistent interface to create an empty container given its type and not all containers support creating an empty container.)
For parallel versions, see tcopy and dcopy.
New in version 0.4.4.
copy now accepts eductions.
Examples
julia> using Transducers
using BangBang: Empty
julia> copy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
2 => 4
julia> @assert copy(Filter(_ -> false), Set, 1:1) === Empty(Set)
julia> using TypedTables
julia> @assert copy(Map(x -> (a=x, b=x^2)), Table, 1:1) == Table(a=[1], b=[1])
julia> using StructArrays
julia> @assert copy(Map(x -> (a=x, b=x^2)), StructVector, 1:1) == StructVector(a=[1], b=[1])
julia> using DataFrames
julia> @assert copy(
Map(x -> (A = x.a + 1, B = x.b + 1)),
DataFrame(a = [1], b = [2]),
) == DataFrame(A = [2], B = [3])
Transducers.tcopy
— Function
tcopy(xf::Transducer, T, reducible; basesize) :: Union{T, Empty{T}}
tcopy(xf::Transducer, reducible::T; basesize) :: Union{T, Empty{T}}
tcopy([T,] itr; basesize) :: Union{T, Empty{T}}
Thread-based parallel version of copy. Keyword arguments are passed to foldxt.
See also: Parallel processing tutorial (especially Example: parallel collect).
New in version 0.4.5.
tcopy now accepts iterator comprehensions and eductions.
Examples
julia> using Transducers
julia> tcopy(Map(x -> x => x^2), Dict, 2:2)
Dict{Int64, Int64} with 1 entry:
2 => 4
julia> using TypedTables
julia> @assert tcopy(Map(x -> (a=x,)), Table, 1:1) == Table(a=[1])
julia> using StructArrays
julia> @assert tcopy(Map(x -> (a=x,)), StructVector, 1:1) == StructVector(a=[1])
tcopy works with iterator comprehensions and eductions (unlike copy, there is no need for manual conversion with eduction):
julia> table = StructVector(a = [1, 2, 3], b = [5, 6, 7]);
julia> @assert tcopy(
(A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
) == StructVector(A = [2, 4], B = [4, 6])
julia> @assert tcopy(
DataFrame,
(A = row.a + 1, B = row.b - 1) for row in table if isodd(row.a)
) == DataFrame(A = [2, 4], B = [4, 6])
julia> @assert table |>
Filter(row -> isodd(row.a)) |> Map(row -> (A = row.a + 1, B = row.b - 1)) |>
tcopy == StructVector(A = [2, 4], B = [4, 6])
If you have Cat or MapCat at the end of the transducer, consider using foldxt directly:
julia> using Transducers
using DataFrames
julia> @assert tcopy(
DataFrame,
1:2 |> Map(x -> DataFrame(a = [x])) |> MapCat(eachrow);
basesize = 1,
) == DataFrame(a = [1, 2])
julia> using BangBang: Empty, append!!
julia> @assert foldxt(
append!!,
Map(x -> DataFrame(a = [x])),
1:2;
basesize = 1,
# init = Empty(DataFrame),
) == DataFrame(a = [1, 2])
Note that the above snippet assumes that it is OK to mutate the dataframe returned by the transducer. Use init = Empty(DataFrame) if this is not the case.
This approach of using foldxt works with other containers; e.g., with TypedTables.Table:
julia> using TypedTables
julia> @assert foldxt(
append!!,
Map(x -> Table(a = [x])),
1:2;
basesize = 1,
# init = Empty(Table),
) == Table(a = [1, 2])
Transducers.dcopy
— Function
dcopy(xf::Transducer, T, reducible; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy(xf::Transducer, reducible::T; [basesize, threads_basesize]) :: Union{T, Empty{T}}
dcopy([T,] itr; [basesize, threads_basesize]) :: Union{T, Empty{T}}
Distributed.jl-based parallel version of copy. Keyword arguments are passed to foldxd. For examples, see tcopy.
See also: Parallel processing tutorial (especially Example: parallel collect).
New in version 0.4.5.
dcopy now accepts iterator comprehensions and eductions.
Base.append!
— Function
append!(xf::Transducer, dest, src) -> dest
This API is modeled after into in Clojure.
The performance of append!(dest, src::Eduction) is poor. Use append!! instead if the two-argument form is preferred.
Examples
julia> using Transducers
julia> append!(Drop(2), [-1, -2], 1:5)
5-element Vector{Int64}:
-1
-2
3
4
5
BangBang.append!!
— Function
BangBang.append!!(xf::Transducer, dest, src) -> dest′
BangBang.append!!(dest, src::Eduction) -> dest′
Mutate-or-widen version of append!.
New in version 0.4.4.
Performance optimization for append!!(dest, src::Eduction) requires version 0.4.37.
Examples
julia> using Transducers, BangBang
julia> append!!(opcompose(Drop(2), Map(x -> x + 0.0)), [-1, -2], 1:5)
5-element Vector{Float64}:
-1.0
-2.0
3.0
4.0
5.0
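The two-argument form append!!(dest, src::Eduction) listed in the signatures can be sketched in the same way. The inputs below are arbitrary and chosen so that the Int destination cannot hold the results and has to be widened.

```julia
using Transducers, BangBang

ed = eduction(Map(x -> x + 0.5), 1:3)   # yields 1.5, 2.5, 3.5
dest = append!!([0], ed)                # [0] is a Vector{Int}; expected to be widened
```

Because the result may be a new, widened container, use the return value rather than the original destination.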
Base.collect
— Function
collect(xf::Transducer, itr) :: Vector
collect(ed::Eduction) :: Vector
Process an iterable itr using a transducer xf and collect the result into a Vector.
For parallel versions, see tcollect and dcollect.
collect now accepts eductions.
Examples
julia> using Transducers
julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
1
missing
2
missing
3
Transducers.tcollect
— Function
tcollect(xf::Transducer, reducible; basesize) :: Union{Vector, Empty{Vector}}
tcollect(itr; basesize) :: Union{Vector, Empty{Vector}}
Thread-based parallel version of collect. This is just a shorthand for tcopy(xf, Vector, reducible). Use tcopy to get a container other than a Vector.
See also: Parallel processing tutorial (especially Example: parallel collect).
New in version 0.4.5.
tcollect now accepts iterator comprehensions and eductions.
Examples
julia> using Transducers
julia> tcollect(Map(x -> x^2), 1:2)
2-element Vector{Int64}:
1
4
julia> tcollect(x^2 for x in 1:2)
2-element Vector{Int64}:
1
4
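The relationship between tcollect and tcopy stated in this entry can be illustrated with a short sketch; the transducer and the input range are arbitrary.

```julia
using Transducers

xf = Map(x -> x^2)

a = tcollect(xf, 1:4)        # shorthand form
b = tcopy(xf, Vector, 1:4)   # explicit container type
```

Both calls should produce the same Vector for a non-empty result.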
Transducers.dcollect
— Function
dcollect(xf::Transducer, reducible; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}
dcollect(itr; [basesize, threads_basesize]) :: Union{Vector, Empty{Vector}}
Distributed.jl-based parallel version of collect. This is just a shorthand for dcopy(xf, Vector, reducible). Use dcopy to get a container other than a Vector.
See also: Parallel processing tutorial (especially Example: parallel collect).
New in version 0.4.5.
dcollect now accepts iterator comprehensions and eductions.
Base.Channel
— Type
Channel(xf::Transducer, itr; kwargs...)
Channel(ed::Eduction; kwargs...)
Pipe items from an iterable itr processed by the transducer xf through a channel. Channel(xf, itr) and Channel(eduction(xf, itr)) are equivalent. Note that itr itself can be a Channel.
Keyword arguments are passed to Channel(function; kwargs...).
Examples
julia> using Transducers
julia> ch1 = Channel(Filter(isodd), 1:5);
julia> ch2 = Channel(Map(x -> 2x - 1), ch1);
julia> ed = eduction(Map(x -> 1:x), ch2);
julia> ch3 = Channel(Cat(), ed);
julia> foreach(PartitionBy(isequal(1)), ch3) do input
@show input
end;
input = [1, 1]
input = [2, 3, 4, 5]
input = [1]
input = [2, 3, 4, 5, 6, 7, 8, 9]
Experimental transducible processes
Transducers.channel_unordered
— Function
channel_unordered(xf, input; eltype, size, ntasks, basesize) :: Channel{eltype}
channel_unordered(itr; eltype, size, ntasks, basesize) :: Channel{eltype}
Provide elements in input processed by a transducer xf through a Channel.
The unary method channel_unordered(itr) produces a Channel that provides the elements of the input iterator itr in a possibly different order. Iterator comprehensions and eductions can be passed as the input itr.
Use append_unordered! to send outputs to an existing channel.
New in version 0.4.8.
The unary method channel_unordered(itr) requires Transducers.jl 0.4.9.
This API is experimental. Backward-incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
Keyword Arguments
eltype::Type: Element type of the returned channel.
size: The size of the Channel. A non-negative Int or Inf.
ntasks::Int: Number of concurrent tasks. Defaults to Threads.nthreads().
basesize: The "batch size" of the processing; i.e., the number of elements to be processed in a single task. Defaults to 1.
Examples
julia> using Transducers: Map, channel_unordered
julia> sort!(collect(channel_unordered(Map(x -> x + 1), 1:3)))
3-element Vector{Any}:
2
3
4
julia> sort!(collect(channel_unordered(x + 1 for x in 1:3 if isodd(x))))
2-element Vector{Any}:
2
4
Transducers.append_unordered!
— Function
append_unordered!(output, xf, input; ntasks, basesize)
append_unordered!(output, itr; ntasks, basesize)
Process input elements through a transducer xf and then push! them into output in undefined order.
The binary method append_unordered!(output, itr) is like append!(output, itr) but without an order guarantee. Iterator comprehensions and eductions can be passed as the input itr.
output (typically a Channel) must implement a thread-safe push!(output, x) method.
See also channel_unordered.
New in version 0.4.8.
The binary method append_unordered!(output, itr) requires Transducers.jl 0.4.9.
This API is experimental. Backward-incompatible changes, including the removal of this API, are more likely to occur than in other parts of this package.
Examples
julia> using Transducers: Map, append_unordered!
julia> input = Channel(Map(identity), 1:3);
julia> output = Channel{Int}(0);
julia> task = @async try
append_unordered!(output, Map(x -> x + 1), input)
finally
close(output)
end;
julia> sort!(collect(output))
3-element Vector{Int64}:
2
3
4
julia> input = Channel(Map(identity), 1:3);
julia> output = Channel{Int}(0);
julia> task = @async try
append_unordered!(output, (y for x in input if isodd(x) for y in 1:x))
finally
close(output)
end;
julia> sort!(collect(output))
4-element Vector{Int64}:
1
1
2
3
Transducers
Transducers.Transducer
— Type
Transducer
The abstract type for transducers.
A transducer xf can be used as both an iterator transformation xf(itr) and a reducing function transformation xf'(rf).
See also adjoint for xf'(rf).
The call overload xf(itr) requires Transducers.jl 0.4.39 or later.
The call overload xf(itr) requires Julia 1.3 or later. For older Julia versions, use eduction.
Examples
julia> using Transducers
julia> xs = Map(inv)(2:2:4)
2-element StepRange{Int64, Int64} |>
Map(inv)
julia> collect(xs)
2-element Vector{Float64}:
0.5
0.25
julia> rf = Map(inv)'(+)
Reduction(
Map(inv),
BottomRF(
+))
julia> rf(1, 4) # +(1, inv(4))
1.25
Base.:∘
— Function
f ⨟ g
g ∘ f
opcompose(f, g)
compose(g, f)
Composition of transducers.
Transducers.jl 0.4.39 or later is required for composing transducers with ∘ and other operators and functions derived from it.
Transducers written as f |> g |> h in previous versions of Transducers.jl can now be written as f ⨟ g ⨟ h (in Julia 1.5 or later) or opcompose(f, g, h).
"op" in opcompose does not stand for operator; it stands for opposite.
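Examples
The argument order of the composition forms listed above can be sketched as follows. The names keep_odd and double are made up for this illustration; only opcompose and ∘ are exercised.

```julia
using Transducers

keep_odd = Filter(isodd)
double = Map(x -> 2x)

# opcompose applies its arguments left to right: filter first, then map.
xf_op = opcompose(keep_odd, double)

# ∘ uses the usual mathematical order, so the same pipeline is written reversed.
xf_circ = double ∘ keep_odd

a = collect(xf_op, 1:5)
b = collect(xf_circ, 1:5)
```

Both transducers should keep the odd inputs and then double them, so a and b are expected to be equal.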
Base.adjoint
— Function
xf'
xf'(rf₁) is a shortcut for calling reducingfunction(xf, rf₁).
More precisely, the adjoint xf' of a transducer xf is a reducing function transform rf₁ -> rf₂. That is to say, xf' is a function that maps a reducing function rf₁ to another reducing function rf₂.
Examples
julia> using Transducers
julia> y = Map(inv)'(+)(10, 2)
10.5
julia> y == +(10, inv(2))
true
Transducers.Broadcasting
— Type
Broadcasting()
Broadcast the inner reducing function over elements in the input. Roughly speaking, it transforms the inner reducing function op to op′(a, b) = op.(a, b). However, it has better memory usage and better initial value handling.
If the input is an array, the array created at the first iteration is reused if it can hold the element types of subsequent iterations. Otherwise, the array type is widened as needed.
If init passed to the fold function is a lazy "initializer" object such as OnInit, it is initialized independently for each item in the first input array. This makes using Broadcasting for (possibly) in-place functions safe.
New in version 0.4.32.
The Broadcasting transducer is not supported in Julia 1.0.
Examples
julia> using Transducers
julia> foldl(+, Broadcasting(), [[1, 2], [3, 4], 5])
2-element Vector{Int64}:
9
11
julia> foldl(+, Broadcasting(), [(0,), [1], [2.0], [3 + 0im]])
1-element Vector{ComplexF64}:
6.0 + 0.0im
julia> foldl(
*,
[[[1], [10, 100]], [[2], [20, 200]], [[3], [30, 300]]] |>
Broadcasting() |> Broadcasting(),
)
2-element Vector{Vector{Int64}}:
[6]
[6000, 6000000]
When processing a nested data structure (e.g., array-of-arrays) and mutating the accumulator in-place, be careful not to share a single accumulator across the items in the input. For example, this is a bad example:
julia> add!(a, b) = a .+= b;
julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
init = Ref([0, 0]))
2-element Vector{Vector{Int64}}:
[13, 15]
[13, 15]
julia> ans[1] === ans[2] # they are the same object
true
Use OnInit to initialize a new array with each item in the input:
julia> foldl(add!, Broadcasting(), [[[1], [2, 3]], [[4, 5], 6]];
init = OnInit(() -> [0, 0]))
2-element Vector{Vector{Int64}}:
[5, 6]
[8, 9]
julia> ans == [
add!(add!([0, 0], [1]), [4, 5]),
add!(add!([0, 0], [2, 3]), 6),
]
true
Transducers.Cat
— Type
Cat()
Concatenate/flatten nested iterators.
This API is modeled after cat in Clojure.
Examples
julia> using Transducers
julia> collect(Cat(), [[1, 2], [3], [4, 5]]) == 1:5
true
Transducers.Consecutive
— Type
Consecutive(size, step = size)
Consecutive(size; step = size)
Sliding window of width size and interval step. Yields tuples.
This transducer is like Partition but feeds tuples to the downstream transducers instead of vectors.
If step == 1, this transducer supports parallel reduction for any collections; i.e., Consecutive(size, 1)'(op) is associative if op is associative.
Currently, in parallel folds, Consecutive(size, 1) cannot be used with reducing functions that can produce a Reduced.
If step > 1, this transducer can, in principle, support parallel reduction if the input collection allows random access (e.g., arrays). However, this feature is not implemented yet.
Examples
julia> using Transducers
julia> 1:8 |> Consecutive(3) |> collect
2-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 2, 3)
(4, 5, 6)
julia> 1:8 |> Consecutive(3; step = 1) |> collect
6-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
(6, 7, 8)
Transducers.Count
— Type
Count([start[, step]])
Generate a sequence start, start + step, start + step + step, and so on.
Note that input is ignored. To use the input in the downstream reduction steps, use Zip.
start defaults to 1 and step defaults to oneunit(start).
See also: Iterators.countfrom, Enumerate.
Examples
julia> using Transducers
julia> collect(Zip(Map(identity), Count()), -3:-1)
3-element Vector{Tuple{Int64, Int64}}:
(-3, 1)
(-2, 2)
(-1, 3)
julia> using Dates
julia> 1:3 |> Zip(Map(identity), Count(Day(1))) |> MapSplat(*) |> collect ==
[Day(1), Day(4), Day(9)]
true
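Since the examples above only use the default step, here is a small sketch with explicit start and step arguments. The values 10 and 5 are arbitrary; the input range only determines how many items are produced.

```julia
using Transducers

# Count ignores the values of its input; 1:3 just supplies three steps.
ys = collect(Count(10, 5), 1:3)
```

The result should be the sequence start, start + step, start + 2step.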
Transducers.Dedupe
— Type
Dedupe()
Dedupe(eq)
De-duplicate consecutive items. The comparison operator which identifies duplicates can be specified by the eq parameter, which defaults to == (equal).
This API is modeled after dedupe in Clojure.
Examples
julia> using Transducers
julia> collect(Dedupe(), [1, 1, 2, 1, 3, 3, 2])
5-element Vector{Int64}:
1
2
1
3
2
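The eq parameter can be sketched with a custom comparison. The helper same_abs is hypothetical and not part of the package: it treats consecutive items as duplicates when their absolute values match.

```julia
using Transducers

# Hypothetical comparison used only for this illustration.
same_abs(a, b) = abs(a) == abs(b)

ys = collect(Dedupe(same_abs), [1, -1, 2, -2, 2, 3])
```

Only the first item of each run of equal absolute values should survive.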
Transducers.Drop
— Type
Drop(n)
Drop the first n items.
This API is modeled after drop in Clojure.
Examples
julia> using Transducers
julia> 1:5 |> Drop(3) |> collect
2-element Vector{Int64}:
4
5
Transducers.DropLast
— Type
DropLast(n)
Drop the last n items.
This API is modeled after drop-last in Clojure.
Examples
julia> using Transducers
julia> 1:5 |> DropLast(2) |> collect
3-element Vector{Int64}:
1
2
3
julia> 1:1 |> DropLast(2) |> collect == []
true
julia> 1:0 |> DropLast(2) |> collect == []
true
Transducers.DropWhile
— Type
DropWhile(pred)
Drop items while pred returns true consecutively. It becomes a no-op after pred returns false for the first time.
This API is modeled after drop-while in Clojure.
Examples
julia> using Transducers
julia> collect(DropWhile(x -> x < 3), [1:5; 1:2])
5-element Vector{Int64}:
3
4
5
1
2
Transducers.Enumerate
— Type
Enumerate([start[, step]])
Transducer variant of Base.enumerate. The start and step arguments are optional and have the same meaning as in Count.
Examples
julia> using Transducers
julia> collect(Enumerate(), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
(1, "A")
(2, "B")
(3, "C")
julia> start=2; step=3;
julia> collect(Enumerate(start, step), ["A", "B", "C"])
3-element Vector{Tuple{Int64, String}}:
(2, "A")
(5, "B")
(8, "C")
Transducers.Filter
— Type
Filter(pred)
Skip items for which pred evaluates to false.
This API is modeled after filter in Clojure.
Examples
julia> using Transducers
julia> 1:3 |> Filter(iseven) |> collect
1-element Vector{Int64}:
2
Transducers.FlagFirst
— Type
FlagFirst()
Output (isfirst, input) where isfirst::Bool is true only for the first iteration and input is the original input.
See also: IterTools.flagfirst
Examples
julia> using Transducers
julia> collect(FlagFirst(), 1:3)
3-element Vector{Tuple{Bool, Int64}}:
(1, 1)
(0, 2)
(0, 3)
Transducers.GroupBy
— Type
GroupBy(key, rf, [init])
GroupBy(key, xf::Transducer, [step = right, [init]])
Group the input stream by a function key and then fan-out each group of key-value pairs to the reducing function rf.
For example, if GroupBy is used as in:
xs |> Map(upstream) |> GroupBy(key, rf, init) |> Map(downstream)
then the "function signatures" would be:
upstream(_) :: V
key(::V) :: K
rf(::Y, ::Pair{K, V}) ::Y
downstream(::Dict{K, Y})
That is to say,
Output of the upstream is fed into the function key that produces the group key (of type K).
For each new group key, a new transducible process is started with the initial state init :: Y. Pass an OnInit or CopyInit object to init for creating a dedicated (possibly mutable) state for each group.
After one "nested" reducing function rf is called, the intermediate result dictionary (of type Dict{K, Y}) accumulating the current and all preceding results is then fed into the downstream.
See also groupreduce in SplitApplyCombine.jl.
New in version 0.3.
Examples
julia> using Transducers
using BangBang # for `push!!`
julia> foldl(right, GroupBy(string, Map(last), push!!), [1, 2, 1, 2, 3])
Transducers.GroupByViewDict{String,Vector{Int64},…}(...):
"1" => [1, 1]
"2" => [2, 2]
"3" => [3]
Note that the reduction stops if one of the groups returns a reduced. This can be used, for example, to find out whether there is a group with a sum greater than 3 and stop the computation as soon as it is found:
julia> result = transduce(
GroupBy(
string,
opcompose(Map(last), Scan(+), ReduceIf(x -> x > 3)),
),
right,
nothing,
[1, 2, 1, 2, 3],
);
julia> result isa Reduced
true
julia> unreduced(result)
Transducers.GroupByViewDict{String,Any,…}(...):
"1" => 2
"2" => 4
Transducers.Interpose
— Type
Interpose(sep)
Interleave input items with a sep.
This API is modeled after interpose in Clojure.
Examples
julia> using Transducers
julia> collect(Interpose(missing), 1:3)
5-element Vector{Union{Missing, Int64}}:
1
missing
2
missing
3
Transducers.Iterated
— TypeIterated(f, init)
Generate a sequence init
, f(init)
, f(f(init))
, f(f(f(init)))
, and so on.
Note that input is ignored. To use the input in the downstream reduction steps, use Zip
.
Pass OnInit
or CopyInit
object to init
for creating a dedicated (possibly mutable) state for each fold.
The idea is taken from IterTools.iterated.
Examples
julia> using Transducers
julia> collect(Iterated(x -> 2x, 1), 1:5)
5-element Vector{Int64}:
1
2
4
8
16
julia> collect(Zip(Map(identity), Iterated(x -> 2x, 1)), 1:5)
5-element Vector{Tuple{Int64, Int64}}:
(1, 1)
(2, 2)
(3, 4)
(4, 8)
(5, 16)
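The state passed to f does not have to be a number. The following minimal sketch, assuming a hypothetical helper named fib_step, carries a pair of consecutive Fibonacci numbers as the state and extracts the first component with Map:

```julia
using Transducers

# The state is the pair (a, b); each step moves to (b, a + b).
fib_step((a, b)) = (b, a + b)

# The input 1:8 is ignored; it only determines how many items are produced.
fibs = 1:8 |> Iterated(fib_step, (0, 1)) |> Map(first) |> collect
```

Under these assumptions fibs holds the first eight Fibonacci numbers starting from 0.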
Transducers.KeepSomething — Type
KeepSomething(f = identity)
Pass non-nothing
output of f
to the inner reducing step after possibly unwrapping Some
.
This API is modeled after keep
in Clojure.
Examples
julia> using Transducers
julia> xf = KeepSomething() do x
           if x == 0
               :zero
           elseif x == 1
               Some(:one)
           elseif x == 2
               Some(nothing)
           end
       end;
julia> collect(xf, 0:3)
3-element Vector{Union{Nothing, Symbol}}:
:zero
:one
nothing
Note that NotA(Nothing)
can be used to avoid automatically unwrapping Some
:
julia> [Some(1), 2, nothing] |> KeepSomething() |> collect
2-element Vector{Int64}:
1
2
julia> [Some(1), 2, nothing] |> NotA(Nothing) |> collect
2-element Vector{Any}:
Some(1)
2
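A typical use is a combined "map and filter" with functions that return nothing on failure. The sketch below relies on Base.tryparse, which returns nothing when a string cannot be parsed; the input strings are made up for illustration:

```julia
using Transducers

# Keep only the strings that parse as integers, already converted to Int.
parsed = collect(KeepSomething(s -> tryparse(Int, s)), ["1", "x", "3"])
```

The unparsable item "x" is dropped, so parsed contains 1 and 3.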
Transducers.Map — Type
Map(f)
Apply unary function f
to each input and pass the result to the inner reducing step.
This API is modeled after map
in Clojure.
Examples
julia> using Transducers
julia> collect(Map(x -> 2x), 1:3)
3-element Vector{Int64}:
2
4
6
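Map is usually composed with other transducers. As a minimal sketch (variable names are illustrative only), the same Filter and Map pipeline can feed either collect or a fold:

```julia
using Transducers

# Keep the even numbers, then square them.
squares = 1:6 |> Filter(iseven) |> Map(x -> x^2) |> collect

# The same pipeline reduced with `+`; the squared values go straight into the sum.
total = foldl(+, 1:6 |> Filter(iseven) |> Map(x -> x^2))
```

Both forms apply f only to the items that pass the filter.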
Transducers.MapCat — Type
MapCat(f)
Concatenate output of f
which is expected to return an iterable.
This API is modeled after mapcat
in Clojure.
Examples
julia> using Transducers
julia> collect(MapCat(x -> 1:x), 1:3)
6-element Vector{Int64}:
1
1
2
1
2
3
Transducers.MapSplat — Type
MapSplat(f)
Like Map(f)
but calls f(input...)
for each input
and then passes the result to the inner reducing step.
Examples
julia> using Transducers
julia> collect(MapSplat(*), zip(1:3, 10:10:30))
3-element Vector{Int64}:
10
40
90
Transducers.NondeterministicThreading — Type
NondeterministicThreading(; basesize, ntasks = nthreads())
Parallelize inner reducing function using ntasks
.
Given
#                 ,-- Not parallelized
#          ______/__
foldxl(rf, xs |> xfo |> NondeterministicThreading() |> xfi)
#      ==                                              ===
#      Parallelized                                    Parallelized
the inner transducer xfi
and the reducing function rf
are parallelized but the outer transducer xfo
and the iteration over data collection xs
are not parallelized.
The scheduling of the tasks (hence the call tree of the inner reducing function) is non-deterministic. This means that the result is deterministic only if the inner reducing function is exactly associative. If the inner reducing function is only approximately associative (e.g., addition of floating point numbers), the result of the reduction is likely to differ from run to run.
The outer transducers and iterate
are processed sequentially. In particular, the data collection does not have to implement SplittablesBase.halve
.
Currently, the default basesize
is 1. However, it may be changed in the future (e.g. it may be automatically tuned at run-time).
NondeterministicThreading
does not work with Julia < 1.3.
Keyword Arguments
basesize::Integer: The number of input elements to be accumulated in a buffer before being sent to a task.
ntasks::Integer: The number of tasks @spawned. The default value is Threads.nthreads(). A number larger than Threads.nthreads() may be useful if the inner reducing function contains I/O and does not consume too many resources (e.g., memory).
Examples
julia> using Transducers
julia> collect(1:4 |> Filter(isodd))
2-element Vector{Int64}:
1
3
julia> collect(1:4 |> NondeterministicThreading() |> Filter(isodd))
2-element Vector{Int64}:
1
3
Transducers.NotA — Type
NotA(T)
Skip items of type T
. Unlike Filter(!ismissing)
, downstream transducers can have correct type information for NotA(Missing)
.
See also: OfType
Examples
julia> using Transducers
julia> [1, missing, 2] |> NotA(Missing) |> collect
2-element Vector{Int64}:
1
2
Transducers.OfType — Type
OfType(T)
Include only items of type T
.
See also: NotA
Examples
julia> using Transducers
julia> [1, missing, 2] |> OfType(Int) |> collect
2-element Vector{Int64}:
1
2
Transducers.Partition — Type
Partition(size, step = size, flush = false)
Partition(size; step = size, flush = false)
Sliding window of width size
and interval step
. Yield vectors.
Note: step
= size
is the default. Hence, the default behavior is non-overlapping windows.
For small size
, see Consecutive
for a similar transducer that yields tuples instead.
The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.
This API is modeled after partition-all
in Clojure.
Examples
julia> using Transducers
julia> 1:8 |> Partition(3) |> Map(copy) |> collect
2-element Vector{Vector{Int64}}:
[1, 2, 3]
[4, 5, 6]
julia> 1:8 |> Partition(3; flush=true) |> Map(copy) |> collect
3-element Vector{Vector{Int64}}:
[1, 2, 3]
[4, 5, 6]
[7, 8]
julia> 1:8 |> Partition(3; step=1) |> Map(copy) |> collect
6-element Vector{Vector{Int64}}:
[1, 2, 3]
[2, 3, 4]
[3, 4, 5]
[4, 5, 6]
[5, 6, 7]
[6, 7, 8]
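When each window is reduced to a value immediately, the Map(copy) step used above is unnecessary. A minimal sketch of a moving average over overlapping windows (the name moving_avg is illustrative):

```julia
using Transducers

# Each window is consumed by `sum` and `length` right away, so it need not be copied.
moving_avg = 1:5 |> Partition(3; step = 1) |> Map(w -> sum(w) / length(w)) |> collect
```

The three windows [1, 2, 3], [2, 3, 4], and [3, 4, 5] yield the averages 2.0, 3.0, and 4.0.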
Transducers.PartitionBy — Type
PartitionBy(f)
Group input sequence into chunks in which f
returns the same value consecutively.
The vector passed to the inner reducing function is valid only during its immediate reduction step. It must be reduced immediately or copied.
This API is modeled after partition-by
in Clojure.
Examples
julia> using Transducers
julia> 1:9 |> PartitionBy(x -> (x + 1) ÷ 3) |> Map(copy) |> collect
4-element Vector{UnitRange{Int64}}:
1:1
2:4
5:7
8:9
Transducers.ReduceIf — Type
ReduceIf(pred)
Stop fold when pred(x)
returns true
for the output x
of the upstream transducer.
Examples
julia> using Transducers
julia> foldl(right, ReduceIf(x -> x == 3), 1:10)
3
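Because the fold stops at the first match, ReduceIf combined with right acts as a "find first" search. A minimal sketch, with a predicate chosen only for illustration:

```julia
using Transducers

# Smallest n in 1:100 whose square exceeds 50; items after it are never visited.
n = foldl(right, ReduceIf(x -> x^2 > 50), 1:100)
```

Since 7^2 = 49 and 8^2 = 64, the fold stops at 8.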
Transducers.ReducePartitionBy — Type
ReducePartitionBy(f, rf, [init])
Reduce partitions determined by isequal
on the output value of f
with an associative reducing function rf
. Partitions are reduced on-the-fly and no intermediate arrays are allocated.
Examples
Consider the input 1:6
"keyed" by a function x -> x ÷ 3
:
julia> map(x -> x ÷ 3, 1:6)
6-element Vector{Int64}:
0
0
1
1
1
2
i.e., there are three partitions with the key values 0
, 1
, and 2
. We can use ReducePartitionBy
to compute, e.g., the length and the sum of each partition by:
julia> using Transducers
julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, Map(_ -> 1)'(+)) |> collect
3-element Vector{Int64}:
2
3
1
julia> 1:6 |> ReducePartitionBy(x -> x ÷ 3, +) |> collect
3-element Vector{Int64}:
3
12
6
Transducers.Replace — Type
Replace(assoc)
Replace each input with the value in the associative container assoc
(e.g., a dictionary, array, string) if it matches with a key/index. Otherwise output the input as-is.
This API is modeled after replace
in Clojure.
Examples
julia> using Transducers
julia> collect(Replace(Dict('a' => 'A')), "abc")
3-element Vector{Char}:
'A': ASCII/Unicode U+0041 (category Lu: Letter, uppercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
julia> collect(Replace([:a, :b, :c]), 0:4)
5-element Vector{Any}:
0
:a
:b
:c
4
julia> collect(Replace("abc"), 0:4)
5-element Vector{Any}:
0
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
4
Transducers.Scan — Type
Scan(f, [init = Init])
Accumulate input with binary function f
and pass the accumulated result so far to the inner reduction step.
The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ...
when the sequence x₁, x₂, x₃, ..., xₙ, ...
is fed to Scan(f)
.
y₁ = f(init, x₁)
y₂ = f(y₁, x₂)
y₃ = f(y₂, x₃)
...
yₙ = f(yₙ₋₁, xₙ)
This is a generalized version of the prefix sum also known as cumulative sum, inclusive scan, or scan.
Note that the associativity of f
is not required when the transducer is used in a process that guarantees an order, such as foldl
.
Unless f
is a function with known identity element such as +
, *
, min
, max
, and append!
, the initial state init
must be provided.
Pass OnInit
or CopyInit
object to init
for creating a dedicated (possibly mutable) state for each fold.
Examples
julia> using Transducers
julia> collect(Scan(*), 1:3)
3-element Vector{Int64}:
1
2
6
julia> 1:3 |> Map(x -> x + im) |> Scan(*) |> collect
3-element Vector{Complex{Int64}}:
1 + 1im
1 + 3im
0 + 10im
julia> collect(Scan(*, 10), 1:3)
3-element Vector{Int64}:
10
20
60
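For functions with a known identity element, Scan behaves like the familiar cumulative operations. The following sketch (input values are arbitrary) compares Scan(+) with Base.cumsum and shows a running maximum:

```julia
using Transducers

xs = [3, 1, 4, 1, 5]

running_sum = collect(Scan(+), xs)    # same values as cumsum(xs)
running_max = collect(Scan(max), xs)  # largest value seen so far
```

Neither call needs an explicit init because + and max are among the functions with known identity elements listed above.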
Transducers.ScanEmit — Type
ScanEmit(f, init[, onlast])
Accumulate input x
with a function f
with the call signature (u, x) -> (y, u)
and pass the result y
to the inner reduction step.
The inner reducing step receives the sequence y₁, y₂, y₃, ..., yₙ, ...
computed as follows
u₀ = init
y₁, u₁ = f(u₀, x₁)
y₂, u₂ = f(u₁, x₂)
y₃, u₃ = f(u₂, x₃)
...
yₙ, uₙ = f(uₙ₋₁, xₙ)
...
yₒₒ = onlast(uₒₒ)
when the sequence x₁, x₂, x₃, ..., xₙ, ...
is fed to ScanEmit(f)
.
Pass OnInit
or CopyInit
object to init
for creating a dedicated (possibly mutable) state for each fold.
Examples
julia> using Transducers
julia> collect(ScanEmit(tuple, 0), 1:3)
3-element Vector{Int64}:
0
1
2
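The separation between the emitted value y and the carried state u makes it easy to express computations such as successive differences. A minimal sketch, assuming a hypothetical helper diff_step that follows the (u, x) -> (y, u) signature:

```julia
using Transducers

# u holds the previous input; emit the difference and carry the current input forward.
diff_step(u, x) = (x - u, x)

deltas = collect(ScanEmit(diff_step, 0), [1, 3, 6, 10])
```

With init = 0 the first emitted value is the first input itself, followed by the gaps between neighbors.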
Transducers.TCat — Type
TCat(basesize::Integer)
Threaded version of Cat
(concatenate/flatten).
To use this transducer, all the downstream (inner) transducers must be stateless (or of type ScanEmit
) and the reducing function must be associative. See also: Parallel processing tutorial.
Note that the upstream (outer) transducers need not be stateless as long as they are called with a non-parallel reduction such as foldl
and collect
.
Examples
julia> using Transducers
julia> 1:3 |> Map(x -> 1:x) |> TCat(1) |> tcollect
6-element Vector{Int64}:
1
1
2
1
2
3
julia> 1:3 |> Scan(+) |> Map(x -> 1:x) |> TCat(1) |> collect
10-element Vector{Int64}:
1
1
2
3
1
2
3
4
5
6
Transducers.Take — Type
Take(n)
Take n
items from the input sequence.
This API is modeled after take
in Clojure.
Examples
julia> using Transducers
julia> 1:10 |> Take(2) |> collect
2-element Vector{Int64}:
1
2
julia> 1:2 |> Take(5) |> collect
2-element Vector{Int64}:
1
2
Using a low-level API, it's possible to distinguish if the output is truncated or not:
julia> using BangBang # for `push!!`
julia> transduce(Take(3), Completing(push!!), Init, 1:2)
2-element Vector{Int64}:
1
2
julia> transduce(Take(3), Completing(push!!), Init, 1:4)
Reduced([1, 2, 3])
julia> transduce(Take(3), Completing(push!!), Init, 1:0)
InitialValue(push!!)
See also transduce
, Reduced
and Completing
.
Transducers.TakeLast — Type
TakeLast(n)
Take the last n
items from the input sequence.
Examples
julia> using Transducers
julia> 1:10 |> TakeLast(2) |> collect
2-element Vector{Int64}:
9
10
julia> 1:2 |> TakeLast(5) |> collect
2-element Vector{Int64}:
1
2
Transducers.TakeNth — Type
TakeNth(n)
Output every n-th item to the inner reducing step.
This API is modeled after take-nth
in Clojure.
Examples
julia> using Transducers
julia> 1:9 |> TakeNth(3) |> collect
3-element Vector{Int64}:
1
4
7
Transducers.TakeWhile — Type
TakeWhile(pred)
Take items while pred
returns true
. Abort the reduction when pred
returns false
for the first time.
This API is modeled after take-while
in Clojure.
Examples
julia> using Transducers
julia> [1, 2, 3, 1, 2] |> TakeWhile(x -> x < 3) |> collect
2-element Vector{Int64}:
1
2
Transducers.Unique — Type
Unique(by = identity)
Pass only unseen items to the inner reducing step.
The item is distinguished by the output of function by
when given.
This API is modeled after distinct
in Clojure.
New in version 0.4.2.
Examples
julia> using Transducers
julia> [1, 1, 2, -1, 3, 3, 2] |> Unique() |> collect
4-element Vector{Int64}:
1
2
-1
3
julia> [1, 1, 2, -1, 3, 3, 2] |> Unique(x -> x^2) |> collect
3-element Vector{Int64}:
1
2
3
Transducers.Zip — Method
Zip(xforms...)
Zip outputs of transducers xforms
in a tuple and pass it to the inner reduction step.
Head transducers drive tail transducers. Be careful when using it with transducers other than Map
, especially the contractive ones like PartitionBy
and the expansive ones like MapCat
.
Examples
julia> using Transducers
julia> collect(Zip(Map(identity), Map(x -> 10x), Map(x -> 100x)), 1:3)
3-element Vector{Tuple{Int64, Int64, Int64}}:
(1, 10, 100)
(2, 20, 200)
(3, 30, 300)
Experimental transducers
Transducers.ZipSource — Type
ZipSource(xform::Transducer)
Branch input into two "flows", inject one into xform
and then merge (zip) the output of xform
with the original (source) input.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
To illustrate how it works, consider the following usage
collection |> xf0 |> ZipSource(xf1) |> xf2
where xf0
, xf1
, and xf2
are some transducers. Schematically, the output yn
from xfn
flows as follows:
 xf0      xf1                       xf2
---- y0 ------ y1 ---.-- (y0, y1) ----->
      |              |
       `-------------'
Examples
julia> using Transducers
using Transducers: ZipSource
julia> collect(ZipSource(opcompose(Filter(isodd), Map(x -> x + 1))), 1:5)
3-element Vector{Tuple{Int64, Int64}}:
(1, 2)
(3, 4)
(5, 6)
Transducers.GetIndex — Type
GetIndex(array)
GetIndex{inbounds}(array)
Transform an integer input i
to array[i]
.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
Examples
julia> using Transducers
using Transducers: GetIndex
julia> collect(GetIndex('a':'z'), [2, 3, 4])
3-element Vector{Char}:
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)
julia> collect(GetIndex{true}('a':'z'), [2, 3, 4]) # Inbounds
3-element Vector{Char}:
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
'd': ASCII/Unicode U+0064 (category Ll: Letter, lowercase)
Transducers.SetIndex — Type
SetIndex(array)
SetIndex{inbounds}(array)
Perform array[i] = v
for each input pair (i, v)
.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
Examples
julia> using Transducers
using Transducers: SetIndex
julia> ys = zeros(3);
julia> foldl(first ∘ tuple, SetIndex(ys), [(1, 11.1), (3, 33.3)], init=nothing)
julia> ys
3-element Vector{Float64}:
11.1
0.0
33.3
Transducers.Inject — Type
Inject(iterator)
Inject the output from iterator
to the stream processed by the inner reduction step.
This API is experimental. Backward incompatible change, including the removal of this API, is more likely to occur than other parts of this package.
Examples
julia> using Transducers
using Transducers: Inject
julia> collect(Inject(Iterators.cycle("hello")), 1:8)
8-element Vector{Tuple{Int64, Char}}:
(1, 'h')
(2, 'e')
(3, 'l')
(4, 'l')
(5, 'o')
(6, 'h')
(7, 'e')
(8, 'l')
julia> collect(Inject(Iterators.repeated([1 2])), 1:4)
4-element Vector{Tuple{Int64, Matrix{Int64}}}:
(1, [1 2])
(2, [1 2])
(3, [1 2])
(4, [1 2])
julia> collect(Inject(Iterators.product(1:2, 3:5)), 1:100)
6-element Vector{Tuple{Int64, Tuple{Int64, Int64}}}:
(1, (1, 3))
(2, (2, 3))
(3, (1, 4))
(4, (2, 4))
(5, (1, 5))
(6, (2, 5))
Other reducing function combinators
Transducers.TeeRF — Type
TeeRF(reducing_functions::Tuple)
TeeRF(reducing_functions...)
Combine multiple reducing functions into a new reducing function that "multicasts" the input to multiple reducing functions.
Roughly speaking, TeeRF(op₁, op₂, ..., opₙ)
is equivalent to
((a₁, a₂, ..., aₙ), x) -> (op₁(a₁, x), op₂(a₂, x), ..., opₙ(aₙ, x))
For combine
, it behaves like ProductRF
.
New in version 0.4.32.
Examples
julia> using Transducers
julia> extrema′(xs, xf = Map(identity)) = foldl(TeeRF(min, max), xf, xs);
julia> extrema′([5, 2, 6, 8, 3])
(2, 8)
Note that the input is considered empty unless all reducing functions call their bottom reducing functions. Specify init
to obtain results even when the input collection is empty or all filtered out.
julia> filtering_max = Filter(isodd)'(max);
julia> foldl(TeeRF(min, filtering_max), Map(identity), [5, 2, 6, 8, 3])
(2, 5)
julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8)
ERROR: EmptyResultError: ...
julia> foldl(TeeRF(min, filtering_max), Map(identity), 2:2:8; init = Init)
(2, InitialValue(max))
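The component reducing functions do not have to be related. As a minimal sketch (variable names are illustrative), a sum and a maximum can be computed in a single pass:

```julia
using Transducers

# Every input is fed to both `+` and `max`; the result is a tuple of the two folds.
total, largest = foldl(TeeRF(+, max), Map(identity), [5, 2, 6, 8, 3])
```

This follows the same pattern as extrema′ above, with + in place of min.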
Transducers.ProductRF — Type
ProductRF(reducing_functions::Tuple)
ProductRF(reducing_functions...)
Combine N reducing functions into a new reducing function that works on an N-tuple. The i-th reducing function receives the i-th element of the input tuple.
Roughly speaking, ProductRF(op₁, op₂, ..., opₙ)
is equivalent to
((a₁, a₂, ..., aₙ), (b₁, b₂, ..., bₙ)) -> (op₁(a₁, b₁), op₂(a₂, b₂), ..., opₙ(aₙ, bₙ))
New in version 0.4.32.
Examples
Like TeeRF
, ProductRF
can be used to drive multiple reducing functions. ProductRF
is more "low-level" in the sense that TeeRF
can be defined in terms of ProductRF
(the other direction is much harder):
julia> using Transducers
julia> TeeRF′(fs...) = reducingfunction(
Map(x -> ntuple(_ -> x, length(fs))),
ProductRF(fs...),
);
julia> foldl(TeeRF′(min, max), Map(identity), [5, 2, 6, 8, 3])
(2, 8)
ProductRF
may be useful for handling pre-existing stream whose item type is already a tuple:
julia> foldl(ProductRF(&, +), Map(x -> (isodd(x), x)), [5, 2, 6, 8, 3])
(false, 24)
julia> foldl(TeeRF(reducingfunction(Map(isodd), &), +), Map(identity), [5, 2, 6, 8, 3])
(false, 24)
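As another small sketch of the component-wise behavior (names are illustrative), mapping each input to a tuple (x, x^2) and folding with ProductRF(+, +) yields the sum and the sum of squares at once:

```julia
using Transducers

# The first `+` accumulates x, the second `+` accumulates x^2.
sum_x, sum_x2 = foldl(ProductRF(+, +), Map(x -> (x, x^2)), 1:4)
```

Each reducing function sees only its own slot of the tuple, so the two accumulations do not interact.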
Transducers.wheninit — Function
wheninit(oninit, rf) -> rf′
wheninit(oninit) -> rf -> rf′
whenstart(start, rf) -> rf′
whenstart(start) -> rf -> rf′
whencomplete(complete, rf) -> rf′
whencomplete(complete) -> rf -> rf′
whencombine(combine, rf) -> rf′
whencombine(combine) -> rf -> rf′
Add initialization/completion/merging phases to an arbitrary reducing function.
The functions passed to those combinators are used as follows in foldl
:
init′ = oninit()  # if oninit is given; otherwise standard `init`-preprocessing
acc = start(init′)
for x in collection
    acc = rf(acc, x)
end
result = acc
return complete(result)
In foldxt, a collection is split into multiple parts and then the above foldl, except for complete, is run on them, yielding multiple results, which are combined by repeatedly calling combine(result_1, result_2). Note that this allows a non-associative function for next while combine must be associative.
See also next
, start
, complete
, and combine
.
Arguments
rf: reducing function
oninit: nullary function that generates an initial value for rf
start: unary function that pre-processes the initial value for rf
complete: unary function that post-processes the accumulator
combine: (approximately) associative binary function for combining multiple results of rf (before they are post-processed by complete).
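The sequential phases described above can be sketched in plain Julia without any package. The helper sketch_fold below is hypothetical (it is not part of Transducers.jl) and only mirrors the order in which oninit, start, rf, and complete are applied:

```julia
# Hypothetical helper, not part of Transducers.jl: a plain-Julia rendering
# of the phases that a sequential fold goes through.
function sketch_fold(rf, collection; oninit, start = identity, complete = identity)
    acc = start(oninit())      # initialization phase
    for x in collection
        acc = rf(acc, x)       # the accumulator is replaced at every step
    end
    return complete(acc)       # completion phase
end

# Averaging: accumulate (sum, count), then divide once at the end.
add_average((s, n), x) = (s + x, n + 1)

avg = sketch_fold(
    add_average,
    [1, 3, 5];
    oninit = () -> (0, 0),
    complete = ((s, n),) -> s / n,
)
```

The combine phase is omitted here because it only matters when the collection is split into parts, as in the threaded fold.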
Extended help
Examples
An example of using a non-associative reducing function in foldxt
:
julia> using Transducers
julia> collector! = push! |> whencombine(append!) |> wheninit(() -> []);
julia> foldxt(collector!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Any}:
1
3
5
A more "tightly" typed vector can be returned by using the BangBang.jl interface:
julia> using BangBang # for `push!!` and `append!!`
julia> collector!! = push!! |> whencombine(append!!) |> wheninit(Vector{Union{}});
julia> foldxt(collector!!, Filter(isodd), 1:5; basesize = 1)
3-element Vector{Int64}:
1
3
5
An online averaging algorithm can be implemented, e.g., by:
julia> averaging = function add_average((sum, count), x)
           (sum + x, count + 1)
       end |> wheninit() do
           (Init(+), 0)
       end |> whencombine() do (sum1, count1), (sum2, count2)
           (sum1 + sum2), (count1 + count2)
       end |> whencomplete() do (sum, count)
           sum / count
       end;
julia> foldl(averaging, Filter(isodd), 1:5)
3.0
julia> foldxt(averaging, Filter(isodd), 1:50; basesize = 1)
25.0
An alternative implementation is to use Map
to construct a singleton solution and then merge it into the accumulated solution:
julia> averaging2 = function merge_average((sum1, count1), (sum2, count2))
           (sum1 + sum2, count1 + count2)
       end |> whencomplete() do (sum, count)
           sum / count
       end |> Map() do x
           (x, 1)
       end';  # `'` here is important
julia> foldl(averaging2, Filter(isodd), 1:5)
3.0
julia> foldxt(averaging2, Filter(isodd), 1:50; basesize = 1)
25.0
Transducers.whenstart — Function
See wheninit
Transducers.whencomplete — Function
See wheninit
Transducers.whencombine — Function
See wheninit
Early termination
Transducers.Reduced — Type
Reduced
The type signaling transducible processes to abort.
Examples
julia> using Transducers
julia> function step_demo(y, x)
           if x > 5
               return reduced(y)
           else
               return y + x
           end
       end;
julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:10)
Reduced(15)
julia> result isa Reduced
true
julia> unreduced(result)
15
julia> result = transduce(Map(identity), Completing(step_demo), 0, 1:4)
10
julia> result isa Reduced
false
julia> unreduced(result)
10
Transducers.reduced — Function
reduced([x = nothing])
Stop transducible process with the final value x
(default: nothing
). Return x
as-is if it already is a reduced
value.
This API is modeled after ensure-reduced
in Clojure.
Examples
julia> using Transducers
julia> foldl(Enumerate(), "abcdef"; init=0) do y, (i, x)
if x == 'd'
return reduced(y)
end
return y + i
end
6
julia> foreach(Enumerate(), "abc") do (i, x)
println(i, ' ', x)
if x == 'b'
return reduced()
end
end;
1 a
2 b
Transducers.unreduced — Function
unreduced(x)
Unwrap x
if it is a Reduced
; do nothing otherwise.
This API is modeled after unreduced
in Clojure.
Transducers.ifunreduced — Function
ifunreduced(f, [x])
Equivalent to unreduced(x)
if x
is a Reduced
; otherwise run f(x)
. Return a curried version if x
is not provided.
See: foreach
.
Examples
julia> using Transducers
julia> 1 |> ifunreduced() do x
println("called with x = ", x)
end
called with x = 1
julia> reduced(1) |> ifunreduced() do x
println("called with x = ", x)
end
1
Notice that nothing is printed in the last example.
Implementation
ifunreduced(f) = x -> ifunreduced(f, x)
ifunreduced(f, x::Reduced) = unreduced(x)
ifunreduced(f, x) = f(x)
Executors
Transducers.SequentialEx — Type
SequentialEx(; simd)
Sequential fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm sequentially.
See also: foldxl
, ThreadedEx
and DistributedEx
.
Keyword Arguments
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with a stateful transducer like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, SequentialEx())
6
Transducers.ThreadedEx — Type
ThreadedEx(; basesize, stoppable, nestlevel, simd)
Multi-threaded fold executor. This is the default [1] parallel executor used by Folds.jl and FLoops.jl.
See also: foldxt
, SequentialEx
and DistributedEx
.
Keyword Arguments
basesize::Integer = amount(reducible) ÷ nthreads(): The size of a chunk in reducible that is processed by each worker. A smaller size may be required in some cases (e.g., as noted for DistributedEx, when the computation time for processing each item can fluctuate a lot).
stoppable::Bool: [This option usually does not have to be set manually.] The threaded fold executed in the "stoppable" mode, which is used for optimizing reduction with reduced, has a slight overhead if reduced is not used. This mode can be disabled by passing stoppable = false. It is usually automatically detected and set appropriately. Note that this option is purely for optimization and does not affect the result value.
nestlevel::Union{Integer,Val}: Specify how many inner Cat (flatten) transducers are to be multi-threaded (using TCat). It must be a positive integer, a Val of a positive integer, or Val(:inf). Val(:inf) means to use multi-threading for all Cat transducers. Note that the Cat transducers should be statically known. That is to say, the fold implementation sees two Cats in ... |> Map(f) |> Cat() |> Cat() but only one Cat in ... |> Map(x -> f(x) |> Cat()) |> Cat() even though they are semantically identical.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with a stateful transducer like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, ThreadedEx(basesize = 1))
6
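The basesize keyword can also be passed directly to foldxt, as in the foldxt calls shown for wheninit. A minimal sketch, assuming an associative reducing function so that the chunking does not affect the result:

```julia
using Transducers

# Sum of squares with the threaded fold; chunks of at most 2 items go to each task.
s = foldxt(+, Map(x -> x^2), 1:10; basesize = 2)
```

Integer addition is exactly associative, so the result is the same as with a sequential fold regardless of how the input is split.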
Transducers.DistributedEx — Type
DistributedEx(; pool, basesize, threads_basesize, simd)
Distributed fold executor. It can be passed to APIs from packages such as Folds.jl and FLoops.jl to run the algorithm on multiple worker processes.
See also: foldxd
, SequentialEx
and ThreadedEx
.
Keyword Arguments
pool::AbstractWorkerPool: Passed to Distributed.remotecall.
basesize::Integer = amount(array) ÷ nworkers(): The size of a chunk in array that is processed by each worker. A smaller size may be required when the computation time for processing each item can fluctuate a lot.
threads_basesize::Integer = basesize ÷ nthreads(): The size of a chunk in array that is processed by each task in each worker process. The default setting assumes that the number of threads is the same in all workers. For a heterogeneous setup where each worker process has a different number of threads, it may be necessary to use a smaller threads_basesize and basesize to get good performance.
simd: If true or :ivdep, enable SIMD using Base.@simd. If :ivdep, use the @simd ivdep for ... end variant. Read the Julia manual entry for Base.@simd to understand when it is appropriate to use this option. For example, simd = :ivdep must not be used with a stateful transducer like Scan. If false (default), Base.@simd is not used.
Examples
julia> using Folds
julia> Folds.sum(1:3, DistributedEx())
6
Transducers.PreferParallel — Type
PreferParallel(; simd, basesize)
A "placeholder" executor that indicates preference to parallel execution.
This lets the input data collection decide preferred execution strategy (e.g., CUDAEx
for CuArray
when FoldsCUDA.jl is available), assuming that the reducing function is associative. The default executor is ThreadedEx
. As an optional feature, some input data collections (e.g., AbstractChannel) support automatically demoting the execution strategy to SequentialEx. An error is thrown if the automatic detection fails.
Miscellaneous
Transducers.SplitBy — Type
SplitBy(f; [keepend = false,] [keepempty = false,])
Split input collection into chunks delimited by the elements on which f
returns true
. This can be used to implement parallel and lazy versions of functions like eachline
and split
.
If keepend
is true
(or Val(true)
), include the "delimiter"/end element at the end of each chunk. If keepempty
is true
(or Val(true)
), include empty chunks. When keepend
is true
, the value of keepempty
is irrelevant since the chunks cannot be empty (i.e., each chunk contains at least the end element).
The input collection (xs
in SplitBy(...)(xs)
) has to support eachindex
and view
or SubString
.
Extended Help
Examples
For demonstration, consider the following input stream and SplitBy(iszero; ...)
used with the following options:
input   keepend=false   keepend=false   keepend=true
        keepempty=false keepempty=true
1       `.              `.              `.
2        | y1            | y1            | y1
3       ,'              ,'               |
0                                       ,'
1       `.              `.              `.
2        | y2            | y2            | y2
3        |               |               |
4       ,'              ,'               |
0                       __ y3           ,'
0                                       ] y3
1       `.              `.              `.
2        | y3            | y4            | y4
In the above diagram, yi
(i = 1, 2, 3, 4
) are the output of SplitBy
. This can be checked in the REPL easily as follows. (Note: we are using Map(collect)
for cleaner printing; it's not required unless the arrays are mutated downstream.)
julia> using Transducers
julia> xs = [1, 2, 3, 0, 1, 2, 3, 4, 0, 0, 1, 2]; # input
julia> xs |> SplitBy(iszero) |> Map(collect) |> collect
3-element Vector{Vector{Int64}}:
[1, 2, 3]
[1, 2, 3, 4]
[1, 2]
julia> xs |> SplitBy(iszero; keepempty = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
[1, 2, 3]
[1, 2, 3, 4]
[]
[1, 2]
julia> xs |> SplitBy(iszero; keepend = true) |> Map(collect) |> collect
4-element Vector{Vector{Int64}}:
[1, 2, 3, 0]
[1, 2, 3, 4, 0]
[0]
[1, 2]
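When each chunk is reduced immediately, Map(collect) can be dropped. A minimal sketch that sums every zero-delimited chunk of the same input (the name chunk_sums is illustrative):

```julia
using Transducers

xs = [1, 2, 3, 0, 1, 2, 3, 4, 0, 0, 1, 2]

# Each chunk is passed straight to `sum`, so no per-chunk array is collected.
chunk_sums = xs |> SplitBy(iszero) |> Map(sum) |> collect
```

With the default options the delimiters and the empty chunk between the two zeros are dropped, leaving three sums.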
Transducers.Transducer — Method
Transducer(iterator::Iterators.Generator)
Transducer(iterator::Iterators.Filter)
Transducer(iterator::Iterators.Flatten)
Extract "processing" part of an iterator
as a Transducer
. The "data source" iterator (i.e., xs
in (f(x) for x in xs)
) is ignored and nothing
must be used as a placeholder (i.e., (f(x) for x in nothing)
).
See also eduction
.
New in version 0.3.
Examples
julia> using Transducers
julia> xf1 = Transducer(2x for x in nothing if x % 2 == 0);
julia> xf2 = opcompose(Filter(x -> x % 2 == 0), Map(x -> 2x)); # equivalent
julia> xs = 1:10;
julia> collect(xf1, xs) == collect(xf2, xs)
true
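As a further sketch of the same idea (names are illustrative), a transducer extracted from a generator can be stored and applied later to any data source:

```julia
using Transducers

# `nothing` stands in for the data source, which is supplied later.
xf = Transducer(x^2 for x in nothing if isodd(x))

odd_squares = collect(xf, 1:5)
```

The generator is never iterated; only its filtering and mapping parts are reused.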
Transducers.Transducer — Method
Transducer(o::OnlineStat)
Use an OnlineStat
as a stateful transducer.
It is implemented as:
opcompose(Scan(fit!, CopyInit(o)), Map(value))
Examples
julia> using Transducers
using OnlineStats: Mean
julia> collect(Transducer(Mean()), 1:4)
4-element Vector{Float64}:
1.0
1.5
2.0
2.5
Transducers.reducingfunction — Function
reducingfunction(xf, step; simd)
xf'(step; simd)
Apply transducer xf
to the reducing function step
to create a new reducing function.
New in version 0.3.
Be careful using reducingfunction
with stateful transducers like Scan
with mutable init
(e.g., Scan(push!, [])
). See more in Examples below.
Arguments
xf::Transducer: A transducer.
step: A callable which accepts 1 and 2 arguments. If it only accepts 2 arguments, wrap it with Completing to "add" the 1-argument form (i.e., the complete protocol).
Keyword Arguments
simd: false, true, or :ivdep. See maybe_usesimd.
Examples
julia> using Transducers
julia> rf = reducingfunction(Map(x -> x + 1), *);
julia> rf(10, 2) === 10 * (2 + 1)
true
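The same direct-call check works for a filtering transducer. In this minimal sketch (variable names are illustrative), the returned reducing function either forwards the input to + or returns the accumulator unchanged:

```julia
using Transducers

rf = reducingfunction(Filter(isodd), +)

kept = rf(10, 3)     # 3 is odd, so it is added to the accumulator
skipped = rf(10, 4)  # 4 is filtered out, so the accumulator is returned as-is
```

Both transducers used here are stateless, so calling rf directly like this is safe.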
Warning: Be careful when using reducingfunction with stateful transducers
Stateful Transducers themselves in Transducers.jl are not inherently broken with reducingfunction. However, they can produce incorrect results when combined with mutable states:
julia> scan_state = [];
julia> rf_bad = opcompose(Scan(push!, scan_state), Cat())'(string);
julia> transduce(rf_bad, "", 1:3)
"112123"
The first run works. However, observe that the vector scan_state
is not empty anymore:
julia> scan_state
3-element Vector{Any}:
1
2
3
Thus, the second run produces an incorrect result:
julia> transduce(rf_bad, "", 1:3)
"123112312123123"
One way to solve this issue is to use CopyInit
or OnInit
.
julia> scan_state = CopyInit([])
CopyInit(Any[])
julia> rf_good = opcompose(Scan(push!, scan_state), Cat())'(string);
julia> transduce(rf_good, "", 1:3)
"112123"
julia> scan_state
CopyInit(Any[])
julia> transduce(rf_good, "", 1:3)
"112123"
reducingfunction([xf::Transducer,] o::OnlineStat; simd)
Convert an OnlineStat
to a reducing function. Returned function can be used with foldl
, foldxt
, and foldxd
. Note that input o
is only used as a "prototype"; i.e., it's not going to be mutated.
Examples
julia> using Transducers
using OnlineStats: Mean
julia> foldl(reducingfunction(Mean()), Map(x -> x^2), 1:4)
Mean: n=4 | value=7.5
julia> foldl(Mean(), Map(x -> x^2), 1:4) # equivalent to above
Mean: n=4 | value=7.5
julia> foldxt(Mean(), Map(x -> x^2), 1:4) # threaded
Mean: n=4 | value=7.5
julia> foldl(Mean(), eduction(x^2 for x in 1:4)) # ditto
Mean: n=4 | value=7.5
julia> foldxt(Mean(), eduction(x^2 for x in 1:4)) # ditto
Mean: n=4 | value=7.5
foldxd
can be used instead of foldxt
. However the usual caveats of code availability for Distributed.jl apply.
Transducers.Completing — Type
Completing(function)
Wrap a function
to add a no-op complete
protocol. Use it when passing a function
without unary method to transduce
etc.
This API is modeled after completing
in Clojure.
Transducers.Init — Constant
Init
Init(op) :: InitialValues.InitialValue
The canonical initializer; i.e., a singleton placeholder usable for init
argument of foldl
for binary functions with known initial values.
When init = Init
is passed to foldl
etc., Init(op)
is called for the bottom reducing function op
during the start
phase. Init(op)
returns InitialValue(op)
which acts as the canonical initial value of op
.
Examples
julia> using Transducers
julia> foldl(+, 1:3 |> Filter(isodd); init = Init)
4
julia> foldl(+, 2:2:4 |> Filter(isodd); init = Init)
InitialValue(+)
Extended help
Note that op
passed to foldl
etc. must be known to InitialValues.jl:
julia> unknown_op(a, b) = a + b;
julia> foldl(unknown_op, 2:2:4 |> Filter(isodd); init = Init)
ERROR: IdentityNotDefinedError: `init = Init` is specified but the identity element `InitialValue(op)` is not defined for
op = unknown_op
[...]
InitialValues.asmonoid can be used to wrap a binary function to add ("adjoin") the identity value to its domain:
julia> using InitialValues: asmonoid
julia> foldl(asmonoid(unknown_op), 2:2:4 |> Filter(isodd); init = Init)
InitialValue(::InitialValues.AdjoinIdentity{typeof(unknown_op)})
When start(rf, Init) is called with a composite reducing function rf, Init(rf₀) is called for the bottom reducing function rf₀ of rf:
julia> rf = Take(3)'(+); # `+` is the bottom reducing function
julia> acc = Transducers.start(rf, Init);
julia> Transducers.unwrap(rf, acc)
(3, InitialValue(+))
Transducers.OnInit
— Type
OnInit(f)
Call a callable f to create an initial value.
See also CopyInit.
OnInit or CopyInit must be used whenever using in-place reduction with foldxt etc.
Examples
julia> using Transducers
julia> xf1 = Scan(push!, [])
Scan(push!, Any[])
julia> foldl(right, xf1, 1:3)
3-element Vector{Any}:
1
2
3
julia> xf1
Scan(push!, Any[1, 2, 3])
Notice that the array is stored in xf1 and mutated in-place. As a result, the second run of foldl contains the results from the first run:
julia> foldl(right, xf1, 10:11)
5-element Vector{Any}:
1
2
3
10
11
This may not be desired. To avoid this behavior, create an OnInit object, which takes a factory function that creates a new initial value for each run.
julia> xf2 = Scan(push!, OnInit(() -> []))
Scan(push!, OnInit(##9#10()))
julia> foldl(right, xf2, 1:3)
3-element Vector{Any}:
1
2
3
julia> foldl(right, xf2, [10.0, 11.0])
2-element Vector{Any}:
10.0
11.0
The keyword argument init for transducible processes also accepts an OnInit:
julia> foldl(push!, Map(identity), "abc"; init=OnInit(() -> []))
3-element Vector{Any}:
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
To create a copy of a mutable object, CopyInit is easier to use.
However, a more powerful and generic pattern is to use push!! from BangBang.jl and initialize init with Union{}[] so that it automatically finds the minimal element type.
julia> using BangBang
julia> foldl(push!!, Map(identity), "abc"; init=Union{}[])
3-element Vector{Char}:
'a': ASCII/Unicode U+0061 (category Ll: Letter, lowercase)
'b': ASCII/Unicode U+0062 (category Ll: Letter, lowercase)
'c': ASCII/Unicode U+0063 (category Ll: Letter, lowercase)
Transducers.CopyInit
— Type
CopyInit(value)
This is equivalent to OnInit(() -> deepcopy(value)).
New in version 0.3.
Examples
julia> using Transducers
julia> init = CopyInit([]);
julia> foldl(push!, Map(identity), 1:3; init=init)
3-element Vector{Any}:
1
2
3
julia> foldl(push!, Map(identity), 1:3; init=init) # `init` can be reused
3-element Vector{Any}:
1
2
3
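The stated equivalence with OnInit(() -> deepcopy(value)) can be checked with a small sketch. The variable name template is illustrative only.

```julia
using Transducers

template = Int[]   # illustrative starting container

via_copyinit = foldl(push!, Map(identity), 1:3; init = CopyInit(template))
via_oninit = foldl(push!, Map(identity), 1:3; init = OnInit(() -> deepcopy(template)))

@show via_copyinit == via_oninit
@show isempty(template)   # each fold works on a copy, so the template stays empty
```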
Transducers.right
— Function
right([l, ]r) -> r
It is simply defined as
right(l, r) = r
right(r) = r
This function is meant to be used as the step argument for foldl etc. to extract the last output of the transducers.
The initial value must be specified manually. In 0.2, it was automatically set to nothing.
Examples
julia> using Transducers
julia> foldl(right, Take(5), 1:10)
5
julia> foldl(right, Drop(5), 1:3; init=0) # using `init` as the default value
0
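As a further sketch, right can pick the last element that survives a composed pipeline, with an explicit init acting as the fallback when nothing survives. The pipeline below is an illustrative choice, not taken from the original.

```julia
using Transducers

# Keep only the final element that makes it through the pipeline.
last_even_square =
    foldl(right, opcompose(Filter(iseven), Map(x -> x^2)), 1:7; init = nothing)
@show last_even_square   # 6^2

# When nothing passes the filter, the explicit `init` is returned instead.
fallback = foldl(right, Filter(x -> x > 100), 1:7; init = nothing)
@show fallback
```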
Transducers.setinput
— Function
setinput(ed::Eduction, coll)
Set input collection of eduction ed to coll.
Previously, setinput combined with eduction was the recommended way to use transducers in a type-stable manner. As of v0.3, all the foldl-like functions and eduction are type stable in many cases, so this workaround is no longer necessary.
Examples
julia> using Transducers
julia> ed = eduction(Map(x -> 2x), Float64[]);
julia> xs = ones(2, 3);
julia> foldl(+, setinput(ed, xs))
12.0
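Since the workaround is no longer needed, the same fold can be written by passing the transducer and the collection directly to foldl. The sketch below compares both spellings on the input used above.

```julia
using Transducers

xs = ones(2, 3)

# Older style: build an eduction on a dummy input, then swap in the real one.
via_setinput = foldl(+, setinput(eduction(Map(x -> 2x), Float64[]), xs))

# Direct style: hand the transducer and the collection to foldl.
direct = foldl(+, Map(x -> 2x), xs)

@show via_setinput == direct
```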
Transducers.AdHocFoldable
— Type
AdHocFoldable(foldl, [collection = nothing])
Provide a different way to fold collection without creating a wrapper type.
Arguments
- foldl::Function: a function that implements __foldl__.
- collection: a collection passed as the last argument of foldl.
Examples
julia> using Transducers
using Transducers: @next, complete
using ArgCheck
julia> function uppertriangle(A::AbstractMatrix)
           @argcheck !Base.has_offset_axes(A)
           return AdHocFoldable(A) do rf, acc, A
               for j in 1:size(A, 2), i in 1:min(j, size(A, 1))
                   acc = @next(rf, acc, @inbounds A[i, j])
               end
               return complete(rf, acc)
           end
       end;
julia> A = reshape(1:6, (3, 2))
3×2 reshape(::UnitRange{Int64}, 3, 2) with eltype Int64:
1 4
2 5
3 6
julia> collect(Map(identity), uppertriangle(A))
3-element Vector{Int64}:
1
4
5
julia> function circularwindows(xs::AbstractVector, h::Integer)
           @argcheck !Base.has_offset_axes(xs)
           @argcheck h >= 0
           @argcheck 2 * h + 1 <= length(xs)
           return AdHocFoldable(xs) do rf, acc, xs
               buffer = similar(xs, 2 * h + 1)
               @inbounds for i in 1:h
                   buffer[1:h - i + 1] .= @view xs[end - h + i:end]
                   buffer[h - i + 2:end] .= @view xs[1:h + i]
                   acc = @next(rf, acc, buffer)
               end
               for i in h + 1:length(xs) - h
                   acc = @next(rf, acc, @inbounds @view xs[i - h:i + h])
               end
               @inbounds for i in 1:h
                   buffer[1:end - i] .= @view xs[end - 2 * h + i:end]
                   buffer[end - i + 1:end] .= @view xs[1:i]
                   acc = @next(rf, acc, buffer)
               end
               return complete(rf, acc)
           end
       end;
julia> collect(Map(collect), circularwindows(1:9, 2))
9-element Vector{Vector{Int64}}:
[8, 9, 1, 2, 3]
[9, 1, 2, 3, 4]
[1, 2, 3, 4, 5]
[2, 3, 4, 5, 6]
[3, 4, 5, 6, 7]
[4, 5, 6, 7, 8]
[5, 6, 7, 8, 9]
[6, 7, 8, 9, 1]
[7, 8, 9, 1, 2]
julia> expressions(str::AbstractString; kwargs...) =
           AdHocFoldable(str) do rf, val, str
               pos = 1
               while true
                   expr, pos = Meta.parse(str, pos;
                                          raise = false,
                                          depwarn = false,
                                          kwargs...)
                   expr === nothing && break
                   val = @next(rf, val, expr)
               end
               return complete(rf, val)
           end;
julia> collect(Map(identity), expressions("""
x = 1
y = 2
"""))
2-element Vector{Expr}:
:(x = 1)
:(y = 2)
julia> counting = AdHocFoldable() do rf, acc, _
           i = 0
           while true
               i += 1
               acc = @next(rf, acc, i)
           end
       end;
julia> foreach(counting) do i
           @show i;
           i == 3 && return reduced()
       end;
i = 1
i = 2
i = 3
Transducers.withprogress
— Function
withprogress(foldable) -> foldable′
Wrap a foldable so that progress is shown in a logging-based progress meter (e.g., Juno) during foldl, foldxt, foldxd, etc.
For parallel reductions such as foldxt and foldxd, a reasonably small basesize and threads_basesize (for foldxd) must be used to ensure that progress information is updated frequently. However, it may slow down the computation if basesize is too small.
Keyword Arguments
- interval::Real: Minimum interval (in seconds) for how often progress is logged.
Examples
julia> using Transducers
julia> xf = Map() do x
           sleep(0.01)
           x
       end;
julia> foldl(+, xf, withprogress(1:100; interval=1e-3)) # see progress meter
5050
In foldl and foldxt, withprogress can be nested. This is not supported in foldxd.
julia> xf = opcompose(
           MapCat() do x
               withprogress(1:x; interval=1e-3)  # nested progress
           end,
           Map() do x
               sleep(0.5)
               x
           end,
       );
julia> if VERSION >= v"1.3-alpha"
           # Calling `sleep` in thread is safe in Julia 1.3:
           foldxt(+, xf, withprogress(1:10; interval=1e-3); basesize=1)
       else
           foldl(+, xf, withprogress(1:10; interval=1e-3))
       end
220
Deprecated
Base.reduce
— Function
Transducers.dreduce
— Function
Base.mapfoldl
— Function
mapfoldl(xf::Transducer, step, reducible; init, simd)
mapfoldl(::Transducer, rf, itr) is deprecated. Use foldl(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldl(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.
Like foldl but step is not automatically wrapped by Completing.
Examples
julia> using Transducers
julia> function step_demo(state, input)
           @show state, input
           state + input
       end;
julia> function step_demo(state)
           println("Finishing with state = ", state)
           state
       end;
julia> mapfoldl(Filter(isodd), step_demo, 1:4, init=0.0)
(state, input) = (0.0, 1)
(state, input) = (1.0, 3)
Finishing with state = 4.0
4.0
Base.mapreduce
— Function
mapreduce(xf, step, reducible; init, simd)
mapreduce(::Transducer, rf, itr) is deprecated. Use foldxt(rf, ::Transducer, itr) if you do not need to call single-argument rf on complete. Use foldxt(whencomplete(rf, rf), ::Transducer, itr) to call the single-argument method of rf on complete.
Like foldxt but step is not automatically wrapped by Completing.
Transducers.Keep
— Function
Keep(f)
Pass non-nothing output of f to the inner reducing step.
Keep(f) is deprecated. Use ... |> Map(f) |> NotA(Nothing). If f does not return a Some, KeepSomething can also be used.
Examples
julia> using Transducers
julia> xf = Keep() do x
           if x < 3
               x + 1
           end
       end;
julia> collect(xf, 1:5)
2-element Vector{Int64}:
2
3
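The suggested replacement can be sketched as follows, using opcompose to chain Map and NotA(Nothing). The helper name bump_small is hypothetical; it reproduces the conditional from the Keep example.

```julia
using Transducers

# Same conditional mapping as the Keep example, written as a named function.
bump_small(x) = x < 3 ? x + 1 : nothing

# Map produces `nothing` for rejected inputs; NotA(Nothing) then drops them.
xf = opcompose(Map(bump_small), NotA(Nothing))
kept = collect(xf, 1:5)
@show kept
```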
- julia_issue_22891: See also: break with value + loop else clauses (JuliaLang/julia#22891)
- 1: More specifically, Folds.jl and FLoops.jl use PreferParallel, which in turn defaults to ThreadedEx.