Compare commits

..

No commits in common. "3e5e704c2f0ebc69f825bdb3ed16612cd0ac9504" and "b4d3bcf240fdc285a027d0357e828c0c7aca72bc" have entirely different histories.

3 changed files with 32 additions and 50 deletions

View File

@ -42,6 +42,7 @@ from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
@ -546,7 +547,7 @@ async def open_symbol_search(
) )
# repack in fqme-keyed table # repack in fqme-keyed table
byfqme: dict[str, Pair] = {} byfqme: dict[start, Pair] = {}
for pair in pairs.values(): for pair in pairs.values():
byfqme[pair.bs_fqme] = pair byfqme[pair.bs_fqme] = pair

View File

@ -30,6 +30,7 @@ import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -100,12 +101,6 @@ class Sampler:
# history loading. # history loading.
incr_task_cs: trio.CancelScope | None = None incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are # a particular sample period increment event: all subscribers are
# notified on a step. # notified on a step.
@ -269,15 +264,14 @@ class Sampler:
subs: set subs: set
last_ts, subs = pair last_ts, subs = pair
# NOTE, for debugging pub-sub issues task = trio.lowlevel.current_task()
# task = trio.lowlevel.current_task() log.debug(
# log.debug( f'SUBS {self.subscribers}\n'
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' f'PAIR {pair}\n'
# f'PAIR: {pair}\n' f'TASK: {task}: {id(task)}\n'
# f'TASK: {task}: {id(task)}\n' f'broadcasting {period_s} -> {last_ts}\n'
# f'broadcasting {period_s} -> {last_ts}\n' # f'consumers: {subs}'
# f'consumers: {subs}' )
# )
borked: set[MsgStream] = set() borked: set[MsgStream] = set()
sent: set[MsgStream] = set() sent: set[MsgStream] = set()
while True: while True:
@ -294,11 +288,12 @@ class Sampler:
await stream.send(msg) await stream.send(msg)
sent.add(stream) sent.add(stream)
except self.bcast_errors as err: except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error( log.error(
f'Connection dropped for IPC ctx\n' f'{stream._ctx.chan.uid} dropped connection'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
) )
borked.add(stream) borked.add(stream)
else: else:
@ -584,7 +579,7 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus, bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -604,13 +599,9 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though # multiline nested `dict` formatter (since rn quote-msgs are
# this should be resolved more correctly in the future using the # just that).
# new typed-msgspec feats of `tractor`! pfmt: Callable[[str], str] = mk_repr()
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
@ -623,13 +614,7 @@ async def sample_and_broadcast(
# f'{pfmt(quotes)}' # f'{pfmt(quotes)}'
# ) # )
# TODO, # TODO: `numba` this!
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -702,20 +687,17 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst if not subs:
# incoporating feed "pausing" .. all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# if not subs: )
# all_bs_fqmes: list[str] = list( log.warning(
# bus._subscribers.keys() f'No subscribers for {brokername!r} live-quote ??\n'
# ) f'broker_symbol: {broker_symbol}\n\n'
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n' f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n' f'{pfmt(all_bs_fqmes)}\n'
# ) )
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
@ -814,6 +796,7 @@ async def sample_and_broadcast(
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream, stream: MsgStream,

View File

@ -90,8 +90,6 @@ def colorize_json(
) )
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr( def mk_repr(
**repr_kws, **repr_kws,
) -> Callable[[str], str]: ) -> Callable[[str], str]: