diff --git a/examples/basic_order_bot.py b/examples/basic_order_bot.py index 51575cc3..7aff6966 100644 --- a/examples/basic_order_bot.py +++ b/examples/basic_order_bot.py @@ -121,6 +121,7 @@ async def bot_main(): # tick_throttle=10, ) as feed, + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): assert accounts diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 1b305009..e815881d 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -362,7 +362,11 @@ class Position(Struct): # added: bool = False tid: str = t.tid if tid in self._events: - log.warning(f'{t} is already added?!') + log.debug( + f'Txn is already added?\n' + f'\n' + f'{t}\n' + ) # return added # TODO: apparently this IS possible with a dict but not @@ -696,7 +700,7 @@ class Account(Struct): else: # TODO: we reallly need a diff set of # loglevels/colors per subsys. - log.warning( + log.debug( f'Recent position for {fqme} was closed!' ) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index a13ce38f..919e8152 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -440,6 +440,7 @@ async def open_trade_dialog( # - ledger: TransactionLedger async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ctx.open_stream() as ems_stream, ): diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index a84c3f6f..436e1952 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -38,6 +38,7 @@ from typing import ( ) from pendulum import now +import tractor import trio import numpy as np from tractor.trionics import ( @@ -708,6 +709,7 @@ async def get_client( ) -> Client: async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as n, open_jsonrpc_session( _ws_url, response_type=JSONRPCResult diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 09e842d3..5c8563ba 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -48,6 +48,7 @@ from bidict import bidict import trio import tractor from tractor import to_asyncio +from tractor import trionics from pendulum import ( from_timestamp, DateTime, @@ -1373,8 +1374,8 @@ async def load_clients_for_trio( ''' Pure async mngr proxy to ``load_aio_clients()``. - This is a bootstrap entrypoing to call from - a ``tractor.to_asyncio.open_channel_from()``. + This is a bootstrap entrypoint to call from + a `tractor.to_asyncio.open_channel_from()`. ''' async with load_aio_clients( @@ -1395,7 +1396,10 @@ async def open_client_proxies() -> tuple[ async with ( tractor.trionics.maybe_open_context( acm_func=tractor.to_asyncio.open_channel_from, - kwargs={'target': load_clients_for_trio}, + kwargs={ + 'target': load_clients_for_trio, + # ^XXX, kwarg to `open_channel_from()` + }, # lock around current actor task access # TODO: maybe this should be the default in tractor? @@ -1588,7 +1592,8 @@ async def open_client_proxy( event_consumers=event_table, ) as (first, chan), - trio.open_nursery() as relay_n, + trionics.collapse_eg(), # loose-ify + trio.open_nursery() as relay_tn, ): assert isinstance(first, Client) @@ -1628,7 +1633,7 @@ async def open_client_proxy( continue - relay_n.start_soon(relay_events) + relay_tn.start_soon(relay_events) yield proxy diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index ddda9020..b78f2880 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -34,6 +34,7 @@ import trio from trio_typing import TaskStatus import tractor from tractor.to_asyncio import LinkedTaskChannel +from tractor import trionics from ib_insync.contract import ( Contract, ) @@ -407,7 +408,7 @@ async def update_and_audit_pos_msg( # TODO: make this a "propaganda" log level? if ibpos.avgCost != msg.avg_price: - log.warning( + log.debug( f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'ib: {ibfmtmsg}\n' '---------------------------\n' @@ -738,7 +739,7 @@ async def open_trade_dialog( f'UNEXPECTED POSITION says IB => {msg.symbol}\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' ) - log.error(logmsg) + log.debug(logmsg) await ctx.started(( all_positions, @@ -747,21 +748,22 @@ async def open_trade_dialog( async with ( ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + trionics.collapse_eg(), + trio.open_nursery() as tn, ): # relay existing open orders to ems for msg in order_msgs: await ems_stream.send(msg) for client in set(aioclients.values()): - trade_event_stream: LinkedTaskChannel = await n.start( + trade_event_stream: LinkedTaskChannel = await tn.start( open_trade_event_stream, client, ) # start order request handler **before** local trades # event loop - n.start_soon( + tn.start_soon( handle_order_requests, ems_stream, accounts_def, @@ -769,7 +771,7 @@ async def open_trade_dialog( ) # allocate event relay tasks for each client connection - n.start_soon( + tn.start_soon( deliver_trade_events, trade_event_stream, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index aad849aa..6c49b8e9 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -42,6 +42,7 @@ from bidict import bidict import trio from trio_typing import TaskStatus import tractor +from tractor import trionics from ._util import ( log, # sub-sys logger @@ -500,7 +501,7 @@ class Router(Struct): ''' # setup at actor spawn time - nursery: trio.Nursery + _tn: trio.Nursery # broker to book map books: dict[str, DarkBook] = {} @@ -666,7 +667,7 @@ class Router(Struct): # dark book clearing loop, also lives with parent # daemon to allow dark order clearing while no # client is connected. - self.nursery.start_soon( + self._tn.start_soon( clear_dark_triggers, self, relay.brokerd_stream, @@ -689,7 +690,7 @@ class Router(Struct): # spawn a ``brokerd`` order control dialog stream # that syncs lifetime with the parent `emsd` daemon. - self.nursery.start_soon( + self._tn.start_soon( translate_and_relay_brokerd_events, broker, relay.brokerd_stream, @@ -763,10 +764,12 @@ async def _setup_persistent_emsd( global _router - # open a root "service nursery" for the ``emsd`` actor - async with trio.open_nursery() as service_nursery: - - _router = Router(nursery=service_nursery) + # open a root "service task-nursery" for the `emsd`-actor + async with ( + trionics.collapse_eg(), + trio.open_nursery() as tn + ): + _router = Router(_tn=tn) # TODO: send back the full set of persistent # orders/execs? @@ -1511,7 +1514,7 @@ async def maybe_open_trade_relays( loglevel: str = 'info', ): - fqme, relay, feed, client_ready = await _router.nursery.start( + fqme, relay, feed, client_ready = await _router._tn.start( _router.open_trade_relays, fqme, exec_mode, diff --git a/piker/data/feed.py b/piker/data/feed.py index 05834e9d..18404954 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -728,7 +728,10 @@ class Feed(Struct): async for msg in stream: await tx.send(msg) - async with trio.open_nursery() as nurse: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as nurse + ): # spawn a relay task for each stream so that they all # multiplex to a common channel. for brokername in mods: diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index f10f0f75..121fcbb7 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -963,7 +963,10 @@ async def tsdb_backfill( # concurrently load the provider's most-recent-frame AND any # pre-existing tsdb history already saved in `piker` storage. dt_eps: list[DateTime, DateTime] = [] - async with trio.open_nursery() as tn: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): tn.start_soon( push_latest_frame, dt_eps, @@ -1012,9 +1015,16 @@ async def tsdb_backfill( int, Duration, ]|None = config.get('frame_types', None) + if def_frame_durs: def_frame_size: Duration = def_frame_durs[timeframe] - assert def_frame_size == calced_frame_size + + if def_frame_size != calced_frame_size: + log.warning( + f'Expected frame size {def_frame_size}\n' + f'Rxed frame {calced_frame_size}\n' + ) + # await tractor.pause() else: # use what we calced from first frame above. def_frame_size = calced_frame_size @@ -1043,7 +1053,9 @@ async def tsdb_backfill( # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb # continue that now in the background - async with trio.open_nursery() as tn: + async with trio.open_nursery( + strict_exception_groups=False, + ) as tn: bf_done = await tn.start( partial( @@ -1308,6 +1320,7 @@ async def manage_history( # sampling period) data set since normally differently # sampled timeseries can be loaded / process independently # ;) + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): log.info( diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 4614b4f9..79e51ef5 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel): # async with tractor.open_nursery() as n: # await n.run_in_actor('other', intermittently_refresh_tokens) - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery( + # strict_exception_groups=False, + ) as n + ): quoter = await qt.stock_quoter(client, us_symbols) @@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): else: symbols = [tmx_symbols] - async with trio.open_nursery() as n: + async with trio.open_nursery( + strict_exception_groups=False, + ) as n: for syms, func in zip(symbols, stream_what): n.start_soon(func, feed, syms)