diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 4444306a..c53e02e5 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -36,7 +36,6 @@ import trio import tractor from .. import data -from ..data._source import Symbol from ..data.types import Struct from ..pp import ( Position, @@ -86,7 +85,7 @@ class PaperBoi(Struct): # init edge case L1 spread last_ask: tuple[float, float] = (float('inf'), 0) # price, size last_bid: tuple[float, float] = (0, 0) - + async def submit_limit( self, oid: str, # XXX: see return value @@ -236,7 +235,7 @@ class PaperBoi(Struct): ) log.info(f'Fake filling order:\n{fill_msg}') await self.ems_trades_stream.send(fill_msg) - + if order_complete: msg = BrokerdStatus( reqid=reqid, @@ -251,18 +250,6 @@ class PaperBoi(Struct): # lookup any existing position key = fqsn.rstrip(f'.{self.broker}') - pp = self._positions.setdefault( - fqsn, - Position( - Symbol( - key=key, - broker_info={self.broker: {}}, - ), - size=size, - ppu=price, - bsuid=key, - ) - ) t = Transaction( fqsn=fqsn, tid=oid, @@ -274,24 +261,24 @@ class PaperBoi(Struct): ) # Update in memory ledger per trade - ledger_entry = {} - ledger_entry[oid] = t.to_dict() - + ledger_entry = {oid: t.to_dict()} + # Store txn in state for PP update self._txn_dict[oid] = t self._trade_ledger.update(ledger_entry) - # Write to ledger toml + # Write to ledger toml right now with open_trade_ledger(self.broker, 'paper') as ledger: - ledger.update(self._trade_ledger) + ledger.update(self._trade_ledger) - # Write to pps toml - with open_pps(self.broker, 'piker-paper') as table: + # Write to pps toml right now + with open_pps(self.broker, 'piker-paper') as table: table.update_from_trans(self._txn_dict) # save pps in local state - self._positions.update(table.pps) + self._positions = table.pps - pp.add_clear(t) + # Ensure we have the latest positioning data when sending pp_msg + pp = self._positions[key] pp_msg = BrokerdPosition( broker=self.broker, @@ -300,7 +287,7 @@ class PaperBoi(Struct): # TODO: we need to look up the asset currency from # broker info. i guess for crypto this can be # inferred from the pair? - currency='', + currency=key, size=pp.size, avg_price=pp.ppu, ) @@ -330,7 +317,6 @@ async def simulate_fills( # https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py # this stream may eventually contain multiple symbols - async for quotes in quote_stream: for sym, quote in quotes.items(): for tick in iterticks( @@ -423,7 +409,8 @@ async def simulate_fills( # below unecessarily and further don't want to pop # simulated live orders prematurely. case _: - continue + continue + # iterate all potentially clearable book prices # in FIFO order per side. for order_info, pred in iter_entries: @@ -437,7 +424,7 @@ async def simulate_fills( 'buy': buys, 'sell': sells }[action].inverse.pop(order_info) - + # clearing price would have filled entirely await client.fake_fill( fqsn=sym, @@ -554,10 +541,10 @@ async def trades_dialogue( ): - with open_pps(broker, 'piker-paper') as table: + with open_pps(broker, 'piker-paper', False) as table: # save pps in local state _positions.update(table.pps) - + pp_msgs: list[BrokerdPosition] = [] pos: Position token: str # f'{symbol}.{self.broker}' @@ -569,8 +556,7 @@ async def trades_dialogue( size=pos.size, avg_price=pos.ppu, )) - # TODO: load paper positions per broker from .toml config file - # and pass as symbol to position data mapping: ``dict[str, dict]`` + await ctx.started(( pp_msgs, ['paper'], diff --git a/pytest.ini b/pytest.ini index ffadaba4..19d9d41b 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,3 +1,3 @@ -[pytest] -trio_mode=True -log_cli=1 +#[pytest] +#trio_mode=True +#log_cli=1 diff --git a/tests/test_paper.py b/tests/test_paper.py index 91fa0d07..943fb9d5 100644 --- a/tests/test_paper.py +++ b/tests/test_paper.py @@ -1,20 +1,26 @@ +""" +Paper-mode testing +""" + import trio -import pytest -import tractor import math -import os from shutil import rmtree -from piker.config import get_app_dir -from piker.log import get_logger -from piker.clearing._messages import ( - Order -) -from uuid import uuid4 +from exceptiongroup import BaseExceptionGroup from typing import ( AsyncContextManager, Literal, ) +from pathlib import Path + +import pytest +import tractor +from tractor._exceptions import ContextCancelled +from uuid import uuid4 from functools import partial + +from piker.config import get_app_dir +from piker.log import get_logger +from piker.clearing._messages import Order from piker.pp import ( open_trade_ledger, open_pps, @@ -25,35 +31,36 @@ from piker.clearing import ( from piker.clearing._client import ( OrderBook, ) -from piker.clearing._messages import ( - BrokerdPosition -) -from exceptiongroup import BaseExceptionGroup +from piker._cacheables import open_cached_client +from piker.clearing._messages import BrokerdPosition log = get_logger(__name__) -@pytest.fixture(scope="session") -def paper_cleanup(): +@pytest.fixture(scope="module") +def delete_testing_dir(): + '''This fixture removes the temp directory + used for storing all config/ledger/pp data + created during testing sessions + ''' yield - app_dir = get_app_dir('piker') - if os.path.isfile(app_dir): - rmtree(app_dir) - assert not os.path.isfile(app_dir) + app_dir = Path(get_app_dir("piker")).resolve() + if app_dir.is_dir(): + rmtree(str(app_dir)) + assert not app_dir.is_dir() -def test_paper_trade( - open_test_pikerd: AsyncContextManager, - paper_cleanup: None -): +def get_fqsn(broker, symbol): + fqsn = f"{symbol}.{broker}" + return (fqsn, symbol, broker) + +def test_paper_trade(open_test_pikerd: AsyncContextManager): cleared_price: float - test_exec_mode='live' - test_account = 'paper' - test_size = 1 - test_broker = 'kraken' - test_brokers = [test_broker] - test_symbol = 'xbtusdt' - test_fqsn = f'{test_symbol}.{test_broker}' - test_pp_account = 'piker-paper' + test_exec_mode = "live" + test_account = "paper" + test_size = 1 + (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt") + brokers = [broker] + test_pp_account = "piker-paper" positions: dict[ # brokername, acctid tuple[str, str], @@ -62,28 +69,25 @@ def test_paper_trade( async def _async_main( open_pikerd: AsyncContextManager, - action: Literal['buy', 'sell'] | None = None, + action: Literal["buy", "sell"] | None = None, price: int = 30000, assert_entries: bool = False, ) -> None: - """Spawn a paper piper actor, place a trade and assert entries are present - in both trade ledger and pps tomls. Then restart piker and ensure - that pps from previous trade exists in the ems pps. - Finally close the position and ensure that the position in pps.toml is closed. - """ + '''Spawn a paper piper actor, place a trade and assert entries are present + in both trade ledger and pps tomls. Then restart piker and ensure + that pps from previous trade exists in the ems pps. + Finally close the position and ensure that the position in pps.toml is closed. + ''' oid: str = str(uuid4()) book: OrderBook - global cleared_price - global positions - + nonlocal cleared_price + nonlocal positions + # Set up piker and EMS async with ( open_pikerd() as (_, _, _, services), - open_ems( - test_fqsn, - mode=test_account, - ) as ( + open_ems(fqsn, mode="paper") as ( book, trades_stream, pps, @@ -91,74 +95,107 @@ def test_paper_trade( dialogs, ), ): - # Send order to EMS - if action: + if action: order = Order( exec_mode=test_exec_mode, action=action, oid=oid, account=test_account, size=test_size, - symbol=test_fqsn, + symbol=fqsn, price=price, - brokers=test_brokers + brokers=brokers, ) - + # This is actually a syncronous call to push a message + # to the async ems clue - hence why we call trio.sleep afterwards book.send(order) - + await trio.sleep(2) # Assert entries are made in both ledger and PPS if assert_entries: cleared_ledger_entry = {} - with open_trade_ledger(test_broker, test_account) as ledger: + with open_trade_ledger(broker, test_account) as ledger: cleared_ledger_entry = ledger[oid] cleared_price = cleared_ledger_entry["price"] assert list(ledger.keys())[-1] == oid - assert cleared_ledger_entry['size'] == test_size - assert cleared_ledger_entry['fqsn'] == test_fqsn - - with open_pps(test_broker, test_pp_account) as table: - pp_price = table.conf[test_broker][test_pp_account][test_fqsn]["ppu"] - assert math.isclose(pp_price, cleared_ledger_entry['size'], rel_tol=1) - assert table.brokername == test_broker - assert table.acctid == test_pp_account - + assert cleared_ledger_entry["size"] == test_size + assert cleared_ledger_entry["fqsn"] == fqsn + + with open_pps(broker, test_pp_account) as table: + pp_price = table.conf[broker][test_pp_account][fqsn]["ppu"] + # Ensure the price-per-unit (breakeven) price is close to our clearing price + assert math.isclose( + pp_price, cleared_ledger_entry["size"], rel_tol=1 + ) + assert table.brokername == broker + assert table.acctid == test_pp_account + positions = pps # Close piker like a user would raise KeyboardInterrupt - # Open piker and ensure last pps price is the same as ledger entry - async def _open_and_assert_pps(): + # Open piker load pps locally + # and ensure last pps price is the same as ledger entry + async def _open_and_assert_pps(): await _async_main(open_test_pikerd) - assert positions(test_broker, test_account)[-1] == cleared_price - + assert positions(broker, test_account)[-1] == cleared_price + # Close position and assert empty position in pps async def _close_pp_and_assert(): - await _async_main(open_test_pikerd, 'sell', 1) - with open_pps(test_broker, test_pp_account) as table: + await _async_main(open_test_pikerd, "sell", 1) + with open_pps(broker, test_pp_account) as table: assert len(table.pps) == 0 - # run initial time and send sent and assert trade - with pytest.raises( - BaseExceptionGroup - ): - trio.run(partial(_async_main, - open_pikerd=open_test_pikerd, - action='buy', - ) - ) + def _run_test_and_check(exception, fn): + with pytest.raises(exception) as exc_info: + trio.run(fn) - with pytest.raises( - BaseExceptionGroup - ): - trio.run(_open_and_assert_pps) + for exception in exc_info.value.exceptions: + assert isinstance(exception, KeyboardInterrupt) or isinstance( + exception, ContextCancelled + ) - with pytest.raises( - BaseExceptionGroup + # Send and execute a trade and assert trade + _run_test_and_check( + BaseExceptionGroup, + partial( + _async_main, + open_pikerd=open_test_pikerd, + action="buy", + ), + ) + _run_test_and_check(BaseExceptionGroup, _open_and_assert_pps) + _run_test_and_check(BaseExceptionGroup, _close_pp_and_assert) + + +def test_paper_client( + open_test_pikerd: AsyncContextManager +): + + async def _async_main( + open_pikerd: AsyncContextManager, ): - trio.run(_close_pp_and_assert) + (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt") + async with ( + open_pikerd() as (_, _, _, services), + open_ems(fqsn, mode="paper") as ( + book, + trades_stream, + pps, + accounts, + dialogs, + ), + ): + async with open_cached_client(broker) as client: + symbol_info = await client.symbol_info() + print(f'client: {symbol_info["XBTUSDT"]}') + trio.run(partial( + _async_main, + open_pikerd=open_test_pikerd, + ), + ) diff --git a/tests/test_services.py b/tests/test_services.py index e44a21f4..763b438e 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -10,7 +10,6 @@ import trio import tractor from piker.log import get_logger - from piker._daemon import ( find_service, Services, @@ -177,4 +176,4 @@ def test_ensure_ems_in_paper_actors( trio.run(main) cancel_msg: str = '_emsd_main()` was remotely cancelled by its caller' - assert cancel_msg in exc_info.value.args[0] + assert cancel_msg in exc_info.value.args[0]