Utilize the new `_mktmap_table` input in paper engine

In cases where a brokerd backend doesn't yet support a symcache we need
to do manual `.get_mkt_info()` queries and stash them in a table that we
pass in for the mkt failover lookup to `Account.update_from_ledger()`.
Set the `PaperBoi._mkts` to this table for use on real-time ledger
writes in `.fake_fill()`.
account_tests
Tyler Goodlet 2023-07-26 12:21:27 -04:00
parent bebc817d19
commit 188508575a
2 changed files with 61 additions and 32 deletions

View File

@ -30,6 +30,7 @@ import time
from typing import (
Callable,
)
from types import ModuleType
import uuid
from bidict import bidict
@ -37,23 +38,24 @@ import pendulum
import trio
import tractor
from ..brokers import get_brokermod
from .. import data
from ..data.types import Struct
from ..accounting._mktinfo import (
MktPair,
)
from ..accounting import (
from piker.brokers import get_brokermod
from piker.accounting import (
Position,
Account,
Transaction,
TransactionLedger,
open_trade_ledger,
open_account,
MktPair,
unpack_fqme,
)
from piker.data import (
open_feed,
iterticks,
Struct,
open_symcache,
SymbologyCache,
)
from ..data import iterticks
from ..data._symcache import open_symcache
from ..accounting import unpack_fqme
from ._util import (
log, # sub-sys logger
get_console_log,
@ -262,7 +264,6 @@ class PaperBoi(Struct):
# we don't actually have any unique backend symbol ourselves
# other then this thing, our fqme address.
bs_mktid: str = fqme
assert self._mkts[fqme].fqme == fqme
t = Transaction(
fqme=fqme,
tid=oid,
@ -278,6 +279,11 @@ class PaperBoi(Struct):
self.acnt.update_from_ledger(
{oid: t},
symcache=self.ledger._symcache,
# XXX when a backend has no symcache support yet we can
# simply pass in the gmi() retreived table created
# during init :o
_mktmap_table=self._mkts,
)
# transmit pp msg to ems
@ -544,7 +550,9 @@ async def open_trade_dialog(
# enable piker.clearing console log for *this* subactor
get_console_log(loglevel)
symcache: SymbologyCache
async with open_symcache(get_brokermod(broker)) as symcache:
acnt: Account
ledger: TransactionLedger
with (
@ -564,38 +572,53 @@ async def open_trade_dialog(
symcache=symcache,
) as ledger
):
# NOTE: retreive market(pair) info from the backend broker
# since ledger entries (in their backend native format) often
# don't contain necessary market info per trade record entry..
# - if no fqme was passed in, we presume we're running in
# "ledger-sync-only mode" and thus we load mkt info for
# each symbol found in the ledger to a acnt table manually.
# NOTE: WE MUST retreive market(pair) info from each
# backend broker since ledger entries (in their
# provider-native format) often don't contain necessary
# market info per trade record entry..
# FURTHER, if no fqme was passed in, we presume we're
# running in "ledger-sync-only mode" and thus we load
# mkt info for each symbol found in the ledger to
# an acnt table manually.
# TODO: how to process ledger info from backends?
# - should we be rolling our own actor-cached version of these
# client API refs or using portal IPC to send requests to the
# existing brokerd daemon?
# client API refs or using portal IPC to send requests to the
# existing brokerd daemon?
# - alternatively we can possibly expect and use
# a `.broker.norm_trade_records()` ep?
brokermod = get_brokermod(broker)
gmi = getattr(brokermod, 'get_mkt_info', None)
# a `.broker.ledger.norm_trade()` ep?
brokermod: ModuleType = get_brokermod(broker)
gmi: Callable = getattr(brokermod, 'get_mkt_info', None)
# update all transactions with mkt info before
# loading any pps
mkt_by_fqme: dict[str, MktPair] = {}
if fqme:
if (
fqme
and fqme not in symcache.mktmaps
):
log.warning(
f'Symcache for {broker} has no `{fqme}` entry?\n'
'Manually requesting mkt map data via `.get_mkt_info()`..'
)
bs_fqme, _, broker = fqme.rpartition('.')
mkt, pair = await brokermod.get_mkt_info(bs_fqme)
mkt, pair = await gmi(bs_fqme)
mkt_by_fqme[mkt.fqme] = mkt
# for each sym in the ledger load it's `MktPair` info
# for each sym in the ledger load its `MktPair` info
for tid, txdict in ledger.data.items():
l_fqme: str = txdict.get('fqme') or txdict['fqsn']
if (
gmi
and l_fqme not in symcache.mktmaps
and l_fqme not in mkt_by_fqme
):
log.warning(
f'Symcache for {broker} has no `{l_fqme}` entry?\n'
'Manually requesting mkt map data via `.get_mkt_info()`..'
)
mkt, pair = await gmi(
l_fqme.rstrip(f'.{broker}'),
)
@ -613,7 +636,15 @@ async def open_trade_dialog(
# update pos table from ledger history and provide a ``MktPair``
# lookup for internal position accounting calcs.
acnt.update_from_ledger(ledger)
acnt.update_from_ledger(
ledger,
# NOTE: if the symcache fails on fqme lookup
# (either sycache not yet supported or not filled
# in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme,
)
pp_msgs: list[BrokerdPosition] = []
pos: Position
@ -649,15 +680,15 @@ async def open_trade_dialog(
return
async with (
data.open_feed(
open_feed(
[fqme],
loglevel=loglevel,
) as feed,
):
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
mkt = mkt_by_fqme[fqme]
assert mkt == flume.mkt
mkt = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme]
assert mkt == flume.mkt
async with (
ctx.open_stream() as ems_stream,
@ -741,6 +772,7 @@ def norm_trade(
tid: str,
txdict: dict,
pairs: dict[str, Struct],
symcache: SymbologyCache | None = None,
) -> Transaction:
from pendulum import (

View File

@ -36,9 +36,6 @@ get_console_log = partial(
)
# TODO: use this in other backends like kraken which currently has
# a less formalized version more or less:
# `apiflows[reqid].maps.append(status_msg.to_dict())`
class OrderDialogs(Struct):
'''
Order control dialog (and thus transaction) tracking via