From 4b7ac1d895c263fe729894db423e1a3d7600bf6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 17 Apr 2023 16:31:21 -0400 Subject: [PATCH] Port paper engine to latest `.accounting` sys fixes - only preload necessary (one for clearing, all for ledger sync) `MktPair` info from the backend using `.get_mkt_info()`, build the `mkt_by_fqme: dict[str, MktPair]` and pass it to `TransactionLedger.iter_trans()`. - use new `TransactionLedger.update_from_t()` method on clears. - sanity check all `mkt_by_fqme` entries against `Flume.mkt` values when we open a data feed. - rename `PaperBoi._syms` -> `._mkts`. --- piker/clearing/_paper_engine.py | 81 +++++++++++++++++++-------------- 1 file changed, 47 insertions(+), 34 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 23e1d347..511d625c 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -38,7 +38,6 @@ from ..brokers import get_brokermod from .. import data from ..data.types import Struct from ..accounting._mktinfo import ( - Symbol, MktPair, ) from ..accounting import ( @@ -85,7 +84,7 @@ class PaperBoi(Struct): _buys: defaultdict[str, bidict] _sells: defaultdict[str, bidict] _reqids: bidict - _syms: dict[str, Symbol] = {} + _mkts: dict[str, MktPair] = {} # init edge case L1 spread last_ask: tuple[float, float] = (float('inf'), 0) # price, size @@ -262,7 +261,7 @@ class PaperBoi(Struct): bs_mktid: str = fqme t = Transaction( fqsn=fqme, - sym=self._syms[fqme], + sym=self._mkts[fqme], tid=oid, size=size, price=price, @@ -271,11 +270,8 @@ class PaperBoi(Struct): bs_mktid=bs_mktid, ) - tx = t.to_dict() - tx.pop('sym') - # update in-mem ledger and pos table - self.ledger.update({oid: tx}) + self.ledger.update_from_t(t) self.ppt.update_from_trans({oid: t}) # transmit pp msg to ems @@ -293,12 +289,13 @@ class PaperBoi(Struct): # inferred from the pair? # currency=bs_mktid, ) - await self.ems_trades_stream.send(pp_msg) - - # write all updates to filesys + # write all updates to filesys immediately + # (adds latency but that works for simulation anyway) self.ledger.write_config() self.ppt.write_config() + await self.ems_trades_stream.send(pp_msg) + async def simulate_fills( quote_stream: tractor.MsgStream, # noqa @@ -552,36 +549,51 @@ async def trades_dialogue( 'paper', ) as ledger ): - # attempt to get market info from the backend instead of presuming - # the ledger entries have everything correct. + # NOTE: retreive market(pair) info from the backend broker + # since ledger entries (in their backend native format) often + # don't contain necessary market info per trade record entry.. + # - if no fqme was passed in, we presume we're running in + # "ledger-sync-only mode" and thus we load mkt info for + # each symbol found in the ledger to a ppt table manually. + # TODO: how to process ledger info from backends? # - should we be rolling our own actor-cached version of these # client API refs or using portal IPC to send requests to the # existing brokerd daemon? # - alternatively we can possibly expect and use # a `.broker.norm_trade_records()` ep? - fqmes: list[str] = [fqme] - if fqme is None: - fqmes = list(ppt.pps) + brokermod = get_brokermod(broker) + gmi = getattr(brokermod, 'get_mkt_info', None) - for fqme in fqmes: - mkt: MktPair | None = None - brokermod = get_brokermod(broker) - gmi = getattr(brokermod, 'get_mkt_info', None) - if gmi: + # update all transactions with mkt info before + # loading any pps + mkt_by_fqme: dict[str, MktPair | None] = {} + for tid, tdict in ledger.data.items(): + + # TODO: switch this to fqme + l_fqme = tdict['fqsn'] + if ( + gmi + and l_fqme not in mkt_by_fqme + ): mkt, pair = await brokermod.get_mkt_info( - fqme.rstrip(f'.{broker}'), + l_fqme.rstrip(f'.{broker}'), ) + mkt_by_fqme[l_fqme] = mkt - # update pos table from ledger history - ppt.update_from_trans( - ledger.to_trans(), + # if an ``fqme: str`` input was provided we only + # need a ``MktPair`` for that one market, since we're + # running in real simulated-clearing mode, not just ledger + # syncing. + if ( + fqme is not None + and fqme in mkt_by_fqme + ): + break - # NOTE: here we pass in any `MktPair` provided by the - # backend broker instead of assuming the pps.toml contains - # the correct contents! - force_mkt=mkt - ) + # update pos table from ledger history and provide a ``MktPair`` + # lookup for internal position accounting calcs. + ppt.update_from_trans(ledger.to_trans(mkt_by_fqme=mkt_by_fqme)) pp_msgs: list[BrokerdPosition] = [] pos: Position @@ -621,6 +633,10 @@ async def trades_dialogue( loglevel=loglevel, ) as feed, ): + # sanity check all the mkt infos + for fqme, flume in feed.flumes.items(): + assert mkt_by_fqme[fqme] == flume.mkt + async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, @@ -635,11 +651,8 @@ async def trades_dialogue( _sells=_sells, _reqids=_reqids, - # TODO: load postions from ledger file - _syms={ - fqme: flume.symbol - for fqme, flume in feed.flumes.items() - } + _mkts=mkt_by_fqme, + ) n.start_soon(