diff --git a/piker/data/__init__.py b/piker/data/__init__.py index fa26801c..579e596f 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -75,10 +75,12 @@ def get_ingestormod(name: str) -> ModuleType: return module +# capable rpc modules _data_mods = [ 'piker.brokers.core', 'piker.brokers.data', 'piker.data', + 'piker.data._buffer', ] @@ -104,10 +106,13 @@ async def maybe_spawn_brokerd( brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: + # WTF: why doesn't this work? if portal is not None: yield portal - else: + + else: # no daemon has been spawned yet + log.info(f"Spawning {brokername} broker daemon") tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) async with tractor.open_nursery() as nursery: @@ -115,7 +120,7 @@ async def maybe_spawn_brokerd( # spawn new daemon portal = await nursery.start_actor( dname, - rpc_module_paths=_data_mods + [brokermod.__name__], + enable_modules=_data_mods + [brokermod.__name__], loglevel=loglevel, **tractor_kwargs ) @@ -140,7 +145,7 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray # ticks: ShmArray - _broker_portal: tractor._portal.Portal + _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None async def receive(self) -> dict: @@ -151,9 +156,8 @@ class Feed: # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes - self._index_stream = await self._broker_portal.run( - 'piker.data', - 'increment_ohlc_buffer', + self._index_stream = await self._brokerd_portal.run( + increment_ohlc_buffer, shm_token=self.shm.token, topics=['index'], ) @@ -200,8 +204,7 @@ async def open_feed( loglevel=loglevel, ) as portal: stream = await portal.run( - mod.__name__, - 'stream_quotes', + mod.stream_quotes, symbols=symbols, shm_token=shm.token, @@ -225,5 +228,5 @@ async def open_feed( name=name, stream=stream, shm=shm, - _broker_portal=portal, + _brokerd_portal=portal, )