API Reference
- class async_graph_data_flow.AsyncGraph(halt_on_exception: bool = False)
- Attributes:
edges
The set of edges, each with the names of the source and destination nodes.
nodes
The list of nodes, each with its function and configurations.
nodes_to_edges
The mapping between source nodes and their destination nodes.
Methods
add_edge(src_node, dst_node)
Add an edge.
add_node(func, *[, name, halt_on_exception, ...])
Add a node by providing its function and optional configurations.
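The edges and nodes_to_edges attributes describe the same wiring from two angles. The sketch below shows one way to derive a source-to-destinations mapping from a set of edges. For illustration only, it assumes that each edge is a (source_name, destination_name) tuple and that the mapping holds sets of destination names. The library's actual internal types may differ.

```python
from collections import defaultdict

# Assumed edge shape for this sketch: (source_name, destination_name).
edges = {("read", "parse"), ("parse", "store"), ("parse", "audit")}

def build_nodes_to_edges(edges):
    """Group destination node names under their source node name."""
    mapping = defaultdict(set)
    for src, dst in edges:
        mapping[src].add(dst)
    return dict(mapping)

nodes_to_edges = build_nodes_to_edges(edges)
print(sorted(nodes_to_edges["parse"]))  # ['audit', 'store']
print("store" in nodes_to_edges)        # False: store has no outgoing edges
```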
- __init__(halt_on_exception: bool = False) -> None
Initialize a graph.
- Parameters:
- halt_on_exception : bool, optional
To halt graph execution when any node has an unhandled exception, set this argument to True. Defaults to False.
- add_edge(src_node: str | Callable, dst_node: str | Callable) -> None
Add an edge.
- Parameters:
- src_node : str | Callable
The source node, either the function name or the function itself.
- dst_node : str | Callable
The destination node, either the function name or the function itself.
- add_node(func: Callable, *, name: str | None = None, halt_on_exception: bool = False, unpack_input: bool = True, max_tasks: int = 1, queue_size: int = 10000, check_async_gen: bool = True) -> None
Add a node by providing its function and optional configurations.
- Parameters:
- func : Callable
The function that this node runs. See the notes below for the function's requirements.
- name : str, optional
The name of this node. If not provided, the __name__ attribute of func is used.
- halt_on_exception : bool, optional
To halt graph execution when this node has an unhandled exception, set this argument to True. Defaults to False.
- unpack_input : bool, optional
By default (i.e., unpack_input is True), func is called with the arguments yielded from a source node as either func(*args) or func(**kwargs). To call func(arg) with no unpacking, set unpack_input to False. See the notes below for more details.
- max_tasks : int, optional
The number of tasks that this node runs concurrently. Defaults to 1.
- queue_size : int, optional
The maximum number of data items allowed in the Queue object between this node as a source and its destination node(s). Defaults to 10000.
- check_async_gen : bool, optional
If True (the default), func is verified to be an async generator function by inspect.isasyncgenfunction(). Pass in False to disable this check if func would fail it even though the callable under the hood is still an async generator function (e.g., when your function is wrapped by a decorator).
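The check_async_gen escape hatch exists because inspect.isasyncgenfunction() only inspects the callable it is given. The stdlib-only sketch below shows a decorator hiding an async generator function from that check. The decorator passthrough and the functions numbers and compute are hypothetical names used for illustration.

```python
import asyncio
import functools
import inspect

async def numbers():
    # A valid node function: defined with async def, and it yields.
    for i in range(3):
        yield i

async def compute():
    # A plain coroutine function: async def but no yield, so not a valid node.
    return 42

def passthrough(func):
    # Hypothetical decorator whose wrapper is an ordinary (non-async) function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper

wrapped_numbers = passthrough(numbers)

async def drain(agen):
    """Collect everything an async generator yields."""
    return [x async for x in agen]

print(inspect.isasyncgenfunction(numbers))          # True
print(inspect.isasyncgenfunction(compute))          # False
print(inspect.isasyncgenfunction(wrapped_numbers))  # False: the wrapper hides it
agen = wrapped_numbers()
print(inspect.isasyncgen(agen))                     # True: still an async generator
print(asyncio.run(drain(agen)))                     # [0, 1, 2]
```

In that situation, the documented option is graph.add_node(wrapped_numbers, check_async_gen=False). Because functools.wraps copies __name__, the node would still be named numbers by default.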
Notes
These details concern the requirements of the node's function and the keyword argument unpack_input.
Each function in the graph must be an asynchronous generator function, i.e., it is defined by async def and it yields.
Each function can have any signature: it can take no arguments, or any valid kinds of arguments. Because the functions are connected as a graph, however, the destination function of each edge must be able to correctly handle whatever the source function yields. Specifically:
- If the destination function takes no args (e.g., async def dest_func():), it is called as dest_func() with no args, regardless of what the source function yields.
- If the destination function can take args and the source function yields a tuple args, the tuple is unpacked and dest_func(*args) is called. If the source function yields a dict args instead, dest_func(**args) is called. If the destination function requires more input values than the unpacked args provide (from either a tuple or a dict), a TypeError is raised.
- If the source function yields a tuple or dict and, for whatever reason, you do not want it unpacked, set unpack_input to False in add_node() when constructing the AsyncGraph object.
- If the destination function can take args and the source function yields an object obj that is neither a tuple nor a dict, the destination function is simply called as dest_func(obj). You can take advantage of this behavior. For instance, if your destination function has keyword arguments with defaults (e.g., async def dest_func(a, b=..., c=...):), calling dest_func(a) is valid with a value for a yielded from the source function, while b and c take their own default values.
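The calling rules in these notes can be modeled in a few lines of stdlib Python. call_node below is a hypothetical helper written for illustration, not the library's implementation. It assumes that "takes no args" means the function's signature has no parameters.

```python
import asyncio
import inspect

def call_node(func, item, unpack_input=True):
    """Hypothetical model of the documented rules for calling a destination node."""
    if not inspect.signature(func).parameters:
        return func()          # no parameters: ignore whatever was yielded
    if unpack_input and isinstance(item, tuple):
        return func(*item)     # tuple: positional unpacking
    if unpack_input and isinstance(item, dict):
        return func(**item)    # dict: keyword unpacking
    return func(item)          # anything else, or unpack_input=False: pass as-is

async def drain(agen):
    """Collect everything an async generator yields."""
    return [x async for x in agen]

def run(agen):
    return asyncio.run(drain(agen))

async def no_args():
    yield "called with no args"

async def add(a, b=10):
    yield a + b

async def show(obj):
    yield obj

print(run(call_node(no_args, (1, 2))))                   # ['called with no args']
print(run(call_node(add, (1, 2))))                       # [3]
print(run(call_node(add, {"a": 1, "b": 5})))             # [6]
print(run(call_node(add, 7)))                            # [17]: b keeps its default
print(run(call_node(show, (1, 2), unpack_input=False)))  # [(1, 2)]

try:
    call_node(add, {"b": 5})  # nothing supplies the required argument a
except TypeError as exc:
    print("TypeError:", exc)
```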
- class async_graph_data_flow.AsyncExecutor(graph: AsyncGraph, *, logger: Logger = None, max_exceptions: int = 1000)
- Attributes:
data_flow_stats
Data flow statistics.
exceptions
Exceptions from the graph execution.
graph
The graph to execute.
start_nodes
Start nodes and their arguments.
Methods
execute([start_nodes])
Start executing the functions along the graph.
turn_off_data_flow_logging()
Turn off data flow logging.
turn_on_data_flow_logging([node_format, ...])
Turn on and configure data flow logging.
- __init__(graph: AsyncGraph, *, logger: Logger = None, max_exceptions: int = 1000)
Initialize an executor.
- Parameters:
- graph : AsyncGraph
The graph to execute.
- logger : logging.Logger, optional
Provide a logger for any customization. If not provided, a generic logging.getLogger(__name__) is used.
- max_exceptions : int, optional
The maximum number of unhandled exceptions to keep track of at each node. Defaults to 1000. If the number of exceptions at a node exceeds this threshold, only the most recent exceptions are kept. See also exceptions.
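The retention rule for max_exceptions behaves like a bounded buffer for each node. The sketch below models it with collections.deque(maxlen=...). This model is an assumption made for illustration and does not describe the library's internals. The public exceptions property is typed dict[str, list[Exception]], so the last step converts the buffers to lists.

```python
from collections import defaultdict, deque

MAX_EXCEPTIONS = 3  # stands in for the executor's max_exceptions argument

# One bounded buffer per node name; a full deque drops its oldest item on append.
buffers = defaultdict(lambda: deque(maxlen=MAX_EXCEPTIONS))

def record(node, exc):
    """Remember an unhandled exception raised at the given node."""
    buffers[node].append(exc)

for i in range(5):
    record("parse", ValueError(f"bad record {i}"))

# Convert to the documented dict[str, list[Exception]] shape.
exceptions = {node: list(buf) for node, buf in buffers.items()}
print([str(e) for e in exceptions["parse"]])
# ['bad record 2', 'bad record 3', 'bad record 4']
```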
- property data_flow_stats: dict[str, dict[str, int]] | None
Data flow statistics.
These statistics track (i) the number of times data has passed into each node, (ii) the number of times data has come out of each node, and (iii) the number of errors each node has had. Each key is a node name (str), and each value is a dict with three keys (str), "in", "out", and "err", each mapped to its count (int).
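Each value in data_flow_stats is a plain dict of counts, so post-run checks are ordinary dictionary code. The numbers below are illustrative and do not come from a real run. nodes_with_errors is a hypothetical helper. Because the property's type allows None, the helper treats None as having no statistics.

```python
# Illustrative values in the documented dict[str, dict[str, int]] shape.
stats = {
    "parse": {"in": 100, "out": 97, "err": 3},
    "store": {"in": 97, "out": 97, "err": 0},
}

def nodes_with_errors(stats):
    """Return the names of nodes whose error count is non-zero."""
    if stats is None:
        return []
    return [name for name, counts in stats.items() if counts["err"] > 0]

print(nodes_with_errors(stats))  # ['parse']
```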
- property exceptions: dict[str, list[Exception]] | None
Exceptions from the graph execution.
Each key is a node name (str), and each value is the list of exceptions raised from that node.
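A common follow-up to a run is to summarize which kinds of errors each node hit. The sketch below uses illustrative exceptions in the documented dict[str, list[Exception]] shape. summarize is a hypothetical helper that counts exception types per node with collections.Counter. It skips nodes with an empty list and, like the property's type, allows None.

```python
from collections import Counter

# Illustrative data in the documented dict[str, list[Exception]] shape.
exceptions = {
    "parse": [ValueError("bad int"), KeyError("id"), ValueError("bad date")],
    "store": [],
}

def summarize(exceptions):
    """Count exception types per node, skipping nodes with no exceptions."""
    if exceptions is None:
        return {}
    return {
        node: Counter(type(exc).__name__ for exc in excs)
        for node, excs in exceptions.items()
        if excs
    }

print(summarize(exceptions))
# {'parse': Counter({'ValueError': 2, 'KeyError': 1})}
```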
- execute(start_nodes: dict[str, tuple] = None) -> None
Start executing the functions along the graph.
- Parameters:
- start_nodes : dict[str, tuple], optional
Specify the start node(s) and, optionally, their arguments. Each key in this dictionary is the name (str) of a node function, and its value is the args (tuple), in which case that function is called as func(*args). Provide None if you want func() called with no args. If start_nodes is None or isn't provided, nodes that have no incoming edges are treated as start nodes.
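The two start-node rules above can be sketched with the stdlib alone. default_start_nodes and call_start_node are hypothetical helpers that model the documented behavior. The sketch assumes, for illustration, that edges are (source_name, destination_name) tuples. With the library itself, the explicit form would be a call such as executor.execute(start_nodes={"read": ("data.csv",), "tick": None}), where executor is an AsyncExecutor.

```python
import asyncio

def default_start_nodes(node_names, edges):
    """Hypothetical model: the nodes that no edge points into."""
    has_incoming = {dst for _src, dst in edges}
    return [name for name in node_names if name not in has_incoming]

def call_start_node(func, args):
    """Documented rule: func(*args) for a tuple of args, func() when args is None."""
    return func() if args is None else func(*args)

async def drain(agen):
    """Collect everything an async generator yields."""
    return [x async for x in agen]

async def read(path):
    yield f"reading {path}"

async def tick():
    yield "tick"

edges = {("read", "parse"), ("parse", "store")}
print(default_start_nodes(["read", "parse", "store"], edges))  # ['read']

# An explicit start_nodes value: a tuple of args, or None for no args.
start_nodes = {"read": ("data.csv",), "tick": None}
funcs = {"read": read, "tick": tick}
for name, args in start_nodes.items():
    print(name, asyncio.run(drain(call_start_node(funcs[name], args))))
# read ['reading data.csv']
# tick ['tick']
```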
- property graph: AsyncGraph
The graph to execute.
- property start_nodes: dict[str, tuple]
Start nodes and their arguments.
This is a dictionary that maps each start node (str) to its arguments to be passed in when the graph execution begins.
- turn_off_data_flow_logging()
Turn off data flow logging.
- turn_on_data_flow_logging(node_format: str | None = None, node_filter: Iterable[str] | None = None, time_interval: int | None = None, logger: Logger | None = None)
Turn on and configure data flow logging.
For a long-running graph execution, it is helpful to log data flow information at a regular time interval.
- Parameters:
- node_format : str, optional
Logging format for each node's statistics: (i) the number of times data has passed into the node, (ii) the number of times data has come out of the node, and (iii) the number of errors the node has had. If not provided, the default is " {node} - in={in}, out={out}, err={err}".
- node_filter : Iterable[str], optional
Filter to see logs from only the specified nodes by name. If not provided, all nodes' statistics are logged.
- time_interval : int, optional
Time interval in seconds between data flow logs. If not provided, the default is 60 seconds.
- logger : logging.Logger, optional
Provide a logger for any customization. If not provided, a generic logging.getLogger(__name__) is used.
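The default node_format suggests str.format-style placeholders named node, in, out, and err. The sketch below assumes that, renders one log line per node, and applies a node_filter. format_stats is a hypothetical helper, not part of the library, and the statistics are made-up values in the documented data_flow_stats shape. Because in is a Python keyword, the counts are passed with ** unpacking rather than as in=... keyword arguments.

```python
DEFAULT_NODE_FORMAT = " {node} - in={in}, out={out}, err={err}"

def format_stats(stats, node_format=None, node_filter=None):
    """Render one line per node, optionally restricted to the names in node_filter."""
    node_format = node_format or DEFAULT_NODE_FORMAT
    wanted = set(node_filter) if node_filter is not None else None
    lines = []
    for node, counts in stats.items():
        if wanted is not None and node not in wanted:
            continue
        # "in" is a keyword, so the counts must be supplied via ** unpacking.
        lines.append(node_format.format(node=node, **counts))
    return lines

stats = {
    "parse": {"in": 100, "out": 97, "err": 3},
    "store": {"in": 97, "out": 97, "err": 0},
}
for line in format_stats(stats, node_filter=["parse"]):
    print(line)
#  parse - in=100, out=97, err=3
```

With the library itself, logging only that node every 30 seconds would use the documented parameters, e.g. executor.turn_on_data_flow_logging(node_filter=["parse"], time_interval=30), where executor is an AsyncExecutor.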
- class async_graph_data_flow.graph.InvalidAsyncGraphError
- Attributes:
- args
Methods
add_note
Exception.add_note(note) -- add a note to the exception
with_traceback
Exception.with_traceback(tb) -- set self.__traceback__ to tb and return self.