Sub each new symbol to shm incrementing

bar_select
Tyler Goodlet 2020-09-25 15:19:33 -04:00
parent e3e219aa4b
commit 8832804bab
1 changed files with 18 additions and 8 deletions

View File

@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio import asyncio
import logging import logging
import inspect import inspect
@ -29,7 +29,8 @@ from ..data import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
iterticks, iterticks,
attach_shm_array, attach_shm_array,
get_shm_token get_shm_token,
subscribe_ohlc_for_increment,
) )
from ..ui._source import from_df from ..ui._source import from_df
@ -147,7 +148,7 @@ class Client:
# durationStr='1 D', # durationStr='1 D',
# time length calcs # time length calcs
durationStr='{count} S'.format(count=1000 * 5), durationStr='{count} S'.format(count=100 * 5),
barSizeSetting='5 secs', barSizeSetting='5 secs',
# always use extended hours # always use extended hours
@ -494,14 +495,16 @@ def normalize(
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@tractor.stream
async def stream_quotes( async def stream_quotes(
ctx: tractor.Context,
symbols: List[str], symbols: List[str],
shm_token: Tuple[str, str, List[tuple]], shm_token: Tuple[str, str, List[tuple]],
loglevel: str = None, loglevel: str = None,
# compat for @tractor.msg.pub # compat for @tractor.msg.pub
topics: Any = None, topics: Any = None,
get_topics: Callable = None, get_topics: Callable = None,
) -> AsyncGenerator[str, Dict[str, Any]]: ) -> AsyncIterator[Dict[str, Any]]:
"""Stream symbol quotes. """Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked This is a ``trio`` callable routine meant to be invoked
@ -539,8 +542,12 @@ async def stream_quotes(
shm.push(bars) shm.push(bars)
shm_token = shm.token shm_token = shm.token
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
yield shm_token, not writer_exists await ctx.send_yield((shm_token, not writer_exists))
# first quote can be ignored as a 2nd with newer data is sent? # first quote can be ignored as a 2nd with newer data is sent?
first_ticker = await stream.__anext__() first_ticker = await stream.__anext__()
@ -581,9 +588,11 @@ async def stream_quotes(
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
first_quote = {topic: quote} first_quote = {topic: quote}
ticker.ticks = [] ticker.ticks = []
# yield first quote asap
yield first_quote
# yield first quote asap
await ctx.send_yield(first_quote)
# real-time stream
async for ticker in stream: async for ticker in stream:
quote = normalize( quote = normalize(
ticker, ticker,
@ -614,7 +623,8 @@ async def stream_quotes(
con = quote['contract'] con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
yield {topic: quote}
await ctx.send_yield({topic: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []