More Examples
While some of the examples below are meant to demonstrate the features of the async-graph-data-flow library, others (e.g., running a synchronous function, sharing state across functions) show what is possible beyond them – we aren't limited by the implemented features! If you have questions or would like to contribute an example of your own, please open an issue at the GitHub repository for discussion.
Customizable Start Nodes
When AsyncExecutor's execute() is called, by default the start nodes (= nodes without incoming edges) of the graph are automatically detected and called. To override this default behavior, pass the keyword argument start_nodes to execute() to select which nodes to execute instead and/or to supply the arguments to whichever nodes you've selected.
import logging
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def node1():
    """Emit the two greeting words, one at a time.

    In this example node1 is not run: execute() is told to start at node2.
    """
    for word in ("hello", "world"):
        yield word
async def node2(data):
    """Report the incoming item, then emit two fixed data items downstream."""
    print(f"Transformer received: {data}")
    for item in ("data1", "data2"):
        yield item
async def node3(data):
    """Terminal node: print what arrived and emit a single ``None``."""
    print(f"Data in Load: {data}")
    yield None
if __name__ == "__main__":
    logging.basicConfig(level="INFO")

    graph = AsyncGraph()
    for fn in (node1, node2, node3):
        graph.add_node(fn)
    for src, dst in (("node1", "node2"), ("node2", "node3")):
        graph.add_edge(src, dst)
    print(f"Graph: {graph.nodes_to_edges}")

    executor = AsyncExecutor(graph)
    # Periodically log the per-node in/out/err counts.
    executor.turn_on_data_flow_logging()
    # Bypass the auto-detected start node (node1, the only node without an
    # incoming edge) and start at node2 with an explicit argument instead.
    executor.execute(start_nodes={"node2": ("new_data",)})
# Output:
# -------
# Graph: {'node1': {'node2'}, 'node2': {'node3'}, 'node3': set()}
# Transformer received: new_data
# Data in Load: data1
# Data in Load: data2
# INFO:async_graph_data_flow.executor: node1 - in=0, out=0, err=0
# INFO:async_graph_data_flow.executor: node2 - in=0, out=2, err=0
# INFO:async_graph_data_flow.executor: node3 - in=2, out=2, err=0
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def node1(data):
    """Print the start argument, then emit it followed by "hello" and "world"."""
    print(f"node1 received: {data}")
    for item in (data, "hello", "world"):
        yield item
async def node2(data):
    """Print the start argument, then emit it followed by "data1" and "data2"."""
    print(f"node2 received: {data}")
    for item in (data, "data1", "data2"):
        yield item
async def node3(data):
    """Fan-in sink: print each item received and emit a single ``None``."""
    print(f"node3 received: {data}")
    yield None
if __name__ == "__main__":
    graph = AsyncGraph()
    for fn in (node1, node2, node3):
        graph.add_node(fn)
    # Fan-in: both start nodes feed node3.
    for src in ("node1", "node2"):
        graph.add_edge(src, "node3")
    print(f"Graph: {graph.nodes_to_edges}")

    # Supply a distinct argument to each of the two start nodes.
    start_args = {"node1": ("foo",), "node2": ("bar",)}
    AsyncExecutor(graph).execute(start_nodes=start_args)
# Output:
# -------
# Graph: {'node1': {'node3'}, 'node2': {'node3'}, 'node3': set()}
# node1 received: foo
# node2 received: bar
# node3 received: foo
# node3 received: hello
# node3 received: world
# node3 received: bar
# node3 received: data1
# node3 received: data2
Graph with Nodes Only and No Edges
It is possible to execute a graph with nodes only and no edges. In this case, if start_nodes is not specified at execute() (see above), then every node is a start node (because no node has an incoming edge), and therefore all nodes execute concurrently upon graph execution. After execution, the start nodes and the arguments they were called with are available via the executor's start_nodes attribute.
import asyncio
import time
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def node1():
    """Announce itself, idle for two seconds, then emit one ``None``."""
    print("from node1")
    # Non-blocking wait, so the three edge-less nodes overlap in time.
    await asyncio.sleep(2)
    yield None
async def node2():
    """Announce itself, idle for two seconds, then emit one ``None``."""
    print("from node2")
    # Non-blocking wait, so the three edge-less nodes overlap in time.
    await asyncio.sleep(2)
    yield None
async def node3():
    """Announce itself, idle for two seconds, then emit one ``None``."""
    print("from node3")
    # Non-blocking wait, so the three edge-less nodes overlap in time.
    await asyncio.sleep(2)
    yield None
if __name__ == "__main__":
    graph = AsyncGraph()
    graph.add_node(node1)
    graph.add_node(node2)
    graph.add_node(node3)
    # Nodes are added, but there are no edges, so every node is a start node.
    print(f"Graph: {graph.nodes_to_edges}")

    executor = AsyncExecutor(graph)
    # perf_counter() is a monotonic, high-resolution clock intended for
    # measuring durations; time.time() can jump if the system clock changes.
    t1 = time.perf_counter()
    executor.execute()
    t2 = time.perf_counter()
    print("execution time:", t2 - t1)
    print("start nodes:", executor.start_nodes)
# Output:
# -------
# Graph: {'node1': set(), 'node2': set(), 'node3': set()}
# from node1
# from node2
# from node3
# execution time: 2.0039637088775635
# start nodes: {'node3': (), 'node1': (), 'node2': ()}
Data Flow Statistics and Logging
AsyncExecutor's data_flow_stats keeps track of data volumes and errors encountered at each node. data_flow_stats is a dictionary where each key is the name of a node, and each value is itself a dict that maps "in", "out", and "err" to the counts of data items coming into the node, data items going out of the node, and unhandled errors from the node, respectively. For a long-running graph execution, it is helpful to log such data flow information at a regular time interval. An AsyncExecutor instance has the method turn_on_data_flow_logging(), which you can call to turn on and configure this logging.
import asyncio
import logging
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def extract():
    """Emit "hello" three times, sleeping three seconds before each one."""
    for _ in range(3):
        await asyncio.sleep(3)
        yield "hello"
async def transform(data):
    """Print the incoming item and emit its title-cased form."""
    print(f"Transformer received: {data}")
    # To simulate a slow transform, insert `await asyncio.sleep(3)` here.
    titled = str.title(data)
    yield titled
async def load(data):
    """Terminal node: print the loaded item and emit a single ``None``."""
    print(f"Data in Load: {data}")
    yield None
if __name__ == "__main__":
    logging.basicConfig(level="INFO")

    etl_graph = AsyncGraph()
    etl_graph.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for fn in (transform, load):
        etl_graph.add_node(fn)
    for src, dst in (("extract", "transform"), ("transform", "load")):
        etl_graph.add_edge(src, dst)
    print(f"Graph: {etl_graph.nodes_to_edges}")

    etl_executor = AsyncExecutor(etl_graph)
    # Log data flow stats at a regular interval (time_interval=3, presumably
    # seconds), restricted to the two downstream nodes.
    etl_executor.turn_on_data_flow_logging(
        time_interval=3,
        node_filter=["transform", "load"],
    )
    etl_executor.execute()
# Output:
# -------
# Graph: {'extract': {'transform'}, 'transform': {'load'}, 'load': set()}
# Transformer received: hello
# Data in Load: Hello
# INFO:async_graph_data_flow.executor: transform - in=1, out=1, err=0
# INFO:async_graph_data_flow.executor: load - in=1, out=0, err=0
# INFO:async_graph_data_flow.executor: transform - in=2, out=2, err=0
# INFO:async_graph_data_flow.executor: load - in=2, out=1, err=0
# Transformer received: hello
# Data in Load: Hello
# Transformer received: hello
# Data in Load: Hello
# INFO:async_graph_data_flow.executor: transform - in=3, out=2, err=0
# INFO:async_graph_data_flow.executor: load - in=2, out=2, err=0
# INFO:async_graph_data_flow.executor: transform - in=3, out=3, err=0
# INFO:async_graph_data_flow.executor: load - in=3, out=3, err=0
Concurrent Tasks Per Node
By default, each node creates one task at a time. To spawn multiple concurrent tasks from a node, set max_tasks in add_node().
import asyncio
import time
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def node1():
    """Emit the integers 0 through 4, in order."""
    for number in range(5):
        yield number
async def node2(i):
    """Wait two seconds, report the received item, then emit one ``None``."""
    # Each task spends 2 s here; with max_tasks > 1 these waits overlap.
    await asyncio.sleep(2)
    print(f"node2 received {i}")
    yield None
if __name__ == "__main__":
    # Build and run the same two-node graph twice to compare one node2 task
    # at a time against up to five concurrent node2 tasks.
    for max_tasks in (1, 5):
        print(f"max_tasks: {max_tasks}")
        graph = AsyncGraph()
        graph.add_node(node1)
        graph.add_node(
            node2,
            max_tasks=max_tasks,
        )
        graph.add_edge("node1", "node2")
        print(f"Graph: {graph.nodes_to_edges}")
        executor = AsyncExecutor(graph)
        # perf_counter() is monotonic and meant for measuring durations,
        # unlike time.time(), which follows (adjustable) wall-clock time.
        t1 = time.perf_counter()
        executor.execute()
        t2 = time.perf_counter()
        print(f"execution time: {t2 - t1}")
        print()
# Output:
# -------
# max_tasks: 1
# Graph: {'node1': {'node2'}, 'node2': set()}
# node2 received 0
# node2 received 1
# node2 received 2
# node2 received 3
# node2 received 4
# execution time: 10.004492044448853
#
# max_tasks: 5
# Graph: {'node1': {'node2'}, 'node2': set()}
# node2 received 0
# node2 received 1
# node2 received 2
# node2 received 3
# node2 received 4
# execution time: 2.001836061477661
Halting Graph Execution upon Exceptions
By default, an unhandled exception from any node does not halt the graph execution. This behavior can be altered in two different ways:
- To halt execution upon an exception at any node, set halt_on_exception to True when initializing an AsyncGraph instance.
- To halt execution upon an exception at a specific node, set halt_on_exception to True when using add_node() to add the node in question to the graph.
import asyncio
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def extract():
    """Emit five lowercase words in a fixed order."""
    for word in ("hello", "world", "civis", "analytics", "done"):
        yield word
async def transform(data):
    """Wait three seconds, then emit the title-cased form of ``data``."""
    # The delay keeps items trickling into ``load`` one at a time.
    await asyncio.sleep(3)
    yield str.title(data)
async def load(data):
    """Print ``data`` and pass it on, failing deliberately for two values.

    Raises:
        Exception: for "World" and "Analytics", to trigger the
            halt_on_exception behavior set on this node.
    """
    print(f"Load: {data}")
    should_fail = data in ("World", "Analytics")
    if should_fail:
        # asyncio.sleep(0) yields control to the event loop once before failing.
        await asyncio.sleep(0)
        raise Exception(f"intentional error for {data}")
    yield data
async def output(data):
    """Sink node: print the received item and emit a single ``None``."""
    print("output:", data)
    yield None
if __name__ == "__main__":
    etl_graph = AsyncGraph()
    etl_graph.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    etl_graph.add_node(transform)
    # Only this node opts in: an unhandled exception in ``load`` halts the
    # graph, while the other nodes keep the default (non-halting) behavior.
    etl_graph.add_node(load, halt_on_exception=True)
    etl_graph.add_node(output)
    # Chain the nodes linearly: each one feeds the next.
    pipeline = ("extract", "transform", "load", "output")
    for src, dst in zip(pipeline, pipeline[1:]):
        etl_graph.add_edge(src, dst)
    print(f"Graph: {etl_graph.nodes_to_edges}")
    AsyncExecutor(etl_graph).execute()
