From 188508575acacf6ab0287797323e2e9be820e89f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Jul 2023 12:21:27 -0400 Subject: [PATCH] Utilize the new `_mktmap_table` input in paper engine In cases where a brokerd backend doesn't yet support a symcache we need to do manual `.get_mkt_info()` queries and stash them in a table that we pass in for the mkt failover lookup to `Account.update_from_ledger()`. Set the `PaperBoi._mkts` to this table for use on real-time ledger writes in `.fake_fill()`. --- piker/clearing/_paper_engine.py | 90 ++++++++++++++++++++++----------- piker/clearing/_util.py | 3 -- 2 files changed, 61 insertions(+), 32 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 03b69c0e..67c789a4 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -30,6 +30,7 @@ import time from typing import ( Callable, ) +from types import ModuleType import uuid from bidict import bidict @@ -37,23 +38,24 @@ import pendulum import trio import tractor -from ..brokers import get_brokermod -from .. import data -from ..data.types import Struct -from ..accounting._mktinfo import ( - MktPair, -) -from ..accounting import ( +from piker.brokers import get_brokermod +from piker.accounting import ( Position, Account, Transaction, TransactionLedger, open_trade_ledger, open_account, + MktPair, + unpack_fqme, +) +from piker.data import ( + open_feed, + iterticks, + Struct, + open_symcache, + SymbologyCache, ) -from ..data import iterticks -from ..data._symcache import open_symcache -from ..accounting import unpack_fqme from ._util import ( log, # sub-sys logger get_console_log, @@ -262,7 +264,6 @@ class PaperBoi(Struct): # we don't actually have any unique backend symbol ourselves # other then this thing, our fqme address. bs_mktid: str = fqme - assert self._mkts[fqme].fqme == fqme t = Transaction( fqme=fqme, tid=oid, @@ -278,6 +279,11 @@ class PaperBoi(Struct): self.acnt.update_from_ledger( {oid: t}, symcache=self.ledger._symcache, + + # XXX when a backend has no symcache support yet we can + # simply pass in the gmi() retreived table created + # during init :o + _mktmap_table=self._mkts, ) # transmit pp msg to ems @@ -544,7 +550,9 @@ async def open_trade_dialog( # enable piker.clearing console log for *this* subactor get_console_log(loglevel) + symcache: SymbologyCache async with open_symcache(get_brokermod(broker)) as symcache: + acnt: Account ledger: TransactionLedger with ( @@ -564,38 +572,53 @@ async def open_trade_dialog( symcache=symcache, ) as ledger ): - # NOTE: retreive market(pair) info from the backend broker - # since ledger entries (in their backend native format) often - # don't contain necessary market info per trade record entry.. - # - if no fqme was passed in, we presume we're running in - # "ledger-sync-only mode" and thus we load mkt info for - # each symbol found in the ledger to a acnt table manually. + # NOTE: WE MUST retreive market(pair) info from each + # backend broker since ledger entries (in their + # provider-native format) often don't contain necessary + # market info per trade record entry.. + # FURTHER, if no fqme was passed in, we presume we're + # running in "ledger-sync-only mode" and thus we load + # mkt info for each symbol found in the ledger to + # an acnt table manually. # TODO: how to process ledger info from backends? # - should we be rolling our own actor-cached version of these - # client API refs or using portal IPC to send requests to the - # existing brokerd daemon? + # client API refs or using portal IPC to send requests to the + # existing brokerd daemon? # - alternatively we can possibly expect and use - # a `.broker.norm_trade_records()` ep? - brokermod = get_brokermod(broker) - gmi = getattr(brokermod, 'get_mkt_info', None) + # a `.broker.ledger.norm_trade()` ep? + brokermod: ModuleType = get_brokermod(broker) + gmi: Callable = getattr(brokermod, 'get_mkt_info', None) # update all transactions with mkt info before # loading any pps mkt_by_fqme: dict[str, MktPair] = {} - if fqme: + if ( + fqme + and fqme not in symcache.mktmaps + ): + log.warning( + f'Symcache for {broker} has no `{fqme}` entry?\n' + 'Manually requesting mkt map data via `.get_mkt_info()`..' + ) + bs_fqme, _, broker = fqme.rpartition('.') - mkt, pair = await brokermod.get_mkt_info(bs_fqme) + mkt, pair = await gmi(bs_fqme) mkt_by_fqme[mkt.fqme] = mkt - # for each sym in the ledger load it's `MktPair` info + # for each sym in the ledger load its `MktPair` info for tid, txdict in ledger.data.items(): l_fqme: str = txdict.get('fqme') or txdict['fqsn'] if ( gmi + and l_fqme not in symcache.mktmaps and l_fqme not in mkt_by_fqme ): + log.warning( + f'Symcache for {broker} has no `{l_fqme}` entry?\n' + 'Manually requesting mkt map data via `.get_mkt_info()`..' + ) mkt, pair = await gmi( l_fqme.rstrip(f'.{broker}'), ) @@ -613,7 +636,15 @@ async def open_trade_dialog( # update pos table from ledger history and provide a ``MktPair`` # lookup for internal position accounting calcs. - acnt.update_from_ledger(ledger) + acnt.update_from_ledger( + ledger, + + # NOTE: if the symcache fails on fqme lookup + # (either sycache not yet supported or not filled + # in) use manually constructed table from calling + # the `.get_mkt_info()` provider EP above. + _mktmap_table=mkt_by_fqme, + ) pp_msgs: list[BrokerdPosition] = [] pos: Position @@ -649,15 +680,15 @@ async def open_trade_dialog( return async with ( - data.open_feed( + open_feed( [fqme], loglevel=loglevel, ) as feed, ): # sanity check all the mkt infos for fqme, flume in feed.flumes.items(): - mkt = mkt_by_fqme[fqme] - assert mkt == flume.mkt + mkt = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme] + assert mkt == flume.mkt async with ( ctx.open_stream() as ems_stream, @@ -741,6 +772,7 @@ def norm_trade( tid: str, txdict: dict, pairs: dict[str, Struct], + symcache: SymbologyCache | None = None, ) -> Transaction: from pendulum import ( diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index d3c0fb8e..3ba7f55f 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -36,9 +36,6 @@ get_console_log = partial( ) -# TODO: use this in other backends like kraken which currently has -# a less formalized version more or less: -# `apiflows[reqid].maps.append(status_msg.to_dict())` class OrderDialogs(Struct): ''' Order control dialog (and thus transaction) tracking via