More Examples#

While some of the examples below are meant to demonstrate the features of the async-graph-data-flow library, others (e.g., running a synchronous function, sharing state across functions) show what could be possible – we aren’t limited by the implemented features! If you have questions or would like to contribute an example of your own, please open an issue at the GitHub repository for discussion.

Customizable Start Nodes#

When AsyncExecutor’s execute() is called, by default the start nodes (i.e., nodes without incoming edges) of the graph are automatically detected and called. To override this default behavior, pass the keyword argument start_nodes to execute() to select which nodes to start from and/or to supply the arguments for the nodes you’ve selected. As the examples below show, start_nodes is a dictionary that maps a node’s name to a tuple of the arguments that node is called with.

import logging

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def node1():
    """Emit the items ``"hello"`` and ``"world"``, in that order."""
    for word in ("hello", "world"):
        yield word


async def node2(data):
    """Report the received item, then emit two fixed items.

    ``data`` is only printed; it is not forwarded. The emitted items are
    always ``"data1"`` followed by ``"data2"``.
    """
    print(f"Transformer received: {data}")
    for replacement in ("data1", "data2"):
        yield replacement


async def node3(data):
    """Print the received item and emit a single ``None``."""
    message = f"Data in Load: {data}"
    print(message)
    yield None


if __name__ == "__main__":
    logging.basicConfig(level="INFO")

    # Linear pipeline: node1 -> node2 -> node3.
    pipeline = AsyncGraph()
    for node_func in (node1, node2, node3):
        pipeline.add_node(node_func)
    for upstream, downstream in (("node1", "node2"), ("node2", "node3")):
        pipeline.add_edge(upstream, downstream)

    print(f"Graph: {pipeline.nodes_to_edges}")
    runner = AsyncExecutor(pipeline)
    runner.turn_on_data_flow_logging()
    # Start from node2 rather than the auto-detected start node, and call
    # node2 with the single argument "new_data".
    chosen_start = {"node2": ("new_data",)}
    runner.execute(start_nodes=chosen_start)

    # Output:
    # -------
    # Graph: {'node1': {'node2'}, 'node2': {'node3'}, 'node3': set()}
    # Transformer received: new_data
    # Data in Load: data1
    # Data in Load: data2
    # INFO:async_graph_data_flow.executor: node1 - in=0, out=0, err=0
    # INFO:async_graph_data_flow.executor: node2 - in=0, out=2, err=0
    # INFO:async_graph_data_flow.executor: node3 - in=2, out=2, err=0
from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def node1(data):
    """Print the received item, then emit it followed by two fixed words.

    The emitted sequence is ``data``, ``"hello"``, ``"world"``.
    """
    print(f"node1 received: {data}")
    for item in (data, "hello", "world"):
        yield item


async def node2(data):
    """Print the received item, then emit it followed by two fixed items.

    The emitted sequence is ``data``, ``"data1"``, ``"data2"``.
    """
    print(f"node2 received: {data}")
    for item in (data, "data1", "data2"):
        yield item


async def node3(data):
    """Print the received item and emit a single ``None``."""
    report = f"node3 received: {data}"
    print(report)
    yield None


if __name__ == "__main__":
    # node1 and node2 each feed node3; neither of them has an incoming edge.
    fan_in = AsyncGraph()
    for node_func in (node1, node2, node3):
        fan_in.add_node(node_func)
    for source_name in ("node1", "node2"):
        fan_in.add_edge(source_name, "node3")

    print(f"Graph: {fan_in.nodes_to_edges}")

    # Give each start node its own argument tuple.
    initial_args = {"node1": ("foo",), "node2": ("bar",)}
    runner = AsyncExecutor(fan_in)
    runner.execute(start_nodes=initial_args)

    # Output:
    # -------
    # Graph: {'node1': {'node3'}, 'node2': {'node3'}, 'node3': set()}
    # node1 received: foo
    # node2 received: bar
    # node3 received: foo
    # node3 received: hello
    # node3 received: world
    # node3 received: bar
    # node3 received: data1
    # node3 received: data2

Graph with Nodes only and No Edges#

It is possible to execute a graph with nodes only and no edges. In this case, if start_nodes is not specified at execute() (see above), then all nodes are start nodes (because no node has an incoming edge), and therefore all nodes will execute concurrently upon graph execution. The start nodes and their arguments at the beginning of the graph execution are available at the start_nodes attribute of the AsyncExecutor instance.

import asyncio
import time

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def node1():
    """Print a marker line, wait two seconds, then emit a single ``None``."""
    print("from node1")
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    yield None


async def node2():
    """Print a marker line, wait two seconds, then emit a single ``None``."""
    print("from node2")
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    yield None


async def node3():
    """Print a marker line, wait two seconds, then emit a single ``None``."""
    print("from node3")
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    yield None


if __name__ == "__main__":
    graph = AsyncGraph()
    # Only nodes are registered; no add_edge() call is made on purpose.
    for node_func in (node1, node2, node3):
        graph.add_node(node_func)

    print(f"Graph: {graph.nodes_to_edges}")
    executor = AsyncExecutor(graph)

    # Time the whole run: each node sleeps for two seconds, so a total close
    # to two seconds (see the output below) shows the nodes ran concurrently.
    started_at = time.time()
    executor.execute()
    finished_at = time.time()
    print("execution time:", finished_at - started_at)
    print("start nodes:", executor.start_nodes)

    # Output:
    # -------
    # Graph: {'node1': set(), 'node2': set(), 'node3': set()}
    # from node1
    # from node2
    # from node3
    # execution time: 2.0039637088775635
    # start nodes: {'node3': (), 'node1': (), 'node2': ()}

Data Flow Statistics and Logging#

AsyncExecutor’s data_flow_stats keeps track of data volumes and errors encountered at each node. data_flow_stats is a dictionary where a key is the name of a node, and its value is itself a dict that maps {"in", "out", "err"} to the counts of data items coming into the node, going out of the node, and unhandled errors from the node, respectively.

For a long-running graph execution, it is helpful to log such data flow information at a regular time interval. An AsyncExecutor instance has the method turn_on_data_flow_logging(), which you can call to turn on and configure logging.

import asyncio
import logging

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def extract():
    """Emit ``"hello"`` three times, waiting three seconds before each item."""
    for _ in range(3):
        await asyncio.sleep(3)
        yield "hello"


async def transform(data):
    """Print the received string and emit its title-cased form."""
    print(f"Transformer received: {data}")
    # To simulate slow processing, add here: await asyncio.sleep(3)
    title_cased = str.title(data)
    yield title_cased


async def load(data):
    """Print the received item and emit a single ``None``."""
    message = f"Data in Load: {data}"
    print(message)
    yield None


if __name__ == "__main__":
    logging.basicConfig(level="INFO")

    # Linear pipeline: extract -> transform -> load.
    pipeline = AsyncGraph()
    pipeline.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for node_func in (transform, load):
        pipeline.add_node(node_func)
    for upstream, downstream in (("extract", "transform"), ("transform", "load")):
        pipeline.add_edge(upstream, downstream)

    print(f"Graph: {pipeline.nodes_to_edges}")
    runner = AsyncExecutor(pipeline)

    # Log the stats every 3 seconds, and only for the two downstream nodes.
    logging_options = {"time_interval": 3, "node_filter": ["transform", "load"]}
    runner.turn_on_data_flow_logging(**logging_options)
    runner.execute()

    # Output:
    # -------
    # Graph: {'extract': {'transform'}, 'transform': {'load'}, 'load': set()}
    # Transformer received: hello
    # Data in Load: Hello
    # INFO:async_graph_data_flow.executor: transform - in=1, out=1, err=0
    # INFO:async_graph_data_flow.executor: load - in=1, out=0, err=0
    # INFO:async_graph_data_flow.executor: transform - in=2, out=2, err=0
    # INFO:async_graph_data_flow.executor: load - in=2, out=1, err=0
    # Transformer received: hello
    # Data in Load: Hello
    # Transformer received: hello
    # Data in Load: Hello
    # INFO:async_graph_data_flow.executor: transform - in=3, out=2, err=0
    # INFO:async_graph_data_flow.executor: load - in=2, out=2, err=0
    # INFO:async_graph_data_flow.executor: transform - in=3, out=3, err=0
    # INFO:async_graph_data_flow.executor: load - in=3, out=3, err=0

Concurrent Tasks Per Node#

By default, each node creates one task at a time. To spawn multiple, concurrent tasks from a node, set max_tasks at add_node().

import asyncio
import time

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def node1():
    """Emit the integers 0 through 4, in ascending order."""
    number = 0
    while number < 5:
        yield number
        number += 1


async def node2(i):
    """Wait two seconds, print the received item, then emit a single ``None``."""
    pause_seconds = 2
    await asyncio.sleep(pause_seconds)
    print(f"node2 received {i}")
    yield None


if __name__ == "__main__":
    # Run the same two-node graph twice, changing only node2's max_tasks, to
    # compare the total execution time.
    for task_limit in (1, 5):
        print(f"max_tasks: {task_limit}")
        graph = AsyncGraph()
        graph.add_node(node1)
        graph.add_node(node2, max_tasks=task_limit)
        graph.add_edge("node1", "node2")

        print(f"Graph: {graph.nodes_to_edges}")
        executor = AsyncExecutor(graph)

        started_at = time.time()
        executor.execute()
        finished_at = time.time()
        print(f"execution time: {finished_at - started_at}")
        print()

        # Output:
        # -------
        # max_tasks: 1
        # Graph: {'node1': {'node2'}, 'node2': set()}
        # node2 received 0
        # node2 received 1
        # node2 received 2
        # node2 received 3
        # node2 received 4
        # execution time: 10.004492044448853
        #
        # max_tasks: 5
        # Graph: {'node1': {'node2'}, 'node2': set()}
        # node2 received 0
        # node2 received 1
        # node2 received 2
        # node2 received 3
        # node2 received 4
        # execution time: 2.001836061477661

Halting Graph Execution upon Exceptions#

By default, an unhandled exception from any node does not halt the graph execution. This behavior can be altered in two different ways:

  • To halt execution upon an unhandled exception from any node, set halt_on_exception to True when initializing an AsyncGraph instance.

  • To halt execution upon an unhandled exception from a specific node, set halt_on_exception to True when using add_node() to add the node in question to the graph.

import asyncio

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def extract():
    """Emit five fixed lowercase words, one item per ``yield``."""
    words = ("hello", "world", "civis", "analytics", "done")
    for word in words:
        yield word


async def transform(data):
    """Wait three seconds, then emit the title-cased form of ``data``."""
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
    title_cased = str.title(data)
    yield title_cased


async def load(data):
    """Print the received item and forward it, unless it is a failing value.

    For ``"World"`` and ``"Analytics"`` nothing is emitted: after yielding
    control to the event loop once, an ``Exception`` is raised instead.
    """
    print(f"Load: {data}")

    must_fail = data in ("World", "Analytics")
    if not must_fail:
        yield data
        return

    await asyncio.sleep(0)
    raise Exception(f"intentional error for {data}")


async def output(data):
    """Print the received item after an ``output:`` label and emit ``None``."""
    print("output:", data)
    yield None


if __name__ == "__main__":
    # Linear pipeline: extract -> transform -> load -> output.
    pipeline = AsyncGraph()
    pipeline.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    pipeline.add_node(transform)
    # halt_on_exception is set for the `load` node only.
    pipeline.add_node(load, halt_on_exception=True)
    pipeline.add_node(output)
    for upstream, downstream in (
        ("extract", "transform"),
        ("transform", "load"),
        ("load", "output"),
    ):
        pipeline.add_edge(upstream, downstream)

    print(f"Graph: {pipeline.nodes_to_edges}")
    runner = AsyncExecutor(pipeline)
    runner.execute()

    # Output:
    # -------
    # Graph: {
    #     'extract': {'transform'},
    #     'transform': {'load'},
    #     'load': {'output'},
    #     'output': set(),
    # }
    # Load: Hello
    # output: Hello
    # Load: World
    # Traceback (most recent call last):
    #   File "/path/to/src/async_graph_data_flow/executor.py", line 188, in _consumer
    #     next_data_item = await anext(coro)
    #   File "/path/to/examples/halt_on_exception_at_a_specific_node.py", line 25, in load
    #     raise Exception(f"intentional error for {data}")
    # Exception: intentional error for World
    #
    # Pipeline execution halted due to an exception in load node
import asyncio

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def extract():
    """Emit five fixed lowercase words, one item per ``yield``."""
    words = ("hello", "world", "civis", "analytics", "done")
    for word in words:
        yield word


async def transform(data):
    """Wait three seconds, then emit the title-cased form of ``data``."""
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
    title_cased = str.title(data)
    yield title_cased


async def load(data):
    """Print the received item and forward it, unless it is a failing value.

    For ``"World"`` and ``"Analytics"`` nothing is emitted: after yielding
    control to the event loop once, an ``Exception`` is raised instead.
    """
    print(f"Load: {data}")

    must_fail = data in ("World", "Analytics")
    if not must_fail:
        yield data
        return

    await asyncio.sleep(0)
    raise Exception(f"intentional error for {data}")


async def output(data):
    """Print the received item after an ``output:`` label and emit ``None``."""
    print("output:", data)
    yield None


if __name__ == "__main__":
    # halt_on_exception is set on the graph itself rather than on one node.
    pipeline = AsyncGraph(halt_on_exception=True)
    pipeline.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for node_func in (transform, load, output):
        pipeline.add_node(node_func)
    # Linear pipeline: extract -> transform -> load -> output.
    for upstream, downstream in (
        ("extract", "transform"),
        ("transform", "load"),
        ("load", "output"),
    ):
        pipeline.add_edge(upstream, downstream)

    print(f"Graph: {pipeline.nodes_to_edges}")
    runner = AsyncExecutor(pipeline)
    runner.execute()

    # Output:
    # -------
    # Graph: {
    #     'extract': {'transform'},
    #     'transform': {'load'},
    #     'load': {'output'},
    #     'output': set(),
    # }
    # Load: Hello
    # output: Hello
    # Load: World
    # Traceback (most recent call last):
    #   File "/path/to/src/async_graph_data_flow/executor.py", line 188, in _consumer
    #     next_data_item = await anext(coro)
    #   File "/path/to/examples/halt_on_exception_at_any_node.py", line 25, in load
    #     raise Exception(f"intentional error for {data}")
    # Exception: intentional error for World
    #
    # Pipeline execution halted due to an exception in load node

Accessing and Raising an Exception#

While it’s possible to halt the graph execution due to unhandled exceptions (see Halting Graph Execution upon Exceptions), these exceptions are not raised from within the execute() call. Instead, AsyncExecutor’s exceptions attribute gives access to the exceptions from the nodes, and you can determine what to do with this information (e.g., raise an exception of your own).

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def node1():
    """Emit the items ``"hello"`` and ``"world"``, in that order."""
    for word in ("hello", "world"):
        yield word


async def node2(data):
    """Always fail with a ``ValueError`` that names the received item.

    Raises:
        ValueError: on every call, with the message ``bad data: <data>``.
    """
    failure = ValueError(f"bad data: {data}")
    raise failure
    # Never reached: the bare ``yield`` only makes Python treat this function
    # as an async generator, like the other node functions.
    yield


if __name__ == "__main__":
    pipeline = AsyncGraph()
    for node_func in (node1, node2):
        pipeline.add_node(node_func)
    pipeline.add_edge("node1", "node2")

    print(f"Graph: {pipeline.nodes_to_edges}")
    executor = AsyncExecutor(pipeline)
    executor.execute()

    # execute() does not raise the nodes' exceptions; inspect what was
    # collected and raise from here if any node recorded at least one.
    collected = executor.exceptions.values()
    if any(collected):
        for node_errors in collected:
            print(f"Exceptions: {node_errors}")
        raise RuntimeError("oh no! something went wrong in the graph execution")

    # Output:
    # -------
    # Graph: {'node1': {'node2'}, 'node2': set()}
    #
    # ... (logging for the unhandled exceptions from the nodes,
    #      without actually raising the exceptions)
    #
    # Exceptions: []
    # Exceptions: [ValueError('bad data: hello'), ValueError('bad data: world')]
    # Traceback (most recent call last):
    #   File "/path/to/examples/raising_an_exception.py", line 28, in <module>
    #     raise RuntimeError("oh no! something went wrong in the graph execution")
    # RuntimeError: oh no! something went wrong in the graph execution

Incorporating a Synchronous Function#

async-graph-data-flow is for asynchronous functions by design, but what if you need to run a synchronous function? Inside a node’s async function, you may grab asyncio’s running event loop, then use the loop’s run_in_executor() to run the synchronous function and await its completion.

import asyncio
import time

from async_graph_data_flow import AsyncExecutor, AsyncGraph


async def extract():
    """Emit the items ``"hello"`` and ``"world"``, in that order."""
    for word in ("hello", "world"):
        yield word


async def transform(data):
    """Print the received string, wait three seconds, emit it title-cased."""
    print(f"Transformer received: {data}")
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
    title_cased = str.title(data)
    yield title_cased


async def load1(data):
    """Print the received item, then emit three labelled items.

    Each of ``"load1 0"``, ``"load1 1"`` and ``"load1 2"`` is preceded by a
    four-second ``asyncio.sleep``.
    """
    print(f"Data in Load1: {data}")
    step = 0
    while step < 3:
        await asyncio.sleep(4)
        yield f"load1 {step}"
        step += 1


