subsys_refinery #29

Open
goodboy wants to merge 22 commits from subsys_refinery into gitea_feats
21 changed files with 572 additions and 469 deletions

View File

@ -30,7 +30,8 @@ from types import ModuleType
from typing import (
Any,
Iterator,
Generator
Generator,
TYPE_CHECKING,
)
import pendulum
@ -59,8 +60,10 @@ from ..clearing._messages import (
BrokerdPosition,
)
from piker.types import Struct
from piker.data._symcache import SymbologyCache
from ..log import get_logger
from piker.log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(__name__)
@ -493,6 +496,17 @@ class Account(Struct):
_mktmap_table: dict[str, MktPair] | None = None,
only_require: list[str]|True = True,
# ^list of fqmes that are "required" to be processed from
# this ledger pass; we often don't care about others and
# definitely shouldn't always error in such cases.
# (eg. broker backend loaded that doesn't yet supsport the
# symcache but also, inside the paper engine we don't ad-hoc
# request `get_mkt_info()` for every symbol in the ledger,
# only the one for which we're simulating against).
# TODO, not sure if there's a better soln for this, ideally
# all backends get symcache support afap i guess..
) -> dict[str, Position]:
'''
Update the internal `.pps[str, Position]` table from input
@ -535,11 +549,32 @@ class Account(Struct):
if _mktmap_table is None:
raise
required: bool = (
only_require is True
or (
only_require is not True
and
fqme in only_require
)
)
# XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is
# being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..)
mkt = _mktmap_table[fqme]
if (
not (mkt := _mktmap_table.get(fqme))
and
required
):
raise
elif not required:
continue
else:
# should be an entry retreived somewhere
assert mkt
if not (pos := pps.get(bs_mktid)):
@ -656,7 +691,7 @@ class Account(Struct):
def write_config(self) -> None:
'''
Write the current account state to the user's account TOML file, normally
something like ``pps.toml``.
something like `pps.toml`.
'''
# TODO: show diff output?

View File

@ -98,13 +98,14 @@ async def open_cached_client(
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
brokermod: ModuleType = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context(
acm_func=brokermod.get_client,
kwargs=kwargs,
) as (cache_hit, client):
if cache_hit:
log.runtime(f'Reusing existing {client}')

View File

@ -471,11 +471,15 @@ def search(
'''
# global opts
brokermods = list(config['brokermods'].values())
brokermods: list[ModuleType] = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=config['loglevel'],
debug_mode=pdb,

View File

