Compare commits

..

17 Commits

Author SHA1 Message Date
Tyler Goodlet 521a194819 `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2025-02-21 16:49:13 -05:00
Tyler Goodlet a64b4272f4 Type loaded backend modules 2025-02-21 16:49:13 -05:00
Tyler Goodlet b536c093d5 Bump various `.brokers.core` doc string content/style 2025-02-21 16:49:13 -05:00
Tyler Goodlet c3f3b25524 Teensie `piker.data` styling tweaks
- use more compact optional value style with `|`-union
- fix `.flows` typing-only import since we need `MktPair` to be
  immediately defined for use on a `msgspec.Struct` field.
- more "tree-like" warning msg in `.validate()` reporting.
2025-02-21 16:37:42 -05:00
Tyler Goodlet 8dcdf7c9a9 Invert `getattr()` check for `get_mkt_pairs()` ep
Such that we `return` early when not defined by the provider backend to
reduce an indent level in `SymbologyCache.load()`.
2025-02-21 16:37:42 -05:00
Tyler Goodlet 4d15b9bfdd Allow ledger passes to ignore (symcache) unknown fqmes
For example in the paper-eng, if you have a backend that doesn't fully
support a symcache (yet) it's handy to be able to ignore processing
other paper-eng txns when all you care about at the moment is the
simulated symbol.

NOTE, that currently this will still result in a key-error when you load
more then one mkt with the paper engine (for which the backend does not
have the symcache implemented) since no fqme ad-hoc query was made for
the 2nd symbol (and i'm not sure we should support that kinda hackery
over just encouraging the sym-cache being added?). Def needs a little
more thought depending on how many backends are never going to be able
to (easily) support caching..
2025-02-21 16:37:42 -05:00
goodboy 5e371f1d73 Merge pull request 'jsonrpc_err_in_rent' (#41) from jsonrpc_err_in_rent into gitea_feats
Reviewed-on: #41
2025-02-21 21:21:02 +00:00
goodboy 6c221bb348 Merge pull request 'tsp_gaps: fixes for fault-less OHLCV time-series loads' (#35) from tsp_gaps into gitea_feats
Reviewed-on: #35
2025-02-21 20:46:37 +00:00
Tyler Goodlet e391c896f8 Mk jsronrpc's underlying ws timeout `float('inf')`
Since currently we're only using this IPC subsys for `deribit`, and
generally speaking we're primarly supporting options markets (which are
fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs`
on timeout since doing so normally breaks the jsron-rpc IPC session.
Without a proper `fixture` passed to `open_autorecon_ws()` (which we
should eventually implement!!) relying on a timeout-to-reset more or
less will just cause breakage issues - a proper reconnect sequence must
be implemented before using that feature.

Deats,
- expose and proxy through the `msg_recv_timeout` from
  `open_jsonrpc_session()` into the underlying `open_autorecon_ws()`
  call.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 5633f5614d Doc-n-clean `.data._web_bs.open_jsonrpc_session()`
Add a doc-string reflecting recent refinements, drop all the old hook
params, rename `n: trio.Nursery` -> `tn` for "task nursery" fitting with
code base's naming style.
2025-02-19 17:05:13 -05:00
Tyler Goodlet 76735189de data._web_bs: try to raise jsonrpc errors in parent task 2025-02-19 17:05:13 -05:00
Tyler Goodlet d49608f74e Refine history gap/termination signalling
Namely handling backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Deats,
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration` expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting" which checks for expected frame duration vs.
  that received with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
2025-02-19 17:01:24 -05:00
Tyler Goodlet bf0ac93aa3 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-02-19 17:01:24 -05:00
Tyler Goodlet d7179d47b0 `.tsp._anal`: add (unused) `detect_vlm_gaps()` 2025-02-19 17:01:24 -05:00
Tyler Goodlet c390e87536 `.storage.cli`: collect gap-markup-aids into `tf2aids: dict` prior to pause for introspection 2025-02-19 17:01:24 -05:00
Tyler Goodlet 5e4a6d61c7 Ignore any non-`.parquet` files under `.config/piker/nativedb/` subdir 2025-02-19 17:01:24 -05:00
Tyler Goodlet 3caaa30b03 Mask no-data pause, add perps to no-`/src`-in-fqme asset set
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to the `.start_backfill()` input literal
set since we don't expect the 'btc/usd.perp.binance' for now.
2025-02-19 17:01:24 -05:00
8 changed files with 282 additions and 163 deletions

View File

@ -168,6 +168,7 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -241,11 +242,6 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -653,11 +653,7 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
book.lasts[fqme]: float = float(first_quote['last'])
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -720,7 +716,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs.copy():
for client_stream in subs:
try:
await client_stream.send(msg)
sent_some = True
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')

View File

@ -297,8 +297,6 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,

View File

@ -30,7 +30,6 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,

View File

@ -95,12 +95,6 @@ class Sampler:
# history loading.
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
@ -264,15 +258,14 @@ class Sampler:
subs: set
last_ts, subs = pair
# NOTE, for debugging pub-sub issues
# task = trio.lowlevel.current_task()
# log.debug(
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
# f'PAIR: {pair}\n'
# f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
# )
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
@ -289,11 +282,12 @@ class Sampler:
await stream.send(msg)
sent.add(stream)
except self.bcast_errors as err:
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
else:
@ -400,8 +394,7 @@ async def register_with_sampler(
finally:
if (
sub_for_broadcasts
and
subs
and subs
):
try:
subs.remove(stream)
@ -568,7 +561,8 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus,
bus: _FeedsBus, # noqa
rt_shm: ShmArray,
hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
@ -588,33 +582,11 @@ async def sample_and_broadcast(
overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
# TODO: ``numba`` this!
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -687,21 +659,6 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incoporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct
@ -771,14 +728,18 @@ async def sample_and_broadcast(
if lags > 10:
await tractor.pause()
except Sampler.bcast_errors as ipc_err:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'x>) {ctx.cid}@{chan.uid}'
f'|_{ipc_err!r}\n\n'
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
)
if sub.throttle_rate:
assert ipc._closed
@ -795,11 +756,12 @@ async def sample_and_broadcast(
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -817,16 +779,13 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
# instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# send cycle state
first_quote: dict|None
first_quote = last_quote = None
last_send: float = time.time()
diff: float = 0
last_send = time.time()
diff = 0
task_status.started()
ticks_by_type: dict[
@ -837,28 +796,22 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears']
while True:
# compute the remaining time to sleep for this throttled cycle
left_to_sleep: float = throttle_period - diff
left_to_sleep = throttle_period - diff
if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(
f'Live stream for feed for ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
log.exception(f"feed for {stream} ended?")
break
diff: float = time.time() - last_send
diff = time.time() - last_send
if not first_quote:
first_quote: float = last_quote
first_quote = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
@ -919,9 +872,7 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({
sym: first_quote
})
await stream.send({sym: first_quote})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
@ -932,28 +883,19 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}'
)
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,
) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
):
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
await stream.aclose()
return

View File

@ -19,10 +19,6 @@ Log like a forester!
"""
import logging
import json
import reprlib
from typing import (
Callable,
)
import tractor
from pygments import (
@ -88,29 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -119,10 +119,6 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
maybe_enable_greenback=False,
**tractor_kwargs,
) as actor,

View File

@ -21,4 +21,230 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from tractor.msg import Struct as Struct
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs