From b619e4a82d20f0882628db5d4efcabdcbc163f72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 4 Apr 2023 13:03:52 -0400 Subject: [PATCH] WIP complete rework of paper engine tests More or less we need to be able to audit not only simple "make trades check pps.toml files" tests (which btw were great to get started!). We also need more sophisticated and granular order mgmt and service config scenarios, - full e2e EMS msg flow verification - multi-client (dis)connection scenarios and/or monitoring - dark order clearing and offline storage - accounting schema and position calcs detailing As such, this is the beginning to "modularlizingz" the components needed in the test harness to this end by breaking up the `OrderClient` control flows vs. position checking logic so as to allow for more flexible test scenario cases and likely `pytest` parametrizations over different transaction sequences. --- tests/test_paper.py | 442 +++++++++++++++++++++++++++----------------- 1 file changed, 271 insertions(+), 171 deletions(-) diff --git a/tests/test_paper.py b/tests/test_paper.py index 2f46c559..01ea57d4 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -1,230 +1,330 @@ ''' Paper-mode testing ''' - -import trio -from exceptiongroup import BaseExceptionGroup +from contextlib import ( + contextmanager as cm, +) from typing import ( + Awaitable, + Callable, AsyncContextManager, Literal, ) +import trio +# import pytest_trio +from exceptiongroup import BaseExceptionGroup + import pytest -from tractor._exceptions import ContextCancelled +import tractor from uuid import uuid4 from functools import partial +from piker.service import Services from piker.log import get_logger -from piker.clearing._messages import Order +from piker.clearing._messages import ( + Order, + Status, + # Cancel, + BrokerdPosition, +) +from piker.clearing import ( + open_ems, + OrderClient, +) +from piker.accounting._mktinfo import ( + unpack_fqme, +) from piker.accounting import ( open_pps, + Position, ) log = get_logger(__name__) -def get_fqsn(broker, symbol): - fqsn = f'{symbol}.{broker}' - return (fqsn, symbol, broker) +async def open_pikerd( + open_test_pikerd: AsyncContextManager, + +) -> Services: + async with ( + open_test_pikerd() as (_, _, _, services), + ): + yield services -oid = '' -test_exec_mode = 'live' -(fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt') -brokers = [broker] -account = 'paper' - - -async def _async_main( - open_test_pikerd_and_ems: AsyncContextManager, - action: Literal['buy', 'sell'] | None = None, - price: int = 30000, +async def submit_order( + client: OrderClient, + trades_stream: tractor.MsgStream, + fqme: str, + action: Literal['buy', 'sell'], + price: float = 30000., executions: int = 1, size: float = 0.01, + exec_mode: str = 'live', + account: str = 'paper', - # Assert options - assert_entries: bool = False, - assert_pps: bool = False, - assert_zeroed_pps: bool = False, - assert_msg: bool = False, - -) -> None: +) -> list[Status | BrokerdPosition]: ''' Start piker, place a trade and assert data in pps stream, ledger and position table. ''' - oid: str = '' - last_msg = {} + sent: list[Order] = [] + broker, key, suffix = unpack_fqme(fqme) - async with open_test_pikerd_and_ems() as ( - services, - (book, trades_stream, pps, accounts, dialogs), - ): - if action: - for x in range(executions): - oid = str(uuid4()) - order = Order( - exec_mode=test_exec_mode, - action=action, - oid=oid, - account=account, - size=size, - symbol=fqsn, - price=price, - brokers=brokers, - ) - # This is actually a syncronous call to push a message - book.send(order) + for _ in range(executions): - async for msg in trades_stream: - last_msg = msg - match msg: - # Wait for position message before moving on - case {'name': 'position'}: - break - - # Teardown piker like a user would - raise KeyboardInterrupt - - if assert_entries or assert_pps or assert_zeroed_pps or assert_msg: - _assert( - assert_entries, - assert_pps, - assert_zeroed_pps, - pps, - last_msg, - size, - executions, + order = Order( + exec_mode=exec_mode, + action=action, + oid=str(uuid4()), + account=account, + size=size, + symbol=fqme, + price=price, + brokers=[broker], ) + sent.append(order) + await client.send(order) + + # TODO: i guess we should still test the old sync-API? + # client.send_nowait(order) + + msgs: list[Status | BrokerdPosition] = [] + async for msg in trades_stream: + print(f'Rx Order resp: {msg}') + match msg: + + # Wait for position message before moving on + case {'name': 'position'}: + ppmsg = BrokerdPosition(**msg) + msgs.append(ppmsg) + break + + case {'name': 'status'}: + msgs.append(Status(**msg)) + + return sent, msgs -def _assert( - assert_entries, - assert_pps, - assert_zerod_pps, - pps, - last_msg, - size, - executions, +def run_and_catch( + fn: Callable[..., Awaitable], + + expect_errs: tuple[Exception] = ( + KeyboardInterrupt, + tractor.ContextCancelled, + ) + ): - with ( - open_pps(broker, account, write_on_exit=False) as table, - ): - ''' - Assert multiple cases including pps, - ledger and final position message state - - ''' - if assert_entries: - for key, val in [ - ('broker', broker), - ('account', account), - ('symbol', fqsn), - ('size', size * executions), - ('currency', symbol), - ('avg_price', table.pps[symbol].ppu) - ]: - assert last_msg[key] == val - - if assert_pps: - last_ppu = pps[(broker, account)][-1] - assert last_ppu['avg_price'] == table.pps[symbol].ppu - - if assert_zerod_pps: - assert not bool(table.pps) - - -def _run_test_and_check(fn): ''' Close position and assert empty position in pps ''' - with pytest.raises(BaseExceptionGroup) as exc_info: + if expect_errs: + with pytest.raises(BaseExceptionGroup) as exc_info: + trio.run(fn) + + for err in exc_info.value.exceptions: + assert type(err) in expect_errs + else: trio.run(fn) - for exception in exc_info.value.exceptions: - assert isinstance(exception, KeyboardInterrupt) or isinstance( - exception, ContextCancelled - ) + +@cm +def load_and_check_pos( + order: Order, + ppmsg: BrokerdPosition, + +) -> None: + + with open_pps(ppmsg.broker, ppmsg.account) as table: + + # NOTE: a special case is here since the `PpTable.pps` are + # normally indexed by the particular broker's + # `Position.bs_mktid: str` (a unique market / symbol id provided + # by their systems/design) but for the paper engine case, this + # is the same the fqme. + pp: Position = table.pps[ppmsg.symbol] + + assert ppmsg.size == pp.size + assert ppmsg.avg_price == pp.ppu + + yield pp -def test_buy( - open_test_pikerd_and_ems: AsyncContextManager, +@pytest.mark.trio +async def test_ems_err_on_bad_broker( + open_pikerd: Services, + loglevel: str, +): + try: + async with open_ems( + 'doggy.smiles', + mode='paper', + loglevel=loglevel, + ) as _: + pytest.fail('EMS is working on non-broker!?') + except ModuleNotFoundError: + pass + + +async def atest_buy( + loglevel: str, ): ''' Enter a trade and assert entries are made in pps and ledger files. + Shutdown the ems-client and ensure on reconnect we get the expected + matching ``BrokerdPosition`` and pps.toml entries. + ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, + broker: str = 'kraken' + mkt_key: str = 'xbtusdt' + fqme: str = f'{mkt_key}.{broker}' + + startup_pps: dict[ + tuple[str, str], # brokername, acctid + list[BrokerdPosition], + ] + + assert loglevel == 'info' + async with ( + open_ems( + fqme, + mode='paper', + loglevel=loglevel, + ) as ( + client, # OrderClient + trades_stream, + startup_pps, + accounts, + dialogs, + ) + ): + # no positions on startup + assert not startup_pps + assert 'paper' in accounts + + sent, msgs = await submit_order( + client, + trades_stream, + fqme, action='buy', - assert_entries=True, - ), - ) - - # Open ems and assert existence of pps entries - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - assert_pps=True, - ), + size=1, + ) + + last_order = sent[-1] + + last_resp = msgs[-1] + assert isinstance(last_resp, BrokerdPosition) + + # check that pps.toml for account has been updated + with load_and_check_pos( + last_order, + last_resp, + ) as pos: + return pos + + # disconnect from EMS, then reconnect and ensure we get our same + # position relayed to us again. + + # _run_test_and_check( + # partial( + # _async_main, + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # action='buy', + # assert_entries=True, + # ), + # ) + + # await _async_main( + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # assert_pps=True, + # ) + # _run_test_and_check( + # partial( + # _async_main, + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # assert_pps=True, + # ), + # ) + + +def test_open_long( + open_test_pikerd: AsyncContextManager, + loglevel: str, + +) -> None: + + async def atest(): + async with ( + open_test_pikerd() as (_, _, _, services), + ): + assert await atest_buy(loglevel) + + # Teardown piker like a user would from cli + # raise KeyboardInterrupt + + run_and_catch( + atest, + expect_errs=None, ) + # Open ems another time and assert existence of prior + # pps entries confirming they persisted -def test_sell( - open_test_pikerd_and_ems: AsyncContextManager, -): - ''' - Sell position and ensure pps are zeroed. - ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='sell', - price=1, - ), - ) +# def test_sell( +# open_test_pikerd_and_ems: AsyncContextManager, +# ): +# ''' +# Sell position and ensure pps are zeroed. - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - assert_zeroed_pps=True, - ), - ) +# ''' +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='sell', +# price=1, +# ), +# ) + +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# assert_zeroed_pps=True, +# ), +# ) -def test_multi_sell( - open_test_pikerd_and_ems: AsyncContextManager, -): - ''' - Make 5 market limit buy orders and - then sell 5 slots at the same price. - Finally, assert cleared positions. +# def test_multi_sell( +# open_test_pikerd_and_ems: AsyncContextManager, +# ): +# ''' +# Make 5 market limit buy orders and +# then sell 5 slots at the same price. +# Finally, assert cleared positions. - ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='buy', - executions=5, - ), - ) +# ''' +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='buy', +# executions=5, +# ), +# ) - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='sell', - executions=5, - price=1, - assert_zeroed_pps=True, - ), - ) +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='sell', +# executions=5, +# price=1, +# assert_zeroed_pps=True, +# ), +# )