Tutorial: missing value handling
This tutorial illustrates the usage of Transducers.jl by stepping through various handling of missing values.
using Transducers
Dot product
Here is a simple way to compute a dot product using mapfoldl
and MapSplat
:
mapfoldl(MapSplat(*), +, zip(1:3, 10:2:14))
76
Let's see what it does step by step. First we create a "printer" transducer using the following function (see Map
):
xf_printer(label) = Map() do x
println(label, ": ", x)
return x # just return it as-is
end
This transducer just pass-through the input while printing its value (prefixed by a label
). Let's sandwich the previous MapSplat(*)
with it:
mapfoldl(
xf_printer(" input") |> MapSplat(*) |> xf_printer("output"),
+, zip(1:3, 10:2:14))
input: (1, 10)
output: 10
input: (2, 12)
output: 24
input: (3, 14)
output: 42
You can see that the input tuple (1, 10)
is splatted into function *
by MapSplat
which then outputs 10
. This is repeated for all inputs.
Perhaps unfortunately, this way of computing a dot product propagates any missing values contained in the input arrays to the result (which may actually be desired in certain cases).
xs = [1, missing, 3, 2]
ys = [10, 14, missing, 12]
mapfoldl(MapSplat(*), +, zip(xs, ys))
missing
However, it is very simple to ignore any missing values using OfType
:
xf_mdot = OfType(Tuple{Vararg{Number}}) |> MapSplat(*)
mapfoldl(xf_mdot, +, zip(xs, ys))
34
Here, Tuple{Vararg{Number}}
is a type that matches with a tuple of any length with numbers. It does not match with a tuple if it has a missing
.
@assert (1, 0.5) isa Tuple{Vararg{Number}}
@assert (1, 0.5, 2im) isa Tuple{Vararg{Number}}
@assert !((1, missing) isa Tuple{Vararg{Number}})
Covariance
Transducer xf_mdot
above can also be used to compute the covariance. First, we need the number of pairs of elements in xs
and ys
that both of them are not missing
:
nonmissings = mapfoldl(OfType(Tuple{Vararg{Number}}) |> Count(),
right,
zip(xs, ys);
init = 0)
2
We do this by using Count
and right
. Count
ignores input and count the number of times the input is provided. Since OfType(Tuple{Vararg{Number}})
provides the inputs to the downstream transducer only if there is no missing
values, this correctly counts the number of non-missing pairs. Function right
is simply defined as right(l, r) = r
(and right(r) = r
). Thus, the whole mapfoldl
returns the last output of Count
. In case Count
never gets called (i.e., there are no non-missing pairs), we pass init=0
.
mapfoldl(OfType(Tuple{Vararg{Number}}) |> Count(),
right,
zip(Int[], Int[]);
init = 0)
0
Finally, we have to pre-process the input to xf_mdot
by subtracting the average. It's easy to do with Map
:
using Statistics: mean
function xf_demean(xs, ys)
xmean = mean(skipmissing(xs))
ymean = mean(skipmissing(ys))
return Map(((x, y),) -> (x - xmean, y - ymean))
end
mapfoldl(xf_demean(xs, ys) |> xf_mdot, +, zip(xs, ys)) / nonmissings
1.0
Addition
How do we use transducers for vector-to-vector transformation? Here is a function to calculate $y = x + y$ while ignoring missing values in $x$. First, mandatory input shape check:
function add_skipmissing!(ys, xs)
length(ys) == length(xs) || error("length(ys) != length(xs)")
firstindex(ys) == 1 || error("firstindex(ys) != 1")
For filtering out missing values from xs
while tracking indices, we use Enumerate
and Filter
. To iterate over the output of the transducer, foreach
is used instead of mapfoldl
since mutating an array is better expressed as a side-effect than a fold.
foreach(Enumerate() |> Filter(!(ismissing ∘ last)), xs) do (i, xi)
@inbounds ys[i] += xi
end
We then return the mutated value to behave like the rest of Julia functions (push!
, mul!
, etc.):
return ys
end
Example:
add_skipmissing!([100, 110, 120], [1, missing, 2])
3-element Array{Int64,1}:
101
110
122
Vectorized reduction
foldl
, mapfoldl
, etc. in Base
support dims
argument. Transducers.jl does not support this argument as of writing. However, this can easily be emulated using eachcol
, eachrow
, or eachslice
iterators in Base
.
xs = [
0 missing 1 2
3 4 5 missing
missing 6 7 missing
]
function xf_sum_columns(prototype)
T = Base.nonmissingtype(eltype(prototype)) # subtract Missing from type
dims = size(prototype)
return Scan(add_skipmissing!, Initializer(_ -> zeros(T, dims)))
end
We use Initializer
here to allocate the "output array" into which the columns are added by add_skipmissing!
.
mapfoldl(xf_sum_columns(xs[:, 1]), right, eachcol(xs))
3-element Array{Int64,1}:
3
12
13
Above computation returns the sum over each row without taking into account the relationship within a column. Another possibly useful reduction is the sum of the columns with no missing values. This can easily be done by prepending a filter:
mapfoldl(Filter(x -> !any(ismissing, x)) |> xf_sum_columns(xs[:, 1]),
right, eachcol(xs))
3-element Array{Int64,1}:
1
5
7
Note that above combination of Scan
and right
is redundant. For example, we can simply pass add_skipmissing!
to "normal" foldl
:
foldl(add_skipmissing!, eachcol(xs), init=zeros(Int, size(xs, 1)))
3-element Array{Int64,1}:
3
12
13
However, packaging it as a transducer is sometimes useful as it can be composed with other transducers and "bottom" reducing function. For example, vectorized version of cumsum
can easily obtained by composing it with append!
(and then reshape
after mapfoldl
):
result = mapfoldl(
xf_sum_columns(xs[:, 1]),
Completing(append!),
eachcol(xs);
init = Int[])
reshape(result, (size(xs, 1), :))
3×4 Array{Int64,2}:
0 0 1 3
3 7 12 12
0 6 13 13
Note that we need Completing
here since append!
does not have the unary method used for complete
protocol.
Argmax
Another useful operation to do ignoring missing values is argmax
/argmin
. It can be implemented using Enumerate() |> Filter(!(ismissing ∘ last))
(see also add_skipmissing!
above) composed with ScanEmit
. We first need to define a function to be called by ScanEmit
:
# ,--- current state
# |
# | ,-- input
# | |
function argmax_step((argmax, max), (index, value))
argmax, max = value > max ? (index, value) : (argmax, max)
return argmax, (argmax, max)
# \ \
# \ \__ next state
# \
# \__ output
end
This function is passed to ScanEmit
with the initial state:
xf_argmax = Enumerate() |> Filter(!(ismissing ∘ last)) |>
ScanEmit(argmax_step, (0, typemin(Int)))
# |
# initial state
As ScanEmit
is one of the most complex (and powerful) transducer, it may require some comments on how above code works:
The state
(argmax, max)
is initialized to(0, typemin(Int))
inxf_argmax
. This is the first value passed to the first argument(argmax, max)
ofargmax_step
.The upstream transducer
Enumerate()
provides(index, value)
-pair which becomes the input (the second argument) ofargmax_step
.Function
argmax_step
must return a pair. The first item becomes the output ofScanEmit
. In this case that's the index of the largest item seen so far.The second item in the returned pair is fed back to
argmax_step
in the next call.
We have the argmax function by extracting the last output of xf_argmax
:
mapfoldl(xf_argmax, right, [1, 3, missing, 2])
2
Side note: We use typemin(Int)
as the initial value of max
for simplicity. In practice, it should be typemin(eltype(input_array))
. See the next section for how to do it using Initializer
.
Extrema
Transducer xf_argmax
in the previous section only outputs the index of the maximum element so far. To output the maximum element as well, we can simply use Scan
. Also, while we are at it, let's support both argmax and argmin. To this end, we parametrize the function passed to Scan
by the comparison function >
and <
. Following function argext_step
takes the function >
or <
and return a function appropriate for Scan.
argext_step(should_update) =
((oldindex, oldvalue), (index, value)) ->
should_update(oldvalue, value) ? (index, value) : (oldindex, oldvalue)
Another problem with xf_argmax
is that it does not handle non-Int
input types. To properly handle different input types, we use Initializer
. Initializer
needs a function that maps an input type to the initial state. We first define a helper function
init_helper(::typeof(>), ::Type{Tuple{F, S}}) where {F, S} = (zero(F), typemax(S))
init_helper(::typeof(<), ::Type{Tuple{F, S}}) where {F, S} = (zero(F), typemin(S))
...which is partially evaluated before passed to Initializer
:
argext_init(should_update) = Initializer(TT -> init_helper(should_update, TT))
Finally we construct a Scan
transducer using argext_step
and argext_init
:
xf_scanext(should_update) = Scan(argext_step(should_update),
argext_init(should_update))
Passing <
gives us the argmax transducer:
mapfoldl(
Enumerate() |> OfType(Tuple{Integer, Number}) |> xf_scanext(<),
right, [1.0, 3.0, missing, 2.0])
We use OfType(Tuple{Integer, Number})
instead of Filter(!(ismissing ∘ last))
as typemin
used in xf_scanext
requires to know that the input type does not include missing
.
We now have transducers xf_scanext(<)
and xf_scanext(>)
for argmax and argmin, respectively. We can compute them concurrently by Zip
'ing them together:
xf_fullextrema = Enumerate() |> OfType(Tuple{Integer, Number}) |>
Zip(xf_scanext(>), xf_scanext(<))
mapfoldl(xf_fullextrema, right, [1.0, 3.0, -1.0, missing, 2.0])
((3, -1.0), (2, 3.0))
This transducer produces a tuple ((argmin, min), (argmax, max))
. To output only indices, append an appropriate Map
:
xf_argextrema =
xf_fullextrema |> Map() do ((argmin, min), (argmax, max))
(argmin, argmax)
end
mapfoldl(xf_argextrema, right, [1.0, 3.0, -1.0, missing, 2.0])
(3, 2)
This page was generated using Literate.jl.