Ensure actual pp is sent to ems

ensure not to write pp header on startup

Comment out pytest settings
Add comments explaining delete_testing_dir fixture
use nonlocal instead of global for test state

Add unpacking get_fqsn method
Format test_paper
Add comments explaining sync/async book.send calls
explicit_write_pps_on_exit
jaredgoldman 2023-02-23 15:21:10 -05:00
parent db2e2ed78f
commit c99381216d
4 changed files with 141 additions and 119 deletions

View File

@ -36,7 +36,6 @@ import trio
import tractor
from .. import data
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import (
Position,
@ -86,7 +85,7 @@ class PaperBoi(Struct):
# init edge case L1 spread
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
last_bid: tuple[float, float] = (0, 0)
async def submit_limit(
self,
oid: str, # XXX: see return value
@ -236,7 +235,7 @@ class PaperBoi(Struct):
)
log.info(f'Fake filling order:\n{fill_msg}')
await self.ems_trades_stream.send(fill_msg)
if order_complete:
msg = BrokerdStatus(
reqid=reqid,
@ -251,18 +250,6 @@ class PaperBoi(Struct):
# lookup any existing position
key = fqsn.rstrip(f'.{self.broker}')
pp = self._positions.setdefault(
fqsn,
Position(
Symbol(
key=key,
broker_info={self.broker: {}},
),
size=size,
ppu=price,
bsuid=key,
)
)
t = Transaction(
fqsn=fqsn,
tid=oid,
@ -274,24 +261,24 @@ class PaperBoi(Struct):
)
# Update in memory ledger per trade
ledger_entry = {}
ledger_entry[oid] = t.to_dict()
ledger_entry = {oid: t.to_dict()}
# Store txn in state for PP update
self._txn_dict[oid] = t
self._trade_ledger.update(ledger_entry)
# Write to ledger toml
# Write to ledger toml right now
with open_trade_ledger(self.broker, 'paper') as ledger:
ledger.update(self._trade_ledger)
ledger.update(self._trade_ledger)
# Write to pps toml
with open_pps(self.broker, 'piker-paper') as table:
# Write to pps toml right now
with open_pps(self.broker, 'piker-paper') as table:
table.update_from_trans(self._txn_dict)
# save pps in local state
self._positions.update(table.pps)
self._positions = table.pps
pp.add_clear(t)
# Ensure we have the latest positioning data when sending pp_msg
pp = self._positions[key]
pp_msg = BrokerdPosition(
broker=self.broker,
@ -300,7 +287,7 @@ class PaperBoi(Struct):
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
currency='',
currency=key,
size=pp.size,
avg_price=pp.ppu,
)
@ -330,7 +317,6 @@ async def simulate_fills(
# https://github.com/quantopian/zipline/blob/master/zipline/finance/ledger.py
# this stream may eventually contain multiple symbols
async for quotes in quote_stream:
for sym, quote in quotes.items():
for tick in iterticks(
@ -423,7 +409,8 @@ async def simulate_fills(
# below unecessarily and further don't want to pop
# simulated live orders prematurely.
case _:
continue
continue
# iterate all potentially clearable book prices
# in FIFO order per side.
for order_info, pred in iter_entries:
@ -437,7 +424,7 @@ async def simulate_fills(
'buy': buys,
'sell': sells
}[action].inverse.pop(order_info)
# clearing price would have filled entirely
await client.fake_fill(
fqsn=sym,
@ -554,10 +541,10 @@ async def trades_dialogue(
):
with open_pps(broker, 'piker-paper') as table:
with open_pps(broker, 'piker-paper', False) as table:
# save pps in local state
_positions.update(table.pps)
pp_msgs: list[BrokerdPosition] = []
pos: Position
token: str # f'{symbol}.{self.broker}'
@ -569,8 +556,7 @@ async def trades_dialogue(
size=pos.size,
avg_price=pos.ppu,
))
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started((
pp_msgs,
['paper'],

View File

@ -1,3 +1,3 @@
[pytest]
trio_mode=True
log_cli=1
#[pytest]
#trio_mode=True
#log_cli=1

View File

@ -1,20 +1,26 @@
"""
Paper-mode testing
"""
import trio
import pytest
import tractor
import math
import os
from shutil import rmtree
from piker.config import get_app_dir
from piker.log import get_logger
from piker.clearing._messages import (
Order
)
from uuid import uuid4
from exceptiongroup import BaseExceptionGroup
from typing import (
AsyncContextManager,
Literal,
)
from pathlib import Path
import pytest
import tractor
from tractor._exceptions import ContextCancelled
from uuid import uuid4
from functools import partial
from piker.config import get_app_dir
from piker.log import get_logger
from piker.clearing._messages import Order
from piker.pp import (
open_trade_ledger,
open_pps,
@ -25,35 +31,36 @@ from piker.clearing import (
from piker.clearing._client import (
OrderBook,
)
from piker.clearing._messages import (
BrokerdPosition
)
from exceptiongroup import BaseExceptionGroup
from piker._cacheables import open_cached_client
from piker.clearing._messages import BrokerdPosition
log = get_logger(__name__)
@pytest.fixture(scope="session")
def paper_cleanup():
@pytest.fixture(scope="module")
def delete_testing_dir():
'''This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
'''
yield
app_dir = get_app_dir('piker')
if os.path.isfile(app_dir):
rmtree(app_dir)
assert not os.path.isfile(app_dir)
app_dir = Path(get_app_dir("piker")).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()
def test_paper_trade(
open_test_pikerd: AsyncContextManager,
paper_cleanup: None
):
def get_fqsn(broker, symbol):
fqsn = f"{symbol}.{broker}"
return (fqsn, symbol, broker)
def test_paper_trade(open_test_pikerd: AsyncContextManager):
cleared_price: float
test_exec_mode='live'
test_account = 'paper'
test_size = 1
test_broker = 'kraken'
test_brokers = [test_broker]
test_symbol = 'xbtusdt'
test_fqsn = f'{test_symbol}.{test_broker}'
test_pp_account = 'piker-paper'
test_exec_mode = "live"
test_account = "paper"
test_size = 1
(fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
brokers = [broker]
test_pp_account = "piker-paper"
positions: dict[
# brokername, acctid
tuple[str, str],
@ -62,28 +69,25 @@ def test_paper_trade(
async def _async_main(
open_pikerd: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None,
action: Literal["buy", "sell"] | None = None,
price: int = 30000,
assert_entries: bool = False,
) -> None:
"""Spawn a paper piper actor, place a trade and assert entries are present
in both trade ledger and pps tomls. Then restart piker and ensure
that pps from previous trade exists in the ems pps.
Finally close the position and ensure that the position in pps.toml is closed.
"""
'''Spawn a paper piper actor, place a trade and assert entries are present
in both trade ledger and pps tomls. Then restart piker and ensure
that pps from previous trade exists in the ems pps.
Finally close the position and ensure that the position in pps.toml is closed.
'''
oid: str = str(uuid4())
book: OrderBook
global cleared_price
global positions
nonlocal cleared_price
nonlocal positions
# Set up piker and EMS
async with (
open_pikerd() as (_, _, _, services),
open_ems(
test_fqsn,
mode=test_account,
) as (
open_ems(fqsn, mode="paper") as (
book,
trades_stream,
pps,
@ -91,74 +95,107 @@ def test_paper_trade(
dialogs,
),
):
# Send order to EMS
if action:
if action:
order = Order(
exec_mode=test_exec_mode,
action=action,
oid=oid,
account=test_account,
size=test_size,
symbol=test_fqsn,
symbol=fqsn,
price=price,
brokers=test_brokers
brokers=brokers,
)
# This is actually a syncronous call to push a message
# to the async ems clue - hence why we call trio.sleep afterwards
book.send(order)
await trio.sleep(2)
# Assert entries are made in both ledger and PPS
if assert_entries:
cleared_ledger_entry = {}
with open_trade_ledger(test_broker, test_account) as ledger:
with open_trade_ledger(broker, test_account) as ledger:
cleared_ledger_entry = ledger[oid]
cleared_price = cleared_ledger_entry["price"]
assert list(ledger.keys())[-1] == oid
assert cleared_ledger_entry['size'] == test_size
assert cleared_ledger_entry['fqsn'] == test_fqsn
with open_pps(test_broker, test_pp_account) as table:
pp_price = table.conf[test_broker][test_pp_account][test_fqsn]["ppu"]
assert math.isclose(pp_price, cleared_ledger_entry['size'], rel_tol=1)
assert table.brokername == test_broker
assert table.acctid == test_pp_account
assert cleared_ledger_entry["size"] == test_size
assert cleared_ledger_entry["fqsn"] == fqsn
with open_pps(broker, test_pp_account) as table:
pp_price = table.conf[broker][test_pp_account][fqsn]["ppu"]
# Ensure the price-per-unit (breakeven) price is close to our clearing price
assert math.isclose(
pp_price, cleared_ledger_entry["size"], rel_tol=1
)
assert table.brokername == broker
assert table.acctid == test_pp_account
positions = pps
# Close piker like a user would
raise KeyboardInterrupt
# Open piker and ensure last pps price is the same as ledger entry
async def _open_and_assert_pps():
# Open piker load pps locally
# and ensure last pps price is the same as ledger entry
async def _open_and_assert_pps():
await _async_main(open_test_pikerd)
assert positions(test_broker, test_account)[-1] == cleared_price
assert positions(broker, test_account)[-1] == cleared_price
# Close position and assert empty position in pps
async def _close_pp_and_assert():
await _async_main(open_test_pikerd, 'sell', 1)
with open_pps(test_broker, test_pp_account) as table:
await _async_main(open_test_pikerd, "sell", 1)
with open_pps(broker, test_pp_account) as table:
assert len(table.pps) == 0
# run initial time and send sent and assert trade
with pytest.raises(
BaseExceptionGroup
):
trio.run(partial(_async_main,
open_pikerd=open_test_pikerd,
action='buy',
)
)
def _run_test_and_check(exception, fn):
with pytest.raises(exception) as exc_info:
trio.run(fn)
with pytest.raises(
BaseExceptionGroup
):
trio.run(_open_and_assert_pps)
for exception in exc_info.value.exceptions:
assert isinstance(exception, KeyboardInterrupt) or isinstance(
exception, ContextCancelled
)
with pytest.raises(
BaseExceptionGroup
# Send and execute a trade and assert trade
_run_test_and_check(
BaseExceptionGroup,
partial(
_async_main,
open_pikerd=open_test_pikerd,
action="buy",
),
)
_run_test_and_check(BaseExceptionGroup, _open_and_assert_pps)
_run_test_and_check(BaseExceptionGroup, _close_pp_and_assert)
def test_paper_client(
open_test_pikerd: AsyncContextManager
):
async def _async_main(
open_pikerd: AsyncContextManager,
):
trio.run(_close_pp_and_assert)
(fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
async with (
open_pikerd() as (_, _, _, services),
open_ems(fqsn, mode="paper") as (
book,
trades_stream,
pps,
accounts,
dialogs,
),
):
async with open_cached_client(broker) as client:
symbol_info = await client.symbol_info()
print(f'client: {symbol_info["XBTUSDT"]}')
trio.run(partial(
_async_main,
open_pikerd=open_test_pikerd,
),
)

View File

@ -10,7 +10,6 @@ import trio
import tractor
from piker.log import get_logger
from piker._daemon import (
find_service,
Services,
@ -177,4 +176,4 @@ def test_ensure_ems_in_paper_actors(
trio.run(main)
cancel_msg: str = '_emsd_main()` was remotely cancelled by its caller'
assert cancel_msg in exc_info.value.args[0]
assert cancel_msg in exc_info.value.args[0]