Use `maybe_open_feed()` in ems and fsp daemons

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-10 16:50:40 -04:00
parent a7d3afc9b1
commit 7d0f47364c
4 changed files with 28 additions and 26 deletions

View File

@ -38,7 +38,7 @@ log = get_logger(__name__)
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
'''EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
@ -48,7 +48,7 @@ class OrderBook:
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
"""
'''
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel

View File

@ -32,9 +32,8 @@ import tractor
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, open_feed
from ..data.feed import Feed, maybe_open_feed
from .._daemon import maybe_spawn_brokerd
from .._cacheables import maybe_open_ctx
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -959,15 +958,11 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
maybe_open_ctx(
key=(broker, symbol),
mngr=open_feed(
maybe_open_feed(
broker,
[symbol],
loglevel=loglevel,
),
loglevel=loglevel,
) as feed,
) as (feed, stream),
):
# XXX: this should be initial price quote from target provider
@ -1011,7 +1006,7 @@ async def _emsd_main(
brokerd_stream,
ems_client_order_stream,
feed.stream,
stream,
broker,
symbol,
book

View File

@ -441,7 +441,7 @@ async def open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> ReceiveChannel[dict[str, Any]]:
) -> Feed:
'''
Open a "data feed" which provides streamed real-time quotes.
@ -522,7 +522,7 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed, bstream
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()
@ -538,7 +538,7 @@ async def maybe_open_feed(
tick_throttle: Optional[float] = None, # Hz
shielded_stream: bool = False,
) -> ReceiveChannel[dict[str, Any]]:
) -> (Feed, ReceiveChannel[dict[str, Any]]):
'''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
@ -553,12 +553,12 @@ async def maybe_open_feed(
[sym],
loglevel=loglevel,
),
) as (cache_hit, (feed, stream)):
) as (cache_hit, feed):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with stream.subscribe() as bstream:
async with feed.stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, stream

View File

@ -69,6 +69,7 @@ async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
@ -93,14 +94,14 @@ async def fsp_compute(
yield {}
# task cancellation won't kill the channel
with stream.shield():
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
filter_by_sym(symbol, stream),
feed.shm,
)
@ -164,7 +165,8 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
) -> AsyncIterator[dict]:
) -> None:
"""Chain streaming signal processors and deliver output to
destination mem buf.
@ -175,7 +177,11 @@ async def cascade(
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.open_feed(brokername, [symbol]) as feed:
async with data.feed.maybe_open_feed(
brokername,
[symbol],
shielded_stream=True,
) as (feed, stream):
assert src.token == feed.shm.token
@ -186,6 +192,7 @@ async def cascade(
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,