Compare commits
14 Commits
d475347e53
...
3d00d27a12
Author | SHA1 | Date |
---|---|---|
|
3d00d27a12 | |
|
d545641d06 | |
|
54d4d9fd56 | |
|
5fc56cff98 | |
|
f04d2d27fd | |
|
1d19f01b79 | |
|
bceab54adb | |
|
84b1189e1a | |
|
1e16642c12 | |
|
7eb779a682 | |
|
a82f2fa578 | |
|
193f5c1ae9 | |
|
8901360695 | |
|
a14b599835 |
|
@ -29,6 +29,7 @@ import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
|
Callable,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -53,6 +54,9 @@ from ._util import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from ..service import maybe_spawn_daemon
|
from ..service import maybe_spawn_daemon
|
||||||
|
from piker.log import (
|
||||||
|
mk_repr,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
|
@ -561,7 +565,6 @@ async def open_sample_stream(
|
||||||
|
|
||||||
|
|
||||||
async def sample_and_broadcast(
|
async def sample_and_broadcast(
|
||||||
|
|
||||||
bus: _FeedsBus, # noqa
|
bus: _FeedsBus, # noqa
|
||||||
rt_shm: ShmArray,
|
rt_shm: ShmArray,
|
||||||
hist_shm: ShmArray,
|
hist_shm: ShmArray,
|
||||||
|
@ -582,11 +585,22 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
overruns = Counter()
|
overruns = Counter()
|
||||||
|
|
||||||
|
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||||
|
# just that).
|
||||||
|
pfmt: Callable[[str], str] = mk_repr()
|
||||||
|
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
# print(quotes)
|
|
||||||
|
|
||||||
# TODO: ``numba`` this!
|
# XXX WARNING XXX only enable for debugging bc ow can cost
|
||||||
|
# ALOT of perf with HF-feedz!!!
|
||||||
|
#
|
||||||
|
# log.info(
|
||||||
|
# 'Rx live quotes:\n'
|
||||||
|
# f'{pfmt(quotes)}'
|
||||||
|
# )
|
||||||
|
|
||||||
|
# TODO: `numba` this!
|
||||||
for broker_symbol, quote in quotes.items():
|
for broker_symbol, quote in quotes.items():
|
||||||
# TODO: in theory you can send the IPC msg *before* writing
|
# TODO: in theory you can send the IPC msg *before* writing
|
||||||
# to the sharedmem array to decrease latency, however, that
|
# to the sharedmem array to decrease latency, however, that
|
||||||
|
@ -659,6 +673,18 @@ async def sample_and_broadcast(
|
||||||
sub_key: str = broker_symbol.lower()
|
sub_key: str = broker_symbol.lower()
|
||||||
subs: set[Sub] = bus.get_subs(sub_key)
|
subs: set[Sub] = bus.get_subs(sub_key)
|
||||||
|
|
||||||
|
if not subs:
|
||||||
|
all_bs_fqmes: list[str] = list(
|
||||||
|
bus._subscribers.keys()
|
||||||
|
)
|
||||||
|
log.warning(
|
||||||
|
f'No subscribers for {brokername!r} live-quote ??\n'
|
||||||
|
f'broker_symbol: {broker_symbol}\n\n'
|
||||||
|
|
||||||
|
f'Maybe the backend-sys symbol does not match one of,\n'
|
||||||
|
f'{pfmt(all_bs_fqmes)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# NOTE: by default the broker backend doesn't append
|
# NOTE: by default the broker backend doesn't append
|
||||||
# it's own "name" into the fqme schema (but maybe it
|
# it's own "name" into the fqme schema (but maybe it
|
||||||
# should?) so we have to manually generate the correct
|
# should?) so we have to manually generate the correct
|
||||||
|
|
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
JSONRPC response-request style machinery for transparent multiplexing
|
||||||
over a NoBsWs.
|
of msgs over a `NoBsWs`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
@ -377,19 +377,35 @@ async def open_jsonrpc_session(
|
||||||
url: str,
|
url: str,
|
||||||
start_id: int = 0,
|
start_id: int = 0,
|
||||||
response_type: type = JSONRPCResult,
|
response_type: type = JSONRPCResult,
|
||||||
request_type: Optional[type] = None,
|
msg_recv_timeout: float = float('inf'),
|
||||||
request_hook: Optional[Callable] = None,
|
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
||||||
error_hook: Optional[Callable] = None,
|
# and options mkts are generally "slow moving"..
|
||||||
|
#
|
||||||
|
# FURTHER if we break the underlying ws connection then since we
|
||||||
|
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
|
||||||
|
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
|
||||||
|
# broken and never restored with wtv init sequence is required to
|
||||||
|
# re-establish a working req-resp session.
|
||||||
|
|
||||||
|
# request_type: Optional[type] = None,
|
||||||
|
# request_hook: Optional[Callable] = None,
|
||||||
|
# error_hook: Optional[Callable] = None,
|
||||||
) -> Callable[[str, dict], dict]:
|
) -> Callable[[str, dict], dict]:
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_autorecon_ws(url) as ws
|
open_autorecon_ws(
|
||||||
|
url=url,
|
||||||
|
msg_recv_timeout=msg_recv_timeout,
|
||||||
|
) as ws
|
||||||
):
|
):
|
||||||
rpc_id: Iterable = count(start_id)
|
rpc_id: Iterable = count(start_id)
|
||||||
rpc_results: dict[int, dict] = {}
|
rpc_results: dict[int, dict] = {}
|
||||||
|
|
||||||
async def json_rpc(method: str, params: dict) -> dict:
|
async def json_rpc(
|
||||||
|
method: str,
|
||||||
|
params: dict,
|
||||||
|
) -> dict:
|
||||||
'''
|
'''
|
||||||
perform a json rpc call and wait for the result, raise exception in
|
perform a json rpc call and wait for the result, raise exception in
|
||||||
case of error field present on response
|
case of error field present on response
|
||||||
|
|
|
@ -540,7 +540,10 @@ async def open_feed_bus(
|
||||||
# subscription since the backend isn't (yet) expected to
|
# subscription since the backend isn't (yet) expected to
|
||||||
# append it's own name to the fqme, so we filter on keys
|
# append it's own name to the fqme, so we filter on keys
|
||||||
# which *do not* include that name (e.g .ib) .
|
# which *do not* include that name (e.g .ib) .
|
||||||
bus._subscribers.setdefault(bs_fqme, set())
|
bus._subscribers.setdefault(
|
||||||
|
bs_fqme,
|
||||||
|
set(),
|
||||||
|
)
|
||||||
|
|
||||||
# sync feed subscribers with flume handles
|
# sync feed subscribers with flume handles
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,7 +18,11 @@
|
||||||
Log like a forester!
|
Log like a forester!
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
|
import reprlib
|
||||||
import json
|
import json
|
||||||
|
from typing import (
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from pygments import (
|
from pygments import (
|
||||||
|
@ -84,3 +88,27 @@ def colorize_json(
|
||||||
# likeable styles: algol_nu, tango, monokai
|
# likeable styles: algol_nu, tango, monokai
|
||||||
formatters.TerminalTrueColorFormatter(style=style)
|
formatters.TerminalTrueColorFormatter(style=style)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def mk_repr(
|
||||||
|
**repr_kws,
|
||||||
|
) -> Callable[[str], str]:
|
||||||
|
'''
|
||||||
|
Allocate and deliver a `repr.Repr` instance with provided input
|
||||||
|
settings using the std-lib's `reprlib` mod,
|
||||||
|
* https://docs.python.org/3/library/reprlib.html
|
||||||
|
|
||||||
|
------ Ex. ------
|
||||||
|
An up to 6-layer-nested `dict` as multi-line:
|
||||||
|
- https://stackoverflow.com/a/79102479
|
||||||
|
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
||||||
|
|
||||||
|
'''
|
||||||
|
def_kws: dict[str, int] = dict(
|
||||||
|
indent=2,
|
||||||
|
maxlevel=6, # recursion levels
|
||||||
|
maxstring=66, # match editor line-len limit
|
||||||
|
)
|
||||||
|
def_kws |= repr_kws
|
||||||
|
reprr = reprlib.Repr(**def_kws)
|
||||||
|
return reprr.repr
|
||||||
|
|
|
@ -161,7 +161,12 @@ class NativeStorageClient:
|
||||||
|
|
||||||
def index_files(self):
|
def index_files(self):
|
||||||
for path in self._datadir.iterdir():
|
for path in self._datadir.iterdir():
|
||||||
if path.name in {'borked', 'expired',}:
|
if (
|
||||||
|
path.name in {'borked', 'expired',}
|
||||||
|
or
|
||||||
|
'.parquet' not in str(path)
|
||||||
|
):
|
||||||
|
# ignore all non-apache files (for now)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
key: str = path.name.rstrip('.parquet')
|
key: str = path.name.rstrip('.parquet')
|
||||||
|
|
|
@ -434,21 +434,32 @@ async def start_backfill(
|
||||||
# - some other unknown error (ib blocking the
|
# - some other unknown error (ib blocking the
|
||||||
# history bc they don't want you seeing how they
|
# history bc they don't want you seeing how they
|
||||||
# cucked all the tinas..)
|
# cucked all the tinas..)
|
||||||
if dur := frame_types.get(timeframe):
|
if (
|
||||||
# decrement by a frame's worth of duration and
|
frame_types
|
||||||
# retry a few times.
|
and
|
||||||
last_start_dt.subtract(
|
(dur := frame_types.get(timeframe))
|
||||||
|
):
|
||||||
|
# decrement by a duration's (frame) worth of time
|
||||||
|
# as maybe indicated by the backend to see if we
|
||||||
|
# can get older data before this possible
|
||||||
|
# "history gap".
|
||||||
|
orig_last_start_dt = last_start_dt
|
||||||
|
last_start_dt = last_start_dt.subtract(
|
||||||
seconds=dur.total_seconds()
|
seconds=dur.total_seconds()
|
||||||
)
|
)
|
||||||
log.warning(
|
log.warning(
|
||||||
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
|
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
|
||||||
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||||
'bf_until <- last_start_dt:\n'
|
f'Decrementing `end_dt` by {dur} and retry..\n\n'
|
||||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
|
||||||
f'Decrementing `end_dt` by {dur} and retry..\n'
|
f'orig_last_start_dt: {orig_last_start_dt}\n'
|
||||||
|
f'dur subtracted last_start_dt: {last_start_dt}\n'
|
||||||
|
f'bf_until: {backfill_until_dt}\n'
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
raise
|
||||||
|
|
||||||
# broker says there never was or is no more history to pull
|
# broker says there never was or is no more history to pull
|
||||||
except DataUnavailable:
|
except DataUnavailable:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
Loading…
Reference in New Issue