How to write transducers

Transducers don't export public interface for implementing transducers (and reducible collections). Let's import some handy ones:

using Transducers
using Transducers: Transducer, R_, next, inner, xform

Stateless transducer

Let's write manually what opcompose(Filter(x -> x isa Int), Map(x -> x + 1)) would do:

struct AddOneIfInt <: Transducer end

function Transducers.next(rf::R_{AddOneIfInt}, result, input)
    if input isa Int

Output input + 1 is passed to the "inner" reducing step:

        next(inner(rf), result, input + 1)
    else

Filtering out is done by "doing nothing"; return result-so-far as-is:

        result
    end
end

Ineed, for integer collection, it increments input by one:

collect(AddOneIfInt(), 1:5)
5-element Vector{Int64}:
 2
 3
 4
 5
 6

Non integer elements are filtered out:

collect(AddOneIfInt(), Any[3, nothing, 2.0, missing, 5])
2-element Vector{Int64}:
 4
 6

Stateful transducer

AddOneIfInt is a stateless transducer which is very easy to implement. A stateful transducer needs a bit more code.

using Transducers: start, complete, wrap, unwrap, wrapping
using Random

Let's define a transducer that spits out a random past element from the buffer:

struct RandomRecall <: Transducer
    history::Int
    seed::Int
end
RandomRecall() = RandomRecall(3, 0)

A stateful transducer needs to implement Transducers.start to "allocate" its private state. Here, the private state is a buffer and a random number generator state rng:

function Transducers.start(rf::R_{RandomRecall}, result)
    buffer = []
    rng = MersenneTwister(xform(rf).seed)
    private_state = (buffer, rng)
    return wrap(rf, private_state, start(inner(rf), result))
end

Stateful transducer needs to unwrap its private state inside Transducers.next and then re-wrap it. There is a helper function Transducers.wrapping does that with the do block:

function Transducers.next(rf::R_{RandomRecall}, result, input)
    wrapping(rf, result) do (buffer, rng), iresult

Pickup a random element to be passed to the inner reducing function. Replace it with the new incoming one in the buffer:

        if length(buffer) < xform(rf).history
            push!(buffer, input)
            iinput = rand(rng, buffer)
        else
            i = rand(rng, 1:length(buffer))
            iinput = buffer[i]
            buffer[i] = input
        end

Call the inner reducing function. Note that iresult unwrapped by Transducers.wrapping must be passed to next:

        iresult = next(inner(rf), iresult, iinput)
        return (buffer, rng), iresult
    end
end

Any transducer with custom Transducers.start must have a corresponding Transducers.complete. It is responsible for unwrapping the result and call the complete for the inner reducing function.

function Transducers.complete(rf::R_{RandomRecall}, result)
    _private_state, inner_result = unwrap(rf, result)
    return complete(inner(rf), inner_result)
end

Here is how it works:

collect(RandomRecall(), 1:5)
5-element Vector{Int64}:
 1
 2
 2
 3
 4

Indeed, it picks up some random elements from the past elements. With slightly more complex transducer:

1:100 |> Filter(isodd) |> RandomRecall() |> Filter(x -> x > 10) |> Take(5) |> collect
5-element Vector{Int64}:
 11
 13
 15
 19
 17

Note that Transducers.complete can do more than unwrap and complete. It is useful for, e.g., flushing the buffer.

function Transducers.complete(rf::R_{RandomRecall}, result)
    (buffer, _), iresult = unwrap(rf, result)
    for x in buffer

Note that inner next can be called more than one time inside next and complete:

        iresult = next(inner(rf), iresult, x)
    end

complete for inner reducing function must be called exactly once:

    return complete(inner(rf), iresult)
end

This then adds 3 (= RandomRecall().history) more elements to the output:

collect(RandomRecall(), 1:5)
8-element Vector{Int64}:
 1
 2
 2
 3
 4
 1
 2
 5

This page was generated using Literate.jl.