diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 13f15cb7..861cd389 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -49,9 +49,10 @@ from ..accounting import ( Transaction, TransactionLedger, open_trade_ledger, - open_pps, + open_account, ) from ..data import iterticks +from ..data._symcache import open_symcache from ..accounting import unpack_fqme from ._util import ( log, # sub-sys logger @@ -541,141 +542,146 @@ async def open_trade_dialog( # enable piker.clearing console log for *this* subactor get_console_log(loglevel) - acnt: Account - ledger: TransactionLedger - with ( - open_pps( - broker, - 'paper', - write_on_exit=True, - ) as acnt, + async with open_symcache(get_brokermod(broker)) as symcache: + acnt: Account + ledger: TransactionLedger + with ( - open_trade_ledger( - broker, - 'paper', - ) as ledger - ): - # NOTE: retreive market(pair) info from the backend broker - # since ledger entries (in their backend native format) often - # don't contain necessary market info per trade record entry.. - # - if no fqme was passed in, we presume we're running in - # "ledger-sync-only mode" and thus we load mkt info for - # each symbol found in the ledger to a acnt table manually. + # TODO: probably do the symcache and ledger loading + # implicitly behind this? Deliver an account, and ledger + # pair or make the ledger an attr of the account? + open_account( + broker, + 'paper', + write_on_exit=True, + ) as acnt, - # TODO: how to process ledger info from backends? - # - should we be rolling our own actor-cached version of these - # client API refs or using portal IPC to send requests to the - # existing brokerd daemon? - # - alternatively we can possibly expect and use - # a `.broker.norm_trade_records()` ep? - brokermod = get_brokermod(broker) - gmi = getattr(brokermod, 'get_mkt_info', None) + open_trade_ledger( + broker, + 'paper', + symcache=symcache, + ) as ledger + ): + # NOTE: retreive market(pair) info from the backend broker + # since ledger entries (in their backend native format) often + # don't contain necessary market info per trade record entry.. + # - if no fqme was passed in, we presume we're running in + # "ledger-sync-only mode" and thus we load mkt info for + # each symbol found in the ledger to a acnt table manually. - # update all transactions with mkt info before - # loading any pps - mkt_by_fqme: dict[str, MktPair] = {} - if fqme: - bs_fqme, _, broker = fqme.rpartition('.') - mkt, pair = await brokermod.get_mkt_info(bs_fqme) - mkt_by_fqme[mkt.fqme] = mkt + # TODO: how to process ledger info from backends? + # - should we be rolling our own actor-cached version of these + # client API refs or using portal IPC to send requests to the + # existing brokerd daemon? + # - alternatively we can possibly expect and use + # a `.broker.norm_trade_records()` ep? + brokermod = get_brokermod(broker) + gmi = getattr(brokermod, 'get_mkt_info', None) - # for each sym in the ledger load it's `MktPair` info - for tid, txdict in ledger.data.items(): - l_fqme: str = txdict.get('fqme') or txdict['fqsn'] + # update all transactions with mkt info before + # loading any pps + mkt_by_fqme: dict[str, MktPair] = {} + if fqme: + bs_fqme, _, broker = fqme.rpartition('.') + mkt, pair = await brokermod.get_mkt_info(bs_fqme) + mkt_by_fqme[mkt.fqme] = mkt - if ( - gmi - and l_fqme not in mkt_by_fqme - ): - mkt, pair = await gmi( - l_fqme.rstrip(f'.{broker}'), - ) - mkt_by_fqme[l_fqme] = mkt + # for each sym in the ledger load it's `MktPair` info + for tid, txdict in ledger.data.items(): + l_fqme: str = txdict.get('fqme') or txdict['fqsn'] - # if an ``fqme: str`` input was provided we only - # need a ``MktPair`` for that one market, since we're - # running in real simulated-clearing mode, not just ledger - # syncing. - if ( - fqme is not None - and fqme in mkt_by_fqme - ): - break + if ( + gmi + and l_fqme not in mkt_by_fqme + ): + mkt, pair = await gmi( + l_fqme.rstrip(f'.{broker}'), + ) + mkt_by_fqme[l_fqme] = mkt - # update pos table from ledger history and provide a ``MktPair`` - # lookup for internal position accounting calcs. - acnt.update_from_ledger(ledger) + # if an ``fqme: str`` input was provided we only + # need a ``MktPair`` for that one market, since we're + # running in real simulated-clearing mode, not just ledger + # syncing. + if ( + fqme is not None + and fqme in mkt_by_fqme + ): + break - pp_msgs: list[BrokerdPosition] = [] - pos: Position - token: str # f'{symbol}.{self.broker}' - for token, pos in acnt.pps.items(): - pp_msgs.append(BrokerdPosition( - broker=broker, - account='paper', - symbol=pos.mkt.fqme, - size=pos.size, - avg_price=pos.ppu, + # update pos table from ledger history and provide a ``MktPair`` + # lookup for internal position accounting calcs. + acnt.update_from_ledger(ledger) + + pp_msgs: list[BrokerdPosition] = [] + pos: Position + token: str # f'{symbol}.{self.broker}' + for token, pos in acnt.pps.items(): + pp_msgs.append(BrokerdPosition( + broker=broker, + account='paper', + symbol=pos.mkt.fqme, + size=pos.size, + avg_price=pos.ppu, + )) + + await ctx.started(( + pp_msgs, + ['paper'], )) - await ctx.started(( - pp_msgs, - ['paper'], - )) + # write new positions state in case ledger was + # newer then that tracked in pps.toml + acnt.write_config() - # write new positions state in case ledger was - # newer then that tracked in pps.toml - acnt.write_config() - - # exit early since no fqme was passed, - # normally this case is just to load - # positions "offline". - if fqme is None: - log.warning( - 'Paper engine only running in position delivery mode!\n' - 'NO SIMULATED CLEARING LOOP IS ACTIVE!' - ) - await trio.sleep_forever() - return - - async with ( - data.open_feed( - [fqme], - loglevel=loglevel, - ) as feed, - ): - # sanity check all the mkt infos - for fqme, flume in feed.flumes.items(): - mkt = mkt_by_fqme[fqme] - print(mkt - flume.mkt) - assert mkt == flume.mkt + # exit early since no fqme was passed, + # normally this case is just to load + # positions "offline". + if fqme is None: + log.warning( + 'Paper engine only running in position delivery mode!\n' + 'NO SIMULATED CLEARING LOOP IS ACTIVE!' + ) + await trio.sleep_forever() + return async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, + data.open_feed( + [fqme], + loglevel=loglevel, + ) as feed, ): - client = PaperBoi( - broker=broker, - ems_trades_stream=ems_stream, - acnt=acnt, - ledger=ledger, + # sanity check all the mkt infos + for fqme, flume in feed.flumes.items(): + mkt = mkt_by_fqme[fqme] + assert mkt == flume.mkt - _buys=_buys, - _sells=_sells, - _reqids=_reqids, + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + client = PaperBoi( + broker=broker, + ems_trades_stream=ems_stream, + acnt=acnt, + ledger=ledger, - _mkts=mkt_by_fqme, + _buys=_buys, + _sells=_sells, + _reqids=_reqids, - ) + _mkts=mkt_by_fqme, - n.start_soon( - handle_order_requests, - client, - ems_stream, - ) + ) - # paper engine simulator clearing task - await simulate_fills(feed.streams[broker], client) + n.start_soon( + handle_order_requests, + client, + ems_stream, + ) + + # paper engine simulator clearing task + await simulate_fills(feed.streams[broker], client) @acm