Merge pull request #462 from pikers/paper_trade_improvements_rebase

Paper trade improvements
explicit_write_pps_on_exit
jaredgoldman 2023-02-28 14:30:20 -05:00 committed by GitHub
commit d04fe366ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 351 additions and 42 deletions

View File

@ -100,7 +100,7 @@ class Order(Struct):
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: Optional[list[str]] = []
brokers: list[str] = []
class Cancel(Struct):

View File

@ -37,11 +37,12 @@ import trio
import tractor
from .. import data
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import (
Position,
Transaction,
open_trade_ledger,
open_pps,
)
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
@ -56,6 +57,7 @@ from ._messages import (
BrokerdError,
)
from ..config import load
log = get_logger(__name__)
@ -234,8 +236,6 @@ class PaperBoi(Struct):
log.info(f'Fake filling order:\n{fill_msg}')
await self.ems_trades_stream.send(fill_msg)
self._trade_ledger.update(fill_msg.to_dict())
if order_complete:
msg = BrokerdStatus(
reqid=reqid,
@ -250,18 +250,6 @@ class PaperBoi(Struct):
# lookup any existing position
key = fqsn.rstrip(f'.{self.broker}')
pp = self._positions.setdefault(
fqsn,
Position(
Symbol(
key=key,
broker_info={self.broker: {}},
),
size=size,
ppu=price,
bsuid=key,
)
)
t = Transaction(
fqsn=fqsn,
tid=oid,
@ -271,21 +259,29 @@ class PaperBoi(Struct):
dt=pendulum.from_timestamp(fill_time_s),
bsuid=key,
)
pp.add_clear(t)
pp_msg = BrokerdPosition(
broker=self.broker,
account='paper',
symbol=fqsn,
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
currency='',
size=pp.size,
avg_price=pp.ppu,
)
with (
open_trade_ledger(self.broker, 'paper') as ledger,
open_pps(self.broker, 'paper', True) as table
):
ledger.update({oid: t.to_dict()})
# Write to pps toml right now
table.update_from_trans({oid: t})
await self.ems_trades_stream.send(pp_msg)
pp = table.pps[key]
pp_msg = BrokerdPosition(
broker=self.broker,
account='paper',
symbol=fqsn,
# TODO: we need to look up the asset currency from
# broker info. i guess for crypto this can be
# inferred from the pair?
currency=key,
size=pp.size,
avg_price=pp.ppu,
)
await self.ems_trades_stream.send(pp_msg)
async def simulate_fills(
@ -533,6 +529,11 @@ async def trades_dialogue(
) as feed,
):
with open_pps(broker, 'paper') as table:
# save pps in local state
_positions.update(table.pps)
pp_msgs: list[BrokerdPosition] = []
pos: Position
token: str # f'{symbol}.{self.broker}'
@ -545,8 +546,6 @@ async def trades_dialogue(
avg_price=pos.ppu,
))
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
await ctx.started((
pp_msgs,
['paper'],
@ -564,7 +563,6 @@ async def trades_dialogue(
_reqids=_reqids,
# TODO: load paper positions from ``positions.toml``
_positions=_positions,
# TODO: load postions from ledger file

View File

@ -25,10 +25,10 @@ from os import path
from os.path import dirname
import shutil
from typing import Optional
from pathlib import Path
from bidict import bidict
import toml
from piker.testing import TEST_CONFIG_DIR_PATH
from .log import get_logger
log = get_logger('broker-config')
@ -75,6 +75,13 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
def _posixify(name):
return "-".join(name.split()).lower()
# TODO: This is a hacky way to a) determine we're testing
# and b) creating a test dir. We should aim to set a variable
# within the tractor runtimes and store testing config data
# outside of the users filesystem
if "pytest" in sys.modules:
app_name = os.path.join(app_name, TEST_CONFIG_DIR_PATH)
# if WIN:
if platform.system() == 'Windows':
key = "APPDATA" if roaming else "LOCALAPPDATA"
@ -115,6 +122,7 @@ _conf_names: set[str] = {
'pps',
'trades',
'watchlists',
'paper_trades'
}
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
@ -198,7 +206,7 @@ def load(
path = path or get_conf_path(conf_name)
if not os.path.isdir(_config_dir):
os.mkdir(_config_dir)
Path(_config_dir).mkdir(parents=True, exist_ok=True)
if not os.path.isfile(path):
fn = _conf_fn_w_ext(conf_name)
@ -212,6 +220,10 @@ def load(
# if one exists.
if os.path.isfile(template):
shutil.copyfile(template, path)
else:
# create an empty file
with open(path, 'x'):
pass
else:
with open(path, 'r'):
pass # touch it

View File

@ -35,6 +35,7 @@ from typing import (
)
import time
from math import isnan
from pathlib import Path
from bidict import bidict
from msgspec.msgpack import encode, decode
@ -134,7 +135,7 @@ def start_marketstore(
# create dirs when dne
if not os.path.isdir(config._config_dir):
os.mkdir(config._config_dir)
Path(config._config_dir).mkdir(parents=True, exist_ok=True)
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)

View File

@ -20,6 +20,7 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)
'''
from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os
@ -32,6 +33,7 @@ from typing import (
Iterator,
Optional,
Union,
Generator
)
import pendulum
@ -54,7 +56,7 @@ def open_trade_ledger(
broker: str,
account: str,
) -> dict:
) -> Generator[dict, None, None]:
'''
Indempotently create and read in a trade log file from the
``<configuration_dir>/ledgers/`` directory.
@ -90,8 +92,7 @@ def open_trade_ledger(
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ledger for {tradesfile}:\n')
ledger.update(cpy)
ledger.update(cpy)
# we write on close the mutated ledger data
with open(tradesfile, 'w') as cf:
toml.dump(ledger, cf)
@ -879,9 +880,9 @@ class PpsEncoder(toml.TomlEncoder):
def open_pps(
brokername: str,
acctid: str,
write_on_exit: bool = True,
write_on_exit: bool = False,
) -> PpTable:
) -> Generator[PpTable, None, None]:
'''
Read out broker-specific position entries from
incremental update file: ``pps.toml``.

View File

@ -0,0 +1 @@
TEST_CONFIG_DIR_PATH = '_testing'

3
pytest.ini 100644
View File

@ -0,0 +1,3 @@
#[pytest]
#trio_mode=True
#log_cli=1

View File

@ -1,6 +1,9 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import os
from typing import AsyncContextManager
from pathlib import Path
from shutil import rmtree
import pytest
import tractor
@ -11,6 +14,7 @@ from piker import (
from piker._daemon import (
Services,
)
from piker.clearing._client import open_ems
def pytest_addoption(parser):
@ -132,3 +136,52 @@ def open_test_pikerd(
# - no leaked subprocs or shm buffers
# - all requested container service are torn down
# - certain ``tractor`` runtime state?
@acm
async def _open_test_pikerd_and_ems(
fqsn,
mode,
loglevel,
open_test_pikerd
):
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
fqsn,
mode=mode,
loglevel=loglevel,
) as ems_services):
yield (services, ems_services)
@pytest.fixture
def open_test_pikerd_and_ems(
open_test_pikerd,
fqsn: str = 'xbtusdt.kraken',
mode: str = 'paper',
loglevel: str = 'info',
):
yield partial(
_open_test_pikerd_and_ems,
fqsn,
mode,
loglevel,
open_test_pikerd
)
@pytest.fixture(scope='module')
def delete_testing_dir():
'''
This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions. During test runs
this file can be found in .config/piker/_testing
'''
yield
app_dir = Path(config.get_app_dir('piker')).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()

239
tests/test_paper.py 100644
View File

@ -0,0 +1,239 @@
'''
Paper-mode testing
'''
import trio
from exceptiongroup import BaseExceptionGroup
from typing import (
AsyncContextManager,
Literal,
)
import pytest
from tractor._exceptions import ContextCancelled
from uuid import uuid4
from functools import partial
from piker.log import get_logger
from piker.clearing._messages import Order
from piker.pp import (
open_trade_ledger,
open_pps,
)
log = get_logger(__name__)
def get_fqsn(broker, symbol):
fqsn = f'{symbol}.{broker}'
return (fqsn, symbol, broker)
oid = ''
test_exec_mode = 'live'
(fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt')
brokers = [broker]
account = 'paper'
async def _async_main(
open_test_pikerd_and_ems: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None,
price: int = 30000,
executions: int = 1,
size: float = 0.01,
# Assert options
assert_entries: bool = False,
assert_pps: bool = False,
assert_zeroed_pps: bool = False,
assert_msg: bool = False,
) -> None:
'''
Start piker, place a trade and assert data in
pps stream, ledger and position table.
'''
oid: str = ''
last_msg = {}
async with open_test_pikerd_and_ems() as (
services,
(book, trades_stream, pps, accounts, dialogs),
):
if action:
for x in range(executions):
oid = str(uuid4())
order = Order(
exec_mode=test_exec_mode,
action=action,
oid=oid,
account=account,
size=size,
symbol=fqsn,
price=price,
brokers=brokers,
)
# This is actually a syncronous call to push a message
book.send(order)
async for msg in trades_stream:
last_msg = msg
match msg:
# Wait for position message before moving on
case {'name': 'position'}:
break
if (
assert_entries
or assert_pps
or assert_zeroed_pps
or assert_msg
):
_assert(
assert_entries,
assert_pps,
assert_zeroed_pps,
pps,
last_msg,
size,
executions,
)
# Teardown piker like a user would
raise KeyboardInterrupt
def _assert(
assert_entries,
assert_pps,
assert_zerod_pps,
pps,
last_msg,
size,
executions,
):
with (
open_pps(broker, account, write_on_exit=False) as table,
):
'''
Assert multiple cases including pps,
ledger and final position message state
'''
if assert_entries:
for key, val in [
('broker', broker),
('account', account),
('symbol', fqsn),
('size', size * executions),
('currency', symbol),
('avg_price', table.pps[symbol].ppu)
]:
assert last_msg[key] == val
if assert_pps:
last_ppu = pps[(broker, account)][-1]
assert last_ppu['avg_price'] == table.pps[symbol].ppu
if assert_zerod_pps:
assert not bool(table.pps)
def _run_test_and_check(fn):
'''
Close position and assert empty position in pps
'''
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(fn)
for exception in exc_info.value.exceptions:
assert isinstance(exception, KeyboardInterrupt) or isinstance(
exception, ContextCancelled
)
def test_buy(
open_test_pikerd_and_ems: AsyncContextManager,
delete_testing_dir
):
'''
Enter a trade and assert entries are made in pps and ledger files.
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
assert_entries=True,
),
)
# Open ems and assert existence of pps entries
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_pps=True,
),
)
def test_sell(
open_test_pikerd_and_ems: AsyncContextManager,
delete_testing_dir
):
'''
Sell position and ensure pps are zeroed.
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
price=1,
),
)
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_zeroed_pps=True,
),
)
@pytest.mark.xfail(reason='Due to precision issues, this test will currently fail')
def test_multi_sell(
open_test_pikerd_and_ems: AsyncContextManager,
delete_testing_dir
):
'''
Make 5 market limit buy orders and
then sell 5 slots at the same price.
Finally, assert cleared positions.
'''
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
executions=5,
),
)
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
executions=5,
price=1,
assert_zeroed_pps=True,
),
)

View File

@ -9,6 +9,7 @@ import pytest
import trio
import tractor
from piker.log import get_logger
from piker._daemon import (
find_service,
Services,
@ -174,5 +175,5 @@ def test_ensure_ems_in_paper_actors(
) as exc_info:
trio.run(main)
cancel_msg: str = '`_emsd_main()` was remotely cancelled by its caller'
cancel_msg: str = '_emsd_main()` was remotely cancelled by its caller'
assert cancel_msg in exc_info.value.args[0]