Compare commits

..

No commits in common. "f30d8667103940cdc9e071219a0f8a481ae84d80" and "81e9a990bfb8ca5fd29b05734ea4629a764195d5" have entirely different histories.

8 changed files with 58 additions and 125 deletions

View File

@ -637,8 +637,8 @@ class PpTable(Struct):
active, closed = self.dump_active() active, closed = self.dump_active()
# ONLY dict-serialize all active positions; those that are # ONLY dict-serialize all active positions; those that are closed
# closed we don't store in the ``pps.toml``. # we don't store in the ``pps.toml``.
to_toml_dict = {} to_toml_dict = {}
for bs_mktid, pos in active.items(): for bs_mktid, pos in active.items():
@ -688,12 +688,13 @@ class PpTable(Struct):
self.conf.update(pp_entries) self.conf.update(pp_entries)
# if there are no active position entries according elif (
# to the toml dump output above, then clear the config self.brokername in self.conf and
# file of all entries. self.acctid in self.conf[self.brokername]
elif self.conf: ):
for entry in list(self.conf): del self.conf[self.brokername][self.acctid]
del self.conf[entry] if len(self.conf[self.brokername]) == 0:
del self.conf[self.brokername]
# TODO: why tf haven't they already done this for inline # TODO: why tf haven't they already done this for inline
# tables smh.. # tables smh..

View File

