Compare commits

...

10 Commits

9 changed files with 221 additions and 171 deletions

View File

@ -236,6 +236,7 @@ async def open_ems(
or mode == 'paper' or mode == 'paper'
): ):
mode = 'paper' mode = 'paper'
from ._ems import _emsd_main from ._ems import _emsd_main
async with ( async with (
# connect to emsd # connect to emsd
@ -254,6 +255,7 @@ async def open_ems(
dialogs, dialogs,
) )
), ),
# open 2-way trade command stream # open 2-way trade command stream
ctx.open_stream() as trades_stream, ctx.open_stream() as trades_stream,
): ):
@ -264,6 +266,7 @@ async def open_ems(
fqsn, fqsn,
trades_stream trades_stream
) )
yield ( yield (
book, book,
trades_stream, trades_stream,

View File

@ -1452,6 +1452,7 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_stream brokerd_stream = relay.brokerd_stream
dark_book = _router.get_dark_book(broker) dark_book = _router.get_dark_book(broker)
# signal to client that we're started and deliver # signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``. # all known pps and accounts for this ``brokerd``.
await ems_ctx.started(( await ems_ctx.started((

View File

@ -30,6 +30,7 @@ from typing import (
Callable, Callable,
) )
import uuid import uuid
from bidict import bidict from bidict import bidict
import pendulum import pendulum
import trio import trio
@ -42,7 +43,6 @@ from ..pp import (
Transaction, Transaction,
open_trade_ledger, open_trade_ledger,
open_pps, open_pps,
load_pps_from_ledger
) )
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
@ -81,7 +81,6 @@ class PaperBoi(Struct):
_reqids: bidict _reqids: bidict
_positions: dict[str, Position] _positions: dict[str, Position]
_trade_ledger: dict[str, Any] _trade_ledger: dict[str, Any]
_txn_dict: dict[str, Transaction] = {}
# init edge case L1 spread # init edge case L1 spread
last_ask: tuple[float, float] = (float('inf'), 0) # price, size last_ask: tuple[float, float] = (float('inf'), 0) # price, size
@ -263,12 +262,11 @@ class PaperBoi(Struct):
with ( with (
open_trade_ledger(self.broker, 'paper') as ledger, open_trade_ledger(self.broker, 'paper') as ledger,
open_pps(self.broker, 'piker-paper') as table open_pps(self.broker, 'paper') as table
): ):
ledger.update({oid: t.to_dict()}) ledger.update({oid: t.to_dict()})
# Write to pps toml right now # Write to pps toml right now
table.update_from_trans({oid: t}) table.update_from_trans({oid: t})
load_pps_from_ledger(self.broker, 'piker-paper')
pp = table.pps[key] pp = table.pps[key]
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
@ -532,7 +530,7 @@ async def trades_dialogue(
): ):
with open_pps(broker, 'piker-paper', False) as table: with open_pps(broker, 'paper', False) as table:
# save pps in local state # save pps in local state
_positions.update(table.pps) _positions.update(table.pps)

View File

@ -33,6 +33,7 @@ from .log import get_logger
log = get_logger('broker-config') log = get_logger('broker-config')
# taken from ``click`` since apparently they have some # taken from ``click`` since apparently they have some
# super weirdness with sigint and sudo..no clue # super weirdness with sigint and sudo..no clue
def get_app_dir(app_name, roaming=True, force_posix=False): def get_app_dir(app_name, roaming=True, force_posix=False):
@ -70,6 +71,7 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
dot instead of the XDG config home or darwin's dot instead of the XDG config home or darwin's
application support folder. application support folder.
""" """
def _posixify(name): def _posixify(name):
return "-".join(name.split()).lower() return "-".join(name.split()).lower()
@ -92,13 +94,14 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
os.path.expanduser("~/.{}".format(_posixify(app_name)))) os.path.expanduser("~/.{}".format(_posixify(app_name))))
if sys.platform == "darwin": if sys.platform == "darwin":
return os.path.join( return os.path.join(
os.path.expanduser("~/Los.mkdir(_config_dir)ibrary/Application Support"), app_name os.path.expanduser("~/Library/Application Support"), app_name
) )
return os.path.join( return os.path.join(
os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
_posixify(app_name), _posixify(app_name),
) )
_config_dir = _click_config_dir = get_app_dir('piker') _config_dir = _click_config_dir = get_app_dir('piker')
_parent_user = os.environ.get('SUDO_USER') _parent_user = os.environ.get('SUDO_USER')

View File

@ -1467,7 +1467,7 @@ async def maybe_open_feed(
'tick_throttle': kwargs.get('tick_throttle'), 'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD # XXX: super critical to have bool defaults here XD
# 'backpressure': kwargs.get('backpressure', True), 'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True), 'start_stream': kwargs.get('start_stream', True),
}, },
key=fqsn, key=fqsn,

View File

@ -35,6 +35,7 @@ from typing import (
) )
import time import time
from math import isnan from math import isnan
from pathlib import Path
from bidict import bidict from bidict import bidict
from msgspec.msgpack import encode, decode from msgspec.msgpack import encode, decode
@ -134,7 +135,7 @@ def start_marketstore(
# create dirs when dne # create dirs when dne
if not os.path.isdir(config._config_dir): if not os.path.isdir(config._config_dir):
os.mkdir(config._config_dir) Path(config._config_dir).mkdir(parents=True, exist_ok=True)
if not os.path.isdir(mktsdir): if not os.path.isdir(mktsdir):
os.mkdir(mktsdir) os.mkdir(mktsdir)

View File

@ -35,7 +35,6 @@ from typing import (
Union, Union,
Generator Generator
) )
from typing import Generator
import pendulum import pendulum
from pendulum import datetime, now from pendulum import datetime, now
@ -56,6 +55,7 @@ log = get_logger(__name__)
def open_trade_ledger( def open_trade_ledger(
broker: str, broker: str,
account: str, account: str,
) -> Generator[dict, None, None]: ) -> Generator[dict, None, None]:
''' '''
Indempotently create and read in a trade log file from the Indempotently create and read in a trade log file from the
@ -84,6 +84,7 @@ def open_trade_ledger(
ledger = tomli.load(cf) ledger = tomli.load(cf)
print(f'Ledger load took {time.time() - start}s') print(f'Ledger load took {time.time() - start}s')
cpy = ledger.copy() cpy = ledger.copy()
try: try:
yield cpy yield cpy
finally: finally:
@ -544,6 +545,7 @@ class PpTable(Struct):
pps = self.pps pps = self.pps
updated: dict[str, Position] = {} updated: dict[str, Position] = {}
# lifo update all pps from records # lifo update all pps from records
for tid, t in trans.items(): for tid, t in trans.items():
@ -583,7 +585,6 @@ class PpTable(Struct):
pp.add_clear(t) pp.add_clear(t)
updated[t.bsuid] = pp updated[t.bsuid] = pp
# minimize clears tables and update sizing. # minimize clears tables and update sizing.
for bsuid, pp in updated.items(): for bsuid, pp in updated.items():
pp.ensure_state() pp.ensure_state()
@ -680,6 +681,7 @@ class PpTable(Struct):
# TODO: show diff output? # TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n') print(f'Updating ``pps.toml`` for {path}:\n')
# active, closed_pp_objs = table.dump_active() # active, closed_pp_objs = table.dump_active()
pp_entries = self.to_toml() pp_entries = self.to_toml()
self.conf[self.brokername][self.acctid] = pp_entries self.conf[self.brokername][self.acctid] = pp_entries
@ -975,7 +977,6 @@ def open_pps(
pp.ensure_state() pp.ensure_state()
try: try:
# breakpoint()
yield table yield table
finally: finally:
if write_on_exit: if write_on_exit:

View File

@ -1,6 +1,9 @@
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import os import os
from typing import AsyncContextManager
from pathlib import Path
from shutil import rmtree
import pytest import pytest
import tractor import tractor
@ -11,6 +14,7 @@ from piker import (
from piker._daemon import ( from piker._daemon import (
Services, Services,
) )
from piker.clearing._client import open_ems
def pytest_addoption(parser): def pytest_addoption(parser):
@ -132,3 +136,49 @@ def open_test_pikerd(
# - no leaked subprocs or shm buffers # - no leaked subprocs or shm buffers
# - all requested container service are torn down # - all requested container service are torn down
# - certain ``tractor`` runtime state? # - certain ``tractor`` runtime state?
@acm
async def _open_test_pikerd_and_ems(
fqsn,
mode,
loglevel,
open_test_pikerd
):
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
fqsn,
mode=mode,
loglevel=loglevel,
) as ems_services):
yield (services, ems_services)
@pytest.fixture
def open_test_pikerd_and_ems(
open_test_pikerd,
fqsn: str = 'xbtusdt.kraken',
mode: str = 'paper',
loglevel: str = 'info',
):
yield partial(
_open_test_pikerd_and_ems,
fqsn,
mode,
loglevel,
open_test_pikerd
)
@pytest.fixture(scope='session')
def delete_testing_dir():
'''This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
'''
yield
app_dir = Path(config.get_app_dir('piker')).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()

View File

@ -1,16 +1,13 @@
""" '''
Paper-mode testing Paper-mode testing
""" '''
import trio import trio
import math
from shutil import rmtree
from exceptiongroup import BaseExceptionGroup from exceptiongroup import BaseExceptionGroup
from typing import ( from typing import (
AsyncContextManager, AsyncContextManager,
Literal, Literal,
) )
from pathlib import Path
import pytest import pytest
import tractor import tractor
@ -18,94 +15,61 @@ from tractor._exceptions import ContextCancelled
from uuid import uuid4 from uuid import uuid4
from functools import partial from functools import partial
from piker.config import get_app_dir
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import Order from piker.clearing._messages import Order
from piker.pp import ( from piker.pp import (
PpTable,
open_trade_ledger, open_trade_ledger,
open_pps, open_pps,
) )
from piker.clearing import (
open_ems,
)
from piker.clearing._client import (
OrderBook,
)
from piker._cacheables import open_cached_client
from piker.clearing._messages import BrokerdPosition
log = get_logger(__name__) log = get_logger(__name__)
@pytest.fixture(scope="session")
def delete_testing_dir():
"""This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
"""
yield
app_dir = Path(get_app_dir("piker")).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()
def get_fqsn(broker, symbol): def get_fqsn(broker, symbol):
fqsn = f"{symbol}.{broker}" fqsn = f'{symbol}.{broker}'
return (fqsn, symbol, broker) return (fqsn, symbol, broker)
def test_paper_trade(open_test_pikerd: AsyncContextManager, delete_testing_dir): oid = ''
oid = "" test_exec_mode = 'live'
test_exec_mode = "live" (fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt')
test_account = "paper" brokers = [broker]
test_size = 1 account = 'paper'
(fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
brokers = [broker]
test_pp_account = "piker-paper"
positions: dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
]
async def _async_main(
action: Literal["buy", "sell"] | None = None, async def _async_main(
open_test_pikerd_and_ems: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None,
price: int = 30000, price: int = 30000,
executions: int = 1,
size: float = 0.01,
# Assert options
assert_entries: bool = False, assert_entries: bool = False,
assert_pps: bool = False, assert_pps: bool = False,
assert_zeroed_pps: bool = False, assert_zeroed_pps: bool = False,
assert_msg: bool = False,
) -> None: ) -> None:
"""Spawn a paper piper actor, place a trade and assert entries are present '''
in both trade ledger and pps tomls. Then restart piker and ensure Start piker, place a trade and assert data in
that pps from previous trade exists in the ems pps. pps stream, ledger and position table.
Finally close the position and ensure that the position in pps.toml is closed. '''
"""
nonlocal oid
book: OrderBook
nonlocal positions
# Set up piker and EMS oid: str = ''
async with ( last_msg = {}
open_test_pikerd() as (_, _, _, services),
open_ems(fqsn, mode="paper") as ( async with open_test_pikerd_and_ems() as (
book, services,
trades_stream, (book, trades_stream, pps, accounts, dialogs),
pps,
accounts,
dialogs,
),
): ):
# Send order to EMS
if action: if action:
for x in range(executions):
oid = str(uuid4()) oid = str(uuid4())
order = Order( order = Order(
exec_mode=test_exec_mode, exec_mode=test_exec_mode,
action=action, action=action,
oid=oid, oid=oid,
account=test_account, account=account,
size=test_size, size=size,
symbol=fqsn, symbol=fqsn,
price=price, price=price,
brokers=brokers, brokers=brokers,
@ -114,45 +78,63 @@ def test_paper_trade(open_test_pikerd: AsyncContextManager, delete_testing_dir):
# to the async ems clue - hence why we call trio.sleep afterwards # to the async ems clue - hence why we call trio.sleep afterwards
book.send(order) book.send(order)
await trio.sleep(2) async for msg in trades_stream:
last_msg = msg
match msg:
# Wait for position message before moving on
case {'name': 'position'}:
break
# Assert entries are made in both ledger and PPS if assert_entries or assert_pps or assert_zeroed_pps or assert_msg:
if assert_entries or assert_pps or assert_zeroed_pps: _assert(
_assert(assert_entries, assert_pps, assert_zeroed_pps, pps) assert_entries,
assert_pps,
# Close piker like a user would assert_zeroed_pps,
raise KeyboardInterrupt pps,
last_msg,
def _assert(assert_entries: bool, assert_pps: bool, assert_zerod_pps, pps): size,
with ( executions,
open_trade_ledger(broker, test_account) as ledger,
open_pps(broker, test_pp_account) as table,
):
# assert that entires are have been written
if assert_entries:
cleared_ledger_entry = ledger[oid]
assert list(ledger.keys())[-1] == oid
assert cleared_ledger_entry["size"] == test_size
assert cleared_ledger_entry["fqsn"] == fqsn
pp_price = table.conf[broker][test_pp_account][fqsn]["ppu"]
# Ensure the price-per-unit (breakeven) price is close to our clearing price
assert math.isclose(pp_price, cleared_ledger_entry["size"], rel_tol=1)
assert table.brokername == broker
assert table.acctid == test_pp_account
# assert that the last pps price is the same as the ledger price
if assert_pps:
assert (
pps[(broker, test_account)][-1]["avg_price"] == ledger[oid]["price"]
) )
if assert_zerod_pps: # Teardown piker like a user would
# assert that positions are present raise KeyboardInterrupt
assert not bool(table)
# Close position and assert empty position in pps
def _run_test_and_check(exception, fn): def _assert(
with pytest.raises(exception) as exc_info: assert_entries,
assert_pps,
assert_zerod_pps,
pps,
last_msg,
size,
executions,
):
with (
open_trade_ledger(broker, account) as ledger,
open_pps(broker, account) as table,
):
'''
Assert multiple cases including pps, ledger and final position message state
'''
if assert_entries:
assert last_msg['broker'] == broker
assert last_msg['account'] == account
assert last_msg['symbol'] == fqsn
assert last_msg['size'] == size * executions
assert last_msg['currency'] == symbol
assert last_msg['avg_price'] == table.pps[symbol].ppu
if assert_pps:
last_ppu = pps[(broker, account)][-1]
assert last_ppu['avg_price'] == table.pps[symbol].ppu
if assert_zerod_pps:
assert not bool(table.pps)
# Close position and assert empty position in pps
def _run_test_and_check(fn):
with pytest.raises(BaseExceptionGroup) as exc_info:
trio.run(fn) trio.run(fn)
for exception in exc_info.value.exceptions: for exception in exc_info.value.exceptions:
@ -160,57 +142,68 @@ def test_paper_trade(open_test_pikerd: AsyncContextManager, delete_testing_dir):
exception, ContextCancelled exception, ContextCancelled
) )
# Setablend and execute a trade and assert trade
_run_test_and_check(
BaseExceptionGroup,
partial(_async_main, action="buy", assert_entries=True),
)
def test_buy(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# Enter a trade and assert entries are made in pps and ledger files
_run_test_and_check( _run_test_and_check(
BaseExceptionGroup,
partial(_async_main, assert_pps=True),
)
_run_test_and_check(
BaseExceptionGroup,
partial( partial(
_async_main, action="sell", price=1 _async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
assert_entries=True,
), ),
) )
# Open ems and assert existence of pps entries
_run_test_and_check( _run_test_and_check(
BaseExceptionGroup,
partial( partial(
_async_main, assert_zeroed_pps=True _async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_pps=True,
), ),
) )
def test_sell(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# Sell position
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
price=1,
),
)
# def test_paper_client(open_test_pikerd: AsyncContextManager): # Ensure pps are zeroed
# async def _async_main( _run_test_and_check(
# open_pikerd: AsyncContextManager, partial(
# ): _async_main,
# (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt") open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# async with ( assert_zeroed_pps=True,
# open_pikerd() as (_, _, _, services), ),
# open_ems(fqsn, mode="paper") as ( )
# book,
# trades_stream, @pytest.mark.skip(reason="Due to precision issues, this test will currently fail")
# pps, def test_multi_sell(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# accounts, # Make 5 market limit buy orders
# dialogs, _run_test_and_check(
# ), partial(
# ): _async_main,
# # async with open_cached_client(broker) as client: open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# # symbol_info = await client.symbol_info() action='buy',
# # print(f'client: {symbol_info['XBTUSDT']}') executions=5,
# with (open_pps(broker, "piker-paper") as table,): ),
# print(f"table: {table}") )
#
# trio.run( # Sell 5 slots at the same price, assert cleared positions
# partial( _run_test_and_check(
# _async_main, partial(
# open_pikerd=open_test_pikerd, _async_main,
# ), open_test_pikerd_and_ems=open_test_pikerd_and_ems,
# ) action='sell',
executions=5,
price=1,
assert_zeroed_pps=True,
),
)