# Output:
# -------
# Graph: {
# 'extract': {'transform'},
# 'transform': {'load'},
# 'load': {'output'},
# 'output': set(),
# }
# Load: Hello
# output: Hello
# Load: World
# Traceback (most recent call last):
# File "/path/to/src/async_graph_data_flow/executor.py", line 188, in _consumer
# next_data_item = await anext(coro)
# File "/path/examples/halt_on_exception_at_a_specific_node.py", line 25, in load
# raise Exception(f"intentional error for {data}")
# Exception: intentional error for World
#
# Pipeline execution halted due to an exception in load node
import asyncio
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def extract():
    """Emit five lowercase words in a fixed order."""
    for word in ("hello", "world", "civis", "analytics", "done"):
        yield word
async def transform(data):
    """Wait three seconds, then emit the title-cased form of ``data``."""
    # The delay keeps items trickling into ``load`` one at a time.
    await asyncio.sleep(3)
    yield str.title(data)
async def load(data):
    """Print ``data`` and pass it on, failing deliberately for two values.

    Raises:
        Exception: for "World" and "Analytics", to trigger the graph-wide
            halt_on_exception behavior.
    """
    print(f"Load: {data}")
    should_fail = data in ("World", "Analytics")
    if should_fail:
        # asyncio.sleep(0) yields control to the event loop once before failing.
        await asyncio.sleep(0)
        raise Exception(f"intentional error for {data}")
    yield data
async def output(data):
    """Sink node: print the received item and emit a single ``None``."""
    print("output:", data)
    yield None
if __name__ == "__main__":
    # Setting halt_on_exception on the graph itself makes an unhandled
    # exception in any node halt the whole execution.
    etl_graph = AsyncGraph(halt_on_exception=True)
    etl_graph.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for fn in (transform, load, output):
        etl_graph.add_node(fn)
    # Chain the nodes linearly: each one feeds the next.
    pipeline = ("extract", "transform", "load", "output")
    for src, dst in zip(pipeline, pipeline[1:]):
        etl_graph.add_edge(src, dst)
    print(f"Graph: {etl_graph.nodes_to_edges}")
    AsyncExecutor(etl_graph).execute()
# Output:
# -------
# Graph: {
# 'extract': {'transform'},
# 'transform': {'load'},
# 'load': {'output'},
# 'output': set(),
# }
# Load: Hello
# output: Hello
# Load: World
# Traceback (most recent call last):
# File "/path/to/src/async_graph_data_flow/executor.py", line 188, in _consumer
# next_data_item = await anext(coro)
# File "/path/to/examples/halt_on_exception_at_any_node.py", line 25, in load
# raise Exception(f"intentional error for {data}")
# Exception: intentional error for World
#
# Pipeline execution halted due to an exception in load node
Accessing and Raising an Exception
While it's possible to halt the graph execution due to unhandled exceptions (see Halting Graph Execution upon Exceptions), these exceptions are not raised from within the execute() call. Instead, AsyncExecutor's exceptions attribute gives access to the exceptions from the nodes, and you can decide what to do with this information (e.g., raise an exception of your own).
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def node1():
    """Emit "hello" and then "world"."""
    for word in ("hello", "world"):
        yield word
async def node2(data):
    """Reject every item it receives by raising ``ValueError``.

    The ``yield`` after the ``raise`` never runs. It is there only so that
    this function is an async generator, like every other node here.
    """
    raise ValueError(f"bad data: {data}")
    yield  # unreachable; makes this an async generator function
if __name__ == "__main__":
    etl_graph = AsyncGraph()
    etl_graph.add_node(node1)
    etl_graph.add_node(node2)
    etl_graph.add_edge("node1", "node2")
    print(f"Graph: {etl_graph.nodes_to_edges}")

    executor = AsyncExecutor(etl_graph)
    # Unhandled node exceptions are logged here, not raised.
    executor.execute()

    # Each value holds one node's exceptions (a list in the sample output).
    # An empty list is falsy, so any() is True only if some node failed.
    per_node_excs = executor.exceptions.values()
    if any(per_node_excs):
        for node_excs in per_node_excs:
            print(f"Exceptions: {node_excs}")
        raise RuntimeError("oh no! something went wrong in the graph execution")
# Output:
# -------
# Graph: {'node1': {'node2'}, 'node2': set()}
#
# ... (logging for the unhandled exceptions from the nodes,
# without actually raising the exceptions)
#
# Exceptions: []
# Exceptions: [ValueError('bad data: hello'), ValueError('bad data: world')]
# Traceback (most recent call last):
# File "/path/to/examples/raising_an_exception.py", line 28, in <module>
# raise RuntimeError("oh no! something went wrong in the graph execution")
# RuntimeError: oh no! something went wrong in the graph execution
Incorporating a Synchronous Function
async-graph-data-flow is designed for asynchronous functions, but what if you need to run a synchronous function? Inside a node's async function, you can grab asyncio's running loop with asyncio.get_running_loop(), then run the synchronous function in that loop's default executor with run_in_executor(), as load2 does below.
import asyncio
import time
from async_graph_data_flow import AsyncExecutor, AsyncGraph
async def extract():
    """Emit "hello" and then "world"."""
    for word in ("hello", "world"):
        yield word
async def transform(data):
    """Print the item, wait three seconds, then emit it title-cased."""
    print(f"Transformer received: {data}")
    await asyncio.sleep(3)
    yield str.title(data)
async def load1(data):
    """Native-async load: emit three labelled items, sleeping 4 s before each."""
    print(f"Data in Load1: {data}")
    for index in range(3):
        # Non-blocking wait on the event loop.
        await asyncio.sleep(4)
        yield f"load1 {index}"
async def load2(data):
    """Emit three labelled items, running a blocking ``time.sleep(4)`` before each.

    This node demonstrates calling a synchronous function from a node.
    """
    print(f"Data in Load2: {data}")
    loop = asyncio.get_running_loop()
    for index in range(3):
        # Passing None runs the blocking call in the loop's default executor
        # (a thread pool unless configured otherwise), so the event loop and
        # the other nodes keep running meanwhile.
        await loop.run_in_executor(None, time.sleep, 4)
        yield f"load2 {index}"
async def output(data):
    """Fan-in sink for both loads: print each item and emit a single ``None``."""
    print("output:", data)
    yield None
if __name__ == "__main__":
    etl_graph = AsyncGraph()
    etl_graph.add_node(extract, name="extract", max_tasks=1, queue_size=1_000)
    for fn in (transform, load1, load2, output):
        etl_graph.add_node(fn)
    # Diamond: transform fans out to both loads, which fan back in to output.
    edges = (
        ("extract", "transform"),
        ("transform", "load1"),
        ("transform", "load2"),
        ("load1", "output"),
        ("load2", "output"),
    )
    for src, dst in edges:
        etl_graph.add_edge(src, dst)
    print(f"Graph: {etl_graph.nodes_to_edges}")
    AsyncExecutor(etl_graph).execute()
# Output:
# -------
# Graph: {
# 'extract': {'transform'},
# 'transform': {'load1', 'load2'},
# 'load1': {'output'},
# 'load2': {'output'},
# 'output': set(),
# }
# Transformer received: hello
# Transformer received: world
# Data in Load1: Hello
# Data in Load2: Hello
# output: load1 0
# output: load2 0
# output: load1 1
# output: load2 1
# Data in Load1: World
# output: load1 2
# Data in Load2: World
# output: load2 2
# output: load1 0
# output: load2 0
# output: load1 1
# output: load2 1
# output: load1 2
# output: load2 2