Rewrite order ctl tests as a parametrization

More or less a complete rework which allows passing a detailed
clearing/fills input and allows for *not* rebooting the runtime / ems
between each position check.

Some further enhancements:
- use (unit) fractional sizes to simulate both the more realistic and
  more "complex position calculation" case; since this is crypto.
- add a no-fqme-found test.
- factor cross-session/offline pos storage (pps.toml) checks into
  a `load_and_check_pos()` helper which does all entry loading directly
  from a provided `BrokerdPosition` msg.
- use the new `OrderClient.send()` async api.
rekt_pps
Tyler Goodlet 2023-04-10 19:05:36 -04:00
parent e524c6fe4f
commit 30af91a82c
1 changed files with 190 additions and 149 deletions

View File

@ -18,7 +18,6 @@ from exceptiongroup import BaseExceptionGroup
import pytest
import tractor
from uuid import uuid4
from functools import partial
from piker.service import Services
from piker.log import get_logger
@ -53,14 +52,15 @@ async def open_pikerd(
yield services
async def submit_order(
async def order_and_and_wait_for_ppmsg(
client: OrderClient,
trades_stream: tractor.MsgStream,
fqme: str,
action: Literal['buy', 'sell'],
price: float = 30000.,
executions: int = 1,
price: float = 100e3, # just a super high price.
size: float = 0.01,
exec_mode: str = 'live',
account: str = 'paper',
@ -73,51 +73,49 @@ async def submit_order(
sent: list[Order] = []
broker, key, suffix = unpack_fqme(fqme)
for _ in range(executions):
order = Order(
exec_mode=exec_mode,
action=action, # TODO: remove this from our schema?
oid=str(uuid4()),
account=account,
size=size,
symbol=fqme,
price=price,
brokers=[broker],
)
sent.append(order)
await client.send(order)
order = Order(
exec_mode=exec_mode,
action=action,
oid=str(uuid4()),
account=account,
size=size,
symbol=fqme,
price=price,
brokers=[broker],
)
sent.append(order)
await client.send(order)
# TODO: i guess we should still test the old sync-API?
# client.send_nowait(order)
# TODO: i guess we should still test the old sync-API?
# client.send_nowait(order)
# Wait for position message before moving on to verify flow(s)
# for the multi-order position entry/exit.
msgs: list[Status | BrokerdPosition] = []
async for msg in trades_stream:
match msg:
case {'name': 'position'}:
ppmsg = BrokerdPosition(**msg)
msgs.append(ppmsg)
break
# Wait for position message before moving on to verify flow(s)
# for the multi-order position entry/exit.
msgs: list[Status | BrokerdPosition] = []
async for msg in trades_stream:
match msg:
case {'name': 'position'}:
ppmsg = BrokerdPosition(**msg)
msgs.append(ppmsg)
break
case {'name': 'status'}:
msgs.append(Status(**msg))
case {'name': 'status'}:
msgs.append(Status(**msg))
return sent, msgs
def run_and_catch(
def run_and_tollerate_cancels(
fn: Callable[..., Awaitable],
expect_errs: tuple[Exception] = (
KeyboardInterrupt,
tractor.ContextCancelled,
)
expect_errs: tuple[Exception] | None = None,
tollerate_errs: tuple[Exception] = (tractor.ContextCancelled,),
):
'''
Close position and assert empty position in pps
Run ``trio``-``piker`` runtime with potential tolerance for
inter-actor cancellation during teardown (normally just
`tractor.ContextCancelled`s).
'''
if expect_errs:
@ -127,7 +125,10 @@ def run_and_catch(
for err in exc_info.value.exceptions:
assert type(err) in expect_errs
else:
trio.run(fn)
try:
trio.run(fn)
except tollerate_errs:
pass
@cm
@ -139,22 +140,28 @@ def load_and_check_pos(
with open_pps(ppmsg.broker, ppmsg.account) as table:
# NOTE: a special case is here since the `PpTable.pps` are
# normally indexed by the particular broker's
# `Position.bs_mktid: str` (a unique market / symbol id provided
# by their systems/design) but for the paper engine case, this
# is the same the fqme.
pp: Position = table.pps[ppmsg.symbol]
if ppmsg.size == 0:
assert ppmsg.symbol not in table.pps
yield None
return
assert ppmsg.size == pp.size
assert ppmsg.avg_price == pp.ppu
else:
# NOTE: a special case is here since the `PpTable.pps` are
# normally indexed by the particular broker's
# `Position.bs_mktid: str` (a unique market / symbol id provided
# by their systems/design) but for the paper engine case, this
# is the same the fqme.
pp: Position = table.pps[ppmsg.symbol]
yield pp
assert ppmsg.size == pp.size
assert ppmsg.avg_price == pp.ppu
yield pp
@pytest.mark.trio
async def test_ems_err_on_bad_broker(
open_pikerd: Services,
open_test_pikerd: Services,
loglevel: str,
):
try:
@ -168,9 +175,60 @@ async def test_ems_err_on_bad_broker(
pass
async def atest_buy(
async def match_ppmsgs_on_ems_boot(
ppmsgs: list[BrokerdPosition],
) -> None:
'''
Given a list of input position msgs, verify they match
what is loaded from the EMS on connect.
'''
by_acct: dict[tuple, list[BrokerdPosition]] = {}
for msg in ppmsgs:
by_acct.setdefault(
(msg.broker, msg.account),
[],
).append(msg)
# TODO: actually support multi-mkts to `open_ems()`
# but for now just pass the first fqme.
fqme = msg.symbol
# disconnect from EMS, reconnect and ensure we get our same
# position relayed to us again in the startup msg.
async with (
open_ems(
fqme,
mode='paper',
loglevel='info',
) as (
_, # OrderClient
_, # tractor.MsgStream
startup_pps,
accounts,
_, # dialogs,
)
):
for (broker, account), ppmsgs in by_acct.items():
assert account in accounts
# lookup all msgs rx-ed for this account
rx_msgs = startup_pps[(broker, account)]
for expect_ppmsg in ppmsgs:
rx_msg = BrokerdPosition(**rx_msgs[expect_ppmsg.symbol])
assert rx_msg == expect_ppmsg
async def submit_and_check(
fills: tuple[dict],
loglevel: str,
):
) -> tuple[
BrokerdPosition,
Position,
]:
'''
Enter a trade and assert entries are made in pps and ledger files.
@ -203,125 +261,108 @@ async def atest_buy(
assert not startup_pps
assert 'paper' in accounts
sent, msgs = await submit_order(
client,
trades_stream,
fqme,
action='buy',
size=0.01,
)
last_order = sent[-1]
od: dict
for od in fills:
print(f'Sending order {od} for fill')
sent, msgs = await order_and_and_wait_for_ppmsg(
client,
trades_stream,
fqme,
action='buy',
size=od['size'],
)
last_order: Order = sent[-1]
last_resp = msgs[-1]
assert isinstance(last_resp, BrokerdPosition)
ppmsg = last_resp
# check that pps.toml for account has been updated
# and all ems position msgs match that state.
with load_and_check_pos(
last_order,
last_resp,
ppmsg,
) as pos:
return pos
pass
# disconnect from EMS, then reconnect and ensure we get our same
# position relayed to us again.
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='buy',
# assert_entries=True,
# ),
# )
# await _async_main(
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_pps=True,
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_pps=True,
# ),
# )
return ppmsg, pos
def test_open_long(
@pytest.mark.parametrize(
'fills',
[
# buy and leave
({'size': 0.001},),
# sell short, then buy back to net-zero in dst
(
{'size': -0.001},
{'size': 0.001},
),
# multi-partial entry and exits.
(
# enters
{'size': 0.001},
{'size': 0.002},
# partial exit
{'size': -0.001},
# partial enter
{'size': 0.0015},
{'size': 0.001},
{'size': 0.002},
# exits to get back to zero.
{'size': -0.001},
{'size': -0.025},
{'size': -0.0195},
),
],
ids='fills={}'.format,
)
def test_multi_fill_positions(
open_test_pikerd: AsyncContextManager,
loglevel: str,
fills: tuple[dict],
check_cross_session: bool = True,
) -> None:
ppmsg: BrokerdPosition
pos: Position
accum_size: float = 0
for fill in fills:
accum_size += fill['size']
async def atest():
# export to outer scope for audit on second runtime-boot.
nonlocal ppmsg, pos
async with (
open_test_pikerd() as (_, _, _, services),
):
assert await atest_buy(loglevel)
ppmsg, pos = await submit_and_check(
fills=fills,
loglevel=loglevel,
)
assert ppmsg.size == accum_size
# Teardown piker like a user would from cli
# raise KeyboardInterrupt
run_and_tollerate_cancels(atest)
run_and_catch(
atest,
expect_errs=None,
)
# Open ems another time and assert existence of prior
# pps entries confirming they persisted
if check_cross_session or accum_size != 0:
# rerun just to check that position info is persistent for the paper
# account (i.e. a user can expect to see paper pps persist across
# runtime sessions.
async def just_check_pp():
async with (
open_test_pikerd() as (_, _, _, services),
):
await match_ppmsgs_on_ems_boot([ppmsg])
# def test_sell(
# open_test_pikerd_and_ems: AsyncContextManager,
# ):
# '''
# Sell position and ensure pps are zeroed.
# '''
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='sell',
# price=1,
# ),
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_zeroed_pps=True,
# ),
# )
# def test_multi_sell(
# open_test_pikerd_and_ems: AsyncContextManager,
# ):
# '''
# Make 5 market limit buy orders and
# then sell 5 slots at the same price.
# Finally, assert cleared positions.
# '''
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='buy',
# executions=5,
# ),
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='sell',
# executions=5,
# price=1,
# assert_zeroed_pps=True,
# ),
# )
run_and_tollerate_cancels(just_check_pp)