From 4123c971397c824ab1e882554566ccea3dfc6422 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Jul 2023 12:08:11 -0400 Subject: [PATCH] Add symcache support to paper eng - add the `.norm_trade()` required ep (for symcache offline loading) - port to new `Account` apis (which now require a symcache input) --- piker/clearing/_paper_engine.py | 67 ++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 18 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 2df4eb4e..13f15cb7 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -45,7 +45,7 @@ from ..accounting._mktinfo import ( ) from ..accounting import ( Position, - PpTable, + Account, Transaction, TransactionLedger, open_trade_ledger, @@ -77,10 +77,8 @@ class PaperBoi(Struct): ''' broker: str - ems_trades_stream: tractor.MsgStream - - ppt: PpTable + acnt: Account ledger: TransactionLedger # map of paper "live" orders which be used @@ -263,9 +261,9 @@ class PaperBoi(Struct): # we don't actually have any unique backend symbol ourselves # other then this thing, our fqme address. bs_mktid: str = fqme + assert self._mkts[fqme].fqme == fqme t = Transaction( fqme=fqme, - sym=self._mkts[fqme], tid=oid, size=size, price=price, @@ -276,10 +274,13 @@ class PaperBoi(Struct): # update in-mem ledger and pos table self.ledger.update_from_t(t) - self.ppt.update_from_trans({oid: t}) + self.acnt.update_from_ledger( + {oid: t}, + symcache=self.ledger._symcache, + ) # transmit pp msg to ems - pp = self.ppt.pps[bs_mktid] + pp = self.acnt.pps[bs_mktid] pp_msg = BrokerdPosition( broker=self.broker, account='paper', @@ -296,7 +297,7 @@ class PaperBoi(Struct): # write all updates to filesys immediately # (adds latency but that works for simulation anyway) self.ledger.write_config() - self.ppt.write_config() + self.acnt.write_config() await self.ems_trades_stream.send(pp_msg) @@ -540,14 +541,14 @@ async def open_trade_dialog( # enable piker.clearing console log for *this* subactor get_console_log(loglevel) - ppt: PpTable + acnt: Account ledger: TransactionLedger with ( open_pps( broker, 'paper', write_on_exit=True, - ) as ppt, + ) as acnt, open_trade_ledger( broker, @@ -559,7 +560,7 @@ async def open_trade_dialog( # don't contain necessary market info per trade record entry.. # - if no fqme was passed in, we presume we're running in # "ledger-sync-only mode" and thus we load mkt info for - # each symbol found in the ledger to a ppt table manually. + # each symbol found in the ledger to a acnt table manually. # TODO: how to process ledger info from backends? # - should we be rolling our own actor-cached version of these @@ -575,7 +576,7 @@ async def open_trade_dialog( mkt_by_fqme: dict[str, MktPair] = {} if fqme: bs_fqme, _, broker = fqme.rpartition('.') - mkt, _ = await brokermod.get_mkt_info(bs_fqme) + mkt, pair = await brokermod.get_mkt_info(bs_fqme) mkt_by_fqme[mkt.fqme] = mkt # for each sym in the ledger load it's `MktPair` info @@ -586,7 +587,7 @@ async def open_trade_dialog( gmi and l_fqme not in mkt_by_fqme ): - mkt, pair = await brokermod.get_mkt_info( + mkt, pair = await gmi( l_fqme.rstrip(f'.{broker}'), ) mkt_by_fqme[l_fqme] = mkt @@ -603,12 +604,12 @@ async def open_trade_dialog( # update pos table from ledger history and provide a ``MktPair`` # lookup for internal position accounting calcs. - ppt.update_from_trans(ledger.to_trans(mkt_by_fqme=mkt_by_fqme)) + acnt.update_from_ledger(ledger) pp_msgs: list[BrokerdPosition] = [] pos: Position token: str # f'{symbol}.{self.broker}' - for token, pos in ppt.pps.items(): + for token, pos in acnt.pps.items(): pp_msgs.append(BrokerdPosition( broker=broker, account='paper', @@ -624,7 +625,7 @@ async def open_trade_dialog( # write new positions state in case ledger was # newer then that tracked in pps.toml - ppt.write_config() + acnt.write_config() # exit early since no fqme was passed, # normally this case is just to load @@ -645,7 +646,9 @@ async def open_trade_dialog( ): # sanity check all the mkt infos for fqme, flume in feed.flumes.items(): - assert mkt_by_fqme[fqme] == flume.mkt + mkt = mkt_by_fqme[fqme] + print(mkt - flume.mkt) + assert mkt == flume.mkt async with ( ctx.open_stream() as ems_stream, @@ -654,7 +657,7 @@ async def open_trade_dialog( client = PaperBoi( broker=broker, ems_trades_stream=ems_stream, - ppt=ppt, + acnt=acnt, ledger=ledger, _buys=_buys, @@ -723,3 +726,31 @@ async def open_paperboi( await ctx.cancel() if we_spawned: await portal.cancel_actor() + + +def norm_trade( + txdict: dict, + +) -> Transaction: + from pendulum import ( + DateTime, + parse, + ) + + # special field handling for datetimes + # to ensure pendulum is used! + dt: DateTime = parse(txdict['dt']) + expiry: str | None = txdict.get('expiry') + fqme: str = txdict.get('fqme') or txdict.pop('fqsn') + + return Transaction( + fqme=fqme, + tid=txdict['tid'], + dt=dt, + price=txdict['price'], + size=txdict['size'], + cost=txdict.get('cost', 0), + bs_mktid=txdict['bs_mktid'], + expiry=parse(expiry) if expiry else None, + etype='clear', + )