Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet b61145ec5a Add missing f-str prefix to log line 2026-01-01 15:16:22 -05:00
Tyler Goodlet 624cca091a Port to newer `tractor.get_registry()` 2026-01-01 15:16:22 -05:00
Tyler Goodlet 9045e18386 binance: add new `permissionSets` to base `Pair` 2026-01-01 15:16:22 -05:00
Tyler Goodlet 23ea65e337 Fix type-check assertion in ems test to use `is` 2026-01-01 15:16:22 -05:00
Tyler Goodlet ea2e374101 Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the filed mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-01 15:16:22 -05:00
Tyler Goodlet f64fcc69ed Update legacy type to `tractor.MsgStream` 2026-01-01 15:16:22 -05:00
Tyler Goodlet f3a20ed77f TOSQUASH: 84ad34f51, one more `float` cast for paperboi.. 2026-01-01 15:16:22 -05:00
Tyler Goodlet 95cdaf8114 TOSQUASH: 84ad34f51, lingering `float` casts.. 2026-01-01 15:16:22 -05:00
Tyler Goodlet 39dcaf528a Drop variable regex from `ruff.toml`
Same as in other projects, seems to be not parsing and causing `ruff` to
crash?!?
2026-01-01 15:16:22 -05:00
Tyler Goodlet 3f663e0e73 `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug the some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-01 15:16:22 -05:00
Tyler Goodlet de542c90fb Cast to `float` as needed from order-mode and ems
Since we're not quite yet using automatic typed msging from
`tractor`/`msgspec` (i.e. still manually decoding order ctl msgs from
built-in types..`dict`s still not `msgspec.Struct`) this adds the
appropriate typecasting ops to ensure the required precision is attained
prior to processing and/or submission to a brokerd backend service.

For the `.clearing._ems`,
- flip all `trigger_price` previously presumed to be `float` to just
  the field-identical `price: Decimal` and ensure we cast to `float`
  for any `trigger_price` usage, like before passing to `mk_check()`.

For `.ui.order_mode.OrderMode`,
- add a new `.curr_mkt: MktPair` convenience property to get the
  chart-active value.
- ensure we always use the `.curr_mkt.quantize() -> Decimal` before
  setting any IPC-msg's `.price` field!
- always cast `float(Order.price)` before use in setting line-levels.
- don't bother setting `Order.symbol` to a (now fully removed) `Symbol`
  instance since it's not really required-for-use anywhere; leaving it
  a `str` (per the type-annot) is fine for now?
2026-01-01 15:16:22 -05:00
Tyler Goodlet 41559e6729 Finally drop `Symbol`
It was replaced by `MktPair` long ago in,
https://github.com/pikers/piker/pull/489

with follow up for final removal in,
https://github.com/pikers/piker/issues/517

Resolves #517
2026-01-01 15:16:22 -05:00
Tyler Goodlet 93e22e27b9 Mk `Brokerd[Order].price` avoid `float`-errs
By re-typing to a `.price: Decimal` field on both legs of the EMS.

It seems we must do it ourselves since,
- these msg's (fields) are relayed through the clearing engine to each
  `brokerd` backend and,
- bc many (if not all) of those backends `.broker`-clients (nor their
  encapsulated "brokerage services") **are not** doing any
  precision-truncation themselves.

So, for now, instead we opt to expect rounding at the source. This means
we will explicitly require casting to/from `float` at the line-graphics
interface to the order-clearing-engine (as implemented throughout
`.ui.order_mode.OrderMode`); and this is coming shortly.
2026-01-01 15:16:22 -05:00
Gud Boi a00e9c0e64 Merge pull request 'ems_no_last_required: don't require `last` field to boot dark-pool engine' (#38) from ems_no_last_required into main
Submitted-in: https://www.pikers.dev/pikers/piker/pulls/38
2026-01-01 20:15:57 +00:00
Gud Boi cb694700c2 Merge pull request 'stop_is_oec: expect `trio.EndOfChannel` as graceful stream shutdown' (#52) from stop_is_eoc into main
Submitted-as: https://www.pikers.dev/pikers/piker/pulls/52
2026-01-01 19:57:35 +00:00
Tyler Goodlet 11c931f65d User `piker_pin` branch from gitea `tractor` repo 2026-01-01 14:50:23 -05:00
Tyler Goodlet 60390ae596 Various `.clearing` todos/notes on potential issues with loglevel settings.. 2025-02-21 16:25:22 -05:00
Tyler Goodlet 9592735aaa .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-02-21 16:25:22 -05:00
Tyler Goodlet 49841f5b91 Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-21 16:24:54 -05:00
Tyler Goodlet b2827ef3c3 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed
externally as needed for catching other similar pub-sub/IPC failures in
other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we
should prolly pull from `modden` as a dep) for pretty console emissions.
2025-02-21 16:24:54 -05:00
Tyler Goodlet 2fc4ccf011 Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-21 16:24:54 -05:00
17 changed files with 291 additions and 209 deletions

View File

@ -42,7 +42,6 @@ from ._mktinfo import (
dec_digits, dec_digits,
digits_to_dec, digits_to_dec,
MktPair, MktPair,
Symbol,
unpack_fqme, unpack_fqme,
_derivs as DerivTypes, _derivs as DerivTypes,
) )
@ -60,7 +59,6 @@ __all__ = [
'Asset', 'Asset',
'MktPair', 'MktPair',
'Position', 'Position',
'Symbol',
'Transaction', 'Transaction',
'TransactionLedger', 'TransactionLedger',
'dec_digits', 'dec_digits',

View File

@ -677,90 +677,3 @@ def unpack_fqme(
# '.'.join([mkt_ep, venue]), # '.'.join([mkt_ep, venue]),
suffix, suffix,
) )
class Symbol(Struct):
'''
I guess this is some kinda container thing for dealing with
all the different meta-data formats from brokers?
'''
key: str
broker: str = ''
venue: str = ''
# precision descriptors for price and vlm
tick_size: Decimal = Decimal('0.01')
lot_tick_size: Decimal = Decimal('0.0')
suffix: str = ''
broker_info: dict[str, dict[str, Any]] = {}
@classmethod
def from_fqme(
cls,
fqsn: str,
info: dict[str, Any],
) -> Symbol:
broker, mktep, venue, suffix = unpack_fqme(fqsn)
tick_size = info.get('price_tick_size', 0.01)
lot_size = info.get('lot_tick_size', 0.0)
return Symbol(
broker=broker,
key=mktep,
tick_size=tick_size,
lot_tick_size=lot_size,
venue=venue,
suffix=suffix,
broker_info={broker: info},
)
@property
def type_key(self) -> str:
return list(self.broker_info.values())[0]['asset_type']
@property
def tick_size_digits(self) -> int:
return float_digits(self.tick_size)
@property
def lot_size_digits(self) -> int:
return float_digits(self.lot_tick_size)
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.lot_tick_size))
@property
def broker(self) -> str:
return list(self.broker_info.keys())[0]
@property
def fqme(self) -> str:
return maybe_cons_tokens([
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
self.venue,
self.suffix, # includes expiry and other con info
self.broker,
])
def quantize(
self,
size: float,
) -> Decimal:
digits = float_digits(self.lot_tick_size)
return Decimal(size).quantize(
Decimal(f'1.{"0".ljust(digits, "0")}'),
rounding=ROUND_HALF_EVEN
)
# NOTE: when cast to `str` return fqme
def __str__(self) -> str:
return self.fqme

View File

@ -374,9 +374,14 @@ class Client:
pair: Pair = pair_type(**item) pair: Pair = pair_type(**item)
except Exception as e: except Exception as e:
e.add_note( e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n" f'\n'
'Check out their API docs here:\n\n' f'New or removed field we need to codify!\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information' f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
) )
raise raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair

View File

@ -97,6 +97,8 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
permissionSets: list[list[str]]
filters: dict[ filters: dict[
str, str,
str | int | float, str | int | float,
@ -142,7 +144,11 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -175,9 +175,8 @@ async def handle_order_requests(
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': action, 'action': 'buy'|'sell',
} if action in {'buy', 'sell'}: }:
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
@ -262,6 +261,12 @@ async def handle_order_requests(
} | extra } | extra
log.info(f'Submitting WS order request:\n{pformat(req)}') log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req) await ws.send_msg(req)
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
@ -1085,6 +1090,8 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n' f'Failed to {action} order {reqid}:\n'
f'{errmsg}' f'{errmsg}'
) )
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A' symbol: str = 'N/A'
if chain := apiflows.get(reqid): if chain := apiflows.get(reqid):

View File

@ -168,7 +168,6 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code( async def relay_orders_from_sync_code(
client: OrderClient, client: OrderClient,
symbol_key: str, symbol_key: str,
to_ems_stream: tractor.MsgStream, to_ems_stream: tractor.MsgStream,
@ -242,6 +241,11 @@ async def open_ems(
async with maybe_open_emsd( async with maybe_open_emsd(
broker, broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel, loglevel=loglevel,
) as portal: ) as portal:

View File

@ -76,7 +76,6 @@ if TYPE_CHECKING:
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
@ -162,7 +161,7 @@ async def clear_dark_triggers(
router: Router, router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.MsgStream,
broker: str, broker: str,
fqme: str, fqme: str,
@ -178,6 +177,7 @@ async def clear_dark_triggers(
''' '''
# XXX: optimize this for speed! # XXX: optimize this for speed!
# TODO: # TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False quote_stream._raise_on_lag = False
@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key] subs = self.subscribers[sub_key]
sent_some: bool = False sent_some: bool = False
for client_stream in subs: for client_stream in subs.copy():
try: try:
await client_stream.send(msg) await client_stream.send(msg)
sent_some = True sent_some = True
@ -1010,6 +1014,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast( await router.client_broadcast(
status_msg.req.symbol, status_msg.req.symbol,
status_msg, status_msg,
@ -1182,12 +1190,16 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client. submitting live orders immediately if requested by the client.
''' '''
# cmd: dict # TODO, only allow `msgspec.Struct` form!
cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid = str(cmd['oid']) oid: str = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for # register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd # this order id such that translated message from the brokerd
@ -1293,7 +1305,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'), 'exec_mode': ('live' | 'paper'),
@ -1325,7 +1337,7 @@ async def process_client_order_cmds(
symbol=sym, symbol=sym,
action=action, action=action,
price=trigger_price, price=price,
size=size, size=size,
account=req.account, account=req.account,
) )
@ -1347,7 +1359,11 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
@ -1363,7 +1379,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'action': action, 'action': action,
@ -1391,7 +1407,12 @@ async def process_client_order_cmds(
if isnan(last): if isnan(last):
last = flume.rt_shm.array[-1]['close'] last = flume.rt_shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) trigger_price: float = float(price)
pred = mk_check(
trigger_price,
last,
action,
)
# NOTE: for dark orders currently we submit # NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks # the triggered live order at a price 5 ticks

View File

@ -19,6 +19,7 @@ Clearing sub-system message and protocols.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal
from typing import ( from typing import (
Literal, Literal,
) )
@ -71,7 +72,15 @@ class Order(Struct):
symbol: str # | MktPair symbol: str # | MktPair
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float # https://docs.python.org/3/library/decimal.html#decimal-objects
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
size: float # -ve is "sell", +ve is "buy" size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = [] brokers: list[str] = []
@ -178,7 +187,7 @@ class BrokerdOrder(Struct):
time_ns: int time_ns: int
symbol: str # fqme symbol: str # fqme
price: float price: Decimal
size: float size: float
# TODO: if we instead rely on a +ve/-ve size to determine # TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -297,6 +297,8 @@ class PaperBoi(Struct):
# transmit pp msg to ems # transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid] pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition( pp_msg = BrokerdPosition(
broker=self.broker, broker=self.broker,
@ -508,7 +510,7 @@ async def handle_order_requests(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=order.oid, oid=order.oid,
symbol=f'{order.symbol}.{client.broker}', symbol=f'{order.symbol}.{client.broker}',
price=order.price, price=float(order.price),
action=order.action, action=order.action,
size=order.size, size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys) log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial( get_console_log = partial(
get_console_log, get_console_log,
name=subsys, name=subsys,

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -95,6 +95,12 @@ class Sampler:
# history loading. # history loading.
incr_task_cs: trio.CancelScope | None = None incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
)
# holds all the ``tractor.Context`` remote subscriptions for # holds all the ``tractor.Context`` remote subscriptions for
# a particular sample period increment event: all subscribers are # a particular sample period increment event: all subscribers are
# notified on a step. # notified on a step.
@ -258,14 +264,15 @@ class Sampler:
subs: set subs: set
last_ts, subs = pair last_ts, subs = pair
task = trio.lowlevel.current_task() # NOTE, for debugging pub-sub issues
log.debug( # task = trio.lowlevel.current_task()
f'SUBS {self.subscribers}\n' # log.debug(
f'PAIR {pair}\n' # f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
f'TASK: {task}: {id(task)}\n' # f'PAIR: {pair}\n'
f'broadcasting {period_s} -> {last_ts}\n' # f'TASK: {task}: {id(task)}\n'
# f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}' # f'consumers: {subs}'
) # )
borked: set[MsgStream] = set() borked: set[MsgStream] = set()
sent: set[MsgStream] = set() sent: set[MsgStream] = set()
while True: while True:
@ -282,12 +289,11 @@ class Sampler:
await stream.send(msg) await stream.send(msg)
sent.add(stream) sent.add(stream)
except ( except self.bcast_errors as err:
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error( log.error(
f'{stream._ctx.chan.uid} dropped connection' f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
) )
borked.add(stream) borked.add(stream)
else: else:
@ -394,7 +400,8 @@ async def register_with_sampler(
finally: finally:
if ( if (
sub_for_broadcasts sub_for_broadcasts
and subs and
subs
): ):
try: try:
subs.remove(stream) subs.remove(stream)
@ -561,8 +568,7 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus,
bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -582,11 +588,33 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# NOTE, only used for debugging live-data-feed issues, though
# this should be resolved more correctly in the future using the
# new typed-msgspec feats of `tractor`!
#
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
# are just that).
# pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# print(quotes) # print(quotes)
# TODO: ``numba`` this! # XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO,
# -[ ] `numba` or `cython`-nize this loop possibly?
# |_alternatively could we do it in rust somehow by upacking
# arrow msgs instead of using `msgspec`?
# -[ ] use `msgspec.Struct` support in new typed-msging from
# `tractor` to ensure only allowed msgs are transmitted?
#
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -659,6 +687,21 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
# TODO, figure out how to make this useful whilst
# incoporating feed "pausing" ..
#
# if not subs:
# all_bs_fqmes: list[str] = list(
# bus._subscribers.keys()
# )
# log.warning(
# f'No subscribers for {brokername!r} live-quote ??\n'
# f'broker_symbol: {broker_symbol}\n\n'
# f'Maybe the backend-sys symbol does not match one of,\n'
# f'{pfmt(all_bs_fqmes)}\n'
# )
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
@ -697,7 +740,7 @@ async def sample_and_broadcast(
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n' f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz' f'throttle = {throttle} Hz'
) )
@ -728,18 +771,14 @@ async def sample_and_broadcast(
if lags > 10: if lags > 10:
await tractor.pause() await tractor.pause()
except ( except Sampler.bcast_errors as ipc_err:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx: Context = ipc._ctx ctx: Context = ipc._ctx
chan: Channel = ctx.chan chan: Channel = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
'Dropped `brokerd`-quotes-feed connection:\n' f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
f'{broker_symbol}:' f'x>) {ctx.cid}@{chan.uid}'
f'{ctx.cid}@{chan.uid}' f'|_{ipc_err!r}\n\n'
) )
if sub.throttle_rate: if sub.throttle_rate:
assert ipc._closed assert ipc._closed
@ -756,12 +795,11 @@ async def sample_and_broadcast(
async def uniform_rate_send( async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream, stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' '''
@ -779,13 +817,16 @@ async def uniform_rate_send(
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
''' '''
# TODO: compute the approx overhead latency per cycle # ?TODO? dynamically compute the **actual** approx overhead latency per cycle
left_to_sleep = throttle_period = 1/rate - 0.000616 # instead of this magic # bidinezz?
throttle_period: float = 1/rate - 0.000616
left_to_sleep: float = throttle_period
# send cycle state # send cycle state
first_quote: dict|None
first_quote = last_quote = None first_quote = last_quote = None
last_send = time.time() last_send: float = time.time()
diff = 0 diff: float = 0
task_status.started() task_status.started()
ticks_by_type: dict[ ticks_by_type: dict[
@ -796,22 +837,28 @@ async def uniform_rate_send(
clear_types = _tick_groups['clears'] clear_types = _tick_groups['clears']
while True: while True:
# compute the remaining time to sleep for this throttled cycle # compute the remaining time to sleep for this throttled cycle
left_to_sleep = throttle_period - diff left_to_sleep: float = throttle_period - diff
if left_to_sleep > 0: if left_to_sleep > 0:
cs: trio.CancelScope
with trio.move_on_after(left_to_sleep) as cs: with trio.move_on_after(left_to_sleep) as cs:
sym: str
last_quote: dict
try: try:
sym, last_quote = await quote_stream.receive() sym, last_quote = await quote_stream.receive()
except trio.EndOfChannel: except trio.EndOfChannel:
log.exception(f"feed for {stream} ended?") log.exception(
f'Live stream for feed for ended?\n'
f'<=c\n'
f' |_[{stream!r}\n'
)
break break
diff = time.time() - last_send diff: float = time.time() - last_send
if not first_quote: if not first_quote:
first_quote = last_quote first_quote: float = last_quote
# first_quote['tbt'] = ticks_by_type # first_quote['tbt'] = ticks_by_type
if (throttle_period - diff) > 0: if (throttle_period - diff) > 0:
@ -872,7 +919,9 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display # TODO: now if only we could sync this to the display
# rate timing exactly lul # rate timing exactly lul
try: try:
await stream.send({sym: first_quote}) await stream.send({
sym: first_quote
})
except tractor.RemoteActorError as rme: except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun: if rme.type is not tractor._exceptions.StreamOverrun:
raise raise
@ -883,19 +932,28 @@ async def uniform_rate_send(
f'{sym}:{ctx.cid}@{chan.uid}' f'{sym}:{ctx.cid}@{chan.uid}'
) )
except ( # NOTE: any of these can be raised by `tractor`'s IPC
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient # transport-layer and we want to be highly resilient
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.ClosedResourceError, except (
trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,
): ) + Sampler.bcast_errors as ipc_err:
match ipc_err:
case trio.EndOfChannel():
log.info(
f'{stream} terminated by peer,\n'
f'{ipc_err!r}'
)
case _:
# if the feed consumer goes down then drop # if the feed consumer goes down then drop
# out of this rate limiter # out of this rate limiter
log.warning(f'{stream} closed') log.warning(
f'{stream} closed due to,\n'
f'{ipc_err!r}'
)
await stream.aclose() await stream.aclose()
return return

View File

@ -19,6 +19,10 @@ Log like a forester!
""" """
import logging import logging
import json import json
import reprlib
from typing import (
Callable,
)
import tractor import tractor
from pygments import ( from pygments import (
@ -84,3 +88,29 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )
# TODO, eventually defer to the version in `modden` once
# it becomes a dep!
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -21,6 +21,7 @@ Chart trading, the only way to scalp.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -41,7 +42,6 @@ from piker.accounting import (
Position, Position,
mk_allocator, mk_allocator,
MktPair, MktPair,
Symbol,
) )
from piker.clearing import ( from piker.clearing import (
open_ems, open_ems,
@ -143,6 +143,15 @@ class OrderMode:
} }
_staged_order: Order | None = None _staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
self, self,
level: float, level: float,
@ -172,7 +181,11 @@ class OrderMode:
line.update_labels(order_info) line.update_labels(order_info)
# update bound-in staged order # update bound-in staged order
order.price = level mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.size = order_info['size'] order.size = order_info['size']
# when an order is changed we flip the settings side-pane to # when an order is changed we flip the settings side-pane to
@ -187,7 +200,9 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
level = order.price # TODO, if we instead just always decimalize at the ems layer
# we can avoid this back-n-forth casting?
level = float(order.price)
line = order_line( line = order_line(
chart or self.chart, chart or self.chart,
@ -224,7 +239,11 @@ class OrderMode:
# the order mode allocator but we still need to update the # the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems # "staged" order message we'll send to the ems
def update_order_price(y: float) -> None: def update_order_price(y: float) -> None:
order.price = y mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
line._on_level_change = update_order_price line._on_level_change = update_order_price
@ -275,34 +294,31 @@ class OrderMode:
chart = cursor.linked.chart chart = cursor.linked.chart
if ( if (
not chart not chart
and cursor and
and cursor.active_plot cursor
and
cursor.active_plot
): ):
return return
chart = cursor.active_plot chart = cursor.active_plot
price = cursor._datum_xy[1] price: float = cursor._datum_xy[1]
if not price: if not price:
# zero prices are not supported by any means # zero prices are not supported by any means
# since that's illogical / a no-op. # since that's illogical / a no-op.
return return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision # TODO: should we be enforcing this precision
# at a different layer in the stack? right now # at a different layer in the stack?
# any precision error will literally be relayed # |_ might require `MktPair` tracking in the EMS?
# all the way back from the backend. # |_ right now any precision error will be relayed
# all the way back from the backend and vice-versa..
price = round( #
price, mkt: MktPair = self.curr_mkt
ndigits=mkt.price_tick_digits, price: Decimal = mkt.quantize(
size=price,
quantity_type='price',
) )
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -378,7 +394,7 @@ class OrderMode:
'oid': oid, 'oid': oid,
}) })
if order.price <= 0: if float(order.price) <= 0:
log.error( log.error(
'*!? Invalid `Order.price <= 0` ?!*\n' '*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form # TODO: make this present multi-line in object form
@ -515,14 +531,15 @@ class OrderMode:
# if an order msg is provided update the line # if an order msg is provided update the line
# **from** that msg. # **from** that msg.
if order: if order:
if order.price <= 0: price: float = float(order.price)
if price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}') log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid]) self.cancel_orders([order.oid])
return None return None
line.set_level(order.price) line.set_level(price)
self.on_level_change_update_next_order_info( self.on_level_change_update_next_order_info(
level=order.price, level=price,
line=line, line=line,
order=order, order=order,
# use the corresponding position tracker for the # use the corresponding position tracker for the
@ -681,9 +698,9 @@ class OrderMode:
) -> Dialog | None: ) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the # NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded. # equivalent order msg in order to be loaded.
order = msg.req order: Order = msg.req
oid = str(msg.oid) oid = str(msg.oid)
symbol = order.symbol symbol: str = order.symbol
# TODO: MEGA UGGG ZONEEEE! # TODO: MEGA UGGG ZONEEEE!
src = msg.src src = msg.src
@ -702,13 +719,22 @@ class OrderMode:
order.oid = str(order.oid) order.oid = str(order.oid)
order.brokers = [brokername] order.brokers = [brokername]
# TODO: change this over to `MktPair`, but it's # ?TODO? change this over to `MktPair`, but it's gonna be
# gonna be tough since we don't have any such data # tough since we don't have any such data really in our
# really in our clearing msg schema.. # clearing msg schema..
order.symbol = Symbol.from_fqme( # BUT WAIT! WHY do we even want/need this!?
fqsn=fqme, #
info={}, # order.symbol = self.curr_mkt
) #
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
maybe_dialog: Dialog | None = self.submit_order( maybe_dialog: Dialog | None = self.submit_order(
send_msg=False, send_msg=False,
order=order, order=order,
@ -1101,7 +1127,7 @@ async def process_trade_msg(
) )
) )
): ):
msg.req = order msg.req: Order = order
dialog: ( dialog: (
Dialog Dialog
# NOTE: on an invalid order submission (eg. # NOTE: on an invalid order submission (eg.
@ -1166,7 +1192,7 @@ async def process_trade_msg(
tm = time.time() tm = time.time()
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=float(req.price),
time_s=tm, time_s=tm,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
@ -1221,7 +1247,7 @@ async def process_trade_msg(
tm = details['broker_time'] tm = details['broker_time']
mode.on_fill( mode.on_fill(
oid, oid,
price=details['price'], price=float(details['price']),
time_s=tm, time_s=tm,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
) )

View File

@ -130,4 +130,5 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" } asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" } msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { path = "../tractor", editable = true } tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { path = "../tractor", editable = true }

View File

@ -62,8 +62,9 @@ ignore-init-module-imports = false
fixable = ["ALL"] fixable = ["ALL"]
unfixable = [] unfixable = []
# TODO? uhh why no work!?
# Allow unused variables when underscore-prefixed. # Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" # dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format] [format]
# Use single quotes in `ruff format`. # Use single quotes in `ruff format`.

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules # NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`. # import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re: except tractor.RemoteActorError as re:
assert re.type == ModuleNotFoundError assert re.type is ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme) run_and_tollerate_cancels(load_bad_fqme)