How to write transducers
Transducers.jl doesn't export the public interface for implementing transducers (and reducible collections). Let's import some handy names:
using Transducers
using Transducers: Transducer, R_, next, inner, xform

Stateless transducer
Let's write manually what Filter(x -> x isa Int) |> Map(x -> x + 1) would do:
struct AddOneIfInt <: Transducer end
function Transducers.next(rf::R_{AddOneIfInt}, result, input)
    if input isa Int
        # Output `input + 1` is passed to the "inner" reducing step:
        next(inner(rf), result, input + 1)
    else
        # Filtering out is done by "doing nothing"; return result-so-far as-is:
        result
    end
end

Indeed, for an integer collection, it increments each input by one:

collect(AddOneIfInt(), 1:5)
5-element Array{Any,1}:
2
3
4
5
6

Non-integer elements are filtered out:

collect(AddOneIfInt(), Any[3, nothing, 2.0, missing, 5])
2-element Array{Any,1}:
4
6

Notice that the output eltype is Any; this can be fixed by defining outtype:
Transducers.outtype(::AddOneIfInt, _intype) = Int
collect(AddOneIfInt(), 1:5)
5-element Array{Int64,1}:
2
3
4
5
6

Stateful transducer
AddOneIfInt is a stateless transducer which is very easy to implement. A stateful transducer needs a bit more code.
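The stateless case can also be pictured without any package machinery. The following is a minimal sketch in plain Julia, not the Transducers.jl implementation: `add_one_if_int` is a hypothetical helper that takes an inner reducing step `inner(result, input)` and returns a new step with the same filter-and-increment logic as `AddOneIfInt`.

```julia
# Hypothetical helper, not part of Transducers.jl.
# Wraps an inner reducing step `inner(result, input)`.
function add_one_if_int(inner)
    return function (result, input)
        if input isa Int
            # Pass `input + 1` on to the inner reducing step.
            return inner(result, input + 1)
        else
            # Filter out by returning the result-so-far unchanged.
            return result
        end
    end
end

# Collect into a vector by using `push!` as the inner step.
rf_step = add_one_if_int(push!)
ints = foldl(rf_step, Any[3, nothing, 2.0, missing, 5]; init = Int[])

# Sum instead of collecting by using `+` as the inner step.
total = foldl(add_one_if_int(+), 1:5; init = 0)
```

Here `ints` is `[4, 6]` and `total` is `20`. No state has to survive between calls, which is why the stateless case is short. In Transducers.jl, a stateful transducer additionally has to carry its private state alongside `result`, which is what the functions imported next are for.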
using Transducers: start, complete, InType, wrap, unwrap, wrapping
using Random

Let's define a transducer that spits out a random past element from the buffer:

struct RandomRecall <: Transducer
    history::Int
    seed::Int
end

RandomRecall() = RandomRecall(3, 0)

A stateful transducer needs to implement Transducers.start to "allocate" its private state. Here, the private state is a buffer and a random number generator state rng:
function Transducers.start(rf::R_{RandomRecall}, result)
    buffer = InType(rf)[]
    rng = MersenneTwister(xform(rf).seed)
    private_state = (buffer, rng)
    return wrap(rf, private_state, start(inner(rf), result))
end

A stateful transducer needs to unwrap its private state inside Transducers.next and then re-wrap it. The helper function Transducers.wrapping does that with a do block:
function Transducers.next(rf::R_{RandomRecall}, result, input)
    wrapping(rf, result) do (buffer, rng), iresult
        # Pick a random element to be passed to the inner reducing function.
        # Replace it with the new incoming one in the buffer:
        if length(buffer) < xform(rf).history
            push!(buffer, input)
            iinput = rand(rng, buffer)
        else
            i = rand(rng, 1:length(buffer))
            iinput = buffer[i]
            buffer[i] = input
        end
        # Call the inner reducing function. Note that `iresult` unwrapped by
        # Transducers.wrapping must be passed to `next`:
        iresult = next(inner(rf), iresult, iinput)
        return (buffer, rng), iresult
    end
end
Transducers.outtype(::RandomRecall, intype) = intype

Indeed, it picks random elements from the past inputs:

collect(RandomRecall(), 1:5)
5-element Array{Int64,1}:
1
1
2
2
1

With a slightly more complex transducer:

collect(Filter(isodd) |> RandomRecall() |> Filter(x -> x > 10) |> Take(5), 1:100)
5-element Array{Int64,1}:
13
11
19
17
21

Another overloadable API is Transducers.complete. It is invoked at the end of each transducible process. It is useful for, e.g., flushing a buffer.
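To see why a final step is useful, here is a dependency-free sketch of a buffering step that holds back the last two inputs. All names (`delay_start`, `delay_next`, `delay_complete`) are hypothetical and only mimic the roles of start, next, and complete; they are not the Transducers.jl API, and the state is a plain tuple rather than the package's wrapped result.

```julia
# Hypothetical names throughout; this is not the Transducers.jl API.
# The state is a pair: (private buffer, inner result).
delay_start(init) = (Int[], init)

function delay_next(inner, (buffer, iresult), input)
    push!(buffer, input)
    if length(buffer) > 2
        # Emit the oldest element once more than two are buffered.
        iresult = inner(iresult, popfirst!(buffer))
    end
    return (buffer, iresult)
end

function delay_complete(inner, (buffer, iresult))
    # Flush whatever is still buffered at the end of the process.
    for x in buffer
        iresult = inner(iresult, x)
    end
    return iresult
end

state = foldl((s, x) -> delay_next(push!, s, x), 1:5; init = delay_start(Int[]))
without_flush = copy(state[2])              # inner result before flushing
with_flush = delay_complete(push!, state)   # inner result after flushing
```

In this sketch `without_flush` is `[1, 2, 3]` while `with_flush` is `[1, 2, 3, 4, 5]`: without the final step, the two buffered elements would be lost. The Transducers.jl version of such a flush for RandomRecall follows.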
function Transducers.complete(rf::R_{RandomRecall}, result)
    (buffer, _), iresult = unwrap(rf, result)
    for x in buffer
        # Note that the inner `next` can be called more than once inside `next` and `complete`:
        iresult = next(inner(rf), iresult, x)
    end
    # `complete` for the inner reducing function must be called exactly once:
    return complete(inner(rf), iresult)
end

This then adds 3 (= RandomRecall().history) more elements to the output:

collect(RandomRecall(), 1:5)
8-element Array{Int64,1}:
1
1
2
2
1
5
4
3

This page was generated using Literate.jl.