Port data feed to new tractor stream api

tractor_open_stream_from
Tyler Goodlet 2021-04-29 08:37:35 -04:00
parent 3375735914
commit 0d9f091a34
1 changed files with 62 additions and 57 deletions

View File

@ -51,6 +51,7 @@ from ._sampling import (
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
) )
from .ingest import get_ingestormod
log = get_logger(__name__) log = get_logger(__name__)
@ -302,6 +303,7 @@ class Feed:
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.__anext__()
@asynccontextmanager
async def index_stream( async def index_stream(
self, self,
delay_s: Optional[int] = None delay_s: Optional[int] = None
@ -312,14 +314,16 @@ class Feed:
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
self._index_stream = await self._brokerd_portal.run( async with self._brokerd_portal.open_stream_from(
iter_ohlc_periods, iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate, delay_s=delay_s or self._max_sample_rate,
) ) as self._index_stream:
yield self._index_stream
else:
yield self._index_stream
return self._index_stream @asynccontextmanager
async def receive_trades_data(self) -> AsyncIterator[dict]:
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False): if not getattr(self.mod, 'stream_trades', False):
log.warning( log.warning(
@ -333,7 +337,7 @@ class Feed:
# using the ``_.set_fake_trades_stream()`` method # using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None: if self._trade_stream is None:
self._trade_stream = await self._brokerd_portal.run( async with self._brokerd_portal.open_stream_from(
self.mod.stream_trades, self.mod.stream_trades,
@ -342,9 +346,10 @@ class Feed:
# in messages, though we could probably use # in messages, though we could probably use
# more then one? # more then one?
topics=['local_trades'], topics=['local_trades'],
) ) as self._trade_stream:
yield self._trade_stream
return self._trade_stream else:
yield self._trade_stream
def sym_to_shm_key( def sym_to_shm_key(
@ -373,64 +378,64 @@ async def open_feed(
# TODO: do all! # TODO: do all!
sym = symbols[0] sym = symbols[0]
async with maybe_spawn_brokerd( # TODO: compress these to one line with py3.9+
brokername, async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal:
loglevel=loglevel,
) as portal: async with portal.open_stream_from(
stream = await portal.run(
attach_feed_bus, attach_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
loglevel=loglevel, loglevel=loglevel
)
# TODO: can we make this work better with the proposed ) as stream:
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm # TODO: can we make this work better with the proposed
shm = attach_shm_array( # context based bidirectional streaming style api proposed in:
token=init_msg[sym]['shm_token'], # https://github.com/goodboy/tractor/issues/53
readonly=True, init_msg = await stream.receive()
)
feed = Feed( # we can only read from shm
name=brokername, shm = attach_shm_array(
stream=stream, token=init_msg[sym]['shm_token'],
shm=shm, readonly=True,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
) )
symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
# cast shm dtype to list... can't member why we need this for sym, data in init_msg.items():
shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates) si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
try: symbol = Symbol(
yield feed key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
)
symbol.broker_info[brokername] = si
finally: feed.symbols[sym] = symbol
# always cancel the far end producer task
with trio.CancelScope(shield=True): # cast shm dtype to list... can't member why we need this
await stream.aclose() shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()