diff --git a/tests/test_paper.py b/tests/test_paper.py index 2f46c559..01ea57d4 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -1,230 +1,330 @@ ''' Paper-mode testing ''' - -import trio -from exceptiongroup import BaseExceptionGroup +from contextlib import ( + contextmanager as cm, +) from typing import ( + Awaitable, + Callable, AsyncContextManager, Literal, ) +import trio +# import pytest_trio +from exceptiongroup import BaseExceptionGroup + import pytest -from tractor._exceptions import ContextCancelled +import tractor from uuid import uuid4 from functools import partial +from piker.service import Services from piker.log import get_logger -from piker.clearing._messages import Order +from piker.clearing._messages import ( + Order, + Status, + # Cancel, + BrokerdPosition, +) +from piker.clearing import ( + open_ems, + OrderClient, +) +from piker.accounting._mktinfo import ( + unpack_fqme, +) from piker.accounting import ( open_pps, + Position, ) log = get_logger(__name__) -def get_fqsn(broker, symbol): - fqsn = f'{symbol}.{broker}' - return (fqsn, symbol, broker) +async def open_pikerd( + open_test_pikerd: AsyncContextManager, + +) -> Services: + async with ( + open_test_pikerd() as (_, _, _, services), + ): + yield services -oid = '' -test_exec_mode = 'live' -(fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt') -brokers = [broker] -account = 'paper' - - -async def _async_main( - open_test_pikerd_and_ems: AsyncContextManager, - action: Literal['buy', 'sell'] | None = None, - price: int = 30000, +async def submit_order( + client: OrderClient, + trades_stream: tractor.MsgStream, + fqme: str, + action: Literal['buy', 'sell'], + price: float = 30000., executions: int = 1, size: float = 0.01, + exec_mode: str = 'live', + account: str = 'paper', - # Assert options - assert_entries: bool = False, - assert_pps: bool = False, - assert_zeroed_pps: bool = False, - assert_msg: bool = False, - -) -> None: +) -> list[Status | BrokerdPosition]: ''' Start piker, place a trade and assert data in pps stream, ledger and position table. ''' - oid: str = '' - last_msg = {} + sent: list[Order] = [] + broker, key, suffix = unpack_fqme(fqme) - async with open_test_pikerd_and_ems() as ( - services, - (book, trades_stream, pps, accounts, dialogs), - ): - if action: - for x in range(executions): - oid = str(uuid4()) - order = Order( - exec_mode=test_exec_mode, - action=action, - oid=oid, - account=account, - size=size, - symbol=fqsn, - price=price, - brokers=brokers, - ) - # This is actually a syncronous call to push a message - book.send(order) + for _ in range(executions): - async for msg in trades_stream: - last_msg = msg - match msg: - # Wait for position message before moving on - case {'name': 'position'}: - break - - # Teardown piker like a user would - raise KeyboardInterrupt - - if assert_entries or assert_pps or assert_zeroed_pps or assert_msg: - _assert( - assert_entries, - assert_pps, - assert_zeroed_pps, - pps, - last_msg, - size, - executions, + order = Order( + exec_mode=exec_mode, + action=action, + oid=str(uuid4()), + account=account, + size=size, + symbol=fqme, + price=price, + brokers=[broker], ) + sent.append(order) + await client.send(order) + + # TODO: i guess we should still test the old sync-API? + # client.send_nowait(order) + + msgs: list[Status | BrokerdPosition] = [] + async for msg in trades_stream: + print(f'Rx Order resp: {msg}') + match msg: + + # Wait for position message before moving on + case {'name': 'position'}: + ppmsg = BrokerdPosition(**msg) + msgs.append(ppmsg) + break + + case {'name': 'status'}: + msgs.append(Status(**msg)) + + return sent, msgs -def _assert( - assert_entries, - assert_pps, - assert_zerod_pps, - pps, - last_msg, - size, - executions, +def run_and_catch( + fn: Callable[..., Awaitable], + + expect_errs: tuple[Exception] = ( + KeyboardInterrupt, + tractor.ContextCancelled, + ) + ): - with ( - open_pps(broker, account, write_on_exit=False) as table, - ): - ''' - Assert multiple cases including pps, - ledger and final position message state - - ''' - if assert_entries: - for key, val in [ - ('broker', broker), - ('account', account), - ('symbol', fqsn), - ('size', size * executions), - ('currency', symbol), - ('avg_price', table.pps[symbol].ppu) - ]: - assert last_msg[key] == val - - if assert_pps: - last_ppu = pps[(broker, account)][-1] - assert last_ppu['avg_price'] == table.pps[symbol].ppu - - if assert_zerod_pps: - assert not bool(table.pps) - - -def _run_test_and_check(fn): ''' Close position and assert empty position in pps ''' - with pytest.raises(BaseExceptionGroup) as exc_info: + if expect_errs: + with pytest.raises(BaseExceptionGroup) as exc_info: + trio.run(fn) + + for err in exc_info.value.exceptions: + assert type(err) in expect_errs + else: trio.run(fn) - for exception in exc_info.value.exceptions: - assert isinstance(exception, KeyboardInterrupt) or isinstance( - exception, ContextCancelled - ) + +@cm +def load_and_check_pos( + order: Order, + ppmsg: BrokerdPosition, + +) -> None: + + with open_pps(ppmsg.broker, ppmsg.account) as table: + + # NOTE: a special case is here since the `PpTable.pps` are + # normally indexed by the particular broker's + # `Position.bs_mktid: str` (a unique market / symbol id provided + # by their systems/design) but for the paper engine case, this + # is the same the fqme. + pp: Position = table.pps[ppmsg.symbol] + + assert ppmsg.size == pp.size + assert ppmsg.avg_price == pp.ppu + + yield pp -def test_buy( - open_test_pikerd_and_ems: AsyncContextManager, +@pytest.mark.trio +async def test_ems_err_on_bad_broker( + open_pikerd: Services, + loglevel: str, +): + try: + async with open_ems( + 'doggy.smiles', + mode='paper', + loglevel=loglevel, + ) as _: + pytest.fail('EMS is working on non-broker!?') + except ModuleNotFoundError: + pass + + +async def atest_buy( + loglevel: str, ): ''' Enter a trade and assert entries are made in pps and ledger files. + Shutdown the ems-client and ensure on reconnect we get the expected + matching ``BrokerdPosition`` and pps.toml entries. + ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, + broker: str = 'kraken' + mkt_key: str = 'xbtusdt' + fqme: str = f'{mkt_key}.{broker}' + + startup_pps: dict[ + tuple[str, str], # brokername, acctid + list[BrokerdPosition], + ] + + assert loglevel == 'info' + async with ( + open_ems( + fqme, + mode='paper', + loglevel=loglevel, + ) as ( + client, # OrderClient + trades_stream, + startup_pps, + accounts, + dialogs, + ) + ): + # no positions on startup + assert not startup_pps + assert 'paper' in accounts + + sent, msgs = await submit_order( + client, + trades_stream, + fqme, action='buy', - assert_entries=True, - ), - ) - - # Open ems and assert existence of pps entries - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - assert_pps=True, - ), + size=1, + ) + + last_order = sent[-1] + + last_resp = msgs[-1] + assert isinstance(last_resp, BrokerdPosition) + + # check that pps.toml for account has been updated + with load_and_check_pos( + last_order, + last_resp, + ) as pos: + return pos + + # disconnect from EMS, then reconnect and ensure we get our same + # position relayed to us again. + + # _run_test_and_check( + # partial( + # _async_main, + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # action='buy', + # assert_entries=True, + # ), + # ) + + # await _async_main( + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # assert_pps=True, + # ) + # _run_test_and_check( + # partial( + # _async_main, + # open_test_pikerd_and_ems=open_test_pikerd_and_ems, + # assert_pps=True, + # ), + # ) + + +def test_open_long( + open_test_pikerd: AsyncContextManager, + loglevel: str, + +) -> None: + + async def atest(): + async with ( + open_test_pikerd() as (_, _, _, services), + ): + assert await atest_buy(loglevel) + + # Teardown piker like a user would from cli + # raise KeyboardInterrupt + + run_and_catch( + atest, + expect_errs=None, ) + # Open ems another time and assert existence of prior + # pps entries confirming they persisted -def test_sell( - open_test_pikerd_and_ems: AsyncContextManager, -): - ''' - Sell position and ensure pps are zeroed. - ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='sell', - price=1, - ), - ) +# def test_sell( +# open_test_pikerd_and_ems: AsyncContextManager, +# ): +# ''' +# Sell position and ensure pps are zeroed. - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - assert_zeroed_pps=True, - ), - ) +# ''' +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='sell', +# price=1, +# ), +# ) + +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# assert_zeroed_pps=True, +# ), +# ) -def test_multi_sell( - open_test_pikerd_and_ems: AsyncContextManager, -): - ''' - Make 5 market limit buy orders and - then sell 5 slots at the same price. - Finally, assert cleared positions. +# def test_multi_sell( +# open_test_pikerd_and_ems: AsyncContextManager, +# ): +# ''' +# Make 5 market limit buy orders and +# then sell 5 slots at the same price. +# Finally, assert cleared positions. - ''' - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='buy', - executions=5, - ), - ) +# ''' +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='buy', +# executions=5, +# ), +# ) - _run_test_and_check( - partial( - _async_main, - open_test_pikerd_and_ems=open_test_pikerd_and_ems, - action='sell', - executions=5, - price=1, - assert_zeroed_pps=True, - ), - ) +# _run_test_and_check( +# partial( +# _async_main, +# open_test_pikerd_and_ems=open_test_pikerd_and_ems, +# action='sell', +# executions=5, +# price=1, +# assert_zeroed_pps=True, +# ), +# )