Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet f30d866710 Rename fqsn -> fqme in feeds tests 2023-04-14 16:00:40 -04:00
Tyler Goodlet 1bd4fd80f8 `ib`: rejects their own fractional size tick..
Frickin ib, they give you the `0.001` (or wtv) in the
`ContractDetails.minSize: float` but won't accept fractional sizes
through the API.. Either way, it's probably not sane to be supporting
fractional order sizes for legacy instruments by default especially
since it in theory affects a lot of the clearing outcomes by having ib
do wtv magical junk behind the scenes to make it work..
2023-04-14 16:00:40 -04:00
Tyler Goodlet 65e42b79ac Rename ems test mod 2023-04-14 16:00:40 -04:00
Tyler Goodlet 2f5c456d4b More explicit test mod docstring 2023-04-14 16:00:40 -04:00
Tyler Goodlet 7786ffa889 Another fqsn -> fqme rename 2023-04-14 16:00:40 -04:00
Tyler Goodlet 3edc95dda0 Quantize order prices prior to `OrderClient.send()`
Order mode previously was just willy-nilly sending `float` prices
(particularly on order edits) which are generated from the associated
level line. This actually uses the `MktPair.price_tick: Decimal` to
ensure the value is rounded correctly before submission to the ems..

Also adjusts the order mode init to expect a table of tables of startup
position messages, with the inner table being keyed by fqme per msg.
2023-04-14 16:00:40 -04:00
Tyler Goodlet ecd7500ee6 Link `tractor` debug mode to `pytest` --pdb flag 2023-04-14 16:00:40 -04:00
Tyler Goodlet 114e7660aa Fix bad-fqme test, adjust prices based on buy/sell 2023-04-14 16:00:40 -04:00
Tyler Goodlet d29e9eeb31 Only flip size sign for sells if not already -ve 2023-04-14 16:00:40 -04:00
Tyler Goodlet 69d6e9bb4e Fix zero-pp entry to toml case for new file-per-account format 2023-04-14 16:00:40 -04:00
8 changed files with 125 additions and 58 deletions

View File

@ -637,8 +637,8 @@ class PpTable(Struct):
active, closed = self.dump_active() active, closed = self.dump_active()
# ONLY dict-serialize all active positions; those that are closed # ONLY dict-serialize all active positions; those that are
# we don't store in the ``pps.toml``. # closed we don't store in the ``pps.toml``.
to_toml_dict = {} to_toml_dict = {}
for bs_mktid, pos in active.items(): for bs_mktid, pos in active.items():
@ -688,13 +688,12 @@ class PpTable(Struct):
self.conf.update(pp_entries) self.conf.update(pp_entries)
elif ( # if there are no active position entries according
self.brokername in self.conf and # to the toml dump output above, then clear the config
self.acctid in self.conf[self.brokername] # file of all entries.
): elif self.conf:
del self.conf[self.brokername][self.acctid] for entry in list(self.conf):
if len(self.conf[self.brokername]) == 0: del self.conf[entry]
del self.conf[self.brokername]
# TODO: why tf haven't they already done this for inline # TODO: why tf haven't they already done this for inline
# tables smh.. # tables smh..

View File

