""" Functions that allow to route data around upstream iterator """
from __future__ import annotations
from typing import (
Any,
Callable,
Generator,
Iterable,
)
__all__ = ['StoreOnly', 'route_in', 'route_out']
class StoreOnly:
pass
[docs]
def route_out(iterable: Iterable,
data_store: list,
splitter: Callable[[Any], tuple[Any, Any]],
) -> Generator:
""" Route data around the consumer of this iterable
:func:`route_out` allows its user to:
1. store data that is received from an iterable,
2. determine whether this data should be yielded to a consumer of
``route_out``, by calling :func:`splitter`.
To determine which data is to be yielded to the consumer and which data
should only be stored but not yielded, :func:`route_out` calls
:func:`splitter`. :func:`splitter` is called for each item of the input
iterable, with the item as sole argument. The function should return a
tuple of two elements. The first element is the data that is to be
yielded to the consumer. The second element is the data that is to be
stored in the list ``data_store``. If the first element of the tuple is
``datalad_next.itertools.StoreOnly``, no data is yielded to the
consumer.
:func:`route_in` can be used to combine data that was previously
stored by :func:`route_out` with the data that is yielded by
:func:`route_out` and with the data the was not processed, i.e. not yielded
by :func:`route_out`.
The items yielded by :func:`route_in` will be in the same
order in which they were passed into :func:`route_out`, including the
items that were not yielded by :func:`route_out` because :func:`splitter`
returned ``StoreOnly`` in the first element of the result-tuple.
The combination of the two functions :func:`route_out` and :func:`route_in`
can be used to "carry" additional data along with data that is processed by
iterators. And it can be used to route data around iterators that cannot
process certain data.
For example, a user has an iterator to divide the number ``2`` by all
numbers in a list. The user wants the iterator to process all numbers in a
divisor list, except from zeros, In this case :func:`route_out` and
:func:`route_in` can be used as follows:
.. code-block:: python
from math import nan
from datalad_next.itertools import route_out, route_in, StoreOnly
def splitter(divisor):
# if divisor == 0, return `StoreOnly` in the first element of the
# result tuple to indicate that route_out should not yield this
# element to its consumer
return (StoreOnly, divisor) if divisor == 0 else (divisor, divisor)
def joiner(processed_data, stored_data):
#
return nan if processed_data is StoreOnly else processed_data
divisors = [0, 1, 0, 2, 0, 3, 0, 4]
store = list()
r = route_in(
map(
lambda x: 2.0 / x,
route_out(
divisors,
store,
splitter
)
),
store,
joiner
)
print(list(r))
The example about will print ``[nan, 2.0, nan, 1.0, nan, 0.6666666666666666, nan, 0.5]``.
Parameters
----------
iterable: Iterable
The iterable that yields the input data
data_store: list
The list that is used to store the data that is routed out
splitter: Callable[[Any], tuple[Any, Any | None]]
The function that is used to determine which part of the input data,
if any, is to be yielded to the consumer and which data is to
be stored in the list ``data_store``.
The function is called for each item of
the input iterable with the item as sole argument. It should return a
tuple of two elements. If the first element is not
``datalad_next.itertools.StoreOnly``, it is yielded to the consumer.
If the first element is ``datalad_next.itertools.StoreOnly``,
nothing is yielded to the consumer. The second element is stored in the
list ``data_store``.
The cardinality of ``data_store`` will be the same as the cardinality of
the input iterable.
"""
for item in iterable:
data_to_process, data_to_store = splitter(item)
data_store.append((data_to_process, data_to_store))
if data_to_process is not StoreOnly:
yield data_to_process
[docs]
def route_in(iterable: Iterable,
data_store: list,
joiner: Callable[[Any, Any], Any]
) -> Generator:
""" Yield previously rerouted data to the consumer
This function is the counter-part to :func:`route_out`. It takes the iterable
``iterable`` and a data store given in ``data_store`` and yields items
in the same order in which :func:`route_out` received them from its
underlying iterable (using the same data store). This includes items that
were not yielded by :func:`route_out`, but only stored.
:func:`route_in` uses :func:`joiner`-function to determine how stored and
optionally processed data should be joined into a single item, which is
then yielded by :func:`route_in`.
:func:`route_in` calls :func:`joiner` with a 2-tuple. The first
element of the tuple is either ``datalad_next.itertools.StoreOnly`` or the
next item from the underlying iterator. The second element is the data
that was stored in the data store. The result of :func:`joiner` which will
be yielded by :func:`route_in`.
This module provides a standard joiner-function: :func:`join_with_list`
that works with splitter-functions that return a list as second element of
the result tuple.
The cardinality of ``iterable`` must match the number of processed data
elements in the data store. The output cardinality of :func:`route_in` will
be the cardinality of the input iterable of the corresponding
:func:`route_out`-call. Given the following code:
.. code-block:: python
store_1 = list()
route_in(
some_generator(
route_out(input_iterable, store_1, splitter_1)
),
store_1,
joiner_1
)
:func:`route_in` will yield the same number of elements as ``input_iterable``.
But, the number of elements processed by ``some_generator`` is determined by
the :func:`splitter_1` in :func:`route_out`, i.e. by the number of
:func:`splitter_1`-results that have don't have
``datalad_next.itertools.don_process`` as first element.
Parameters
----------
iterable: Iterable
The iterable that yields the input data.
data_store: list
The list from which the data that is to be "routed in" is read.
joiner: Callable[[Any, Any], Any]
A function that determines how the items that are yielded by
``iterable`` should be combined with the corresponding data from
``data_store``, in order to yield the final result.
The first argument to ``joiner`` is the item that is yielded by
``iterable``, or ``datalad_next.itertools.StoreOnly`` if no data
was processed in the corresponding step. The second argument is the
data that was stored in ``data_store`` in the corresponding step.
"""
for element in iterable:
processed, stored = data_store.pop(0)
# yield stored-only content until we find an item that was processed
while processed is StoreOnly:
yield joiner(processed, stored)
processed, stored = data_store.pop(0)
yield joiner(element, stored)
# we reached the end of the incoming iterable.
# this means that we must not find any remaining items in `data_store`
# that indicate that they would have a corresponding item in the
# iterable (processed is not StoreOnly)
for processed, stored in data_store:
assert processed is StoreOnly, \
"iterable did not yield matching item for route-in item, cardinality mismatch?"
yield joiner(processed, stored)
# rather than pop() in the last loop, we just yielded from the list
# now this information is no longer needed
del data_store[:]