@ -619,7 +619,7 @@ async def _setup_quote_stream(
async def open_aio_quote_stream( async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Contract | None = None, contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
@ -741,9 +741,9 @@ async def stream_quotes(
try: try:
( (
con, # Contract con,
first_ticker, # Ticker first_ticker,
details, # ContractDetails details,
) = await proxy.get_sym_details(symbol=sym) ) = await proxy.get_sym_details(symbol=sym)
except ConnectionError: except ConnectionError:
log.exception(f'Proxy is ded {proxy._aio_ns}') log.exception(f'Proxy is ded {proxy._aio_ns}')
@ -759,7 +759,6 @@ async def stream_quotes(
''' '''
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
con: Contract = details.contract
syminfo = asdict(details) syminfo = asdict(details)
syminfo.update(syminfo['contract']) syminfo.update(syminfo['contract'])
@ -786,11 +785,6 @@ async def stream_quotes(
price_tick: Decimal = Decimal(str(syminfo['minTick'])) price_tick: Decimal = Decimal(str(syminfo['minTick']))
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0')) size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
# XXX: GRRRR they don't support fractional share sizes for
# stocks from the API?!
if con.secType == 'STK':
size_tick = Decimal('1')
syminfo['price_tick_size'] = price_tick syminfo['price_tick_size'] = price_tick
# NOTE: as you'd expect for "legacy" assets, the "volume # NOTE: as you'd expect for "legacy" assets, the "volume
# precision" is normally discreet. # precision" is normally discreet.

View File

@ -124,10 +124,7 @@ class PaperBoi(Struct):
# in the broker trades event processing loop # in the broker trades event processing loop
await trio.sleep(0.05) await trio.sleep(0.05)
if ( if action == 'sell':
action == 'sell'
and size > 0
):
size = -size size = -size
msg = BrokerdStatus( msg = BrokerdStatus(

View File

@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
from each backend broker/data provider. from each backend broker/data provider.
''' '''
fqme: str fqsn: str
symbol_info: dict | None = None symbol_info: dict | None = None
mkt_info: MktPair | None = None mkt_info: MktPair | None = None
shm_write_opts: dict[str, Any] | None = None shm_write_opts: dict[str, Any] | None = None
@ -1284,9 +1284,7 @@ async def open_feed_bus(
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(
{fqsn: flume.to_msg() {fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
for fqsn, flume in flumes.items()
}
) )
if not start_stream: if not start_stream:

View File

@ -289,20 +289,6 @@ class OrderMode:
symbol = self.chart.linked.symbol symbol = self.chart.linked.symbol
# NOTE : we could also use instead,
# symbol.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision
# at a different layer in the stack? right now
# any precision error will literally be relayed
# all the way back from the backend.
price = round(
price,
ndigits=symbol.tick_size_digits,
)
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -373,7 +359,7 @@ class OrderMode:
# NOTE: we have to str-ify `MktPair` first since we can't # NOTE: we have to str-ify `MktPair` first since we can't
# cast to it without being mega explicit with # cast to it without being mega explicit with
# `msgspec.Struct`, which we're not yet.. # `msgspec.Struct`, which we're not yet..
order: Order = staged.copy({ order = staged.copy({
'symbol': str(staged.symbol), 'symbol': str(staged.symbol),
'oid': oid, 'oid': oid,
}) })
@ -450,17 +436,8 @@ class OrderMode:
line: LevelLine, line: LevelLine,
) -> None: ) -> None:
'''
Retreive the level line's end state, compute the size
and price for the new price-level, send an update msg to
the EMS, adjust mirrored level line on secondary chart.
''' level = line.value()
mktinfo = self.chart.linked.symbol
level = round(
line.value(),
ndigits=mktinfo.tick_size_digits,
)
# updated by level change callback set in ``.new_line_from_order()`` # updated by level change callback set in ``.new_line_from_order()``
dialog = line.dialog dialog = line.dialog
size = dialog.order.size size = dialog.order.size
@ -712,7 +689,7 @@ async def open_order_mode(
# symbol names (i.e. the same names you'd get back in search # symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the # results) in order for position msgs to correctly trigger the
# display of a position indicator on screen. # display of a position indicator on screen.
position_msgs: dict[str, dict[str, BrokerdPosition]] position_msgs: dict[str, list[BrokerdPosition]]
# spawn EMS actor-service # spawn EMS actor-service
async with ( async with (
@ -895,11 +872,8 @@ async def open_order_mode(
# Pack position messages by account, should only be one-to-one. # Pack position messages by account, should only be one-to-one.
# NOTE: requires the backend exactly specifies # NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg. # the expected symbol key in its positions msg.
for ( for (broker, acctid), msgs in position_msgs.items():
(broker, acctid), for msg in msgs:
pps_by_fqme
) in position_msgs.items():
for msg in pps_by_fqme.values():
await process_trade_msg( await process_trade_msg(
mode, mode,
client, client,

View File

@ -87,11 +87,8 @@ def log(
@acm @acm
async def _open_test_pikerd( async def _open_test_pikerd(
tmpconfdir: str, tmpconfdir: str,
reg_addr: tuple[str, int] | None = None, reg_addr: tuple[str, int] | None = None,
loglevel: str = 'warning', loglevel: str = 'warning',
debug_mode: bool = False,
**kwargs, **kwargs,
) -> tuple[ ) -> tuple[
@ -125,7 +122,7 @@ async def _open_test_pikerd(
# or just in sequence per test, so we keep root. # or just in sequence per test, so we keep root.
drop_root_perms_for_ahab=False, drop_root_perms_for_ahab=False,
debug_mode=debug_mode, debug_mode=True,
**kwargs, **kwargs,
@ -181,8 +178,6 @@ def open_test_pikerd(
# bind in level from fixture, which is itself set by # bind in level from fixture, which is itself set by
# `--ll <value>` cli flag. # `--ll <value>` cli flag.
loglevel=loglevel, loglevel=loglevel,
debug_mode=request.config.option.usepdb
) )
# NOTE: the `tmp_dir` fixture will wipe any files older then 3 test # NOTE: the `tmp_dir` fixture will wipe any files older then 3 test

View File

@ -13,14 +13,13 @@ from piker.data import (
ShmArray, ShmArray,
open_feed, open_feed,
) )
from piker.data.flows import Flume
from piker.accounting._mktinfo import ( from piker.accounting._mktinfo import (
unpack_fqsn, unpack_fqsn,
) )
@pytest.mark.parametrize( @pytest.mark.parametrize(
'fqmes', 'fqsns',
[ [
# binance # binance
(100, {'btcusdt.binance', 'ethusdt.binance'}, False), (100, {'btcusdt.binance', 'ethusdt.binance'}, False),
@ -31,20 +30,20 @@ from piker.accounting._mktinfo import (
# binance + kraken # binance + kraken
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False), (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
], ],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}', ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
) )
def test_multi_fqsn_feed( def test_multi_fqsn_feed(
open_test_pikerd: AsyncContextManager, open_test_pikerd: AsyncContextManager,
fqmes: set[str], fqsns: set[str],
loglevel: str, loglevel: str,
ci_env: bool ci_env: bool
): ):
''' '''
Start a real-time data feed for provided fqme and pull Start a real-time data feed for provided fqsn and pull
a few quotes then simply shut down. a few quotes then simply shut down.
''' '''
max_quotes, fqmes, run_in_ci = fqmes max_quotes, fqsns, run_in_ci = fqsns
if ( if (
ci_env ci_env
@ -53,15 +52,15 @@ def test_multi_fqsn_feed(
pytest.skip('Skipping CI disabled test due to feed restrictions') pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set() brokers = set()
for fqme in fqmes: for fqsn in fqsns:
brokername, key, suffix = unpack_fqsn(fqme) brokername, key, suffix = unpack_fqsn(fqsn)
brokers.add(brokername) brokers.add(brokername)
async def main(): async def main():
async with ( async with (
open_test_pikerd(), open_test_pikerd(),
open_feed( open_feed(
fqmes, fqsns,
loglevel=loglevel, loglevel=loglevel,
# TODO: ensure throttle rate is applied # TODO: ensure throttle rate is applied
@ -72,20 +71,20 @@ def test_multi_fqsn_feed(
) as feed ) as feed
): ):
# verify shm buffers exist # verify shm buffers exist
for fqin in fqmes: for fqin in fqsns:
flume = feed.flumes[fqin] flume = feed.flumes[fqin]
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
async with feed.open_multi_stream(brokers) as stream: async with feed.open_multi_stream(brokers) as stream:
# pull the first startup quotes, one for each fqme, and # pull the first startup quotes, one for each fqsn, and
# ensure they match each flume's startup quote value. # ensure they match each flume's startup quote value.
fqsns_copy = fqmes.copy() fqsns_copy = fqsns.copy()
with trio.fail_after(0.5): with trio.fail_after(0.5):
for _ in range(1): for _ in range(1):
first_quotes = await stream.receive() first_quotes = await stream.receive()
for fqme, quote in first_quotes.items(): for fqsn, quote in first_quotes.items():
# XXX: TODO: WTF apparently this error will get # XXX: TODO: WTF apparently this error will get
# supressed and only show up in the teardown # supressed and only show up in the teardown
@ -93,18 +92,18 @@ def test_multi_fqsn_feed(
# <tractorbugurl> # <tractorbugurl>
# assert 0 # assert 0
fqsns_copy.remove(fqme) fqsns_copy.remove(fqsn)
flume: Flume = feed.flumes[fqme] flume = feed.flumes[fqsn]
assert quote['last'] == flume.first_quote['last'] assert quote['last'] == flume.first_quote['last']
cntr = Counter() cntr = Counter()
with trio.fail_after(6): with trio.fail_after(6):
async for quotes in stream: async for quotes in stream:
for fqme, quote in quotes.items(): for fqsn, quote in quotes.items():
cntr[fqme] += 1 cntr[fqsn] += 1
# await tractor.breakpoint() # await tractor.breakpoint()
flume = feed.flumes[fqme] flume = feed.flumes[fqsn]
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
@ -117,7 +116,7 @@ def test_multi_fqsn_feed(
# assert last == rt_row['close'] # assert last == rt_row['close']
# assert last == hist_row['close'] # assert last == hist_row['close']
pprint( pprint(
f'{fqme}: {quote}\n' f'{fqsn}: {quote}\n'
f'rt_ohlc: {rt_row}\n' f'rt_ohlc: {rt_row}\n'
f'hist_ohlc: {hist_row}\n' f'hist_ohlc: {hist_row}\n'
) )
@ -125,6 +124,6 @@ def test_multi_fqsn_feed(
if cntr.total() >= max_quotes: if cntr.total() >= max_quotes:
break break
assert set(cntr.keys()) == fqmes assert set(cntr.keys()) == fqsns
trio.run(main) trio.run(main)

View File

@ -1,13 +1,5 @@
''' '''
Execution mgmt system (EMS) e2e testing. Paper-mode testing
Most tests leverage our paper clearing engine found (currently) in
``piker.clearing._paper_engine`.
Ideally in the longer run we are able to support forms of (non-clearing)
live order tests against certain backends that make it possible to do
so..
''' '''
from contextlib import ( from contextlib import (
contextmanager as cm, contextmanager as cm,
@ -167,27 +159,21 @@ def load_and_check_pos(
yield pp yield pp
def test_ems_err_on_bad_broker( @pytest.mark.trio
async def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: Services,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme():
try: try:
async with ( async with open_ems(
open_test_pikerd() as (_, _, _, services), 'doggy.smiles',
open_ems(
'doggycoin.doggy',
mode='paper', mode='paper',
loglevel=loglevel, loglevel=loglevel,
) as _ ) as _:
):
pytest.fail('EMS is working on non-broker!?') pytest.fail('EMS is working on non-broker!?')
except ModuleNotFoundError: except ModuleNotFoundError:
pass pass
run_and_tollerate_cancels(load_bad_fqme)
async def match_ppmsgs_on_ems_boot( async def match_ppmsgs_on_ems_boot(
ppmsgs: list[BrokerdPosition], ppmsgs: list[BrokerdPosition],
@ -278,14 +264,12 @@ async def submit_and_check(
od: dict od: dict
for od in fills: for od in fills:
print(f'Sending order {od} for fill') print(f'Sending order {od} for fill')
size = od['size']
sent, msgs = await order_and_and_wait_for_ppmsg( sent, msgs = await order_and_and_wait_for_ppmsg(
client, client,
trades_stream, trades_stream,
fqme, fqme,
action='buy' if size > 0 else 'sell', action='buy',
price=100e3 if size > 0 else 0, size=od['size'],
size=size,
) )
last_order: Order = sent[-1] last_order: Order = sent[-1]
@ -316,8 +300,7 @@ async def submit_and_check(
{'size': 0.001}, {'size': 0.001},
), ),
# multi-partial entry and exits from net-zero, to short and back # multi-partial entry and exits.
# to net-zero.
( (
# enters # enters
{'size': 0.001}, {'size': 0.001},
@ -331,18 +314,10 @@ async def submit_and_check(
{'size': 0.001}, {'size': 0.001},
{'size': 0.002}, {'size': 0.002},
# nearly back to zero. # exits to get back to zero.
{'size': -0.001}, {'size': -0.001},
# switch to net-short
{'size': -0.025}, {'size': -0.025},
{'size': -0.0195}, {'size': -0.0195},
# another entry
{'size': 0.001},
# final cover to net-zero again.
{'size': 0.038},
), ),
], ],
ids='fills={}'.format, ids='fills={}'.format,
@ -353,7 +328,7 @@ def test_multi_fill_positions(
fills: tuple[dict], fills: tuple[dict],
check_cross_session: bool = False, check_cross_session: bool = True,
) -> None: ) -> None: