Compare commits

..

47 Commits

Author SHA1 Message Date
Tyler Goodlet d66d1c1597 binance: add `AggTrade.nq: float`: "normal quantity" field.. 2026-01-06 21:46:09 -05:00
Tyler Goodlet de232215e1 binance: handle new `TRADIFI_PERPETUAL`.. 2026-01-06 21:46:09 -05:00
Tyler Goodlet 30679dc478 binance: add `Pair.opoAllowed` field
Handle new API field per 2025-12-02 update.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-06 21:46:09 -05:00
Tyler Goodlet 351d898915 binance: set `Pair.pegInstructionsAllowed = False`
Lol, a cheeky unforeseen bug due to TOML's lack of a null type and
thinking i can render an `Optional` field on a `msgspec.Struct`
(defaulted to `None`) the `binance.symcache.toml` cache file..

I didn't catch this when i first updated to the 3.1 API in f7caa75228
because i never did a cache-files flush.. lesson learned and we **really
need tests for this**!!
2026-01-06 21:46:09 -05:00
Tyler Goodlet 93c1f8ede5 Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-06 21:46:09 -05:00
Tyler Goodlet 7e7d678bfb kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
2026-01-06 21:46:09 -05:00
Tyler Goodlet 300670af96 kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
official API page (new one now linked in type def's doc string).
2026-01-06 21:46:09 -05:00
Tyler Goodlet 991e1014e4 binance: add new `permissionSets` to base `Pair` 2026-01-06 21:46:09 -05:00
Tyler Goodlet 7a0b2b4dae Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the filed mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-06 21:46:09 -05:00
Tyler Goodlet adb687193a `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug the some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-06 21:46:09 -05:00
Tyler Goodlet 9003c28fb4 `.questrade`: link in ws-API issue! 2026-01-06 21:46:09 -05:00
Tyler Goodlet 3047ba260a `.kraken.broker`: need to `await verify_balances()` .. 2026-01-06 21:46:09 -05:00
Tyler Goodlet 4d950f15a6 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2026-01-06 21:46:09 -05:00
Tyler Goodlet 4f215bfc7d `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2026-01-06 21:46:08 -05:00
Tyler Goodlet ec9c03408a Type loaded backend modules 2026-01-06 21:46:08 -05:00
Tyler Goodlet ede617b60e Use `pytest` plugin now exposed by `tractor` 2026-01-06 21:44:19 -05:00
Tyler Goodlet 8313ae2e96 `.ui._search`: collapse EGs as needed, use `tn` naming. 2026-01-06 21:44:19 -05:00
Tyler Goodlet 095773e7da Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing
some renames, in similar style as elsewhere:
- `pcs` -> `rent_cs`,
- `n` -> `tn` for nursery handles,

Also,
- tweak the `._reconnect_forever()` while loop to use the
  (also) `trio`-internal
  `mc_state: trio._channel.MemoryChannelState = snd._state` instead
  of `snd._close` to poll for open send/receive consumer task counts
  since,
    1. it seems more reliable then using the `snd._closed`,
    2. there's no other way to access the info.. afaik?

- handle `ConnectionRejected` explicitly alongside handshake-errs as
  a retry case.
- add a base-exc handler which `.exception()` reports the reconnect
  attempt failure explicitly.
- drop some lingering `Optional` usage.
2026-01-06 21:44:19 -05:00
Tyler Goodlet e99d3e6282 Port `.cli` & `.service` to latest `tractor` registry APIs
Namely changes for the `registry_addrs: list`, enable_transports: list`
and related `tractor._addr` primitive requirements.

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to newer `Context.wait_for_result()`,
- dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 819bd990d2 binance; unmask around send-chan @acm usage 2026-01-06 21:44:19 -05:00
Tyler Goodlet 905352ff2f Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 641b85df70 Use `.trionics.collapse_eg()` in `.deribit.api`
Commit this change separate from the (original) broader set applied to
the entire code base since the `.deribit.api` mod contained changes from
upstream max-pain work (from our very own @nt) which caused a noticeable
conflict and intros un-required changes from his work to re-enable
`deribit` support.

Note the original commit, "69eac7bb Spurious first-draft of EG
collapsing", applied similar changes through the rest of the code base.
AGAIN, this mod's change is only being broken out to minimize upstream
change conflicts due to updates to the `deribit` backend done earlier in
time-history.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 72415fce63 Try running daemons on UDS tpt
The root daemon, pikerd, needs to be adjusted to use diff default
registry addrs to also utilize non-TCP, but for now this gets us started
testing; so far so good B)
2026-01-06 21:44:19 -05:00
Tyler Goodlet c3bc5b6783 Adjust feed status fields/display-pane to new actor-ID
That is to use the new `tractor.msg.types.Aid` struct to pull the
`brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more
generally handling the new `Channel.raddr.proto_key: str` and no longer
assuming a TCP IPC transport; this per the recent `tractor.ipc`
subsys which adds multi-IPC-transports!

Downstream tweaks to match,
- use an "opt-in" field set to display in the `brokerd` info pane in
  `.ui._feedstatus.mk_feed_label()`.
 |_ also add some todos and drop some seemingly unneeded form sizing
    calcs?
- tweak `.ui._label` to allow not using markdown, though ended up not
  doing that since it looked too plain..
2026-01-06 21:44:19 -05:00
Tyler Goodlet 4067acee04 Adjust to `trio`'s strict eg nurseries throughout!
Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least,
crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that
exhibit on SIGINT/ctl-c of each "app" (chart & daemon).

Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task
nursery" shorthand-var-name being used in all our other `tractor`
related projects.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 1462496b2a Port to newer `tractor.get_registry()` 2026-01-06 21:44:19 -05:00
Tyler Goodlet e4e69b45ba Update legacy type to `tractor.MsgStream` 2026-01-06 21:44:19 -05:00
Tyler Goodlet 838ddd6e79 Fix type-check assertion in ems test to use `is` 2026-01-06 21:43:59 -05:00
Tyler Goodlet aaf2dbcd79 Cast to `float` as needed from order-mode and ems
Since we're not quite yet using automatic typed msging from
`tractor`/`msgspec` (i.e. still manually decoding order ctl msgs from
built-in types..`dict`s still not `msgspec.Struct`) this adds the
appropriate typecasting ops to ensure the required precision is attained
prior to processing and/or submission to a brokerd backend service.

For the `.clearing._ems`,
- flip all `trigger_price` previously presumed to be `float` to just
  the field-identical `price: Decimal` and ensure we cast to `float`
  for any `trigger_price` usage, like before passing to `mk_check()`.

For `.ui.order_mode.OrderMode`,
- add a new `.curr_mkt: MktPair` convenience property to get the
  chart-active value.
- ensure we always use the `.curr_mkt.quantize() -> Decimal` before
  setting any IPC-msg's `.price` field!
- always cast `float(Order.price)` before use in setting line-levels.
- don't bother setting `Order.symbol` to a (now fully removed) `Symbol`
  instance since it's not really required-for-use anywhere; leaving it
  a `str` (per the type-annot) is fine for now?
2026-01-06 21:43:59 -05:00
Tyler Goodlet cf976ff12b Mk `Brokerd[Order].price` avoid `float`-errs
By re-typing to a `.price: Decimal` field on both legs of the EMS.

It seems we must do it ourselves since,
- these msg's (fields) are relayed through the clearing engine to each
  `brokerd` backend and,
- bc many (if not all) of those backends `.broker`-clients (nor their
  encapsulated "brokerage services") **are not** doing any
  precision-truncation themselves.

So, for now, instead we opt to expect rounding at the source. This means
we will explicitly require casting to/from `float` at the line-graphics
interface to the order-clearing-engine (as implemented throughout
`.ui.order_mode.OrderMode`); and this is coming shortly.
2026-01-06 21:43:59 -05:00
Gud Boi fa0d088ebc Merge pull request 'rando_data_subsys_styling
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/58

Mostly `.data` subsys styling from feats branches' (#58) from rando_data_subsys_styling into main
2026-01-07 02:43:35 +00:00
Tyler Goodlet dc61e6fc4f Report with `{fqme!r}` in feed allocator for type clarity 2026-01-06 19:33:23 -05:00
Tyler Goodlet b2b0e4c40d `.config.get_app_dir()`: link to `click`'s orig impl on GH 2026-01-06 19:33:23 -05:00
Tyler Goodlet 4b1fa2173b Touch `conf.toml` by default when dne? 2026-01-06 19:33:23 -05:00
Tyler Goodlet b3d345fc41 Wow, update root `conf.toml` to new multiaddr style
I don't know how this wasn't already committed but.. drops the legacy
`marketstore` tsdb socket info vars since we're going all in on
`nativedb` BP
2026-01-06 19:33:23 -05:00
Tyler Goodlet 0282e632f9 `data._symcache`, impl a summary `.__repr__()`, avoids `Asset` causality issues 2026-01-06 19:33:23 -05:00
Tyler Goodlet 7e600b3901 Avoid `msgspec` eval-err on `Asset` in symcache? 2026-01-06 19:33:23 -05:00
Tyler Goodlet dbe2567fe8 Flip screen-info script to qt6, refine it to heck.
Buncha updates and improvements,
- adjust sub-namespace imports according to console warnings.
- iterate all detected screens in a loop and instead report which is the
  primary and the current.
- type annotate all vars where non-obvious, particularly the`Qt` refs.
2026-01-06 19:33:23 -05:00
Tyler Goodlet 60df863a6a Mk a `notes_to_self/` move orig file `ideas.rst' 2026-01-06 19:33:23 -05:00
Tyler Goodlet 2d44a9afaa Drop old/masked ahab-docker daemon starting 2026-01-06 19:33:23 -05:00
Tyler Goodlet 57a5903ccf Start a manual `tags` file for internal refs 2026-01-06 19:33:23 -05:00
Tyler Goodlet cbe0cbd29c Add a couple new grays to the pallete 2026-01-06 19:33:23 -05:00
Tyler Goodlet 2158e27a66 Add missing f-str prefix to log line 2026-01-06 19:33:23 -05:00
Tyler Goodlet 323290d20b Teensie `piker.data` styling tweaks
- use more compact optional value style with `|`-union
- fix `.flows` typing-only import since we need `MktPair` to be
  immediately defined for use on a `msgspec.Struct` field.
- more "tree-like" warning msg in `.validate()` reporting.
2026-01-06 19:33:23 -05:00
Gud Boi 4dd7391da7 Merge pull request 'bump_polars: new version with API adjustments' (#57) from bump_polars into main
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/57
2026-01-06 23:02:07 +00:00
Tyler Goodlet 2ced05c4d5 `polars.cumsum()` is now `.cum_sum()` 2026-01-06 16:10:36 -05:00
Tyler Goodlet e10f3a16dd Bump to (latest) `polars`, the `0.20.6x` series B)
Since I was trying out the neat lookin `polars-fuzzy-match` (also added
for now as a core dep here) which requires the new plugin sys, plus it's
about time we synced with upstream!

Adjust some column syntax to the new `.name` sub-field-space and the
`uv` lock-file to match.

Other,
- add back `trio-typing` bc i guess something else needs it (debug
  tooling stuff in new `tractor`?)
- flip back to the `tractor` pre-main pin since the new `main`-branch
  requires new `trio` stuff we haven't ported yet..
2026-01-06 16:10:36 -05:00
45 changed files with 634 additions and 388 deletions

View File

@ -1,7 +1,9 @@
[network] [network]
tsdb.backend = 'marketstore' pikerd = [
tsdb.host = 'localhost' '/ipv4/127.0.0.1/tcp/6116', # std localhost daemon-actor tree
tsdb.grpc_port = 5995 # '/uds/6116', # TODO std uds socket file
]
[ui] [ui]
# set custom font + size which will scale entire UI # set custom font + size which will scale entire UI

View File

@ -121,6 +121,7 @@ async def bot_main():
# tick_throttle=10, # tick_throttle=10,
) as feed, ) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
assert accounts assert accounts

View File

@ -365,7 +365,11 @@ class Position(Struct):
# added: bool = False # added: bool = False
tid: str = t.tid tid: str = t.tid
if tid in self._events: if tid in self._events:
log.warning(f'{t} is already added?!') log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
# return added # return added
# TODO: apparently this IS possible with a dict but not # TODO: apparently this IS possible with a dict but not
@ -731,7 +735,7 @@ class Account(Struct):
else: else:
# TODO: we reallly need a diff set of # TODO: we reallly need a diff set of
# loglevels/colors per subsys. # loglevels/colors per subsys.
log.warning( log.debug(
f'Recent position for {fqme} was closed!' f'Recent position for {fqme} was closed!'
) )

View File

@ -465,7 +465,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([ df = dfs[key] = ldf.with_columns([
pl.cumsum('size').alias('cumsize'), pl.cum_sum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in # amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn. # the market) to acquire the dst asset, PER txn.
@ -480,7 +480,7 @@ def ledger_to_dfs(
]).with_columns([ ]).with_columns([
# rolling balance in src asset units # rolling balance in src asset units
(pl.col('dst_bot').cumsum() * -1).alias('src_balance'), (pl.col('dst_bot').cum_sum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the # "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the # amount in the dst asset (entering) or decreasing the
@ -622,7 +622,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently # cost that was included in the least-recently
# entered txn that is still part of the current CSi # entered txn that is still part of the current CSi
# set. # set.
# => we look up the cost-per-unit cumsum and apply # => we look up the cost-per-unit cum_sum and apply
# if over the current txn size (by multiplication) # if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on # and then reverse that previusly applied cost on
# the txn_cost for this record. # the txn_cost for this record.

View File

@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()` # - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`. # NOTE: see ep invocation details inside `.data.feed`.
try: try:
async with trio.open_nursery() as service_nursery: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus( bus: _FeedsBus = feed.get_feed_bus(
brokername, brokername,
service_nursery, service_nursery,

View File

@ -440,6 +440,7 @@ async def open_trade_dialog(
# - ledger: TransactionLedger # - ledger: TransactionLedger
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
): ):

View File

@ -450,7 +450,6 @@ async def subscribe(
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -462,6 +461,7 @@ async def stream_quotes(
) -> None: ) -> None:
async with ( async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan, send_chan as send_chan,
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):

View File

@ -31,7 +31,7 @@ from typing import (
Callable, Callable,
) )
import pendulum from pendulum import now
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -39,6 +39,7 @@ import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
maybe_open_context maybe_open_context
collapse_eg,
) )
from tractor import to_asyncio from tractor import to_asyncio
# XXX WOOPS XD # XXX WOOPS XD
@ -432,6 +433,7 @@ async def get_client(
) -> Client: ) -> Client:
async with ( async with (
collapse_eg(),
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc _testnet_ws_url, dtype=JSONRPCResult) as json_rpc

View File

@ -48,6 +48,7 @@ from bidict import bidict
import trio import trio
import tractor import tractor
from tractor import to_asyncio from tractor import to_asyncio
from tractor import trionics
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
DateTime, DateTime,
@ -1369,8 +1370,8 @@ async def load_clients_for_trio(
''' '''
Pure async mngr proxy to ``load_aio_clients()``. Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoing to call from This is a bootstrap entrypoint to call from
a ``tractor.to_asyncio.open_channel_from()``. a `tractor.to_asyncio.open_channel_from()`.
''' '''
async with load_aio_clients( async with load_aio_clients(
@ -1391,7 +1392,10 @@ async def open_client_proxies() -> tuple[
async with ( async with (
tractor.trionics.maybe_open_context( tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from, acm_func=tractor.to_asyncio.open_channel_from,
kwargs={'target': load_clients_for_trio}, kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access # lock around current actor task access
# TODO: maybe this should be the default in tractor? # TODO: maybe this should be the default in tractor?
@ -1584,7 +1588,8 @@ async def open_client_proxy(
event_consumers=event_table, event_consumers=event_table,
) as (first, chan), ) as (first, chan),
trio.open_nursery() as relay_n, trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
): ):
assert isinstance(first, Client) assert isinstance(first, Client)
@ -1624,7 +1629,7 @@ async def open_client_proxy(
continue continue
relay_n.start_soon(relay_events) relay_tn.start_soon(relay_events)
yield proxy yield proxy

View File

@ -34,6 +34,7 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor.to_asyncio import LinkedTaskChannel from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
) )
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level? # TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price: if ibpos.avgCost != msg.avg_price:
log.warning( log.debug(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n' f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n' f'ib: {ibfmtmsg}\n'
'---------------------------\n' '---------------------------\n'
@ -738,7 +739,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n' f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n' 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
) )
log.error(logmsg) log.debug(logmsg)
await ctx.started(( await ctx.started((
all_positions, all_positions,
@ -747,21 +748,22 @@ async def open_trade_dialog(
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trionics.collapse_eg(),
trio.open_nursery() as tn,
): ):
# relay existing open orders to ems # relay existing open orders to ems
for msg in order_msgs: for msg in order_msgs:
await ems_stream.send(msg) await ems_stream.send(msg)
for client in set(aioclients.values()): for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start( trade_event_stream: LinkedTaskChannel = await tn.start(
open_trade_event_stream, open_trade_event_stream,
client, client,
) )
# start order request handler **before** local trades # start order request handler **before** local trades
# event loop # event loop
n.start_soon( tn.start_soon(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -769,7 +771,7 @@ async def open_trade_dialog(
) )
# allocate event relay tasks for each client connection # allocate event relay tasks for each client connection
n.start_soon( tn.start_soon(
deliver_trade_events, deliver_trade_events,
trade_event_stream, trade_event_stream,

View File

@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
import trio import trio
import tractor import tractor
from tractor.trionics import broadcast_receiver from tractor.trionics import (
broadcast_receiver,
collapse_eg,
)
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -285,8 +288,11 @@ async def open_ems(
client._ems_stream = trades_stream client._ems_stream = trades_stream
# start sync code order msg delivery task # start sync code order msg delivery task
async with trio.open_nursery() as n: async with (
n.start_soon( collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(
relay_orders_from_sync_code, relay_orders_from_sync_code,
client, client,
fqme, fqme,
@ -302,4 +308,4 @@ async def open_ems(
) )
# stop the sync-msg-relay task on exit. # stop the sync-msg-relay task on exit.
n.cancel_scope.cancel() tn.cancel_scope.cancel()

View File

@ -42,6 +42,7 @@ from bidict import bidict
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -76,7 +77,6 @@ if TYPE_CHECKING:
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
@ -162,7 +162,7 @@ async def clear_dark_triggers(
router: Router, router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.MsgStream,
broker: str, broker: str,
fqme: str, fqme: str,
@ -178,6 +178,7 @@ async def clear_dark_triggers(
''' '''
# XXX: optimize this for speed! # XXX: optimize this for speed!
# TODO: # TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False quote_stream._raise_on_lag = False
@ -500,7 +501,7 @@ class Router(Struct):
''' '''
# setup at actor spawn time # setup at actor spawn time
nursery: trio.Nursery _tn: trio.Nursery
# broker to book map # broker to book map
books: dict[str, DarkBook] = {} books: dict[str, DarkBook] = {}
@ -670,7 +671,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent # dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no # daemon to allow dark order clearing while no
# client is connected. # client is connected.
self.nursery.start_soon( self._tn.start_soon(
clear_dark_triggers, clear_dark_triggers,
self, self,
relay.brokerd_stream, relay.brokerd_stream,
@ -693,7 +694,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream # spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon. # that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon( self._tn.start_soon(
translate_and_relay_brokerd_events, translate_and_relay_brokerd_events,
broker, broker,
relay.brokerd_stream, relay.brokerd_stream,
@ -767,10 +768,12 @@ async def _setup_persistent_emsd(
global _router global _router
# open a root "service nursery" for the ``emsd`` actor # open a root "service task-nursery" for the `emsd`-actor
async with trio.open_nursery() as service_nursery: async with (
trionics.collapse_eg(),
_router = Router(nursery=service_nursery) trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent # TODO: send back the full set of persistent
# orders/execs? # orders/execs?
@ -1190,12 +1193,16 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client. submitting live orders immediately if requested by the client.
''' '''
# cmd: dict # TODO, only allow `msgspec.Struct` form!
cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid = str(cmd['oid']) oid: str = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for # register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd # this order id such that translated message from the brokerd
@ -1301,7 +1308,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'), 'exec_mode': ('live' | 'paper'),
@ -1333,7 +1340,7 @@ async def process_client_order_cmds(
symbol=sym, symbol=sym,
action=action, action=action,
price=trigger_price, price=price,
size=size, size=size,
account=req.account, account=req.account,
) )
@ -1355,7 +1362,11 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
@ -1371,7 +1382,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': trigger_price, 'price': price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'action': action, 'action': action,
@ -1399,7 +1410,12 @@ async def process_client_order_cmds(
if isnan(last): if isnan(last):
last = flume.rt_shm.array[-1]['close'] last = flume.rt_shm.array[-1]['close']
pred = mk_check(trigger_price, last, action) trigger_price: float = float(price)
pred = mk_check(
trigger_price,
last,
action,
)
# NOTE: for dark orders currently we submit # NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks # the triggered live order at a price 5 ticks
@ -1506,7 +1522,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info', loglevel: str = 'info',
): ):
fqme, relay, feed, client_ready = await _router.nursery.start( fqme, relay, feed, client_ready = await _router._tn.start(
_router.open_trade_relays, _router.open_trade_relays,
fqme, fqme,
exec_mode, exec_mode,

View File

@ -19,6 +19,7 @@ Clearing sub-system message and protocols.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal
from typing import ( from typing import (
Literal, Literal,
) )
@ -71,7 +72,15 @@ class Order(Struct):
symbol: str # | MktPair symbol: str # | MktPair
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
price: float # https://docs.python.org/3/library/decimal.html#decimal-objects
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
size: float # -ve is "sell", +ve is "buy" size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = [] brokers: list[str] = []
@ -178,7 +187,7 @@ class BrokerdOrder(Struct):
time_ns: int time_ns: int
symbol: str # fqme symbol: str # fqme
price: float price: Decimal
size: float size: float
# TODO: if we instead rely on a +ve/-ve size to determine # TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -510,7 +510,7 @@ async def handle_order_requests(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=order.oid, oid=order.oid,
symbol=f'{order.symbol}.{client.broker}', symbol=f'{order.symbol}.{client.broker}',
price=order.price, price=float(order.price),
action=order.action, action=order.action,
size=order.size, size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -134,8 +134,8 @@ def pikerd(
Spawn the piker broker-daemon. Spawn the piker broker-daemon.
''' '''
from tractor.devx import maybe_open_crash_handler # from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb): # with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli') log = get_console_log(loglevel, name='cli')
if pdb: if pdb:
@ -178,39 +178,18 @@ def pikerd(
async def main(): async def main():
service_mngr: service.Services service_mngr: service.Services
async with ( async with (
service.open_pikerd( service.open_pikerd(
registry_addrs=regaddrs, registry_addrs=regaddrs,
loglevel=loglevel, loglevel=loglevel,
debug_mode=pdb, debug_mode=pdb,
enable_transports=['uds'],
) as service_mngr, # normally delivers a ``Services`` handle # enable_transports=['tcp'],
) as service_mngr,
# AsyncExitStack() as stack,
): ):
# TODO: spawn all other sub-actor daemons according to
# multiaddress endpoint spec defined by user config
assert service_mngr assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
# if tsdb: # multiaddress endpoint spec defined by user config
# dname, conf = await stack.enter_async_context(
# service.marketstore.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'TSDB `{dname}` up with conf:\n{conf}')
# if es:
# dname, conf = await stack.enter_async_context(
# service.elastic.start_ahab_daemon(
# service_mngr,
# loglevel=loglevel,
# )
# )
# log.info(f'DB `{dname}` up with conf:\n{conf}')
await trio.sleep_forever() await trio.sleep_forever()
trio.run(main) trio.run(main)
@ -328,6 +307,10 @@ def services(config, tl, ports):
if not ports: if not ports:
ports = [_default_registry_port] ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services(): async def list_services():
nonlocal host nonlocal host
async with ( async with (
@ -335,16 +318,18 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, addr=addr,
port=ports[0]
) as portal ) as portal
): ):
registry = await portal.run_from_ns('self', 'get_registry') registry = await portal.run_from_ns(
'self',
'get_registry',
)
json_d = {} json_d = {}
for key, socket in registry.items(): for key, socket in registry.items():
host, port = socket json_d[key] = f'{socket}'
json_d[key] = f'{host}:{port}'
click.echo(f"{colorize_json(json_d)}") click.echo(f"{colorize_json(json_d)}")
trio.run(list_services) trio.run(list_services)

View File

@ -41,10 +41,13 @@ from .log import get_logger
log = get_logger('broker-config') log = get_logger('broker-config')
# XXX NOTE: taken from ``click`` since apparently they have some # XXX NOTE: taken from `click`
# super weirdness with sigint and sudo..no clue # |_https://github.com/pallets/click/blob/main/src/click/utils.py#L449
# we're probably going to slowly just modify it to our own version over #
# time.. # (since apparently they have some super weirdness with SIGINT and
# sudo.. no clue we're probably going to slowly just modify it to our
# own version over time..)
#
def get_app_dir( def get_app_dir(
app_name: str, app_name: str,
roaming: bool = True, roaming: bool = True,
@ -261,7 +264,7 @@ def load(
MutableMapping, MutableMapping,
] = tomllib.loads, ] = tomllib.loads,
touch_if_dne: bool = False, touch_if_dne: bool = True,
**tomlkws, **tomlkws,
@ -270,7 +273,7 @@ def load(
Load config file by name. Load config file by name.
If desired config is not in the top level piker-user config path then If desired config is not in the top level piker-user config path then
pass the ``path: Path`` explicitly. pass the `path: Path` explicitly.
''' '''
# create the $HOME/.config/piker dir if dne # create the $HOME/.config/piker dir if dne
@ -285,7 +288,8 @@ def load(
if ( if (
not path.is_file() not path.is_file()
and touch_if_dne and
touch_if_dne
): ):
# only do a template if no path provided, # only do a template if no path provided,
# just touch an empty file with same name. # just touch an empty file with same name.

View File

@ -740,7 +740,7 @@ async def sample_and_broadcast(
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n' f'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz' f'throttle = {throttle} Hz'
) )

View File

@ -91,6 +91,18 @@ class SymbologyCache(Struct):
# provided by the backend pkg. # provided by the backend pkg.
mktmaps: dict[str, MktPair] = field(default_factory=dict) mktmaps: dict[str, MktPair] = field(default_factory=dict)
def pformat(self) -> str:
return (
f'<{type(self).__name__}(\n'
f' .mod: {self.mod!r}\n'
f' .assets: {len(self.assets)!r}\n'
f' .pairs: {len(self.pairs)!r}\n'
f' .mktmaps: {len(self.mktmaps)!r}\n'
f')>'
)
__repr__ = pformat
def write_config(self) -> None: def write_config(self) -> None:
# put the backend's pair-struct type ref at the top # put the backend's pair-struct type ref at the top

View File

@ -27,7 +27,6 @@ from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
Optional,
Callable, Callable,
AsyncContextManager, AsyncContextManager,
AsyncGenerator, AsyncGenerator,
@ -35,6 +34,7 @@ from typing import (
) )
import json import json
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from trio_websocket import ( from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs( async def proxy_msgs(
ws: WebSocketConnection, ws: WebSocketConnection,
pcs: trio.CancelScope, # parent cancel scope rent_cs: trio.CancelScope, # parent cancel scope
): ):
''' '''
Receive (under `timeout` deadline) all msgs from from underlying Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:' f'{url} connection bail with:'
) )
await trio.sleep(0.5) await trio.sleep(0.5)
pcs.cancel() rent_cs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
return return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n' f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n' 'WS feed seems down and slow af.. reconnecting\n'
) )
pcs.cancel() rent_cs.cancel()
# go back to reonnect loop in parent task # go back to reonnect loop in parent task
return return
@ -228,7 +228,12 @@ async def _reconnect_forever(
nobsws._connected = trio.Event() nobsws._connected = trio.Event()
task_status.started() task_status.started()
while not snd._closed: mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
f'{url} trying (RE)CONNECT' f'{url} trying (RE)CONNECT'
@ -237,10 +242,11 @@ async def _reconnect_forever(
ws: WebSocketConnection ws: WebSocketConnection
try: try:
async with ( async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws, open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
): ):
cs = nobsws._cs = n.cancel_scope cs = nobsws._cs = tn.cancel_scope
nobsws._ws = ws nobsws._ws = ws
log.info( log.info(
f'{src_mod}\n' f'{src_mod}\n'
@ -248,7 +254,7 @@ async def _reconnect_forever(
) )
# begin relay loop to forward msgs # begin relay loop to forward msgs
n.start_soon( tn.start_soon(
proxy_msgs, proxy_msgs,
ws, ws,
cs, cs,
@ -262,7 +268,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs # TODO: should we return an explicit sub-cs
# from this fixture task? # from this fixture task?
await n.start( await tn.start(
open_fixture, open_fixture,
fixture, fixture,
nobsws, nobsws,
@ -272,11 +278,23 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above. # to let tasks run **inside** the ws open block above.
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError:
except (
HandshakeError,
ConnectionRejected,
):
log.exception('Retrying connection') log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
# ws & nursery block ends except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends
nobsws._connected = trio.Event() nobsws._connected = trio.Event()
if cs.cancelled_caught: if cs.cancelled_caught:
log.cancel( log.cancel(
@ -324,21 +342,25 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout. connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for (re)requesting entered/exitted around each connection reset; eg. for
subscriptions without requiring streaming setup code to rerun. (re)requesting subscriptions without requiring streaming setup
code to rerun.
''' '''
snd: trio.MemorySendChannel snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616) snd, rcv = trio.open_memory_channel(616)
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
nobsws = NoBsWs( nobsws = NoBsWs(
url, url,
rcv, rcv,
msg_recv_timeout=msg_recv_timeout, msg_recv_timeout=msg_recv_timeout,
) )
await n.start( await tn.start(
partial( partial(
_reconnect_forever, _reconnect_forever,
url, url,
@ -351,11 +373,10 @@ async def open_autorecon_ws(
await nobsws._connected.wait() await nobsws._connected.wait()
assert nobsws._cs assert nobsws._cs
assert nobsws.connected() assert nobsws.connected()
try: try:
yield nobsws yield nobsws
finally: finally:
n.cancel_scope.cancel() tn.cancel_scope.cancel()
''' '''
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
id: int id: int
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
result: Optional[dict] = None result: dict|None = None
error: Optional[dict] = None error: dict|None = None
@acm @acm

View File

@ -39,6 +39,7 @@ from typing import (
AsyncContextManager, AsyncContextManager,
Awaitable, Awaitable,
Sequence, Sequence,
TYPE_CHECKING,
) )
import trio import trio
@ -75,6 +76,10 @@ from ._sampling import (
uniform_rate_send, uniform_rate_send,
) )
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.msg.types import Aid
class Sub(Struct, frozen=True): class Sub(Struct, frozen=True):
''' '''
@ -352,7 +357,9 @@ async def allocate_persistent_feed(
# yield back control to starting nursery once we receive either # yield back control to starting nursery once we receive either
# some history or a real-time quote. # some history or a real-time quote.
log.info(f'loading OHLCV history: {fqme}') log.info(
f'loading OHLCV history: {fqme!r}\n'
)
await some_data_ready.wait() await some_data_ready.wait()
flume = Flume( flume = Flume(
@ -723,7 +730,10 @@ class Feed(Struct):
async for msg in stream: async for msg in stream:
await tx.send(msg) await tx.send(msg)
async with trio.open_nursery() as nurse: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# spawn a relay task for each stream so that they all # spawn a relay task for each stream so that they all
# multiplex to a common channel. # multiplex to a common channel.
for brokername in mods: for brokername in mods:
@ -786,7 +796,6 @@ async def install_brokerd_search(
@acm @acm
async def maybe_open_feed( async def maybe_open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: str | None = None, loglevel: str | None = None,
@ -840,7 +849,6 @@ async def maybe_open_feed(
@acm @acm
async def open_feed( async def open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: str|None = None, loglevel: str|None = None,
@ -899,19 +907,19 @@ async def open_feed(
feed.portals[brokermod] = portal feed.portals[brokermod] = portal
# fill out "status info" that the UI can show # fill out "status info" that the UI can show
host, port = portal.channel.raddr chan: tractor.Channel = portal.chan
if host == '127.0.0.1': raddr: Address = chan.raddr
host = 'localhost' aid: Aid = chan.aid
# TAG_feed_status_update
feed.status.update({ feed.status.update({
'actor_name': portal.channel.uid[0], 'actor_id': aid,
'host': host, 'actor_short_id': f'{aid.name}@{aid.pid}',
'port': port, 'ipc': chan.raddr.proto_key,
'ipc_addr': raddr,
'hist_shm': 'NA', 'hist_shm': 'NA',
'rt_shm': 'NA', 'rt_shm': 'NA',
'throttle_rate': tick_throttle, 'throttle_hz': tick_throttle,
}) })
# feed.status.update(init_msg.pop('status', {}))
# (allocate and) connect to any feed bus for this broker # (allocate and) connect to any feed bus for this broker
bus_ctxs.append( bus_ctxs.append(

View File

@ -36,10 +36,10 @@ from ._sharedmem import (
ShmArray, ShmArray,
_Token, _Token,
) )
from piker.accounting import MktPair
if TYPE_CHECKING: if TYPE_CHECKING:
from ..accounting import MktPair from piker.data.feed import Feed
from .feed import Feed
class Flume(Struct): class Flume(Struct):

View File

@ -113,9 +113,9 @@ def validate_backend(
) )
if ep is None: if ep is None:
log.warning( log.warning(
f'Provider backend {mod.name} is missing ' f'Provider backend {mod.name!r} is missing '
f'{daemon_name} support :(\n' f'{daemon_name!r} support?\n'
f'The following endpoint is missing: {name}' f'|_module endpoint-func missing: {name!r}\n'
) )
inits: list[ inits: list[

View File

@ -498,6 +498,7 @@ async def cascade(
func_name: str = func.__name__ func_name: str = func.__name__
async with ( async with (
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
# TODO: might be better to just make a "restart" method where # TODO: might be better to just make a "restart" method where

View File

@ -107,17 +107,22 @@ async def open_piker_runtime(
async with ( async with (
tractor.open_root_actor( tractor.open_root_actor(
# passed through to ``open_root_actor`` # passed through to `open_root_actor`
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
name=name, name=name,
start_method=start_method,
loglevel=loglevel, loglevel=loglevel,
debug_mode=debug_mode, debug_mode=debug_mode,
start_method=start_method,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
# TODO: eventually we should be able to avoid # TODO: eventually we should be able to avoid
# having the root have more then permissions to # having the root have more then permissions to
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -200,7 +205,8 @@ async def open_pikerd(
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery, tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery, tractor.trionics.collapse_eg(),
trio.open_nursery() as service_tn,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -211,7 +217,7 @@ async def open_pikerd(
# assign globally for future daemon/task creation # assign globally for future daemon/task creation
Services.actor_n = actor_nursery Services.actor_n = actor_nursery
Services.service_n = service_nursery Services.service_n = service_tn
Services.debug_mode = debug_mode Services.debug_mode = debug_mode
try: try:
@ -221,7 +227,7 @@ async def open_pikerd(
# TODO: is this more clever/efficient? # TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks: # if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd') # await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel() service_tn.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
@ -256,7 +262,10 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> (
tractor._portal.Portal
|ClassVar[Services]
):
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self
@ -281,7 +290,8 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = ( registry_addrs: list[tuple[str, int]] = (
registry_addrs registry_addrs
or [_default_reg_addr] or
[_default_reg_addr]
) )
pikerd_portal: tractor.Portal|None pikerd_portal: tractor.Portal|None

View File

@ -28,6 +28,7 @@ from contextlib import (
) )
import tractor import tractor
from trio.lowlevel import current_task
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
@ -70,6 +71,7 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
try:
async with find_service( async with find_service(
service_name, service_name,
registry_addrs=[('127.0.0.1', 6116)], registry_addrs=[('127.0.0.1', 6116)],
@ -134,6 +136,20 @@ async def maybe_spawn_daemon(
yield portal yield portal
await portal.cancel_actor() await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
lock.release()
raise err
async def spawn_emsd( async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.result() ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context

View File

@ -101,13 +101,15 @@ async def open_registry(
if ( if (
not tractor.is_root_process() not tractor.is_root_process()
and not Registry.addrs and
not Registry.addrs
): ):
Registry.addrs.extend(actor.reg_addrs) Registry.addrs.extend(actor.reg_addrs)
if ( if (
ensure_exists ensure_exists
and not Registry.addrs and
not Registry.addrs
): ):
raise RuntimeError( raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?" f"`{uid}` registry should already exist but doesn't?"
@ -146,7 +148,7 @@ async def find_service(
| list[Portal] | list[Portal]
| None | None
): ):
# try:
reg_addrs: list[tuple[str, int]] reg_addrs: list[tuple[str, int]]
async with open_registry( async with open_registry(
addrs=( addrs=(
@ -157,22 +159,39 @@ async def find_service(
or Registry.addrs or Registry.addrs
), ),
) as reg_addrs: ) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None log.info(
f'Scanning for service {service_name!r}'
)
# attach to existing daemon by name if possible # attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor( async with tractor.find_actor(
service_name, service_name,
registry_addrs=reg_addrs, registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref only_first=first_only, # if set only returns single ref
) as maybe_portals: ) as maybe_portals:
if not maybe_portals: if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None yield None
return return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service( async def check_for_service(
service_name: str, service_name: str,

View File

@ -963,7 +963,10 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any # concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage. # pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon( tn.start_soon(
push_latest_frame, push_latest_frame,
dt_eps, dt_eps,
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
int, int,
Duration, Duration,
]|None = config.get('frame_types', None) ]|None = config.get('frame_types', None)
if def_frame_durs: if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe] def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
else: else:
# use what we calced from first frame above. # use what we calced from first frame above.
def_frame_size = calced_frame_size def_frame_size = calced_frame_size
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first # if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb # history frame until the last datum loaded from the tsdb
# continue that now in the background # continue that now in the background
async with trio.open_nursery() as tn: async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
bf_done = await tn.start( bf_done = await tn.start(
partial( partial(
@ -1308,6 +1320,7 @@ async def manage_history(
# sampling period) data set since normally differently # sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently # sampled timeseries can be loaded / process independently
# ;) # ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
log.info( log.info(

View File

@ -517,7 +517,7 @@ def with_dts(
''' '''
return df.with_columns([ return df.with_columns([
pl.col(time_col).shift(1).suffix('_prev'), pl.col(time_col).shift(1).name.suffix('_prev'),
pl.col(time_col).diff().alias('s_diff'), pl.col(time_col).diff().alias('s_diff'),
pl.from_epoch(pl.col(time_col)).alias('dt'), pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([ ]).with_columns([
@ -623,7 +623,7 @@ def detect_vlm_gaps(
) -> pl.DataFrame: ) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter( vnull: pl.DataFrame = df.filter(
pl.col(col) == 0 pl.col(col) == 0
) )
return vnull return vnull

View File

@ -21,6 +21,7 @@ Main app startup and run.
from functools import partial from functools import partial
from types import ModuleType from types import ModuleType
import tractor
import trio import trio
from piker.ui.qt import ( from piker.ui.qt import (
@ -116,6 +117,7 @@ async def _async_main(
needed_brokermods[brokername] = brokers[brokername] needed_brokermods[brokername] = brokers[brokername]
async with ( async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as root_n, trio.open_nursery() as root_n,
): ):
# set root nursery and task stack for spawning other charts/feeds # set root nursery and task stack for spawning other charts/feeds

View File

@ -33,7 +33,6 @@ import trio
from piker.ui.qt import ( from piker.ui.qt import (
QtCore, QtCore,
QtWidgets,
Qt, Qt,
QLineF, QLineF,
QFrame, QFrame,

View File

@ -1445,7 +1445,10 @@ async def display_symbol_data(
# for pause/resume on mouse interaction # for pause/resume on mouse interaction
rt_chart.feed = feed rt_chart.feed = feed
async with trio.open_nursery() as ln: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as ln,
):
# if available load volume related built-in display(s) # if available load volume related built-in display(s)
vlm_charts: dict[ vlm_charts: dict[
str, str,

View File

@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
from typing import Callable from typing import Callable
import trio import trio
from tractor.trionics import gather_contexts from tractor.trionics import (
gather_contexts,
collapse_eg,
)
from piker.ui.qt import ( from piker.ui.qt import (
QtCore, QtCore,
@ -207,7 +210,10 @@ async def open_signal_handler(
async for args in recv: async for args in recv:
await async_handler(*args) await async_handler(*args)
async with trio.open_nursery() as tn: async with (
collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon(proxy_to_handler) tn.start_soon(proxy_to_handler)
async with send: async with send:
yield yield
@ -242,6 +248,7 @@ async def open_handlers(
widget: QWidget widget: QWidget
streams: list[trio.abc.ReceiveChannel] streams: list[trio.abc.ReceiveChannel]
async with ( async with (
collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
gather_contexts([ gather_contexts([
open_event_stream( open_event_stream(

View File

@ -18,10 +18,11 @@
Feed status and controls widget(s) for embedding in a UI-pane. Feed status and controls widget(s) for embedding in a UI-pane.
""" """
from __future__ import annotations from __future__ import annotations
from textwrap import dedent from typing import (
from typing import TYPE_CHECKING Any,
TYPE_CHECKING,
)
# from PyQt5.QtCore import Qt # from PyQt5.QtCore import Qt
@ -49,35 +50,55 @@ def mk_feed_label(
a feed control protocol. a feed control protocol.
''' '''
status = feed.status status: dict[str, Any] = feed.status
assert status assert status
msg = dedent(""" # SO tips on ws/nls,
actor: **{actor_name}**\n # https://stackoverflow.com/a/15721400
|_ @**{host}:{port}**\n ws: str = '&nbsp;'
""") # nl: str = '<br>' # dun work?
actor_info_repr: str = (
f')> **{status["actor_short_id"]}**\n'
'\n' # bc md?
)
for key, val in status.items(): # fields to select *IN* for display
if key in ('host', 'port', 'actor_name'): # (see `.data.feed.open_feed()` status
continue # update -> TAG_feed_status_update)
msg += f'\n|_ {key}: **{{{key}}}**\n' for key in [
'ipc',
'hist_shm',
'rt_shm',
'throttle_hz',
]:
# NOTE, the 2nd key is filled via `.format()` updates.
actor_info_repr += (
f'\n' # bc md?
f'{ws}|_{key}: **{{{key}}}**\n'
)
# ^TODO? formatting and content..
# -[ ] showing which fqme is "forward" on the
# chart/fsp/order-mode?
# '|_ flows: **{symbols}**\n'
#
# -[x] why isn't the indent working?
# => markdown, now solved..
feed_label = FormatLabel( feed_label = FormatLabel(
fmt_str=msg, fmt_str=actor_info_repr,
# |_ streams: **{symbols}**\n
font=_font.font, font=_font.font,
font_size=_font_small.px_size, font_size=_font_small.px_size,
font_color='default_lightest', font_color='default_lightest',
) )
# ?TODO, remove this?
# form.vbox.setAlignment(feed_label, Qt.AlignBottom) # form.vbox.setAlignment(feed_label, Qt.AlignBottom)
# form.vbox.setAlignment(Qt.AlignBottom) # form.vbox.setAlignment(Qt.AlignBottom)
_ = chart.height() - ( # _ = chart.height() - (
form.height() + # form.height() +
form.fill_bar.height() # form.fill_bar.height()
# feed_label.height() # # feed_label.height()
) # )
feed_label.format(**feed.status) feed_label.format(**feed.status)
return feed_label return feed_label

View File

@ -600,6 +600,7 @@ async def open_fsp_admin(
kwargs=kwargs, kwargs=kwargs,
) as (cache_hit, cluster_map), ) as (cache_hit, cluster_map),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
if cache_hit: if cache_hit:
@ -613,6 +614,8 @@ async def open_fsp_admin(
) )
try: try:
yield admin yield admin
# ??TODO, does this *need* to be inside a finally?
finally: finally:
# terminate all tasks via signals # terminate all tasks via signals
for key, entry in admin._registry.items(): for key, entry in admin._registry.items():

View File

@ -285,18 +285,20 @@ class FormatLabel(QLabel):
font_size: int, font_size: int,
font_color: str, font_color: str,
use_md: bool = True,
parent=None, parent=None,
) -> None: ) -> None:
super().__init__(parent) super().__init__(parent)
# by default set the format string verbatim and expect user to # by default set the format string verbatim and expect user
# call ``.format()`` later (presumably they'll notice the # to call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be # unformatted content if ``fmt_str`` isn't meant to be
# unformatted). # unformatted).
self.fmt_str = fmt_str self.fmt_str = fmt_str
self.setText(fmt_str) # self.setText(fmt_str) # ?TODO, why here?
self.setStyleSheet( self.setStyleSheet(
f"""QLabel {{ f"""QLabel {{
@ -306,6 +308,7 @@ class FormatLabel(QLabel):
""" """
) )
self.setFont(_font.font) self.setFont(_font.font)
if use_md:
self.setTextFormat( self.setTextFormat(
Qt.TextFormat.MarkdownText Qt.TextFormat.MarkdownText
) )
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
size_policy.Expanding, size_policy.Expanding,
) )
self.setAlignment( self.setAlignment(
Qt.AlignVCenter | Qt.AlignLeft Qt.AlignLeft
|
Qt.AlignBottom
# Qt.AlignVCenter
) )
self.setText(self.fmt_str) self.setText(self.fmt_str)

View File

@ -15,7 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz. qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
""" """
@ -46,6 +47,7 @@ import time
from pprint import pformat from pprint import pformat
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import tractor
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -53,7 +55,7 @@ from piker.ui.qt import (
size_policy, size_policy,
align_flag, align_flag,
Qt, Qt,
QtCore, # QtCore,
QtWidgets, QtWidgets,
QModelIndex, QModelIndex,
QItemSelectionModel, QItemSelectionModel,
@ -920,7 +922,10 @@ async def fill_results(
# issue multi-provider fan-out search request and place # issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers # "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
for provider, (search, pause) in ( for provider, (search, pause) in (
_searcher_cache.copy().items() _searcher_cache.copy().items()
@ -944,7 +949,7 @@ async def fill_results(
status_field='-> searchin..', status_field='-> searchin..',
) )
await n.start( await tn.start(
pack_matches, pack_matches,
view, view,
has_results, has_results,
@ -1004,12 +1009,14 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size) view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616) send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
# start a background multi-searcher task which receives # start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and # patterns relayed from this keyboard input handler and
# async updates the completer view's results. # async updates the completer view's results.
n.start_soon( tn.start_soon(
partial( partial(
fill_results, fill_results,
searchw, searchw,

View File

@ -269,6 +269,8 @@ def hcolor(name: str) -> str:
# default ohlc-bars/curve gray # default ohlc-bars/curve gray
'bracket': '#666666', # like the logo 'bracket': '#666666', # like the logo
'pikers': '#616161', # a trader shade of..
'beast': '#161616', # in the dark alone.
# bluish # bluish
'charcoal': '#36454F', 'charcoal': '#36454F',

View File

@ -21,6 +21,7 @@ Chart trading, the only way to scalp.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -41,7 +42,6 @@ from piker.accounting import (
Position, Position,
mk_allocator, mk_allocator,
MktPair, MktPair,
Symbol,
) )
from piker.clearing import ( from piker.clearing import (
open_ems, open_ems,
@ -143,6 +143,15 @@ class OrderMode:
} }
_staged_order: Order | None = None _staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
self, self,
level: float, level: float,
@ -172,7 +181,11 @@ class OrderMode:
line.update_labels(order_info) line.update_labels(order_info)
# update bound-in staged order # update bound-in staged order
order.price = level mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.size = order_info['size'] order.size = order_info['size']
# when an order is changed we flip the settings side-pane to # when an order is changed we flip the settings side-pane to
@ -187,7 +200,9 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
level = order.price # TODO, if we instead just always decimalize at the ems layer
# we can avoid this back-n-forth casting?
level = float(order.price)
line = order_line( line = order_line(
chart or self.chart, chart or self.chart,
@ -224,7 +239,11 @@ class OrderMode:
# the order mode allocator but we still need to update the # the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems # "staged" order message we'll send to the ems
def update_order_price(y: float) -> None: def update_order_price(y: float) -> None:
order.price = y mkt: MktPair = self.curr_mkt
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
line._on_level_change = update_order_price line._on_level_change = update_order_price
@ -275,34 +294,31 @@ class OrderMode:
chart = cursor.linked.chart chart = cursor.linked.chart
if ( if (
not chart not chart
and cursor and
and cursor.active_plot cursor
and
cursor.active_plot
): ):
return return
chart = cursor.active_plot chart = cursor.active_plot
price = cursor._datum_xy[1] price: float = cursor._datum_xy[1]
if not price: if not price:
# zero prices are not supported by any means # zero prices are not supported by any means
# since that's illogical / a no-op. # since that's illogical / a no-op.
return return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision # TODO: should we be enforcing this precision
# at a different layer in the stack? right now # at a different layer in the stack?
# any precision error will literally be relayed # |_ might require `MktPair` tracking in the EMS?
# all the way back from the backend. # |_ right now any precision error will be relayed
# all the way back from the backend and vice-versa..
price = round( #
price, mkt: MktPair = self.curr_mkt
ndigits=mkt.price_tick_digits, price: Decimal = mkt.quantize(
size=price,
quantity_type='price',
) )
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -378,7 +394,7 @@ class OrderMode:
'oid': oid, 'oid': oid,
}) })
if order.price <= 0: if float(order.price) <= 0:
log.error( log.error(
'*!? Invalid `Order.price <= 0` ?!*\n' '*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form # TODO: make this present multi-line in object form
@ -515,14 +531,15 @@ class OrderMode:
# if an order msg is provided update the line # if an order msg is provided update the line
# **from** that msg. # **from** that msg.
if order: if order:
if order.price <= 0: price: float = float(order.price)
if price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}') log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid]) self.cancel_orders([order.oid])
return None return None
line.set_level(order.price) line.set_level(price)
self.on_level_change_update_next_order_info( self.on_level_change_update_next_order_info(
level=order.price, level=price,
line=line, line=line,
order=order, order=order,
# use the corresponding position tracker for the # use the corresponding position tracker for the
@ -681,9 +698,9 @@ class OrderMode:
) -> Dialog | None: ) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the # NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded. # equivalent order msg in order to be loaded.
order = msg.req order: Order = msg.req
oid = str(msg.oid) oid = str(msg.oid)
symbol = order.symbol symbol: str = order.symbol
# TODO: MEGA UGGG ZONEEEE! # TODO: MEGA UGGG ZONEEEE!
src = msg.src src = msg.src
@ -702,13 +719,22 @@ class OrderMode:
order.oid = str(order.oid) order.oid = str(order.oid)
order.brokers = [brokername] order.brokers = [brokername]
# TODO: change this over to `MktPair`, but it's # ?TODO? change this over to `MktPair`, but it's gonna be
# gonna be tough since we don't have any such data # tough since we don't have any such data really in our
# really in our clearing msg schema.. # clearing msg schema..
order.symbol = Symbol.from_fqme( # BUT WAIT! WHY do we even want/need this!?
fqsn=fqme, #
info={}, # order.symbol = self.curr_mkt
) #
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
maybe_dialog: Dialog | None = self.submit_order( maybe_dialog: Dialog | None = self.submit_order(
send_msg=False, send_msg=False,
order=order, order=order,
@ -766,6 +792,7 @@ async def open_order_mode(
brokerd_accounts, brokerd_accounts,
ems_dialog_msgs, ems_dialog_msgs,
), ),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
@ -1101,7 +1128,7 @@ async def process_trade_msg(
) )
) )
): ):
msg.req = order msg.req: Order = order
dialog: ( dialog: (
Dialog Dialog
# NOTE: on an invalid order submission (eg. # NOTE: on an invalid order submission (eg.
@ -1166,7 +1193,7 @@ async def process_trade_msg(
tm = time.time() tm = time.time()
mode.on_fill( mode.on_fill(
oid, oid,
price=req.price, price=float(req.price),
time_s=tm, time_s=tm,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
@ -1221,7 +1248,7 @@ async def process_trade_msg(
tm = details['broker_time'] tm = details['broker_time']
mode.on_fill( mode.on_fill(
oid, oid,
price=details['price'], price=float(details['price']),
time_s=tm, time_s=tm,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
) )

View File

@ -1,4 +1,22 @@
""" # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
A per-display, DPI (scaling) info dumper.
Resource list for mucking with DPIs on multiple screens: Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/42141354/convert-pixel-size-to-point-size-for-fonts-on-multiple-platforms - https://stackoverflow.com/questions/42141354/convert-pixel-size-to-point-size-for-fonts-on-multiple-platforms
@ -12,89 +30,86 @@ Resource list for mucking with DPIs on multiple screens:
- https://stackoverflow.com/questions/16561879/what-is-the-difference-between-logicaldpix-and-physicaldpix-in-qt - https://stackoverflow.com/questions/16561879/what-is-the-difference-between-logicaldpix-and-physicaldpix-in-qt
- https://doc.qt.io/qt-5/qguiapplication.html#screenAt - https://doc.qt.io/qt-5/qguiapplication.html#screenAt
""" '''
from pyqtgraph import QtGui from pyqtgraph import QtGui
from PyQt5.QtCore import ( from PyQt6 import (
Qt, QCoreApplication QtCore,
QtWidgets,
)
from PyQt6.QtCore import (
Qt,
QCoreApplication,
QSize,
QRect,
) )
# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute # Proper high DPI scaling is available in Qt >= 5.6.0. This attibute
# must be set before creating the application # must be set before creating the application
if hasattr(Qt, 'AA_EnableHighDpiScaling'): if hasattr(Qt, 'AA_EnableHighDpiScaling'):
QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) QCoreApplication.setAttribute(
Qt.AA_EnableHighDpiScaling,
True,
)
if hasattr(Qt, 'AA_UseHighDpiPixmaps'): if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) QCoreApplication.setAttribute(
Qt.AA_UseHighDpiPixmaps,
True,
)
app = QtWidgets.QApplication([])
app = QtGui.QApplication([]) window = QtWidgets.QMainWindow()
window = QtGui.QMainWindow() main_widget = QtWidgets.QWidget()
main_widget = QtGui.QWidget()
window.setCentralWidget(main_widget) window.setCentralWidget(main_widget)
window.show() window.show()
pxr = main_widget.devicePixelRatioF() pxr: float = main_widget.devicePixelRatioF()
# screen_num = app.desktop().screenNumber() # explicitly get main widget and primary displays
# screen = app.screens()[screen_num] current_screen: QtGui.QScreen = app.screenAt(
main_widget.geometry().center()
)
primary_screen: QtGui.QScreen = app.primaryScreen()
screen = app.screenAt(main_widget.geometry().center()) screen: QtGui.QScreen
for screen in app.screens():
name = screen.name() name: str = screen.name()
size = screen.size() model: str = screen.model().rstrip()
geo = screen.availableGeometry() size: QSize = screen.size()
phydpi = screen.physicalDotsPerInch() geo: QRect = screen.availableGeometry()
logdpi = screen.logicalDotsPerInch() phydpi: float = screen.physicalDotsPerInch()
logdpi: float = screen.logicalDotsPerInch()
is_primary: bool = screen is primary_screen
is_current: bool = screen is current_screen
print( print(
# f'screen number: {screen_num}\n', f'------ screen name: {name} ------\n'
f'screen name: {name}\n' f'|_primary: {is_primary}\n'
f'screen size: {size}\n' f' _current: {is_current}\n'
f'screen geometry: {geo}\n\n' f' _model: {model}\n'
f'devicePixelRationF(): {pxr}\n' f' _screen size: {size}\n'
f'physical dpi: {phydpi}\n' f' _screen geometry: {geo}\n'
f'logical dpi: {logdpi}\n' f' _devicePixelRationF(): {pxr}\n'
f' _physical dpi: {phydpi}\n'
f' _logical dpi: {logdpi}\n'
) )
print('-'*50) # app-wide font info
screen = app.primaryScreen()
name = screen.name()
size = screen.size()
geo = screen.availableGeometry()
phydpi = screen.physicalDotsPerInch()
logdpi = screen.logicalDotsPerInch()
print(
# f'screen number: {screen_num}\n',
f'screen name: {name}\n'
f'screen size: {size}\n'
f'screen geometry: {geo}\n\n'
f'devicePixelRationF(): {pxr}\n'
f'physical dpi: {phydpi}\n'
f'logical dpi: {logdpi}\n'
)
# app-wide font
font = QtGui.QFont("Hack") font = QtGui.QFont("Hack")
# use pixel size to be cross-resolution compatible? # use pixel size to be cross-resolution compatible?
font.setPixelSize(6) font.setPixelSize(6)
fm = QtGui.QFontMetrics(font) fm = QtGui.QFontMetrics(font)
fontdpi = fm.fontDpi() fontdpi: float = fm.fontDpi()
font_h = fm.height() font_h: int = fm.height()
string = '10000'
str_br = fm.boundingRect(string)
str_w = str_br.width()
string: str = '10000'
str_br: QtCore.QRect = fm.boundingRect(string)
str_w: int = str_br.width()
print( print(
# f'screen number: {screen_num}\n', f'------ global font settings ------\n'
f'font dpi: {fontdpi}\n' f'font dpi: {fontdpi}\n'
f'font height: {font_h}\n' f'font height: {font_h}\n'
f'string bounding rect: {str_br}\n' f'string bounding rect: {str_br}\n'

1
tags 100644
View File

@ -0,0 +1 @@
TAG_feed_status_update ./piker/data/feed.py /TAG_feed_status_update/

View File

@ -15,6 +15,12 @@ from piker.service import (
from piker.log import get_console_log from piker.log import get_console_log
# include `tractor`'s built-in fixtures!
pytest_plugins: tuple[str] = (
"tractor._testing.pytest",
)
def pytest_addoption(parser): def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel', parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing") default=None, help="logging level to set when testing")

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules # NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`. # import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re: except tractor.RemoteActorError as re:
assert re.type == ModuleNotFoundError assert re.type is ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme) run_and_tollerate_cancels(load_bad_fqme)

View File

@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n: # async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens) # await n.run_in_actor('other', intermittently_refresh_tokens)
async with trio.open_nursery() as n: async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
quoter = await qt.stock_quoter(client, us_symbols) quoter = await qt.stock_quoter(client, us_symbols)
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else: else:
symbols = [tmx_symbols] symbols = [tmx_symbols]
async with trio.open_nursery() as n: async with trio.open_nursery(
strict_exception_groups=False,
) as n:
for syms, func in zip(symbols, stream_what): for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms) n.start_soon(func, feed, syms)