Port to new `Portal.run()` api

basic_alerts
Tyler Goodlet 2021-01-04 14:45:34 -05:00
parent 267c8c6bd3
commit 3c424a153f
1 changed files with 12 additions and 9 deletions

View File

@ -75,10 +75,12 @@ def get_ingestormod(name: str) -> ModuleType:
return module
# capable rpc modules
_data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data._buffer',
]
@ -104,10 +106,13 @@ async def maybe_spawn_brokerd(
brokermod = get_brokermod(brokername)
dname = f'brokerd.{brokername}'
async with tractor.find_actor(dname) as portal:
# WTF: why doesn't this work?
if portal is not None:
yield portal
else:
else: # no daemon has been spawned yet
log.info(f"Spawning {brokername} broker daemon")
tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {})
async with tractor.open_nursery() as nursery:
@ -115,7 +120,7 @@ async def maybe_spawn_brokerd(
# spawn new daemon
portal = await nursery.start_actor(
dname,
rpc_module_paths=_data_mods + [brokermod.__name__],
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
**tractor_kwargs
)
@ -140,7 +145,7 @@ class Feed:
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
async def receive(self) -> dict:
@ -151,9 +156,8 @@ class Feed:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
self._index_stream = await self._broker_portal.run(
'piker.data',
'increment_ohlc_buffer',
self._index_stream = await self._brokerd_portal.run(
increment_ohlc_buffer,
shm_token=self.shm.token,
topics=['index'],
)
@ -200,8 +204,7 @@ async def open_feed(
loglevel=loglevel,
) as portal:
stream = await portal.run(
mod.__name__,
'stream_quotes',
mod.stream_quotes,
symbols=symbols,
shm_token=shm.token,
@ -225,5 +228,5 @@ async def open_feed(
name=name,
stream=stream,
shm=shm,
_broker_portal=portal,
_brokerd_portal=portal,
)