Merge pull request 'port_to_latest_tractor'
#45 from port_to_latest_tractor into main Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/45main
commit
f218cf450e
|
|
@ -121,6 +121,7 @@ async def bot_main():
|
|||
# tick_throttle=10,
|
||||
) as feed,
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
assert accounts
|
||||
|
|
|
|||
|
|
@ -365,7 +365,11 @@ class Position(Struct):
|
|||
# added: bool = False
|
||||
tid: str = t.tid
|
||||
if tid in self._events:
|
||||
log.warning(f'{t} is already added?!')
|
||||
log.debug(
|
||||
f'Txn is already added?\n'
|
||||
f'\n'
|
||||
f'{t}\n'
|
||||
)
|
||||
# return added
|
||||
|
||||
# TODO: apparently this IS possible with a dict but not
|
||||
|
|
@ -731,7 +735,7 @@ class Account(Struct):
|
|||
else:
|
||||
# TODO: we reallly need a diff set of
|
||||
# loglevels/colors per subsys.
|
||||
log.warning(
|
||||
log.debug(
|
||||
f'Recent position for {fqme} was closed!'
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
|
|||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
try:
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
|
|
|
|||
|
|
@ -440,6 +440,7 @@ async def open_trade_dialog(
|
|||
# - ledger: TransactionLedger
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
ctx.open_stream() as ems_stream,
|
||||
):
|
||||
|
|
|
|||
|
|
@ -448,7 +448,6 @@ async def subscribe(
|
|||
|
||||
|
||||
async def stream_quotes(
|
||||
|
||||
send_chan: trio.abc.SendChannel,
|
||||
symbols: list[str],
|
||||
feed_is_live: trio.Event,
|
||||
|
|
@ -460,6 +459,7 @@ async def stream_quotes(
|
|||
) -> None:
|
||||
|
||||
async with (
|
||||
tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
send_chan as send_chan,
|
||||
open_cached_client('binance') as client,
|
||||
):
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ from typing import (
|
|||
Callable,
|
||||
)
|
||||
|
||||
import pendulum
|
||||
from pendulum import now
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from rapidfuzz import process as fuzzy
|
||||
|
|
@ -39,6 +39,7 @@ import numpy as np
|
|||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
maybe_open_context
|
||||
collapse_eg,
|
||||
)
|
||||
from tractor import to_asyncio
|
||||
# XXX WOOPS XD
|
||||
|
|
@ -432,6 +433,7 @@ async def get_client(
|
|||
) -> Client:
|
||||
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as n,
|
||||
open_jsonrpc_session(
|
||||
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ from bidict import bidict
|
|||
import trio
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
from tractor import trionics
|
||||
from pendulum import (
|
||||
from_timestamp,
|
||||
DateTime,
|
||||
|
|
@ -1369,8 +1370,8 @@ async def load_clients_for_trio(
|
|||
'''
|
||||
Pure async mngr proxy to ``load_aio_clients()``.
|
||||
|
||||
This is a bootstrap entrypoing to call from
|
||||
a ``tractor.to_asyncio.open_channel_from()``.
|
||||
This is a bootstrap entrypoint to call from
|
||||
a `tractor.to_asyncio.open_channel_from()`.
|
||||
|
||||
'''
|
||||
async with load_aio_clients(
|
||||
|
|
@ -1391,7 +1392,10 @@ async def open_client_proxies() -> tuple[
|
|||
async with (
|
||||
tractor.trionics.maybe_open_context(
|
||||
acm_func=tractor.to_asyncio.open_channel_from,
|
||||
kwargs={'target': load_clients_for_trio},
|
||||
kwargs={
|
||||
'target': load_clients_for_trio,
|
||||
# ^XXX, kwarg to `open_channel_from()`
|
||||
},
|
||||
|
||||
# lock around current actor task access
|
||||
# TODO: maybe this should be the default in tractor?
|
||||
|
|
@ -1584,7 +1588,8 @@ async def open_client_proxy(
|
|||
event_consumers=event_table,
|
||||
) as (first, chan),
|
||||
|
||||
trio.open_nursery() as relay_n,
|
||||
trionics.collapse_eg(), # loose-ify
|
||||
trio.open_nursery() as relay_tn,
|
||||
):
|
||||
|
||||
assert isinstance(first, Client)
|
||||
|
|
@ -1624,7 +1629,7 @@ async def open_client_proxy(
|
|||
|
||||
continue
|
||||
|
||||
relay_n.start_soon(relay_events)
|
||||
relay_tn.start_soon(relay_events)
|
||||
|
||||
yield proxy
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import trio
|
|||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor.to_asyncio import LinkedTaskChannel
|
||||
from tractor import trionics
|
||||
from ib_insync.contract import (
|
||||
Contract,
|
||||
)
|
||||
|
|
@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(
|
|||
|
||||
# TODO: make this a "propaganda" log level?
|
||||
if ibpos.avgCost != msg.avg_price:
|
||||
log.warning(
|
||||
log.debug(
|
||||
f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
|
||||
f'ib: {ibfmtmsg}\n'
|
||||
'---------------------------\n'
|
||||
|
|
@ -738,7 +739,7 @@ async def open_trade_dialog(
|
|||
f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
|
||||
'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
|
||||
)
|
||||
log.error(logmsg)
|
||||
log.debug(logmsg)
|
||||
|
||||
await ctx.started((
|
||||
all_positions,
|
||||
|
|
@ -747,21 +748,22 @@ async def open_trade_dialog(
|
|||
|
||||
async with (
|
||||
ctx.open_stream() as ems_stream,
|
||||
trio.open_nursery() as n,
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# relay existing open orders to ems
|
||||
for msg in order_msgs:
|
||||
await ems_stream.send(msg)
|
||||
|
||||
for client in set(aioclients.values()):
|
||||
trade_event_stream: LinkedTaskChannel = await n.start(
|
||||
trade_event_stream: LinkedTaskChannel = await tn.start(
|
||||
open_trade_event_stream,
|
||||
client,
|
||||
)
|
||||
|
||||
# start order request handler **before** local trades
|
||||
# event loop
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
handle_order_requests,
|
||||
ems_stream,
|
||||
accounts_def,
|
||||
|
|
@ -769,7 +771,7 @@ async def open_trade_dialog(
|
|||
)
|
||||
|
||||
# allocate event relay tasks for each client connection
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
deliver_trade_events,
|
||||
|
||||
trade_event_stream,
|
||||
|
|
|
|||
|
|
@ -25,7 +25,10 @@ from typing import TYPE_CHECKING
|
|||
|
||||
import trio
|
||||
import tractor
|
||||
from tractor.trionics import broadcast_receiver
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
|
@ -285,8 +288,11 @@ async def open_ems(
|
|||
client._ems_stream = trades_stream
|
||||
|
||||
# start sync code order msg delivery task
|
||||
async with trio.open_nursery() as n:
|
||||
n.start_soon(
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
tn.start_soon(
|
||||
relay_orders_from_sync_code,
|
||||
client,
|
||||
fqme,
|
||||
|
|
@ -302,4 +308,4 @@ async def open_ems(
|
|||
)
|
||||
|
||||
# stop the sync-msg-relay task on exit.
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
|
|
|||
|
|
@ -42,6 +42,7 @@ from bidict import bidict
|
|||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import trionics
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
|
@ -161,7 +162,7 @@ async def clear_dark_triggers(
|
|||
|
||||
router: Router,
|
||||
brokerd_orders_stream: tractor.MsgStream,
|
||||
quote_stream: tractor.ReceiveMsgStream, # noqa
|
||||
quote_stream: tractor.MsgStream,
|
||||
broker: str,
|
||||
fqme: str,
|
||||
|
||||
|
|
@ -177,6 +178,7 @@ async def clear_dark_triggers(
|
|||
'''
|
||||
# XXX: optimize this for speed!
|
||||
# TODO:
|
||||
# - port to the new ringbuf stuff in `tractor.ipc`!
|
||||
# - numba all this!
|
||||
# - this stream may eventually contain multiple symbols
|
||||
quote_stream._raise_on_lag = False
|
||||
|
|
@ -499,7 +501,7 @@ class Router(Struct):
|
|||
|
||||
'''
|
||||
# setup at actor spawn time
|
||||
nursery: trio.Nursery
|
||||
_tn: trio.Nursery
|
||||
|
||||
# broker to book map
|
||||
books: dict[str, DarkBook] = {}
|
||||
|
|
@ -669,7 +671,7 @@ class Router(Struct):
|
|||
# dark book clearing loop, also lives with parent
|
||||
# daemon to allow dark order clearing while no
|
||||
# client is connected.
|
||||
self.nursery.start_soon(
|
||||
self._tn.start_soon(
|
||||
clear_dark_triggers,
|
||||
self,
|
||||
relay.brokerd_stream,
|
||||
|
|
@ -692,7 +694,7 @@ class Router(Struct):
|
|||
|
||||
# spawn a ``brokerd`` order control dialog stream
|
||||
# that syncs lifetime with the parent `emsd` daemon.
|
||||
self.nursery.start_soon(
|
||||
self._tn.start_soon(
|
||||
translate_and_relay_brokerd_events,
|
||||
broker,
|
||||
relay.brokerd_stream,
|
||||
|
|
@ -766,10 +768,12 @@ async def _setup_persistent_emsd(
|
|||
|
||||
global _router
|
||||
|
||||
# open a root "service nursery" for the ``emsd`` actor
|
||||
async with trio.open_nursery() as service_nursery:
|
||||
|
||||
_router = Router(nursery=service_nursery)
|
||||
# open a root "service task-nursery" for the `emsd`-actor
|
||||
async with (
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
_router = Router(_tn=tn)
|
||||
|
||||
# TODO: send back the full set of persistent
|
||||
# orders/execs?
|
||||
|
|
@ -1518,7 +1522,7 @@ async def maybe_open_trade_relays(
|
|||
loglevel: str = 'info',
|
||||
):
|
||||
|
||||
fqme, relay, feed, client_ready = await _router.nursery.start(
|
||||
fqme, relay, feed, client_ready = await _router._tn.start(
|
||||
_router.open_trade_relays,
|
||||
fqme,
|
||||
exec_mode,
|
||||
|
|
|
|||
|
|
@ -134,67 +134,65 @@ def pikerd(
|
|||
Spawn the piker broker-daemon.
|
||||
|
||||
'''
|
||||
from tractor.devx import maybe_open_crash_handler
|
||||
with maybe_open_crash_handler(pdb=pdb):
|
||||
log = get_console_log(loglevel, name='cli')
|
||||
# from tractor.devx import maybe_open_crash_handler
|
||||
# with maybe_open_crash_handler(pdb=False):
|
||||
log = get_console_log(loglevel, name='cli')
|
||||
|
||||
if pdb:
|
||||
log.warning((
|
||||
"\n"
|
||||
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
|
||||
"When a `piker` daemon crashes it will block the "
|
||||
"task-thread until resumed from console!\n"
|
||||
"\n"
|
||||
if pdb:
|
||||
log.warning((
|
||||
"\n"
|
||||
"!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
|
||||
"When a `piker` daemon crashes it will block the "
|
||||
"task-thread until resumed from console!\n"
|
||||
"\n"
|
||||
))
|
||||
|
||||
# service-actor registry endpoint socket-address set
|
||||
regaddrs: list[tuple[str, int]] = []
|
||||
|
||||
conf, _ = config.load(
|
||||
conf_name='conf',
|
||||
)
|
||||
network: dict = conf.get('network')
|
||||
if (
|
||||
network is None
|
||||
and not maddr
|
||||
):
|
||||
regaddrs = [(
|
||||
_default_registry_host,
|
||||
_default_registry_port,
|
||||
)]
|
||||
|
||||
else:
|
||||
eps: dict = load_trans_eps(
|
||||
network,
|
||||
maddr,
|
||||
)
|
||||
for layers in eps['pikerd']:
|
||||
regaddrs.append((
|
||||
layers['ipv4']['addr'],
|
||||
layers['tcp']['port'],
|
||||
))
|
||||
|
||||
# service-actor registry endpoint socket-address set
|
||||
regaddrs: list[tuple[str, int]] = []
|
||||
from .. import service
|
||||
|
||||
conf, _ = config.load(
|
||||
conf_name='conf',
|
||||
)
|
||||
network: dict = conf.get('network')
|
||||
if (
|
||||
network is None
|
||||
and not maddr
|
||||
async def main():
|
||||
service_mngr: service.Services
|
||||
async with (
|
||||
service.open_pikerd(
|
||||
registry_addrs=regaddrs,
|
||||
loglevel=loglevel,
|
||||
debug_mode=pdb,
|
||||
# enable_transports=['uds'],
|
||||
enable_transports=['tcp'],
|
||||
) as service_mngr,
|
||||
):
|
||||
regaddrs = [(
|
||||
_default_registry_host,
|
||||
_default_registry_port,
|
||||
)]
|
||||
assert service_mngr
|
||||
# ?TODO? spawn all other sub-actor daemons according to
|
||||
# multiaddress endpoint spec defined by user config
|
||||
await trio.sleep_forever()
|
||||
|
||||
else:
|
||||
eps: dict = load_trans_eps(
|
||||
network,
|
||||
maddr,
|
||||
)
|
||||
for layers in eps['pikerd']:
|
||||
regaddrs.append((
|
||||
layers['ipv4']['addr'],
|
||||
layers['tcp']['port'],
|
||||
))
|
||||
|
||||
from .. import service
|
||||
|
||||
async def main():
|
||||
service_mngr: service.Services
|
||||
|
||||
async with (
|
||||
service.open_pikerd(
|
||||
registry_addrs=regaddrs,
|
||||
loglevel=loglevel,
|
||||
debug_mode=pdb,
|
||||
|
||||
) as service_mngr, # normally delivers a ``Services`` handle
|
||||
|
||||
# AsyncExitStack() as stack,
|
||||
):
|
||||
assert service_mngr
|
||||
# ?TODO? spawn all other sub-actor daemons according to
|
||||
# multiaddress endpoint spec defined by user config
|
||||
await trio.sleep_forever()
|
||||
|
||||
trio.run(main)
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@click.group(context_settings=config._context_defaults)
|
||||
|
|
@ -309,6 +307,10 @@ def services(config, tl, ports):
|
|||
if not ports:
|
||||
ports = [_default_registry_port]
|
||||
|
||||
addr = tractor._addr.wrap_address(
|
||||
addr=(host, ports[0])
|
||||
)
|
||||
|
||||
async def list_services():
|
||||
nonlocal host
|
||||
async with (
|
||||
|
|
@ -316,16 +318,18 @@ def services(config, tl, ports):
|
|||
name='service_query',
|
||||
loglevel=config['loglevel'] if tl else None,
|
||||
),
|
||||
tractor.get_arbiter(
|
||||
host=host,
|
||||
port=ports[0]
|
||||
tractor.get_registry(
|
||||
addr=addr,
|
||||
) as portal
|
||||
):
|
||||
registry = await portal.run_from_ns('self', 'get_registry')
|
||||
registry = await portal.run_from_ns(
|
||||
'self',
|
||||
'get_registry',
|
||||
)
|
||||
json_d = {}
|
||||
for key, socket in registry.items():
|
||||
host, port = socket
|
||||
json_d[key] = f'{host}:{port}'
|
||||
json_d[key] = f'{socket}'
|
||||
|
||||
click.echo(f"{colorize_json(json_d)}")
|
||||
|
||||
trio.run(list_services)
|
||||
|
|
|
|||
|
|
@ -27,7 +27,6 @@ from functools import partial
|
|||
from types import ModuleType
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Callable,
|
||||
AsyncContextManager,
|
||||
AsyncGenerator,
|
||||
|
|
@ -35,6 +34,7 @@ from typing import (
|
|||
)
|
||||
import json
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from trio_websocket import (
|
||||
|
|
@ -167,7 +167,7 @@ async def _reconnect_forever(
|
|||
|
||||
async def proxy_msgs(
|
||||
ws: WebSocketConnection,
|
||||
pcs: trio.CancelScope, # parent cancel scope
|
||||
rent_cs: trio.CancelScope, # parent cancel scope
|
||||
):
|
||||
'''
|
||||
Receive (under `timeout` deadline) all msgs from from underlying
|
||||
|
|
@ -192,7 +192,7 @@ async def _reconnect_forever(
|
|||
f'{url} connection bail with:'
|
||||
)
|
||||
await trio.sleep(0.5)
|
||||
pcs.cancel()
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
return
|
||||
|
|
@ -204,7 +204,7 @@ async def _reconnect_forever(
|
|||
f'{src_mod}\n'
|
||||
'WS feed seems down and slow af.. reconnecting\n'
|
||||
)
|
||||
pcs.cancel()
|
||||
rent_cs.cancel()
|
||||
|
||||
# go back to reonnect loop in parent task
|
||||
return
|
||||
|
|
@ -228,7 +228,12 @@ async def _reconnect_forever(
|
|||
nobsws._connected = trio.Event()
|
||||
task_status.started()
|
||||
|
||||
while not snd._closed:
|
||||
mc_state: trio._channel.MemoryChannelState = snd._state
|
||||
while (
|
||||
mc_state.open_receive_channels > 0
|
||||
and
|
||||
mc_state.open_send_channels > 0
|
||||
):
|
||||
log.info(
|
||||
f'{src_mod}\n'
|
||||
f'{url} trying (RE)CONNECT'
|
||||
|
|
@ -237,10 +242,11 @@ async def _reconnect_forever(
|
|||
ws: WebSocketConnection
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_websocket_url(url) as ws,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
cs = nobsws._cs = n.cancel_scope
|
||||
cs = nobsws._cs = tn.cancel_scope
|
||||
nobsws._ws = ws
|
||||
log.info(
|
||||
f'{src_mod}\n'
|
||||
|
|
@ -248,7 +254,7 @@ async def _reconnect_forever(
|
|||
)
|
||||
|
||||
# begin relay loop to forward msgs
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
proxy_msgs,
|
||||
ws,
|
||||
cs,
|
||||
|
|
@ -262,7 +268,7 @@ async def _reconnect_forever(
|
|||
|
||||
# TODO: should we return an explicit sub-cs
|
||||
# from this fixture task?
|
||||
await n.start(
|
||||
await tn.start(
|
||||
open_fixture,
|
||||
fixture,
|
||||
nobsws,
|
||||
|
|
@ -272,11 +278,23 @@ async def _reconnect_forever(
|
|||
# to let tasks run **inside** the ws open block above.
|
||||
nobsws._connected.set()
|
||||
await trio.sleep_forever()
|
||||
except HandshakeError:
|
||||
|
||||
except (
|
||||
HandshakeError,
|
||||
ConnectionRejected,
|
||||
):
|
||||
log.exception('Retrying connection')
|
||||
await trio.sleep(0.5) # throttle
|
||||
|
||||
# ws & nursery block ends
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
log.exception(
|
||||
'Reconnect-attempt failed ??\n'
|
||||
)
|
||||
await trio.sleep(0.2) # throttle
|
||||
raise berr
|
||||
|
||||
#|_ws & nursery block ends
|
||||
nobsws._connected = trio.Event()
|
||||
if cs.cancelled_caught:
|
||||
log.cancel(
|
||||
|
|
@ -324,21 +342,25 @@ async def open_autorecon_ws(
|
|||
connetivity errors, or some user defined recv timeout.
|
||||
|
||||
You can provide a ``fixture`` async-context-manager which will be
|
||||
entered/exitted around each connection reset; eg. for (re)requesting
|
||||
subscriptions without requiring streaming setup code to rerun.
|
||||
entered/exitted around each connection reset; eg. for
|
||||
(re)requesting subscriptions without requiring streaming setup
|
||||
code to rerun.
|
||||
|
||||
'''
|
||||
snd: trio.MemorySendChannel
|
||||
rcv: trio.MemoryReceiveChannel
|
||||
snd, rcv = trio.open_memory_channel(616)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
nobsws = NoBsWs(
|
||||
url,
|
||||
rcv,
|
||||
msg_recv_timeout=msg_recv_timeout,
|
||||
)
|
||||
await n.start(
|
||||
await tn.start(
|
||||
partial(
|
||||
_reconnect_forever,
|
||||
url,
|
||||
|
|
@ -351,11 +373,10 @@ async def open_autorecon_ws(
|
|||
await nobsws._connected.wait()
|
||||
assert nobsws._cs
|
||||
assert nobsws.connected()
|
||||
|
||||
try:
|
||||
yield nobsws
|
||||
finally:
|
||||
n.cancel_scope.cancel()
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
'''
|
||||
|
|
@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
|
|||
class JSONRPCResult(Struct):
|
||||
id: int
|
||||
jsonrpc: str = '2.0'
|
||||
result: Optional[dict] = None
|
||||
error: Optional[dict] = None
|
||||
result: dict|None = None
|
||||
error: dict|None = None
|
||||
|
||||
|
||||
@acm
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ from typing import (
|
|||
AsyncContextManager,
|
||||
Awaitable,
|
||||
Sequence,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
|
@ -75,6 +76,10 @@ from ._sampling import (
|
|||
uniform_rate_send,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor._addr import Address
|
||||
from tractor.msg.types import Aid
|
||||
|
||||
|
||||
class Sub(Struct, frozen=True):
|
||||
'''
|
||||
|
|
@ -725,7 +730,10 @@ class Feed(Struct):
|
|||
async for msg in stream:
|
||||
await tx.send(msg)
|
||||
|
||||
async with trio.open_nursery() as nurse:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as nurse
|
||||
):
|
||||
# spawn a relay task for each stream so that they all
|
||||
# multiplex to a common channel.
|
||||
for brokername in mods:
|
||||
|
|
@ -899,19 +907,19 @@ async def open_feed(
|
|||
feed.portals[brokermod] = portal
|
||||
|
||||
# fill out "status info" that the UI can show
|
||||
host, port = portal.channel.raddr
|
||||
if host == '127.0.0.1':
|
||||
host = 'localhost'
|
||||
|
||||
chan: tractor.Channel = portal.chan
|
||||
raddr: Address = chan.raddr
|
||||
aid: Aid = chan.aid
|
||||
# TAG_feed_status_update
|
||||
feed.status.update({
|
||||
'actor_name': portal.channel.uid[0],
|
||||
'host': host,
|
||||
'port': port,
|
||||
'actor_id': aid,
|
||||
'actor_short_id': f'{aid.name}@{aid.pid}',
|
||||
'ipc': chan.raddr.proto_key,
|
||||
'ipc_addr': raddr,
|
||||
'hist_shm': 'NA',
|
||||
'rt_shm': 'NA',
|
||||
'throttle_rate': tick_throttle,
|
||||
'throttle_hz': tick_throttle,
|
||||
})
|
||||
# feed.status.update(init_msg.pop('status', {}))
|
||||
|
||||
# (allocate and) connect to any feed bus for this broker
|
||||
bus_ctxs.append(
|
||||
|
|
|
|||
|
|
@ -498,6 +498,7 @@ async def cascade(
|
|||
|
||||
func_name: str = func.__name__
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(), # avoid multi-taskc tb in console
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
# TODO: might be better to just make a "restart" method where
|
||||
|
|
|
|||
|
|
@ -107,17 +107,22 @@ async def open_piker_runtime(
|
|||
async with (
|
||||
tractor.open_root_actor(
|
||||
|
||||
# passed through to ``open_root_actor``
|
||||
# passed through to `open_root_actor`
|
||||
registry_addrs=registry_addrs,
|
||||
name=name,
|
||||
start_method=start_method,
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
start_method=start_method,
|
||||
|
||||
# XXX NOTE MEMBER DAT der's a perf hit yo!!
|
||||
# https://greenback.readthedocs.io/en/latest/principle.html#performance
|
||||
maybe_enable_greenback=True,
|
||||
|
||||
# TODO: eventually we should be able to avoid
|
||||
# having the root have more then permissions to
|
||||
# spawn other specialized daemons I think?
|
||||
enable_modules=enable_modules,
|
||||
hide_tb=False,
|
||||
|
||||
**tractor_kwargs,
|
||||
) as actor,
|
||||
|
|
@ -200,7 +205,8 @@ async def open_pikerd(
|
|||
reg_addrs,
|
||||
),
|
||||
tractor.open_nursery() as actor_nursery,
|
||||
trio.open_nursery() as service_nursery,
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_tn,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
if addr not in root_actor.accept_addrs:
|
||||
|
|
@ -211,7 +217,7 @@ async def open_pikerd(
|
|||
|
||||
# assign globally for future daemon/task creation
|
||||
Services.actor_n = actor_nursery
|
||||
Services.service_n = service_nursery
|
||||
Services.service_n = service_tn
|
||||
Services.debug_mode = debug_mode
|
||||
|
||||
try:
|
||||
|
|
@ -221,7 +227,7 @@ async def open_pikerd(
|
|||
# TODO: is this more clever/efficient?
|
||||
# if 'samplerd' in Services.service_tasks:
|
||||
# await Services.cancel_service('samplerd')
|
||||
service_nursery.cancel_scope.cancel()
|
||||
service_tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
# TODO: do we even need this?
|
||||
|
|
@ -256,7 +262,10 @@ async def maybe_open_pikerd(
|
|||
loglevel: str | None = None,
|
||||
**kwargs,
|
||||
|
||||
) -> tractor._portal.Portal | ClassVar[Services]:
|
||||
) -> (
|
||||
tractor._portal.Portal
|
||||
|ClassVar[Services]
|
||||
):
|
||||
'''
|
||||
If no ``pikerd`` daemon-root-actor can be found start it and
|
||||
yield up (we should probably figure out returning a portal to self
|
||||
|
|
@ -281,10 +290,11 @@ async def maybe_open_pikerd(
|
|||
|
||||
registry_addrs: list[tuple[str, int]] = (
|
||||
registry_addrs
|
||||
or [_default_reg_addr]
|
||||
or
|
||||
[_default_reg_addr]
|
||||
)
|
||||
|
||||
pikerd_portal: tractor.Portal | None
|
||||
pikerd_portal: tractor.Portal|None
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
name=query_name,
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ from contextlib import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
from trio.lowlevel import current_task
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
|
@ -70,69 +71,84 @@ async def maybe_spawn_daemon(
|
|||
lock = Services.locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
async with find_service(
|
||||
service_name,
|
||||
registry_addrs=[('127.0.0.1', 6116)],
|
||||
) as portal:
|
||||
if portal is not None:
|
||||
lock.release()
|
||||
yield portal
|
||||
return
|
||||
try:
|
||||
async with find_service(
|
||||
service_name,
|
||||
registry_addrs=[('127.0.0.1', 6116)],
|
||||
) as portal:
|
||||
if portal is not None:
|
||||
lock.release()
|
||||
yield portal
|
||||
return
|
||||
|
||||
log.warning(
|
||||
f"Couldn't find any existing {service_name}\n"
|
||||
'Attempting to spawn new daemon-service..'
|
||||
)
|
||||
log.warning(
|
||||
f"Couldn't find any existing {service_name}\n"
|
||||
'Attempting to spawn new daemon-service..'
|
||||
)
|
||||
|
||||
# ask root ``pikerd`` daemon to spawn the daemon we need if
|
||||
# pikerd is not live we now become the root of the
|
||||
# process tree
|
||||
async with maybe_open_pikerd(
|
||||
loglevel=loglevel,
|
||||
**pikerd_kwargs,
|
||||
# ask root ``pikerd`` daemon to spawn the daemon we need if
|
||||
# pikerd is not live we now become the root of the
|
||||
# process tree
|
||||
async with maybe_open_pikerd(
|
||||
loglevel=loglevel,
|
||||
**pikerd_kwargs,
|
||||
|
||||
) as pikerd_portal:
|
||||
) as pikerd_portal:
|
||||
|
||||
# we are the root and thus are `pikerd`
|
||||
# so spawn the target service directly by calling
|
||||
# the provided target routine.
|
||||
# XXX: this assumes that the target is well formed and will
|
||||
# do the right things to setup both a sub-actor **and** call
|
||||
# the ``_Services`` api from above to start the top level
|
||||
# service task for that actor.
|
||||
started: bool
|
||||
if pikerd_portal is None:
|
||||
started = await service_task_target(
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
# we are the root and thus are `pikerd`
|
||||
# so spawn the target service directly by calling
|
||||
# the provided target routine.
|
||||
# XXX: this assumes that the target is well formed and will
|
||||
# do the right things to setup both a sub-actor **and** call
|
||||
# the ``_Services`` api from above to start the top level
|
||||
# service task for that actor.
|
||||
started: bool
|
||||
if pikerd_portal is None:
|
||||
started = await service_task_target(
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
)
|
||||
|
||||
else:
|
||||
# request a remote `pikerd` (service manager) to start the
|
||||
# target daemon-task, the target can't return
|
||||
# a non-serializable value since it is expected that service
|
||||
# starting is non-blocking and the target task will persist
|
||||
# running "under" or "within" the `pikerd` actor tree after
|
||||
# the questing client disconnects. in other words this
|
||||
# spawns a persistent daemon actor that continues to live
|
||||
# for the lifespan of whatever the service manager inside
|
||||
# `pikerd` says it should.
|
||||
started = await pikerd_portal.run(
|
||||
service_task_target,
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
)
|
||||
|
||||
if started:
|
||||
log.info(f'Service {service_name} started!')
|
||||
|
||||
# block until we can discover (by IPC connection) to the newly
|
||||
# spawned daemon-actor and then deliver the portal to the
|
||||
# caller.
|
||||
async with tractor.wait_for_actor(service_name) as portal:
|
||||
lock.release()
|
||||
yield portal
|
||||
await portal.cancel_actor()
|
||||
|
||||
except BaseException as _err:
|
||||
err = _err
|
||||
if (
|
||||
lock.locked()
|
||||
and
|
||||
lock.statistics().owner is current_task()
|
||||
):
|
||||
log.exception(
|
||||
f'Releasing stale lock after crash..?'
|
||||
f'{err!r}\n'
|
||||
)
|
||||
|
||||
else:
|
||||
# request a remote `pikerd` (service manager) to start the
|
||||
# target daemon-task, the target can't return
|
||||
# a non-serializable value since it is expected that service
|
||||
# starting is non-blocking and the target task will persist
|
||||
# running "under" or "within" the `pikerd` actor tree after
|
||||
# the questing client disconnects. in other words this
|
||||
# spawns a persistent daemon actor that continues to live
|
||||
# for the lifespan of whatever the service manager inside
|
||||
# `pikerd` says it should.
|
||||
started = await pikerd_portal.run(
|
||||
service_task_target,
|
||||
loglevel=loglevel,
|
||||
**spawn_args,
|
||||
)
|
||||
|
||||
if started:
|
||||
log.info(f'Service {service_name} started!')
|
||||
|
||||
# block until we can discover (by IPC connection) to the newly
|
||||
# spawned daemon-actor and then deliver the portal to the
|
||||
# caller.
|
||||
async with tractor.wait_for_actor(service_name) as portal:
|
||||
lock.release()
|
||||
yield portal
|
||||
await portal.cancel_actor()
|
||||
raise err
|
||||
|
||||
|
||||
async def spawn_emsd(
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ class Services:
|
|||
# wait on any context's return value
|
||||
# and any final portal result from the
|
||||
# sub-actor.
|
||||
ctx_res: Any = await ctx.result()
|
||||
ctx_res: Any = await ctx.wait_for_result()
|
||||
|
||||
# NOTE: blocks indefinitely until cancelled
|
||||
# either by error from the target context
|
||||
|
|
|
|||
|
|
@ -101,13 +101,15 @@ async def open_registry(
|
|||
|
||||
if (
|
||||
not tractor.is_root_process()
|
||||
and not Registry.addrs
|
||||
and
|
||||
not Registry.addrs
|
||||
):
|
||||
Registry.addrs.extend(actor.reg_addrs)
|
||||
|
||||
if (
|
||||
ensure_exists
|
||||
and not Registry.addrs
|
||||
and
|
||||
not Registry.addrs
|
||||
):
|
||||
raise RuntimeError(
|
||||
f"`{uid}` registry should already exist but doesn't?"
|
||||
|
|
@ -146,7 +148,7 @@ async def find_service(
|
|||
| list[Portal]
|
||||
| None
|
||||
):
|
||||
|
||||
# try:
|
||||
reg_addrs: list[tuple[str, int]]
|
||||
async with open_registry(
|
||||
addrs=(
|
||||
|
|
@ -157,22 +159,39 @@ async def find_service(
|
|||
or Registry.addrs
|
||||
),
|
||||
) as reg_addrs:
|
||||
log.info(f'Scanning for service `{service_name}`')
|
||||
|
||||
maybe_portals: list[Portal] | Portal | None
|
||||
log.info(
|
||||
f'Scanning for service {service_name!r}'
|
||||
)
|
||||
|
||||
# attach to existing daemon by name if possible
|
||||
maybe_portals: list[Portal]|Portal|None
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
registry_addrs=reg_addrs,
|
||||
only_first=first_only, # if set only returns single ref
|
||||
) as maybe_portals:
|
||||
if not maybe_portals:
|
||||
# log.info(
|
||||
print(
|
||||
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
|
||||
)
|
||||
yield None
|
||||
return
|
||||
|
||||
# log.info(
|
||||
print(
|
||||
f'Found service {service_name!r} -> {maybe_portals}'
|
||||
)
|
||||
yield maybe_portals
|
||||
|
||||
# except BaseException as _berr:
|
||||
# berr = _berr
|
||||
# log.exception(
|
||||
# 'tractor.find_actor() failed with,\n'
|
||||
# )
|
||||
# raise berr
|
||||
|
||||
|
||||
async def check_for_service(
|
||||
service_name: str,
|
||||
|
|
|
|||
|
|
@ -963,7 +963,10 @@ async def tsdb_backfill(
|
|||
# concurrently load the provider's most-recent-frame AND any
|
||||
# pre-existing tsdb history already saved in `piker` storage.
|
||||
dt_eps: list[DateTime, DateTime] = []
|
||||
async with trio.open_nursery() as tn:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
tn.start_soon(
|
||||
push_latest_frame,
|
||||
dt_eps,
|
||||
|
|
@ -1012,9 +1015,16 @@ async def tsdb_backfill(
|
|||
int,
|
||||
Duration,
|
||||
]|None = config.get('frame_types', None)
|
||||
|
||||
if def_frame_durs:
|
||||
def_frame_size: Duration = def_frame_durs[timeframe]
|
||||
assert def_frame_size == calced_frame_size
|
||||
|
||||
if def_frame_size != calced_frame_size:
|
||||
log.warning(
|
||||
f'Expected frame size {def_frame_size}\n'
|
||||
f'Rxed frame {calced_frame_size}\n'
|
||||
)
|
||||
# await tractor.pause()
|
||||
else:
|
||||
# use what we calced from first frame above.
|
||||
def_frame_size = calced_frame_size
|
||||
|
|
@ -1043,7 +1053,9 @@ async def tsdb_backfill(
|
|||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
async with trio.open_nursery() as tn:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn:
|
||||
|
||||
bf_done = await tn.start(
|
||||
partial(
|
||||
|
|
@ -1308,6 +1320,7 @@ async def manage_history(
|
|||
# sampling period) data set since normally differently
|
||||
# sampled timeseries can be loaded / process independently
|
||||
# ;)
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
log.info(
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ Main app startup and run.
|
|||
from functools import partial
|
||||
from types import ModuleType
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.ui.qt import (
|
||||
|
|
@ -116,6 +117,7 @@ async def _async_main(
|
|||
needed_brokermods[brokername] = brokers[brokername]
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as root_n,
|
||||
):
|
||||
# set root nursery and task stack for spawning other charts/feeds
|
||||
|
|
|
|||
|
|
@ -33,7 +33,6 @@ import trio
|
|||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
QtWidgets,
|
||||
Qt,
|
||||
QLineF,
|
||||
QFrame,
|
||||
|
|
|
|||
|
|
@ -1445,7 +1445,10 @@ async def display_symbol_data(
|
|||
# for pause/resume on mouse interaction
|
||||
rt_chart.feed = feed
|
||||
|
||||
async with trio.open_nursery() as ln:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as ln,
|
||||
):
|
||||
# if available load volume related built-in display(s)
|
||||
vlm_charts: dict[
|
||||
str,
|
||||
|
|
|
|||
|
|
@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
|
|||
from typing import Callable
|
||||
|
||||
import trio
|
||||
from tractor.trionics import gather_contexts
|
||||
from tractor.trionics import (
|
||||
gather_contexts,
|
||||
collapse_eg,
|
||||
)
|
||||
|
||||
from piker.ui.qt import (
|
||||
QtCore,
|
||||
|
|
@ -207,7 +210,10 @@ async def open_signal_handler(
|
|||
async for args in recv:
|
||||
await async_handler(*args)
|
||||
|
||||
async with trio.open_nursery() as tn:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
tn.start_soon(proxy_to_handler)
|
||||
async with send:
|
||||
yield
|
||||
|
|
@ -242,6 +248,7 @@ async def open_handlers(
|
|||
widget: QWidget
|
||||
streams: list[trio.abc.ReceiveChannel]
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
gather_contexts([
|
||||
open_event_stream(
|
||||
|
|
|
|||
|
|
@ -18,10 +18,11 @@
|
|||
Feed status and controls widget(s) for embedding in a UI-pane.
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from textwrap import dedent
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
# from PyQt5.QtCore import Qt
|
||||
|
||||
|
|
@ -49,35 +50,55 @@ def mk_feed_label(
|
|||
a feed control protocol.
|
||||
|
||||
'''
|
||||
status = feed.status
|
||||
status: dict[str, Any] = feed.status
|
||||
assert status
|
||||
|
||||
msg = dedent("""
|
||||
actor: **{actor_name}**\n
|
||||
|_ @**{host}:{port}**\n
|
||||
""")
|
||||
# SO tips on ws/nls,
|
||||
# https://stackoverflow.com/a/15721400
|
||||
ws: str = ' '
|
||||
# nl: str = '<br>' # dun work?
|
||||
actor_info_repr: str = (
|
||||
f')> **{status["actor_short_id"]}**\n'
|
||||
'\n' # bc md?
|
||||
)
|
||||
|
||||
for key, val in status.items():
|
||||
if key in ('host', 'port', 'actor_name'):
|
||||
continue
|
||||
msg += f'\n|_ {key}: **{{{key}}}**\n'
|
||||
# fields to select *IN* for display
|
||||
# (see `.data.feed.open_feed()` status
|
||||
# update -> TAG_feed_status_update)
|
||||
for key in [
|
||||
'ipc',
|
||||
'hist_shm',
|
||||
'rt_shm',
|
||||
'throttle_hz',
|
||||
]:
|
||||
# NOTE, the 2nd key is filled via `.format()` updates.
|
||||
actor_info_repr += (
|
||||
f'\n' # bc md?
|
||||
f'{ws}|_{key}: **{{{key}}}**\n'
|
||||
)
|
||||
# ^TODO? formatting and content..
|
||||
# -[ ] showing which fqme is "forward" on the
|
||||
# chart/fsp/order-mode?
|
||||
# '|_ flows: **{symbols}**\n'
|
||||
#
|
||||
# -[x] why isn't the indent working?
|
||||
# => markdown, now solved..
|
||||
|
||||
feed_label = FormatLabel(
|
||||
fmt_str=msg,
|
||||
# |_ streams: **{symbols}**\n
|
||||
fmt_str=actor_info_repr,
|
||||
font=_font.font,
|
||||
font_size=_font_small.px_size,
|
||||
font_color='default_lightest',
|
||||
)
|
||||
|
||||
# ?TODO, remove this?
|
||||
# form.vbox.setAlignment(feed_label, Qt.AlignBottom)
|
||||
# form.vbox.setAlignment(Qt.AlignBottom)
|
||||
_ = chart.height() - (
|
||||
form.height() +
|
||||
form.fill_bar.height()
|
||||
# feed_label.height()
|
||||
)
|
||||
# _ = chart.height() - (
|
||||
# form.height() +
|
||||
# form.fill_bar.height()
|
||||
# # feed_label.height()
|
||||
# )
|
||||
|
||||
feed_label.format(**feed.status)
|
||||
|
||||
return feed_label
|
||||
|
|
|
|||
|
|
@ -600,6 +600,7 @@ async def open_fsp_admin(
|
|||
kwargs=kwargs,
|
||||
) as (cache_hit, cluster_map),
|
||||
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
if cache_hit:
|
||||
|
|
@ -613,6 +614,8 @@ async def open_fsp_admin(
|
|||
)
|
||||
try:
|
||||
yield admin
|
||||
|
||||
# ??TODO, does this *need* to be inside a finally?
|
||||
finally:
|
||||
# terminate all tasks via signals
|
||||
for key, entry in admin._registry.items():
|
||||
|
|
|
|||
|
|
@ -285,18 +285,20 @@ class FormatLabel(QLabel):
|
|||
font_size: int,
|
||||
font_color: str,
|
||||
|
||||
use_md: bool = True,
|
||||
|
||||
parent=None,
|
||||
|
||||
) -> None:
|
||||
|
||||
super().__init__(parent)
|
||||
|
||||
# by default set the format string verbatim and expect user to
|
||||
# call ``.format()`` later (presumably they'll notice the
|
||||
# by default set the format string verbatim and expect user
|
||||
# to call ``.format()`` later (presumably they'll notice the
|
||||
# unformatted content if ``fmt_str`` isn't meant to be
|
||||
# unformatted).
|
||||
self.fmt_str = fmt_str
|
||||
self.setText(fmt_str)
|
||||
# self.setText(fmt_str) # ?TODO, why here?
|
||||
|
||||
self.setStyleSheet(
|
||||
f"""QLabel {{
|
||||
|
|
@ -306,9 +308,10 @@ class FormatLabel(QLabel):
|
|||
"""
|
||||
)
|
||||
self.setFont(_font.font)
|
||||
self.setTextFormat(
|
||||
Qt.TextFormat.MarkdownText
|
||||
)
|
||||
if use_md:
|
||||
self.setTextFormat(
|
||||
Qt.TextFormat.MarkdownText
|
||||
)
|
||||
self.setMargin(0)
|
||||
|
||||
self.setSizePolicy(
|
||||
|
|
@ -316,7 +319,10 @@ class FormatLabel(QLabel):
|
|||
size_policy.Expanding,
|
||||
)
|
||||
self.setAlignment(
|
||||
Qt.AlignVCenter | Qt.AlignLeft
|
||||
Qt.AlignLeft
|
||||
|
|
||||
Qt.AlignBottom
|
||||
# Qt.AlignVCenter
|
||||
)
|
||||
self.setText(self.fmt_str)
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,8 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
qompleterz: embeddable search and complete using trio, Qt and rapidfuzz.
|
||||
qompleterz: embeddable search and complete using trio, Qt and
|
||||
rapidfuzz.
|
||||
|
||||
"""
|
||||
|
||||
|
|
@ -46,6 +47,7 @@ import time
|
|||
from pprint import pformat
|
||||
|
||||
from rapidfuzz import process as fuzzy
|
||||
import tractor
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
|
||||
|
|
@ -53,7 +55,7 @@ from piker.ui.qt import (
|
|||
size_policy,
|
||||
align_flag,
|
||||
Qt,
|
||||
QtCore,
|
||||
# QtCore,
|
||||
QtWidgets,
|
||||
QModelIndex,
|
||||
QItemSelectionModel,
|
||||
|
|
@ -920,7 +922,10 @@ async def fill_results(
|
|||
|
||||
# issue multi-provider fan-out search request and place
|
||||
# "searching.." statuses on outstanding results providers
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
|
||||
for provider, (search, pause) in (
|
||||
_searcher_cache.copy().items()
|
||||
|
|
@ -944,7 +949,7 @@ async def fill_results(
|
|||
status_field='-> searchin..',
|
||||
)
|
||||
|
||||
await n.start(
|
||||
await tn.start(
|
||||
pack_matches,
|
||||
view,
|
||||
has_results,
|
||||
|
|
@ -1004,12 +1009,14 @@ async def handle_keyboard_input(
|
|||
view.set_font_size(searchbar.dpi_font.px_size)
|
||||
send, recv = trio.open_memory_channel(616)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(), # needed?
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
# start a background multi-searcher task which receives
|
||||
# patterns relayed from this keyboard input handler and
|
||||
# async updates the completer view's results.
|
||||
n.start_soon(
|
||||
tn.start_soon(
|
||||
partial(
|
||||
fill_results,
|
||||
searchw,
|
||||
|
|
|
|||
|
|
@ -792,6 +792,7 @@ async def open_order_mode(
|
|||
brokerd_accounts,
|
||||
ems_dialog_msgs,
|
||||
),
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
|
||||
):
|
||||
|
|
|
|||
|
|
@ -15,6 +15,12 @@ from piker.service import (
|
|||
from piker.log import get_console_log
|
||||
|
||||
|
||||
# include `tractor`'s built-in fixtures!
|
||||
pytest_plugins: tuple[str] = (
|
||||
"tractor._testing.pytest",
|
||||
)
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption("--ll", action="store", dest='loglevel',
|
||||
default=None, help="logging level to set when testing")
|
||||
|
|
|
|||
|
|
@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel):
|
|||
# async with tractor.open_nursery() as n:
|
||||
# await n.run_in_actor('other', intermittently_refresh_tokens)
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery(
|
||||
# strict_exception_groups=False,
|
||||
) as n
|
||||
):
|
||||
|
||||
quoter = await qt.stock_quoter(client, us_symbols)
|
||||
|
||||
|
|
@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what):
|
|||
else:
|
||||
symbols = [tmx_symbols]
|
||||
|
||||
async with trio.open_nursery() as n:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as n:
|
||||
for syms, func in zip(symbols, stream_what):
|
||||
n.start_soon(func, feed, syms)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue