Use `maybe_open_feed()` in ems and fsp daemons
parent
6f30ae448a
commit
3d4898c4d5
|
@ -38,7 +38,7 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class OrderBook:
|
class OrderBook:
|
||||||
"""Buy-side (client-side ?) order book ctl and tracking.
|
'''EMS-client-side order book ctl and tracking.
|
||||||
|
|
||||||
A style similar to "model-view" is used here where this api is
|
A style similar to "model-view" is used here where this api is
|
||||||
provided as a supervised control for an EMS actor which does all the
|
provided as a supervised control for an EMS actor which does all the
|
||||||
|
@ -48,7 +48,7 @@ class OrderBook:
|
||||||
Currently, this is mostly for keeping local state to match the EMS
|
Currently, this is mostly for keeping local state to match the EMS
|
||||||
and use received events to trigger graphics updates.
|
and use received events to trigger graphics updates.
|
||||||
|
|
||||||
"""
|
'''
|
||||||
# mem channels used to relay order requests to the EMS daemon
|
# mem channels used to relay order requests to the EMS daemon
|
||||||
_to_ems: trio.abc.SendChannel
|
_to_ems: trio.abc.SendChannel
|
||||||
_from_order_book: trio.abc.ReceiveChannel
|
_from_order_book: trio.abc.ReceiveChannel
|
||||||
|
|
|
@ -32,9 +32,8 @@ import tractor
|
||||||
|
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ..data._normalize import iterticks
|
from ..data._normalize import iterticks
|
||||||
from ..data.feed import Feed, open_feed
|
from ..data.feed import Feed, maybe_open_feed
|
||||||
from .._daemon import maybe_spawn_brokerd
|
from .._daemon import maybe_spawn_brokerd
|
||||||
from .._cacheables import maybe_open_ctx
|
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Status, Order,
|
Status, Order,
|
||||||
|
@ -959,15 +958,11 @@ async def _emsd_main(
|
||||||
|
|
||||||
# spawn one task per broker feed
|
# spawn one task per broker feed
|
||||||
async with (
|
async with (
|
||||||
maybe_open_ctx(
|
maybe_open_feed(
|
||||||
key=(broker, symbol),
|
broker,
|
||||||
mngr=open_feed(
|
[symbol],
|
||||||
broker,
|
|
||||||
[symbol],
|
|
||||||
loglevel=loglevel,
|
|
||||||
),
|
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
) as feed,
|
) as (feed, stream),
|
||||||
):
|
):
|
||||||
|
|
||||||
# XXX: this should be initial price quote from target provider
|
# XXX: this should be initial price quote from target provider
|
||||||
|
@ -1011,7 +1006,7 @@ async def _emsd_main(
|
||||||
|
|
||||||
brokerd_stream,
|
brokerd_stream,
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
feed.stream,
|
stream,
|
||||||
broker,
|
broker,
|
||||||
symbol,
|
symbol,
|
||||||
book
|
book
|
||||||
|
|
|
@ -441,7 +441,7 @@ async def open_feed(
|
||||||
tick_throttle: Optional[float] = None, # Hz
|
tick_throttle: Optional[float] = None, # Hz
|
||||||
shielded_stream: bool = False,
|
shielded_stream: bool = False,
|
||||||
|
|
||||||
) -> ReceiveChannel[dict[str, Any]]:
|
) -> Feed:
|
||||||
'''
|
'''
|
||||||
Open a "data feed" which provides streamed real-time quotes.
|
Open a "data feed" which provides streamed real-time quotes.
|
||||||
|
|
||||||
|
@ -522,7 +522,7 @@ async def open_feed(
|
||||||
feed._max_sample_rate = max(ohlc_sample_rates)
|
feed._max_sample_rate = max(ohlc_sample_rates)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield feed, bstream
|
yield feed
|
||||||
finally:
|
finally:
|
||||||
# drop the infinite stream connection
|
# drop the infinite stream connection
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
@ -538,7 +538,7 @@ async def maybe_open_feed(
|
||||||
tick_throttle: Optional[float] = None, # Hz
|
tick_throttle: Optional[float] = None, # Hz
|
||||||
shielded_stream: bool = False,
|
shielded_stream: bool = False,
|
||||||
|
|
||||||
) -> ReceiveChannel[dict[str, Any]]:
|
) -> (Feed, ReceiveChannel[dict[str, Any]]):
|
||||||
'''Maybe open a data to a ``brokerd`` daemon only if there is no
|
'''Maybe open a data to a ``brokerd`` daemon only if there is no
|
||||||
local one for the broker-symbol pair, if one is cached use it wrapped
|
local one for the broker-symbol pair, if one is cached use it wrapped
|
||||||
in a tractor broadcast receiver.
|
in a tractor broadcast receiver.
|
||||||
|
@ -553,12 +553,12 @@ async def maybe_open_feed(
|
||||||
[sym],
|
[sym],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
),
|
),
|
||||||
) as (cache_hit, (feed, stream)):
|
) as (cache_hit, feed):
|
||||||
|
|
||||||
if cache_hit:
|
if cache_hit:
|
||||||
# add a new broadcast subscription for the quote stream
|
# add a new broadcast subscription for the quote stream
|
||||||
# if this feed is likely already in use
|
# if this feed is likely already in use
|
||||||
async with stream.subscribe() as bstream:
|
async with feed.stream.subscribe() as bstream:
|
||||||
yield feed, bstream
|
yield feed, bstream
|
||||||
else:
|
else:
|
||||||
yield feed, stream
|
yield feed, stream
|
||||||
|
|
|
@ -69,6 +69,7 @@ async def fsp_compute(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
|
stream: trio.abc.ReceiveChannel,
|
||||||
|
|
||||||
src: ShmArray,
|
src: ShmArray,
|
||||||
dst: ShmArray,
|
dst: ShmArray,
|
||||||
|
@ -93,14 +94,14 @@ async def fsp_compute(
|
||||||
yield {}
|
yield {}
|
||||||
|
|
||||||
# task cancellation won't kill the channel
|
# task cancellation won't kill the channel
|
||||||
with stream.shield():
|
# since we shielded at the `open_feed()` call
|
||||||
async for quotes in stream:
|
async for quotes in stream:
|
||||||
for symbol, quotes in quotes.items():
|
for symbol, quotes in quotes.items():
|
||||||
if symbol == sym:
|
if symbol == sym:
|
||||||
yield quotes
|
yield quotes
|
||||||
|
|
||||||
out_stream = func(
|
out_stream = func(
|
||||||
filter_by_sym(symbol, feed.stream),
|
filter_by_sym(symbol, stream),
|
||||||
feed.shm,
|
feed.shm,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -164,7 +165,8 @@ async def cascade(
|
||||||
dst_shm_token: Tuple[str, np.dtype],
|
dst_shm_token: Tuple[str, np.dtype],
|
||||||
symbol: str,
|
symbol: str,
|
||||||
fsp_func_name: str,
|
fsp_func_name: str,
|
||||||
) -> AsyncIterator[dict]:
|
|
||||||
|
) -> None:
|
||||||
"""Chain streaming signal processors and deliver output to
|
"""Chain streaming signal processors and deliver output to
|
||||||
destination mem buf.
|
destination mem buf.
|
||||||
|
|
||||||
|
@ -175,7 +177,11 @@ async def cascade(
|
||||||
func: Callable = _fsps[fsp_func_name]
|
func: Callable = _fsps[fsp_func_name]
|
||||||
|
|
||||||
# open a data feed stream with requested broker
|
# open a data feed stream with requested broker
|
||||||
async with data.open_feed(brokername, [symbol]) as feed:
|
async with data.feed.maybe_open_feed(
|
||||||
|
brokername,
|
||||||
|
[symbol],
|
||||||
|
shielded_stream=True,
|
||||||
|
) as (feed, stream):
|
||||||
|
|
||||||
assert src.token == feed.shm.token
|
assert src.token == feed.shm.token
|
||||||
|
|
||||||
|
@ -186,6 +192,7 @@ async def cascade(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
feed=feed,
|
feed=feed,
|
||||||
|
stream=stream,
|
||||||
|
|
||||||
src=src,
|
src=src,
|
||||||
dst=dst,
|
dst=dst,
|
||||||
|
|
Loading…
Reference in New Issue