Compare commits
No commits in common. "f30d8667103940cdc9e071219a0f8a481ae84d80" and "81e9a990bfb8ca5fd29b05734ea4629a764195d5" have entirely different histories.
f30d866710...81e9a990bf
@@ -637,8 +637,8 @@ class PpTable(Struct):
         active, closed = self.dump_active()

-        # ONLY dict-serialize all active positions; those that are
-        # closed we don't store in the ``pps.toml``.
+        # ONLY dict-serialize all active positions; those that are closed
+        # we don't store in the ``pps.toml``.
         to_toml_dict = {}

         for bs_mktid, pos in active.items():

@@ -688,12 +688,13 @@ class PpTable(Struct):
             self.conf.update(pp_entries)

-        # if there are no active position entries according
-        # to the toml dump output above, then clear the config
-        # file of all entries.
-        elif self.conf:
-            for entry in list(self.conf):
-                del self.conf[entry]
+        elif (
+            self.brokername in self.conf and
+            self.acctid in self.conf[self.brokername]
+        ):
+            del self.conf[self.brokername][self.acctid]
+            if len(self.conf[self.brokername]) == 0:
+                del self.conf[self.brokername]

         # TODO: why tf haven't they already done this for inline
         # tables smh..

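The interesting semantic difference in this hunk: when no active position entries remain, the left side clears every top-level entry from the config file, while the right side prunes only the current broker/account sub-table (and drops the broker table once it empties). A toy contrast, assuming a hypothetical nested conf mapping shaped like the real ``pps.toml`` contents:

    conf = {
        'binance': {'paper': {'btcusdt.binance': {}}},
        'kraken': {'paper': {}},
    }

    # left side: wipe every top-level entry in the file
    for entry in list(conf):
        del conf[entry]
    assert conf == {}

    # right side: prune only this broker/account sub-table
    conf = {'binance': {'paper': {}}, 'kraken': {'paper': {}}}
    brokername, acctid = 'binance', 'paper'
    if brokername in conf and acctid in conf[brokername]:
        del conf[brokername][acctid]
        if len(conf[brokername]) == 0:
            del conf[brokername]
    assert conf == {'kraken': {'paper': {}}}
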
@@ -619,7 +619,7 @@ async def _setup_quote_stream(
 async def open_aio_quote_stream(

     symbol: str,
-    contract: Contract | None = None,
+    contract: Optional[Contract] = None,

 ) -> trio.abc.ReceiveStream:

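The only change here is the optional-type spelling, but it affects the minimum Python version: the PEP 604 `Contract | None` form needs Python 3.10+ when annotations are evaluated at runtime, whereas `typing.Optional` works on older 3.x. A minimal sketch (the `Contract` class is a stand-in for the real ib contract type):

    from __future__ import annotations  # defers evaluation so `X | None`
                                        # also parses on Python < 3.10
    from typing import Optional


    class Contract:  # stand-in for the ib_insync type
        ...


    # these two annotations are equivalent:
    def f(contract: Optional[Contract] = None) -> Contract | None:
        return contract
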
@@ -741,9 +741,9 @@ async def stream_quotes(
     try:
         (
-            con,  # Contract
-            first_ticker,  # Ticker
-            details,  # ContractDetails
+            con,
+            first_ticker,
+            details,
         ) = await proxy.get_sym_details(symbol=sym)
     except ConnectionError:
         log.exception(f'Proxy is ded {proxy._aio_ns}')

@@ -759,7 +759,6 @@ async def stream_quotes(
     '''
     # pass back some symbol info like min_tick, trading_hours, etc.
-    con: Contract = details.contract
     syminfo = asdict(details)
     syminfo.update(syminfo['contract'])

@@ -786,11 +785,6 @@ async def stream_quotes(
     price_tick: Decimal = Decimal(str(syminfo['minTick']))
     size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))

-    # XXX: GRRRR they don't support fractional share sizes for
-    # stocks from the API?!
-    if con.secType == 'STK':
-        size_tick = Decimal('1')
-
     syminfo['price_tick_size'] = price_tick
     # NOTE: as you'd expect for "legacy" assets, the "volume
     # precision" is normally discreet.

@@ -124,10 +124,7 @@ class PaperBoi(Struct):
         # in the broker trades event processing loop
         await trio.sleep(0.05)

-        if (
-            action == 'sell'
-            and size > 0
-        ):
+        if action == 'sell':
             size = -size

         msg = BrokerdStatus(

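The `size > 0` guard on the left matters when a client already submits sell sizes as negative: an unconditional flip would turn them positive again. A minimal sketch of the guarded sign flip (hypothetical helper, not the actual `PaperBoi` method):

    def signed_size(action: str, size: float) -> float:
        # only flip positive sell sizes; an already-negative size
        # would otherwise be flipped back to a buy-like sign
        if (
            action == 'sell'
            and size > 0
        ):
            size = -size
        return size


    assert signed_size('sell', 2.0) == -2.0
    assert signed_size('sell', -2.0) == -2.0  # unguarded flip would yield 2.0
    assert signed_size('buy', 2.0) == 2.0
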
@@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
     from each backend broker/data provider.

     '''
-    fqme: str
+    fqsn: str
     symbol_info: dict | None = None
     mkt_info: MktPair | None = None
     shm_write_opts: dict[str, Any] | None = None

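This and most of the remaining hunks are the same mechanical rename: the left side says `fqme` (apparently the newer "fully-qualified market endpoint" spelling) where the right still says `fqsn` ("fully-qualified symbol name"). Both name the dotted key format that `unpack_fqsn()` parses into `(broker, key, suffix)` as seen in the test hunks below. A rough sketch of that layout, assuming a `<key>[.<suffix>].<broker>` shape (hypothetical re-implementation, not the real parser):

    def unpack_fqsn_sketch(fqsn: str) -> tuple[str, str, str]:
        # e.g. 'btcusdt.binance', or 'mnq.cme.ib' where 'cme' is
        # a venue/expiry-ish suffix sitting before the broker
        tokens = fqsn.lower().split('.')
        broker = tokens[-1]
        key = tokens[0]
        suffix = '.'.join(tokens[1:-1])
        return broker, key, suffix


    assert unpack_fqsn_sketch('xbtusd.kraken') == ('kraken', 'xbtusd', '')
    assert unpack_fqsn_sketch('mnq.cme.ib') == ('ib', 'mnq', 'cme')
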
@@ -1284,9 +1284,7 @@ async def open_feed_bus(
     # sync feed subscribers with flume handles
     await ctx.started(
-        {fqsn: flume.to_msg()
-         for fqsn, flume in flumes.items()
-        }
+        {fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
     )

     if not start_stream:

@@ -289,20 +289,6 @@ class OrderMode:
         symbol = self.chart.linked.symbol

-        # NOTE: we could also use instead,
-        # symbol.quantize(price, quantity_type='price')
-        # but it returns a Decimal and it's probably gonna
-        # be slower?
-        # TODO: should we be enforcing this precision
-        # at a different layer in the stack? right now
-        # any precision error will literally be relayed
-        # all the way back from the backend.
-
-        price = round(
-            price,
-            ndigits=symbol.tick_size_digits,
-        )
-
         order = self._staged_order = Order(
             action=action,
             price=price,

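For context on what the left side carries here: the rounding snaps a staged price to the symbol's tick precision with plain `round()`, the faster float path that the removed comments weigh against `Decimal`-based quantizing. A small sketch with a hypothetical two-decimal tick size:

    from decimal import Decimal

    tick_size = Decimal('0.01')                        # hypothetical price tick
    tick_size_digits = -tick_size.as_tuple().exponent  # -> 2

    price = 1.23456789
    assert round(price, ndigits=tick_size_digits) == 1.23

    # the commented-out `quantize` alternative returns a Decimal instead:
    assert Decimal(str(price)).quantize(tick_size) == Decimal('1.23')
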
@@ -373,7 +359,7 @@ class OrderMode:
         # NOTE: we have to str-ify `MktPair` first since we can't
         # cast to it without being mega explicit with
         # `msgspec.Struct`, which we're not yet..
-        order: Order = staged.copy({
+        order = staged.copy({
             'symbol': str(staged.symbol),
             'oid': oid,
         })

@@ -450,17 +436,8 @@ class OrderMode:
         line: LevelLine,

     ) -> None:
-        '''
-        Retreive the level line's end state, compute the size
-        and price for the new price-level, send an update msg to
-        the EMS, adjust mirrored level line on secondary chart.
-
-        '''
-        mktinfo = self.chart.linked.symbol
-        level = round(
-            line.value(),
-            ndigits=mktinfo.tick_size_digits,
-        )
+        level = line.value()
         # updated by level change callback set in ``.new_line_from_order()``
         dialog = line.dialog
         size = dialog.order.size

@@ -712,7 +689,7 @@ async def open_order_mode(
     # symbol names (i.e. the same names you'd get back in search
     # results) in order for position msgs to correctly trigger the
     # display of a position indicator on screen.
-    position_msgs: dict[str, dict[str, BrokerdPosition]]
+    position_msgs: dict[str, list[BrokerdPosition]]

     # spawn EMS actor-service
     async with (

@@ -895,11 +872,8 @@ async def open_order_mode(
     # Pack position messages by account, should only be one-to-one.
     # NOTE: requires the backend exactly specifies
     # the expected symbol key in its positions msg.
-    for (
-        (broker, acctid),
-        pps_by_fqme
-    ) in position_msgs.items():
-        for msg in pps_by_fqme.values():
+    for (broker, acctid), msgs in position_msgs.items():
+        for msg in msgs:
             await process_trade_msg(
                 mode,
                 client,

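Paired with the earlier `position_msgs` annotation change, this hunk shows the two payload shapes being iterated: per-account msgs keyed by fqme on the left vs. a flat list on the right. A toy illustration (keys and values are stand-ins):

    ppmsg = object()  # stand-in for a BrokerdPosition msg

    # left: a table of msgs keyed by fqme, per (broker, account)
    left_msgs = {('ib', 'paper'): {'mnq.cme.ib': ppmsg}}
    for (broker, acctid), pps_by_fqme in left_msgs.items():
        for msg in pps_by_fqme.values():
            assert msg is ppmsg

    # right: a flat list of msgs per (broker, account)
    right_msgs = {('ib', 'paper'): [ppmsg]}
    for (broker, acctid), msgs in right_msgs.items():
        for msg in msgs:
            assert msg is ppmsg
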
@@ -87,11 +87,8 @@ def log(
 @acm
 async def _open_test_pikerd(
     tmpconfdir: str,

     reg_addr: tuple[str, int] | None = None,
     loglevel: str = 'warning',
-    debug_mode: bool = False,
-
     **kwargs,

 ) -> tuple[

@@ -125,7 +122,7 @@ async def _open_test_pikerd(
         # or just in sequence per test, so we keep root.
         drop_root_perms_for_ahab=False,

-        debug_mode=debug_mode,
+        debug_mode=True,

         **kwargs,

@@ -181,8 +178,6 @@ def open_test_pikerd(
         # bind in level from fixture, which is itself set by
         # `--ll <value>` cli flag.
         loglevel=loglevel,
-
-        debug_mode=request.config.option.usepdb
     )

     # NOTE: the `tmp_dir` fixture will wipe any files older then 3 test

@@ -13,14 +13,13 @@ from piker.data import (
     ShmArray,
     open_feed,
 )
-from piker.data.flows import Flume
 from piker.accounting._mktinfo import (
     unpack_fqsn,
 )


 @pytest.mark.parametrize(
-    'fqmes',
+    'fqsns',
     [
         # binance
         (100, {'btcusdt.binance', 'ethusdt.binance'}, False),

@@ -31,20 +30,20 @@ from piker.accounting._mktinfo import (
         # binance + kraken
         (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
     ],
-    ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
+    ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
 )
 def test_multi_fqsn_feed(
     open_test_pikerd: AsyncContextManager,
-    fqmes: set[str],
+    fqsns: set[str],
     loglevel: str,
     ci_env: bool
 ):
     '''
-    Start a real-time data feed for provided fqme and pull
+    Start a real-time data feed for provided fqsn and pull
     a few quotes then simply shut down.

     '''
-    max_quotes, fqmes, run_in_ci = fqmes
+    max_quotes, fqsns, run_in_ci = fqsns

     if (
         ci_env

@@ -53,15 +52,15 @@ def test_multi_fqsn_feed(
         pytest.skip('Skipping CI disabled test due to feed restrictions')

     brokers = set()
-    for fqme in fqmes:
-        brokername, key, suffix = unpack_fqsn(fqme)
+    for fqsn in fqsns:
+        brokername, key, suffix = unpack_fqsn(fqsn)
         brokers.add(brokername)

     async def main():
         async with (
             open_test_pikerd(),
             open_feed(
-                fqmes,
+                fqsns,
                 loglevel=loglevel,

                 # TODO: ensure throttle rate is applied

@@ -72,20 +71,20 @@ def test_multi_fqsn_feed(
             ) as feed
         ):
             # verify shm buffers exist
-            for fqin in fqmes:
+            for fqin in fqsns:
                 flume = feed.flumes[fqin]
                 ohlcv: ShmArray = flume.rt_shm
                 hist_ohlcv: ShmArray = flume.hist_shm

             async with feed.open_multi_stream(brokers) as stream:

-                # pull the first startup quotes, one for each fqme, and
+                # pull the first startup quotes, one for each fqsn, and
                 # ensure they match each flume's startup quote value.
-                fqsns_copy = fqmes.copy()
+                fqsns_copy = fqsns.copy()
                 with trio.fail_after(0.5):
                     for _ in range(1):
                         first_quotes = await stream.receive()
-                        for fqme, quote in first_quotes.items():
+                        for fqsn, quote in first_quotes.items():

                             # XXX: TODO: WTF apparently this error will get
                             # supressed and only show up in the teardown

@@ -93,18 +92,18 @@ def test_multi_fqsn_feed(
                             # <tractorbugurl>
                             # assert 0

-                            fqsns_copy.remove(fqme)
-                            flume: Flume = feed.flumes[fqme]
+                            fqsns_copy.remove(fqsn)
+                            flume = feed.flumes[fqsn]
                             assert quote['last'] == flume.first_quote['last']

                 cntr = Counter()
                 with trio.fail_after(6):
                     async for quotes in stream:
-                        for fqme, quote in quotes.items():
-                            cntr[fqme] += 1
+                        for fqsn, quote in quotes.items():
+                            cntr[fqsn] += 1

                             # await tractor.breakpoint()
-                            flume = feed.flumes[fqme]
+                            flume = feed.flumes[fqsn]
                             ohlcv: ShmArray = flume.rt_shm
                             hist_ohlcv: ShmArray = flume.hist_shm

@@ -117,7 +116,7 @@ def test_multi_fqsn_feed(
                             # assert last == rt_row['close']
                             # assert last == hist_row['close']
                             pprint(
-                                f'{fqme}: {quote}\n'
+                                f'{fqsn}: {quote}\n'
                                 f'rt_ohlc: {rt_row}\n'
                                 f'hist_ohlc: {hist_row}\n'
                             )

@@ -125,6 +124,6 @@ def test_multi_fqsn_feed(
                     if cntr.total() >= max_quotes:
                         break

-        assert set(cntr.keys()) == fqmes
+        assert set(cntr.keys()) == fqsns

     trio.run(main)

@@ -1,13 +1,5 @@
 '''
-Execution mgmt system (EMS) e2e testing.
-
-Most tests leverage our paper clearing engine found (currently) in
-``piker.clearing._paper_engine`.
-
-Ideally in the longer run we are able to support forms of (non-clearing)
-live order tests against certain backends that make it possible to do
-so..
+Paper-mode testing
 '''
 from contextlib import (
     contextmanager as cm,

@@ -167,26 +159,20 @@ def load_and_check_pos(
     yield pp


-def test_ems_err_on_bad_broker(
+@pytest.mark.trio
+async def test_ems_err_on_bad_broker(
     open_test_pikerd: Services,
     loglevel: str,
 ):
-    async def load_bad_fqme():
-        try:
-            async with (
-                open_test_pikerd() as (_, _, _, services),
-
-                open_ems(
-                    'doggycoin.doggy',
-                    mode='paper',
-                    loglevel=loglevel,
-                ) as _
-            ):
-                pytest.fail('EMS is working on non-broker!?')
-        except ModuleNotFoundError:
-            pass
-
-    run_and_tollerate_cancels(load_bad_fqme)
+    try:
+        async with open_ems(
+            'doggy.smiles',
+            mode='paper',
+            loglevel=loglevel,
+        ) as _:
+            pytest.fail('EMS is working on non-broker!?')
+    except ModuleNotFoundError:
+        pass


 async def match_ppmsgs_on_ems_boot(

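The two sides also drive this test differently: the left builds the async body as a closure and hands it to a cancellation-tolerant runner (`run_and_tollerate_cancels`), while the right marks the test for the `pytest-trio` plugin and runs it natively async. A minimal contrast of the two styles (assuming `pytest-trio` is installed; the bodies are placeholders):

    import pytest
    import trio


    def test_sync_wrapper_style():
        # define the async body locally, then drive it ourselves
        async def main():
            await trio.sleep(0)

        trio.run(main)


    @pytest.mark.trio
    async def test_native_async_style():
        # pytest-trio schedules the coroutine on its own trio loop
        await trio.sleep(0)
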
@@ -278,14 +264,12 @@ async def submit_and_check(
     od: dict
     for od in fills:
         print(f'Sending order {od} for fill')
-        size = od['size']
         sent, msgs = await order_and_and_wait_for_ppmsg(
             client,
             trades_stream,
             fqme,
-            action='buy' if size > 0 else 'sell',
-            price=100e3 if size > 0 else 0,
-            size=size,
+            action='buy',
+            size=od['size'],
         )

         last_order: Order = sent[-1]

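On the left, the order side follows the sign of each fill size, and the limit price is pushed far through the book, presumably so the paper engine fills it immediately; the right always buys at whatever default the helper picks. A hedged sketch of the left side's parametrization (hypothetical helper, not the test's actual API):

    def fill_params(size: float) -> dict:
        return {
            # side tracks the sign of the requested size
            'action': 'buy' if size > 0 else 'sell',
            # extreme limit prices force immediate paper fills:
            # very high for buys, zero for sells
            'price': 100e3 if size > 0 else 0,
            'size': size,
        }


    assert fill_params(0.001)['action'] == 'buy'
    assert fill_params(-0.025) == {'action': 'sell', 'price': 0, 'size': -0.025}
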
@@ -316,8 +300,7 @@ async def submit_and_check(
             {'size': 0.001},
         ),

-        # multi-partial entry and exits from net-zero, to short and back
-        # to net-zero.
+        # multi-partial entry and exits.
         (
             # enters
             {'size': 0.001},

@@ -331,18 +314,10 @@ async def submit_and_check(
             {'size': 0.001},
             {'size': 0.002},

-            # nearly back to zero.
+            # exits to get back to zero.
             {'size': -0.001},

-            # switch to net-short
             {'size': -0.025},
             {'size': -0.0195},
-
-            # another entry
-            {'size': 0.001},
-
-            # final cover to net-zero again.
-            {'size': 0.038},
         ),
     ],
     ids='fills={}'.format,

@@ -353,7 +328,7 @@ def test_multi_fill_positions(

     fills: tuple[dict],

-    check_cross_session: bool = False,
+    check_cross_session: bool = True,

 ) -> None: