Awaitables para ejecucion de digrafos
Hace poco me tocó resolver un problema en el trabajo que me hizo dar cuenta de la existencia de un patrón para programación concurrente.
El problema era procesar (llamar a un endpoint) con una serie de registros desde BigQuery con la restriccion de que si algun registro $R_i$ tiene una fecha anterior a otro registro $R_j$ y comparten el valor en algunas de las columnas con otro entonces $R_i$ debe procesarse antes que $R_j$ y se debia esperar al menos tres segundos a que se terminen todos los efectos secundarios del proceso.
En el momento la cantidad de registros a procesar rondaba los 2.000.000, por lo que hacer un proceso secuencial esperando tres segundos entre los registros no resultaba la mejor solución.
Planteo
Formalizando un poco el problema, definimos:
- $R_i = [t_i, a_i, b_i, …]$
- $R_i > R_j \iff t_i > t_j$ y $\exists{z}z_i = z_j$
Lo que estamos definiendo termina siendo un orden parcial, lo que permite armar un grafo de dependencias que define como deberiamos procesar los registros.
Solución 1: Ejecución con coordinador
Una posible solución es levantar todos los registros en memoria, armar un digrafo, realizar un ordenamiento topológico1 y resolver en orden.
Primero definimos un registro con el chequeo de dependencias y un metodo que ejecuta lo que necesitamos.
from typing import Any
import asyncio
class Record:
def __init__(self, timestamp: int, values: dict[str, Any]):
self.timestamp = timestamp
self.values = values
def depends_on(self, other: "Record") -> bool:
if other.timestamp > self.timestamp:
return False
for k, v in self.values.items():
if k in other.values and other.values[k] == v:
return True
return False
def __str__(self):
return f"{self.timestamp}=" + \
":".join([str(v) for v in self.values.values()])
async def execute(self): # Dummy execution
await asyncio.sleep(2)
Definimolos la lista que representa la lista entera de registros o tareas a procesar.
records = [
Record(0, {"nombre": "Juan", "edad": 23}),
Record(1, {"nombre": "Juan", "edad": 24}),
Record(2, {"nombre": "Juan", "edad": 32}),
Record(3, {"nombre": "Ana", "edad": 34}),
Record(4, {"nombre": "Pedro", "edad": 24}),
Record(5, {"nombre": "Ana", "edad": 24,}),
]
Aprovechando la libreria de networkx para graficar armamos un grafo.
import networkx as nx
G = nx.DiGraph()
for i in range(len(records)):
r_i = records[i]
G.add_node(r_i)
for i in range(len(records)):
r_i = records[i]
for j in range(len(records)):
if i == j:
continue
r_j = records[j]
if r_i.depends_on(r_j):
G.add_edge(r_j, r_i)

Una vez planteado el digrafo, solo queda ejecutar en algun órden válido. Tomamos los nodos que no tienen dependencia y los ejecutamos. Cuando alguno termina validamos si alguno nuevo se liberó (no le quedan dependencias) y lo ponemos en la cola de ejecución. El algoritmo termina cuando no quedan nodos por ejecutar.
from time import perf_counter
SG = G.copy()
start = perf_counter()
async def run(record: Record):
print(f"{perf_counter() - start:.3f}: Executing node: {record}")
await record.execute()
return record
running = []
for u in SG:
if len(list(SG.predecessors(u))) == 0:
running.append(asyncio.create_task(run(u)))
while len(SG) > 0:
finished, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
successors = set()
for task in finished:
successors.update(SG.successors(task.result()))
SG.remove_node(task.result())
for v in successors:
if len(list(SG.predecessors(v))) == 0:
running.add(asyncio.create_task(run(v)))
print(f"Execution took: {perf_counter() - start:.3f}s")
0.000: Executing node: 0=Juan:23
0.000: Executing node: 3=Ana:34
2.003: Executing node: 1=Juan:24
4.005: Executing node: 4=Pedro:24
4.005: Executing node: 2=Juan:32
6.007: Executing node: 5=Ana:24
Execution took: 8.009s
La solución funciona, de hecho no hay muchas optimizaciones grandes a hacer, se podria manejar mejor el tema de los predecesores de nodos y evitar modificar el grafo sobre la marcha, pero son detalles.
Solución 2: Ejecución ‘distribuida’
Aprovechando las primitivas de asyncio (de concurrencia en general) podemos delegar el manejo de dependencia de los nodos al scheduler async.
from time import perf_counter
from asyncio import Task
from dataclasses import dataclass
@dataclass
class RunningRecord:
record: Record
task: Task
start = perf_counter()
async def run(record: Record, dependencies: list[RunningRecord]):
if len(dependencies) > 0:
await asyncio.wait(map(lambda d: d.task, dependencies))
print(f"{perf_counter() - start:.3f}: Executing node: {record}")
await record.execute()
return record
running: list[RunningRecord] = []
def new_record(record: Record):
dependencies = [r for r in running if record.depends_on(r.record)]
running.append(
RunningRecord(
record,
asyncio.create_task(
run(record, dependencies)
)
)
)
for r in sorted(records, key=lambda r: r.timestamp):
new_record(r)
await asyncio.gather(*[r.task for r in running])
print(f"Execution took: {perf_counter() - start:.3f}s")
0.001: Executing node: 0=Juan:23
0.001: Executing node: 3=Ana:34
2.003: Executing node: 1=Juan:24
4.003: Executing node: 2=Juan:32
4.003: Executing node: 4=Pedro:24
6.005: Executing node: 5=Ana:24
Execution took: 8.007s
Aprovechando que $t_i < t_j \implies R_i \not> R_j$, podemos levantar los nodos de manera ordenada sabiendo que solo tenemos que comparar con los anteriores para evaluar dependencia.
Además podemos evitar armar un digrafo y en base a eso ejecutar los nodos, directamente la ejecución de un nodo tiene como dependencia la ejecución de sus predecesores.
await asyncio.wait(map(lambda d: d.task, dependencies))
Otro detalle que se puede ver estructurando el código de ésta manera es que podemos agregar nodos al grafo implicito mientras está ejecutando. (Con la solución 1 también se puede, solo que es un poco más complejo).
Con un poco de magia podemos limitar los recursos en uso del sistema, levantando de a una cierta cantidad de registros en orden y procesar, limitando la cantidad de nodos “vivos” con alguna limitante por tamaño de arreglo o chequeo de ram.
También es importante ver que no hay dependencia real en asyncio, se puede usar cualquier libreria de concurrencia (o implementación propia) que permita esperar sobre futures.
Código fuente
-
https://en.wikipedia.org/wiki/Topological_sorting ↩