Compare commits

..

12 Commits

Author SHA1 Message Date
Tyler Goodlet b9e904666b Support `tractor.pause_from_sync()` in `brokerd`s
By passing down the `tractor.hilevel.ServiceMngr.debug_mode: bool`
(normally proxied in from the `--pdb` CLI flag) to `spawn_brokerd()` and
adjusting the `_setup_persistent_brokerd()` endpoint to do the
`tractor.devx._debug.maybe_init_greenback()` if needed.

Also in the `broker_init()` factory merge all `tractor` related `kwargs`
(i.e. `start_actor_kwargs | datad_kwargs | spawn_kws`) into the 2nd
element returned as to be passed to `ActorNursery.start_actor()`. Start
re-naming some internal vars/fields as `datad` as well.
2025-02-21 16:36:36 -05:00
Tyler Goodlet 811e857c1c Type adjust to `tractor.hilevel.ServicecMngr` 2025-02-21 16:36:36 -05:00
Tyler Goodlet e3098f56c7 Official service-mngr to `tractor.hilevel` move
Such that we maintain that subsys in the actor-runtime repo (with
hopefully an extensive test suite XD).

Port deats,
- rewrite `open_service_mngr()` as a thin wrapper that delegates into
  the new `tractor.hilevel.open_service_mngr()` but with maintenance of
  the `Services` class-singleton for now.
- port `.service._daemon` usage to the new
  `ServiceMngr.start_service_ctx()` a rename from
  `.start_service_task()` which is now likely destined for the soon
  supported `tractor.trionics.TaskMngr` nursery extension.
- ref the new `ServiceMngr.an: ActorNursery` instance var name.

Other,
- always enable the `tractor.pause_from_sync()` support via `greenback`
  whenever `debug_mode` is set at `pikerd` init.
2025-02-21 16:36:36 -05:00
Nelson Torres 8c908e9ed0 Updated tractor method name. 2025-02-21 16:36:36 -05:00
Tyler Goodlet 9a933d82a2 More service-mngr clarity notes
Nothing changing functionally here just adding more `tractor`
operational notes, tips for debug tooling and typing fixes B)

Of particular note is adding further details about the reason we do not
need to call `Context.cancel()` inside the `finally:` block of
`.open_context_in_task()` thanks to `tractor`'s new and improved
inter-actor cancellation semantics Bo
2025-02-21 16:36:36 -05:00
Tyler Goodlet 5b686edae5 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-02-21 16:36:35 -05:00
Tyler Goodlet 1218af883e Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-02-21 16:36:35 -05:00
Tyler Goodlet 79a4c433dd Enable `greenback` for `.pause_from_sync()` by default? 2025-02-21 16:36:35 -05:00
Tyler Goodlet 683ad0ffb4 Delegate to `tractor.msg.pretty_struct` since it was factored from here! 2025-02-21 16:36:35 -05:00
Tyler Goodlet d3f047663f Catch using `Sampler.bcast_errors` where possible
In all other possible IPC disconnect handling blocks. Also more
comprehensive typing throughout `uniform_rate_send()`.
2025-02-21 16:36:35 -05:00
Tyler Goodlet f21c44dd83 Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed
externally as needed for catching other similar pub-sub/IPC failures in
other (related) real-time sub-systems.

Also added some now-masked logging for debugging live-feed stream reading
issues that should ONLY be used for debugging since they'll greatly
degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we
should prolly pull from `modden` as a dep) for pretty console emissions.
2025-02-21 16:36:35 -05:00
Tyler Goodlet e8c0485d99 Suppress `trio.EndOfChannel`s raised by remote peer
Since now `tractor` will raise this native `trio`-exc translated from
a `Stop` msg when the peer gracefully terminates a `tractor.MsgStream`.
Just `info()` log in such cases versus continuing to warn for the
others.
2025-02-21 16:36:35 -05:00
5 changed files with 8 additions and 24 deletions

View File

@ -168,6 +168,7 @@ class OrderClient(Struct):
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
@ -241,11 +242,6 @@ async def open_ems(
async with maybe_open_emsd(
broker,
# XXX NOTE, LOL so this determines the daemon `emsd` loglevel
# then FYI.. that's kinda wrong no?
# -[ ] shouldn't it be set by `pikerd -l` or no?
# -[ ] would make a lot more sense to have a subsys ctl for
# levels.. like `-l emsd.info` or something?
loglevel=loglevel,
) as portal:

View File

@ -653,11 +653,7 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
book.lasts[fqme]: float = float(first_quote['last'])
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -720,7 +716,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs.copy():
for client_stream in subs:
try:
await client_stream.send(msg)
sent_some = True
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed':
log.info(f'Execution for {oid} is complete!')

View File

@ -297,8 +297,6 @@ class PaperBoi(Struct):
# transmit pp msg to ems
pp: Position = self.acnt.pps[bs_mktid]
# TODO, this will break if `require_only=True` was passed to
# `.update_from_ledger()`
pp_msg = BrokerdPosition(
broker=self.broker,

View File

@ -30,7 +30,6 @@ subsys: str = 'piker.clearing'
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name=subsys,

View File

@ -130,5 +130,4 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
msgspec = { git = "https://github.com/jcrist/msgspec.git" }
tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { path = "../tractor", editable = true }
tractor = { path = "../tractor", editable = true }