diff --git a/tests/test_paper.py b/tests/test_paper.py index 75be3dc2..0e791197 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -18,7 +18,6 @@ from exceptiongroup import BaseExceptionGroup import pytest import tractor from uuid import uuid4 -from functools import partial from piker.service import Services from piker.log import get_logger @@ -53,14 +52,15 @@ async def open_pikerd( yield services -async def submit_order( +async def order_and_and_wait_for_ppmsg( client: OrderClient, trades_stream: tractor.MsgStream, fqme: str, + action: Literal['buy', 'sell'], - price: float = 30000., - executions: int = 1, + price: float = 100e3, # just a super high price. size: float = 0.01, + exec_mode: str = 'live', account: str = 'paper', @@ -73,51 +73,49 @@ async def submit_order( sent: list[Order] = [] broker, key, suffix = unpack_fqme(fqme) - for _ in range(executions): + order = Order( + exec_mode=exec_mode, + action=action, # TODO: remove this from our schema? + oid=str(uuid4()), + account=account, + size=size, + symbol=fqme, + price=price, + brokers=[broker], + ) + sent.append(order) + await client.send(order) - order = Order( - exec_mode=exec_mode, - action=action, - oid=str(uuid4()), - account=account, - size=size, - symbol=fqme, - price=price, - brokers=[broker], - ) - sent.append(order) - await client.send(order) + # TODO: i guess we should still test the old sync-API? + # client.send_nowait(order) - # TODO: i guess we should still test the old sync-API? - # client.send_nowait(order) + # Wait for position message before moving on to verify flow(s) + # for the multi-order position entry/exit. + msgs: list[Status | BrokerdPosition] = [] + async for msg in trades_stream: + match msg: + case {'name': 'position'}: + ppmsg = BrokerdPosition(**msg) + msgs.append(ppmsg) + break - # Wait for position message before moving on to verify flow(s) - # for the multi-order position entry/exit. - msgs: list[Status | BrokerdPosition] = [] - async for msg in trades_stream: - match msg: - case {'name': 'position'}: - ppmsg = BrokerdPosition(**msg) - msgs.append(ppmsg) - break - - case {'name': 'status'}: - msgs.append(Status(**msg)) + case {'name': 'status'}: + msgs.append(Status(**msg)) return sent, msgs -def run_and_catch( +def run_and_tollerate_cancels( fn: Callable[..., Awaitable], - expect_errs: tuple[Exception] = ( - KeyboardInterrupt, - tractor.ContextCancelled, - ) + expect_errs: tuple[Exception] | None = None, + tollerate_errs: tuple[Exception] = (tractor.ContextCancelled,), ): ''' - Close position and assert empty position in pps + Run ``trio``-``piker`` runtime with potential tolerance for + inter-actor cancellation during teardown (normally just + `tractor.ContextCancelled`s). ''' if expect_errs: @@ -127,7 +125,10 @@ def run_and_catch( for err in exc_info.value.exceptions: assert type(err) in expect_errs else: - trio.run(fn) + try: + trio.run(fn) + except tollerate_errs: + pass @cm @@ -139,22 +140,28 @@ def load_and_check_pos( with open_pps(ppmsg.broker, ppmsg.account) as table: - # NOTE: a special case is here since the `PpTable.pps` are - # normally indexed by the particular broker's - # `Position.bs_mktid: str` (a unique market / symbol id provided - # by their systems/design) but for the paper engine case, this - # is the same the fqme. - pp: Position = table.pps[ppmsg.symbol] + if ppmsg.size == 0: + assert ppmsg.symbol not in table.pps + yield None + return - assert ppmsg.size == pp.size - assert ppmsg.avg_price == pp.ppu + else: + # NOTE: a special case is here since the `PpTable.pps` are + # normally indexed by the particular broker's + # `Position.bs_mktid: str` (a unique market / symbol id provided + # by their systems/design) but for the paper engine case, this + # is the same the fqme. + pp: Position = table.pps[ppmsg.symbol] - yield pp + assert ppmsg.size == pp.size + assert ppmsg.avg_price == pp.ppu + + yield pp @pytest.mark.trio async def test_ems_err_on_bad_broker( - open_pikerd: Services, + open_test_pikerd: Services, loglevel: str, ): try: @@ -168,9 +175,60 @@ async def test_ems_err_on_bad_broker( pass -async def atest_buy( +async def match_ppmsgs_on_ems_boot( + ppmsgs: list[BrokerdPosition], + +) -> None: + ''' + Given a list of input position msgs, verify they match + what is loaded from the EMS on connect. + + ''' + by_acct: dict[tuple, list[BrokerdPosition]] = {} + for msg in ppmsgs: + by_acct.setdefault( + (msg.broker, msg.account), + [], + ).append(msg) + + # TODO: actually support multi-mkts to `open_ems()` + # but for now just pass the first fqme. + fqme = msg.symbol + + # disconnect from EMS, reconnect and ensure we get our same + # position relayed to us again in the startup msg. + async with ( + open_ems( + fqme, + mode='paper', + loglevel='info', + ) as ( + _, # OrderClient + _, # tractor.MsgStream + startup_pps, + accounts, + _, # dialogs, + ) + ): + for (broker, account), ppmsgs in by_acct.items(): + assert account in accounts + + # lookup all msgs rx-ed for this account + rx_msgs = startup_pps[(broker, account)] + + for expect_ppmsg in ppmsgs: + rx_msg = BrokerdPosition(**rx_msgs[expect_ppmsg.symbol]) + assert rx_msg == expect_ppmsg + + +async def submit_and_check( + fills: tuple[dict], loglevel: str, -): + +) -> tuple[ + BrokerdPosition, + Position, +]: ''' Enter a trade and assert entries are made in pps and ledger files. @@ -203,125 +261,108 @@ async def atest_buy( assert not startup_pps assert 'paper' in accounts - sent, msgs = await submit_order( - client, - trades_stream, - fqme, - action='buy', - size=0.01, - ) - - last_order = sent[-1] + od: dict + for od in fills: + print(f'Sending order {od} for fill') + sent, msgs = await order_and_and_wait_for_ppmsg( + client, + trades_stream, + fqme, + action='buy', + size=od['size'], + ) + last_order: Order = sent[-1] last_resp = msgs[-1] assert isinstance(last_resp, BrokerdPosition) + ppmsg = last_resp # check that pps.toml for account has been updated + # and all ems position msgs match that state. with load_and_check_pos( last_order, - last_resp, + ppmsg, ) as pos: - return pos + pass - # disconnect from EMS, then reconnect and ensure we get our same - # position relayed to us again. - - # _run_test_and_check( - # partial( - # _async_main, - # open_test_pikerd_and_ems=open_test_pikerd_and_ems, - # action='buy', - # assert_entries=True, - # ), - # ) - - # await _async_main( - # open_test_pikerd_and_ems=open_test_pikerd_and_ems, - # assert_pps=True, - # ) - # _run_test_and_check( - # partial( - # _async_main, - # open_test_pikerd_and_ems=open_test_pikerd_and_ems, - # assert_pps=True, - # ), - # ) + return ppmsg, pos -def test_open_long( +@pytest.mark.parametrize( + 'fills', + [ + # buy and leave + ({'size': 0.001},), + + # sell short, then buy back to net-zero in dst + ( + {'size': -0.001}, + {'size': 0.001}, + ), + + # multi-partial entry and exits. + ( + # enters + {'size': 0.001}, + {'size': 0.002}, + + # partial exit + {'size': -0.001}, + + # partial enter + {'size': 0.0015}, + {'size': 0.001}, + {'size': 0.002}, + + # exits to get back to zero. + {'size': -0.001}, + {'size': -0.025}, + {'size': -0.0195}, + ), + ], + ids='fills={}'.format, +) +def test_multi_fill_positions( open_test_pikerd: AsyncContextManager, loglevel: str, + fills: tuple[dict], + + check_cross_session: bool = True, + ) -> None: + ppmsg: BrokerdPosition + pos: Position + + accum_size: float = 0 + for fill in fills: + accum_size += fill['size'] + async def atest(): + + # export to outer scope for audit on second runtime-boot. + nonlocal ppmsg, pos + async with ( open_test_pikerd() as (_, _, _, services), ): - assert await atest_buy(loglevel) + ppmsg, pos = await submit_and_check( + fills=fills, + loglevel=loglevel, + ) + assert ppmsg.size == accum_size - # Teardown piker like a user would from cli - # raise KeyboardInterrupt + run_and_tollerate_cancels(atest) - run_and_catch( - atest, - expect_errs=None, - ) - # Open ems another time and assert existence of prior - # pps entries confirming they persisted + if check_cross_session or accum_size != 0: + # rerun just to check that position info is persistent for the paper + # account (i.e. a user can expect to see paper pps persist across + # runtime sessions. + async def just_check_pp(): + async with ( + open_test_pikerd() as (_, _, _, services), + ): + await match_ppmsgs_on_ems_boot([ppmsg]) - - -# def test_sell( -# open_test_pikerd_and_ems: AsyncContextManager, -# ): -# ''' -# Sell position and ensure pps are zeroed. - -# ''' -# _run_test_and_check( -# partial( -# _async_main, -# open_test_pikerd_and_ems=open_test_pikerd_and_ems, -# action='sell', -# price=1, -# ), -# ) - -# _run_test_and_check( -# partial( -# _async_main, -# open_test_pikerd_and_ems=open_test_pikerd_and_ems, -# assert_zeroed_pps=True, -# ), -# ) - - -# def test_multi_sell( -# open_test_pikerd_and_ems: AsyncContextManager, -# ): -# ''' -# Make 5 market limit buy orders and -# then sell 5 slots at the same price. -# Finally, assert cleared positions. - -# ''' -# _run_test_and_check( -# partial( -# _async_main, -# open_test_pikerd_and_ems=open_test_pikerd_and_ems, -# action='buy', -# executions=5, -# ), -# ) - -# _run_test_and_check( -# partial( -# _async_main, -# open_test_pikerd_and_ems=open_test_pikerd_and_ems, -# action='sell', -# executions=5, -# price=1, -# assert_zeroed_pps=True, -# ), -# ) + run_and_tollerate_cancels(just_check_pp)