It's a map of symbols to first quote dicts

fsp_feeds
Tyler Goodlet 2021-08-28 10:45:23 -04:00
parent 8c7e4c0ce9
commit 03c38a1163
1 changed files with 10 additions and 10 deletions

View File

@ -192,7 +192,7 @@ async def allocate_persistent_feed(
# establish broker backend quote stream # establish broker backend quote stream
# ``stream_quotes()`` is a required backend func # ``stream_quotes()`` is a required backend func
init_msg, first_quote = await bus.nursery.start( init_msg, first_quotes = await bus.nursery.start(
partial( partial(
mod.stream_quotes, mod.stream_quotes,
send_chan=send, send_chan=send,
@ -212,7 +212,7 @@ async def allocate_persistent_feed(
# XXX: the ``symbol`` here is put into our native piker format (i.e. # XXX: the ``symbol`` here is put into our native piker format (i.e.
# lower case). # lower case).
bus.feeds[symbol.lower()] = (cs, init_msg, first_quote) bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes)
if opened: if opened:
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
@ -227,7 +227,7 @@ async def allocate_persistent_feed(
init_msg[symbol]['sample_rate'] = int(delay_s) init_msg[symbol]['sample_rate'] = int(delay_s)
# yield back control to starting nursery # yield back control to starting nursery
task_status.started((init_msg, first_quote)) task_status.started((init_msg, first_quotes))
await feed_is_live.wait() await feed_is_live.wait()
@ -277,7 +277,7 @@ async def attach_feed_bus(
# service nursery # service nursery
async with bus.task_lock: async with bus.task_lock:
if entry is None: if entry is None:
init_msg, first_quote = await bus.nursery.start( init_msg, first_quotes = await bus.nursery.start(
partial( partial(
allocate_persistent_feed, allocate_persistent_feed,
@ -294,13 +294,13 @@ async def attach_feed_bus(
) )
assert isinstance(bus.feeds[symbol], tuple) assert isinstance(bus.feeds[symbol], tuple)
# XXX: ``first_quote`` may be outdated here if this is secondary # XXX: ``first_quotes`` may be outdated here if this is secondary
# subscriber # subscriber
cs, init_msg, first_quote = bus.feeds[symbol] cs, init_msg, first_quotes = bus.feeds[symbol]
# send this even to subscribers to existing feed? # send this even to subscribers to existing feed?
# deliver initial info message a first quote asap # deliver initial info message a first quote asap
await ctx.started((init_msg, first_quote)) await ctx.started((init_msg, first_quotes))
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
@ -392,7 +392,7 @@ class Feed:
name: str name: str
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
first_quote: dict first_quotes: dict # symbol names to first quote dicts
stream: trio.abc.ReceiveChannel[dict[str, Any]] stream: trio.abc.ReceiveChannel[dict[str, Any]]
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
@ -509,7 +509,7 @@ async def open_feed(
tick_throttle=tick_throttle, tick_throttle=tick_throttle,
) as (ctx, (init_msg, first_quote)), ) as (ctx, (init_msg, first_quotes)),
ctx.open_stream() as stream, ctx.open_stream() as stream,
@ -524,7 +524,7 @@ async def open_feed(
name=brokername, name=brokername,
shm=shm, shm=shm,
mod=mod, mod=mod,
first_quote=first_quote, first_quotes=first_quotes,
stream=stream, stream=stream,
_brokerd_portal=portal, _brokerd_portal=portal,
) )