Spurious first-draft of EG collapsing

Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
alt_tpts_for_perf
Tyler Goodlet 2025-09-15 19:10:20 -04:00
parent a45de0b710
commit 69eac7bb15
10 changed files with 68 additions and 27 deletions

View File

@ -121,6 +121,7 @@ async def bot_main():
# tick_throttle=10, # tick_throttle=10,
) as feed, ) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
assert accounts assert accounts

View File

@ -362,7 +362,11 @@ class Position(Struct):
# added: bool = False # added: bool = False
tid: str = t.tid tid: str = t.tid
if tid in self._events: if tid in self._events:
log.warning(f'{t} is already added?!') log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
# return added # return added
# TODO: apparently this IS possible with a dict but not # TODO: apparently this IS possible with a dict but not
@ -696,7 +700,7 @@ class Account(Struct):
else: else:
# TODO: we reallly need a diff set of # TODO: we reallly need a diff set of
# loglevels/colors per subsys. # loglevels/colors per subsys.
log.warning( log.debug(
f'Recent position for {fqme} was closed!' f'Recent position for {fqme} was closed!'
) )

View File

@ -440,6 +440,7 @@ async def open_trade_dialog(
# - ledger: TransactionLedger # - ledger: TransactionLedger
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
): ):

View File

@ -38,6 +38,7 @@ from typing import (
) )
from pendulum import now from pendulum import now
import tractor
import trio import trio
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
@ -708,6 +709,7 @@ async def get_client(
) -> Client: ) -> Client:
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_ws_url, response_type=JSONRPCResult _ws_url, response_type=JSONRPCResult

View File

@ -48,6 +48,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
from tractor import trionics
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
DateTime, DateTime,
@ -1373,8 +1374,8 @@ async def load_clients_for_trio(
''' '''
Pure async mngr proxy to ``load_aio_clients()``. Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoing to call from This is a bootstrap entrypoint to call from
a ``tractor.to_asyncio.open_channel_from()``. a `tractor.to_asyncio.open_channel_from()`.
''' '''
async with load_aio_clients( async with load_aio_clients(
@ -1395,7 +1396,10 @@ async def open_client_proxies() -> tuple[
async with ( async with (
tractor.trionics.maybe_open_context( tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from, acm_func=tractor.to_asyncio.open_channel_from,
kwargs={'target': load_clients_for_trio}, kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access # lock around current actor task access
# TODO: maybe this should be the default in tractor? # TODO: maybe this should be the default in tractor?
@ -1588,7 +1592,8 @@ async def open_client_proxy(
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as relay_n, trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
): ):
assert isinstance(first, Client) assert isinstance(first, Client)
@ -1628,7 +1633,7 @@ async def open_client_proxy(
continue continue
relay_n.start_soon(relay_events) relay_tn.start_soon(relay_events)
yield proxy yield proxy

View File

@ -34,6 +34,7 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.to_asyncio import LinkedTaskChannel from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
) )
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price: if ibpos.avgCost != msg.avg_price:
log.warning( log.debug(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n' f'ib: {ibfmtmsg}\n'
'---------------------------\n' '---------------------------\n'
@ -738,7 +739,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
) )
log.error(logmsg) log.debug(logmsg)
await ctx.started(( await ctx.started((
all_positions, all_positions,
@ -747,21 +748,22 @@ async def open_trade_dialog(
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trionics.collapse_eg(),
trio.open_nursery() as tn,
): ):
# relay existing open orders to ems # relay existing open orders to ems
for msg in order_msgs: for msg in order_msgs:
await ems_stream.send(msg) await ems_stream.send(msg)
for client in set(aioclients.values()): for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start( trade_event_stream: LinkedTaskChannel = await tn.start(
open_trade_event_stream, open_trade_event_stream,
client, client,
) )
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
n.start_soon( tn.start_soon(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -769,7 +771,7 @@ async def open_trade_dialog(
) )
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
n.start_soon( tn.start_soon(
deliver_trade_events, deliver_trade_events,
trade_event_stream, trade_event_stream,

View File

@ -42,6 +42,7 @@ from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -500,7 +501,7 @@ class Router(Struct):
''' '''
# setup at actor spawn time # setup at actor spawn time
nursery: trio.Nursery _tn: trio.Nursery
# broker to book map # broker to book map
books: dict[str, DarkBook] = {} books: dict[str, DarkBook] = {}
@ -666,7 +667,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent # dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no # daemon to allow dark order clearing while no
# client is connected. # client is connected.
self.nursery.start_soon( self._tn.start_soon(
clear_dark_triggers, clear_dark_triggers,
self, self,
relay.brokerd_stream, relay.brokerd_stream,
@ -689,7 +690,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream # spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon. # that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon( self._tn.start_soon(
translate_and_relay_brokerd_events, translate_and_relay_brokerd_events,
broker, broker,
relay.brokerd_stream, relay.brokerd_stream,
@ -763,10 +764,12 @@ async def _setup_persistent_emsd(
global _router global _router
# open a root "service nursery" for the ``emsd`` actor # open a root "service task-nursery" for the `emsd`-actor
async with trio.open_nursery() as service_nursery: async with (
trionics.collapse_eg(),
_router = Router(nursery=service_nursery) trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent # TODO: send back the full set of persistent
# orders/execs? # orders/execs?
@ -1511,7 +1514,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info', loglevel: str = 'info',
): ):
fqme, relay, feed, client_ready = await _router.nursery.start( fqme, relay, feed, client_ready = await _router._tn.start(
_router.open_trade_relays, _router.open_trade_relays,
fqme, fqme,
exec_mode, exec_mode,

View File

@ -728,7 +728,10 @@ class Feed(Struct):
async for msg in stream: async for msg in stream:
await tx.send(msg) await tx.send(msg)
async with trio.open_nursery() as nurse: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# spawn a relay task for each stream so that they all # spawn a relay task for each stream so that they all
# multiplex to a common channel. # multiplex to a common channel.
for brokername in mods: for brokername in mods:

View File

@ -963,7 +963,10 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any # concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage. # pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon( tn.start_soon(
push_latest_frame, push_latest_frame,
dt_eps, dt_eps,
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
int, int,
Duration, Duration,
]|None = config.get('frame_types', None) ]|None = config.get('frame_types', None)
if def_frame_durs: if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe] def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
else: else:
# use what we calced from first frame above. # use what we calced from first frame above.
def_frame_size = calced_frame_size def_frame_size = calced_frame_size
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first # if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb # history frame until the last datum loaded from the tsdb
# continue that now in the background # continue that now in the background
async with trio.open_nursery() as tn: async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
bf_done = await tn.start( bf_done = await tn.start(
partial( partial(
@ -1308,6 +1320,7 @@ async def manage_history(
# sampling period) data set since normally differently # sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently # sampled timeseries can be loaded / process independently
# ;) # ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
log.info( log.info(

View File

@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n: # async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens) # await n.run_in_actor('other', intermittently_refresh_tokens)
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
quoter = await qt.stock_quoter(client, us_symbols) quoter = await qt.stock_quoter(client, us_symbols)
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else: else:
symbols = [tmx_symbols] symbols = [tmx_symbols]
async with trio.open_nursery() as n: async with trio.open_nursery(
strict_exception_groups=False,
) as n:
for syms, func in zip(symbols, stream_what): for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms) n.start_soon(func, feed, syms)