Merge pull request #212 from pikers/feed_caching

Feed caching
pause_feeds_on_sym_switch
goodboy 2021-09-01 10:25:49 -04:00 committed by GitHub
commit 37d94fbb28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 376 additions and 258 deletions

View File

@ -1,66 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Async utils no one seems to have built into a core lib (yet).
"""
from typing import AsyncContextManager
from collections import OrderedDict
from contextlib import asynccontextmanager
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()
def decorator(fn):
async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
return cache[key]
return wrapper
return decorator
@asynccontextmanager
async def _just_none():
# noop -> skip entering context
yield None
@asynccontextmanager
async def maybe_with_if(
predicate: bool,
context: AsyncContextManager,
) -> AsyncContextManager:
async with context if predicate else _just_none() as output:
yield output

View File

@ -0,0 +1,195 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Cacheing apis and toolz.
"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)
import trio
from trio_typing import TaskStatus
import tractor
from .brokers import get_brokermod
from .log import get_logger
T = TypeVar('T')
log = get_logger(__name__)
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()
def decorator(fn):
async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
return cache[key]
return wrapper
return decorator
_cache: dict[str, 'Client'] = {} # noqa
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))
@asynccontextmanager
async def maybe_open_ctx(
key: Hashable,
mngr: AsyncContextManager[T],
) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()
ctx_key = id(mngr)
value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value
except KeyError:
log.info(f'Allocating new feed for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()
yield False, value
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery
_, no_more_users = cache.resources[ctx_key]
no_more_users.set()
@asynccontextmanager
async def open_cached_client(
brokername: str,
) -> 'Client': # noqa
'''Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
key=brokername,
mngr=brokermod.get_client(),
) as (cache_hit, client):
yield client

View File

@ -1,85 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor-aware broker agnostic interface.
"""
from typing import Dict
from contextlib import asynccontextmanager, AsyncExitStack
import trio
from . import get_brokermod
from ..log import get_logger
log = get_logger(__name__)
_cache: Dict[str, 'Client'] = {} # noqa
@asynccontextmanager
async def open_cached_client(
brokername: str,
*args,
**kwargs,
) -> 'Client': # noqa
"""Get a cached broker client from the current actor's local vars.
If one has not been setup do it and cache it.
"""
global _cache
clients = _cache.setdefault('clients', {'_lock': trio.Lock()})
# global cache task lock
lock = clients['_lock']
client = None
try:
log.info(f"Loading existing `{brokername}` client")
async with lock:
client = clients[brokername]
client._consumers += 1
yield client
except KeyError:
log.info(f"Creating new client for broker {brokername}")
async with lock:
brokermod = get_brokermod(brokername)
exit_stack = AsyncExitStack()
client = await exit_stack.enter_async_context(
brokermod.get_client()
)
client._consumers = 0
client._exit_stack = exit_stack
clients[brokername] = client
yield client
finally:
if client is not None:
# if no more consumers, teardown the client
client._consumers -= 1
if client._consumers <= 0:
await client._exit_stack.aclose()

View File

@ -33,7 +33,7 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .api import open_cached_client
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray

View File

@ -29,7 +29,7 @@ import trio
from ..log import get_logger
from . import get_brokermod
from .._daemon import maybe_spawn_brokerd
from .api import open_cached_client
from .._cacheables import open_cached_client
log = get_logger(__name__)

View File

@ -14,9 +14,14 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Real-time data feed machinery
"""
'''
NB: this is the old original implementation that was used way way back
when the project started with ``kivy``.
This code is left for reference but will likely be merged in
appropriately and removed.
'''
import time
from functools import partial
from dataclasses import dataclass, field

View File

@ -1358,6 +1358,9 @@ async def trades_dialogue(
# start order request handler **before** local trades event loop
n.start_soon(handle_order_requests, ems_stream)
# TODO: for some reason we can receive a ``None`` here when the
# ib-gw goes down? Not sure exactly how that's happening looking
# at the eventkit code above but we should probably handle it...
async for event_name, item in ib_trade_events_stream:
# XXX: begin normalization of nonsense ib_insync internal
@ -1469,9 +1472,8 @@ async def trades_dialogue(
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> None:
# async with open_cached_client('ib') as client:
) -> None:
# load all symbols locally for fast search
await ctx.started({})

View File

@ -34,7 +34,7 @@ from pydantic.dataclasses import dataclass
from pydantic import BaseModel
import wsproto
from .api import open_cached_client
from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log
from ..data import ShmArray

View File

@ -42,12 +42,11 @@ import wrapt
import asks
from ..calc import humanize, percent_change
from .._cacheables import open_cached_client, async_lifo_cache
from . import config
from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache
from . import get_brokermod
from . import api
log = get_logger(__name__)
@ -1197,7 +1196,7 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel)
async with api.open_cached_client('questrade') as client:
async with open_cached_client('questrade') as client:
if feed_type == 'stock':
formatter = format_stock_quote
get_quotes = await stock_quoter(client, symbols)

View File

@ -25,6 +25,7 @@ from dataclasses import dataclass, field
import trio
import tractor
from tractor._broadcast import broadcast_receiver
from ..data._source import Symbol
from ..log import get_logger
@ -38,7 +39,7 @@ log = get_logger(__name__)
@dataclass
class OrderBook:
"""Buy-side (client-side ?) order book ctl and tracking.
'''EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
@ -48,7 +49,7 @@ class OrderBook:
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
"""
'''
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
@ -123,10 +124,15 @@ def get_orders(
global _orders
if _orders is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_orders = OrderBook(
*trio.open_memory_channel(100),
_to_ems=tx,
_from_order_book=brx,
)
return _orders
@ -157,23 +163,12 @@ async def relay_order_cmds_from_sync_code(
"""
book = get_orders()
orders_stream = book._from_order_book
async for cmd in orders_stream:
print(cmd)
if cmd['symbol'] == symbol_key:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
await to_ems_stream.send(cmd)
else:
# XXX BRUTAL HACKZORZES !!!
# re-insert for another consumer
# we need broadcast channelz...asap
# https://github.com/goodboy/tractor/issues/204
book._to_ems.send_nowait(cmd)
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd['symbol'] == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
@asynccontextmanager

View File

@ -22,7 +22,7 @@ from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat
import time
from typing import AsyncIterator, Callable, Any
from typing import AsyncIterator, Callable
from bidict import bidict
from pydantic import BaseModel
@ -30,10 +30,9 @@ import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from ..data.feed import Feed, maybe_open_feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
@ -123,7 +122,7 @@ class _DarkBook:
# XXX: this is in place to prevent accidental positions that are too
# big. Now obviously this won't make sense for crypto like BTC, but
# for most traditional brokers it should be fine unless you start
# slinging NQ futes or something.
# slinging NQ futes or something; check ur margin.
_DEFAULT_SIZE: float = 1.0
@ -132,7 +131,6 @@ async def clear_dark_triggers(
brokerd_orders_stream: tractor.MsgStream,
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
symbol: str,
@ -266,7 +264,7 @@ class TradesRelay:
consumers: int = 0
class _Router(BaseModel):
class Router(BaseModel):
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
@ -276,8 +274,6 @@ class _Router(BaseModel):
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
@ -343,12 +339,12 @@ class _Router(BaseModel):
relay.consumers -= 1
_router: _Router = None
_router: Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
router: Router,
feed: Feed,
symbol: str,
_exec_mode: str,
@ -466,7 +462,7 @@ async def _setup_persistent_emsd(
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
_router = Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
@ -480,7 +476,7 @@ async def translate_and_relay_brokerd_events(
broker: str,
brokerd_trades_stream: tractor.MsgStream,
router: _Router,
router: Router,
) -> AsyncIterator[dict]:
'''Trades update loop - receive updates from ``brokerd`` trades
@ -704,7 +700,7 @@ async def process_client_order_cmds(
symbol: str,
feed: Feed, # noqa
dark_book: _DarkBook,
router: _Router,
router: Router,
) -> None:
@ -958,32 +954,25 @@ async def _emsd_main(
# tractor.Context instead of strictly requiring a ctx arg.
ems_ctx = ctx
cached_feed = _router.feeds.get((broker, symbol))
if cached_feed:
# TODO: use cached feeds per calling-actor
log.warning(f'Opening duplicate feed for {(broker, symbol)}')
feed: Feed
# spawn one task per broker feed
async with (
# TODO: eventually support N-brokers
data.open_feed(
maybe_open_feed(
broker,
[symbol],
loglevel=loglevel,
) as feed,
) as (feed, stream),
):
if not cached_feed:
_router.feeds[(broker, symbol)] = feed
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quote
# open a stream with the brokerd backend for order
# flow dialogue
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# open a stream with the brokerd backend for order
# flow dialogue
async with (
# only open if one isn't already up: we try to keep
@ -1015,11 +1004,9 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
# relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream,
feed.stream,
stream,
broker,
symbol,
book

View File

@ -118,8 +118,9 @@ async def increment_ohlc_buffer(
shm.push(last)
# broadcast the buffer index step
# yield {'index': shm._last.value}
for ctx in _subscribers.get(delay_s, ()):
subs = _subscribers.get(delay_s, ())
for ctx in subs:
try:
await ctx.send_yield({'index': shm._last.value})
except (
@ -127,6 +128,7 @@ async def increment_ohlc_buffer(
trio.ClosedResourceError
):
log.error(f'{ctx.chan.uid} dropped connection')
subs.remove(ctx)
@tractor.stream
@ -235,6 +237,8 @@ async def sample_and_broadcast(
try:
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send(quote)
else:
@ -255,6 +259,10 @@ async def sample_and_broadcast(
subs.remove((stream, tick_throttle))
# TODO: a less naive throttler, here's some snippets:
# token bucket by njs:
# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
async def uniform_rate_send(
rate: float,
@ -292,7 +300,7 @@ async def uniform_rate_send(
rate = 1 / (now - last_send)
last_send = now
# print(f'{rate} Hz sending quotes\n{first_quote}')
# print(f'{rate} Hz sending quotes') # \n{first_quote}')
# TODO: now if only we could sync this to the display
# rate timing exactly lul

View File

@ -15,7 +15,7 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
NumPy compatible shared memory buffers for real-time FSP.
NumPy compatible shared memory buffers for real-time IPC streaming.
"""
from dataclasses import dataclass, asdict
@ -207,11 +207,16 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
'''Ring buffer like "push" to append data
into the buffer and return updated "last" index.
"""
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data)
if prepend:

View File

@ -31,11 +31,14 @@ from typing import (
)
import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
# from tractor import _broadcast
from pydantic import BaseModel
from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
@ -322,11 +325,28 @@ async def attach_feed_bus(
else:
sub = (stream, tick_throttle)
bus._subscribers[symbol].append(sub)
subs = bus._subscribers[symbol]
subs.append(sub)
try:
await trio.sleep_forever()
uid = ctx.chan.uid
fqsn = f'{symbol}.{brokername}'
async for msg in stream:
if msg == 'pause':
if sub in subs:
log.info(
f'Pausing {fqsn} feed for {uid}')
subs.remove(sub)
elif msg == 'resume':
if sub not in subs:
log.info(
f'Resuming {fqsn} feed for {uid}')
subs.append(sub)
else:
raise ValueError(msg)
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
@ -335,6 +355,31 @@ async def attach_feed_bus(
bus._subscribers[symbol].remove(sub)
@asynccontextmanager
async def open_sample_step_stream(
portal: tractor.Portal,
delay_s: int,
) -> tractor.ReceiveMsgStream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
key=delay_s,
mngr=portal.open_stream_from(
iter_ohlc_periods,
delay_s=delay_s, # must be kwarg
),
) as (cache_hit, istream):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process# }}}
@ -345,13 +390,12 @@ class Feed:
memory buffer orchestration.
"""
name: str
stream: AsyncIterator[dict[str, Any]]
shm: ShmArray
mod: ModuleType
first_quote: dict
stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None
_max_sample_rate: int = 0
@ -362,7 +406,7 @@ class Feed:
symbols: dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict:
return await self.stream.__anext__()
return await self.stream.receive()
@asynccontextmanager
async def index_stream(
@ -371,18 +415,19 @@ class Feed:
) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with self._brokerd_portal.open_stream_from(
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
) as self._index_stream:
delay_s = delay_s or self._max_sample_rate
yield self._index_stream
else:
yield self._index_stream
async with open_sample_step_stream(
self._brokerd_portal,
delay_s,
) as istream:
yield istream
async def pause(self) -> None:
await self.stream.send('pause')
async def resume(self) -> None:
await self.stream.send('resume')
def sym_to_shm_key(
@ -395,7 +440,7 @@ def sym_to_shm_key(
@asynccontextmanager
async def install_brokerd_search(
portal: tractor._portal.Portal,
portal: tractor.Portal,
brokermod: ModuleType,
) -> None:
@ -435,33 +480,19 @@ async def open_feed(
tick_throttle: Optional[float] = None, # Hz
) -> AsyncIterator[dict[str, Any]]:
) -> Feed:
'''
Open a "data feed" which provides streamed real-time quotes.
'''
sym = symbols[0].lower()
# TODO: feed cache locking, right now this is causing
# issues when reconnecting to a long running emsd?
# global _searcher_cache
# async with _cache_lock:
# feed = _searcher_cache.get((brokername, sym))
# # if feed is not None and sym in feed.symbols:
# if feed is not None:
# yield feed
# # short circuit
# return
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# no feed for broker exists so maybe spawn a data brokerd
async with (
maybe_spawn_brokerd(
@ -481,8 +512,8 @@ async def open_feed(
) as (ctx, (init_msg, first_quote)),
ctx.open_stream() as stream,
):
):
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@ -491,10 +522,10 @@ async def open_feed(
feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
first_quote=first_quote,
stream=stream,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
@ -530,3 +561,39 @@ async def open_feed(
finally:
# drop the infinite stream connection
await ctx.cancel()
@asynccontextmanager
async def maybe_open_feed(
brokername: str,
symbols: Sequence[str],
loglevel: Optional[str] = None,
**kwargs,
) -> (Feed, ReceiveChannel[dict[str, Any]]):
'''Maybe open a data to a ``brokerd`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
'''
sym = symbols[0].lower()
async with maybe_open_ctx(
key=(brokername, sym),
mngr=open_feed(
brokername,
[sym],
loglevel=loglevel,
**kwargs,
),
) as (cache_hit, feed):
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with feed.stream.subscribe() as bstream:
yield feed, bstream
else:
yield feed, feed.stream

View File

@ -69,6 +69,7 @@ async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
@ -93,14 +94,14 @@ async def fsp_compute(
yield {}
# task cancellation won't kill the channel
with stream.shield():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, feed.stream),
filter_by_sym(symbol, stream),
feed.shm,
)
@ -164,7 +165,8 @@ async def cascade(
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
) -> AsyncIterator[dict]:
) -> None:
"""Chain streaming signal processors and deliver output to
destination mem buf.
@ -175,7 +177,10 @@ async def cascade(
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.open_feed(brokername, [symbol]) as feed:
async with data.feed.maybe_open_feed(
brokername,
[symbol],
) as (feed, stream):
assert src.token == feed.shm.token
@ -186,6 +191,7 @@ async def cascade(
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,