@ -22,7 +22,9 @@ routines should be primitive data types where possible.
"""
import inspect
from types import ModuleType
from typing import List, Dict, Any, Optional
from typing import (
Any,
)
import trio
@ -34,8 +36,10 @@ from ..accounting import MktPair
async def api(brokername: str, methname: str, **kwargs) -> dict:
"""Make (proxy through) a broker API call by name and return its result.
"""
'''
Make (proxy through) a broker API call by name and return its result.
'''
brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client:
meth = getattr(client, methname, None)
@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
async def stocks_quote(
brokermod: ModuleType,
tickers: List[str]
) -> Dict[str, Dict[str, Any]]:
"""Return quotes dict for ``tickers``.
"""
tickers: list[str]
) -> dict[str, dict[str, Any]]:
'''
Return a `dict` of snapshot quotes for the provided input
`tickers`: a `list` of fqmes.
'''
async with brokermod.get_client() as client:
return await client.quote(tickers)
@ -74,13 +82,15 @@ async def stocks_quote(
async def option_chain(
brokermod: ModuleType,
symbol: str,
date: Optional[str] = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option chain for ``symbol`` for ``date``.
date: str|None = None,
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option chain for ``symbol`` for ``date``.
By default all expiries are returned. If ``date`` is provided
then contract quotes for that single expiry are returned.
"""
'''
async with brokermod.get_client() as client:
if date:
id = int((await client.tickers2ids([symbol]))[symbol])
@ -98,7 +108,7 @@ async def option_chain(
# async def contracts(
# brokermod: ModuleType,
# symbol: str,
# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
# ) -> dict[str, dict[str, dict[str, Any]]]:
# """Return option contracts (all expiries) for ``symbol``.
# """
# async with brokermod.get_client() as client:
@ -110,15 +120,24 @@ async def bars(
brokermod: ModuleType,
symbol: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return option contracts (all expiries) for ``symbol``.
"""
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return option contracts (all expiries) for ``symbol``.
'''
async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs)
async def search_w_brokerd(name: str, pattern: str) -> dict:
async def search_w_brokerd(
name: str,
pattern: str,
) -> dict:
# TODO: WHY NOT WORK!?!
# when we `step` through the next block?
# import tractor
# await tractor.pause()
async with open_cached_client(name) as client:
# TODO: support multiple asset type concurrent searches.
@ -130,12 +149,12 @@ async def symbol_search(
pattern: str,
**kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
) -> dict[str, dict[str, dict[str, Any]]]:
'''
Return symbol info from broker.
'''
results = []
results: list[str] = []
async def search_backend(
brokermod: ModuleType
@ -143,6 +162,13 @@ async def symbol_search(
brokername: str = mod.name
# TODO: figure this the FUCK OUT
# -> ok so obvi in the root actor any async task that's
# spawned outside the main tractor-root-actor task needs to
# call this..
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd(
mod.name,
infect_asyncio=getattr(
@ -162,7 +188,6 @@ async def symbol_search(
))
async with trio.open_nursery() as n:
for mod in brokermods:
n.start_soon(search_backend, mod.name)
@ -172,11 +197,13 @@ async def symbol_search(
async def mkt_info(
brokermod: ModuleType,
fqme: str,
**kwargs,
) -> MktPair:
'''
Return MktPair info from broker including src and dst assets.
Return the `piker.accounting.MktPair` info struct from a given
backend broker tradable src/dst asset pair.
'''
async with open_cached_client(brokermod.name) as client:

View File

@ -168,7 +168,6 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -242,6 +241,11 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')

View File

@ -297,6 +297,8 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,
@ -653,6 +655,7 @@ async def open_trade_dialog(
# in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme,
only_require=list(mkt_by_fqme),
)
pp_msgs: list[BrokerdPosition] = []

View File

@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,

View File

@ -95,6 +95,12 @@ class Sampler:
# history loading.
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
@ -258,14 +264,15 @@ class Sampler:
subs: set
last_ts, subs = pair
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
# NOTE, for debugging pub-sub issues
# task = trio.lowlevel.current_task()
# log.debug(
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
# f'PAIR: {pair}\n'
# f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
# )
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
@ -282,12 +289,11 @@ class Sampler:
await stream.send(msg)
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
except self.bcast_errors as err:
log.error(
f'{stream._ctx.chan.uid} dropped connection'
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
)
borked.add(stream)
else:
@ -394,7 +400,8 @@ async def register_with_sampler(
finally:
if (
sub_for_broadcasts
and subs
and
subs
):
try:
subs.remove(stream)
@ -561,8 +568,7 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
bus: _FeedsBus,
rt_shm: ShmArray,
hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
@ -582,11 +588,33 @@ async def sample_and_broadcast(
overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -659,6 +687,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incoporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct
@ -728,18 +771,14 @@ async def sample_and_broadcast(
if lags > 10:
await tractor.pause()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
except Sampler.bcast_errors as ipc_err:
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'x>) {ctx.cid}@{chan.uid}'
f'|_{ipc_err!r}\n\n'
)
if sub.throttle_rate:
assert ipc._closed
@ -756,12 +795,11 @@ async def sample_and_broadcast(
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -779,13 +817,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
# instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state
first_quote: dict|None
first_quote = last_quote = None
last_send = time.time()
diff = 0
last_send: float = time.time()
diff: float = 0
task_status.started()
ticks_by_type: dict[
@ -796,22 +837,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears']
while True:
# compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff
left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
log.exception(
f'Live stream for feed for ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
break
diff = time.time() - last_send
diff: float = time.time() - last_send
if not first_quote:
first_quote = last_quote
first_quote: float = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
@ -872,7 +919,9 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
@ -883,19 +932,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}'
)
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,
):
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose()
return

View File

@ -31,6 +31,7 @@ from pathlib import Path
from pprint import pformat
from typing import (
Any,
Callable,
Sequence,
Hashable,
TYPE_CHECKING,
@ -56,7 +57,7 @@ from piker.brokers import (
)
if TYPE_CHECKING:
from ..accounting import (
from piker.accounting import (
Asset,
MktPair,
)
@ -149,57 +150,68 @@ class SymbologyCache(Struct):
'Implement `Client.get_assets()`!'
)
if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
pairs: dict[str, Struct] = await get_mkt_pairs()
for bs_fqme, pair in pairs.items():
# NOTE: every backend defined pair should
# declare it's ns path for roundtrip
# serialization lookup.
if not getattr(pair, 'ns_path', None):
raise TypeError(
f'Pair-struct for {self.mod.name} MUST define a '
'`.ns_path: str`!\n'
f'{pair}'
)
entry = await self.mod.get_mkt_info(pair.bs_fqme)
if not entry:
continue
mkt: MktPair
pair: Struct
mkt, _pair = entry
assert _pair is pair, (
f'`{self.mod.name}` backend probably has a '
'keying-symmetry problem between the pair-`Struct` '
'returned from `Client.get_mkt_pairs()`and the '
'module level endpoint: `.get_mkt_info()`\n\n'
"Here's the struct diff:\n"
f'{_pair - pair}'
)
# NOTE XXX: this means backends MUST implement
# a `Struct.bs_mktid: str` field to provide
# a native-keyed map to their own symbol
# set(s).
self.pairs[pair.bs_mktid] = pair
# NOTE: `MktPair`s are keyed here using piker's
# internal FQME schema so that search,
# accounting and feed init can be accomplished
# a sane, uniform, normalized basis.
self.mktmaps[mkt.fqme] = mkt
self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
pair,
)
else:
get_mkt_pairs: Callable|None = getattr(
client,
'get_mkt_pairs',
None,
)
if not get_mkt_pairs:
log.warning(
'No symbology cache `Pair` support for `{provider}`..\n'
'Implement `Client.get_mkt_pairs()`!'
)
return self
pairs: dict[str, Struct] = await get_mkt_pairs()
if not pairs:
log.warning(
'No pairs from intial {provider!r} sym-cache request?\n\n'
'`Client.get_mkt_pairs()` -> {pairs!r} ?'
)
return self
for bs_fqme, pair in pairs.items():
if not getattr(pair, 'ns_path', None):
# XXX: every backend defined pair must declare
# a `.ns_path: tractor.NamespacePath` to enable
# roundtrip serialization lookup from a local
# cache file.
raise TypeError(
f'Pair-struct for {self.mod.name} MUST define a '
'`.ns_path: str`!\n\n'
f'{pair!r}'
)
entry = await self.mod.get_mkt_info(pair.bs_fqme)
if not entry:
continue
mkt: MktPair
pair: Struct
mkt, _pair = entry
assert _pair is pair, (
f'`{self.mod.name}` backend probably has a '
'keying-symmetry problem between the pair-`Struct` '
'returned from `Client.get_mkt_pairs()`and the '
'module level endpoint: `.get_mkt_info()`\n\n'
"Here's the struct diff:\n"
f'{_pair - pair}'
)
# NOTE XXX: this means backends MUST implement
# a `Struct.bs_mktid: str` field to provide
# a native-keyed map to their own symbol
# set(s).
self.pairs[pair.bs_mktid] = pair
# NOTE: `MktPair`s are keyed here using piker's
# internal FQME schema so that search,
# accounting and feed init can be accomplished
# a sane, uniform, normalized basis.
self.mktmaps[mkt.fqme] = mkt
self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
pair,
)
return self

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
log.exception('Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`.
'''
@ -377,43 +377,82 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
) -> Callable[[str, dict], dict]:
'''
Init a json-RPC-over-websocket connection to the provided `url`.
A `json_rpc: Callable[[str, dict], dict` is delivered to the
caller for sending requests and a bg-`trio.Task` handles
processing of response msgs including error reporting/raising in
the parent/caller task.
'''
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
trio.open_nursery() as tn,
open_autorecon_ws(
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
):
rpc_id: Iterable = count(start_id)
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(
method: str,
params: dict,
) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
result = rpc_results[_id] = {
'result': None,
'event': trio.Event()
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +467,7 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -451,19 +491,28 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task)
tn.start_soon(recv_task)
yield json_rpc
n.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -786,7 +786,6 @@ async def install_brokerd_search(
@acm
async def maybe_open_feed(
fqmes: list[str],
loglevel: str | None = None,
@ -840,13 +839,12 @@ async def maybe_open_feed(
@acm
async def open_feed(
fqmes: list[str],
loglevel: str | None = None,
loglevel: str|None = None,
allow_overruns: bool = True,
start_stream: bool = True,
tick_throttle: float | None = None, # Hz
tick_throttle: float|None = None, # Hz
allow_remote_ctl_ui: bool = False,

View File

@ -36,10 +36,10 @@ from ._sharedmem import (
ShmArray,
_Token,
)
from piker.accounting import MktPair
if TYPE_CHECKING:
from ..accounting import MktPair
from .feed import Feed
from piker.data.feed import Feed
class Flume(Struct):
@ -82,7 +82,7 @@ class Flume(Struct):
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
feed: Feed|None = None
@property
def rt_shm(self) -> ShmArray:

View File

@ -113,9 +113,9 @@ def validate_backend(
)
if ep is None:
log.warning(
f'Provider backend {mod.name} is missing '
f'{daemon_name} support :(\n'
f'The following endpoint is missing: {name}'
f'Provider backend {mod.name!r} is missing '
f'{daemon_name!r} support?\n'
f'|_module endpoint-func missing: {name!r}\n'
)
inits: list[

View File

@ -19,6 +19,10 @@ Log like a forester!
"""
import logging
import json
import reprlib
from typing import (
Callable,
)
import tractor
from pygments import (
@ -84,3 +88,29 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -119,6 +119,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
maybe_enable_greenback=False,
**tractor_kwargs,
) as actor,

View File

@ -386,6 +386,8 @@ def ldshm(
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
@ -526,16 +528,17 @@ def ldshm(
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
# await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
await tractor.pause()
# assert not diff
assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(

View File

@ -161,7 +161,13 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if path.name in {'borked', 'expired',}:
if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
and
end_dt < start_dt
):
await tractor.pause()
break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
def_frame_duration: Duration,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -426,71 +430,114 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# broker says there never was or is no more history to pull
except DataUnavailable as due:
message: str = due.args[0]
log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
return
time: np.ndarray = array['time']
assert (
array['time'][0]
time[0]
==
next_start_dt.timestamp()
)
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -565,22 +612,27 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and write_tsdb
and
write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}'
)
# always drop the src asset token for
# NOTE, always drop the src asset token for
# non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in {
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme(
delim_char='',
without_src=True,
@ -685,7 +737,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s = (
backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -908,6 +960,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -918,7 +972,6 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -947,6 +1000,25 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -977,7 +1049,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
def_frame_duration=def_frame_size,
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -616,6 +616,18 @@ def detect_price_gaps(
# ])
...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe(
src_df: pl.DataFrame,
@ -626,7 +638,6 @@ def dedupe(
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
@ -639,19 +650,22 @@ def dedupe(
'''
wdts: pl.DataFrame = with_dts(src_df)
# maybe sort on any time field
if sort:
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
diff: int = (
wdts.height
-

View File

@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs
from tractor.msg import Struct as Struct