Async enter/open the symcache in paper engine

Since we don't want to be doing a `trio.run()` from async code (being
already in the `tractor` runtime and all); for now just put a top level
block wrapping async enter until we figure out to embed it (likely)
inside `open_account()` and pass the ref to `open_trade_ledger()`.
account_tests
Tyler Goodlet 2023-07-10 17:52:23 -04:00
parent 108e8c7082
commit 06c581bfab
1 changed files with 123 additions and 117 deletions

View File

@ -49,9 +49,10 @@ from ..accounting import (
Transaction,
TransactionLedger,
open_trade_ledger,
open_pps,
open_account,
)
from ..data import iterticks
from ..data._symcache import open_symcache
from ..accounting import unpack_fqme
from ._util import (
log, # sub-sys logger
@ -541,141 +542,146 @@ async def open_trade_dialog(
# enable piker.clearing console log for *this* subactor
get_console_log(loglevel)
acnt: Account
ledger: TransactionLedger
with (
open_pps(
broker,
'paper',
write_on_exit=True,
) as acnt,
async with open_symcache(get_brokermod(broker)) as symcache:
acnt: Account
ledger: TransactionLedger
with (
open_trade_ledger(
broker,
'paper',
) as ledger
):
# NOTE: retreive market(pair) info from the backend broker
# since ledger entries (in their backend native format) often
# don't contain necessary market info per trade record entry..
# - if no fqme was passed in, we presume we're running in
# "ledger-sync-only mode" and thus we load mkt info for
# each symbol found in the ledger to a acnt table manually.
# TODO: probably do the symcache and ledger loading
# implicitly behind this? Deliver an account, and ledger
# pair or make the ledger an attr of the account?
open_account(
broker,
'paper',
write_on_exit=True,
) as acnt,
# TODO: how to process ledger info from backends?
# - should we be rolling our own actor-cached version of these
# client API refs or using portal IPC to send requests to the
# existing brokerd daemon?
# - alternatively we can possibly expect and use
# a `.broker.norm_trade_records()` ep?
brokermod = get_brokermod(broker)
gmi = getattr(brokermod, 'get_mkt_info', None)
open_trade_ledger(
broker,
'paper',
symcache=symcache,
) as ledger
):
# NOTE: retreive market(pair) info from the backend broker
# since ledger entries (in their backend native format) often
# don't contain necessary market info per trade record entry..
# - if no fqme was passed in, we presume we're running in
# "ledger-sync-only mode" and thus we load mkt info for
# each symbol found in the ledger to a acnt table manually.
# update all transactions with mkt info before
# loading any pps
mkt_by_fqme: dict[str, MktPair] = {}
if fqme:
bs_fqme, _, broker = fqme.rpartition('.')
mkt, pair = await brokermod.get_mkt_info(bs_fqme)
mkt_by_fqme[mkt.fqme] = mkt
# TODO: how to process ledger info from backends?
# - should we be rolling our own actor-cached version of these
# client API refs or using portal IPC to send requests to the
# existing brokerd daemon?
# - alternatively we can possibly expect and use
# a `.broker.norm_trade_records()` ep?
brokermod = get_brokermod(broker)
gmi = getattr(brokermod, 'get_mkt_info', None)
# for each sym in the ledger load it's `MktPair` info
for tid, txdict in ledger.data.items():
l_fqme: str = txdict.get('fqme') or txdict['fqsn']
# update all transactions with mkt info before
# loading any pps
mkt_by_fqme: dict[str, MktPair] = {}
if fqme:
bs_fqme, _, broker = fqme.rpartition('.')
mkt, pair = await brokermod.get_mkt_info(bs_fqme)
mkt_by_fqme[mkt.fqme] = mkt
if (
gmi
and l_fqme not in mkt_by_fqme
):
mkt, pair = await gmi(
l_fqme.rstrip(f'.{broker}'),
)
mkt_by_fqme[l_fqme] = mkt
# for each sym in the ledger load it's `MktPair` info
for tid, txdict in ledger.data.items():
l_fqme: str = txdict.get('fqme') or txdict['fqsn']
# if an ``fqme: str`` input was provided we only
# need a ``MktPair`` for that one market, since we're
# running in real simulated-clearing mode, not just ledger
# syncing.
if (
fqme is not None
and fqme in mkt_by_fqme
):
break
if (
gmi
and l_fqme not in mkt_by_fqme
):
mkt, pair = await gmi(
l_fqme.rstrip(f'.{broker}'),
)
mkt_by_fqme[l_fqme] = mkt
# update pos table from ledger history and provide a ``MktPair``
# lookup for internal position accounting calcs.
acnt.update_from_ledger(ledger)
# if an ``fqme: str`` input was provided we only
# need a ``MktPair`` for that one market, since we're
# running in real simulated-clearing mode, not just ledger
# syncing.
if (
fqme is not None
and fqme in mkt_by_fqme
):
break
pp_msgs: list[BrokerdPosition] = []
pos: Position
token: str # f'{symbol}.{self.broker}'
for token, pos in acnt.pps.items():
pp_msgs.append(BrokerdPosition(
broker=broker,
account='paper',
symbol=pos.mkt.fqme,
size=pos.size,
avg_price=pos.ppu,
# update pos table from ledger history and provide a ``MktPair``
# lookup for internal position accounting calcs.
acnt.update_from_ledger(ledger)
pp_msgs: list[BrokerdPosition] = []
pos: Position
token: str # f'{symbol}.{self.broker}'
for token, pos in acnt.pps.items():
pp_msgs.append(BrokerdPosition(
broker=broker,
account='paper',
symbol=pos.mkt.fqme,
size=pos.size,
avg_price=pos.ppu,
))
await ctx.started((
pp_msgs,
['paper'],
))
await ctx.started((
pp_msgs,
['paper'],
))
# write new positions state in case ledger was
# newer then that tracked in pps.toml
acnt.write_config()
# write new positions state in case ledger was
# newer then that tracked in pps.toml
acnt.write_config()
# exit early since no fqme was passed,
# normally this case is just to load
# positions "offline".
if fqme is None:
log.warning(
'Paper engine only running in position delivery mode!\n'
'NO SIMULATED CLEARING LOOP IS ACTIVE!'
)
await trio.sleep_forever()
return
async with (
data.open_feed(
[fqme],
loglevel=loglevel,
) as feed,
):
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
mkt = mkt_by_fqme[fqme]
print(mkt - flume.mkt)
assert mkt == flume.mkt
# exit early since no fqme was passed,
# normally this case is just to load
# positions "offline".
if fqme is None:
log.warning(
'Paper engine only running in position delivery mode!\n'
'NO SIMULATED CLEARING LOOP IS ACTIVE!'
)
await trio.sleep_forever()
return
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
data.open_feed(
[fqme],
loglevel=loglevel,
) as feed,
):
client = PaperBoi(
broker=broker,
ems_trades_stream=ems_stream,
acnt=acnt,
ledger=ledger,
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
mkt = mkt_by_fqme[fqme]
assert mkt == flume.mkt
_buys=_buys,
_sells=_sells,
_reqids=_reqids,
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
client = PaperBoi(
broker=broker,
ems_trades_stream=ems_stream,
acnt=acnt,
ledger=ledger,
_mkts=mkt_by_fqme,
_buys=_buys,
_sells=_sells,
_reqids=_reqids,
)
_mkts=mkt_by_fqme,
n.start_soon(
handle_order_requests,
client,
ems_stream,
)
)
# paper engine simulator clearing task
await simulate_fills(feed.streams[broker], client)
n.start_soon(
handle_order_requests,
client,
ems_stream,
)
# paper engine simulator clearing task
await simulate_fills(feed.streams[broker], client)
@acm