Compare commits
32 Commits
28ba1392bb
...
d3ae2b26f6
| Author | SHA1 | Date |
|---|---|---|
|
|
d3ae2b26f6 | |
|
|
d0328bd640 | |
|
|
3c92b0c255 | |
|
|
bd1fc32368 | |
|
|
d99c55b16f | |
|
|
d0789956d0 | |
|
|
305db791ee | |
|
|
ebc8c70779 | |
|
|
fe9ff1afe4 | |
|
|
e2f95c2bee | |
|
|
d92fcb982c | |
|
|
b61145ec5a | |
|
|
624cca091a | |
|
|
9045e18386 | |
|
|
23ea65e337 | |
|
|
ea2e374101 | |
|
|
f64fcc69ed | |
|
|
f3a20ed77f | |
|
|
95cdaf8114 | |
|
|
39dcaf528a | |
|
|
3f663e0e73 | |
|
|
de542c90fb | |
|
|
41559e6729 | |
|
|
93e22e27b9 | |
|
|
a00e9c0e64 | |
|
|
cb694700c2 | |
|
|
11c931f65d | |
|
|
60390ae596 | |
|
|
9592735aaa | |
|
|
49841f5b91 | |
|
|
b2827ef3c3 | |
|
|
2fc4ccf011 |
31
README.rst
31
README.rst
|
|
@ -88,7 +88,23 @@ a sane install with `uv`
|
|||
************************
|
||||
bc why install with `python` when you can faster with `rust` ::
|
||||
|
||||
uv lock
|
||||
uv sync
|
||||
|
||||
# ^ astral's docs,
|
||||
# https://docs.astral.sh/uv/concepts/projects/sync/
|
||||
|
||||
include all GUIs ::
|
||||
|
||||
uv sync --extra uis
|
||||
|
||||
AND with all our hacking tools::
|
||||
|
||||
uv sync --dev --extra uis
|
||||
|
||||
|
||||
Ensure you can run the root-daemon::
|
||||
|
||||
uv run pikerd [-l info --pdb]
|
||||
|
||||
|
||||
hacky install on nixos
|
||||
|
|
@ -103,7 +119,18 @@ start a chart
|
|||
*************
|
||||
run a realtime OHLCV chart stand-alone::
|
||||
|
||||
piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
|
||||
[uv run] piker -l info chart btcusdt.spot.binance xmrusdt.spot.kraken
|
||||
|
||||
# ^^^ iff you haven't activated the py-env,
|
||||
# - https://docs.astral.sh/uv/concepts/projects/run/
|
||||
#
|
||||
# in order to create an explicit virt-env see,
|
||||
# - https://docs.astral.sh/uv/concepts/projects/layout/#the-project-environment
|
||||
# - https://docs.astral.sh/uv/pip/environments/
|
||||
#
|
||||
# use $UV_PROJECT_ENVIRONMENT to select any non-`.venv/`
|
||||
# as the venv sudir in the repo's root.
|
||||
# - https://docs.astral.sh/uv/reference/environment/#uv_project_environment
|
||||
|
||||
this runs a chart UI (with 1m sampled OHLCV) and shows 2 spot markets from 2 diff cexes
|
||||
overlayed on the same graph. Use of `piker` without first starting
|
||||
|
|
|
|||
|
|
@ -42,7 +42,6 @@ from ._mktinfo import (
|
|||
dec_digits,
|
||||
digits_to_dec,
|
||||
MktPair,
|
||||
Symbol,
|
||||
unpack_fqme,
|
||||
_derivs as DerivTypes,
|
||||
)
|
||||
|
|
@ -60,7 +59,6 @@ __all__ = [
|
|||
'Asset',
|
||||
'MktPair',
|
||||
'Position',
|
||||
'Symbol',
|
||||
'Transaction',
|
||||
'TransactionLedger',
|
||||
'dec_digits',
|
||||
|
|
|
|||
|
|
@ -390,8 +390,8 @@ class MktPair(Struct, frozen=True):
|
|||
cls,
|
||||
fqme: str,
|
||||
|
||||
price_tick: float | str,
|
||||
size_tick: float | str,
|
||||
price_tick: float|str,
|
||||
size_tick: float|str,
|
||||
bs_mktid: str,
|
||||
|
||||
broker: str | None = None,
|
||||
|
|
@ -677,90 +677,3 @@ def unpack_fqme(
|
|||
# '.'.join([mkt_ep, venue]),
|
||||
suffix,
|
||||
)
|
||||
|
||||
|
||||
class Symbol(Struct):
|
||||
'''
|
||||
I guess this is some kinda container thing for dealing with
|
||||
all the different meta-data formats from brokers?
|
||||
|
||||
'''
|
||||
key: str
|
||||
|
||||
broker: str = ''
|
||||
venue: str = ''
|
||||
|
||||
# precision descriptors for price and vlm
|
||||
tick_size: Decimal = Decimal('0.01')
|
||||
lot_tick_size: Decimal = Decimal('0.0')
|
||||
|
||||
suffix: str = ''
|
||||
broker_info: dict[str, dict[str, Any]] = {}
|
||||
|
||||
@classmethod
|
||||
def from_fqme(
|
||||
cls,
|
||||
fqsn: str,
|
||||
info: dict[str, Any],
|
||||
|
||||
) -> Symbol:
|
||||
broker, mktep, venue, suffix = unpack_fqme(fqsn)
|
||||
tick_size = info.get('price_tick_size', 0.01)
|
||||
lot_size = info.get('lot_tick_size', 0.0)
|
||||
|
||||
return Symbol(
|
||||
broker=broker,
|
||||
key=mktep,
|
||||
tick_size=tick_size,
|
||||
lot_tick_size=lot_size,
|
||||
venue=venue,
|
||||
suffix=suffix,
|
||||
broker_info={broker: info},
|
||||
)
|
||||
|
||||
@property
|
||||
def type_key(self) -> str:
|
||||
return list(self.broker_info.values())[0]['asset_type']
|
||||
|
||||
@property
|
||||
def tick_size_digits(self) -> int:
|
||||
return float_digits(self.tick_size)
|
||||
|
||||
@property
|
||||
def lot_size_digits(self) -> int:
|
||||
return float_digits(self.lot_tick_size)
|
||||
|
||||
@property
|
||||
def price_tick(self) -> Decimal:
|
||||
return Decimal(str(self.tick_size))
|
||||
|
||||
@property
|
||||
def size_tick(self) -> Decimal:
|
||||
return Decimal(str(self.lot_tick_size))
|
||||
|
||||
@property
|
||||
def broker(self) -> str:
|
||||
return list(self.broker_info.keys())[0]
|
||||
|
||||
@property
|
||||
def fqme(self) -> str:
|
||||
return maybe_cons_tokens([
|
||||
self.key, # final "pair name" (eg. qqq[/usd], btcusdt)
|
||||
self.venue,
|
||||
self.suffix, # includes expiry and other con info
|
||||
self.broker,
|
||||
])
|
||||
|
||||
def quantize(
|
||||
self,
|
||||
size: float,
|
||||
) -> Decimal:
|
||||
digits = float_digits(self.lot_tick_size)
|
||||
return Decimal(size).quantize(
|
||||
Decimal(f'1.{"0".ljust(digits, "0")}'),
|
||||
rounding=ROUND_HALF_EVEN
|
||||
)
|
||||
|
||||
# NOTE: when cast to `str` return fqme
|
||||
def __str__(self) -> str:
|
||||
return self.fqme
|
||||
|
|
|
|||
|
|
@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
|
|||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
try:
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
|
|
|
|||
|
|
@ -374,9 +374,14 @@ class Client:
|
|||
pair: Pair = pair_type(**item)
|
||||
except Exception as e:
|
||||
e.add_note(
|
||||
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
|
||||
'Check out their API docs here:\n\n'
|
||||
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
|
||||
f'\n'
|
||||
f'New or removed field we need to codify!\n'
|
||||
f'pair-type: {pair_type!r}\n'
|
||||
f'\n'
|
||||
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
|
||||
f'Check out their API docs here:\n'
|
||||
f'\n'
|
||||
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
|
||||
)
|
||||
raise
|
||||
pair_table[pair.symbol.upper()] = pair
|
||||
|
|
|
|||
|
|
@ -97,6 +97,8 @@ class Pair(Struct, frozen=True, kw_only=True):
|
|||
baseAsset: str
|
||||
baseAssetPrecision: int
|
||||
|
||||
permissionSets: list[list[str]]
|
||||
|
||||
filters: dict[
|
||||
str,
|
||||
str | int | float,
|
||||
|
|
@ -142,7 +144,11 @@ class SpotPair(Pair, frozen=True):
|
|||
defaultSelfTradePreventionMode: str
|
||||
allowedSelfTradePreventionModes: list[str]
|
||||
permissions: list[str]
|
||||
permissionSets: list[list[str]]
|
||||
|
||||
# can the paint botz creat liq gaps even easier on this asset?
|
||||
# Bp
|
||||
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
|
||||
amendAllowed: bool
|
||||
|
||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.binance:SpotPair'
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import urllib.parse
|
|||
import hashlib
|
||||
import hmac
|
||||
import base64
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker import config
|
||||
|
|
@ -372,8 +373,7 @@ class Client:
|
|||
# 1658347714, 'status': 'Success'}]}
|
||||
|
||||
if xfers:
|
||||
import tractor
|
||||
await tractor.pp()
|
||||
await tractor.pause()
|
||||
|
||||
trans: dict[str, Transaction] = {}
|
||||
for entry in xfers:
|
||||
|
|
@ -501,7 +501,8 @@ class Client:
|
|||
for xkey, data in resp['result'].items():
|
||||
|
||||
# NOTE: always cache in pairs tables for faster lookup
|
||||
pair = Pair(xname=xkey, **data)
|
||||
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
|
||||
pair = Pair(xname=xkey, **data)
|
||||
|
||||
# register the above `Pair` structs for all
|
||||
# key-sets/monikers: a set of 4 (frickin) tables
|
||||
|
|
|
|||
|
|
@ -175,9 +175,8 @@ async def handle_order_requests(
|
|||
|
||||
case {
|
||||
'account': 'kraken.spot' as account,
|
||||
'action': action,
|
||||
} if action in {'buy', 'sell'}:
|
||||
|
||||
'action': 'buy'|'sell',
|
||||
}:
|
||||
# validate
|
||||
order = BrokerdOrder(**msg)
|
||||
|
||||
|
|
@ -262,6 +261,12 @@ async def handle_order_requests(
|
|||
} | extra
|
||||
|
||||
log.info(f'Submitting WS order request:\n{pformat(req)}')
|
||||
|
||||
# NOTE HOWTO, debug order requests
|
||||
#
|
||||
# if 'XRP' in pair:
|
||||
# await tractor.pause()
|
||||
|
||||
await ws.send_msg(req)
|
||||
|
||||
# placehold for sanity checking in relay loop
|
||||
|
|
@ -1085,6 +1090,8 @@ async def handle_order_updates(
|
|||
f'Failed to {action} order {reqid}:\n'
|
||||
f'{errmsg}'
|
||||
)
|
||||
# if tractor._state.debug_mode():
|
||||
# await tractor.pause()
|
||||
|
||||
symbol: str = 'N/A'
|
||||
if chain := apiflows.get(reqid):
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ Symbology defs and search.
|
|||
from decimal import Decimal
|
||||
|
||||
import tractor
|
||||
from rapidfuzz import process as fuzzy
|
||||
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
|
|
@ -41,8 +40,13 @@ from piker.accounting._mktinfo import (
|
|||
)
|
||||
|
||||
|
||||
# https://www.kraken.com/features/api#get-tradable-pairs
|
||||
class Pair(Struct):
|
||||
'''
|
||||
A tradable asset pair as schema-defined by,
|
||||
|
||||
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
|
||||
|
||||
'''
|
||||
xname: str # idiotic bs_mktid equiv i guess?
|
||||
altname: str # alternate pair name
|
||||
wsname: str # WebSocket pair name (if available)
|
||||
|
|
@ -53,7 +57,6 @@ class Pair(Struct):
|
|||
lot: str # volume lot size
|
||||
|
||||
cost_decimals: int
|
||||
costmin: float
|
||||
pair_decimals: int # scaling decimal places for pair
|
||||
lot_decimals: int # scaling decimal places for volume
|
||||
|
||||
|
|
@ -79,6 +82,7 @@ class Pair(Struct):
|
|||
tick_size: float # min price step size
|
||||
status: str
|
||||
|
||||
costmin: str|None = None # XXX, only some mktpairs?
|
||||
short_position_limit: float = 0
|
||||
long_position_limit: float = float('inf')
|
||||
|
||||
|
|
|
|||
|
|
@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
|
|||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor.trionics import broadcast_receiver
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
|
@ -168,7 +171,6 @@ class OrderClient(Struct):
|
|||
|
||||
|
||||
async def relay_orders_from_sync_code(
|
||||
|
||||
client: OrderClient,
|
||||
symbol_key: str,
|
||||
to_ems_stream: tractor.MsgStream,
|
||||
|
|
@ -242,6 +244,11 @@ async def open_ems(
|
|||
|
||||
async with maybe_open_emsd(
|
||||
broker,
|
||||
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
|
||||
# then FYI.. that's kinda wrong no?
|
||||
# -[ ] shouldn't it be set by `pikerd -l` or no?
|
||||
# -[ ] would make a lot more sense to have a subsys ctl for
|
||||
# levels.. like `-l emsd.info` or something?
|
||||
loglevel=loglevel,
|
||||
) as portal:
|
||||
|
||||
|
|
@ -281,8 +288,11 @@ async def open_ems(
|
|||
client._ems_stream = trades_stream
|
||||
|
||||
# start sync code order msg delivery task
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
tn.start_soon(
|
||||
relay_orders_from_sync_code,
|
||||
client,
|
||||
fqme,
|
||||
|
|
@ -298,4 +308,4 @@ async def open_ems(
|
|||
)
|
||||
|
||||
# stop the sync-msg-relay task on exit.
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
|||
|
|
@ -76,7 +76,6 @@ if TYPE_CHECKING:
|
|||
|
||||
# TODO: numba all of this
|
||||
def mk_check(
|
||||
|
||||
trigger_price: float,
|
||||
known_last: float,
|
||||
action: str,
|
||||
|
|
@ -162,7 +161,7 @@ async def clear_dark_triggers(
|
|||
|
||||
router: Router,
|
||||
brokerd_orders_stream: tractor.MsgStream,
|
||||
quote_stream: tractor.ReceiveMsgStream, # noqa
|
||||
quote_stream: tractor.MsgStream,
|
||||
broker: str,
|
||||
fqme: str,
|
||||
|
||||
|
|
@ -178,6 +177,7 @@ async def clear_dark_triggers(
|
|||
'''
|
||||
# XXX: optimize this for speed!
|
||||
# TODO:
|
||||
# - port to the new ringbuf stuff in `tractor.ipc`!
|
||||
# - numba all this!
|
||||
# - this stream may eventually contain multiple symbols
|
||||
quote_stream._raise_on_lag = False
|
||||
|
|
@ -653,7 +653,11 @@ class Router(Struct):
|
|||
flume = feed.flumes[fqme]
|
||||
first_quote: dict = flume.first_quote
|
||||
book: DarkBook = self.get_dark_book(broker)
|
||||
book.lasts[fqme]: float = float(first_quote['last'])
|
||||
|
||||
if not (last := first_quote.get('last')):
|
||||
last: float = flume.rt_shm.array[-1]['close']
|
||||
|
||||
book.lasts[fqme]: float = float(last)
|
||||
|
||||
async with self.maybe_open_brokerd_dialog(
|
||||
brokermod=brokermod,
|
||||
|
|
@ -716,7 +720,7 @@ class Router(Struct):
|
|||
subs = self.subscribers[sub_key]
|
||||
|
||||
sent_some: bool = False
|
||||
for client_stream in subs:
|
||||
for client_stream in subs.copy():
|
||||
try:
|
||||
await client_stream.send(msg)
|
||||
sent_some = True
|
||||
|
|
@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
|
|||
status_msg.brokerd_msg = msg
|
||||
status_msg.src = msg.broker_details['name']
|
||||
|
||||
await router.client_broadcast(
|
||||
status_msg.req.symbol,
|
||||
status_msg,
|
||||
)
|
||||
if not status_msg.req:
|
||||
# likely some order change state?
|
||||
await tractor.pause()
|
||||
else:
|
||||
await router.client_broadcast(
|
||||
status_msg.req.symbol,
|
||||
status_msg,
|
||||
)
|
||||
|
||||
if status == 'closed':
|
||||
log.info(f'Execution for {oid} is complete!')
|
||||
|
|
@ -1182,12 +1190,16 @@ async def process_client_order_cmds(
|
|||
submitting live orders immediately if requested by the client.
|
||||
|
||||
'''
|
||||
# cmd: dict
|
||||
# TODO, only allow `msgspec.Struct` form!
|
||||
cmd: dict
|
||||
async for cmd in client_order_stream:
|
||||
log.info(f'Received order cmd:\n{pformat(cmd)}')
|
||||
log.info(
|
||||
f'Received order cmd:\n'
|
||||
f'{pformat(cmd)}\n'
|
||||
)
|
||||
|
||||
# CAWT DAMN we need struct support!
|
||||
oid = str(cmd['oid'])
|
||||
oid: str = str(cmd['oid'])
|
||||
|
||||
# register this stream as an active order dialog (msg flow) for
|
||||
# this order id such that translated message from the brokerd
|
||||
|
|
@ -1293,7 +1305,7 @@ async def process_client_order_cmds(
|
|||
case {
|
||||
'oid': oid,
|
||||
'symbol': fqme,
|
||||
'price': trigger_price,
|
||||
'price': price,
|
||||
'size': size,
|
||||
'action': ('buy' | 'sell') as action,
|
||||
'exec_mode': ('live' | 'paper'),
|
||||
|
|
@ -1325,7 +1337,7 @@ async def process_client_order_cmds(
|
|||
|
||||
symbol=sym,
|
||||
action=action,
|
||||
price=trigger_price,
|
||||
price=price,
|
||||
size=size,
|
||||
account=req.account,
|
||||
)
|
||||
|
|
@ -1347,7 +1359,11 @@ async def process_client_order_cmds(
|
|||
# (``translate_and_relay_brokerd_events()`` above) will
|
||||
# handle relaying the ems side responses back to
|
||||
# the client/cmd sender from this request
|
||||
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
||||
log.info(
|
||||
f'Sending live order to {broker}:\n'
|
||||
f'{pformat(msg)}'
|
||||
)
|
||||
|
||||
await brokerd_order_stream.send(msg)
|
||||
|
||||
# an immediate response should be ``BrokerdOrderAck``
|
||||
|
|
@ -1363,7 +1379,7 @@ async def process_client_order_cmds(
|
|||
case {
|
||||
'oid': oid,
|
||||
'symbol': fqme,
|
||||
'price': trigger_price,
|
||||
'price': price,
|
||||
'size': size,
|
||||
'exec_mode': exec_mode,
|
||||
'action': action,
|
||||
|
|
@ -1391,7 +1407,12 @@ async def process_client_order_cmds(
|
|||
if isnan(last):
|
||||
last = flume.rt_shm.array[-1]['close']
|
||||
|
||||
pred = mk_check(trigger_price, last, action)
|
||||
trigger_price: float = float(price)
|
||||
pred = mk_check(
|
||||
trigger_price,
|
||||
last,
|
||||
action,
|
||||
)
|
||||
|
||||
# NOTE: for dark orders currently we submit
|
||||
# the triggered live order at a price 5 ticks
|
||||
|
|
@ -1531,7 +1552,7 @@ async def _emsd_main(
|
|||
ctx: tractor.Context,
|
||||
fqme: str,
|
||||
exec_mode: str, # ('paper', 'live')
|
||||
loglevel: str | None = None,
|
||||
loglevel: str|None = None,
|
||||
|
||||
) -> tuple[
|
||||
dict[
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ Clearing sub-system message and protocols.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from decimal import Decimal
|
||||
from typing import (
|
||||
Literal,
|
||||
)
|
||||
|
|
@ -71,7 +72,15 @@ class Order(Struct):
|
|||
symbol: str # | MktPair
|
||||
account: str # should we set a default as '' ?
|
||||
|
||||
price: float
|
||||
# https://docs.python.org/3/library/decimal.html#decimal-objects
|
||||
#
|
||||
# ?TODO? decimal usage throughout?
|
||||
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
|
||||
# bit?
|
||||
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
|
||||
# -[ ] should we also use it for .size?
|
||||
#
|
||||
price: Decimal
|
||||
size: float # -ve is "sell", +ve is "buy"
|
||||
|
||||
brokers: list[str] = []
|
||||
|
|
@ -178,7 +187,7 @@ class BrokerdOrder(Struct):
|
|||
time_ns: int
|
||||
|
||||
symbol: str # fqme
|
||||
price: float
|
||||
price: Decimal
|
||||
size: float
|
||||
|
||||
# TODO: if we instead rely on a +ve/-ve size to determine
|
||||
|
|
|
|||
|
|
@ -297,6 +297,8 @@ class PaperBoi(Struct):
|
|||
|
||||
# transmit pp msg to ems
|
||||
pp: Position = self.acnt.pps[bs_mktid]
|
||||
# TODO, this will break if `require_only=True` was passed to
|
||||
# `.update_from_ledger()`
|
||||
|
||||
pp_msg = BrokerdPosition(
|
||||
broker=self.broker,
|
||||
|
|
@ -508,7 +510,7 @@ async def handle_order_requests(
|
|||
reqid = await client.submit_limit(
|
||||
oid=order.oid,
|
||||
symbol=f'{order.symbol}.{client.broker}',
|
||||
price=order.price,
|
||||
price=float(order.price),
|
||||
action=order.action,
|
||||
size=order.size,
|
||||
# XXX: by default 0 tells ``ib_insync`` methods that
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
|
|||
|
||||
log = get_logger(subsys)
|
||||
|
||||
# TODO, oof doesn't this ignore the `loglevel` then???
|
||||
get_console_log = partial(
|
||||
get_console_log,
|
||||
name=subsys,
|
||||
|
|
|
|||
|
|
@ -335,7 +335,7 @@ def services(config, tl, ports):
|
|||
name='service_query',
|
||||
loglevel=config['loglevel'] if tl else None,
|
||||
),
|
||||
tractor.get_arbiter(
|
||||
tractor.get_registry(
|
||||
host=host,
|
||||
port=ports[0]
|
||||
) as portal
|
||||
|
|
|
|||
|
|
@ -95,6 +95,12 @@ class Sampler:
|
|||
# history loading.
|
||||
incr_task_cs: trio.CancelScope | None = None
|
||||
|
||||
bcast_errors: tuple[Exception] = (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
)
|
||||
|
||||
# holds all the ``tractor.Context`` remote subscriptions for
|
||||
# a particular sample period increment event: all subscribers are
|
||||
# notified on a step.
|
||||
|
|
@ -258,14 +264,15 @@ class Sampler:
|
|||
subs: set
|
||||
last_ts, subs = pair
|
||||
|
||||
task = trio.lowlevel.current_task()
|
||||
log.debug(
|
||||
f'SUBS {self.subscribers}\n'
|
||||
f'PAIR {pair}\n'
|
||||
f'TASK: {task}: {id(task)}\n'
|
||||
f'broadcasting {period_s} -> {last_ts}\n'
|
||||
# f'consumers: {subs}'
|
||||
)
|
||||
# NOTE, for debugging pub-sub issues
|
||||
# task = trio.lowlevel.current_task()
|
||||
# log.debug(
|
||||
# f'AlL-SUBS@{period_s!r}: {self.subscribers}\n'
|
||||
# f'PAIR: {pair}\n'
|
||||
# f'TASK: {task}: {id(task)}\n'
|
||||
# f'broadcasting {period_s} -> {last_ts}\n'
|
||||
# f'consumers: {subs}'
|
||||
# )
|
||||
borked: set[MsgStream] = set()
|
||||
sent: set[MsgStream] = set()
|
||||
while True:
|
||||
|
|
@ -282,12 +289,11 @@ class Sampler:
|
|||
await stream.send(msg)
|
||||
sent.add(stream)
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
):
|
||||
except self.bcast_errors as err:
|
||||
log.error(
|
||||
f'{stream._ctx.chan.uid} dropped connection'
|
||||
f'Connection dropped for IPC ctx\n'
|
||||
f'{stream._ctx}\n\n'
|
||||
f'Due to {type(err)}'
|
||||
)
|
||||
borked.add(stream)
|
||||
else:
|
||||
|
|
@ -394,7 +400,8 @@ async def register_with_sampler(
|
|||
finally:
|
||||
if (
|
||||
sub_for_broadcasts
|
||||
and subs
|
||||
and
|
||||
subs
|
||||
):
|
||||
try:
|
||||
subs.remove(stream)
|
||||
|
|
@ -561,8 +568,7 @@ async def open_sample_stream(
|
|||
|
||||
|
||||
async def sample_and_broadcast(
|
||||
|
||||
bus: _FeedsBus, # noqa
|
||||
bus: _FeedsBus,
|
||||
rt_shm: ShmArray,
|
||||
hist_shm: ShmArray,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
|
|
@ -582,11 +588,33 @@ async def sample_and_broadcast(
|
|||
|
||||
overruns = Counter()
|
||||
|
||||
# NOTE, only used for debugging live-data-feed issues, though
|
||||
# this should be resolved more correctly in the future using the
|
||||
# new typed-msgspec feats of `tractor`!
|
||||
#
|
||||
# XXX, a multiline nested `dict` formatter (since rn quote-msgs
|
||||
# are just that).
|
||||
# pfmt: Callable[[str], str] = mk_repr()
|
||||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
# print(quotes)
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
||||
# ALOT of perf with HF-feedz!!!
|
||||
#
|
||||
# log.info(
|
||||
# 'Rx live quotes:\n'
|
||||
# f'{pfmt(quotes)}'
|
||||
# )
|
||||
|
||||
# TODO,
|
||||
# -[ ] `numba` or `cython`-nize this loop possibly?
|
||||
# |_alternatively could we do it in rust somehow by upacking
|
||||
# arrow msgs instead of using `msgspec`?
|
||||
# -[ ] use `msgspec.Struct` support in new typed-msging from
|
||||
# `tractor` to ensure only allowed msgs are transmitted?
|
||||
#
|
||||
for broker_symbol, quote in quotes.items():
|
||||
# TODO: in theory you can send the IPC msg *before* writing
|
||||
# to the sharedmem array to decrease latency, however, that
|
||||
|
|
@ -659,6 +687,21 @@ async def sample_and_broadcast(
|
|||
sub_key: str = broker_symbol.lower()
|
||||
subs: set[Sub] = bus.get_subs(sub_key)
|
||||
|
||||
# TODO, figure out how to make this useful whilst
|
||||
# incoporating feed "pausing" ..
|
||||
#
|
||||
# if not subs:
|
||||
# all_bs_fqmes: list[str] = list(
|
||||
# bus._subscribers.keys()
|
||||
# )
|
||||
# log.warning(
|
||||
# f'No subscribers for {brokername!r} live-quote ??\n'
|
||||
# f'broker_symbol: {broker_symbol}\n\n'
|
||||
|
||||
# f'Maybe the backend-sys symbol does not match one of,\n'
|
||||
# f'{pfmt(all_bs_fqmes)}\n'
|
||||
# )
|
||||
|
||||
# NOTE: by default the broker backend doesn't append
|
||||
# it's own "name" into the fqme schema (but maybe it
|
||||
# should?) so we have to manually generate the correct
|
||||
|
|
@ -697,7 +740,7 @@ async def sample_and_broadcast(
|
|||
|
||||
log.warning(
|
||||
f'Feed OVERRUN {sub_key}'
|
||||
'@{bus.brokername} -> \n'
|
||||
f'@{bus.brokername} -> \n'
|
||||
f'feed @ {chan.uid}\n'
|
||||
f'throttle = {throttle} Hz'
|
||||
)
|
||||
|
|
@ -728,18 +771,14 @@ async def sample_and_broadcast(
|
|||
if lags > 10:
|
||||
await tractor.pause()
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
except Sampler.bcast_errors as ipc_err:
|
||||
ctx: Context = ipc._ctx
|
||||
chan: Channel = ctx.chan
|
||||
if ctx:
|
||||
log.warning(
|
||||
'Dropped `brokerd`-quotes-feed connection:\n'
|
||||
f'{broker_symbol}:'
|
||||
f'{ctx.cid}@{chan.uid}'
|
||||
f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
|
||||
f'x>) {ctx.cid}@{chan.uid}'
|
||||
f'|_{ipc_err!r}\n\n'
|
||||
)
|
||||
if sub.throttle_rate:
|
||||
assert ipc._closed
|
||||
|
|
@ -756,12 +795,11 @@ async def sample_and_broadcast(
|
|||
|
||||
|
||||
async def uniform_rate_send(
|
||||
|
||||
rate: float,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
stream: MsgStream,
|
||||
|
||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
|
@ -779,13 +817,16 @@ async def uniform_rate_send(
|
|||
https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
|
||||
|
||||
'''
|
||||
# TODO: compute the approx overhead latency per cycle
|
||||
left_to_sleep = throttle_period = 1/rate - 0.000616
|
||||
# ?TODO? dynamically compute the **actual** approx overhead latency per cycle
|
||||
# instead of this magic # bidinezz?
|
||||
throttle_period: float = 1/rate - 0.000616
|
||||
left_to_sleep: float = throttle_period
|
||||
|
||||
# send cycle state
|
||||
first_quote: dict|None
|
||||
first_quote = last_quote = None
|
||||
last_send = time.time()
|
||||
diff = 0
|
||||
last_send: float = time.time()
|
||||
diff: float = 0
|
||||
|
||||
task_status.started()
|
||||
ticks_by_type: dict[
|
||||
|
|
@ -796,22 +837,28 @@ async def uniform_rate_send(
|
|||
clear_types = _tick_groups['clears']
|
||||
|
||||
while True:
|
||||
|
||||
# compute the remaining time to sleep for this throttled cycle
|
||||
left_to_sleep = throttle_period - diff
|
||||
left_to_sleep: float = throttle_period - diff
|
||||
|
||||
if left_to_sleep > 0:
|
||||
cs: trio.CancelScope
|
||||
with trio.move_on_after(left_to_sleep) as cs:
|
||||
sym: str
|
||||
last_quote: dict
|
||||
try:
|
||||
sym, last_quote = await quote_stream.receive()
|
||||
except trio.EndOfChannel:
|
||||
log.exception(f"feed for {stream} ended?")
|
||||
log.exception(
|
||||
f'Live stream for feed for ended?\n'
|
||||
f'<=c\n'
|
||||
f' |_[{stream!r}\n'
|
||||
)
|
||||
break
|
||||
|
||||
diff = time.time() - last_send
|
||||
diff: float = time.time() - last_send
|
||||
|
||||
if not first_quote:
|
||||
first_quote = last_quote
|
||||
first_quote: float = last_quote
|
||||
# first_quote['tbt'] = ticks_by_type
|
||||
|
||||
if (throttle_period - diff) > 0:
|
||||
|
|
@ -872,7 +919,9 @@ async def uniform_rate_send(
|
|||
# TODO: now if only we could sync this to the display
|
||||
# rate timing exactly lul
|
||||
try:
|
||||
await stream.send({sym: first_quote})
|
||||
await stream.send({
|
||||
sym: first_quote
|
||||
})
|
||||
except tractor.RemoteActorError as rme:
|
||||
if rme.type is not tractor._exceptions.StreamOverrun:
|
||||
raise
|
||||
|
|
@ -883,19 +932,28 @@ async def uniform_rate_send(
|
|||
f'{sym}:{ctx.cid}@{chan.uid}'
|
||||
)
|
||||
|
||||
# NOTE: any of these can be raised by `tractor`'s IPC
|
||||
# transport-layer and we want to be highly resilient
|
||||
# to consumers which crash or lose network connection.
|
||||
# I.e. we **DO NOT** want to crash and propagate up to
|
||||
# ``pikerd`` these kinds of errors!
|
||||
except (
|
||||
# NOTE: any of these can be raised by ``tractor``'s IPC
|
||||
# transport-layer and we want to be highly resilient
|
||||
# to consumers which crash or lose network connection.
|
||||
# I.e. we **DO NOT** want to crash and propagate up to
|
||||
# ``pikerd`` these kinds of errors!
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
ConnectionResetError,
|
||||
):
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
log.warning(f'{stream} closed')
|
||||
) + Sampler.bcast_errors as ipc_err:
|
||||
match ipc_err:
|
||||
case trio.EndOfChannel():
|
||||
log.info(
|
||||
f'{stream} terminated by peer,\n'
|
||||
f'{ipc_err!r}'
|
||||
)
|
||||
case _:
|
||||
# if the feed consumer goes down then drop
|
||||
# out of this rate limiter
|
||||
log.warning(
|
||||
f'{stream} closed due to,\n'
|
||||
f'{ipc_err!r}'
|
||||
)
|
||||
|
||||
await stream.aclose()
|
||||
return
|
||||
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ from typing import (
|
|||
AsyncContextManager,
|
||||
Awaitable,
|
||||
Sequence,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
|
@ -75,6 +76,10 @@ from ._sampling import (
|
|||
uniform_rate_send,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._addr import Address
|
||||
from tractor.msg.types import Aid
|
||||
|
||||
|
||||
class Sub(Struct, frozen=True):
|
||||
'''
|
||||
|
|
@ -899,19 +904,19 @@ async def open_feed(
|
|||
feed.portals[brokermod] = portal
|
||||
|
||||
# fill out "status info" that the UI can show
|
||||
host, port = portal.channel.raddr
|
||||
if host == '127.0.0.1':
|
||||
host = 'localhost'
|
||||
|
||||
chan: tractor.Channel = portal.chan
|
||||
raddr: Address = chan.raddr
|
||||
aid: Aid = chan.aid
|
||||
# TAG_feed_status_update
|
||||
feed.status.update({
|
||||
'actor_name': portal.channel.uid[0],
|
||||
'host': host,
|
||||
'port': port,
|
||||
'actor_id': aid,
|
||||
'actor_short_id': f'{aid.name}@{aid.pid}',
|
||||
'ipc': chan.raddr.proto_key,
|
||||
'ipc_addr': raddr,
|
||||
'hist_shm': 'NA',
|
||||
'rt_shm': 'NA',
|
||||
'throttle_rate': tick_throttle,
|
||||
'throttle_hz': tick_throttle,
|
||||
})
|
||||
# feed.status.update(init_msg.pop('status', {}))
|
||||
|
||||
# (allocate and) connect to any feed bus for this broker
|
||||
bus_ctxs.append(
|
||||
|
|
|
|||
|
|
@ -498,6 +498,7 @@ async def cascade(
|
|||
|
||||
func_name: str = func.__name__
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# TODO: might be better to just make a "restart" method where
|
||||
|
|
|
|||
30
piker/log.py
30
piker/log.py
|
|
@ -19,6 +19,10 @@ Log like a forester!
|
|||
"""
|
||||
import logging
|
||||
import json
|
||||
import reprlib
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import tractor
|
||||
from pygments import (
|
||||
|
|
@ -84,3 +88,29 @@ def colorize_json(
|
|||
# likeable styles: algol_nu, tango, monokai
|
||||
formatters.TerminalTrueColorFormatter(style=style)
|
||||
)
|
||||
|
||||
|
||||
# TODO, eventually defer to the version in `modden` once
|
||||
# it becomes a dep!
|
||||
def mk_repr(
|
||||
**repr_kws,
|
||||
) -> Callable[[str], str]:
|
||||
'''
|
||||
Allocate and deliver a `repr.Repr` instance with provided input
|
||||
settings using the std-lib's `reprlib` mod,
|
||||
* https://docs.python.org/3/library/reprlib.html
|
||||
|
||||
------ Ex. ------
|
||||
An up to 6-layer-nested `dict` as multi-line:
|
||||
- https://stackoverflow.com/a/79102479
|
||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
||||
|
||||
'''
|
||||
def_kws: dict[str, int] = dict(
|
||||
indent=2,
|
||||
maxlevel=6, # recursion levels
|
||||
maxstring=66, # match editor line-len limit
|
||||
)
|
||||
def_kws |= repr_kws
|
||||
reprr = reprlib.Repr(**def_kws)
|
||||
return reprr.repr
|
||||
|
|
|
|||
|
|
@ -200,7 +200,8 @@ async def open_pikerd(
|
|||
reg_addrs,
|
||||
),
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
trio.open_nursery() as service_nursery,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_tn,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
if addr not in root_actor.accept_addrs:
|
||||
|
|
@ -211,7 +212,7 @@ async def open_pikerd(
|
|||
|
||||
# assign globally for future daemon/task creation
|
||||
Services.actor_n = actor_nursery
|
||||
Services.service_n = service_nursery
|
||||
Services.service_n = service_tn
|
||||
Services.debug_mode = debug_mode
|
||||
|
||||
try:
|
||||
|
|
@ -221,7 +222,7 @@ async def open_pikerd(
|
|||
# TODO: is this more clever/efficient?
|
||||
# if 'samplerd' in Services.service_tasks:
|
||||
# await Services.cancel_service('samplerd')
|
||||
service_nursery.cancel_scope.cancel()
|
||||
service_tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# TODO: do we even need this?
|
||||
|
|
|
|||
|
|
@ -517,7 +517,7 @@ def with_dts(
|
|||
|
||||
'''
|
||||
return df.with_columns([
|
||||
pl.col(time_col).shift(1).suffix('_prev'),
|
||||
pl.col(time_col).shift(1).name.suffix('_prev'),
|
||||
pl.col(time_col).diff().alias('s_diff'),
|
||||
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
||||
]).with_columns([
|
||||
|
|
@ -623,7 +623,7 @@ def detect_vlm_gaps(
|
|||
|
||||
) -> pl.DataFrame:
|
||||
|
||||
vnull: pl.DataFrame = w_dts.filter(
|
||||
vnull: pl.DataFrame = df.filter(
|
||||
pl.col(col) == 0
|
||||
)
|
||||
return vnull
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ Main app startup and run.
|
|||
from functools import partial
|
||||
from types import ModuleType
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.ui.qt import (
|
||||
|
|
@ -116,6 +117,7 @@ async def _async_main(
|
|||
needed_brokermods[brokername] = brokers[brokername]
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as root_n,
|
||||
):
|
||||
# set root nursery and task stack for spawning other charts/feeds
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import trio
|
|||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
QtWidgets,
|
||||
Qt,
|
||||
QLineF,
|
||||
QFrame,
|
||||
|
|
|
|||
|
|
@ -1445,7 +1445,10 @@ async def display_symbol_data(
|
|||
# for pause/resume on mouse interaction
|
||||
rt_chart.feed = feed
|
||||
|
||||
async with trio.open_nursery() as ln:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as ln,
|
||||
):
|
||||
# if available load volume related built-in display(s)
|
||||
vlm_charts: dict[
|
||||
str,
|
||||
|
|
|
|||
|
|
@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
|
|||
from typing import Callable
|
||||
|
||||
import trio
|
||||
from tractor.trionics import gather_contexts
|
||||
from tractor.trionics import (
|
||||
gather_contexts,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
|
|
@ -207,7 +210,10 @@ async def open_signal_handler(
|
|||
async for args in recv:
|
||||
await async_handler(*args)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
tn.start_soon(proxy_to_handler)
|
||||
async with send:
|
||||
yield
|
||||
|
|
@ -242,6 +248,7 @@ async def open_handlers(
|
|||
widget: QWidget
|
||||
streams: list[trio.abc.ReceiveChannel]
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
gather_contexts([
|
||||
open_event_stream(
|
||||
|
|
|
|||
|
|
@ -18,10 +18,11 @@
|
|||
Feed status and controls widget(s) for embedding in a UI-pane.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from textwrap import dedent
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
# from PyQt5.QtCore import Qt
|
||||
|
||||
|
|
@ -49,35 +50,55 @@ def mk_feed_label(
|
|||
a feed control protocol.
|
||||
|
||||
'''
|
||||
status = feed.status
|
||||
status: dict[str, Any] = feed.status
|
||||
assert status
|
||||
|
||||
msg = dedent("""
|
||||
actor: **{actor_name}**\n
|
||||
|_ @**{host}:{port}**\n
|
||||
""")
|
||||
# SO tips on ws/nls,
|
||||
# https://stackoverflow.com/a/15721400
|
||||
ws: str = ' '
|
||||
# nl: str = '<br>' # dun work?
|
||||
actor_info_repr: str = (
|
||||
f')> **{status["actor_short_id"]}**\n'
|
||||
'\n' # bc md?
|
||||
)
|
||||
|
||||
for key, val in status.items():
|
||||
if key in ('host', 'port', 'actor_name'):
|
||||
continue
|
||||
msg += f'\n|_ {key}: **{{{key}}}**\n'
|
||||
# fields to select *IN* for display
|
||||
# (see `.data.feed.open_feed()` status
|
||||
# update -> TAG_feed_status_update)
|
||||
for key in [
|
||||
'ipc',
|
||||
'hist_shm',
|
||||
'rt_shm',
|
||||
'throttle_hz',
|
||||
]:
|
||||
# NOTE, the 2nd key is filled via `.format()` updates.
|
||||
actor_info_repr += (
|
||||
f'\n' # bc md?
|
||||
f'{ws}|_{key}: **{{{key}}}**\n'
|
||||
)
|
||||
# ^TODO? formatting and content..
|
||||
# -[ ] showing which fqme is "forward" on the
|
||||
# chart/fsp/order-mode?
|
||||
# '|_ flows: **{symbols}**\n'
|
||||
#
|
||||
# -[x] why isn't the indent working?
|
||||
# => markdown, now solved..
|
||||
|
||||
feed_label = FormatLabel(
|
||||
fmt_str=msg,
|
||||
# |_ streams: **{symbols}**\n
|
||||
fmt_str=actor_info_repr,
|
||||
font=_font.font,
|
||||
font_size=_font_small.px_size,
|
||||
font_color='default_lightest',
|
||||
)
|
||||
|
||||
# ?TODO, remove this?
|
||||
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
|
||||
# form.vbox.setAlignment(Qt.AlignBottom)
|
||||
_ = chart.height() - (
|
||||
form.height() +
|
||||
form.fill_bar.height()
|
||||
# feed_label.height()
|
||||
)
|
||||
# _ = chart.height() - (
|
||||
# form.height() +
|
||||
# form.fill_bar.height()
|
||||
# # feed_label.height()
|
||||
# )
|
||||
|
||||
feed_label.format(**feed.status)
|
||||
|
||||
return feed_label
|
||||
|
|
|
|||
|
|
@ -600,6 +600,7 @@ async def open_fsp_admin(
|
|||
kwargs=kwargs,
|
||||
) as (cache_hit, cluster_map),
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
if cache_hit:
|
||||
|
|
@ -613,6 +614,8 @@ async def open_fsp_admin(
|
|||
)
|
||||
try:
|
||||
yield admin
|
||||
|
||||
# ??TODO, does this *need* to be inside a finally?
|
||||
finally:
|
||||
# terminate all tasks via signals
|
||||
for key, entry in admin._registry.items():
|
||||
|
|
|
|||
|
|
@ -285,18 +285,20 @@ class FormatLabel(QLabel):
|
|||
font_size: int,
|
||||
font_color: str,
|
||||
|
||||
use_md: bool = True,
|
||||
|
||||
parent=None,
|
||||
|
||||
) -> None:
|
||||
|
||||
super().__init__(parent)
|
||||
|
||||
# by default set the format string verbatim and expect user to
|
||||
# call ``.format()`` later (presumably they'll notice the
|
||||
# by default set the format string verbatim and expect user
|
||||
# to call ``.format()`` later (presumably they'll notice the
|
||||
# unformatted content if ``fmt_str`` isn't meant to be
|
||||
# unformatted).
|
||||
self.fmt_str = fmt_str
|
||||
self.setText(fmt_str)
|
||||
# self.setText(fmt_str) # ?TODO, why here?
|
||||
|
||||
self.setStyleSheet(
|
||||
f"""QLabel {{
|
||||
|
|
@ -306,9 +308,10 @@ class FormatLabel(QLabel):
|
|||
"""
|
||||
)
|
||||
self.setFont(_font.font)
|
||||
self.setTextFormat(
|
||||
Qt.TextFormat.MarkdownText
|
||||
)
|
||||
if use_md:
|
||||
self.setTextFormat(
|
||||
Qt.TextFormat.MarkdownText
|
||||
)
|
||||
self.setMargin(0)
|
||||
|
||||
self.setSizePolicy(
|
||||
|
|
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
|
|||
size_policy.Expanding,
|
||||
)
|
||||
self.setAlignment(
|
||||
Qt.AlignVCenter | Qt.AlignLeft
|
||||
Qt.AlignLeft
|
||||
|
|
||||
Qt.AlignBottom
|
||||
# Qt.AlignVCenter
|
||||
)
|
||||
self.setText(self.fmt_str)
|
||||
|
||||
|
|
|
|||
|
|
@ -269,6 +269,8 @@ def hcolor(name: str) -> str:
|
|||
|
||||
# default ohlc-bars/curve gray
|
||||
'bracket': '#666666', # like the logo
|
||||
'pikers': '#616161', # a trader shade of..
|
||||
'beast': '#161616', # in the dark alone.
|
||||
|
||||
# bluish
|
||||
'charcoal': '#36454F',
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ Chart trading, the only way to scalp.
|
|||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from decimal import Decimal
|
||||
from functools import partial
|
||||
from pprint import pformat
|
||||
import time
|
||||
|
|
@ -41,7 +42,6 @@ from piker.accounting import (
|
|||
Position,
|
||||
mk_allocator,
|
||||
MktPair,
|
||||
Symbol,
|
||||
)
|
||||
from piker.clearing import (
|
||||
open_ems,
|
||||
|
|
@ -143,6 +143,15 @@ class OrderMode:
|
|||
}
|
||||
_staged_order: Order | None = None
|
||||
|
||||
@property
|
||||
def curr_mkt(self) -> MktPair:
|
||||
'''
|
||||
Deliver the currently selected `MktPair` according
|
||||
chart state.
|
||||
|
||||
'''
|
||||
return self.chart.linked.mkt
|
||||
|
||||
def on_level_change_update_next_order_info(
|
||||
self,
|
||||
level: float,
|
||||
|
|
@ -172,7 +181,11 @@ class OrderMode:
|
|||
line.update_labels(order_info)
|
||||
|
||||
# update bound-in staged order
|
||||
order.price = level
|
||||
mkt: MktPair = self.curr_mkt
|
||||
order.price: Decimal = mkt.quantize(
|
||||
size=level,
|
||||
quantity_type='price',
|
||||
)
|
||||
order.size = order_info['size']
|
||||
|
||||
# when an order is changed we flip the settings side-pane to
|
||||
|
|
@ -187,7 +200,9 @@ class OrderMode:
|
|||
|
||||
) -> LevelLine:
|
||||
|
||||
level = order.price
|
||||
# TODO, if we instead just always decimalize at the ems layer
|
||||
# we can avoid this back-n-forth casting?
|
||||
level = float(order.price)
|
||||
|
||||
line = order_line(
|
||||
chart or self.chart,
|
||||
|
|
@ -224,7 +239,11 @@ class OrderMode:
|
|||
# the order mode allocator but we still need to update the
|
||||
# "staged" order message we'll send to the ems
|
||||
def update_order_price(y: float) -> None:
|
||||
order.price = y
|
||||
mkt: MktPair = self.curr_mkt
|
||||
order.price: Decimal = mkt.quantize(
|
||||
size=y,
|
||||
quantity_type='price',
|
||||
)
|
||||
|
||||
line._on_level_change = update_order_price
|
||||
|
||||
|
|
@ -275,34 +294,31 @@ class OrderMode:
|
|||
chart = cursor.linked.chart
|
||||
if (
|
||||
not chart
|
||||
and cursor
|
||||
and cursor.active_plot
|
||||
and
|
||||
cursor
|
||||
and
|
||||
cursor.active_plot
|
||||
):
|
||||
return
|
||||
|
||||
chart = cursor.active_plot
|
||||
price = cursor._datum_xy[1]
|
||||
price: float = cursor._datum_xy[1]
|
||||
if not price:
|
||||
# zero prices are not supported by any means
|
||||
# since that's illogical / a no-op.
|
||||
return
|
||||
|
||||
mkt: MktPair = self.chart.linked.mkt
|
||||
|
||||
# NOTE : we could also use instead,
|
||||
# mkt.quantize(price, quantity_type='price')
|
||||
# but it returns a Decimal and it's probably gonna
|
||||
# be slower?
|
||||
# TODO: should we be enforcing this precision
|
||||
# at a different layer in the stack? right now
|
||||
# any precision error will literally be relayed
|
||||
# all the way back from the backend.
|
||||
|
||||
price = round(
|
||||
price,
|
||||
ndigits=mkt.price_tick_digits,
|
||||
# at a different layer in the stack?
|
||||
# |_ might require `MktPair` tracking in the EMS?
|
||||
# |_ right now any precision error will be relayed
|
||||
# all the way back from the backend and vice-versa..
|
||||
#
|
||||
mkt: MktPair = self.curr_mkt
|
||||
price: Decimal = mkt.quantize(
|
||||
size=price,
|
||||
quantity_type='price',
|
||||
)
|
||||
|
||||
order = self._staged_order = Order(
|
||||
action=action,
|
||||
price=price,
|
||||
|
|
@ -378,7 +394,7 @@ class OrderMode:
|
|||
'oid': oid,
|
||||
})
|
||||
|
||||
if order.price <= 0:
|
||||
if float(order.price) <= 0:
|
||||
log.error(
|
||||
'*!? Invalid `Order.price <= 0` ?!*\n'
|
||||
# TODO: make this present multi-line in object form
|
||||
|
|
@ -515,14 +531,15 @@ class OrderMode:
|
|||
# if an order msg is provided update the line
|
||||
# **from** that msg.
|
||||
if order:
|
||||
if order.price <= 0:
|
||||
price: float = float(order.price)
|
||||
if price <= 0:
|
||||
log.error(f'Order has 0 price, cancelling..\n{order}')
|
||||
self.cancel_orders([order.oid])
|
||||
return None
|
||||
|
||||
line.set_level(order.price)
|
||||
line.set_level(price)
|
||||
self.on_level_change_update_next_order_info(
|
||||
level=order.price,
|
||||
level=price,
|
||||
line=line,
|
||||
order=order,
|
||||
# use the corresponding position tracker for the
|
||||
|
|
@ -681,9 +698,9 @@ class OrderMode:
|
|||
) -> Dialog | None:
|
||||
# NOTE: the `.order` attr **must** be set with the
|
||||
# equivalent order msg in order to be loaded.
|
||||
order = msg.req
|
||||
order: Order = msg.req
|
||||
oid = str(msg.oid)
|
||||
symbol = order.symbol
|
||||
symbol: str = order.symbol
|
||||
|
||||
# TODO: MEGA UGGG ZONEEEE!
|
||||
src = msg.src
|
||||
|
|
@ -702,13 +719,22 @@ class OrderMode:
|
|||
order.oid = str(order.oid)
|
||||
order.brokers = [brokername]
|
||||
|
||||
# TODO: change this over to `MktPair`, but it's
|
||||
# gonna be tough since we don't have any such data
|
||||
# really in our clearing msg schema..
|
||||
order.symbol = Symbol.from_fqme(
|
||||
fqsn=fqme,
|
||||
info={},
|
||||
)
|
||||
# ?TODO? change this over to `MktPair`, but it's gonna be
|
||||
# tough since we don't have any such data really in our
|
||||
# clearing msg schema..
|
||||
# BUT WAIT! WHY do we even want/need this!?
|
||||
#
|
||||
# order.symbol = self.curr_mkt
|
||||
#
|
||||
# XXX, the old approach.. which i don't quire member why..
|
||||
# -[ ] verify we for sure don't require this any more!
|
||||
# |_https://github.com/pikers/piker/issues/517
|
||||
#
|
||||
# order.symbol = Symbol.from_fqme(
|
||||
# fqsn=fqme,
|
||||
# info={},
|
||||
# )
|
||||
|
||||
maybe_dialog: Dialog | None = self.submit_order(
|
||||
send_msg=False,
|
||||
order=order,
|
||||
|
|
@ -766,6 +792,7 @@ async def open_order_mode(
|
|||
brokerd_accounts,
|
||||
ems_dialog_msgs,
|
||||
),
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
):
|
||||
|
|
@ -1101,7 +1128,7 @@ async def process_trade_msg(
|
|||
)
|
||||
)
|
||||
):
|
||||
msg.req = order
|
||||
msg.req: Order = order
|
||||
dialog: (
|
||||
Dialog
|
||||
# NOTE: on an invalid order submission (eg.
|
||||
|
|
@ -1166,7 +1193,7 @@ async def process_trade_msg(
|
|||
tm = time.time()
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=req.price,
|
||||
price=float(req.price),
|
||||
time_s=tm,
|
||||
)
|
||||
mode.lines.remove_line(uuid=oid)
|
||||
|
|
@ -1221,7 +1248,7 @@ async def process_trade_msg(
|
|||
tm = details['broker_time']
|
||||
mode.on_fill(
|
||||
oid,
|
||||
price=details['price'],
|
||||
price=float(details['price']),
|
||||
time_s=tm,
|
||||
pointing='up' if action == 'buy' else 'down',
|
||||
)
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ name = "piker"
|
|||
version = "0.1.0a0dev0"
|
||||
description = "trading gear for hackers"
|
||||
authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
|
||||
requires-python = ">=3.12, <3.13"
|
||||
requires-python = ">=3.12"
|
||||
license = "AGPL-3.0-or-later"
|
||||
readme = "README.rst"
|
||||
keywords = [
|
||||
|
|
@ -39,8 +39,8 @@ classifiers = [
|
|||
"Operating System :: POSIX :: Linux",
|
||||
"Programming Language :: Python :: Implementation :: CPython",
|
||||
"Programming Language :: Python :: 3 :: Only",
|
||||
"Programming Language :: Python :: 3.11",
|
||||
"Programming Language :: Python :: 3.12",
|
||||
"Programming Language :: Python :: 3.13",
|
||||
"Intended Audience :: Financial and Insurance Industry",
|
||||
"Intended Audience :: Science/Research",
|
||||
"Intended Audience :: Developers",
|
||||
|
|
@ -49,13 +49,13 @@ classifiers = [
|
|||
dependencies = [
|
||||
"async-generator >=1.10, <2.0.0",
|
||||
"attrs >=23.1.0, <24.0.0",
|
||||
"bidict >=0.22.1, <0.23.0",
|
||||
"bidict >=0.23.1",
|
||||
"colorama >=0.4.6, <0.5.0",
|
||||
"colorlog >=6.7.0, <7.0.0",
|
||||
"ib-insync >=0.9.86, <0.10.0",
|
||||
"numba >=0.59.0, <0.60.0",
|
||||
"numpy >=1.25, <2.0",
|
||||
"polars >=0.18.13, <0.19.0",
|
||||
"numpy>=2.0",
|
||||
"polars >=0.20.6",
|
||||
"polars-fuzzy-match>=0.1.5",
|
||||
"pygments >=2.16.1, <3.0.0",
|
||||
"rich >=13.5.2, <14.0.0",
|
||||
"tomli >=2.0.1, <3.0.0",
|
||||
|
|
@ -65,16 +65,18 @@ dependencies = [
|
|||
"typer >=0.9.0, <1.0.0",
|
||||
"rapidfuzz >=3.5.2, <4.0.0",
|
||||
"pdbp >=1.5.0, <2.0.0",
|
||||
"trio >=0.24, <0.25",
|
||||
"trio >=0.27",
|
||||
"pendulum >=3.0.0, <4.0.0",
|
||||
"httpx >=0.27.0, <0.28.0",
|
||||
"cryptofeed >=2.4.0, <3.0.0",
|
||||
"pyarrow >=17.0.0, <18.0.0",
|
||||
"pyarrow>=18.0.0",
|
||||
"websockets ==12.0",
|
||||
"msgspec",
|
||||
"msgspec>=0.19.0,<0.20",
|
||||
"tractor",
|
||||
"asyncvnc",
|
||||
"tomlkit",
|
||||
"trio-typing>=0.10.0",
|
||||
"numba>=0.61.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
|
@ -125,9 +127,19 @@ include = ["piker"]
|
|||
[tool.hatch.build.targets.wheel]
|
||||
include = ["piker"]
|
||||
|
||||
|
||||
# TODO? move to a `uv.toml`?
|
||||
[tool.uv]
|
||||
python-preference = 'system'
|
||||
python-downloads = 'manual'
|
||||
|
||||
|
||||
[tool.uv.sources]
|
||||
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
|
||||
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
|
||||
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
|
||||
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
|
||||
tractor = { path = "../tractor", editable = true }
|
||||
|
||||
# XXX, since we're like, always hacking new shite atm.. Bp
|
||||
tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
|
||||
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
|
||||
# tractor = { path = "../tractor", editable = true }
|
||||
|
|
|
|||
|
|
@ -62,8 +62,9 @@ ignore-init-module-imports = false
|
|||
fixable = ["ALL"]
|
||||
unfixable = []
|
||||
|
||||
# TODO? uhh why no work!?
|
||||
# Allow unused variables when underscore-prefixed.
|
||||
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
|
||||
# dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
|
||||
|
||||
[format]
|
||||
# Use single quotes in `ruff format`.
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/
|
||||
|
|
@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
|
|||
# NOTE: emsd should error on the actor's enabled modules
|
||||
# import phase, when looking for a backend named `doggy`.
|
||||
except tractor.RemoteActorError as re:
|
||||
assert re.type == ModuleNotFoundError
|
||||
assert re.type is ModuleNotFoundError
|
||||
|
||||
run_and_tollerate_cancels(load_bad_fqme)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue