Compare commits
10 Commits
81e9a990bf
...
f30d866710
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | f30d866710 | |
Tyler Goodlet | 1bd4fd80f8 | |
Tyler Goodlet | 65e42b79ac | |
Tyler Goodlet | 2f5c456d4b | |
Tyler Goodlet | 7786ffa889 | |
Tyler Goodlet | 3edc95dda0 | |
Tyler Goodlet | ecd7500ee6 | |
Tyler Goodlet | 114e7660aa | |
Tyler Goodlet | d29e9eeb31 | |
Tyler Goodlet | 69d6e9bb4e |
|
@ -637,8 +637,8 @@ class PpTable(Struct):
|
|||
|
||||
active, closed = self.dump_active()
|
||||
|
||||
# ONLY dict-serialize all active positions; those that are closed
|
||||
# we don't store in the ``pps.toml``.
|
||||
# ONLY dict-serialize all active positions; those that are
|
||||
# closed we don't store in the ``pps.toml``.
|
||||
to_toml_dict = {}
|
||||
|
||||
for bs_mktid, pos in active.items():
|
||||
|
@ -688,13 +688,12 @@ class PpTable(Struct):
|
|||
|
||||
self.conf.update(pp_entries)
|
||||
|
||||
elif (
|
||||
self.brokername in self.conf and
|
||||
self.acctid in self.conf[self.brokername]
|
||||
):
|
||||
del self.conf[self.brokername][self.acctid]
|
||||
if len(self.conf[self.brokername]) == 0:
|
||||
del self.conf[self.brokername]
|
||||
# if there are no active position entries according
|
||||
# to the toml dump output above, then clear the config
|
||||
# file of all entries.
|
||||
elif self.conf:
|
||||
for entry in list(self.conf):
|
||||
del self.conf[entry]
|
||||
|
||||
# TODO: why tf haven't they already done this for inline
|
||||
# tables smh..
|
||||
|
|
|
@ -619,7 +619,7 @@ async def _setup_quote_stream(
|
|||
async def open_aio_quote_stream(
|
||||
|
||||
symbol: str,
|
||||
contract: Optional[Contract] = None,
|
||||
contract: Contract | None = None,
|
||||
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
|
@ -741,9 +741,9 @@ async def stream_quotes(
|
|||
|
||||
try:
|
||||
(
|
||||
con,
|
||||
first_ticker,
|
||||
details,
|
||||
con, # Contract
|
||||
first_ticker, # Ticker
|
||||
details, # ContractDetails
|
||||
) = await proxy.get_sym_details(symbol=sym)
|
||||
except ConnectionError:
|
||||
log.exception(f'Proxy is ded {proxy._aio_ns}')
|
||||
|
@ -759,6 +759,7 @@ async def stream_quotes(
|
|||
|
||||
'''
|
||||
# pass back some symbol info like min_tick, trading_hours, etc.
|
||||
con: Contract = details.contract
|
||||
syminfo = asdict(details)
|
||||
syminfo.update(syminfo['contract'])
|
||||
|
||||
|
@ -785,6 +786,11 @@ async def stream_quotes(
|
|||
price_tick: Decimal = Decimal(str(syminfo['minTick']))
|
||||
size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0'))
|
||||
|
||||
# XXX: GRRRR they don't support fractional share sizes for
|
||||
# stocks from the API?!
|
||||
if con.secType == 'STK':
|
||||
size_tick = Decimal('1')
|
||||
|
||||
syminfo['price_tick_size'] = price_tick
|
||||
# NOTE: as you'd expect for "legacy" assets, the "volume
|
||||
# precision" is normally discreet.
|
||||
|
|
|
@ -124,7 +124,10 @@ class PaperBoi(Struct):
|
|||
# in the broker trades event processing loop
|
||||
await trio.sleep(0.05)
|
||||
|
||||
if action == 'sell':
|
||||
if (
|
||||
action == 'sell'
|
||||
and size > 0
|
||||
):
|
||||
size = -size
|
||||
|
||||
msg = BrokerdStatus(
|
||||
|
|
|
@ -941,7 +941,7 @@ class BackendInitMsg(Struct, frozen=True):
|
|||
from each backend broker/data provider.
|
||||
|
||||
'''
|
||||
fqsn: str
|
||||
fqme: str
|
||||
symbol_info: dict | None = None
|
||||
mkt_info: MktPair | None = None
|
||||
shm_write_opts: dict[str, Any] | None = None
|
||||
|
@ -1284,7 +1284,9 @@ async def open_feed_bus(
|
|||
|
||||
# sync feed subscribers with flume handles
|
||||
await ctx.started(
|
||||
{fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
|
||||
{fqsn: flume.to_msg()
|
||||
for fqsn, flume in flumes.items()
|
||||
}
|
||||
)
|
||||
|
||||
if not start_stream:
|
||||
|
|
|
@ -289,6 +289,20 @@ class OrderMode:
|
|||
|
||||
symbol = self.chart.linked.symbol
|
||||
|
||||
# NOTE : we could also use instead,
|
||||
# symbol.quantize(price, quantity_type='price')
|
||||
# but it returns a Decimal and it's probably gonna
|
||||
# be slower?
|
||||
# TODO: should we be enforcing this precision
|
||||
# at a different layer in the stack? right now
|
||||
# any precision error will literally be relayed
|
||||
# all the way back from the backend.
|
||||
|
||||
price = round(
|
||||
price,
|
||||
ndigits=symbol.tick_size_digits,
|
||||
)
|
||||
|
||||
order = self._staged_order = Order(
|
||||
action=action,
|
||||
price=price,
|
||||
|
@ -359,7 +373,7 @@ class OrderMode:
|
|||
# NOTE: we have to str-ify `MktPair` first since we can't
|
||||
# cast to it without being mega explicit with
|
||||
# `msgspec.Struct`, which we're not yet..
|
||||
order = staged.copy({
|
||||
order: Order = staged.copy({
|
||||
'symbol': str(staged.symbol),
|
||||
'oid': oid,
|
||||
})
|
||||
|
@ -436,8 +450,17 @@ class OrderMode:
|
|||
line: LevelLine,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Retreive the level line's end state, compute the size
|
||||
and price for the new price-level, send an update msg to
|
||||
the EMS, adjust mirrored level line on secondary chart.
|
||||
|
||||
level = line.value()
|
||||
'''
|
||||
mktinfo = self.chart.linked.symbol
|
||||
level = round(
|
||||
line.value(),
|
||||
ndigits=mktinfo.tick_size_digits,
|
||||
)
|
||||
# updated by level change callback set in ``.new_line_from_order()``
|
||||
dialog = line.dialog
|
||||
size = dialog.order.size
|
||||
|
@ -689,7 +712,7 @@ async def open_order_mode(
|
|||
# symbol names (i.e. the same names you'd get back in search
|
||||
# results) in order for position msgs to correctly trigger the
|
||||
# display of a position indicator on screen.
|
||||
position_msgs: dict[str, list[BrokerdPosition]]
|
||||
position_msgs: dict[str, dict[str, BrokerdPosition]]
|
||||
|
||||
# spawn EMS actor-service
|
||||
async with (
|
||||
|
@ -872,8 +895,11 @@ async def open_order_mode(
|
|||
# Pack position messages by account, should only be one-to-one.
|
||||
# NOTE: requires the backend exactly specifies
|
||||
# the expected symbol key in its positions msg.
|
||||
for (broker, acctid), msgs in position_msgs.items():
|
||||
for msg in msgs:
|
||||
for (
|
||||
(broker, acctid),
|
||||
pps_by_fqme
|
||||
) in position_msgs.items():
|
||||
for msg in pps_by_fqme.values():
|
||||
await process_trade_msg(
|
||||
mode,
|
||||
client,
|
||||
|
|
|
@ -87,8 +87,11 @@ def log(
|
|||
@acm
|
||||
async def _open_test_pikerd(
|
||||
tmpconfdir: str,
|
||||
|
||||
reg_addr: tuple[str, int] | None = None,
|
||||
loglevel: str = 'warning',
|
||||
debug_mode: bool = False,
|
||||
|
||||
**kwargs,
|
||||
|
||||
) -> tuple[
|
||||
|
@ -122,7 +125,7 @@ async def _open_test_pikerd(
|
|||
# or just in sequence per test, so we keep root.
|
||||
drop_root_perms_for_ahab=False,
|
||||
|
||||
debug_mode=True,
|
||||
debug_mode=debug_mode,
|
||||
|
||||
**kwargs,
|
||||
|
||||
|
@ -178,6 +181,8 @@ def open_test_pikerd(
|
|||
# bind in level from fixture, which is itself set by
|
||||
# `--ll <value>` cli flag.
|
||||
loglevel=loglevel,
|
||||
|
||||
debug_mode=request.config.option.usepdb
|
||||
)
|
||||
|
||||
# NOTE: the `tmp_dir` fixture will wipe any files older then 3 test
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
'''
|
||||
Paper-mode testing
|
||||
Execution mgmt system (EMS) e2e testing.
|
||||
|
||||
Most tests leverage our paper clearing engine found (currently) in
|
||||
``piker.clearing._paper_engine`.
|
||||
|
||||
Ideally in the longer run we are able to support forms of (non-clearing)
|
||||
live order tests against certain backends that make it possible to do
|
||||
so..
|
||||
|
||||
'''
|
||||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
|
@ -159,20 +167,26 @@ def load_and_check_pos(
|
|||
yield pp
|
||||
|
||||
|
||||
@pytest.mark.trio
|
||||
async def test_ems_err_on_bad_broker(
|
||||
def test_ems_err_on_bad_broker(
|
||||
open_test_pikerd: Services,
|
||||
loglevel: str,
|
||||
):
|
||||
try:
|
||||
async with open_ems(
|
||||
'doggy.smiles',
|
||||
mode='paper',
|
||||
loglevel=loglevel,
|
||||
) as _:
|
||||
pytest.fail('EMS is working on non-broker!?')
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
async def load_bad_fqme():
|
||||
try:
|
||||
async with (
|
||||
open_test_pikerd() as (_, _, _, services),
|
||||
|
||||
open_ems(
|
||||
'doggycoin.doggy',
|
||||
mode='paper',
|
||||
loglevel=loglevel,
|
||||
) as _
|
||||
):
|
||||
pytest.fail('EMS is working on non-broker!?')
|
||||
except ModuleNotFoundError:
|
||||
pass
|
||||
|
||||
run_and_tollerate_cancels(load_bad_fqme)
|
||||
|
||||
|
||||
async def match_ppmsgs_on_ems_boot(
|
||||
|
@ -264,12 +278,14 @@ async def submit_and_check(
|
|||
od: dict
|
||||
for od in fills:
|
||||
print(f'Sending order {od} for fill')
|
||||
size = od['size']
|
||||
sent, msgs = await order_and_and_wait_for_ppmsg(
|
||||
client,
|
||||
trades_stream,
|
||||
fqme,
|
||||
action='buy',
|
||||
size=od['size'],
|
||||
action='buy' if size > 0 else 'sell',
|
||||
price=100e3 if size > 0 else 0,
|
||||
size=size,
|
||||
)
|
||||
|
||||
last_order: Order = sent[-1]
|
||||
|
@ -300,7 +316,8 @@ async def submit_and_check(
|
|||
{'size': 0.001},
|
||||
),
|
||||
|
||||
# multi-partial entry and exits.
|
||||
# multi-partial entry and exits from net-zero, to short and back
|
||||
# to net-zero.
|
||||
(
|
||||
# enters
|
||||
{'size': 0.001},
|
||||
|
@ -314,10 +331,18 @@ async def submit_and_check(
|
|||
{'size': 0.001},
|
||||
{'size': 0.002},
|
||||
|
||||
# exits to get back to zero.
|
||||
# nearly back to zero.
|
||||
{'size': -0.001},
|
||||
|
||||
# switch to net-short
|
||||
{'size': -0.025},
|
||||
{'size': -0.0195},
|
||||
|
||||
# another entry
|
||||
{'size': 0.001},
|
||||
|
||||
# final cover to net-zero again.
|
||||
{'size': 0.038},
|
||||
),
|
||||
],
|
||||
ids='fills={}'.format,
|
||||
|
@ -328,7 +353,7 @@ def test_multi_fill_positions(
|
|||
|
||||
fills: tuple[dict],
|
||||
|
||||
check_cross_session: bool = True,
|
||||
check_cross_session: bool = False,
|
||||
|
||||
) -> None:
|
||||
|
|
@ -13,13 +13,14 @@ from piker.data import (
|
|||
ShmArray,
|
||||
open_feed,
|
||||
)
|
||||
from piker.data.flows import Flume
|
||||
from piker.accounting._mktinfo import (
|
||||
unpack_fqsn,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'fqsns',
|
||||
'fqmes',
|
||||
[
|
||||
# binance
|
||||
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
|
||||
|
@ -30,20 +31,20 @@ from piker.accounting._mktinfo import (
|
|||
# binance + kraken
|
||||
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
|
||||
],
|
||||
ids=lambda param: f'quotes={param[0]}@fqsns={param[1]}',
|
||||
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
|
||||
)
|
||||
def test_multi_fqsn_feed(
|
||||
open_test_pikerd: AsyncContextManager,
|
||||
fqsns: set[str],
|
||||
fqmes: set[str],
|
||||
loglevel: str,
|
||||
ci_env: bool
|
||||
):
|
||||
'''
|
||||
Start a real-time data feed for provided fqsn and pull
|
||||
Start a real-time data feed for provided fqme and pull
|
||||
a few quotes then simply shut down.
|
||||
|
||||
'''
|
||||
max_quotes, fqsns, run_in_ci = fqsns
|
||||
max_quotes, fqmes, run_in_ci = fqmes
|
||||
|
||||
if (
|
||||
ci_env
|
||||
|
@ -52,15 +53,15 @@ def test_multi_fqsn_feed(
|
|||
pytest.skip('Skipping CI disabled test due to feed restrictions')
|
||||
|
||||
brokers = set()
|
||||
for fqsn in fqsns:
|
||||
brokername, key, suffix = unpack_fqsn(fqsn)
|
||||
for fqme in fqmes:
|
||||
brokername, key, suffix = unpack_fqsn(fqme)
|
||||
brokers.add(brokername)
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
open_test_pikerd(),
|
||||
open_feed(
|
||||
fqsns,
|
||||
fqmes,
|
||||
loglevel=loglevel,
|
||||
|
||||
# TODO: ensure throttle rate is applied
|
||||
|
@ -71,20 +72,20 @@ def test_multi_fqsn_feed(
|
|||
) as feed
|
||||
):
|
||||
# verify shm buffers exist
|
||||
for fqin in fqsns:
|
||||
for fqin in fqmes:
|
||||
flume = feed.flumes[fqin]
|
||||
ohlcv: ShmArray = flume.rt_shm
|
||||
hist_ohlcv: ShmArray = flume.hist_shm
|
||||
|
||||
async with feed.open_multi_stream(brokers) as stream:
|
||||
|
||||
# pull the first startup quotes, one for each fqsn, and
|
||||
# pull the first startup quotes, one for each fqme, and
|
||||
# ensure they match each flume's startup quote value.
|
||||
fqsns_copy = fqsns.copy()
|
||||
fqsns_copy = fqmes.copy()
|
||||
with trio.fail_after(0.5):
|
||||
for _ in range(1):
|
||||
first_quotes = await stream.receive()
|
||||
for fqsn, quote in first_quotes.items():
|
||||
for fqme, quote in first_quotes.items():
|
||||
|
||||
# XXX: TODO: WTF apparently this error will get
|
||||
# supressed and only show up in the teardown
|
||||
|
@ -92,18 +93,18 @@ def test_multi_fqsn_feed(
|
|||
# <tractorbugurl>
|
||||
# assert 0
|
||||
|
||||
fqsns_copy.remove(fqsn)
|
||||
flume = feed.flumes[fqsn]
|
||||
fqsns_copy.remove(fqme)
|
||||
flume: Flume = feed.flumes[fqme]
|
||||
assert quote['last'] == flume.first_quote['last']
|
||||
|
||||
cntr = Counter()
|
||||
with trio.fail_after(6):
|
||||
async for quotes in stream:
|
||||
for fqsn, quote in quotes.items():
|
||||
cntr[fqsn] += 1
|
||||
for fqme, quote in quotes.items():
|
||||
cntr[fqme] += 1
|
||||
|
||||
# await tractor.breakpoint()
|
||||
flume = feed.flumes[fqsn]
|
||||
flume = feed.flumes[fqme]
|
||||
ohlcv: ShmArray = flume.rt_shm
|
||||
hist_ohlcv: ShmArray = flume.hist_shm
|
||||
|
||||
|
@ -116,7 +117,7 @@ def test_multi_fqsn_feed(
|
|||
# assert last == rt_row['close']
|
||||
# assert last == hist_row['close']
|
||||
pprint(
|
||||
f'{fqsn}: {quote}\n'
|
||||
f'{fqme}: {quote}\n'
|
||||
f'rt_ohlc: {rt_row}\n'
|
||||
f'hist_ohlc: {hist_row}\n'
|
||||
)
|
||||
|
@ -124,6 +125,6 @@ def test_multi_fqsn_feed(
|
|||
if cntr.total() >= max_quotes:
|
||||
break
|
||||
|
||||
assert set(cntr.keys()) == fqsns
|
||||
assert set(cntr.keys()) == fqmes
|
||||
|
||||
trio.run(main)
|
||||
|
|
Loading…
Reference in New Issue