diff --git a/piker/_async_utils.py b/piker/_async_utils.py deleted file mode 100644 index fb221215..00000000 --- a/piker/_async_utils.py +++ /dev/null @@ -1,66 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -Async utils no one seems to have built into a core lib (yet). -""" -from typing import AsyncContextManager -from collections import OrderedDict -from contextlib import asynccontextmanager - - -def async_lifo_cache(maxsize=128): - """Async ``cache`` with a LIFO policy. - - Implemented my own since no one else seems to have - a standard. I'll wait for the smarter people to come - up with one, but until then... - """ - cache = OrderedDict() - - def decorator(fn): - - async def wrapper(*args): - key = args - try: - return cache[key] - except KeyError: - if len(cache) >= maxsize: - # discard last added new entry - cache.popitem() - - # do it - cache[key] = await fn(*args) - return cache[key] - - return wrapper - - return decorator - - -@asynccontextmanager -async def _just_none(): - # noop -> skip entering context - yield None - - -@asynccontextmanager -async def maybe_with_if( - predicate: bool, - context: AsyncContextManager, -) -> AsyncContextManager: - async with context if predicate else _just_none() as output: - yield output diff --git a/piker/_cacheables.py b/piker/_cacheables.py new file mode 100644 index 00000000..87b700f5 --- /dev/null +++ b/piker/_cacheables.py @@ -0,0 +1,195 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Cacheing apis and toolz. + +""" +# further examples of interest: +# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 + +from collections import OrderedDict +from typing import ( + Any, + Hashable, + Optional, + TypeVar, + AsyncContextManager, +) +from contextlib import ( + asynccontextmanager, +) + +import trio +from trio_typing import TaskStatus +import tractor + +from .brokers import get_brokermod +from .log import get_logger + + +T = TypeVar('T') +log = get_logger(__name__) + + +def async_lifo_cache(maxsize=128): + """Async ``cache`` with a LIFO policy. + + Implemented my own since no one else seems to have + a standard. I'll wait for the smarter people to come + up with one, but until then... + """ + cache = OrderedDict() + + def decorator(fn): + + async def wrapper(*args): + key = args + try: + return cache[key] + except KeyError: + if len(cache) >= maxsize: + # discard last added new entry + cache.popitem() + + # do it + cache[key] = await fn(*args) + return cache[key] + + return wrapper + + return decorator + + +_cache: dict[str, 'Client'] = {} # noqa + + +class cache: + '''Globally (processs wide) cached, task access to a + kept-alive-while-in-use async resource. + + ''' + lock = trio.Lock() + users: int = 0 + values: dict[Any, Any] = {} + resources: dict[ + int, + Optional[tuple[trio.Nursery, trio.Event]] + ] = {} + no_more_users: Optional[trio.Event] = None + + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + _, no_more_users = cls.resources[id(mng)] + cls.values[key] = value + task_status.started(value) + try: + await no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.resources.pop(id(mng)) + + +@asynccontextmanager +async def maybe_open_ctx( + + key: Hashable, + mngr: AsyncContextManager[T], + +) -> (bool, T): + '''Maybe open a context manager if there is not already a cached + version for the provided ``key``. Return the cached instance on + a cache hit. + + ''' + + await cache.lock.acquire() + + ctx_key = id(mngr) + + value = None + try: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached resource for {key}') + cache.users += 1 + cache.lock.release() + yield True, value + + except KeyError: + log.info(f'Allocating new feed for {key}') + + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + # TODO: avoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in piker actors? + service_n = tractor.current_actor()._service_n + + # TODO: does this need to be a tractor "root nursery"? + ln = cache.resources.get(ctx_key) + assert not ln + + ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) + + value = await ln.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): + cache.lock.release() + + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating resource for {key}') + + # terminate mngr nursery + _, no_more_users = cache.resources[ctx_key] + no_more_users.set() + + +@asynccontextmanager +async def open_cached_client( + brokername: str, +) -> 'Client': # noqa + '''Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + + ''' + brokermod = get_brokermod(brokername) + async with maybe_open_ctx( + key=brokername, + mngr=brokermod.get_client(), + ) as (cache_hit, client): + yield client diff --git a/piker/brokers/api.py b/piker/brokers/api.py deleted file mode 100644 index c1e11a01..00000000 --- a/piker/brokers/api.py +++ /dev/null @@ -1,85 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -Actor-aware broker agnostic interface. - -""" -from typing import Dict -from contextlib import asynccontextmanager, AsyncExitStack - -import trio - -from . import get_brokermod -from ..log import get_logger - - -log = get_logger(__name__) - - -_cache: Dict[str, 'Client'] = {} # noqa - - -@asynccontextmanager -async def open_cached_client( - brokername: str, - *args, - **kwargs, -) -> 'Client': # noqa - """Get a cached broker client from the current actor's local vars. - - If one has not been setup do it and cache it. - """ - global _cache - - clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) - - # global cache task lock - lock = clients['_lock'] - - client = None - - try: - log.info(f"Loading existing `{brokername}` client") - - async with lock: - client = clients[brokername] - client._consumers += 1 - - yield client - - except KeyError: - log.info(f"Creating new client for broker {brokername}") - - async with lock: - brokermod = get_brokermod(brokername) - exit_stack = AsyncExitStack() - - client = await exit_stack.enter_async_context( - brokermod.get_client() - ) - client._consumers = 0 - client._exit_stack = exit_stack - clients[brokername] = client - - yield client - - finally: - if client is not None: - # if no more consumers, teardown the client - client._consumers -= 1 - if client._consumers <= 0: - await client._exit_stack.aclose() diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index e8309cdb..59fb3ea4 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -33,7 +33,7 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from .api import open_cached_client +from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 59621b63..b16f46fe 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -29,7 +29,7 @@ import trio from ..log import get_logger from . import get_brokermod from .._daemon import maybe_spawn_brokerd -from .api import open_cached_client +from .._cacheables import open_cached_client log = get_logger(__name__) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 9369a73b..48b20d80 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -14,9 +14,14 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Real-time data feed machinery -""" +''' +NB: this is the old original implementation that was used way way back +when the project started with ``kivy``. + +This code is left for reference but will likely be merged in +appropriately and removed. + +''' import time from functools import partial from dataclasses import dataclass, field diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f604fc93..45b93416 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1358,6 +1358,9 @@ async def trades_dialogue( # start order request handler **before** local trades event loop n.start_soon(handle_order_requests, ems_stream) + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... async for event_name, item in ib_trade_events_stream: # XXX: begin normalization of nonsense ib_insync internal @@ -1469,9 +1472,8 @@ async def trades_dialogue( @tractor.context async def open_symbol_search( ctx: tractor.Context, -) -> None: - # async with open_cached_client('ib') as client: +) -> None: # load all symbols locally for fast search await ctx.started({}) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 6d41ab10..cfce2d5a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -34,7 +34,7 @@ from pydantic.dataclasses import dataclass from pydantic import BaseModel import wsproto -from .api import open_cached_client +from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 3f09587c..7a06ce76 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -42,12 +42,11 @@ import wrapt import asks from ..calc import humanize, percent_change +from .._cacheables import open_cached_client, async_lifo_cache from . import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log -from .._async_utils import async_lifo_cache from . import get_brokermod -from . import api log = get_logger(__name__) @@ -1197,7 +1196,7 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel) - async with api.open_cached_client('questrade') as client: + async with open_cached_client('questrade') as client: if feed_type == 'stock': formatter = format_stock_quote get_quotes = await stock_quoter(client, symbols) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 97869bb9..89630722 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -25,6 +25,7 @@ from dataclasses import dataclass, field import trio import tractor +from tractor._broadcast import broadcast_receiver from ..data._source import Symbol from ..log import get_logger @@ -38,7 +39,7 @@ log = get_logger(__name__) @dataclass class OrderBook: - """Buy-side (client-side ?) order book ctl and tracking. + '''EMS-client-side order book ctl and tracking. A style similar to "model-view" is used here where this api is provided as a supervised control for an EMS actor which does all the @@ -48,7 +49,7 @@ class OrderBook: Currently, this is mostly for keeping local state to match the EMS and use received events to trigger graphics updates. - """ + ''' # mem channels used to relay order requests to the EMS daemon _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel @@ -123,10 +124,15 @@ def get_orders( global _orders if _orders is None: + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + # setup local ui event streaming channels for request/resp # streamging with EMS daemon _orders = OrderBook( - *trio.open_memory_channel(100), + _to_ems=tx, + _from_order_book=brx, ) return _orders @@ -157,23 +163,12 @@ async def relay_order_cmds_from_sync_code( """ book = get_orders() - orders_stream = book._from_order_book - - async for cmd in orders_stream: - - print(cmd) - if cmd['symbol'] == symbol_key: - - # send msg over IPC / wire - log.info(f'Send order cmd:\n{pformat(cmd)}') - await to_ems_stream.send(cmd) - - else: - # XXX BRUTAL HACKZORZES !!! - # re-insert for another consumer - # we need broadcast channelz...asap - # https://github.com/goodboy/tractor/issues/204 - book._to_ems.send_nowait(cmd) + async with book._from_order_book.subscribe() as orders_stream: + async for cmd in orders_stream: + if cmd['symbol'] == symbol_key: + log.info(f'Send order cmd:\n{pformat(cmd)}') + # send msg over IPC / wire + await to_ems_stream.send(cmd) @asynccontextmanager diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 7dbbfbad..91197d60 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -22,7 +22,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from pprint import pformat import time -from typing import AsyncIterator, Callable, Any +from typing import AsyncIterator, Callable from bidict import bidict from pydantic import BaseModel @@ -30,10 +30,9 @@ import trio from trio_typing import TaskStatus import tractor -from .. import data from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed +from ..data.feed import Feed, maybe_open_feed from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( @@ -123,7 +122,7 @@ class _DarkBook: # XXX: this is in place to prevent accidental positions that are too # big. Now obviously this won't make sense for crypto like BTC, but # for most traditional brokers it should be fine unless you start -# slinging NQ futes or something. +# slinging NQ futes or something; check ur margin. _DEFAULT_SIZE: float = 1.0 @@ -132,7 +131,6 @@ async def clear_dark_triggers( brokerd_orders_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa - broker: str, symbol: str, @@ -266,7 +264,7 @@ class TradesRelay: consumers: int = 0 -class _Router(BaseModel): +class Router(BaseModel): '''Order router which manages and tracks per-broker dark book, alerts, clearing and related data feed management. @@ -276,8 +274,6 @@ class _Router(BaseModel): # setup at actor spawn time nursery: trio.Nursery - feeds: dict[tuple[str, str], Any] = {} - # broker to book map books: dict[str, _DarkBook] = {} @@ -343,12 +339,12 @@ class _Router(BaseModel): relay.consumers -= 1 -_router: _Router = None +_router: Router = None async def open_brokerd_trades_dialogue( - router: _Router, + router: Router, feed: Feed, symbol: str, _exec_mode: str, @@ -466,7 +462,7 @@ async def _setup_persistent_emsd( # open a root "service nursery" for the ``emsd`` actor async with trio.open_nursery() as service_nursery: - _router = _Router(nursery=service_nursery) + _router = Router(nursery=service_nursery) # TODO: send back the full set of persistent # orders/execs? @@ -480,7 +476,7 @@ async def translate_and_relay_brokerd_events( broker: str, brokerd_trades_stream: tractor.MsgStream, - router: _Router, + router: Router, ) -> AsyncIterator[dict]: '''Trades update loop - receive updates from ``brokerd`` trades @@ -704,7 +700,7 @@ async def process_client_order_cmds( symbol: str, feed: Feed, # noqa dark_book: _DarkBook, - router: _Router, + router: Router, ) -> None: @@ -958,32 +954,25 @@ async def _emsd_main( # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx - cached_feed = _router.feeds.get((broker, symbol)) - if cached_feed: - # TODO: use cached feeds per calling-actor - log.warning(f'Opening duplicate feed for {(broker, symbol)}') + feed: Feed # spawn one task per broker feed async with ( - # TODO: eventually support N-brokers - data.open_feed( + maybe_open_feed( broker, [symbol], loglevel=loglevel, - ) as feed, + ) as (feed, stream), ): - if not cached_feed: - _router.feeds[(broker, symbol)] = feed # XXX: this should be initial price quote from target provider first_quote = feed.first_quote - # open a stream with the brokerd backend for order - # flow dialogue - book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + # open a stream with the brokerd backend for order + # flow dialogue async with ( # only open if one isn't already up: we try to keep @@ -1015,11 +1004,9 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - # relay.brokerd_dialogue, brokerd_stream, ems_client_order_stream, - feed.stream, - + stream, broker, symbol, book diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 490ae4b0..bf9ecbba 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -118,8 +118,9 @@ async def increment_ohlc_buffer( shm.push(last) # broadcast the buffer index step - # yield {'index': shm._last.value} - for ctx in _subscribers.get(delay_s, ()): + subs = _subscribers.get(delay_s, ()) + + for ctx in subs: try: await ctx.send_yield({'index': shm._last.value}) except ( @@ -127,6 +128,7 @@ async def increment_ohlc_buffer( trio.ClosedResourceError ): log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream @@ -235,6 +237,8 @@ async def sample_and_broadcast( try: if tick_throttle: + # this is a send mem chan that likely + # pushes to the ``uniform_rate_send()`` below. await stream.send(quote) else: @@ -255,6 +259,10 @@ async def sample_and_broadcast( subs.remove((stream, tick_throttle)) +# TODO: a less naive throttler, here's some snippets: +# token bucket by njs: +# https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9 + async def uniform_rate_send( rate: float, @@ -292,7 +300,7 @@ async def uniform_rate_send( rate = 1 / (now - last_send) last_send = now - # print(f'{rate} Hz sending quotes\n{first_quote}') + # print(f'{rate} Hz sending quotes') # \n{first_quote}') # TODO: now if only we could sync this to the display # rate timing exactly lul diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index aa2a5320..f7f9f904 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -15,7 +15,7 @@ # along with this program. If not, see . """ -NumPy compatible shared memory buffers for real-time FSP. +NumPy compatible shared memory buffers for real-time IPC streaming. """ from dataclasses import dataclass, asdict @@ -207,11 +207,16 @@ class ShmArray: def push( self, data: np.ndarray, + prepend: bool = False, + ) -> int: - """Ring buffer like "push" to append data + '''Ring buffer like "push" to append data into the buffer and return updated "last" index. - """ + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + ''' length = len(data) if prepend: diff --git a/piker/data/feed.py b/piker/data/feed.py index ed24a095..72d3c50d 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -31,11 +31,14 @@ from typing import ( ) import trio +from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor +# from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod +from .._cacheables import maybe_open_ctx from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -322,11 +325,28 @@ async def attach_feed_bus( else: sub = (stream, tick_throttle) - bus._subscribers[symbol].append(sub) + subs = bus._subscribers[symbol] + subs.append(sub) try: - await trio.sleep_forever() + uid = ctx.chan.uid + fqsn = f'{symbol}.{brokername}' + async for msg in stream: + + if msg == 'pause': + if sub in subs: + log.info( + f'Pausing {fqsn} feed for {uid}') + subs.remove(sub) + + elif msg == 'resume': + if sub not in subs: + log.info( + f'Resuming {fqsn} feed for {uid}') + subs.append(sub) + else: + raise ValueError(msg) finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') @@ -335,6 +355,31 @@ async def attach_feed_bus( bus._subscribers[symbol].remove(sub) +@asynccontextmanager +async def open_sample_step_stream( + portal: tractor.Portal, + delay_s: int, + +) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with maybe_open_ctx( + key=delay_s, + mngr=portal.open_stream_from( + iter_ohlc_periods, + delay_s=delay_s, # must be kwarg + ), + ) as (cache_hit, istream): + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream + + @dataclass class Feed: """A data feed for client-side interaction with far-process# }}} @@ -345,13 +390,12 @@ class Feed: memory buffer orchestration. """ name: str - stream: AsyncIterator[dict[str, Any]] shm: ShmArray mod: ModuleType first_quote: dict + stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal - _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 @@ -362,7 +406,7 @@ class Feed: symbols: dict[str, Symbol] = field(default_factory=dict) async def receive(self) -> dict: - return await self.stream.__anext__() + return await self.stream.receive() @asynccontextmanager async def index_stream( @@ -371,18 +415,19 @@ class Feed: ) -> AsyncIterator[int]: - if not self._index_stream: - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with self._brokerd_portal.open_stream_from( - iter_ohlc_periods, - delay_s=delay_s or self._max_sample_rate, - ) as self._index_stream: + delay_s = delay_s or self._max_sample_rate - yield self._index_stream - else: - yield self._index_stream + async with open_sample_step_stream( + self._brokerd_portal, + delay_s, + ) as istream: + yield istream + + async def pause(self) -> None: + await self.stream.send('pause') + + async def resume(self) -> None: + await self.stream.send('resume') def sym_to_shm_key( @@ -395,7 +440,7 @@ def sym_to_shm_key( @asynccontextmanager async def install_brokerd_search( - portal: tractor._portal.Portal, + portal: tractor.Portal, brokermod: ModuleType, ) -> None: @@ -435,33 +480,19 @@ async def open_feed( tick_throttle: Optional[float] = None, # Hz -) -> AsyncIterator[dict[str, Any]]: +) -> Feed: ''' Open a "data feed" which provides streamed real-time quotes. ''' sym = symbols[0].lower() - # TODO: feed cache locking, right now this is causing - # issues when reconnecting to a long running emsd? - # global _searcher_cache - - # async with _cache_lock: - # feed = _searcher_cache.get((brokername, sym)) - - # # if feed is not None and sym in feed.symbols: - # if feed is not None: - # yield feed - # # short circuit - # return - try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) # no feed for broker exists so maybe spawn a data brokerd - async with ( maybe_spawn_brokerd( @@ -481,8 +512,8 @@ async def open_feed( ) as (ctx, (init_msg, first_quote)), ctx.open_stream() as stream, - ): + ): # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], @@ -491,10 +522,10 @@ async def open_feed( feed = Feed( name=brokername, - stream=stream, shm=shm, mod=mod, first_quote=first_quote, + stream=stream, _brokerd_portal=portal, ) ohlc_sample_rates = [] @@ -530,3 +561,39 @@ async def open_feed( finally: # drop the infinite stream connection await ctx.cancel() + + +@asynccontextmanager +async def maybe_open_feed( + + brokername: str, + symbols: Sequence[str], + loglevel: Optional[str] = None, + + **kwargs, + +) -> (Feed, ReceiveChannel[dict[str, Any]]): + '''Maybe open a data to a ``brokerd`` daemon only if there is no + local one for the broker-symbol pair, if one is cached use it wrapped + in a tractor broadcast receiver. + + ''' + sym = symbols[0].lower() + + async with maybe_open_ctx( + key=(brokername, sym), + mngr=open_feed( + brokername, + [sym], + loglevel=loglevel, + **kwargs, + ), + ) as (cache_hit, feed): + + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with feed.stream.subscribe() as bstream: + yield feed, bstream + else: + yield feed, feed.stream diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 312a0cef..a6324bb6 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -69,6 +69,7 @@ async def fsp_compute( ctx: tractor.Context, symbol: str, feed: Feed, + stream: trio.abc.ReceiveChannel, src: ShmArray, dst: ShmArray, @@ -93,14 +94,14 @@ async def fsp_compute( yield {} # task cancellation won't kill the channel - with stream.shield(): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + # since we shielded at the `open_feed()` call + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes out_stream = func( - filter_by_sym(symbol, feed.stream), + filter_by_sym(symbol, stream), feed.shm, ) @@ -164,7 +165,8 @@ async def cascade( dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, -) -> AsyncIterator[dict]: + +) -> None: """Chain streaming signal processors and deliver output to destination mem buf. @@ -175,7 +177,10 @@ async def cascade( func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.open_feed(brokername, [symbol]) as feed: + async with data.feed.maybe_open_feed( + brokername, + [symbol], + ) as (feed, stream): assert src.token == feed.shm.token @@ -186,6 +191,7 @@ async def cascade( ctx=ctx, symbol=symbol, feed=feed, + stream=stream, src=src, dst=dst,