Add pause/resume feed api, delegate to msg stream for broadcast api

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-15 18:14:09 -04:00
parent 2f5abaa47a
commit 1e42f58478
1 changed files with 20 additions and 13 deletions

View File

@ -34,7 +34,7 @@ import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import _broadcast # from tractor import _broadcast
from pydantic import BaseModel from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
@ -329,16 +329,21 @@ async def attach_feed_bus(
subs.append(sub) subs.append(sub)
try: try:
uid = ctx.chan.uid
fqsn = f'{symbol}.{brokername}'
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
if sub in subs:
log.info( log.info(
f'Pausing {symbol}.{brokername} feed for {ctx.chan.uid}') f'Pausing {fqsn} feed for {uid}')
subs.remove(sub) subs.remove(sub)
elif msg == 'resume': elif msg == 'resume':
if sub not in subs:
log.info( log.info(
f'Resuming {symbol}.{brokername} feed for {ctx.chan.uid}') f'Resuming {fqsn} feed for {uid}')
subs.append(sub) subs.append(sub)
else: else:
raise ValueError(msg) raise ValueError(msg)
@ -401,6 +406,12 @@ class Feed:
else: else:
yield self._index_stream yield self._index_stream
async def pause(self) -> None:
await self.stream.send('pause')
async def resume(self) -> None:
await self.stream.send('resume')
def sym_to_shm_key( def sym_to_shm_key(
broker: str, broker: str,
@ -493,16 +504,12 @@ async def open_feed(
readonly=True, readonly=True,
) )
bstream = _broadcast.broadcast_receiver(
stream,
2**10,
)
feed = Feed( feed = Feed(
name=brokername, name=brokername,
shm=shm, shm=shm,
mod=mod, mod=mod,
first_quote=first_quote, first_quote=first_quote,
stream=bstream, #brx_stream, stream=stream,
_brokerd_portal=portal, _brokerd_portal=portal,
) )
ohlc_sample_rates = [] ohlc_sample_rates = []
@ -573,4 +580,4 @@ async def maybe_open_feed(
async with feed.stream.subscribe() as bstream: async with feed.stream.subscribe() as bstream:
yield feed, bstream yield feed, bstream
else: else:
yield feed, stream yield feed, feed.stream