Compare commits


8 Commits

Author SHA1 Message Date
goodboy a00e9c0e64 Merge pull request 'ems_no_last_required: don't require `last` field to boot dark-pool engine' (#38) from ems_no_last_required into main
Submitted-in: https://www.pikers.dev/pikers/piker/pulls/38
2026-01-01 20:15:57 +00:00
goodboy cb694700c2 Merge pull request 'stop_is_eoc: expect `trio.EndOfChannel` as graceful stream shutdown' (#52) from stop_is_eoc into main
Submitted-as: https://www.pikers.dev/pikers/piker/pulls/52
2026-01-01 19:57:35 +00:00
Tyler Goodlet 11c931f65d Use `piker_pin` branch from gitea `tractor` repo 2026-01-01 14:50:23 -05:00
Tyler Goodlet 60390ae596 Various `.clearing` todos/notes on potential issues with loglevel settings.. 2025-02-21 16:25:22 -05:00
Tyler Goodlet 9592735aaa .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (I'm not huge on the key-name
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
hit a `RuntimeError` for "set changed size during iteration".
2025-02-21 16:25:22 -05:00
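
Composited, the two fixes amount to the following sketch; `boot_and_broadcast()` is a hypothetical wrapper, but the bodies mirror the `Router` hunks further below:

    import trio

    async def boot_and_broadcast(flume, book, subs, fqme: str, msg) -> None:
        # hypothetical composite of the two `Router` hunks below
        first_quote: dict = flume.first_quote

        # don't require a 'last' key: fall back to the rt shm
        # buffer's latest 'close' sample when the field is missing
        if not (last := first_quote.get('last')):
            last = flume.rt_shm.array[-1]['close']
        book.lasts[fqme] = float(last)

        # broadcast over a shallow copy so that popping a dead
        # `client_stream` can't raise "Set changed size during iteration"
        for client_stream in subs.copy():
            try:
                await client_stream.send(msg)
            except trio.BrokenResourceError:
                subs.remove(client_stream)
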
Tyler Goodlet 49841f5b91 Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-21 16:24:54 -05:00
Tyler Goodlet b2827ef3c3 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be
referenced externally as needed for catching other similar pub-sub/IPC
failures in other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (which one day we
should probably pull in from `modden` as a dep) for pretty console emissions.
2025-02-21 16:24:54 -05:00
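
Condensed, the grouping (and a typical catch-site) looks like this sketch; the class var mirrors the `Sampler` hunks below, while `send_or_drop()` is a hypothetical example consumer:

    import trio
    from piker.log import get_logger

    log = get_logger(__name__)

    class Sampler:
        # the groupable err-set, reffable externally by any
        # pub-sub/IPC broadcast loop
        bcast_errors: tuple[Exception] = (
            trio.BrokenResourceError,
            trio.ClosedResourceError,
            trio.EndOfChannel,
        )

    async def send_or_drop(stream, msg) -> bool:
        # catch the whole disconnect err-set in one clause
        try:
            await stream.send(msg)
            return True
        except Sampler.bcast_errors as err:
            log.error(f'Connection dropped due to {type(err)}')
            return False
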
Tyler Goodlet 2fc4ccf011 Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-21 16:24:54 -05:00
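
In sketch form (a hypothetical consumer loop reusing `log` and the `Sampler.bcast_errors` tuple from the previous sketch), the graceful-vs-degraded split becomes:

    import trio

    async def drain_quotes(stream) -> None:
        # hypothetical consumer loop, for illustration only
        while True:
            try:
                msg = await stream.receive()
            except Sampler.bcast_errors as ipc_err:
                match ipc_err:
                    case trio.EndOfChannel():
                        # peer sent a graceful `Stop`: not an error
                        log.info(f'{stream} terminated by peer, {ipc_err!r}')
                    case _:
                        log.warning(f'{stream} closed due to, {ipc_err!r}')
                return
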
12 changed files with 517 additions and 1021 deletions

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -25,7 +25,6 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

File diff suppressed because it is too large

View File

@ -18,59 +18,38 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
from typing import Any, Optional, Callable
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
)
import pendulum
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -85,215 +64,90 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
instrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars(
mkt,
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
raise DataUnavailable
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
nsym = piker_sym_to_cb_sym(sym)
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
async with maybe_open_price_feed(sym) as stream:
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
cache = await client.cache_symbols()
# else:
last_trade = Trade(
**(last_trades[0])
)
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
first_quote: dict = {
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -304,84 +158,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((
init_msgs,
first_quote,
))
task_status.started((init_msgs, first_quote))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to the ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context
@ -391,21 +174,12 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
# cache = client._pairs
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)
# repack in dict form
await stream.send(
await client.search_symbols(pattern))
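Boiled down, the quote-relay loop added in `stream_quotes()` above is roughly the following sketch; `relay_quotes()` is a hypothetical name and the import path assumes the package-local `.api` helpers shown in the import hunk:

    # package-local helpers per the import hunk above
    from piker.brokers.deribit.api import (
        cb_sym_to_deribit_inst,
        str_to_cb_sym,
    )

    async def relay_quotes(
        from_cf,     # `tractor.to_asyncio.LinkedTaskChannel`
        send_chan,   # `trio.abc.SendChannel`
        topic: str,  # `mkt.bs_fqme`
    ) -> None:
        # condensed from the `stream_quotes()` hunks above
        async for typ, ref in from_cf:
            if typ == 'trade':
                bs_fqme: str = cb_sym_to_deribit_inst(
                    str_to_cb_sym(ref.symbol)
                ).lower()
                await send_chan.send({topic: {
                    'symbol': bs_fqme,
                    'last': ref.price,
                    'ticks': [{
                        'type': 'trade',
                        'price': float(ref.price),
                        'size': float(ref.amount),
                        'broker_ts': ref.timestamp,
                    }],
                }})
            # 'l1' book msgs repack bid/ask (+ sizes) as 4 tick
            # entries of the same shape, see the full hunk above
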

View File

@ -1,196 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per-market data-type definitions and schema types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = ''
combo_trade_id: Optional[int] = 0
block_trade_id: Optional[str] = ''
block_trade_leg_count: Optional[int] = 0
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -168,7 +168,6 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -242,6 +241,11 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
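# `last` might not be set yet (dark-pool boot): fall back to
# the rt shm buffer's latest 'close' sample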
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1010,6 +1014,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,

View File

@ -297,6 +297,8 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,

View File

@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,

View File

@ -95,6 +95,12 @@ class Sampler:
# history loading.
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are
# notified on a step.
@ -258,14 +264,15 @@ class Sampler:
subs: set
last_ts, subs = pair
task = trio.lowlevel.current_task()
log.debug(
f'SUBS {self.subscribers}\n'
f'PAIR {pair}\n'
f'TASK: {task}: {id(task)}\n'
f'broadcasting {period_s} -> {last_ts}\n'
# NOTE, for debugging pub-sub issues
# task = trio.lowlevel.current_task()
# log.debug(
# f'ALL-SUBS@{period_s!r}: {self.subscribers}\n'
# f'PAIR: {pair}\n'
# f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
# )
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
@ -282,12 +289,11 @@ class Sampler:
await stream.send(msg)
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
except self.bcast_errors as err:
log.error(
f'{stream._ctx.chan.uid} dropped connection'
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
)
borked.add(stream)
else:
@ -394,7 +400,8 @@ async def register_with_sampler(
finally:
if (
sub_for_broadcasts
and subs
and
subs
):
try:
subs.remove(stream)
@ -561,8 +568,7 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
bus: _FeedsBus,
rt_shm: ShmArray,
hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
@ -582,11 +588,33 @@ async def sample_and_broadcast(
overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this!
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by unpacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -659,6 +687,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incorporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct
@ -728,18 +771,14 @@ async def sample_and_broadcast(
if lags > 10:
await tractor.pause()
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
except Sampler.bcast_errors as ipc_err:
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'x>) {ctx.cid}@{chan.uid}'
f'|_{ipc_err!r}\n\n'
)
if sub.throttle_rate:
assert ipc._closed
@ -756,12 +795,11 @@ async def sample_and_broadcast(
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -779,13 +817,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
'''
# TODO: compute the approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
# instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state
first_quote: dict|None
first_quote = last_quote = None
last_send = time.time()
diff = 0
last_send: float = time.time()
diff: float = 0
task_status.started()
ticks_by_type: dict[
@ -796,22 +837,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears']
while True:
# compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff
left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try:
sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?")
log.exception(
f'Live stream for feed ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
break
diff = time.time() - last_send
diff: float = time.time() - last_send
if not first_quote:
first_quote = last_quote
first_quote: dict = last_quote
# first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0:
@ -872,7 +919,9 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
@ -883,19 +932,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# NOTE: any of these can be raised by `tractor`'s IPC
# transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.ClosedResourceError,
trio.BrokenResourceError,
except (
ConnectionResetError,
):
) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose()
return
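
Stripped of the tick-aggregation details, the send-cycle timing above reduces to this sketch; the `0.000616` overhead fudge is the magic constant from the hunk, while `uniform_rate_recv()` is a hypothetical stand-in name:

    import time
    import trio

    async def uniform_rate_recv(
        rate: float,
        quote_stream: trio.abc.ReceiveChannel,
    ) -> None:
        # leave headroom for per-cycle overhead
        # (the 0.000616 magic # from the hunk above)
        throttle_period: float = 1/rate - 0.000616
        last_send: float = time.time()
        diff: float = 0
        last_quote: dict | None = None

        while True:
            left_to_sleep: float = throttle_period - diff
            if left_to_sleep > 0:
                # wait at most the remaining cycle time whilst
                # still absorbing any quote that arrives early
                with trio.move_on_after(left_to_sleep):
                    sym, last_quote = await quote_stream.receive()
                    diff = time.time() - last_send
                    continue

            # period elapsed: flush the most recent quote downstream
            if last_quote:
                ...  # await stream.send({sym: last_quote})
            last_send = time.time()
            diff = 0
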

View File

@ -18,8 +18,8 @@
Log like a forester!
"""
import logging
import reprlib
import json
import reprlib
from typing import (
Callable,
)
@ -90,6 +90,8 @@ def colorize_json(
)
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
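
Usage-wise (as seen in the feed hunks above) the returned formatter is applied to nested quote-`dict`s for multi-line console emissions; a sketch assuming `mk_repr()` proxies its kwargs through to the underlying `reprlib.Repr`:

    from piker.log import get_logger, mk_repr

    log = get_logger(__name__)

    # multiline nested-`dict` formatter (`maxstring=100` widens string
    # truncation for deribit's mega-long instrument fields)
    pfmt = mk_repr(maxstring=100)

    quotes = {'btc-1sep24-55000-c.deribit': {'last': 0.05, 'ticks': []}}
    log.info(f'Rx live quotes:\n{pfmt(quotes)}')
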

View File

@ -130,4 +130,5 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch = "piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true }
tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { path = "../tractor", editable = true }