API Reference

class async_graph_data_flow.AsyncGraph(halt_on_exception: bool = False)
Attributes:
edges

The set of edges, each with the names of the source and destination nodes.

nodes

The list of nodes, each with its function and configurations.

nodes_to_edges

The mapping between source nodes and their destination nodes.

Methods

add_edge(src_node, dst_node)

Add an edge.

add_node(func, *[, name, halt_on_exception, ...])

Add a node by providing its function and optional configurations.

__init__(halt_on_exception: bool = False) → None

Initialize a graph.

Parameters:
halt_on_exception: bool, optional

To halt graph execution when any node has an unhandled exception, set this argument to True. Defaults to False.

add_edge(src_node: str | Callable, dst_node: str | Callable) → None

Add an edge.

Parameters:
src_node: str | Callable

The source node, either the function name or the function itself.

dst_node: str | Callable

The destination node, either the function name or the function itself.
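Because a node can be referenced either by its name or by its function, passing the function or its name should identify the same node when the node was added without a custom name. The standalone sketch below illustrates that equivalence. It is not the library's code: node_name() is a hypothetical helper that assumes a callable resolves to its __name__ (the default node name in add_node()), and each edge is stored as a (source, destination) pair of names, as in the edges property.

```python
from typing import Callable, Set, Tuple, Union


def node_name(node: Union[str, Callable]) -> str:
    """Hypothetical helper: resolve a node reference to a node name.

    Assumes a callable maps to its __name__, which is the default node
    name when add_node() is called without `name`.
    """
    return node if isinstance(node, str) else node.__name__


async def fetch():
    yield "raw"


async def parse(text):
    yield text.upper()


edges: Set[Tuple[str, str]] = set()
edges.add((node_name(fetch), node_name(parse)))      # referenced by function
edges.add((node_name("fetch"), node_name("parse")))  # referenced by name
print(edges)  # {('fetch', 'parse')}
```

With the library itself, the two forms correspond to graph.add_edge(fetch, parse) and graph.add_edge("fetch", "parse"). If a node was registered under a custom name, referring to it by that name string is the unambiguous choice.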

add_node(func: Callable, *, name: str | None = None, halt_on_exception: bool = False, unpack_input: bool = True, max_tasks: int = 1, queue_size: int = 10000, check_async_gen: bool = True) → None

Add a node by providing its function and optional configurations.

Parameters:
func: Callable

The function that this node runs. See the notes below for the function’s requirements.

name: str, optional

The name of this node. If not provided, the __name__ attribute of func is used.

halt_on_exception: bool, optional

To halt graph execution when this node has an unhandled exception, set this argument to True. Defaults to False.

unpack_input: bool, optional

By default (unpack_input is True), when a source node yields a tuple or a dict, func is called as func(*args) or func(**kwargs), respectively. To call func(arg) with no unpacking, set unpack_input to False. See the notes below for more details.

max_tasks: int, optional

The number of tasks that this node runs concurrently. Defaults to 1.

queue_size: int, optional

The maximum number of data items allowed in the Queue object between this node as a source and its destination node(s). Defaults to 10000.

check_async_gen: bool, optional

If True (the default), func is verified to be an async generator function with inspect.isasyncgenfunction(). Pass False to disable this check when func fails it even though calling it still produces an async generator (e.g., when the function is wrapped by a decorator).
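The decorator case is easy to run into. In the sketch below, logged() is a hypothetical decorator: its wrapper is a plain function that returns the async generator object created by the wrapped function, so inspect.isasyncgenfunction() rejects the wrapper even though calling it still yields an async generator.

```python
import functools
import inspect


def logged(func):
    """Hypothetical decorator: the wrapper is a plain function, not an async generator function."""

    @functools.wraps(func)  # copies __name__, __doc__, etc., but not the code flags
    def wrapper(*args, **kwargs):
        print(f"calling {func.__name__}")
        return func(*args, **kwargs)  # returns the async generator object

    return wrapper


async def produce():
    yield 1


wrapped = logged(produce)

print(inspect.isasyncgenfunction(produce))  # True
print(inspect.isasyncgenfunction(wrapped))  # False: the default check would reject it
print(inspect.isasyncgen(wrapped()))        # True: calling it still gives an async generator
```

A function like wrapped is where check_async_gen=False applies, e.g. graph.add_node(wrapped, check_async_gen=False). Because functools.wraps copies __name__, the node would still default to the name produce.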

Notes

These details concern the requirements of the node’s function and the keyword argument unpack_input.

  • Each function in the graph must be an asynchronous generator function, i.e., it’s defined by async def and it yields.

  • Each function can have any signature, with no arguments or with any valid kinds of parameters (positional, keyword-only, *args, **kwargs, etc.). That said, because the functions are connected as a graph, in each edge the destination function must be able to correctly handle whatever the source function yields. Specifically:

    • If the destination function takes no args (i.e., async def dest_func():  # no input args), then it is called as dest_func() with no args, regardless of what the source function yields.

    • If the destination function can take args and the source function yields a tuple, then the tuple is unpacked and dest_func(*args) is called.

    • If the source function yields a dict instead, then dest_func(**kwargs) is called.

    • If the destination function requires more input values than the unpacked tuple or dict provides, a TypeError is raised.

    • If the source function yields a tuple or dict and you do not want it unpacked, set unpack_input to False when calling add_node() for the destination node.

    • If the destination function can take args and the source function yields an object obj that is neither a tuple nor a dict, then the destination function is simply called as dest_func(obj). You can take advantage of this behavior: for instance, if your destination function has parameters with default values (e.g., async def dest_func(a, b=..., c=...):), then calling dest_func(a) with a value for a yielded from the source function is valid, and b and c take their default values.
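The rules above can be condensed into a small dispatcher. This is a hypothetical, standalone sketch for illustration only: call_with_input() and the sample node functions are not part of the library, and the real executor may check these conditions differently. It shows how a yielded item would reach a destination function under each rule, including unpack_input=False and the TypeError case.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List


def call_with_input(dest_func: Callable, item: Any, unpack_input: bool = True) -> AsyncIterator:
    """Hypothetical helper mirroring the documented calling rules."""
    if not inspect.signature(dest_func).parameters:
        return dest_func()  # no parameters: the yielded item is ignored
    if unpack_input and isinstance(item, tuple):
        return dest_func(*item)  # tuple -> positional arguments
    if unpack_input and isinstance(item, dict):
        return dest_func(**item)  # dict -> keyword arguments
    return dest_func(item)  # any other object, or unpacking disabled


async def add(a, b):
    yield a + b


async def greet(name, punctuation="!"):
    yield f"Hello, {name}{punctuation}"


async def tick():
    yield "tick"


async def show(arg):
    yield repr(arg)


async def collect(agen: AsyncIterator) -> List[Any]:
    """Drain an async generator into a list."""
    return [value async for value in agen]


async def main() -> None:
    print(await collect(call_with_input(add, (1, 2))))            # [3]
    print(await collect(call_with_input(add, {"a": 1, "b": 2})))  # [3]
    print(await collect(call_with_input(greet, "Ada")))           # ['Hello, Ada!']
    print(await collect(call_with_input(tick, "ignored")))        # ['tick']
    print(await collect(call_with_input(show, (1, 2), unpack_input=False)))  # ['(1, 2)']
    try:
        call_with_input(add, (1,))  # one value for two required parameters
    except TypeError as exc:
        print("TypeError:", exc)


asyncio.run(main())
```

The greet case corresponds to the last rule: a single non-tuple, non-dict object fills the first parameter, and punctuation keeps its default.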

property edges: set[tuple[str, str]]

The set of edges, each with the names of the source and destination nodes.

property nodes: list[dict[str, Any]]

The list of nodes, each with its function and configurations.

property nodes_to_edges: dict[str, set[str]]

The mapping between source nodes and their destination nodes.
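edges and nodes_to_edges describe the same connections from two angles. The sketch below, using made-up node names, groups an edges-style set of (source, destination) pairs by source. It assumes that nodes without outgoing edges are simply absent from the mapping, which may differ from what the library's property actually returns.

```python
from collections import defaultdict
from typing import Dict, Set, Tuple

# Hypothetical edges shaped like AsyncGraph.edges: (source name, destination name)
edges: Set[Tuple[str, str]] = {("fetch", "parse"), ("parse", "store"), ("parse", "index")}

# Group destination names under their source name
nodes_to_edges: Dict[str, Set[str]] = defaultdict(set)
for src, dst in edges:
    nodes_to_edges[src].add(dst)

print(sorted(nodes_to_edges["parse"]))  # ['index', 'store']
```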

class async_graph_data_flow.AsyncExecutor(graph: AsyncGraph, *, logger: Logger = None, max_exceptions: int = 1000)
Attributes:
data_flow_stats

Data flow statistics.

exceptions

Exceptions from the graph execution.

graph

The graph to execute.

start_nodes

Start nodes and their arguments.

Methods

execute([start_nodes])

Start executing the functions along the graph.

turn_off_data_flow_logging()

Turn off data flow logging.

turn_on_data_flow_logging([node_format, ...])

Turn on and configure data flow logging.

__init__(graph: AsyncGraph, *, logger: Logger = None, max_exceptions: int = 1000)

Initialize an executor.

Parameters:
graph: AsyncGraph

The graph to execute.

logger: logging.Logger, optional

A custom logger to use. If not provided, a generic logging.getLogger(__name__) is used.

max_exceptions: int, optional

The maximum number of unhandled exceptions to keep track of at each node. Defaults to 1000. If the number of exceptions at a node exceeds this threshold, only the most recent exceptions are kept. See also the exceptions property.
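The "keep only the most recent" policy behaves like a bounded buffer. The sketch below models one node's exception history with collections.deque(maxlen=...). This is only an analogy: whether the library stores exceptions this way internally is an assumption.

```python
from collections import deque

max_exceptions = 3  # stands in for AsyncExecutor(..., max_exceptions=3)

# A deque with maxlen discards its oldest entry once it is full
recent = deque(maxlen=max_exceptions)
for i in range(5):
    recent.append(ValueError(f"failure {i}"))

print([str(exc) for exc in recent])  # ['failure 2', 'failure 3', 'failure 4']
```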

property data_flow_stats: dict[str, dict[str, int]] | None

Data flow statistics.

These statistics keep track of (i) the number of times data has passed into each node, (ii) the number of times data has come out of each node, and (iii) the number of errors each node has had. Each key is a node name (str), and each value is a dict with the three keys "in", "out", and "err" (str), each mapped to its count (int).
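Because the statistics are plain nested dicts, they are easy to post-process. The sketch below, with a hypothetical helper nodes_with_errors() and made-up counts in the documented shape, lists the nodes that recorded at least one error.

```python
from typing import Dict, List, Optional

Stats = Dict[str, Dict[str, int]]


def nodes_with_errors(stats: Optional[Stats]) -> List[str]:
    """Hypothetical helper: names of nodes with a nonzero "err" count, sorted."""
    if stats is None:  # the property is typed as possibly None
        return []
    return sorted(name for name, counts in stats.items() if counts["err"] > 0)


# Made-up snapshot with the documented shape
snapshot: Stats = {
    "fetch": {"in": 1, "out": 3, "err": 0},
    "parse": {"in": 3, "out": 2, "err": 1},
}
print(nodes_with_errors(snapshot))  # ['parse']
print(nodes_with_errors(None))      # []
```

With a real executor, this would presumably be called as nodes_with_errors(executor.data_flow_stats) once execute() has returned.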

property exceptions: dict[str, list[Exception]] | None

Exceptions from the graph execution.

Each key is a node name (str), and each value is the list of exceptions raised from that node.

execute(start_nodes: dict[str, tuple] = None) → None

Start executing the functions along the graph.

Parameters:
start_nodes: dict[str, tuple], optional

Specify the start node(s) and, optionally, their arguments. Each key in this dictionary is the name (str) of a node function, and its value is a tuple of arguments with which the node function is called as func(*args); provide None instead to call func() with no arguments. If start_nodes is None or isn’t provided, nodes that have no incoming edges are treated as start nodes.
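The sketch below illustrates both ways of choosing start nodes. default_start_nodes() and start() are hypothetical helpers, not library functions; mapping each default start node to None (no arguments) is an assumption about how the default is represented. It also shows a common pitfall: a one-element argument tuple needs a trailing comma.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, Iterable, List, Optional, Set, Tuple


def default_start_nodes(
    names: Iterable[str], edges: Set[Tuple[str, str]]
) -> Dict[str, Optional[tuple]]:
    """Hypothetical helper: every node without an incoming edge, with no args (None)."""
    has_incoming = {dst for _, dst in edges}
    return {name: None for name in names if name not in has_incoming}


def start(func: Callable, args: Optional[tuple]) -> AsyncIterator:
    """Call a start node as described: func(*args), or func() when args is None."""
    return func() if args is None else func(*args)


async def fetch(page):
    yield f"contents of {page}"


async def heartbeat():
    yield "alive"


async def drain(agen: AsyncIterator) -> List[Any]:
    """Collect everything an async generator yields."""
    return [item async for item in agen]


print(default_start_nodes(["fetch", "parse", "store"], {("fetch", "parse"), ("parse", "store")}))
# {'fetch': None}

# Explicit start nodes: the trailing comma makes ("page-1",) a one-element tuple.
# Without it, ("page-1") is just a str, and func(*args) would unpack its characters.
start_nodes = {"fetch": ("page-1",), "heartbeat": None}
print(asyncio.run(drain(start(fetch, start_nodes["fetch"]))))          # ['contents of page-1']
print(asyncio.run(drain(start(heartbeat, start_nodes["heartbeat"]))))  # ['alive']
```

With the library, the explicit dictionary would be passed as executor.execute(start_nodes=start_nodes) on an executor whose graph contains nodes named fetch and heartbeat (both hypothetical here).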

property graph: AsyncGraph

The graph to execute.

property start_nodes: dict[str, tuple]

Start nodes and their arguments.

This is a dictionary that maps each start node (str) to its arguments to be passed in when the graph execution begins.

turn_off_data_flow_logging()

Turn off data flow logging.

turn_on_data_flow_logging(node_format: str | None = None, node_filter: Iterable[str] | None = None, time_interval: int | None = None, logger: Logger | None = None)

Turn on and configure data flow logging.

For a long-running graph execution, it is helpful to log the data flow statistics (see data_flow_stats) at a regular time interval.

Parameters:
node_format: str, optional

Logging format for each node’s statistics: (i) the number of times data has passed into the node, (ii) the number of times data has come out of the node, and (iii) the number of errors the node has had. If not provided, the default is " {node} - in={in}, out={out}, err={err}".

node_filter: Iterable[str], optional

Filter to see logs from only the specified nodes by name. If not provided, all nodes’ statistics will be logged.

time_interval: int, optional

Time interval in seconds between data flow logs. If not provided, the default is 60 seconds.

logger: logging.Logger, optional

Provide a logger for any customization. If not provided, a generic logging.getLogger(__name__) is used.
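To see what a node_format string produces, the sketch below fills the placeholders for each node in a made-up statistics snapshot. It assumes the placeholders are filled with str.format-style substitution from the node name and its in/out/err counts, which the brace syntax of the default format suggests but which is not confirmed here; render_stats() is a hypothetical helper.

```python
from typing import Dict, Iterable, List, Optional

DEFAULT_NODE_FORMAT = " {node} - in={in}, out={out}, err={err}"  # documented default


def render_stats(
    stats: Dict[str, Dict[str, int]],
    node_format: str = DEFAULT_NODE_FORMAT,
    node_filter: Optional[Iterable[str]] = None,
) -> List[str]:
    """Hypothetical sketch: format one log line per node, honoring node_filter."""
    wanted = None if node_filter is None else set(node_filter)
    lines = []
    for node, counts in stats.items():
        if wanted is not None and node not in wanted:
            continue  # node_filter keeps only the named nodes
        # "in" is a Python keyword, so the counts are passed via ** unpacking
        lines.append(node_format.format(node=node, **counts))
    return lines


snapshot = {
    "fetch": {"in": 1, "out": 3, "err": 0},
    "parse": {"in": 3, "out": 2, "err": 1},
}
print(render_stats(snapshot, node_filter=["parse"]))               # [' parse - in=3, out=2, err=1']
print(render_stats(snapshot, node_format="{node}: {err} errors"))  # ['fetch: 0 errors', 'parse: 1 errors']
```

The corresponding library call would be along the lines of executor.turn_on_data_flow_logging(node_filter=["parse"], time_interval=30).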

class async_graph_data_flow.graph.InvalidAsyncGraphError
Attributes:
args

Methods

add_note

Exception.add_note(note) -- add a note to the exception

with_traceback

Exception.with_traceback(tb) -- set self.__traceback__ to tb and return self.