Transducer interface

Core interface for transducers


When defining a transducer type X, you often need to dispatch on the type R_{X} of a reducing function rf (R stands for Reducing function). Such an rf bundles the current transducer xform(rf)::X and the inner reducing function inner(rf)::R_.

Transducers.xform(rf::R_{X}) -> xf :: X

Return the transducer xf associated with rf. The returned transducer xf is "atomic"; i.e., it is not a Composition transducer type.
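As a minimal sketch of how xform is typically used, the hypothetical transducer Scale below (a name made up for this illustration, not part of Transducers.jl) reads its own parameter through xform(rf) inside next:

```julia
using Transducers
using Transducers: Transducer, R_, next, inner, xform

# Hypothetical transducer for illustration: multiplies each input by `factor`.
struct Scale <: Transducer
    factor::Int
end

# `xform(rf)` returns the `Scale` instance bundled in `rf`, so its fields are
# available inside `next`; `inner(rf)` is the downstream reducing function.
Transducers.next(rf::R_{Scale}, result, input) =
    next(inner(rf), result, xform(rf).factor * input)

scaled = collect(Scale(3), 1:4)  # [3, 6, 9, 12]
```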

Transducers.start(rf::R_{X}, state)

This is an optional interface for a transducer. The default implementation simply calls start on the inner reducing function; i.e.,

start(rf::Reduction, result) = start(inner(rf), result)

If the transducer X is stateful, it can "bundle" its private state with wrap:

start(rf::R_{X}, result) = wrap(rf, PRIVATE_STATE, start(inner(rf), result))

where PRIVATE_STATE is an initial value for the private state that can be used inside next via wrapping.

See Take, PartitionBy, etc. for real-world examples.

Side note: Clojure's transducers have no related API. Transducers.jl uses it to implement stateful transducers using "pure" functions. The idea is based on a slightly different approach taken in the C++ transducer library atria.
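The start pattern above can be sketched with a small stateful transducer. WithIndex is a hypothetical name chosen for this illustration; its private state is the index of the last input seen. Because start is defined, next and complete are defined as well so that the state is threaded through and removed at the end:

```julia
using Transducers
using Transducers: Transducer, R_, start, next, complete, inner, wrap, unwrap, wrapping

# Hypothetical transducer for illustration: pairs each input with a 1-based index.
struct WithIndex <: Transducer end

# Bundle the initial private state (0 = no input seen yet) with the inner result.
Transducers.start(rf::R_{WithIndex}, result) =
    wrap(rf, 0, start(inner(rf), result))

function Transducers.next(rf::R_{WithIndex}, result, input)
    wrapping(rf, result) do i, iresult
        i += 1
        iresult = next(inner(rf), iresult, (i, input))
        return i, iresult  # updated private state and inner result
    end
end

# Drop the private state before handing the result to the inner `complete`.
function Transducers.complete(rf::R_{WithIndex}, result)
    _i, iresult = unwrap(rf, result)
    return complete(inner(rf), iresult)
end

indexed = collect(WithIndex(), ["a", "b", "c"])  # [(1, "a"), (2, "b"), (3, "c")]
```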

Transducers.next(rf::R_{X}, state, input)

This is the only required interface. It takes the following form (if start is not defined):

next(rf::R_{X}, result, input) =
    # code calling next(inner(rf), result, possibly_modified_input)

When calling next, it is almost always a better idea to use the macro form @next. See the details in its documentation.

See Map, Filter, Cat, etc. for real-world examples.
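For a concrete stateless case, here is a minimal sketch of a filter-like transducer. SkipNothing is a hypothetical name used only for this illustration. It shows that next may return result unchanged instead of calling the inner reducing function:

```julia
using Transducers
using Transducers: Transducer, R_, next, inner

# Hypothetical transducer for illustration: drops inputs that are `nothing`.
struct SkipNothing <: Transducer end

function Transducers.next(rf::R_{SkipNothing}, result, input)
    # Skipping an input means returning the accumulated result untouched.
    input === nothing && return result
    return next(inner(rf), result, input)
end

kept = collect(SkipNothing(), [1, nothing, 3])  # [1, 3]
```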

@next(rf, state, input)

It is expanded to

result = next(rf, state, input)
result isa Reduced && return result

This is usually the best way to call next as checking for Reduced is required to support early termination.

See also: next, Reduced, @return_if_reduced.
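The need for the Reduced check shows up as soon as a transducer calls the inner reducing function more than once per input. The sketch below uses a hypothetical transducer Twice (not part of Transducers.jl) that emits every input two times. The first call goes through @next so that a Reduced result from downstream returns immediately instead of being fed into the second call:

```julia
using Transducers
using Transducers: Transducer, R_, next, inner, @next

# Hypothetical transducer for illustration: emits each input twice.
struct Twice <: Transducer end

function Transducers.next(rf::R_{Twice}, result, input)
    # If downstream terminates early, `@next` returns the `Reduced` right away.
    result = @next(inner(rf), result, input)
    return next(inner(rf), result, input)
end

# `Take(3)` terminates the fold after three items, in the middle of an input.
first_three = collect(opcompose(Twice(), Take(3)), 1:3)  # [1, 1, 2]
```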

Transducers.complete(rf::R_{X}, state)

This is an optional interface for a transducer. If transducer X has some internal state, this is the last chance to "flush" the result.

See PartitionBy, etc. for real-world examples.

If start(rf::R_{X}, state) is defined, complete must unwrap state before returning state to the outer reducing function.

Transducers.jl 0.3

In Transducers.jl 0.2, complete had a fallback implementation to automatically call unwrap when wrap is called in start. Relying on this fallback implementation is now deprecated.
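A minimal sketch of flushing in complete, assuming integer inputs: the hypothetical transducer ChunkSum (a name invented for this illustration) emits the sum of every n consecutive inputs and keeps a partial sum as private state. When the input ends in the middle of a chunk, complete explicitly unwraps the state, pushes the leftover sum downstream, and then calls complete on the inner reducing function:

```julia
using Transducers
using Transducers: Transducer, R_, start, next, complete, inner, xform,
                   wrap, unwrap, wrapping, @next

# Hypothetical transducer for illustration: sums every `n` consecutive inputs.
struct ChunkSum <: Transducer
    n::Int
end

# Private state is (number of buffered inputs, partial sum); assumes Int inputs.
Transducers.start(rf::R_{ChunkSum}, result) =
    wrap(rf, (0, 0), start(inner(rf), result))

function Transducers.next(rf::R_{ChunkSum}, result, input)
    wrapping(rf, result) do (count, acc), iresult
        count += 1
        acc += input
        if count == xform(rf).n
            iresult = next(inner(rf), iresult, acc)
            count, acc = 0, 0
        end
        return (count, acc), iresult
    end
end

function Transducers.complete(rf::R_{ChunkSum}, result)
    (count, acc), iresult = unwrap(rf, result)  # explicit unwrap
    if count > 0
        # Last chance to flush the incomplete chunk.
        iresult = @next(inner(rf), iresult, acc)
    end
    return complete(inner(rf), iresult)
end

sums = collect(ChunkSum(2), 1:5)  # [3, 7, 5]; the trailing 5 is flushed in `complete`
```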

Transducers.combine(rf::R_{X}, state_left, state_right)

This is an optional interface for a transducer. If transducer X is stateful (i.e., wrap is used in start), it has to be able to combine the private states to support fold functions that require an associative reducing function, such as foldxt. A typical implementation takes the following form:

function combine(rf::R_{X}, a, b)
    #   ,---- `ua` and `ub` are the private state of the transducer `X`
    #  /  ,-- `ira` and `irb` are the states of inner reducing functions
    # /  /
    ua, ira = unwrap(rf, a)
    ub, irb = unwrap(rf, b)
    irc = combine(inner(rf), ira, irb)
    uc = # somehow combine private states `ua` and `ub`
    return wrap(rf, uc, irc)
end

See ScanEmit, etc. for real-world examples.
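To make the template above concrete, here is a sketch under stated assumptions. AppendCount is a hypothetical transducer (not part of Transducers.jl) that passes inputs through, counts them in its private state, and emits the count as one extra item in complete. Its private states are plain counters, so combining them is assumed to be simple addition:

```julia
using Transducers
using Transducers: Transducer, R_, start, next, complete, combine, inner,
                   wrap, unwrap, wrapping, @next

# Hypothetical transducer for illustration: forwards inputs and appends their count.
struct AppendCount <: Transducer end

Transducers.start(rf::R_{AppendCount}, result) =
    wrap(rf, 0, start(inner(rf), result))

function Transducers.next(rf::R_{AppendCount}, result, input)
    wrapping(rf, result) do count, iresult
        return count + 1, next(inner(rf), iresult, input)
    end
end

function Transducers.complete(rf::R_{AppendCount}, result)
    count, iresult = unwrap(rf, result)
    iresult = @next(inner(rf), iresult, count)  # emit the total count last
    return complete(inner(rf), iresult)
end

function Transducers.combine(rf::R_{AppendCount}, a, b)
    ua, ira = unwrap(rf, a)
    ub, irb = unwrap(rf, b)
    irc = combine(inner(rf), ira, irb)
    return wrap(rf, ua + ub, irc)  # counts of both halves add up
end

# Sequential fold: sum(1:10) plus the appended count 10.
sequential = foldl(+, AppendCount(), 1:10; init = 0)  # 65

# Threaded fold; partial results of the chunks are merged through `combine`,
# so this is expected to agree with the sequential result.
threaded = foldxt(+, AppendCount(), 1:10; init = 0, basesize = 3)
```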


Helpers for stateful transducers

wrap(rf::R_{X}, state, iresult)

Pack private state for reducing function rf (or rather the transducer X) with the result iresult returned from the inner reducing function inner(rf). This packed result is typically passed to the outer reducing function.

This is intended to be used only in start. Inside next, use wrapping.

Implementation detail

If iresult is a Reduced, wrap actually unwraps all internal states of iresult recursively. However, this is an implementation detail that should not matter when writing transducers.

Consider a reducing step constructed as

rf = opcompose(xf₁, xf₂, xf₃)'(f)

where each xfₙ is a stateful transducer and hence needs a private state stateₙ, and this stateₙ is constructed in each start(::R_{typeof(xfₙ)}, result). Then, calling start(rf, result) is equivalent to

wrap(rf,
     state₁,                     # private state for xf₁
     wrap(inner(rf),
          state₂,                # private state for xf₂
          wrap(inner(inner(rf)),
               state₃,           # private state for xf₃
               result)))

or equivalently

result₃ = result
result₂ = wrap(inner(inner(rf)), state₃, result₃)
result₁ = wrap(inner(rf),        state₂, result₂)
result₀ = wrap(rf,               state₁, result₁)

The innermost step function receives the original result as the first argument, while transducible processes such as foldl only see the outermost "tree" result₀ during the reduction.

See wrapping, unwrap, and start.

unwrap(rf, result)

Unwrap a wrapped result into a private state and an inner result. The following identity holds:

unwrap(rf, wrap(rf, state, iresult)) == (state, iresult)

This is intended to be used only in complete. Inside next, use wrapping.

wrapping(f, rf, result)

Function f must take two arguments, state and iresult, and return a tuple (state, iresult). This is intended to be used only in next, possibly with a do block.

next(rf::R_{MyTransducer}, result, input) =
    wrapping(rf, result) do my_state, iresult
        # code calling `next(inner(rf), iresult, possibly_modified_input)`
        return my_state, iresult  # possibly modified
    end

See wrap, unwrap, and next.
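The do-block form can be sketched with a transducer whose next only sometimes calls the inner reducing function. SkipRepeats is a hypothetical name for this illustration; it drops an input equal to the one immediately before it. The sketch assumes that inputs are never nothing, since nothing is used as the "no previous input" marker:

```julia
using Transducers
using Transducers: Transducer, R_, start, next, complete, inner, wrap, unwrap, wrapping

# Hypothetical transducer for illustration: drops consecutive duplicates.
struct SkipRepeats <: Transducer end

# `nothing` marks "no previous input yet" (assumes inputs are never `nothing`).
Transducers.start(rf::R_{SkipRepeats}, result) =
    wrap(rf, nothing, start(inner(rf), result))

function Transducers.next(rf::R_{SkipRepeats}, result, input)
    wrapping(rf, result) do prev, iresult
        if prev === nothing || prev != input
            iresult = next(inner(rf), iresult, input)
        end
        # The new private state is the current input; `iresult` may be unchanged.
        return input, iresult
    end
end

function Transducers.complete(rf::R_{SkipRepeats}, result)
    _prev, iresult = unwrap(rf, result)
    return complete(inner(rf), iresult)
end

deduped = collect(SkipRepeats(), [1, 1, 2, 2, 2, 3, 1])  # [1, 2, 3, 1]
```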


Interface for reducibles

__foldl__(rf, init, reducible::T)

Left fold a reducible with reducing function rf and initial value init. This is primarily an API for overloading when the reducible "container" or "context" (e.g., I/O stream) of type T can provide a better reduction mechanism than the default iterator-based one.

For a simple iterable type MyType, a valid implementation is:

function __foldl__(rf, val, itr::MyType)
    for x in itr
        val = @next(rf, val, x)
    end
    return complete(rf, val)
end

although in this case the default __foldl__ can handle MyType and thus there is no need to define it. In general, defining __foldl__ is useful only when there is a better way to go over the items in the reducible than Base.iterate.

See also: @next.
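As a sketch of a case where a custom __foldl__ can pay off, consider a container of vectors, here a hypothetical type VecOfVec defined only for this illustration. Plain nested for loops visit every element without defining Base.iterate for the container, and @next takes care of early termination:

```julia
using Transducers
using Transducers: @next, complete

# Hypothetical reducible for illustration: a vector of vectors, folded as a flat sequence.
struct VecOfVec{T}
    vectors::Vector{Vector{T}}
end

function Transducers.__foldl__(rf, val, vov::VecOfVec)
    for vector in vov.vectors
        for x in vector
            val = @next(rf, val, x)  # returns early if `rf` yields a `Reduced`
        end
    end
    return complete(rf, val)
end

vov = VecOfVec(collect.([1:n for n in 1:3]))  # [[1], [1, 2], [1, 2, 3]]

flat = collect(Map(identity), vov)   # [1, 1, 2, 1, 2, 3]
head = collect(Take(4), vov)         # [1, 1, 2, 1]; stops inside the third vector
```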

@return_if_reduced expr

It transforms the given expression to:

val = expr
val isa Reduced && return val

See also @next.
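A small usage sketch, independent of any transducer: the helper apply_twice and the step function capped_add below are hypothetical names introduced for this illustration. The macro lets the helper stop after the first application when the step function signals termination with reduced:

```julia
using Transducers: @return_if_reduced, Reduced, reduced, unreduced

# Hypothetical helper: applies the step function `f` twice with the same input,
# returning early if the first application yields a `Reduced`.
function apply_twice(f, acc, x)
    acc = @return_if_reduced f(acc, x)
    return f(acc, x)
end

# Hypothetical step function: stops once the running sum reaches 10.
capped_add(acc, x) = acc + x >= 10 ? reduced(acc + x) : acc + x

r1 = apply_twice(capped_add, 0, 3)  # 6: both applications run
r2 = apply_twice(capped_add, 4, 7)  # a `Reduced` wrapping 11: second application skipped
```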

Transducers.jl 0.3

In v0.2, the calling convention was @return_if_reduced complete(rf, val) and it was transformed to val isa Reduced && return reduced(complete(rf, unreduced(val))). For the rationale behind the change, see this commit message.


julia> using Transducers: @return_if_reduced

julia> @macroexpand @return_if_reduced f(x)
    #158#val = f(x)
    #= ... =#
        #158#val isa Transducers.Reduced && return #158#val
        #= ... =#