diff --git a/tractor/_ipc.py b/tractor/_ipc.py index 32b8966..9d34b3a 100644 --- a/tractor/_ipc.py +++ b/tractor/_ipc.py @@ -4,7 +4,6 @@ Inter-process comms abstractions import typing from typing import Any, Tuple, Optional from functools import partial -import inspect import msgpack import trio diff --git a/tractor/msg.py b/tractor/msg.py index 5b343b6..560e644 100644 --- a/tractor/msg.py +++ b/tractor/msg.py @@ -29,9 +29,13 @@ async def fan_out_to_ctxs( return tuple(topics2ctxs.keys()) agen = pub_async_gen_func(get_topics=get_topics) + async with aclosing(agen) as pub_gen: + async for published in pub_gen: + ctx_payloads: Dict[str, Any] = {} + for topic, data in published.items(): log.debug(f"publishing {topic, data}") # build a new dict packet or invoke provided packetizer