@ -619,7 +619,7 @@ async def _setup_quote_stream(
async def open_aio_quote_stream( async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Optional[Contract] = None, contract: Contract | None = None,
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
@ -741,9 +741,9 @@ async def stream_quotes(
try: try:
( (
con, con, # Contract
first_ticker, first_ticker, # Ticker
details, details, # ContractDetails
) = await proxy.get_sym_details(symbol=sym) ) = await proxy.get_sym_details(symbol=sym)
except ConnectionError: except ConnectionError:
log.exception(f'Proxy is ded {proxy._aio_ns}') log.exception(f'Proxy is ded {proxy._aio_ns}')
@ -759,6 +759,7 @@ async def stream_quotes(
''' '''
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
con: Contract = details.contract
syminfo = asdict(details) syminfo = asdict(details)
syminfo.update(syminfo['contract']) syminfo.update(syminfo['contract'])
@ -785,6 +786,11 @@ async def stream_quotes(
price_tick: Decimal = Decimal(str(syminfo['minTick'])) price_tick: Decimal = Decimal(str(syminfo['minTick']))
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0')) size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
# XXX: GRRRR they don't support fractional share sizes for
# stocks from the API?!
if con.secType == 'STK':
size_tick = Decimal('1')
syminfo['price_tick_size'] = price_tick syminfo['price_tick_size'] = price_tick
# NOTE: as you'd expect for "legacy" assets, the "volume # NOTE: as you'd expect for "legacy" assets, the "volume
# precision" is normally discreet. # precision" is normally discreet.

View File

@ -124,7 +124,10 @@ class PaperBoi(Struct):
# in the broker trades event processing loop # in the broker trades event processing loop
await trio.sleep(0.05) await trio.sleep(0.05)
if action == 'sell': if (
action == 'sell'
and size > 0
):
size = -size size = -size
msg = BrokerdStatus( msg = BrokerdStatus(

View File

@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
from each backend broker/data provider. from each backend broker/data provider.
''' '''
fqsn: str fqme: str
symbol_info: dict | None = None symbol_info: dict | None = None
mkt_info: MktPair | None = None mkt_info: MktPair | None = None
shm_write_opts: dict[str, Any] | None = None shm_write_opts: dict[str, Any] | None = None
@ -1284,7 +1284,9 @@ async def open_feed_bus(
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(
{fqsn: flume.to_msg() for fqsn, flume in flumes.items()} {fqsn: flume.to_msg()
for fqsn, flume in flumes.items()
}
) )
if not start_stream: if not start_stream:

View File

@ -289,6 +289,20 @@ class OrderMode:
symbol = self.chart.linked.symbol symbol = self.chart.linked.symbol
# NOTE : we could also use instead,
# symbol.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=symbol.tick_size_digits,
)
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -359,7 +373,7 @@ class OrderMode:
# NOTE: we have to str-ify `MktPair` first since we can't # NOTE: we have to str-ify `MktPair` first since we can't
# cast to it without being mega explicit with # cast to it without being mega explicit with
# `msgspec.Struct`, which we're not yet.. # `msgspec.Struct`, which we're not yet..
order = staged.copy({ order: Order = staged.copy({
'symbol': str(staged.symbol), 'symbol': str(staged.symbol),
'oid': oid, 'oid': oid,
}) })
@ -436,8 +450,17 @@ class OrderMode:
line: LevelLine, line: LevelLine,
) -> None: ) -> None:
'''
Retreive the level line's end state, compute the size
and price for the new price-level, send an update msg to
the EMS, adjust mirrored level line on secondary chart.
level = line.value() '''
mktinfo = self.chart.linked.symbol
level = round(
line.value(),
ndigits=mktinfo.tick_size_digits,
)
# updated by level change callback set in ``.new_line_from_order()`` # updated by level change callback set in ``.new_line_from_order()``
dialog = line.dialog dialog = line.dialog
size = dialog.order.size size = dialog.order.size
@ -689,7 +712,7 @@ async def open_order_mode(
# symbol names (i.e. the same names you'd get back in search # symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the # results) in order for position msgs to correctly trigger the
# display of a position indicator on screen. # display of a position indicator on screen.
position_msgs: dict[str, list[BrokerdPosition]] position_msgs: dict[str, dict[str, BrokerdPosition]]
# spawn EMS actor-service # spawn EMS actor-service
async with ( async with (
@ -872,8 +895,11 @@ async def open_order_mode(
# Pack position messages by account, should only be one-to-one. # Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies # NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg. # the expected symbol key in its positions msg.
for (broker, acctid), msgs in position_msgs.items(): for (
for msg in msgs: (broker, acctid),
pps_by_fqme
) in position_msgs.items():
for msg in pps_by_fqme.values():
await process_trade_msg( await process_trade_msg(
mode, mode,
client, client,

View File

@ -87,8 +87,11 @@ def log(
@acm @acm
async def _open_test_pikerd( async def _open_test_pikerd(
tmpconfdir: str, tmpconfdir: str,
reg_addr: tuple[str, int] | None = None, reg_addr: tuple[str, int] | None = None,
loglevel: str = 'warning', loglevel: str = 'warning',
debug_mode: bool = False,
**kwargs, **kwargs,
) -> tuple[ ) -> tuple[
@ -122,7 +125,7 @@ async def _open_test_pikerd(
# or just in sequence per test, so we keep root. # or just in sequence per test, so we keep root.
drop_root_perms_for_ahab=False, drop_root_perms_for_ahab=False,
debug_mode=True, debug_mode=debug_mode,
**kwargs, **kwargs,
@ -178,6 +181,8 @@ def open_test_pikerd(
# bind in level from fixture, which is itself set by # bind in level from fixture, which is itself set by
# `--ll <value>` cli flag. # `--ll <value>` cli flag.
loglevel=loglevel, loglevel=loglevel,
debug_mode=request.config.option.usepdb
) )
# NOTE: the `tmp_dir` fixture will wipe any files older then 3 test # NOTE: the `tmp_dir` fixture will wipe any files older then 3 test

View File

@ -1,5 +1,13 @@
''' '''
Paper-mode testing Execution mgmt system (EMS) e2e testing.
Most tests leverage our paper clearing engine found (currently) in
``piker.clearing._paper_engine`.
Ideally in the longer run we are able to support forms of (non-clearing)
live order tests against certain backends that make it possible to do
so..
''' '''
from contextlib import ( from contextlib import (
contextmanager as cm, contextmanager as cm,
@ -159,21 +167,27 @@ def load_and_check_pos(
yield pp yield pp
@pytest.mark.trio def test_ems_err_on_bad_broker(
async def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: Services,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme():
try: try:
async with open_ems( async with (
'doggy.smiles', open_test_pikerd() as (_, _, _, services),
open_ems(
'doggycoin.doggy',
mode='paper', mode='paper',
loglevel=loglevel, loglevel=loglevel,
) as _: ) as _
):
pytest.fail('EMS is working on non-broker!?') pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError: except ModuleNotFoundError:
pass pass
run_and_tollerate_cancels(load_bad_fqme)
async def match_ppmsgs_on_ems_boot( async def match_ppmsgs_on_ems_boot(
ppmsgs: list[BrokerdPosition], ppmsgs: list[BrokerdPosition],
@ -264,12 +278,14 @@ async def submit_and_check(
od: dict od: dict
for od in fills: for od in fills:
print(f'Sending order {od} for fill') print(f'Sending order {od} for fill')
size = od['size']
sent, msgs = await order_and_and_wait_for_ppmsg( sent, msgs = await order_and_and_wait_for_ppmsg(
client, client,
trades_stream, trades_stream,
fqme, fqme,
action='buy', action='buy' if size > 0 else 'sell',
size=od['size'], price=100e3 if size > 0 else 0,
size=size,
) )
last_order: Order = sent[-1] last_order: Order = sent[-1]
@ -300,7 +316,8 @@ async def submit_and_check(
{'size': 0.001}, {'size': 0.001},
), ),
# multi-partial entry and exits. # multi-partial entry and exits from net-zero, to short and back
# to net-zero.
( (
# enters # enters
{'size': 0.001}, {'size': 0.001},
@ -314,10 +331,18 @@ async def submit_and_check(
{'size': 0.001}, {'size': 0.001},
{'size': 0.002}, {'size': 0.002},
# exits to get back to zero. # nearly back to zero.
{'size': -0.001}, {'size': -0.001},
# switch to net-short
{'size': -0.025}, {'size': -0.025},
{'size': -0.0195}, {'size': -0.0195},
# another entry
{'size': 0.001},
# final cover to net-zero again.
{'size': 0.038},
), ),
], ],
ids='fills={}'.format, ids='fills={}'.format,
@ -328,7 +353,7 @@ def test_multi_fill_positions(
fills: tuple[dict], fills: tuple[dict],
check_cross_session: bool = True, check_cross_session: bool = False,
) -> None: ) -> None:

View File

@ -13,13 +13,14 @@ from piker.data import (
ShmArray, ShmArray,
open_feed, open_feed,
) )
from piker.data.flows import Flume
from piker.accounting._mktinfo import ( from piker.accounting._mktinfo import (
unpack_fqsn, unpack_fqsn,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'fqsns', 'fqmes',
[ [
# binance # binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False), (100, {'btcusdt.binance', 'ethusdt.binance'}, False),
@ -30,20 +31,20 @@ from piker.accounting._mktinfo import (
# binance + kraken # binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False), (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
], ],
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}', ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
) )
def test_multi_fqsn_feed( def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager, open_test_pikerd: AsyncContextManager,
fqsns: set[str], fqmes: set[str],
loglevel: str, loglevel: str,
ci_env: bool ci_env: bool
): ):
''' '''
Start a real-time data feed for provided fqsn and pull Start a real-time data feed for provided fqme and pull
a few quotes then simply shut down. a few quotes then simply shut down.
''' '''
max_quotes, fqsns, run_in_ci = fqsns max_quotes, fqmes, run_in_ci = fqmes
if ( if (
ci_env ci_env
@ -52,15 +53,15 @@ def test_multi_fqsn_feed(
pytest.skip('Skipping CI disabled test due to feed restrictions') pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set() brokers = set()
for fqsn in fqsns: for fqme in fqmes:
brokername, key, suffix = unpack_fqsn(fqsn) brokername, key, suffix = unpack_fqsn(fqme)
brokers.add(brokername) brokers.add(brokername)
async def main(): async def main():
async with ( async with (
open_test_pikerd(), open_test_pikerd(),
open_feed( open_feed(
fqsns, fqmes,
loglevel=loglevel, loglevel=loglevel,
# TODO: ensure throttle rate is applied # TODO: ensure throttle rate is applied
@ -71,20 +72,20 @@ def test_multi_fqsn_feed(
) as feed ) as feed
): ):
# verify shm buffers exist # verify shm buffers exist
for fqin in fqsns: for fqin in fqmes:
flume = feed.flumes[fqin] flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream: async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqsn, and # pull the first startup quotes, one for each fqme, and
# ensure they match each flume's startup quote value. # ensure they match each flume's startup quote value.
fqsns_copy = fqsns.copy() fqsns_copy = fqmes.copy()
with trio.fail_after(0.5): with trio.fail_after(0.5):
for _ in range(1): for _ in range(1):
first_quotes = await stream.receive() first_quotes = await stream.receive()
for fqsn, quote in first_quotes.items(): for fqme, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get # XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown # supressed and only show up in the teardown
@ -92,18 +93,18 @@ def test_multi_fqsn_feed(
# <tractorbugurl> # <tractorbugurl>
# assert 0 # assert 0
fqsns_copy.remove(fqsn) fqsns_copy.remove(fqme)
flume = feed.flumes[fqsn] flume: Flume = feed.flumes[fqme]
assert quote['last'] == flume.first_quote['last'] assert quote['last'] == flume.first_quote['last']
cntr = Counter() cntr = Counter()
with trio.fail_after(6): with trio.fail_after(6):
async for quotes in stream: async for quotes in stream:
for fqsn, quote in quotes.items(): for fqme, quote in quotes.items():
cntr[fqsn] += 1 cntr[fqme] += 1
# await tractor.breakpoint() # await tractor.breakpoint()
flume = feed.flumes[fqsn] flume = feed.flumes[fqme]
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
@ -116,7 +117,7 @@ def test_multi_fqsn_feed(
# assert last == rt_row['close'] # assert last == rt_row['close']
# assert last == hist_row['close'] # assert last == hist_row['close']
pprint( pprint(
f'{fqsn}: {quote}\n' f'{fqme}: {quote}\n'
f'rt_ohlc: {rt_row}\n' f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n' f'hist_ohlc: {hist_row}\n'
) )
@ -124,6 +125,6 @@ def test_multi_fqsn_feed(
if cntr.total() >= max_quotes: if cntr.total() >= max_quotes:
break break
assert set(cntr.keys()) == fqsns assert set(cntr.keys()) == fqmes
trio.run(main) trio.run(main)