Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet f30d866710 Rename fqsn -> fqme in feeds tests 2023-04-14 16:00:40 -04:00
Tyler Goodlet 1bd4fd80f8 `ib`: rejects their own fractional size tick..
Frickin ib, they give you the `0.001` (or wtv) in the
`ContractDetails.minSize: float` but won't accept fractional sizes
through the API.. Either way, it's probably not sane to be supporting
fractional order sizes for legacy instruments by default especially
since it in theory affects a lot of the clearing outcomes by having ib
do wtv magical junk behind the scenes to make it work..
2023-04-14 16:00:40 -04:00
Tyler Goodlet 65e42b79ac Rename ems test mod 2023-04-14 16:00:40 -04:00
Tyler Goodlet 2f5c456d4b More explicit test mod docstring 2023-04-14 16:00:40 -04:00
Tyler Goodlet 7786ffa889 Another fqsn -> fqme rename 2023-04-14 16:00:40 -04:00
Tyler Goodlet 3edc95dda0 Quantize order prices prior to `OrderClient.send()`
Order mode previously was just willy-nilly sending `float` prices
(particularly on order edits) which are generated from the associated
level line. This actually uses the `MktPair.price_tick: Decimal` to
ensure the value is rounded correctly before submission to the ems..

Also adjusts the order mode init to expect a table of tables of startup
position messages, with the inner table being keyed by fqme per msg.
2023-04-14 16:00:40 -04:00
Tyler Goodlet ecd7500ee6 Link `tractor` debug mode to `pytest` --pdb flag 2023-04-14 16:00:40 -04:00
Tyler Goodlet 114e7660aa Fix bad-fqme test, adjust prices based on buy/sell 2023-04-14 16:00:40 -04:00
Tyler Goodlet d29e9eeb31 Only flip size sign for sells if not already -ve 2023-04-14 16:00:40 -04:00
Tyler Goodlet 69d6e9bb4e Fix zero-pp entry to toml case for new file-per-account format 2023-04-14 16:00:40 -04:00
8 changed files with 125 additions and 58 deletions

View File

@ -637,8 +637,8 @@ class PpTable(Struct):
active, closed = self.dump_active()
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
# ONLY dict-serialize all active positions; those that are
# closed we don't store in the ``pps.toml``.
to_toml_dict = {}
for bs_mktid, pos in active.items():
@ -688,13 +688,12 @@ class PpTable(Struct):
self.conf.update(pp_entries)
elif (
self.brokername in self.conf and
self.acctid in self.conf[self.brokername]
):
del self.conf[self.brokername][self.acctid]
if len(self.conf[self.brokername]) == 0:
del self.conf[self.brokername]
# if there are no active position entries according
# to the toml dump output above, then clear the config
# file of all entries.
elif self.conf:
for entry in list(self.conf):
del self.conf[entry]
# TODO: why tf haven't they already done this for inline
# tables smh..

View File

@ -619,7 +619,7 @@ async def _setup_quote_stream(
async def open_aio_quote_stream(
symbol: str,
contract: Optional[Contract] = None,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
@ -741,9 +741,9 @@ async def stream_quotes(
try:
(
con,
first_ticker,
details,
con, # Contract
first_ticker, # Ticker
details, # ContractDetails
) = await proxy.get_sym_details(symbol=sym)
except ConnectionError:
log.exception(f'Proxy is ded {proxy._aio_ns}')
@ -759,6 +759,7 @@ async def stream_quotes(
'''
# pass back some symbol info like min_tick, trading_hours, etc.
con: Contract = details.contract
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
@ -785,6 +786,11 @@ async def stream_quotes(
price_tick: Decimal = Decimal(str(syminfo['minTick']))
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
# XXX: GRRRR they don't support fractional share sizes for
# stocks from the API?!
if con.secType == 'STK':
size_tick = Decimal('1')
syminfo['price_tick_size'] = price_tick
# NOTE: as you'd expect for "legacy" assets, the "volume
# precision" is normally discrete.

View File

@ -124,7 +124,10 @@ class PaperBoi(Struct):
# in the broker trades event processing loop
await trio.sleep(0.05)
if action == 'sell':
if (
action == 'sell'
and size > 0
):
size = -size
msg = BrokerdStatus(

View File

@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
from each backend broker/data provider.
'''
fqsn: str
fqme: str
symbol_info: dict | None = None
mkt_info: MktPair | None = None
shm_write_opts: dict[str, Any] | None = None
@ -1284,7 +1284,9 @@ async def open_feed_bus(
# sync feed subscribers with flume handles
await ctx.started(
{fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
{fqsn: flume.to_msg()
for fqsn, flume in flumes.items()
}
)
if not start_stream:

View File

@ -289,6 +289,20 @@ class OrderMode:
symbol = self.chart.linked.symbol
# NOTE : we could also use instead,
# symbol.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=symbol.tick_size_digits,
)
order = self._staged_order = Order(
action=action,
price=price,
@ -359,7 +373,7 @@ class OrderMode:
# NOTE: we have to str-ify `MktPair` first since we can't
# cast to it without being mega explicit with
# `msgspec.Struct`, which we're not yet..
order = staged.copy({
order: Order = staged.copy({
'symbol': str(staged.symbol),
'oid': oid,
})
@ -436,8 +450,17 @@ class OrderMode:
line: LevelLine,
) -> None:
'''
Retrieve the level line's end state, compute the size
and price for the new price-level, send an update msg to
the EMS, adjust mirrored level line on secondary chart.
level = line.value()
'''
mktinfo = self.chart.linked.symbol
level = round(
line.value(),
ndigits=mktinfo.tick_size_digits,
)
# updated by level change callback set in ``.new_line_from_order()``
dialog = line.dialog
size = dialog.order.size
@ -689,7 +712,7 @@ async def open_order_mode(
# symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the
# display of a position indicator on screen.
position_msgs: dict[str, list[BrokerdPosition]]
position_msgs: dict[str, dict[str, BrokerdPosition]]
# spawn EMS actor-service
async with (
@ -872,8 +895,11 @@ async def open_order_mode(
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
for (broker, acctid), msgs in position_msgs.items():
for msg in msgs:
for (
(broker, acctid),
pps_by_fqme
) in position_msgs.items():
for msg in pps_by_fqme.values():
await process_trade_msg(
mode,
client,

View File

@ -87,8 +87,11 @@ def log(
@acm
async def _open_test_pikerd(
tmpconfdir: str,
reg_addr: tuple[str, int] | None = None,
loglevel: str = 'warning',
debug_mode: bool = False,
**kwargs,
) -> tuple[
@ -122,7 +125,7 @@ async def _open_test_pikerd(
# or just in sequence per test, so we keep root.
drop_root_perms_for_ahab=False,
debug_mode=True,
debug_mode=debug_mode,
**kwargs,
@ -178,6 +181,8 @@ def open_test_pikerd(
# bind in level from fixture, which is itself set by
# `--ll <value>` cli flag.
loglevel=loglevel,
debug_mode=request.config.option.usepdb
)
# NOTE: the `tmp_dir` fixture will wipe any files older than 3 test

View File

@ -1,5 +1,13 @@
'''
Paper-mode testing
Execution mgmt system (EMS) e2e testing.
Most tests leverage our paper clearing engine found (currently) in
``piker.clearing._paper_engine``.
Ideally in the longer run we are able to support forms of (non-clearing)
live order tests against certain backends that make it possible to do
so..
'''
from contextlib import (
contextmanager as cm,
@ -159,21 +167,27 @@ def load_and_check_pos(
yield pp
@pytest.mark.trio
async def test_ems_err_on_bad_broker(
def test_ems_err_on_bad_broker(
open_test_pikerd: Services,
loglevel: str,
):
async def load_bad_fqme():
try:
async with open_ems(
'doggy.smiles',
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
'doggycoin.doggy',
mode='paper',
loglevel=loglevel,
) as _:
) as _
):
pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError:
pass
run_and_tollerate_cancels(load_bad_fqme)
async def match_ppmsgs_on_ems_boot(
ppmsgs: list[BrokerdPosition],
@ -264,12 +278,14 @@ async def submit_and_check(
od: dict
for od in fills:
print(f'Sending order {od} for fill')
size = od['size']
sent, msgs = await order_and_and_wait_for_ppmsg(
client,
trades_stream,
fqme,
action='buy',
size=od['size'],
action='buy' if size > 0 else 'sell',
price=100e3 if size > 0 else 0,
size=size,
)
last_order: Order = sent[-1]
@ -300,7 +316,8 @@ async def submit_and_check(
{'size': 0.001},
),
# multi-partial entry and exits.
# multi-partial entry and exits from net-zero, to short and back
# to net-zero.
(
# enters
{'size': 0.001},
@ -314,10 +331,18 @@ async def submit_and_check(
{'size': 0.001},
{'size': 0.002},
# exits to get back to zero.
# nearly back to zero.
{'size': -0.001},
# switch to net-short
{'size': -0.025},
{'size': -0.0195},
# another entry
{'size': 0.001},
# final cover to net-zero again.
{'size': 0.038},
),
],
ids='fills={}'.format,
@ -328,7 +353,7 @@ def test_multi_fill_positions(
fills: tuple[dict],
check_cross_session: bool = True,
check_cross_session: bool = False,
) -> None:

View File

@ -13,13 +13,14 @@ from piker.data import (
ShmArray,
open_feed,
)
from piker.data.flows import Flume
from piker.accounting._mktinfo import (
unpack_fqsn,
)
@pytest.mark.parametrize(
'fqsns',
'fqmes',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
@ -30,20 +31,20 @@ from piker.accounting._mktinfo import (
# binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
],
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
)
def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager,
fqsns: set[str],
fqmes: set[str],
loglevel: str,
ci_env: bool
):
'''
Start a real-time data feed for provided fqsn and pull
Start a real-time data feed for provided fqme and pull
a few quotes then simply shut down.
'''
max_quotes, fqsns, run_in_ci = fqsns
max_quotes, fqmes, run_in_ci = fqmes
if (
ci_env
@ -52,15 +53,15 @@ def test_multi_fqsn_feed(
pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set()
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
for fqme in fqmes:
brokername, key, suffix = unpack_fqsn(fqme)
brokers.add(brokername)
async def main():
async with (
open_test_pikerd(),
open_feed(
fqsns,
fqmes,
loglevel=loglevel,
# TODO: ensure throttle rate is applied
@ -71,20 +72,20 @@ def test_multi_fqsn_feed(
) as feed
):
# verify shm buffers exist
for fqin in fqsns:
for fqin in fqmes:
flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqsn, and
# pull the first startup quotes, one for each fqme, and
# ensure they match each flume's startup quote value.
fqsns_copy = fqsns.copy()
fqsns_copy = fqmes.copy()
with trio.fail_after(0.5):
for _ in range(1):
first_quotes = await stream.receive()
for fqsn, quote in first_quotes.items():
for fqme, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get
# suppressed and only show up in the teardown
@ -92,18 +93,18 @@ def test_multi_fqsn_feed(
# <tractorbugurl>
# assert 0
fqsns_copy.remove(fqsn)
flume = feed.flumes[fqsn]
fqsns_copy.remove(fqme)
flume: Flume = feed.flumes[fqme]
assert quote['last'] == flume.first_quote['last']
cntr = Counter()
with trio.fail_after(6):
async for quotes in stream:
for fqsn, quote in quotes.items():
cntr[fqsn] += 1
for fqme, quote in quotes.items():
cntr[fqme] += 1
# await tractor.breakpoint()
flume = feed.flumes[fqsn]
flume = feed.flumes[fqme]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
@ -116,7 +117,7 @@ def test_multi_fqsn_feed(
# assert last == rt_row['close']
# assert last == hist_row['close']
pprint(
f'{fqsn}: {quote}\n'
f'{fqme}: {quote}\n'
f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n'
)
@ -124,6 +125,6 @@ def test_multi_fqsn_feed(
if cntr.total() >= max_quotes:
break
assert set(cntr.keys()) == fqsns
assert set(cntr.keys()) == fqmes
trio.run(main)