Port paper engine to latest `.accounting` sys fixes

- only preload necessary (one for clearing, all for ledger sync)
  `MktPair` info from the backend using `.get_mkt_info()`, build the
  `mkt_by_fqme: dict[str, MktPair]` and pass it to
  `TransactionLedger.iter_trans()`.
- use new `TransactionLedger.update_from_t()` method on clears.
- sanity check all `mkt_by_fqme` entries against `Flume.mkt` values
  when we open a data feed.
- rename `PaperBoi._syms` -> `._mkts`.
rekt_pps
Tyler Goodlet 2023-04-17 16:31:21 -04:00
parent 7ee6f36e62
commit 4b7ac1d895
1 changed files with 47 additions and 34 deletions

View File

@ -38,7 +38,6 @@ from ..brokers import get_brokermod
from .. import data
from ..data.types import Struct
from ..accounting._mktinfo import (
Symbol,
MktPair,
)
from ..accounting import (
@ -85,7 +84,7 @@ class PaperBoi(Struct):
_buys: defaultdict[str, bidict]
_sells: defaultdict[str, bidict]
_reqids: bidict
_syms: dict[str, Symbol] = {}
_mkts: dict[str, MktPair] = {}
# init edge case L1 spread
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
@ -262,7 +261,7 @@ class PaperBoi(Struct):
bs_mktid: str = fqme
t = Transaction(
fqsn=fqme,
sym=self._syms[fqme],
sym=self._mkts[fqme],
tid=oid,
size=size,
price=price,
@ -271,11 +270,8 @@ class PaperBoi(Struct):
bs_mktid=bs_mktid,
)
tx = t.to_dict()
tx.pop('sym')
# update in-mem ledger and pos table
self.ledger.update({oid: tx})
self.ledger.update_from_t(t)
self.ppt.update_from_trans({oid: t})
# transmit pp msg to ems
@ -293,12 +289,13 @@ class PaperBoi(Struct):
# inferred from the pair?
# currency=bs_mktid,
)
await self.ems_trades_stream.send(pp_msg)
# write all updates to filesys
# write all updates to filesys immediately
# (adds latency but that works for simulation anyway)
self.ledger.write_config()
self.ppt.write_config()
await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
quote_stream: tractor.MsgStream, # noqa
@ -552,36 +549,51 @@ async def trades_dialogue(
'paper',
) as ledger
):
# attempt to get market info from the backend instead of presuming
# the ledger entries have everything correct.
# NOTE: retreive market(pair) info from the backend broker
# since ledger entries (in their backend native format) often
# don't contain necessary market info per trade record entry..
# - if no fqme was passed in, we presume we're running in
# "ledger-sync-only mode" and thus we load mkt info for
# each symbol found in the ledger to a ppt table manually.
# TODO: how to process ledger info from backends?
# - should we be rolling our own actor-cached version of these
# client API refs or using portal IPC to send requests to the
# existing brokerd daemon?
# - alternatively we can possibly expect and use
# a `.broker.norm_trade_records()` ep?
fqmes: list[str] = [fqme]
if fqme is None:
fqmes = list(ppt.pps)
brokermod = get_brokermod(broker)
gmi = getattr(brokermod, 'get_mkt_info', None)
for fqme in fqmes:
mkt: MktPair | None = None
brokermod = get_brokermod(broker)
gmi = getattr(brokermod, 'get_mkt_info', None)
if gmi:
# update all transactions with mkt info before
# loading any pps
mkt_by_fqme: dict[str, MktPair | None] = {}
for tid, tdict in ledger.data.items():
# TODO: switch this to fqme
l_fqme = tdict['fqsn']
if (
gmi
and l_fqme not in mkt_by_fqme
):
mkt, pair = await brokermod.get_mkt_info(
fqme.rstrip(f'.{broker}'),
l_fqme.rstrip(f'.{broker}'),
)
mkt_by_fqme[l_fqme] = mkt
# update pos table from ledger history
ppt.update_from_trans(
ledger.to_trans(),
# if an ``fqme: str`` input was provided we only
# need a ``MktPair`` for that one market, since we're
# running in real simulated-clearing mode, not just ledger
# syncing.
if (
fqme is not None
and fqme in mkt_by_fqme
):
break
# NOTE: here we pass in any `MktPair` provided by the
# backend broker instead of assuming the pps.toml contains
# the correct contents!
force_mkt=mkt
)
# update pos table from ledger history and provide a ``MktPair``
# lookup for internal position accounting calcs.
ppt.update_from_trans(ledger.to_trans(mkt_by_fqme=mkt_by_fqme))
pp_msgs: list[BrokerdPosition] = []
pos: Position
@ -621,6 +633,10 @@ async def trades_dialogue(
loglevel=loglevel,
) as feed,
):
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
assert mkt_by_fqme[fqme] == flume.mkt
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
@ -635,11 +651,8 @@ async def trades_dialogue(
_sells=_sells,
_reqids=_reqids,
# TODO: load postions from ledger file
_syms={
fqme: flume.symbol
for fqme, flume in feed.flumes.items()
}
_mkts=mkt_by_fqme,
)
n.start_soon(