Compare commits

...

27 Commits

Author SHA1 Message Date
Tyler Goodlet d66d1c1597 binance: add `AggTrade.nq: float`: "normal quantity" field.. 2026-01-06 21:46:09 -05:00
Tyler Goodlet de232215e1 binance: handle new `TRADIFI_PERPETUAL`.. 2026-01-06 21:46:09 -05:00
Tyler Goodlet 30679dc478 binance: add `Pair.opoAllowed` field
Handle new API field per 2025-12-02 update.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-06 21:46:09 -05:00
Tyler Goodlet 351d898915 binance: set `Pair.pegInstructionsAllowed = False`
Lol, a cheeky unforeseen bug due to TOML's lack of a null type and
thinking i can render an `Optional` field on a `msgspec.Struct`
(defaulted to `None`) the `binance.symcache.toml` cache file..

I didn't catch this when i first updated to the 3.1 API in f7caa75228
because i never did a cache-files flush.. lesson learned and we **really
need tests for this**!!
2026-01-06 21:46:09 -05:00
Tyler Goodlet 93c1f8ede5 Add fix for binance API 3.1 rollout..
See https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
2026-01-06 21:46:09 -05:00
Tyler Goodlet 7e7d678bfb kraken: add crash-handling around `Pair()` init
Since it can otherwise be difficult to debug due to nursery cancellation
(we need that taskman yo!).
2026-01-06 21:46:09 -05:00
Tyler Goodlet 300670af96 kraken: `Pair.costmin` is now optional?
Some pairs don't seem to define it but it's not listed as deprecated on
official API page (new one now linked in type def's doc string).
2026-01-06 21:46:09 -05:00
Tyler Goodlet 991e1014e4 binance: add new `permissionSets` to base `Pair` 2026-01-06 21:46:09 -05:00
Tyler Goodlet 7a0b2b4dae Update `binance` spot pairs with `amendAllowed`
As per API updates,
https://developers.binance.com/docs/binance-spot-api-docs
https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority

I also slightly tweaked the filed mismatch exception note to include the
`repr(pair_type)` so the dev can know which pair types should be
changed.
2026-01-06 21:46:09 -05:00
Tyler Goodlet adb687193a `.kraken`: add masked pauses for order req debug
Such that the next time i inevitably must debug the some order-request
error status or precision discrepancy, i have the mkt-symbol branch
ready to go. Also, switch to `'action': 'buy'|'sell' as action,` style
`case` matching instead of the post-`if` predicate style.
2026-01-06 21:46:09 -05:00
Tyler Goodlet 9003c28fb4 `.questrade`: link in ws-API issue! 2026-01-06 21:46:09 -05:00
Tyler Goodlet 3047ba260a `.kraken.broker`: need to `await verify_balances()` .. 2026-01-06 21:46:09 -05:00
Tyler Goodlet 4d950f15a6 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! 2026-01-06 21:46:09 -05:00
Tyler Goodlet 4f215bfc7d `.brokers.cli`: module type and todo for `--pdb` flag to NOT src from sub-cmd 2026-01-06 21:46:08 -05:00
Tyler Goodlet ec9c03408a Type loaded backend modules 2026-01-06 21:46:08 -05:00
Tyler Goodlet ede617b60e Use `pytest` plugin now exposed by `tractor` 2026-01-06 21:44:19 -05:00
Tyler Goodlet 8313ae2e96 `.ui._search`: collapse EGs as needed, use `tn` naming. 2026-01-06 21:44:19 -05:00
Tyler Goodlet 095773e7da Port `.data._web_bs` stuff to strict-EGs
Using `tractor.trionics.collapse_eg()` as needed and doing
some renames, in similar style as elsewhere:
- `pcs` -> `rent_cs`,
- `n` -> `tn` for nursery handles,

Also,
- tweak the `._reconnect_forever()` while loop to use the
  (also) `trio`-internal
  `mc_state: trio._channel.MemoryChannelState = snd._state` instead
  of `snd._close` to poll for open send/receive consumer task counts
  since,
    1. it seems more reliable then using the `snd._closed`,
    2. there's no other way to access the info.. afaik?

- handle `ConnectionRejected` explicitly alongside handshake-errs as
  a retry case.
- add a base-exc handler which `.exception()` reports the reconnect
  attempt failure explicitly.
- drop some lingering `Optional` usage.
2026-01-06 21:44:19 -05:00
Tyler Goodlet e99d3e6282 Port `.cli` & `.service` to latest `tractor` registry APIs
Namely changes for the `registry_addrs: list`, enable_transports: list`
and related `tractor._addr` primitive requirements.

Other updates include,
- passing `maybe_enable_greenback=True`,
- additional exc logging around `pikerd` syncing/booting,
- changing to newer `Context.wait_for_result()`,
- dropping (unnecessary?) `maybe_open_crash_handler()` around `pikerd` ep.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 819bd990d2 binance; unmask around send-chan @acm usage 2026-01-06 21:44:19 -05:00
Tyler Goodlet 905352ff2f Spurious first-draft of EG collapsing
Topically, throughout various (seemingly) console-UX-affecting or benign
spots in the code base; nothing that required more intervention beyond
things superficial. A few spots also include `trio.Nursery` ref renames
(always to something with a `tn` in it) and log-level reductions to
quiet (benign) console noise oriented around issues meant to be solved
long..

Note there's still a couple spots i left with the loose-ify flag because
i haven't fully tested them without using the latest version of
`tractor.trionics.collapse_eg()`, but more then likely they should flip
over fine.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 641b85df70 Use `.trionics.collapse_eg()` in `.deribit.api`
Commit this change separate from the (original) broader set applied to
the entire code base since the `.deribit.api` mod contained changes from
upstream max-pain work (from our very own @nt) which caused a noticeable
conflict and intros un-required changes from his work to re-enable
`deribit` support.

Note the original commit, "69eac7bb Spurious first-draft of EG
collapsing", applied similar changes through the rest of the code base.
AGAIN, this mod's change is only being broken out to minimize upstream
change conflicts due to updates to the `deribit` backend done earlier in
time-history.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 72415fce63 Try running daemons on UDS tpt
The root daemon, pikerd, needs to be adjusted to use diff default
registry addrs to also utilize non-TCP, but for now this gets us started
testing; so far so good B)
2026-01-06 21:44:19 -05:00
Tyler Goodlet c3bc5b6783 Adjust feed status fields/display-pane to new actor-ID
That is to use the new `tractor.msg.types.Aid` struct to pull the
`brokerd` info from the `tractor.Channel.aid: Aid` attr as well as more
generally handling the new `Channel.raddr.proto_key: str` and no longer
assuming a TCP IPC transport; this per the recent `tractor.ipc`
subsys which adds multi-IPC-transports!

Downstream tweaks to match,
- use an "opt-in" field set to display in the `brokerd` info pane in
  `.ui._feedstatus.mk_feed_label()`.
 |_ also add some todos and drop some seemingly unneeded form sizing
    calcs?
- tweak `.ui._label` to allow not using markdown, though ended up not
  doing that since it looked too plain..
2026-01-06 21:44:19 -05:00
Tyler Goodlet 4067acee04 Adjust to `trio`'s strict eg nurseries throughout!
Using `tractor.trionics.collapse_eg()` as needed to avoid, at the least,
crash-worthy (in debug-mode REPL-ing terms) nested cancellation egs that
exhibit on SIGINT/ctl-c of each "app" (chart & daemon).

Also a bit of renaming of all `trio.Nursery`s to `tn`, the new "task
nursery" shorthand-var-name being used in all our other `tractor`
related projects.
2026-01-06 21:44:19 -05:00
Tyler Goodlet 1462496b2a Port to newer `tractor.get_registry()` 2026-01-06 21:44:19 -05:00
Tyler Goodlet e4e69b45ba Update legacy type to `tractor.MsgStream` 2026-01-06 21:44:19 -05:00
39 changed files with 516 additions and 270 deletions

View File

@ -121,6 +121,7 @@ async def bot_main():
# tick_throttle=10,
) as feed,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
assert accounts

View File

@ -365,7 +365,11 @@ class Position(Struct):
# added: bool = False
tid: str = t.tid
if tid in self._events:
log.warning(f'{t} is already added?!')
log.debug(
f'Txn is already added?\n'
f'\n'
f'{t}\n'
)
# return added
# TODO: apparently this IS possible with a dict but not
@ -731,7 +735,7 @@ class Account(Struct):
else:
# TODO: we reallly need a diff set of
# loglevels/colors per subsys.
log.warning(
log.debug(
f'Recent position for {fqme} was closed!'
)

View File

@ -98,13 +98,14 @@ async def open_cached_client(
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
brokermod: ModuleType = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context(
acm_func=brokermod.get_client,
kwargs=kwargs,
) as (cache_hit, client):
if cache_hit:
log.runtime(f'Reusing existing {client}')

View File

@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with trio.open_nursery() as service_nursery:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,

View File

@ -374,9 +374,14 @@ class Client:
pair: Pair = pair_type(**item)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
f'\n'
f'New or removed field we need to codify!\n'
f'pair-type: {pair_type!r}\n'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
)
raise
pair_table[pair.symbol.upper()] = pair

View File

@ -440,6 +440,7 @@ async def open_trade_dialog(
# - ledger: TransactionLedger
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
ctx.open_stream() as ems_stream,
):

View File

@ -94,13 +94,15 @@ class L1(Struct):
# validation type
# https://developers.binance.com/docs/derivatives/usds-margined-futures/websocket-market-streams/Aggregate-Trade-Streams#response-example
class AggTrade(Struct, frozen=True):
e: str # Event type
E: int # Event time
s: str # Symbol
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity
q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
@ -448,7 +450,6 @@ async def subscribe(
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
@ -460,6 +461,7 @@ async def stream_quotes(
) -> None:
async with (
tractor.trionics.maybe_raise_from_masking_exc(),
send_chan as send_chan,
open_cached_client('binance') as client,
):

View File

@ -97,6 +97,16 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str
baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool = False
# https://developers.binance.com/docs/binance-spot-api-docs#2025-12-02
opoAllowed: bool = False
filters: dict[
str,
str | int | float,
@ -142,7 +152,11 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str]
permissions: list[str]
permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair'
@ -209,7 +223,10 @@ class FutesPair(Pair):
assert pair == self.pair # sanity
return f'{expiry}'
case 'PERPETUAL':
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return 'PERP'
case '':
@ -238,7 +255,10 @@ class FutesPair(Pair):
margin: str = self.marginAsset
match ctype:
case 'PERPETUAL':
case (
'PERPETUAL'
| 'TRADIFI_PERPETUAL'
):
return f'{margin}M'
case (

View File

@ -471,11 +471,15 @@ def search(
'''
# global opts
brokermods = list(config['brokermods'].values())
brokermods: list[ModuleType] = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=config['loglevel'],
debug_mode=pdb,

View File

@ -31,7 +31,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -39,6 +39,7 @@ import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
collapse_eg,
)
from tractor import to_asyncio
# XXX WOOPS XD
@ -432,6 +433,7 @@ async def get_client(
) -> Client:
async with (
collapse_eg(),
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc

View File

@ -48,6 +48,7 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
from tractor import trionics
from pendulum import (
from_timestamp,
DateTime,
@ -1369,8 +1370,8 @@ async def load_clients_for_trio(
'''
Pure async mngr proxy to ``load_aio_clients()``.
This is a bootstrap entrypoing to call from
a ``tractor.to_asyncio.open_channel_from()``.
This is a bootstrap entrypoint to call from
a `tractor.to_asyncio.open_channel_from()`.
'''
async with load_aio_clients(
@ -1391,7 +1392,10 @@ async def open_client_proxies() -> tuple[
async with (
tractor.trionics.maybe_open_context(
acm_func=tractor.to_asyncio.open_channel_from,
kwargs={'target': load_clients_for_trio},
kwargs={
'target': load_clients_for_trio,
# ^XXX, kwarg to `open_channel_from()`
},
# lock around current actor task access
# TODO: maybe this should be the default in tractor?
@ -1584,7 +1588,8 @@ async def open_client_proxy(
event_consumers=event_table,
) as (first, chan),
trio.open_nursery() as relay_n,
trionics.collapse_eg(), # loose-ify
trio.open_nursery() as relay_tn,
):
assert isinstance(first, Client)
@ -1624,7 +1629,7 @@ async def open_client_proxy(
continue
relay_n.start_soon(relay_events)
relay_tn.start_soon(relay_events)
yield proxy

View File

@ -34,6 +34,7 @@ import trio
from trio_typing import TaskStatus
import tractor
from tractor.to_asyncio import LinkedTaskChannel
from tractor import trionics
from ib_insync.contract import (
Contract,
)
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
# TODO: make this a "propaganda" log level?
if ibpos.avgCost != msg.avg_price:
log.warning(
log.debug(
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
f'ib: {ibfmtmsg}\n'
'---------------------------\n'
@ -738,7 +739,7 @@ async def open_trade_dialog(
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
)
log.error(logmsg)
log.debug(logmsg)
await ctx.started((
all_positions,
@ -747,21 +748,22 @@ async def open_trade_dialog(
async with (
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
trionics.collapse_eg(),
trio.open_nursery() as tn,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream: LinkedTaskChannel = await n.start(
trade_event_stream: LinkedTaskChannel = await tn.start(
open_trade_event_stream,
client,
)
# start order request handler **before** local trades
# event loop
n.start_soon(
tn.start_soon(
handle_order_requests,
ems_stream,
accounts_def,
@ -769,7 +771,7 @@ async def open_trade_dialog(
)
# allocate event relay tasks for each client connection
n.start_soon(
tn.start_soon(
deliver_trade_events,
trade_event_stream,

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
async with trio.open_nursery() as tn:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
tn.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
data_cs, reset_done = await tn.start(
partial(
wait_on_data_reset,
proxy,
@ -705,7 +705,9 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
log.error(
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
@ -761,7 +763,10 @@ async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
) -> (
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver
global _quote_streams
@ -778,6 +783,7 @@ async def open_aio_quote_stream(
yield from_aio
return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
@ -983,17 +989,18 @@ async def stream_quotes(
)
cs: trio.CancelScope | None = None
startup: bool = True
iter_quotes: trio.abc.Channel
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
trio.open_nursery() as tn,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
) as iter_quotes,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
@ -1021,9 +1028,9 @@ async def stream_quotes(
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
tn.start_soon(reset_on_feed)
async with aclosing(stream):
async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
@ -1038,19 +1045,21 @@ async def stream_quotes(
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
ticker = await iter_quotes.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
False
# not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
@ -1066,13 +1075,18 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now streaming.
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time()
async for ticker in stream:
async for ticker in iter_quotes:
quote = normalize(ticker)
fqme = quote['fqme']
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them

View File

@ -34,6 +34,7 @@ import urllib.parse
import hashlib
import hmac
import base64
import tractor
import trio
from piker import config
@ -372,8 +373,7 @@ class Client:
# 1658347714, 'status': 'Success'}]}
if xfers:
import tractor
await tractor.pp()
await tractor.pause()
trans: dict[str, Transaction] = {}
for entry in xfers:
@ -501,6 +501,7 @@ class Client:
for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup
with tractor.devx.maybe_open_crash_handler(): # as bxerr:
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all

View File

@ -175,9 +175,8 @@ async def handle_order_requests(
case {
'account': 'kraken.spot' as account,
'action': action,
} if action in {'buy', 'sell'}:
'action': 'buy'|'sell',
}:
# validate
order = BrokerdOrder(**msg)
@ -262,6 +261,12 @@ async def handle_order_requests(
} | extra
log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req)
# placehold for sanity checking in relay loop
@ -544,7 +549,7 @@ async def open_trade_dialog(
# to be reloaded.
balances: dict[str, float] = await client.get_balances()
verify_balances(
await verify_balances(
acnt,
src_fiat,
balances,
@ -1085,6 +1090,8 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n'
f'{errmsg}'
)
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A'
if chain := apiflows.get(reqid):

View File

@ -21,7 +21,6 @@ Symbology defs and search.
from decimal import Decimal
import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import (
async_lifo_cache,
@ -41,8 +40,13 @@ from piker.accounting._mktinfo import (
)
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name
wsname: str # WebSocket pair name (if available)
@ -53,7 +57,6 @@ class Pair(Struct):
lot: str # volume lot size
cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume
@ -79,6 +82,7 @@ class Pair(Struct):
tick_size: float # min price step size
status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0
long_position_limit: float = float('inf')

View File

@ -37,6 +37,12 @@ import tractor
from async_generator import asynccontextmanager
import numpy as np
import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks
from ..calc import humanize, percent_change

View File

@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
import trio
import tractor
from tractor.trionics import broadcast_receiver
from tractor.trionics import (
broadcast_receiver,
collapse_eg,
)
from ._util import (
log, # sub-sys logger
@ -285,8 +288,11 @@ async def open_ems(
client._ems_stream = trades_stream
# start sync code order msg delivery task
async with trio.open_nursery() as n:
n.start_soon(
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
tn.start_soon(
relay_orders_from_sync_code,
client,
fqme,
@ -302,4 +308,4 @@ async def open_ems(
)
# stop the sync-msg-relay task on exit.
n.cancel_scope.cancel()
tn.cancel_scope.cancel()

View File

@ -42,6 +42,7 @@ from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
from tractor import trionics
from ._util import (
log, # sub-sys logger
@ -161,7 +162,7 @@ async def clear_dark_triggers(
router: Router,
brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
quote_stream: tractor.MsgStream,
broker: str,
fqme: str,
@ -177,6 +178,7 @@ async def clear_dark_triggers(
'''
# XXX: optimize this for speed!
# TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this!
# - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False
@ -499,7 +501,7 @@ class Router(Struct):
'''
# setup at actor spawn time
nursery: trio.Nursery
_tn: trio.Nursery
# broker to book map
books: dict[str, DarkBook] = {}
@ -669,7 +671,7 @@ class Router(Struct):
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self.nursery.start_soon(
self._tn.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
@ -692,7 +694,7 @@ class Router(Struct):
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon(
self._tn.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
@ -766,10 +768,12 @@ async def _setup_persistent_emsd(
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = Router(nursery=service_nursery)
# open a root "service task-nursery" for the `emsd`-actor
async with (
trionics.collapse_eg(),
trio.open_nursery() as tn
):
_router = Router(_tn=tn)
# TODO: send back the full set of persistent
# orders/execs?
@ -1518,7 +1522,7 @@ async def maybe_open_trade_relays(
loglevel: str = 'info',
):
fqme, relay, feed, client_ready = await _router.nursery.start(
fqme, relay, feed, client_ready = await _router._tn.start(
_router.open_trade_relays,
fqme,
exec_mode,

View File

@ -134,8 +134,8 @@ def pikerd(
Spawn the piker broker-daemon.
'''
from tractor.devx import maybe_open_crash_handler
with maybe_open_crash_handler(pdb=pdb):
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(loglevel, name='cli')
if pdb:
@ -178,16 +178,14 @@ def pikerd(
async def main():
service_mngr: service.Services
async with (
service.open_pikerd(
registry_addrs=regaddrs,
loglevel=loglevel,
debug_mode=pdb,
) as service_mngr, # normally delivers a ``Services`` handle
# AsyncExitStack() as stack,
enable_transports=['uds'],
# enable_transports=['tcp'],
) as service_mngr,
):
assert service_mngr
# ?TODO? spawn all other sub-actor daemons according to
@ -309,6 +307,10 @@ def services(config, tl, ports):
if not ports:
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)
async def list_services():
nonlocal host
async with (
@ -316,16 +318,18 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
host=host,
port=ports[0]
tractor.get_registry(
addr=addr,
) as portal
):
registry = await portal.run_from_ns('self', 'get_registry')
registry = await portal.run_from_ns(
'self',
'get_registry',
)
json_d = {}
for key, socket in registry.items():
host, port = socket
json_d[key] = f'{host}:{port}'
json_d[key] = f'{socket}'
click.echo(f"{colorize_json(json_d)}")
trio.run(list_services)

View File

@ -27,7 +27,6 @@ from functools import partial
from types import ModuleType
from typing import (
Any,
Optional,
Callable,
AsyncContextManager,
AsyncGenerator,
@ -35,6 +34,7 @@ from typing import (
)
import json
import tractor
import trio
from trio_typing import TaskStatus
from trio_websocket import (
@ -167,7 +167,7 @@ async def _reconnect_forever(
async def proxy_msgs(
ws: WebSocketConnection,
pcs: trio.CancelScope, # parent cancel scope
rent_cs: trio.CancelScope, # parent cancel scope
):
'''
Receive (under `timeout` deadline) all msgs from from underlying
@ -192,7 +192,7 @@ async def _reconnect_forever(
f'{url} connection bail with:'
)
await trio.sleep(0.5)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -204,7 +204,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
'WS feed seems down and slow af.. reconnecting\n'
)
pcs.cancel()
rent_cs.cancel()
# go back to reonnect loop in parent task
return
@ -228,7 +228,12 @@ async def _reconnect_forever(
nobsws._connected = trio.Event()
task_status.started()
while not snd._closed:
mc_state: trio._channel.MemoryChannelState = snd._state
while (
mc_state.open_receive_channels > 0
and
mc_state.open_send_channels > 0
):
log.info(
f'{src_mod}\n'
f'{url} trying (RE)CONNECT'
@ -237,10 +242,11 @@ async def _reconnect_forever(
ws: WebSocketConnection
try:
async with (
trio.open_nursery() as n,
open_websocket_url(url) as ws,
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
cs = nobsws._cs = n.cancel_scope
cs = nobsws._cs = tn.cancel_scope
nobsws._ws = ws
log.info(
f'{src_mod}\n'
@ -248,7 +254,7 @@ async def _reconnect_forever(
)
# begin relay loop to forward msgs
n.start_soon(
tn.start_soon(
proxy_msgs,
ws,
cs,
@ -262,7 +268,7 @@ async def _reconnect_forever(
# TODO: should we return an explicit sub-cs
# from this fixture task?
await n.start(
await tn.start(
open_fixture,
fixture,
nobsws,
@ -272,11 +278,23 @@ async def _reconnect_forever(
# to let tasks run **inside** the ws open block above.
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
except (
HandshakeError,
ConnectionRejected,
):
log.exception('Retrying connection')
await trio.sleep(0.5) # throttle
# ws & nursery block ends
except BaseException as _berr:
berr = _berr
log.exception(
'Reconnect-attempt failed ??\n'
)
await trio.sleep(0.2) # throttle
raise berr
#|_ws & nursery block ends
nobsws._connected = trio.Event()
if cs.cancelled_caught:
log.cancel(
@ -324,21 +342,25 @@ async def open_autorecon_ws(
connetivity errors, or some user defined recv timeout.
You can provide a ``fixture`` async-context-manager which will be
entered/exitted around each connection reset; eg. for (re)requesting
subscriptions without requiring streaming setup code to rerun.
entered/exitted around each connection reset; eg. for
(re)requesting subscriptions without requiring streaming setup
code to rerun.
'''
snd: trio.MemorySendChannel
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
nobsws = NoBsWs(
url,
rcv,
msg_recv_timeout=msg_recv_timeout,
)
await n.start(
await tn.start(
partial(
_reconnect_forever,
url,
@ -351,11 +373,10 @@ async def open_autorecon_ws(
await nobsws._connected.wait()
assert nobsws._cs
assert nobsws.connected()
try:
yield nobsws
finally:
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
'''
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
class JSONRPCResult(Struct):
id: int
jsonrpc: str = '2.0'
result: Optional[dict] = None
error: Optional[dict] = None
result: dict|None = None
error: dict|None = None
@acm

View File

@ -39,6 +39,7 @@ from typing import (
AsyncContextManager,
Awaitable,
Sequence,
TYPE_CHECKING,
)
import trio
@ -75,6 +76,10 @@ from ._sampling import (
uniform_rate_send,
)
if TYPE_CHECKING:
from tractor._addr import Address
from tractor.msg.types import Aid
class Sub(Struct, frozen=True):
'''
@ -725,7 +730,10 @@ class Feed(Struct):
async for msg in stream:
await tx.send(msg)
async with trio.open_nursery() as nurse:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as nurse
):
# spawn a relay task for each stream so that they all
# multiplex to a common channel.
for brokername in mods:
@ -899,19 +907,19 @@ async def open_feed(
feed.portals[brokermod] = portal
# fill out "status info" that the UI can show
host, port = portal.channel.raddr
if host == '127.0.0.1':
host = 'localhost'
chan: tractor.Channel = portal.chan
raddr: Address = chan.raddr
aid: Aid = chan.aid
# TAG_feed_status_update
feed.status.update({
'actor_name': portal.channel.uid[0],
'host': host,
'port': port,
'actor_id': aid,
'actor_short_id': f'{aid.name}@{aid.pid}',
'ipc': chan.raddr.proto_key,
'ipc_addr': raddr,
'hist_shm': 'NA',
'rt_shm': 'NA',
'throttle_rate': tick_throttle,
'throttle_hz': tick_throttle,
})
# feed.status.update(init_msg.pop('status', {}))
# (allocate and) connect to any feed bus for this broker
bus_ctxs.append(

View File

@ -498,6 +498,7 @@ async def cascade(
func_name: str = func.__name__
async with (
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
trio.open_nursery() as tn,
):
# TODO: might be better to just make a "restart" method where

View File

@ -107,17 +107,22 @@ async def open_piker_runtime(
async with (
tractor.open_root_actor(
# passed through to ``open_root_actor``
# passed through to `open_root_actor`
registry_addrs=registry_addrs,
name=name,
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# XXX NOTE MEMBER DAT der's a perf hit yo!!
# https://greenback.readthedocs.io/en/latest/principle.html#performance
maybe_enable_greenback=True,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=enable_modules,
hide_tb=False,
**tractor_kwargs,
) as actor,
@ -200,7 +205,8 @@ async def open_pikerd(
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_tn,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -211,7 +217,7 @@ async def open_pikerd(
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.service_n = service_tn
Services.debug_mode = debug_mode
try:
@ -221,7 +227,7 @@ async def open_pikerd(
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
service_tn.cancel_scope.cancel()
# TODO: do we even need this?
@ -256,7 +262,10 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> (
tractor._portal.Portal
|ClassVar[Services]
):
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self
@ -281,7 +290,8 @@ async def maybe_open_pikerd(
registry_addrs: list[tuple[str, int]] = (
registry_addrs
or [_default_reg_addr]
or
[_default_reg_addr]
)
pikerd_portal: tractor.Portal|None

View File

@ -28,6 +28,7 @@ from contextlib import (
)
import tractor
from trio.lowlevel import current_task
from ._util import (
log, # sub-sys logger
@ -70,6 +71,7 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
try:
async with find_service(
service_name,
registry_addrs=[('127.0.0.1', 6116)],
@ -134,6 +136,20 @@ async def maybe_spawn_daemon(
yield portal
await portal.cancel_actor()
except BaseException as _err:
err = _err
if (
lock.locked()
and
lock.statistics().owner is current_task()
):
log.exception(
f'Releasing stale lock after crash..?'
f'{err!r}\n'
)
lock.release()
raise err
async def spawn_emsd(

View File

@ -109,7 +109,7 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context

View File

@ -101,13 +101,15 @@ async def open_registry(
if (
not tractor.is_root_process()
and not Registry.addrs
and
not Registry.addrs
):
Registry.addrs.extend(actor.reg_addrs)
if (
ensure_exists
and not Registry.addrs
and
not Registry.addrs
):
raise RuntimeError(
f"`{uid}` registry should already exist but doesn't?"
@ -146,7 +148,7 @@ async def find_service(
| list[Portal]
| None
):
# try:
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
@ -157,22 +159,39 @@ async def find_service(
or Registry.addrs
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None
log.info(
f'Scanning for service {service_name!r}'
)
# attach to existing daemon by name if possible
maybe_portals: list[Portal]|Portal|None
async with tractor.find_actor(
service_name,
registry_addrs=reg_addrs,
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
# except BaseException as _berr:
# berr = _berr
# log.exception(
# 'tractor.find_actor() failed with,\n'
# )
# raise berr
async def check_for_service(
service_name: str,

View File

@ -963,7 +963,10 @@ async def tsdb_backfill(
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon(
push_latest_frame,
dt_eps,
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
if def_frame_size != calced_frame_size:
log.warning(
f'Expected frame size {def_frame_size}\n'
f'Rxed frame {calced_frame_size}\n'
)
# await tractor.pause()
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery() as tn:
async with trio.open_nursery(
strict_exception_groups=False,
) as tn:
bf_done = await tn.start(
partial(
@ -1308,6 +1320,7 @@ async def manage_history(
# sampling period) data set since normally differently
# sampled timeseries can be loaded / process independently
# ;)
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
log.info(

View File

@ -21,6 +21,7 @@ Main app startup and run.
from functools import partial
from types import ModuleType
import tractor
import trio
from piker.ui.qt import (
@ -116,6 +117,7 @@ async def _async_main(
needed_brokermods[brokername] = brokers[brokername]
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as root_n,
):
# set root nursery and task stack for spawning other charts/feeds

View File

@ -33,7 +33,6 @@ import trio
from piker.ui.qt import (
QtCore,
QtWidgets,
Qt,
QLineF,
QFrame,

View File

@ -1445,7 +1445,10 @@ async def display_symbol_data(
# for pause/resume on mouse interaction
rt_chart.feed = feed
async with trio.open_nursery() as ln:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as ln,
):
# if available load volume related built-in display(s)
vlm_charts: dict[
str,

View File

@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
from typing import Callable
import trio
from tractor.trionics import gather_contexts
from tractor.trionics import (
gather_contexts,
collapse_eg,
)
from piker.ui.qt import (
QtCore,
@ -207,7 +210,10 @@ async def open_signal_handler(
async for args in recv:
await async_handler(*args)
async with trio.open_nursery() as tn:
async with (
collapse_eg(),
trio.open_nursery() as tn
):
tn.start_soon(proxy_to_handler)
async with send:
yield
@ -242,6 +248,7 @@ async def open_handlers(
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with (
collapse_eg(),
trio.open_nursery() as tn,
gather_contexts([
open_event_stream(

View File

@ -18,10 +18,11 @@
Feed status and controls widget(s) for embedding in a UI-pane.
"""
from __future__ import annotations
from textwrap import dedent
from typing import TYPE_CHECKING
from typing import (
Any,
TYPE_CHECKING,
)
# from PyQt5.QtCore import Qt
@ -49,35 +50,55 @@ def mk_feed_label(
a feed control protocol.
'''
status = feed.status
status: dict[str, Any] = feed.status
assert status
msg = dedent("""
actor: **{actor_name}**\n
|_ @**{host}:{port}**\n
""")
# SO tips on ws/nls,
# https://stackoverflow.com/a/15721400
ws: str = ' '
# nl: str = '<br>' # dun work?
actor_info_repr: str = (
f')> **{status["actor_short_id"]}**\n'
'\n' # bc md?
)
for key, val in status.items():
if key in ('host', 'port', 'actor_name'):
continue
msg += f'\n|_ {key}: **{{{key}}}**\n'
# fields to select *IN* for display
# (see `.data.feed.open_feed()` status
# update -> TAG_feed_status_update)
for key in [
'ipc',
'hist_shm',
'rt_shm',
'throttle_hz',
]:
# NOTE, the 2nd key is filled via `.format()` updates.
actor_info_repr += (
f'\n' # bc md?
f'{ws}|_{key}: **{{{key}}}**\n'
)
# ^TODO? formatting and content..
# -[ ] showing which fqme is "forward" on the
# chart/fsp/order-mode?
# '|_ flows: **{symbols}**\n'
#
# -[x] why isn't the indent working?
# => markdown, now solved..
feed_label = FormatLabel(
fmt_str=msg,
# |_ streams: **{symbols}**\n
fmt_str=actor_info_repr,
font=_font.font,
font_size=_font_small.px_size,
font_color='default_lightest',
)
# ?TODO, remove this?
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
# form.vbox.setAlignment(Qt.AlignBottom)
_ = chart.height() - (
form.height() +
form.fill_bar.height()
# feed_label.height()
)
# _ = chart.height() - (
# form.height() +
# form.fill_bar.height()
# # feed_label.height()
# )
feed_label.format(**feed.status)
return feed_label

View File

@ -600,6 +600,7 @@ async def open_fsp_admin(
kwargs=kwargs,
) as (cache_hit, cluster_map),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):
if cache_hit:
@ -613,6 +614,8 @@ async def open_fsp_admin(
)
try:
yield admin
# ??TODO, does this *need* to be inside a finally?
finally:
# terminate all tasks via signals
for key, entry in admin._registry.items():

View File

@ -285,18 +285,20 @@ class FormatLabel(QLabel):
font_size: int,
font_color: str,
use_md: bool = True,
parent=None,
) -> None:
super().__init__(parent)
# by default set the format string verbatim and expect user to
# call ``.format()`` later (presumably they'll notice the
# by default set the format string verbatim and expect user
# to call ``.format()`` later (presumably they'll notice the
# unformatted content if ``fmt_str`` isn't meant to be
# unformatted).
self.fmt_str = fmt_str
self.setText(fmt_str)
# self.setText(fmt_str) # ?TODO, why here?
self.setStyleSheet(
f"""QLabel {{
@ -306,6 +308,7 @@ class FormatLabel(QLabel):
"""
)
self.setFont(_font.font)
if use_md:
self.setTextFormat(
Qt.TextFormat.MarkdownText
)
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
size_policy.Expanding,
)
self.setAlignment(
Qt.AlignVCenter | Qt.AlignLeft
Qt.AlignLeft
|
Qt.AlignBottom
# Qt.AlignVCenter
)
self.setText(self.fmt_str)

View File

@ -15,7 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
qompleterz: embeddable search and complete using trio, Qt and
rapidfuzz.
"""
@ -46,6 +47,7 @@ import time
from pprint import pformat
from rapidfuzz import process as fuzzy
import tractor
import trio
from trio_typing import TaskStatus
@ -53,7 +55,7 @@ from piker.ui.qt import (
size_policy,
align_flag,
Qt,
QtCore,
# QtCore,
QtWidgets,
QModelIndex,
QItemSelectionModel,
@ -920,7 +922,10 @@ async def fill_results(
# issue multi-provider fan-out search request and place
# "searching.." statuses on outstanding results providers
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
):
for provider, (search, pause) in (
_searcher_cache.copy().items()
@ -944,7 +949,7 @@ async def fill_results(
status_field='-> searchin..',
)
await n.start(
await tn.start(
pack_matches,
view,
has_results,
@ -1004,12 +1009,14 @@ async def handle_keyboard_input(
view.set_font_size(searchbar.dpi_font.px_size)
send, recv = trio.open_memory_channel(616)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(), # needed?
trio.open_nursery() as tn
):
# start a background multi-searcher task which receives
# patterns relayed from this keyboard input handler and
# async updates the completer view's results.
n.start_soon(
tn.start_soon(
partial(
fill_results,
searchw,

View File

@ -792,6 +792,7 @@ async def open_order_mode(
brokerd_accounts,
ems_dialog_msgs,
),
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
):

View File

@ -15,6 +15,12 @@ from piker.service import (
from piker.log import get_console_log
# include `tractor`'s built-in fixtures!
pytest_plugins: tuple[str] = (
"tractor._testing.pytest",
)
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")

View File

@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
# async with tractor.open_nursery() as n:
# await n.run_in_actor('other', intermittently_refresh_tokens)
async with trio.open_nursery() as n:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
) as n
):
quoter = await qt.stock_quoter(client, us_symbols)
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
else:
symbols = [tmx_symbols]
async with trio.open_nursery() as n:
async with trio.open_nursery(
strict_exception_groups=False,
) as n:
for syms, func in zip(symbols, stream_what):
n.start_soon(func, feed, syms)