Support injecting a `info: dict` to `Sampler.broadcast_all()` calls
parent
f1252983e4
commit
7a5c43d01a
|
@ -230,6 +230,7 @@ class Sampler:
|
||||||
self,
|
self,
|
||||||
period_s: float,
|
period_s: float,
|
||||||
time_stamp: float | None = None,
|
time_stamp: float | None = None,
|
||||||
|
info: dict | None = None,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -258,10 +259,14 @@ class Sampler:
|
||||||
try:
|
try:
|
||||||
for stream in (subs - sent):
|
for stream in (subs - sent):
|
||||||
try:
|
try:
|
||||||
await stream.send({
|
msg = {
|
||||||
'index': time_stamp or last_ts,
|
'index': time_stamp or last_ts,
|
||||||
'period': period_s,
|
'period': period_s,
|
||||||
})
|
}
|
||||||
|
if info:
|
||||||
|
msg.update(info)
|
||||||
|
|
||||||
|
await stream.send(msg)
|
||||||
sent.add(stream)
|
sent.add(stream)
|
||||||
|
|
||||||
except (
|
except (
|
||||||
|
@ -287,9 +292,15 @@ class Sampler:
|
||||||
)
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def broadcast_all(self) -> None:
|
async def broadcast_all(
|
||||||
|
self,
|
||||||
|
info: dict | None = None,
|
||||||
|
) -> None:
|
||||||
for period_s in self.subscribers:
|
for period_s in self.subscribers:
|
||||||
await self.broadcast(period_s)
|
await self.broadcast(
|
||||||
|
period_s,
|
||||||
|
info=info,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -359,8 +370,10 @@ async def register_with_sampler(
|
||||||
|
|
||||||
# except broadcast requests from the subscriber
|
# except broadcast requests from the subscriber
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
if msg == 'broadcast_all':
|
if 'broadcast_all' in msg:
|
||||||
await Sampler.broadcast_all()
|
await Sampler.broadcast_all(
|
||||||
|
info=msg['broadcast_all'],
|
||||||
|
)
|
||||||
finally:
|
finally:
|
||||||
if (
|
if (
|
||||||
sub_for_broadcasts
|
sub_for_broadcasts
|
||||||
|
@ -468,6 +481,8 @@ async def open_sample_stream(
|
||||||
cache_key: str | None = None,
|
cache_key: str | None = None,
|
||||||
allow_new_sampler: bool = True,
|
allow_new_sampler: bool = True,
|
||||||
|
|
||||||
|
ensure_is_active: bool = False,
|
||||||
|
|
||||||
) -> AsyncIterator[dict[str, float]]:
|
) -> AsyncIterator[dict[str, float]]:
|
||||||
'''
|
'''
|
||||||
Subscribe to OHLC sampling "step" events: when the time aggregation
|
Subscribe to OHLC sampling "step" events: when the time aggregation
|
||||||
|
@ -510,12 +525,18 @@ async def open_sample_stream(
|
||||||
},
|
},
|
||||||
) as (ctx, first)
|
) as (ctx, first)
|
||||||
):
|
):
|
||||||
|
if ensure_is_active:
|
||||||
assert len(first) > 1
|
assert len(first) > 1
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as istream,
|
ctx.open_stream() as istream,
|
||||||
|
|
||||||
# TODO: we don't need this task-bcasting right?
|
# TODO: we DO need this task-bcasting so that
|
||||||
# istream.subscribe() as istream,
|
# for eg. the history chart update loop eventually
|
||||||
|
# receceives all backfilling event msgs such that
|
||||||
|
# the underlying graphics format arrays are
|
||||||
|
# re-allocated until all history is loaded!
|
||||||
|
istream.subscribe() as istream,
|
||||||
):
|
):
|
||||||
yield istream
|
yield istream
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue