Compare commits

..

No commits in common. "a5edaa9b5c9c9232d18fa6b970419a693d2eb78d" and "4eb9b68b0e6e69d03294dc0f16120e10ec4ab3d1" have entirely different histories.

9 changed files with 173 additions and 223 deletions

View File

@ -236,7 +236,6 @@ async def open_ems(
or mode == 'paper'
):
mode = 'paper'
from ._ems import _emsd_main
async with (
# connect to emsd
@ -255,7 +254,6 @@ async def open_ems(
dialogs,
)
),
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
):
@ -266,7 +264,6 @@ async def open_ems(
fqsn,
trades_stream
)
yield (
book,
trades_stream,

View File

@ -1452,7 +1452,6 @@ async def _emsd_main(
brokerd_stream = relay.brokerd_stream
dark_book = _router.get_dark_book(broker)
# signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``.
await ems_ctx.started((

View File

@ -30,7 +30,6 @@ from typing import (
Callable,
)
import uuid
from bidict import bidict
import pendulum
import trio
@ -43,6 +42,7 @@ from ..pp import (
Transaction,
open_trade_ledger,
open_pps,
load_pps_from_ledger
)
from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
@ -81,6 +81,7 @@ class PaperBoi(Struct):
_reqids: bidict
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
_txn_dict: dict[str, Transaction] = {}
# init edge case L1 spread
last_ask: tuple[float, float] = (float('inf'), 0) # price, size
@ -262,11 +263,12 @@ class PaperBoi(Struct):
with (
open_trade_ledger(self.broker, 'paper') as ledger,
open_pps(self.broker, 'paper') as table
open_pps(self.broker, 'piker-paper') as table
):
ledger.update({oid: t.to_dict()})
# Write to pps toml right now
table.update_from_trans({oid: t})
load_pps_from_ledger(self.broker, 'piker-paper')
pp = table.pps[key]
pp_msg = BrokerdPosition(
@ -530,7 +532,7 @@ async def trades_dialogue(
):
with open_pps(broker, 'paper', False) as table:
with open_pps(broker, 'piker-paper', False) as table:
# save pps in local state
_positions.update(table.pps)

View File

@ -33,7 +33,6 @@ from .log import get_logger
log = get_logger('broker-config')
# taken from ``click`` since apparently they have some
# super weirdness with sigint and sudo..no clue
def get_app_dir(app_name, roaming=True, force_posix=False):
@ -71,7 +70,6 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
dot instead of the XDG config home or darwin's
application support folder.
"""
def _posixify(name):
return "-".join(name.split()).lower()
@ -94,14 +92,13 @@ def get_app_dir(app_name, roaming=True, force_posix=False):
os.path.expanduser("~/.{}".format(_posixify(app_name))))
if sys.platform == "darwin":
return os.path.join(
os.path.expanduser("~/Library/Application Support"), app_name
os.path.expanduser("~/Los.mkdir(_config_dir)ibrary/Application Support"), app_name
)
return os.path.join(
os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
_posixify(app_name),
)
_config_dir = _click_config_dir = get_app_dir('piker')
_parent_user = os.environ.get('SUDO_USER')

View File

@ -1467,7 +1467,7 @@ async def maybe_open_feed(
'tick_throttle': kwargs.get('tick_throttle'),
# XXX: super critical to have bool defaults here XD
'backpressure': kwargs.get('backpressure', True),
# 'backpressure': kwargs.get('backpressure', True),
'start_stream': kwargs.get('start_stream', True),
},
key=fqsn,

View File

@ -35,7 +35,6 @@ from typing import (
)
import time
from math import isnan
from pathlib import Path
from bidict import bidict
from msgspec.msgpack import encode, decode
@ -135,7 +134,7 @@ def start_marketstore(
# create dirs when dne
if not os.path.isdir(config._config_dir):
Path(config._config_dir).mkdir(parents=True, exist_ok=True)
os.mkdir(config._config_dir)
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)

View File

@ -35,6 +35,7 @@ from typing import (
Union,
Generator
)
from typing import Generator
import pendulum
from pendulum import datetime, now
@ -55,7 +56,6 @@ log = get_logger(__name__)
def open_trade_ledger(
broker: str,
account: str,
) -> Generator[dict, None, None]:
'''
Indempotently create and read in a trade log file from the
@ -84,7 +84,6 @@ def open_trade_ledger(
ledger = tomli.load(cf)
print(f'Ledger load took {time.time() - start}s')
cpy = ledger.copy()
try:
yield cpy
finally:
@ -545,7 +544,6 @@ class PpTable(Struct):
pps = self.pps
updated: dict[str, Position] = {}
# lifo update all pps from records
for tid, t in trans.items():
@ -585,6 +583,7 @@ class PpTable(Struct):
pp.add_clear(t)
updated[t.bsuid] = pp
# minimize clears tables and update sizing.
for bsuid, pp in updated.items():
pp.ensure_state()
@ -681,7 +680,6 @@ class PpTable(Struct):
# TODO: show diff output?
# https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries
print(f'Updating ``pps.toml`` for {path}:\n')
# active, closed_pp_objs = table.dump_active()
pp_entries = self.to_toml()
self.conf[self.brokername][self.acctid] = pp_entries
@ -977,6 +975,7 @@ def open_pps(
pp.ensure_state()
try:
# breakpoint()
yield table
finally:
if write_on_exit:

View File

@ -1,9 +1,6 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import os
from typing import AsyncContextManager
from pathlib import Path
from shutil import rmtree
import pytest
import tractor
@ -14,7 +11,6 @@ from piker import (
from piker._daemon import (
Services,
)
from piker.clearing._client import open_ems
def pytest_addoption(parser):
@ -136,49 +132,3 @@ def open_test_pikerd(
# - no leaked subprocs or shm buffers
# - all requested container service are torn down
# - certain ``tractor`` runtime state?
@acm
async def _open_test_pikerd_and_ems(
fqsn,
mode,
loglevel,
open_test_pikerd
):
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
fqsn,
mode=mode,
loglevel=loglevel,
) as ems_services):
yield (services, ems_services)
@pytest.fixture
def open_test_pikerd_and_ems(
open_test_pikerd,
fqsn: str = 'xbtusdt.kraken',
mode: str = 'paper',
loglevel: str = 'info',
):
yield partial(
_open_test_pikerd_and_ems,
fqsn,
mode,
loglevel,
open_test_pikerd
)
@pytest.fixture(scope='session')
def delete_testing_dir():
'''This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
'''
yield
app_dir = Path(config.get_app_dir('piker')).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()

View File

@ -1,13 +1,16 @@
'''
"""
Paper-mode testing
'''
"""
import trio
import math
from shutil import rmtree
from exceptiongroup import BaseExceptionGroup
from typing import (
AsyncContextManager,
Literal,
)
from pathlib import Path
import pytest
import tractor
@ -15,61 +18,94 @@ from tractor._exceptions import ContextCancelled
from uuid import uuid4
from functools import partial
from piker.config import get_app_dir
from piker.log import get_logger
from piker.clearing._messages import Order
from piker.pp import (
PpTable,
open_trade_ledger,
open_pps,
)
from piker.clearing import (
open_ems,
)
from piker.clearing._client import (
OrderBook,
)
from piker._cacheables import open_cached_client
from piker.clearing._messages import BrokerdPosition
log = get_logger(__name__)
@pytest.fixture(scope="session")
def delete_testing_dir():
"""This fixture removes the temp directory
used for storing all config/ledger/pp data
created during testing sessions
"""
yield
app_dir = Path(get_app_dir("piker")).resolve()
if app_dir.is_dir():
rmtree(str(app_dir))
assert not app_dir.is_dir()
def get_fqsn(broker, symbol):
fqsn = f'{symbol}.{broker}'
fqsn = f"{symbol}.{broker}"
return (fqsn, symbol, broker)
oid = ''
test_exec_mode = 'live'
(fqsn, symbol, broker) = get_fqsn('kraken', 'xbtusdt')
def test_paper_trade(open_test_pikerd: AsyncContextManager, delete_testing_dir):
oid = ""
test_exec_mode = "live"
test_account = "paper"
test_size = 1
(fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
brokers = [broker]
account = 'paper'
test_pp_account = "piker-paper"
positions: dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
]
async def _async_main(
open_test_pikerd_and_ems: AsyncContextManager,
action: Literal['buy', 'sell'] | None = None,
action: Literal["buy", "sell"] | None = None,
price: int = 30000,
executions: int = 1,
size: float = 0.01,
# Assert options
assert_entries: bool = False,
assert_pps: bool = False,
assert_zeroed_pps: bool = False,
assert_msg: bool = False,
) -> None:
'''
Start piker, place a trade and assert data in
pps stream, ledger and position table.
'''
"""Spawn a paper piper actor, place a trade and assert entries are present
in both trade ledger and pps tomls. Then restart piker and ensure
that pps from previous trade exists in the ems pps.
Finally close the position and ensure that the position in pps.toml is closed.
"""
nonlocal oid
book: OrderBook
nonlocal positions
oid: str = ''
last_msg = {}
async with open_test_pikerd_and_ems() as (
services,
(book, trades_stream, pps, accounts, dialogs),
# Set up piker and EMS
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(fqsn, mode="paper") as (
book,
trades_stream,
pps,
accounts,
dialogs,
),
):
# Send order to EMS
if action:
for x in range(executions):
oid = str(uuid4())
order = Order(
exec_mode=test_exec_mode,
action=action,
oid=oid,
account=account,
size=size,
account=test_account,
size=test_size,
symbol=fqsn,
price=price,
brokers=brokers,
@ -78,63 +114,45 @@ async def _async_main(
# to the async ems clue - hence why we call trio.sleep afterwards
book.send(order)
async for msg in trades_stream:
last_msg = msg
match msg:
# Wait for position message before moving on
case {'name': 'position'}:
break
await trio.sleep(2)
if assert_entries or assert_pps or assert_zeroed_pps or assert_msg:
_assert(
assert_entries,
assert_pps,
assert_zeroed_pps,
pps,
last_msg,
size,
executions,
)
# Assert entries are made in both ledger and PPS
if assert_entries or assert_pps or assert_zeroed_pps:
_assert(assert_entries, assert_pps, assert_zeroed_pps, pps)
# Teardown piker like a user would
# Close piker like a user would
raise KeyboardInterrupt
def _assert(
assert_entries,
assert_pps,
assert_zerod_pps,
pps,
last_msg,
size,
executions,
):
def _assert(assert_entries: bool, assert_pps: bool, assert_zerod_pps, pps):
with (
open_trade_ledger(broker, account) as ledger,
open_pps(broker, account) as table,
open_trade_ledger(broker, test_account) as ledger,
open_pps(broker, test_pp_account) as table,
):
'''
Assert multiple cases including pps, ledger and final position message state
'''
# assert that entires are have been written
if assert_entries:
assert last_msg['broker'] == broker
assert last_msg['account'] == account
assert last_msg['symbol'] == fqsn
assert last_msg['size'] == size * executions
assert last_msg['currency'] == symbol
assert last_msg['avg_price'] == table.pps[symbol].ppu
cleared_ledger_entry = ledger[oid]
assert list(ledger.keys())[-1] == oid
assert cleared_ledger_entry["size"] == test_size
assert cleared_ledger_entry["fqsn"] == fqsn
pp_price = table.conf[broker][test_pp_account][fqsn]["ppu"]
# Ensure the price-per-unit (breakeven) price is close to our clearing price
assert math.isclose(pp_price, cleared_ledger_entry["size"], rel_tol=1)
assert table.brokername == broker
assert table.acctid == test_pp_account
# assert that the last pps price is the same as the ledger price
if assert_pps:
last_ppu = pps[(broker, account)][-1]
assert last_ppu['avg_price'] == table.pps[symbol].ppu
assert (
pps[(broker, test_account)][-1]["avg_price"] == ledger[oid]["price"]
)
if assert_zerod_pps:
assert not bool(table.pps)
# assert that positions are present
assert not bool(table)
# Close position and assert empty position in pps
def _run_test_and_check(fn):
with pytest.raises(BaseExceptionGroup) as exc_info:
def _run_test_and_check(exception, fn):
with pytest.raises(exception) as exc_info:
trio.run(fn)
for exception in exc_info.value.exceptions:
@ -142,68 +160,57 @@ def _run_test_and_check(fn):
exception, ContextCancelled
)
def test_buy(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# Enter a trade and assert entries are made in pps and ledger files
# Setablend and execute a trade and assert trade
_run_test_and_check(
BaseExceptionGroup,
partial(_async_main, action="buy", assert_entries=True),
)
_run_test_and_check(
BaseExceptionGroup,
partial(_async_main, assert_pps=True),
)
_run_test_and_check(
BaseExceptionGroup,
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
assert_entries=True,
_async_main, action="sell", price=1
),
)
# Open ems and assert existence of pps entries
_run_test_and_check(
BaseExceptionGroup,
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_pps=True,
_async_main, assert_zeroed_pps=True
),
)
def test_sell(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# Sell position
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
price=1,
),
)
# Ensure pps are zeroed
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
assert_zeroed_pps=True,
),
)
@pytest.mark.skip(reason="Due to precision issues, this test will currently fail")
def test_multi_sell(open_test_pikerd_and_ems: AsyncContextManager, delete_testing_dir):
# Make 5 market limit buy orders
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='buy',
executions=5,
),
)
# Sell 5 slots at the same price, assert cleared positions
_run_test_and_check(
partial(
_async_main,
open_test_pikerd_and_ems=open_test_pikerd_and_ems,
action='sell',
executions=5,
price=1,
assert_zeroed_pps=True,
),
)
# def test_paper_client(open_test_pikerd: AsyncContextManager):
# async def _async_main(
# open_pikerd: AsyncContextManager,
# ):
# (fqsn, symbol, broker) = get_fqsn("kraken", "xbtusdt")
# async with (
# open_pikerd() as (_, _, _, services),
# open_ems(fqsn, mode="paper") as (
# book,
# trades_stream,
# pps,
# accounts,
# dialogs,
# ),
# ):
# # async with open_cached_client(broker) as client:
# # symbol_info = await client.symbol_info()
# # print(f'client: {symbol_info['XBTUSDT']}')
# with (open_pps(broker, "piker-paper") as table,):
# print(f"table: {table}")
#
# trio.run(
# partial(
# _async_main,
# open_pikerd=open_test_pikerd,
# ),
# )