Add better shared mem writer task checking

If you have a common broker feed daemon then likely you don't want to
create superfluous shared mem buffers for the same symbol. This adds an
ad hoc little context manager which keeps a bool state of whether
a buffer writer task is currently running in this process. Before we
were checking the shared array token cache and **not** clearing it when
the writer task exited, resulting in incorrect writer/loader logic on
the next entry.

Really, we need a better set of SC semantics around the shared mem stuff,
presuming there's only ever one writer per shared buffer at a given time.
Hopefully that will come soon!
bar_select
Tyler Goodlet 2020-10-15 15:02:42 -04:00
parent e0613675c7
commit 454b445b4b
2 changed files with 116 additions and 94 deletions

View File

@ -5,7 +5,7 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from contextlib import asynccontextmanager from contextlib import asynccontextmanager, contextmanager
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
@ -292,7 +292,7 @@ class Client:
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
def push(t): def push(t):
log.debug(t) # log.debug(t)
try: try:
to_trio.send_nowait(t) to_trio.send_nowait(t)
except trio.BrokenResourceError: except trio.BrokenResourceError:
@ -497,6 +497,21 @@ def normalize(
return data return data
_local_buffer_writers = {}
@contextmanager
def activate_writer(key: str):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
yield writer_already_exists
finally:
_local_buffer_writers.pop(key, None)
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@ -528,108 +543,113 @@ async def stream_quotes(
async with aclosing(stream): async with aclosing(stream):
# maybe load historical ohlcv in to shared mem # check if a writer already is alive in a streaming task,
# check if shm has already been created by previous # otherwise start one and mark it as now existing
# feed initialization with activate_writer(shm_token['shm_name']) as writer_already_exists:
writer_exists = get_shm_token(shm_token['shm_name'])
if not writer_exists: # maybe load historical ohlcv in to shared mem
shm = attach_shm_array( # check if shm has already been created by previous
token=shm_token, # feed initialization
# we are writer if not writer_already_exists:
readonly=False,
)
bars = await _trio_run_client_method(
method='bars',
symbol=sym,
)
shm.push(bars) shm = attach_shm_array(
shm_token = shm.token token=shm_token,
times = shm.array['time'] # we are the buffer writer
delay_s = times[-1] - times[times != times[-1]][-1] readonly=False,
subscribe_ohlc_for_increment(shm, delay_s) )
bars = await _trio_run_client_method(
method='bars',
symbol=sym,
)
# pass back token, and bool, signalling if we're the writer # write historical data to buffer
await ctx.send_yield((shm_token, not writer_exists)) shm.push(bars)
shm_token = shm.token
# first quote can be ignored as a 2nd with newer data is sent? times = shm.array['time']
first_ticker = await stream.__anext__() delay_s = times[-1] - times[times != times[-1]][-1]
quote = normalize(first_ticker) subscribe_ohlc_for_increment(shm, delay_s)
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
log.debug(f"First ticker received {quote}") # pass back token, and bool, signalling if we're the writer
await ctx.send_yield((shm_token, not writer_already_exists))
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): # first quote can be ignored as a 2nd with newer data is sent?
suffix = 'exchange' first_ticker = await stream.__anext__()
quote = normalize(first_ticker)
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
calc_price = False # should be real volume for contract log.debug(f"First ticker received {quote}")
async for ticker in stream: if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
# spin consuming tickers until we get a real market datum suffix = 'exchange'
if not ticker.rtTime:
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use calc_price = False # should be real volume for contract
# ``aclosing()`` above?
break
else:
# commodities don't have an exchange name for some reason?
suffix = 'secType'
calc_price = True
ticker = first_ticker
con = quote['contract'] async for ticker in stream:
quote = normalize(ticker, calc_price=calc_price) # spin consuming tickers until we get a real market datum
topic = '.'.join((con['symbol'], con[suffix])).lower() if not ticker.rtTime:
first_quote = {topic: quote} log.debug(f"New unsent ticker: {ticker}")
ticker.ticks = [] continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# yield first quote asap # XXX: this works because we don't use
await ctx.send_yield(first_quote) # ``aclosing()`` above?
break
# real-time stream else:
async for ticker in stream: # commodities don't have an exchange name for some reason?
quote = normalize( suffix = 'secType'
ticker, calc_price = True
calc_price=calc_price ticker = first_ticker
)
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
for tick in iterticks(quote, type='trade'):
last = tick['price']
# print(f'broker last: {tick}')
# update last entry
# benchmarked in the 4-5 us range
high, low = shm.array[-1][['high', 'low']]
shm.array[['high', 'low', 'close']][-1] = (
max(high, last),
min(low, last),
last,
)
con = quote['contract'] con = quote['contract']
quote = normalize(ticker, calc_price=calc_price)
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
first_quote = {topic: quote}
await ctx.send_yield({topic: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []
# yield first quote asap
await ctx.send_yield(first_quote)
# real-time stream
async for ticker in stream:
quote = normalize(
ticker,
calc_price=calc_price
)
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_already_exists:
for tick in iterticks(quote, type='trade'):
last = tick['price']
# print(f'broker last: {tick}')
# update last entry
# benchmarked in the 4-5 us range
high, low = shm.array[-1][['high', 'low']]
shm.array[['high', 'low', 'close']][-1] = (
max(high, last),
min(low, last),
last,
)
con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower()
await ctx.send_yield({topic: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []

View File

@ -167,6 +167,7 @@ async def open_feed(
# Attempt to allocate (or attach to) shm array for this broker/symbol # Attempt to allocate (or attach to) shm array for this broker/symbol
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(name, symbols[0]), key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
@ -193,12 +194,13 @@ async def open_feed(
# tests are in? # tests are in?
shm_token, is_writer = await stream.receive() shm_token, is_writer = await stream.receive()
if opened:
assert is_writer
log.info("Started shared mem bar writer")
shm_token['dtype_descr'] = list(shm_token['dtype_descr']) shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity assert shm_token == shm.token # sanity
if is_writer:
log.info("Started shared mem bar writer")
yield Feed( yield Feed(
name=name, name=name,
stream=stream, stream=stream,