Rework `_FeedsBus` subscriptions mgmt using `set`

Allows using `set` ops for subscription management and guarantees no
duplicates per `brokerd` actor. New API is simpler for dynamic
pause/resume changes per `Feed`:
- `_FeedsBus.add_subs()`, `.get_subs()`, `.remove_subs()` all accept multi-sub
  `set` inputs.
- `Feed.pause()` / `.resume()` encapsulates management of *only* sending
  a msg on each unique underlying IPC msg stream.

Use new api in sampler task.
epoch_index_backup
Tyler Goodlet 2022-11-16 13:32:26 -05:00
parent 42e934b912
commit 5e6ebca1e0
1 changed files with 1 additions and 1 deletions

View File

@ -1106,7 +1106,7 @@ async def allocate_persistent_feed(
# a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?)
msg = init_msg[symstr.lower()]
msg = init_msg[symstr]
# the broker-specific fully qualified symbol name,
# but ensure it is lower-cased for external use.