From 4f8dc7693ba47c916e8360158c6ced16b143c9d2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Sep 2025 12:58:23 -0400 Subject: [PATCH] Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage --- piker/brokers/ib/api.py | 24 ++++++++++-------------- piker/brokers/ib/feed.py | 2 -- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 01659bf5..2ccfd403 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1367,9 +1367,7 @@ async def load_aio_clients( async def load_clients_for_trio( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> None: ''' Pure async mngr proxy to ``load_aio_clients()``. @@ -1382,8 +1380,7 @@ async def load_clients_for_trio( disconnect_on_exit=False, ) as accts2clients: - to_trio.send_nowait(accts2clients) - + chan.started_nowait(accts2clients) # TODO: maybe a sync event to wait on instead? await asyncio.sleep(float('inf')) @@ -1530,23 +1527,22 @@ class MethodProxy: async def open_aio_client_method_relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, + chan: tractor.to_asyncio.LinkedTaskChannel, client: Client, event_consumers: dict[str, trio.Event], ) -> None: # sync with `open_client_proxy()` caller - to_trio.send_nowait(client) + chan.started_nowait(client) # TODO: separate channel for error handling? - client.inline_errors(to_trio) + client.inline_errors(chan) # relay all method requests to ``asyncio``-side client and deliver # back results - while not to_trio._closed: - msg: tuple[str, dict]|dict|None = await from_trio.get() + while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`? + msg: tuple[str, dict]|dict|None = await chan.get() match msg: case None: # termination sentinel log.info('asyncio `Client` method-proxy SHUTDOWN!') @@ -1559,7 +1555,7 @@ async def open_aio_client_method_relay( try: resp = await meth(**kwargs) # echo the msg back - to_trio.send_nowait({'result': resp}) + chan.send_nowait({'result': resp}) except ( RequestError, @@ -1567,10 +1563,10 @@ async def open_aio_client_method_relay( # TODO: relay all errors to trio? # BaseException, ) as err: - to_trio.send_nowait({'exception': err}) + chan.send_nowait({'exception': err}) case {'error': content}: - to_trio.send_nowait({'exception': content}) + chan.send_nowait({'exception': content}) case _: raise ValueError(f'Unhandled msg {msg}') diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 6799b6d9..6782e010 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -684,8 +684,6 @@ async def get_bars( _quote_streams: dict[str, trio.abc.ReceiveStream] = {} -# TODO! update to the new style sig with, -# `chan: to_asyncio.LinkedTaskChannel,` async def _setup_quote_stream( chan: tractor.to_asyncio.LinkedTaskChannel, symbol: str,