Compare commits

..

No commits in common. "f30d8667103940cdc9e071219a0f8a481ae84d80" and "81e9a990bfb8ca5fd29b05734ea4629a764195d5" have entirely different histories.

8 changed files with 58 additions and 125 deletions

View File

@@ -637,8 +637,8 @@ class PpTable(Struct):
active, closed = self.dump_active()
# ONLY dict-serialize all active positions; those that are
# closed we don't store in the ``pps.toml``.
# ONLY dict-serialize all active positions; those that are closed
# we don't store in the ``pps.toml``.
to_toml_dict = {}
for bs_mktid, pos in active.items():
@@ -688,12 +688,13 @@ class PpTable(Struct):
self.conf.update(pp_entries)
# if there are no active position entries according
# to the toml dump output above, then clear the config
# file of all entries.
elif self.conf:
for entry in list(self.conf):
del self.conf[entry]
elif (
self.brokername in self.conf and
self.acctid in self.conf[self.brokername]
):
del self.conf[self.brokername][self.acctid]
if len(self.conf[self.brokername]) == 0:
del self.conf[self.brokername]
# TODO: why tf haven't they already done this for inline
# tables smh..

View File

@@ -619,7 +619,7 @@ async def _setup_quote_stream(
async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream:
@@ -741,9 +741,9 @@ async def stream_quotes(
try:
(
con, # Contract
first_ticker, # Ticker
details, # ContractDetails
con,
first_ticker,
details,
) = await proxy.get_sym_details(symbol=sym)
except ConnectionError:
log.exception(f'Proxy is ded {proxy._aio_ns}')
@@ -759,7 +759,6 @@ async def stream_quotes(
'''
# pass back some symbol info like min_tick, trading_hours, etc.
con: Contract = details.contract
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
@@ -786,11 +785,6 @@ async def stream_quotes(
price_tick: Decimal = Decimal(str(syminfo['minTick']))
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
# XXX: GRRRR they don't support fractional share sizes for
# stocks from the API?!
if con.secType == 'STK':
size_tick = Decimal('1')
syminfo['price_tick_size'] = price_tick
# NOTE: as you'd expect for "legacy" assets, the "volume
# precision" is normally discreet.

View File

@@ -124,10 +124,7 @@ class PaperBoi(Struct):
# in the broker trades event processing loop
await trio.sleep(0.05)
if (
action == 'sell'
and size > 0
):
if action == 'sell':
size = -size
msg = BrokerdStatus(

View File

@@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
from each backend broker/data provider.
'''
fqme: str
fqsn: str
symbol_info: dict | None = None
mkt_info: MktPair | None = None
shm_write_opts: dict[str, Any] | None = None
@@ -1284,9 +1284,7 @@ async def open_feed_bus(
# sync feed subscribers with flume handles
await ctx.started(
{fqsn: flume.to_msg()
for fqsn, flume in flumes.items()
}
{fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
)
if not start_stream:

View File

@@ -289,20 +289,6 @@ class OrderMode:
symbol = self.chart.linked.symbol
# NOTE : we could also use instead,
# symbol.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=symbol.tick_size_digits,
)
order = self._staged_order = Order(
action=action,
price=price,
@@ -373,7 +359,7 @@ class OrderMode:
# NOTE: we have to str-ify `MktPair` first since we can't
# cast to it without being mega explicit with
# `msgspec.Struct`, which we're not yet..
order: Order = staged.copy({
order = staged.copy({
'symbol': str(staged.symbol),
'oid': oid,
})
@@ -450,17 +436,8 @@ class OrderMode:
line: LevelLine,
) -> None:
'''
Retreive the level line's end state, compute the size
and price for the new price-level, send an update msg to
the EMS, adjust mirrored level line on secondary chart.
'''
mktinfo = self.chart.linked.symbol
level = round(
line.value(),
ndigits=mktinfo.tick_size_digits,
)
level = line.value()
# updated by level change callback set in ``.new_line_from_order()``
dialog = line.dialog
size = dialog.order.size
@@ -712,7 +689,7 @@ async def open_order_mode(
# symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the
# display of a position indicator on screen.
position_msgs: dict[str, dict[str, BrokerdPosition]]
position_msgs: dict[str, list[BrokerdPosition]]
# spawn EMS actor-service
async with (
@@ -895,11 +872,8 @@ async def open_order_mode(
# Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
for (
(broker, acctid),
pps_by_fqme
) in position_msgs.items():
for msg in pps_by_fqme.values():
for (broker, acctid), msgs in position_msgs.items():
for msg in msgs:
await process_trade_msg(
mode,
client,

View File

@@ -87,11 +87,8 @@ def log(
@acm
async def _open_test_pikerd(
tmpconfdir: str,
reg_addr: tuple[str, int] | None = None,
loglevel: str = 'warning',
debug_mode: bool = False,
**kwargs,
) -> tuple[
@@ -125,7 +122,7 @@ async def _open_test_pikerd(
# or just in sequence per test, so we keep root.
drop_root_perms_for_ahab=False,
debug_mode=debug_mode,
debug_mode=True,
**kwargs,
@@ -181,8 +178,6 @@ def open_test_pikerd(
# bind in level from fixture, which is itself set by
# `--ll <value>` cli flag.
loglevel=loglevel,
debug_mode=request.config.option.usepdb
)
# NOTE: the `tmp_dir` fixture will wipe any files older then 3 test

View File

@@ -13,14 +13,13 @@ from piker.data import (
ShmArray,
open_feed,
)
from piker.data.flows import Flume
from piker.accounting._mktinfo import (
unpack_fqsn,
)
@pytest.mark.parametrize(
'fqmes',
'fqsns',
[
# binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
@@ -31,20 +30,20 @@ from piker.accounting._mktinfo import (
# binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
)
def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager,
fqmes: set[str],
fqsns: set[str],
loglevel: str,
ci_env: bool
):
'''
Start a real-time data feed for provided fqme and pull
Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down.
'''
max_quotes, fqmes, run_in_ci = fqmes
max_quotes, fqsns, run_in_ci = fqsns
if (
ci_env
@@ -53,15 +52,15 @@ def test_multi_fqsn_feed(
pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set()
for fqme in fqmes:
brokername, key, suffix = unpack_fqsn(fqme)
for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqsn)
brokers.add(brokername)
async def main():
async with (
open_test_pikerd(),
open_feed(
fqmes,
fqsns,
loglevel=loglevel,
# TODO: ensure throttle rate is applied
@@ -72,20 +71,20 @@ def test_multi_fqsn_feed(
) as feed
):
# verify shm buffers exist
for fqin in fqmes:
for fqin in fqsns:
flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqme, and
# pull the first startup quotes, one for each fqsn, and
# ensure they match each flume's startup quote value.
fqsns_copy = fqmes.copy()
fqsns_copy = fqsns.copy()
with trio.fail_after(0.5):
for _ in range(1):
first_quotes = await stream.receive()
for fqme, quote in first_quotes.items():
for fqsn, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown
@@ -93,18 +92,18 @@ def test_multi_fqsn_feed(
# <tractorbugurl>
# assert 0
fqsns_copy.remove(fqme)
flume: Flume = feed.flumes[fqme]
fqsns_copy.remove(fqsn)
flume = feed.flumes[fqsn]
assert quote['last'] == flume.first_quote['last']
cntr = Counter()
with trio.fail_after(6):
async for quotes in stream:
for fqme, quote in quotes.items():
cntr[fqme] += 1
for fqsn, quote in quotes.items():
cntr[fqsn] += 1
# await tractor.breakpoint()
flume = feed.flumes[fqme]
flume = feed.flumes[fqsn]
ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm
@@ -117,7 +116,7 @@ def test_multi_fqsn_feed(
# assert last == rt_row['close']
# assert last == hist_row['close']
pprint(
f'{fqme}: {quote}\n'
f'{fqsn}: {quote}\n'
f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n'
)
@@ -125,6 +124,6 @@ def test_multi_fqsn_feed(
if cntr.total() >= max_quotes:
break
assert set(cntr.keys()) == fqmes
assert set(cntr.keys()) == fqsns
trio.run(main)

View File

@@ -1,13 +1,5 @@
'''
Execution mgmt system (EMS) e2e testing.
Most tests leverage our paper clearing engine found (currently) in
``piker.clearing._paper_engine`.
Ideally in the longer run we are able to support forms of (non-clearing)
live order tests against certain backends that make it possible to do
so..
Paper-mode testing
'''
from contextlib import (
contextmanager as cm,
@@ -167,26 +159,20 @@ def load_and_check_pos(
yield pp
def test_ems_err_on_bad_broker(
@pytest.mark.trio
async def test_ems_err_on_bad_broker(
open_test_pikerd: Services,
loglevel: str,
):
async def load_bad_fqme():
try:
async with (
open_test_pikerd() as (_, _, _, services),
open_ems(
'doggycoin.doggy',
mode='paper',
loglevel=loglevel,
) as _
):
pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError:
pass
run_and_tollerate_cancels(load_bad_fqme)
try:
async with open_ems(
'doggy.smiles',
mode='paper',
loglevel=loglevel,
) as _:
pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError:
pass
async def match_ppmsgs_on_ems_boot(
@@ -278,14 +264,12 @@ async def submit_and_check(
od: dict
for od in fills:
print(f'Sending order {od} for fill')
size = od['size']
sent, msgs = await order_and_and_wait_for_ppmsg(
client,
trades_stream,
fqme,
action='buy' if size > 0 else 'sell',
price=100e3 if size > 0 else 0,
size=size,
action='buy',
size=od['size'],
)
last_order: Order = sent[-1]
@@ -316,8 +300,7 @@ async def submit_and_check(
{'size': 0.001},
),
# multi-partial entry and exits from net-zero, to short and back
# to net-zero.
# multi-partial entry and exits.
(
# enters
{'size': 0.001},
@@ -331,18 +314,10 @@ async def submit_and_check(
{'size': 0.001},
{'size': 0.002},
# nearly back to zero.
# exits to get back to zero.
{'size': -0.001},
# switch to net-short
{'size': -0.025},
{'size': -0.0195},
# another entry
{'size': 0.001},
# final cover to net-zero again.
{'size': 0.038},
),
],
ids='fills={}'.format,
@@ -353,7 +328,7 @@ def test_multi_fill_positions(
fills: tuple[dict],
check_cross_session: bool = False,
check_cross_session: bool = True,
) -> None: