Add symcache support to paper eng

- add the `.norm_trade()` required ep (for symcache offline loading)
- port to new `Account` apis (which now require a symcache input)
account_tests
Tyler Goodlet 2023-07-10 12:08:11 -04:00
parent 55c3d617fa
commit 4123c97139
1 changed files with 49 additions and 18 deletions

View File

@ -45,7 +45,7 @@ from ..accounting._mktinfo import (
) )
from ..accounting import ( from ..accounting import (
Position, Position,
PpTable, Account,
Transaction, Transaction,
TransactionLedger, TransactionLedger,
open_trade_ledger, open_trade_ledger,
@ -77,10 +77,8 @@ class PaperBoi(Struct):
''' '''
broker: str broker: str
ems_trades_stream: tractor.MsgStream ems_trades_stream: tractor.MsgStream
acnt: Account
ppt: PpTable
ledger: TransactionLedger ledger: TransactionLedger
# map of paper "live" orders which be used # map of paper "live" orders which be used
@ -263,9 +261,9 @@ class PaperBoi(Struct):
# we don't actually have any unique backend symbol ourselves # we don't actually have any unique backend symbol ourselves
# other then this thing, our fqme address. # other then this thing, our fqme address.
bs_mktid: str = fqme bs_mktid: str = fqme
assert self._mkts[fqme].fqme == fqme
t = Transaction( t = Transaction(
fqme=fqme, fqme=fqme,
sym=self._mkts[fqme],
tid=oid, tid=oid,
size=size, size=size,
price=price, price=price,
@ -276,10 +274,13 @@ class PaperBoi(Struct):
# update in-mem ledger and pos table # update in-mem ledger and pos table
self.ledger.update_from_t(t) self.ledger.update_from_t(t)
self.ppt.update_from_trans({oid: t}) self.acnt.update_from_ledger(
{oid: t},
symcache=self.ledger._symcache,
)
# transmit pp msg to ems # transmit pp msg to ems
pp = self.ppt.pps[bs_mktid] pp = self.acnt.pps[bs_mktid]
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker=self.broker, broker=self.broker,
account='paper', account='paper',
@ -296,7 +297,7 @@ class PaperBoi(Struct):
# write all updates to filesys immediately # write all updates to filesys immediately
# (adds latency but that works for simulation anyway) # (adds latency but that works for simulation anyway)
self.ledger.write_config() self.ledger.write_config()
self.ppt.write_config() self.acnt.write_config()
await self.ems_trades_stream.send(pp_msg) await self.ems_trades_stream.send(pp_msg)
@ -540,14 +541,14 @@ async def open_trade_dialog(
# enable piker.clearing console log for *this* subactor # enable piker.clearing console log for *this* subactor
get_console_log(loglevel) get_console_log(loglevel)
ppt: PpTable acnt: Account
ledger: TransactionLedger ledger: TransactionLedger
with ( with (
open_pps( open_pps(
broker, broker,
'paper', 'paper',
write_on_exit=True, write_on_exit=True,
) as ppt, ) as acnt,
open_trade_ledger( open_trade_ledger(
broker, broker,
@ -559,7 +560,7 @@ async def open_trade_dialog(
# don't contain necessary market info per trade record entry.. # don't contain necessary market info per trade record entry..
# - if no fqme was passed in, we presume we're running in # - if no fqme was passed in, we presume we're running in
# "ledger-sync-only mode" and thus we load mkt info for # "ledger-sync-only mode" and thus we load mkt info for
# each symbol found in the ledger to a ppt table manually. # each symbol found in the ledger to a acnt table manually.
# TODO: how to process ledger info from backends? # TODO: how to process ledger info from backends?
# - should we be rolling our own actor-cached version of these # - should we be rolling our own actor-cached version of these
@ -575,7 +576,7 @@ async def open_trade_dialog(
mkt_by_fqme: dict[str, MktPair] = {} mkt_by_fqme: dict[str, MktPair] = {}
if fqme: if fqme:
bs_fqme, _, broker = fqme.rpartition('.') bs_fqme, _, broker = fqme.rpartition('.')
mkt, _ = await brokermod.get_mkt_info(bs_fqme) mkt, pair = await brokermod.get_mkt_info(bs_fqme)
mkt_by_fqme[mkt.fqme] = mkt mkt_by_fqme[mkt.fqme] = mkt
# for each sym in the ledger load it's `MktPair` info # for each sym in the ledger load it's `MktPair` info
@ -586,7 +587,7 @@ async def open_trade_dialog(
gmi gmi
and l_fqme not in mkt_by_fqme and l_fqme not in mkt_by_fqme
): ):
mkt, pair = await brokermod.get_mkt_info( mkt, pair = await gmi(
l_fqme.rstrip(f'.{broker}'), l_fqme.rstrip(f'.{broker}'),
) )
mkt_by_fqme[l_fqme] = mkt mkt_by_fqme[l_fqme] = mkt
@ -603,12 +604,12 @@ async def open_trade_dialog(
# update pos table from ledger history and provide a ``MktPair`` # update pos table from ledger history and provide a ``MktPair``
# lookup for internal position accounting calcs. # lookup for internal position accounting calcs.
ppt.update_from_trans(ledger.to_trans(mkt_by_fqme=mkt_by_fqme)) acnt.update_from_ledger(ledger)
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []
pos: Position pos: Position
token: str # f'{symbol}.{self.broker}' token: str # f'{symbol}.{self.broker}'
for token, pos in ppt.pps.items(): for token, pos in acnt.pps.items():
pp_msgs.append(BrokerdPosition( pp_msgs.append(BrokerdPosition(
broker=broker, broker=broker,
account='paper', account='paper',
@ -624,7 +625,7 @@ async def open_trade_dialog(
# write new positions state in case ledger was # write new positions state in case ledger was
# newer then that tracked in pps.toml # newer then that tracked in pps.toml
ppt.write_config() acnt.write_config()
# exit early since no fqme was passed, # exit early since no fqme was passed,
# normally this case is just to load # normally this case is just to load
@ -645,7 +646,9 @@ async def open_trade_dialog(
): ):
# sanity check all the mkt infos # sanity check all the mkt infos
for fqme, flume in feed.flumes.items(): for fqme, flume in feed.flumes.items():
assert mkt_by_fqme[fqme] == flume.mkt mkt = mkt_by_fqme[fqme]
print(mkt - flume.mkt)
assert mkt == flume.mkt
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -654,7 +657,7 @@ async def open_trade_dialog(
client = PaperBoi( client = PaperBoi(
broker=broker, broker=broker,
ems_trades_stream=ems_stream, ems_trades_stream=ems_stream,
ppt=ppt, acnt=acnt,
ledger=ledger, ledger=ledger,
_buys=_buys, _buys=_buys,
@ -723,3 +726,31 @@ async def open_paperboi(
await ctx.cancel() await ctx.cancel()
if we_spawned: if we_spawned:
await portal.cancel_actor() await portal.cancel_actor()
def norm_trade(
txdict: dict,
) -> Transaction:
from pendulum import (
DateTime,
parse,
)
# special field handling for datetimes
# to ensure pendulum is used!
dt: DateTime = parse(txdict['dt'])
expiry: str | None = txdict.get('expiry')
fqme: str = txdict.get('fqme') or txdict.pop('fqsn')
return Transaction(
fqme=fqme,
tid=txdict['tid'],
dt=dt,
price=txdict['price'],
size=txdict['size'],
cost=txdict.get('cost', 0),
bs_mktid=txdict['bs_mktid'],
expiry=parse(expiry) if expiry else None,
etype='clear',
)