import trio import pytest import tractor import math from piker.log import get_logger from piker.clearing._messages import ( Order ) from uuid import uuid4 from typing import ( AsyncContextManager, Any, Literal, ) from functools import partial from piker.pp import ( open_trade_ledger, open_pps, PpTable ) from piker.clearing import ( open_ems, ) from piker.clearing._client import ( OrderBook, ) from piker.clearing._messages import ( BrokerdPosition ) from exceptiongroup import BaseExceptionGroup log = get_logger(__name__) def test_paper_trade( open_test_pikerd: AsyncContextManager ): cleared_price: float test_exec_mode='live' test_account = 'paper' test_size = 1 test_broker = 'kraken' test_brokers = [test_broker] test_symbol = 'xbtusdt' test_fqsn = f'{test_symbol}.{test_broker}' test_pp_account = 'piker-paper' positions: dict[ # brokername, acctid tuple[str, str], list[BrokerdPosition], ] async def _async_main( open_pikerd: AsyncContextManager, action: Literal['buy', 'sell'] | None = None, price: int = 30000, assert_entries: bool = False, ) -> Any: oid: str = str(uuid4()) book: OrderBook global cleared_price global positions # Set up piker and EMS async with ( open_pikerd() as (_, _, _, services), open_ems( test_fqsn, mode=test_account, ) as ( book, trades_stream, pps, accounts, dialogs, ), ): # Send order to EMS if action: order = Order( exec_mode=test_exec_mode, action=action, oid=oid, account=test_account, size=test_size, symbol=test_fqsn, price=price, brokers=test_brokers ) book.send(order) await trio.sleep(2) # Assert entries are made in both ledger and PPS if assert_entries: cleared_ledger_entry = {} with open_trade_ledger(test_broker, test_account) as ledger: cleared_ledger_entry = ledger[oid] cleared_price = cleared_ledger_entry["price"] assert list(ledger.keys())[-1] == oid assert cleared_ledger_entry['size'] == test_size assert cleared_ledger_entry['fqsn'] == test_fqsn with open_pps(test_broker, test_pp_account) as table: pp_price = table.conf[test_broker][test_pp_account][test_fqsn]["ppu"] assert math.isclose(pp_price, cleared_ledger_entry['size'], rel_tol=1) assert table.brokername == test_broker assert table.acctid == test_pp_account positions = pps # Close piker like a user would raise KeyboardInterrupt # Open piker and ensure last pps price is the same as ledger entry async def _open_and_assert_pps(): await _async_main(open_test_pikerd) assert positions(test_broker, test_account)[-1] == cleared_price # Close position and assert empty position in pps async def _close_pp_and_assert(): await _async_main(open_test_pikerd, 'sell', 1) with open_pps(test_broker, test_pp_account) as table: assert len(table.pps) == 0 # run initial time and send sent and assert trade with pytest.raises( BaseExceptionGroup ) as exc_info: trio.run(partial(_async_main, open_pikerd=open_test_pikerd, action='buy', ) ) with pytest.raises( BaseExceptionGroup ) as exc_info: trio.run(_open_and_assert_pps) with pytest.raises( BaseExceptionGroup ) as exc_info: trio.run(_close_pp_and_assert)