WIP complete rework of paper engine tests

More or less we need to be able to audit not only simple "make trades
check pps.toml files" tests (which btw were great to get started!).

We also need more sophisticated and granular order mgmt and service
config scenarios,

- full e2e EMS msg flow verification
- multi-client (dis)connection scenarios and/or monitoring
- dark order clearing and offline storage
- accounting schema and position calcs detailing

As such, this is the beginning to "modularlizingz" the components needed
in the test harness to this end by breaking up the `OrderClient` control
flows vs. position checking logic so as to allow for more flexible test
scenario cases and likely `pytest` parametrizations over different
transaction sequences.
rekt_pps
Tyler Goodlet 2023-04-04 13:03:52 -04:00
parent d67031d9ab
commit b619e4a82d
1 changed files with 271 additions and 171 deletions

View File

@ -1,230 +1,330 @@
'''
Paper-mode testing
'''
import trio
from exceptiongroup import BaseExceptionGroup
from contextlib import (
contextmanager as cm,
)
from typing import (
Awaitable,
Callable,
AsyncContextManager,
Literal,
)
import trio
# import pytest_trio
from exceptiongroup import BaseExceptionGroup
import pytest
from tractor._exceptions import ContextCancelled
import tractor
from uuid import uuid4
from functools import partial
from piker.service import Services
from piker.log import get_logger
from piker.clearing._messages import Order
from piker.clearing._messages import (
Order,
Status,
# Cancel,
BrokerdPosition,
)
from piker.clearing import (
open_ems,
OrderClient,
)
from piker.accounting._mktinfo import (
unpack_fqme,
)
from piker.accounting import (
open_pps,
Position,
)
log = get_logger(__name__)
def get_fqsn(broker, symbol):
fqsn = f'{symbol}.{broker}'
return (fqsn, symbol, broker)
async def open_pikerd(
open_test_pikerd: AsyncContextManager,
) -> Services:
async with (
open_test_pikerd() as (_, _, _, services),
):
yield services
oid = ''
test_exec_mode = 'live'
(fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt')
brokers = [broker]
account = 'paper'
async def _async_main(
open_test_pikerd_and_ems: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None,
price: int = 30000,
async def submit_order(
client: OrderClient,
trades_stream: tractor.MsgStream,
fqme: str,
action: Literal['buy', 'sell'],
price: float = 30000.,
executions: int = 1,
size: float = 0.01,
exec_mode: str = 'live',
account: str = 'paper',
# Assert options
assert_entries: bool = False,
assert_pps: bool = False,
assert_zeroed_pps: bool = False,
assert_msg: bool = False,
) -> None:
) -> list[Status | BrokerdPosition]:
'''
Start piker, place a trade and assert data in
pps stream, ledger and position table.
'''
oid: str = ''
last_msg = {}
sent: list[Order] = []
broker, key, suffix = unpack_fqme(fqme)
for _ in range(executions):
async with open_test_pikerd_and_ems() as (
services,
(book, trades_stream, pps, accounts, dialogs),
):
if action:
for x in range(executions):
oid = str(uuid4())
order = Order(
exec_mode=test_exec_mode,
exec_mode=exec_mode,
action=action,
oid=oid,
oid=str(uuid4()),
account=account,
size=size,
symbol=fqsn,
symbol=fqme,
price=price,
brokers=brokers,
brokers=[broker],
)
# This is actually a syncronous call to push a message
book.send(order)
sent.append(order)
await client.send(order)
# TODO: i guess we should still test the old sync-API?
# client.send_nowait(order)
msgs: list[Status | BrokerdPosition] = []
async for msg in trades_stream:
last_msg = msg
print(f'Rx Order resp: {msg}')
match msg:
# Wait for position message before moving on
case {'name': 'position'}:
ppmsg = BrokerdPosition(**msg)
msgs.append(ppmsg)
break
# Teardown piker like a user would
raise KeyboardInterrupt
case {'name': 'status'}:
msgs.append(Status(**msg))
if assert_entries or assert_pps or assert_zeroed_pps or assert_msg:
_assert(
assert_entries,
assert_pps,
assert_zeroed_pps,
pps,
last_msg,
size,
executions,
return sent, msgs
def run_and_catch(
fn: Callable[..., Awaitable],
expect_errs: tuple[Exception] = (
KeyboardInterrupt,
tractor.ContextCancelled,
)
def _assert(
assert_entries,
assert_pps,
assert_zerod_pps,
pps,
last_msg,
size,
executions,
):
with (
open_pps(broker, account, write_on_exit=False) as table,
):
'''
Assert multiple cases including pps,
ledger and final position message state
'''
if assert_entries:
for key, val in [
('broker', broker),
('account', account),
('symbol', fqsn),
('size', size * executions),
('currency', symbol),
('avg_price', table.pps[symbol].ppu)
]:
assert last_msg[key] == val
if assert_pps:
last_ppu = pps[(broker, account)][-1]
assert last_ppu['avg_price'] == table.pps[symbol].ppu
if assert_zerod_pps:
assert not bool(table.pps)
def _run_test_and_check(fn):
'''
Close position and assert empty position in pps
'''
if expect_errs:
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(fn)
for exception in exc_info.value.exceptions:
assert isinstance(exception, KeyboardInterrupt) or isinstance(
exception, ContextCancelled
)
for err in exc_info.value.exceptions:
assert type(err) in expect_errs
else:
trio.run(fn)
def test_buy(
open_test_pikerd_and_ems: AsyncContextManager,
@cm
def load_and_check_pos(
order: Order,
ppmsg: BrokerdPosition,
) -> None:
with open_pps(ppmsg.broker, ppmsg.account) as table:
# NOTE: a special case is here since the `PpTable.pps` are
# normally indexed by the particular broker's
# `Position.bs_mktid: str` (a unique market / symbol id provided
# by their systems/design) but for the paper engine case, this
# is the same the fqme.
pp: Position = table.pps[ppmsg.symbol]
assert ppmsg.size == pp.size
assert ppmsg.avg_price == pp.ppu
yield pp
@pytest.mark.trio
async def test_ems_err_on_bad_broker(
open_pikerd: Services,
loglevel: str,
):
try:
async with open_ems(
'doggy.smiles',
mode='paper',
loglevel=loglevel,
) as _:
pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError:
pass
async def atest_buy(
loglevel: str,
):
'''
Enter a trade and assert entries are made in pps and ledger files.
Shutdown the ems-client and ensure on reconnect we get the expected
matching ``BrokerdPosition`` and pps.toml entries.
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
assert_entries=True,
),
broker: str = 'kraken'
mkt_key: str = 'xbtusdt'
fqme: str = f'{mkt_key}.{broker}'
startup_pps: dict[
tuple[str, str], # brokername, acctid
list[BrokerdPosition],
]
assert loglevel == 'info'
async with (
open_ems(
fqme,
mode='paper',
loglevel=loglevel,
) as (
client, # OrderClient
trades_stream,
startup_pps,
accounts,
dialogs,
)
# Open ems and assert existence of pps entries
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_pps=True,
),
)
def test_sell(
open_test_pikerd_and_ems: AsyncContextManager,
):
'''
Sell position and ensure pps are zeroed.
# no positions on startup
assert not startup_pps
assert 'paper' in accounts
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
price=1,
),
)
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_zeroed_pps=True,
),
)
def test_multi_sell(
open_test_pikerd_and_ems: AsyncContextManager,
):
'''
Make 5 market limit buy orders and
then sell 5 slots at the same price.
Finally, assert cleared positions.
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
sent, msgs = await submit_order(
client,
trades_stream,
fqme,
action='buy',
executions=5,
),
size=1,
)
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
executions=5,
price=1,
assert_zeroed_pps=True,
),
last_order = sent[-1]
last_resp = msgs[-1]
assert isinstance(last_resp, BrokerdPosition)
# check that pps.toml for account has been updated
with load_and_check_pos(
last_order,
last_resp,
) as pos:
return pos
# disconnect from EMS, then reconnect and ensure we get our same
# position relayed to us again.
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='buy',
# assert_entries=True,
# ),
# )
# await _async_main(
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_pps=True,
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_pps=True,
# ),
# )
def test_open_long(
open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None:
async def atest():
async with (
open_test_pikerd() as (_, _, _, services),
):
assert await atest_buy(loglevel)
# Teardown piker like a user would from cli
# raise KeyboardInterrupt
run_and_catch(
atest,
expect_errs=None,
)
# Open ems another time and assert existence of prior
# pps entries confirming they persisted
# def test_sell(
# open_test_pikerd_and_ems: AsyncContextManager,
# ):
# '''
# Sell position and ensure pps are zeroed.
# '''
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='sell',
# price=1,
# ),
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# assert_zeroed_pps=True,
# ),
# )
# def test_multi_sell(
# open_test_pikerd_and_ems: AsyncContextManager,
# ):
# '''
# Make 5 market limit buy orders and
# then sell 5 slots at the same price.
# Finally, assert cleared positions.
# '''
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='buy',
# executions=5,
# ),
# )
# _run_test_and_check(
# partial(
# _async_main,
# open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# action='sell',
# executions=5,
# price=1,
# assert_zeroed_pps=True,
# ),
# )