Port feed bus endpoint to a `@tractor.context`

naive_feed_throttling
Tyler Goodlet 2021-06-14 10:55:01 -04:00
parent 8cde68fedb
commit 57a35a3c6c
3 changed files with 68 additions and 95 deletions

View File

@ -724,7 +724,7 @@ async def _emsd_main(
_router.feeds[(broker, symbol)] = feed _router.feeds[(broker, symbol)] = feed
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote = await feed.receive() first_quote = feed.first_quote
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue

View File

@ -229,10 +229,10 @@ async def sample_and_broadcast(
# thus other consumers still attached. # thus other consumers still attached.
subs = bus._subscribers[sym.lower()] subs = bus._subscribers[sym.lower()]
for ctx in subs: for stream in subs:
# print(f'sub is {ctx.chan.uid}') # print(f'sub is {ctx.chan.uid}')
try: try:
await ctx.send_yield({sym: quote}) await stream.send({sym: quote})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError
@ -241,4 +241,4 @@ async def sample_and_broadcast(
# if it's done in the fee bus code? # if it's done in the fee bus code?
# so far seems like no since this should all # so far seems like no since this should all
# be single-threaded. # be single-threaded.
log.error(f'{ctx.chan.uid} dropped connection') log.error(f'{stream._ctx.chan.uid} dropped connection')

View File

@ -149,7 +149,6 @@ async def _setup_persistent_brokerd(
async def allocate_persistent_feed( async def allocate_persistent_feed(
ctx: tractor.Context,
bus: _FeedsBus, bus: _FeedsBus,
brokername: str, brokername: str,
symbol: str, symbol: str,
@ -240,7 +239,7 @@ async def allocate_persistent_feed(
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
@tractor.stream @tractor.context
async def attach_feed_bus( async def attach_feed_bus(
ctx: tractor.Context, ctx: tractor.Context,
@ -260,10 +259,11 @@ async def attach_feed_bus(
assert 'brokerd' in tractor.current_actor().name assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
sub_only: bool = False
entry = bus.feeds.get(symbol) entry = bus.feeds.get(symbol)
bus._subscribers.setdefault(symbol, [])
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # brokerd yet, start persistent stream and shm writer task in
# service nursery # service nursery
@ -272,7 +272,7 @@ async def attach_feed_bus(
init_msg, first_quote = await bus.nursery.start( init_msg, first_quote = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
ctx=ctx,
bus=bus, bus=bus,
brokername=brokername, brokername=brokername,
@ -284,29 +284,24 @@ async def attach_feed_bus(
loglevel=loglevel, loglevel=loglevel,
) )
) )
bus._subscribers.setdefault(symbol, []).append(ctx)
assert isinstance(bus.feeds[symbol], tuple) assert isinstance(bus.feeds[symbol], tuple)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary # XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber # subscriber
cs, init_msg, first_quote = bus.feeds[symbol] cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
await ctx.send_yield(init_msg) # deliver initial info message a first quote asap
await ctx.started((init_msg, first_quote))
# deliver a first quote asap async with ctx.open_stream() as stream:
await ctx.send_yield(first_quote)
if sub_only: bus._subscribers[symbol].append(stream)
bus._subscribers[symbol].append(ctx)
try: try:
await trio.sleep_forever() await trio.sleep_forever()
finally: finally:
bus._subscribers[symbol].remove(ctx) bus._subscribers[symbol].remove(stream)
@dataclass @dataclass
@ -322,6 +317,7 @@ class Feed:
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
first_quote: dict
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None _index_stream: Optional[AsyncIterator[int]] = None
@ -357,36 +353,6 @@ class Feed:
else: else:
yield self._index_stream yield self._index_stream
@asynccontextmanager
async def receive_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(
f"{self.mod.name} doesn't have trade data support yet :(")
if not self._trade_stream:
raise RuntimeError(
f'Can not stream trade data from {self.mod.name}')
# NOTE: this can be faked by setting a rx chan
# using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None:
async with self._brokerd_portal.open_stream_from(
self.mod.stream_trades,
# do we need this? -> yes
# the broker side must declare this key
# in messages, though we could probably use
# more then one?
topics=['local_trades'],
) as self._trade_stream:
yield self._trade_stream
else:
yield self._trade_stream
def sym_to_shm_key( def sym_to_shm_key(
broker: str, broker: str,
@ -463,24 +429,29 @@ async def open_feed(
# no feed for broker exists so maybe spawn a data brokerd # no feed for broker exists so maybe spawn a data brokerd
async with maybe_spawn_brokerd( async with (
maybe_spawn_brokerd(
brokername, brokername,
loglevel=loglevel loglevel=loglevel
) as portal: ) as portal,
async with portal.open_stream_from( portal.open_context(
attach_feed_bus, attach_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
loglevel=loglevel loglevel=loglevel
) as stream: ) as (ctx, (init_msg, first_quote)),
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed # TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in: # context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53 # https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive() # init_msg = await stream.receive()
# we can only read from shm # we can only read from shm
shm = attach_shm_array( shm = attach_shm_array(
@ -493,6 +464,7 @@ async def open_feed(
stream=stream, stream=stream,
shm=shm, shm=shm,
mod=mod, mod=mod,
first_quote=first_quote,
_brokerd_portal=portal, _brokerd_portal=portal,
) )
ohlc_sample_rates = [] ohlc_sample_rates = []
@ -516,7 +488,8 @@ async def open_feed(
shm_token = data['shm_token'] shm_token = data['shm_token']
# XXX: msgspec won't relay through the tuples XD # XXX: msgspec won't relay through the tuples XD
shm_token['dtype_descr'] = list(map(tuple, shm_token['dtype_descr'])) shm_token['dtype_descr'] = list(
map(tuple, shm_token['dtype_descr']))
assert shm_token == shm.token # sanity assert shm_token == shm.token # sanity