Ensure actual pp is sent to ems

ensure not to write pp header on startup

Comment out pytest settings
Add comments explaining delete_testing_dir fixture
use nonlocal instead of global for test state

Add unpacking get_fqsn method
Format test_paper
Add comments explaining sync/async book.send calls
paper_trade_improvements_rebase
jaredgoldman 2023-02-23 15:21:10 -05:00
parent b80dfe4f9c
commit e8714c2d17
4 changed files with 141 additions and 119 deletions

View File

@ -36,7 +36,6 @@ import trio
import tractor import tractor
from .. import data from .. import data
from ..data._source import Symbol
from ..data.types import Struct from ..data.types import Struct
from ..pp import ( from ..pp import (
Position, Position,
@ -86,7 +85,7 @@ class PaperBoi(Struct):
# init edge case L1 spread # init edge case L1 spread
last_ask: tuple[float, float] = (float('inf'), 0) # price, size last_ask: tuple[float, float] = (float('inf'), 0) # price, size
last_bid: tuple[float, float] = (0, 0) last_bid: tuple[float, float] = (0, 0)
async def submit_limit( async def submit_limit(
self, self,
oid: str, # XXX: see return value oid: str, # XXX: see return value
@ -236,7 +235,7 @@ class PaperBoi(Struct):
) )
log.info(f'Fake filling order:\n{fill_msg}') log.info(f'Fake filling order:\n{fill_msg}')
await self.ems_trades_stream.send(fill_msg) await self.ems_trades_stream.send(fill_msg)
if order_complete: if order_complete:
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -251,18 +250,6 @@ class PaperBoi(Struct):
# lookup any existing position # lookup any existing position
key = fqsn.rstrip(f'.{self.broker}') key = fqsn.rstrip(f'.{self.broker}')
pp = self._positions.setdefault(
fqsn,
Position(
Symbol(
key=key,
broker_info={self.broker: {}},
),
size=size,
ppu=price,
bsuid=key,
)
)
t = Transaction( t = Transaction(
fqsn=fqsn, fqsn=fqsn,
tid=oid, tid=oid,
@ -274,24 +261,24 @@ class PaperBoi(Struct):
) )
# Update in memory ledger per trade # Update in memory ledger per trade
ledger_entry = {} ledger_entry = {oid: t.to_dict()}
ledger_entry[oid] = t.to_dict()
# Store txn in state for PP update # Store txn in state for PP update
self._txn_dict[oid] = t self._txn_dict[oid] = t
self._trade_ledger.update(ledger_entry) self._trade_ledger.update(ledger_entry)
# Write to ledger toml # Write to ledger toml right now
with open_trade_ledger(self.broker, 'paper') as ledger: with open_trade_ledger(self.broker, 'paper') as ledger:
ledger.update(self._trade_ledger) ledger.update(self._trade_ledger)
# Write to pps toml # Write to pps toml right now
with open_pps(self.broker, 'piker-paper') as table: with open_pps(self.broker, 'piker-paper') as table:
table.update_from_trans(self._txn_dict) table.update_from_trans(self._txn_dict)
# save pps in local state # save pps in local state
self._positions.update(table.pps) self._positions = table.pps
pp.add_clear(t) # Ensure we have the latest positioning data when sending pp_msg
pp = self._positions[key]
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker=self.broker, broker=self.broker,
@ -300,7 +287,7 @@ class PaperBoi(Struct):
# TODO: we need to look up the asset currency from # TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be # broker info. i guess for crypto this can be
# inferred from the pair? # inferred from the pair?
currency='', currency=key,
size=pp.size, size=pp.size,
avg_price=pp.ppu, avg_price=pp.ppu,
) )
@ -330,7 +317,6 @@ async def simulate_fills(
# https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py # https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
# this stream may eventually contain multiple symbols # this stream may eventually contain multiple symbols
async for quotes in quote_stream: async for quotes in quote_stream:
for sym, quote in quotes.items(): for sym, quote in quotes.items():
for tick in iterticks( for tick in iterticks(
@ -423,7 +409,8 @@ async def simulate_fills(
# below unecessarily and further don't want to pop # below unecessarily and further don't want to pop
# simulated live orders prematurely. # simulated live orders prematurely.
case _: case _:
continue continue
# iterate all potentially clearable book prices # iterate all potentially clearable book prices
# in FIFO order per side. # in FIFO order per side.
for order_info, pred in iter_entries: for order_info, pred in iter_entries:
@ -437,7 +424,7 @@ async def simulate_fills(
'buy': buys, 'buy': buys,
'sell': sells 'sell': sells
}[action].inverse.pop(order_info) }[action].inverse.pop(order_info)
# clearing price would have filled entirely # clearing price would have filled entirely
await client.fake_fill( await client.fake_fill(
fqsn=sym, fqsn=sym,
@ -554,10 +541,10 @@ async def trades_dialogue(
): ):
with open_pps(broker, 'piker-paper') as table: with open_pps(broker, 'piker-paper', False) as table:
# save pps in local state # save pps in local state
_positions.update(table.pps) _positions.update(table.pps)
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []
pos: Position pos: Position
token: str # f'{symbol}.{self.broker}' token: str # f'{symbol}.{self.broker}'
@ -569,8 +556,7 @@ async def trades_dialogue(
size=pos.size, size=pos.size,
avg_price=pos.ppu, avg_price=pos.ppu,
)) ))
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started(( await ctx.started((
pp_msgs, pp_msgs,
['paper'], ['paper'],

View File

@ -1,3 +1,3 @@
[pytest] #[pytest]
trio_mode=True #trio_mode=True
log_cli=1 #log_cli=1

View File

@ -1,20 +1,26 @@
"""
Paper-mode testing
"""
import trio import trio
import pytest
import tractor
import math import math
import os
from shutil import rmtree from shutil import rmtree
from piker.config import get_app_dir from exceptiongroup import BaseExceptionGroup
from piker.log import get_logger
from piker.clearing._messages import (
Order
)
from uuid import uuid4
from typing import ( from typing import (
AsyncContextManager, AsyncContextManager,
Literal, Literal,
) )
from pathlib import Path
import pytest
import tractor
from tractor._exceptions import ContextCancelled
from uuid import uuid4
from functools import partial from functools import partial
from piker.config import get_app_dir
from piker.log import get_logger
from piker.clearing._messages import Order
from piker.pp import ( from piker.pp import (
open_trade_ledger, open_trade_ledger,
open_pps, open_pps,
@ -25,35 +31,36 @@ from piker.clearing import (
from piker.clearing._client import ( from piker.clearing._client import (
OrderBook, OrderBook,
) )
from piker.clearing._messages import ( from piker._cacheables import open_cached_client
BrokerdPosition from piker.clearing._messages import BrokerdPosition
)
from exceptiongroup import BaseExceptionGroup
log = get_logger(__name__) log = get_logger(__name__)
@pytest.fixture(scope="session") @pytest.fixture(scope="module")
def paper_cleanup(): def delete_testing_dir():
'''This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
'''
yield yield
app_dir = get_app_dir('piker') app_dir = Path(get_app_dir("piker")).resolve()
if os.path.isfile(app_dir): if app_dir.is_dir():
rmtree(app_dir) rmtree(str(app_dir))
assert not os.path.isfile(app_dir) assert not app_dir.is_dir()
def test_paper_trade( def get_fqsn(broker, symbol):
open_test_pikerd: AsyncContextManager, fqsn = f"{symbol}.{broker}"
paper_cleanup: None return (fqsn, symbol, broker)
):
def test_paper_trade(open_test_pikerd: AsyncContextManager):
cleared_price: float cleared_price: float
test_exec_mode='live' test_exec_mode = "live"
test_account = 'paper' test_account = "paper"
test_size = 1 test_size = 1
test_broker = 'kraken' (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
test_brokers = [test_broker] brokers = [broker]
test_symbol = 'xbtusdt' test_pp_account = "piker-paper"
test_fqsn = f'{test_symbol}.{test_broker}'
test_pp_account = 'piker-paper'
positions: dict[ positions: dict[
# brokername, acctid # brokername, acctid
tuple[str, str], tuple[str, str],
@ -62,28 +69,25 @@ def test_paper_trade(
async def _async_main( async def _async_main(
open_pikerd: AsyncContextManager, open_pikerd: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None, action: Literal["buy", "sell"] | None = None,
price: int = 30000, price: int = 30000,
assert_entries: bool = False, assert_entries: bool = False,
) -> None: ) -> None:
"""Spawn a paper piper actor, place a trade and assert entries are present '''Spawn a paper piper actor, place a trade and assert entries are present
in both trade ledger and pps tomls. Then restart piker and ensure in both trade ledger and pps tomls. Then restart piker and ensure
that pps from previous trade exists in the ems pps. that pps from previous trade exists in the ems pps.
Finally close the position and ensure that the position in pps.toml is closed. Finally close the position and ensure that the position in pps.toml is closed.
""" '''
oid: str = str(uuid4()) oid: str = str(uuid4())
book: OrderBook book: OrderBook
global cleared_price nonlocal cleared_price
global positions nonlocal positions
# Set up piker and EMS # Set up piker and EMS
async with ( async with (
open_pikerd() as (_, _, _, services), open_pikerd() as (_, _, _, services),
open_ems( open_ems(fqsn, mode="paper") as (
test_fqsn,
mode=test_account,
) as (
book, book,
trades_stream, trades_stream,
pps, pps,
@ -91,74 +95,107 @@ def test_paper_trade(
dialogs, dialogs,
), ),
): ):
# Send order to EMS # Send order to EMS
if action: if action:
order = Order( order = Order(
exec_mode=test_exec_mode, exec_mode=test_exec_mode,
action=action, action=action,
oid=oid, oid=oid,
account=test_account, account=test_account,
size=test_size, size=test_size,
symbol=test_fqsn, symbol=fqsn,
price=price, price=price,
brokers=test_brokers brokers=brokers,
) )
# This is actually a syncronous call to push a message
# to the async ems clue - hence why we call trio.sleep afterwards
book.send(order) book.send(order)
await trio.sleep(2) await trio.sleep(2)
# Assert entries are made in both ledger and PPS # Assert entries are made in both ledger and PPS
if assert_entries: if assert_entries:
cleared_ledger_entry = {} cleared_ledger_entry = {}
with open_trade_ledger(test_broker, test_account) as ledger: with open_trade_ledger(broker, test_account) as ledger:
cleared_ledger_entry = ledger[oid] cleared_ledger_entry = ledger[oid]
cleared_price = cleared_ledger_entry["price"] cleared_price = cleared_ledger_entry["price"]
assert list(ledger.keys())[-1] == oid assert list(ledger.keys())[-1] == oid
assert cleared_ledger_entry['size'] == test_size assert cleared_ledger_entry["size"] == test_size
assert cleared_ledger_entry['fqsn'] == test_fqsn assert cleared_ledger_entry["fqsn"] == fqsn
with open_pps(test_broker, test_pp_account) as table: with open_pps(broker, test_pp_account) as table:
pp_price = table.conf[test_broker][test_pp_account][test_fqsn]["ppu"] pp_price = table.conf[broker][test_pp_account][fqsn]["ppu"]
assert math.isclose(pp_price, cleared_ledger_entry['size'], rel_tol=1) # Ensure the price-per-unit (breakeven) price is close to our clearing price
assert table.brokername == test_broker assert math.isclose(
assert table.acctid == test_pp_account pp_price, cleared_ledger_entry["size"], rel_tol=1
)
assert table.brokername == broker
assert table.acctid == test_pp_account
positions = pps positions = pps
# Close piker like a user would # Close piker like a user would
raise KeyboardInterrupt raise KeyboardInterrupt
# Open piker and ensure last pps price is the same as ledger entry # Open piker load pps locally
async def _open_and_assert_pps(): # and ensure last pps price is the same as ledger entry
async def _open_and_assert_pps():
await _async_main(open_test_pikerd) await _async_main(open_test_pikerd)
assert positions(test_broker, test_account)[-1] == cleared_price assert positions(broker, test_account)[-1] == cleared_price
# Close position and assert empty position in pps # Close position and assert empty position in pps
async def _close_pp_and_assert(): async def _close_pp_and_assert():
await _async_main(open_test_pikerd, 'sell', 1) await _async_main(open_test_pikerd, "sell", 1)
with open_pps(test_broker, test_pp_account) as table: with open_pps(broker, test_pp_account) as table:
assert len(table.pps) == 0 assert len(table.pps) == 0
# run initial time and send sent and assert trade def _run_test_and_check(exception, fn):
with pytest.raises( with pytest.raises(exception) as exc_info:
BaseExceptionGroup trio.run(fn)
):
trio.run(partial(_async_main,
open_pikerd=open_test_pikerd,
action='buy',
)
)
with pytest.raises( for exception in exc_info.value.exceptions:
BaseExceptionGroup assert isinstance(exception, KeyboardInterrupt) or isinstance(
): exception, ContextCancelled
trio.run(_open_and_assert_pps) )
with pytest.raises( # Send and execute a trade and assert trade
BaseExceptionGroup _run_test_and_check(
BaseExceptionGroup,
partial(
_async_main,
open_pikerd=open_test_pikerd,
action="buy",
),
)
_run_test_and_check(BaseExceptionGroup, _open_and_assert_pps)
_run_test_and_check(BaseExceptionGroup, _close_pp_and_assert)
def test_paper_client(
open_test_pikerd: AsyncContextManager
):
async def _async_main(
open_pikerd: AsyncContextManager,
): ):
trio.run(_close_pp_and_assert) (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
async with (
open_pikerd() as (_, _, _, services),
open_ems(fqsn, mode="paper") as (
book,
trades_stream,
pps,
accounts,
dialogs,
),
):
async with open_cached_client(broker) as client:
symbol_info = await client.symbol_info()
print(f'client: {symbol_info["XBTUSDT"]}')
trio.run(partial(
_async_main,
open_pikerd=open_test_pikerd,
),
)

View File

@ -10,7 +10,6 @@ import trio
import tractor import tractor
from piker.log import get_logger from piker.log import get_logger
from piker._daemon import ( from piker._daemon import (
find_service, find_service,
Services, Services,
@ -177,4 +176,4 @@ def test_ensure_ems_in_paper_actors(
trio.run(main) trio.run(main)
cancel_msg: str = '_emsd_main()` was remotely cancelled by its caller' cancel_msg: str = '_emsd_main()` was remotely cancelled by its caller'
assert cancel_msg in exc_info.value.args[0] assert cancel_msg in exc_info.value.args[0]