async def load2(data):
    """Print the received item, then emit three labelled items.

    Each of ``"load2 0"``, ``"load2 1"`` and ``"load2 2"`` is preceded by a
    four-second call to the synchronous ``time.sleep``, which is handed to
    the running loop's ``run_in_executor`` and awaited.
    """
    print(f"Data in Load2: {data}")
    running_loop = asyncio.get_running_loop()
    step = 0
    while step < 3:
        # Passing None as the executor selects the loop's default executor.
        await running_loop.run_in_executor(None, time.sleep, 4)
        yield f"load2 {step}"
        step += 1


async def output(data):
    """Print the received item after an ``output:`` label and emit ``None``."""
    print("output:", data)
    yield None


if __name__ == "__main__":
    pipeline = AsyncGraph()
    pipeline.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for node_func in (transform, load1, load2, output):
        pipeline.add_node(node_func)

    # transform fans out to load1 and load2, which both feed output.
    for upstream, downstream in (
        ("extract", "transform"),
        ("transform", "load1"),
        ("transform", "load2"),
        ("load1", "output"),
        ("load2", "output"),
    ):
        pipeline.add_edge(upstream, downstream)

    print(f"Graph: {pipeline.nodes_to_edges}")
    runner = AsyncExecutor(pipeline)
    runner.execute()

    # Output:
    # -------
    # Graph: {
    #     'extract': {'transform'},
    #     'transform': {'load1', 'load2'},
    #     'load1': {'output'},
    #     'load2': {'output'},
    #     'output': set(),
    # }
    # Transformer received: hello
    # Transformer received: world
    # Data in Load1: Hello
    # Data in Load2: Hello
    # output: load1 0
    # output: load2 0
    # output: load1 1
    # output: load2 1
    # Data in Load1: World
    # output: load1 2
    # Data in Load2: World
    # output: load2 2
    # output: load1 0
    # output: load2 0
    # output: load1 1
    # output: load2 1
    # output: load1 2
    # output: load2 2

Shared State Across Asynchronous Functions#

Sharing state across the async functions is possible if you pass the same object around them. Such an object can be a custom class instance with methods and attributes as needed.

import asyncio

from async_graph_data_flow import AsyncExecutor, AsyncGraph


class SharedState:
    """A dictionary-backed store with awaitable ``get`` and ``set`` methods."""

    def __init__(self):
        # Backing storage for every key/value pair.
        self._dict = {}

    async def get(self, key):
        """Return the value stored under ``key``, waiting until it exists.

        While ``key`` is absent, this sleeps for a millisecond and checks
        again, so it does not return until some other code sets the key.
        """
        while key not in self._dict:
            await asyncio.sleep(0.001)
        return self._dict[key]

    async def set(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        self._dict[key] = value


async def node1(shared_state):
    """Publish two values through ``shared_state`` and emit it after each.

    ``"current_data"`` is first set to ``"hello"`` and ``shared_state`` is
    emitted; after a two-second pause it is set to ``"world"`` and
    ``shared_state`` is emitted again.
    """
    state_key = "current_data"

    await shared_state.set(state_key, "hello")
    yield shared_state

    pause_seconds = 2
    await asyncio.sleep(pause_seconds)

    await shared_state.set(state_key, "world")
    yield shared_state


async def node2(shared_state):
    """Read ``"current_data"`` from ``shared_state``, print it, and pass it on.

    After printing, this waits three seconds and then emits the pair
    ``(shared_state, value)``.
    """
    latest = await shared_state.get("current_data")
    print(f"node2 received: {latest}")
    pause_seconds = 3
    await asyncio.sleep(pause_seconds)
    outgoing = (shared_state, latest)
    yield outgoing


if __name__ == "__main__":
    pipeline = AsyncGraph()
    for node_func in (node1, node2):
        pipeline.add_node(node_func)
    pipeline.add_edge("node1", "node2")

    print(f"Graph: {pipeline.nodes_to_edges}")
    # A single SharedState instance is handed to node1 as its argument; node1
    # then emits that same instance downstream to node2.
    shared_state = SharedState()
    initial_args = {"node1": (shared_state,)}
    AsyncExecutor(pipeline).execute(start_nodes=initial_args)

    # Output:
    # -------
    # Graph: {'node1': {'node2'}, 'node2': set()}
    # node2 received: hello
    # node2 received: world