Transducers for Julia

Transducers are transformations of "sequence" of input that can be composed very efficiently. The interface used by transducers naturally describes a wide range of processes that is expressible as a succession of steps. Furthermore, transducers can be defined without specifying the details of the input and output (collections, streams, channels, etc.) and therefore achieves a full reusability. Transducers are introduced by Rich Hickey, the creator of the Clojure language. His Strange Loop talk is a great introduction to the idea of transducers.

Transducers.jl is an implementation of the transducers in Julia. Aiming to satisfy high-performance needs of Julia users, Transducers.jl uses a formulation that is pure [pure] and aiding type-stability.



using Pkg


If you are familiar with iterators (see also Base.Iterators and IterTools.jl) it would look very familiar to you:

julia> using Transducers

julia> 1:3 |> Map(x -> 2x) |> collect  # double each element
3-element Vector{Int64}:

julia> 1:6 |> Filter(iseven) |> collect  # collect only evens
3-element Vector{Int64}:

julia> 1:3 |> MapCat(x -> 1:x) |> collect  # concatenate mapped results
6-element Vector{Int64}:

Transducers can be composed:

julia> 1:6 |> Filter(iseven) |> Map(x -> 2x) |> collect
3-element Vector{Int64}:

An efficient way to use transducers is combination with foldl. The computation is compiled down to an efficient loop you would write by hand::

julia> foldl(+, 1:6 |> Filter(iseven) |> Map(x -> 2x))

For more detailed discussions on the difference to iterators, see Comparison to iterators.

List of transducers

Here is the list of pre-defined transducers:

Broadcasting()Broadcast inner reducing function over elements in the input. Roughly speaking, it transforms the inner reducing function op to op′(a, b) = op.(a, b). However, it has a better memory usage and better initial value handling.
Cat()Concatenate/flatten nested iterators.
Consecutive(size, step = size)Sliding window of width size and interval step. Yield tuples.
Count([start[, step]])Generate a sequence start, start + step, start + step + step, and so on.
Dedupe()De-duplicate consecutive items. Comparison operator which identifies duplicates can be specified by the eq parameter, which defaults to == (equal).
Drop(n)Drop first n items.
DropLast(n)Drop last n items.
DropWhile(pred)Drop items while pred returns true consecutively. It becomes a no-op after pred returns a false.
Enumerate([start[, step]])Transducer variant of Base.enumerate. The start and step arguments are optional and have the same meaning as in Count.
Filter(pred)Skip items for which pred is evaluated to false.
FlagFirst()Output (isfirst, input) where isfirst::Bool is true only for the first iteration and input is the original input.
GroupBy(key, rf, [init])Group the input stream by a function key and then fan-out each group of key-value pairs to the reducing function rf.
Interpose(sep)Interleave input items with a sep.
Iterated(f, init)Generate a sequence init, f(init), f(f(init)), f(f(f(init))), and so on.
KeepSomething(f = identity)Pass non-nothing output of f to the inner reducing step after possibly unwrapping Some.
Map(f)Apply unary function f to each input and pass the result to the inner reducing step.
MapCat(f)Concatenate output of f which is expected to return an iterable.
MapSplat(f)Like Map(f) but calls f(input...) for each input and then pass the result to the inner reducing step.
NondeterministicThreading(; basesize, ntasks = nthreads())Parallelize inner reducing function using ntasks.
NotA(T)Skip items of type T. Unlike Filter(!ismissing), downstream transducers can have a correct type information for NotA(Missing).
OfType(T)Include only items of type T.
Partition(size, step = size, flush = false)Sliding window of width size and interval step. Yield vectors.
PartitionBy(f)Group input sequence into chunks in which f returns a same value consecutively.
ReduceIf(pred)Stop fold when pred(x) returns true for the output x of the upstream transducer.
ReducePartitionBy(f, rf, [init])Reduce partitions determined by isequal on the output value of f with an associative reducing function rf. Partitions are reduced on-the-fly and no intermediate arrays are allocated.
Replace(assoc)Replace each input with the value in the associative container assoc (e.g., a dictionary, array, string) if it matches with a key/index. Otherwise output the input as-is.
Scan(f, [init = Init])Accumulate input with binary function f and pass the accumulated result so far to the inner reduction step.
ScanEmit(f, init[, onlast])Accumulate input x with a function f with the call signature (u, x) -> (y, u) and pass the result y to the inner reduction step.
TCat(basesize::Integer)Threaded version of Cat (concatenate/flatten).
Take(n)Take n items from the input sequence.
TakeLast(n)Take last n items from the input sequence.
TakeNth(n)Output every n item to the inner reducing step.
TakeWhile(pred)Take items while pred returns true. Abort the reduction when pred returns false for the first time.
Unique(by = identity)Pass only unseen item to the inner reducing step.
Zip(xforms...)Zip outputs of transducers xforms in a tuple and pass it to the inner reduction step.

  • pure...although not pure in the strong sense as Base.@pure.