Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage

ib_2025_updates
Tyler Goodlet 2025-09-22 12:58:23 -04:00
parent 40dca34fde
commit 4f8dc7693b
2 changed files with 10 additions and 16 deletions

View File

@ -1367,9 +1367,7 @@ async def load_aio_clients(
async def load_clients_for_trio( async def load_clients_for_trio(
from_trio: asyncio.Queue, chan: tractor.to_asyncio.LinkedTaskChannel,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
''' '''
Pure async mngr proxy to ``load_aio_clients()``. Pure async mngr proxy to ``load_aio_clients()``.
@ -1382,8 +1380,7 @@ async def load_clients_for_trio(
disconnect_on_exit=False, disconnect_on_exit=False,
) as accts2clients: ) as accts2clients:
to_trio.send_nowait(accts2clients) chan.started_nowait(accts2clients)
# TODO: maybe a sync event to wait on instead? # TODO: maybe a sync event to wait on instead?
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -1530,23 +1527,22 @@ class MethodProxy:
async def open_aio_client_method_relay( async def open_aio_client_method_relay(
from_trio: asyncio.Queue, chan: tractor.to_asyncio.LinkedTaskChannel,
to_trio: trio.abc.SendChannel,
client: Client, client: Client,
event_consumers: dict[str, trio.Event], event_consumers: dict[str, trio.Event],
) -> None: ) -> None:
# sync with `open_client_proxy()` caller # sync with `open_client_proxy()` caller
to_trio.send_nowait(client) chan.started_nowait(client)
# TODO: separate channel for error handling? # TODO: separate channel for error handling?
client.inline_errors(to_trio) client.inline_errors(chan)
# relay all method requests to ``asyncio``-side client and deliver # relay all method requests to ``asyncio``-side client and deliver
# back results # back results
while not to_trio._closed: while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`?
msg: tuple[str, dict]|dict|None = await from_trio.get() msg: tuple[str, dict]|dict|None = await chan.get()
match msg: match msg:
case None: # termination sentinel case None: # termination sentinel
log.info('asyncio `Client` method-proxy SHUTDOWN!') log.info('asyncio `Client` method-proxy SHUTDOWN!')
@ -1559,7 +1555,7 @@ async def open_aio_client_method_relay(
try: try:
resp = await meth(**kwargs) resp = await meth(**kwargs)
# echo the msg back # echo the msg back
to_trio.send_nowait({'result': resp}) chan.send_nowait({'result': resp})
except ( except (
RequestError, RequestError,
@ -1567,10 +1563,10 @@ async def open_aio_client_method_relay(
# TODO: relay all errors to trio? # TODO: relay all errors to trio?
# BaseException, # BaseException,
) as err: ) as err:
to_trio.send_nowait({'exception': err}) chan.send_nowait({'exception': err})
case {'error': content}: case {'error': content}:
to_trio.send_nowait({'exception': content}) chan.send_nowait({'exception': content})
case _: case _:
raise ValueError(f'Unhandled msg {msg}') raise ValueError(f'Unhandled msg {msg}')

View File

@ -684,8 +684,6 @@ async def get_bars(
_quote_streams: dict[str, trio.abc.ReceiveStream] = {} _quote_streams: dict[str, trio.abc.ReceiveStream] = {}
# TODO! update to the new style sig with,
# `chan: to_asyncio.LinkedTaskChannel,`
async def _setup_quote_stream( async def _setup_quote_stream(
chan: tractor.to_asyncio.LinkedTaskChannel, chan: tractor.to_asyncio.LinkedTaskChannel,
symbol: str, symbol: str,