Hook IB up to shared memory system

Adjust the `data.open_feed()` API to take a shm token so that the
broker daemon can attach a mem buf previously created by the parent
actor and push real-time tick data into it. There's still some
sloppiness here around ensuring only one mem buf per symbol (see
`stream_quotes()`); that should really be managed at the data API
level.

Add a bar-incrementing stream task which delivers increment msgs to
any consumers.
branch: bar_select
Tyler Goodlet, 2020-09-17 09:12:05 -04:00
parent 17491ba819
commit f872fbecf8
3 changed files with 200 additions and 58 deletions
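For orientation, here's a minimal consumer-side sketch of the API shape after this commit. It assumes `open_feed()` is wrapped as an `@asynccontextmanager` (which its single `yield` suggests); the `watch()` wrapper and the invocation at the bottom are illustrative, not part of the commit:

    import trio

    from piker.data import open_feed


    async def watch(broker: str, sym: str) -> None:
        # open_feed() now allocates (or attaches to) the shared mem buffer
        # internally and hands back the increment stream with the quote stream
        async with open_feed(broker, [sym]) as (
            first_quote, stream, increment_stream, shmarr,
        ):
            # the brokerd writer has already pushed historical bars
            print(f"{len(shmarr.array)} bars loaded; first quote: {first_quote}")
            async for quote in stream:
                # shmarr.array[-1] is kept current by the brokerd tick writer
                print(quote)


    if __name__ == '__main__':
        trio.run(watch, 'ib', 'SPY')  # hypothetical broker/symbol pair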

--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py

@@ -25,7 +25,10 @@ import trio
 import tractor
 from ..log import get_logger, get_console_log
-from ..data import maybe_spawn_brokerd
+from ..data import (
+    maybe_spawn_brokerd, iterticks, attach_shared_array,
+    incr_buffer,
+)
 from ..ui._source import from_df
@@ -104,7 +107,6 @@ _adhoc_cmdty_data_map = {
     # NOTE: cmdtys don't have trade data:
     # https://groups.io/g/twsapi/message/44174
     'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
-    'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
 }
@@ -143,7 +145,7 @@ class Client:
             # durationStr='1 D',

             # time length calcs
-            durationStr='{count} S'.format(count=3000 * 5),
+            durationStr='{count} S'.format(count=5000 * 5),
             barSizeSetting='5 secs',

             # always use extended hours
@@ -487,6 +489,7 @@ def normalize(
 # @tractor.msg.pub
 async def stream_quotes(
     symbols: List[str],
+    shared_array_token: Tuple[str, str],
     loglevel: str = None,
     # compat for @tractor.msg.pub
     topics: Any = None,
@@ -508,26 +511,25 @@ async def stream_quotes(
         method='stream_ticker',
         symbol=sym,
     )

+    async with get_client() as client:
+        bars = await client.bars(symbol=sym)
+
     async with aclosing(stream):
         # first quote can be ignored as a 2nd with newer data is sent?
         first_ticker = await stream.__anext__()
-        quote = normalize(first_ticker)
-
-        # ugh, clear ticks since we've consumed them
-        # (ahem, ib_insync is stateful trash)
-        first_ticker.ticks = []
-
-        log.debug(f"First ticker received {quote}")

         if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
             suffix = 'exchange'
             calc_price = False  # should be real volume for contract

-            con = quote['contract']
-            topic = '.'.join((con['symbol'], con[suffix])).lower()
-            yield {topic: quote}
+            quote = normalize(first_ticker)
+            log.debug(f"First ticker received {quote}")
+
+            # ugh, clear ticks since we've consumed them
+            # (ahem, ib_insync is stateful trash)
+            first_ticker.ticks = []

             async for ticker in stream:
                 # spin consuming tickers until we get a real market datum
                 if not ticker.rtTime:
@@ -535,10 +537,6 @@ async def stream_quotes(
                     continue
                 else:
                     log.debug("Received first real volume tick")
-                    quote = normalize(ticker)
-                    topic = '.'.join((con['symbol'], con[suffix])).lower()
-                    yield {topic: quote}
-
                     # ugh, clear ticks since we've consumed them
                     # (ahem, ib_insync is stateful trash)
                     ticker.ticks = []
@@ -550,28 +548,65 @@ async def stream_quotes(
             # commodities don't have an exchange name for some reason?
             suffix = 'secType'
             calc_price = True
+            ticker = first_ticker

-        async for ticker in stream:
-            quote = normalize(
-                ticker,
-                calc_price=calc_price
-            )
-            con = quote['contract']
-            topic = '.'.join((con['symbol'], con[suffix])).lower()
-            yield {topic: quote}
-
-            # ugh, clear ticks since we've consumed them
-            ticker.ticks = []
+        con = quote['contract']
+        quote = normalize(ticker, calc_price=calc_price)
+        topic = '.'.join((con['symbol'], con[suffix])).lower()
+        first_quote = {topic: quote}
+        ticker.ticks = []

+        # load historical ohlcv in to shared mem
+        ss = tractor.current_actor().statespace
+        existing_shm = ss.get(f'ib_shm.{sym}')
+        if not existing_shm:
+            readonly = False
+        else:
+            readonly = True
+            shm = existing_shm

-if __name__ == '__main__':
-    import sys
-    sym = sys.argv[1]
-    contract = asyncio.run(
-        _aio_run_client_method(
-            'find_contract',
-            symbol=sym,
-        )
-    )
-    print(contract)
+        with attach_shared_array(
+            token=shared_array_token,
+            readonly=readonly
+        ) as shm:
+            if not existing_shm:
+                shm.push(bars)
+                ss[f'ib_shm.{sym}'] = shm
+
+                yield (first_quote, shm.token)
+            else:
+                yield (first_quote, None)
+
+            async for ticker in stream:
+                quote = normalize(
+                    ticker,
+                    calc_price=calc_price
+                )
+                # TODO: in theory you can send the IPC msg *before*
+                # writing to the sharedmem array to decrease latency,
+                # however, that will require `tractor.msg.pub` support
+                # here or at least some way to prevent task switching
+                # at the yield such that the array write isn't delayed
+                # while another consumer is serviced..
+
+                # if we are the lone tick writer
+                if not existing_shm:
+                    for tick in iterticks(quote, type='trade'):
+                        last = tick['price']
+                        # print(f'broker last: {tick}')
+
+                        # update last entry
+                        # benchmarked in the 4-5 us range
+                        high, low = shm.array[-1][['high', 'low']]
+                        shm.array[['high', 'low', 'close']][-1] = (
+                            max(high, last),
+                            min(low, last),
+                            last,
+                        )
+
+                con = quote['contract']
+                topic = '.'.join((con['symbol'], con[suffix])).lower()
+                yield {topic: quote}
+
+                # ugh, clear ticks since we've consumed them
+                ticker.ticks = []
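The last-bar update in the loop above is a plain multi-field assignment on a numpy structured array. A standalone sketch of the same technique (the dtype here is illustrative, not piker's actual OHLC layout):

    import numpy as np

    # toy OHLC dtype standing in for the shared array's real layout
    ohlc_dtype = np.dtype([
        ('index', 'i8'), ('time', 'f8'), ('open', 'f8'),
        ('high', 'f8'), ('low', 'f8'), ('close', 'f8'), ('volume', 'i8'),
    ])
    array = np.zeros(3, dtype=ohlc_dtype)
    array[-1] = (2, 0.0, 100.0, 101.0, 99.0, 100.5, 10)

    last = 102.0  # latest trade price from a tick

    # same pattern as the diff: read current extremes, clamp, set close
    high, low = array[-1][['high', 'low']]
    array[['high', 'low', 'close']][-1] = (
        max(high, last),
        min(low, last),
        last,
    )
    assert array[-1]['high'] == 102.0 and array[-1]['close'] == 102.0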

--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py

@@ -18,6 +18,20 @@ import tractor
 from ..brokers import get_brokermod
 from ..log import get_logger, get_console_log
+from ._normalize import iterticks
+from ._sharedmem import (
+    maybe_open_shared_array, attach_shared_array, open_shared_array,
+)
+from ._buffer import incr_buffer
+
+
+__all__ = [
+    'maybe_open_shared_array',
+    'attach_shared_array',
+    'open_shared_array',
+    'iterticks',
+    'incr_buffer',
+]

 log = get_logger(__name__)
@@ -27,7 +41,7 @@ __ingestors__ = [
 ]

-def get_ingestor(name: str) -> ModuleType:
+def get_ingestormod(name: str) -> ModuleType:
     """Return the imported ingestor module by name.
     """
     module = import_module('.' + name, 'piker.data')
@@ -39,6 +53,7 @@ def get_ingestor(name: str) -> ModuleType:
 _data_mods = [
     'piker.brokers.core',
     'piker.brokers.data',
+    'piker.data',
 ]
@@ -100,22 +115,40 @@ async def open_feed(
     if loglevel is None:
         loglevel = tractor.current_actor().loglevel

-    async with maybe_spawn_brokerd(
-        mod.name,
-        loglevel=loglevel,
-    ) as portal:
-        stream = await portal.run(
-            mod.__name__,
-            'stream_quotes',
-            symbols=symbols,
-            topics=symbols,
-        )
-        # Feed is required to deliver an initial quote asap.
-        # TODO: should we timeout and raise a more explicit error?
-        # with trio.fail_after(5):
-        with trio.fail_after(float('inf')):
-            # Retreive initial quote for each symbol
-            # such that consumer code can know the data layout
-            first_quote = await stream.__anext__()
-            log.info(f"Received first quote {first_quote}")
-            yield (first_quote, stream)
+    with maybe_open_shared_array(
+        name=f'{name}.{symbols[0]}.buf',
+        readonly=True,  # we expect the sub-actor to write
+    ) as shmarr:
+        async with maybe_spawn_brokerd(
+            mod.name,
+            loglevel=loglevel,
+        ) as portal:
+            stream = await portal.run(
+                mod.__name__,
+                'stream_quotes',
+                symbols=symbols,
+                shared_array_token=shmarr.token,
+                topics=symbols,
+            )
+            # Feed is required to deliver an initial quote asap.
+            # TODO: should we timeout and raise a more explicit error?
+            # with trio.fail_after(5):
+            with trio.fail_after(float('inf')):
+                # Retreive initial quote for each symbol
+                # such that consumer code can know the data layout
+                first_quote, child_shmarr_token = await stream.__anext__()
+                log.info(f"Received first quote {first_quote}")
+
+                if child_shmarr_token is not None:
+                    # we are the buffer writer task
+                    increment_stream = await portal.run(
+                        'piker.data',
+                        'incr_buffer',
+                        shm_token=child_shmarr_token,
+                    )
+                    assert child_shmarr_token == shmarr.token
+                else:
+                    increment_stream = None
+
+                yield (first_quote, stream, increment_stream, shmarr)
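A hedged sketch of how a consumer might drive the two streams that `open_feed()` now yields, using the increment stream to learn when a new bar row has been appended (the glue function is assumed, not part of this commit):

    import trio


    async def run_feed(first_quote, stream, increment_stream, shmarr) -> None:
        async def watch_increments() -> None:
            if increment_stream is None:
                # another consumer already owns the writer/increment task
                return
            async for index in increment_stream:
                print(f"new bar appended; buffer index now {index}")

        async with trio.open_nursery() as nursery:
            nursery.start_soon(watch_increments)
            async for quote in stream:
                # the last bar is kept current by the brokerd tick writer
                print(quote, shmarr.array[-1])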

--- /dev/null
+++ b/piker/data/_buffer.py

@@ -0,0 +1,74 @@
+"""
+Data buffers for fast shared numpy.
+"""
+import time
+
+import tractor
+import numpy as np
+import trio
+
+from ._sharedmem import SharedArray, attach_shared_array
+
+
+@tractor.stream
+async def incr_buffer(
+    ctx: tractor.Context,
+    shm_token: str,
+    # delay_s: Optional[float] = None,
+):
+    """Task which inserts new bars into the provided shared memory array
+    every ``delay_s`` seconds.
+    """
+    # TODO: right now we'll spin printing bars if the last time
+    # stamp is before a large period of no market activity.
+    # Likely the best way to solve this is to make this task
+    # aware of the instrument's tradable hours?
+
+    with attach_shared_array(
+        token=shm_token,
+        readonly=False,
+    ) as shm:
+
+        # determine ohlc delay between bars
+        # to determine time step between datums
+        times = shm.array['time']
+        delay_s = times[-1] - times[times != times[-1]][-1]
+
+        # adjust delay to compensate for trio processing time
+        ad = delay_s - 0.002
+
+        async def sleep():
+            """Sleep until the next time frame's worth has passed since
+            the last bar.
+            """
+            # last_ts = shm.array[-1]['time']
+            # delay = max((last_ts + ad) - time.time(), 0)
+            # await trio.sleep(delay)
+            await trio.sleep(ad)
+
+        while True:
+            # sleep for duration of current bar
+            await sleep()
+
+            # append new entry to buffer thus "incrementing" the bar
+            array = shm.array
+            last = array[-1:].copy()
+            # last = np.array(last, dtype=array.dtype)
+            # shm.push(last)
+            # array = shm.array
+            # last = array[-1].copy()
+            (index, t, close) = last[0][['index', 'time', 'close']]
+
+            # new = np.array(last, dtype=array.dtype)
+            # this copies non-std fields (eg. vwap) from the last datum
+            last[
+                ['index', 'time', 'volume', 'open', 'high', 'low', 'close']
+            ][0] = (index + 1, t + delay_s, 0, close, close, close, close)
+
+            # write to the buffer
+            print('incrementing array')
+            # await tractor.breakpoint()
+            shm.push(last)
+
+            # yield the new buffer index value
+            await ctx.send_yield(shm._i.value)
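The bar-period inference above deserves a note: it subtracts the most recent timestamp that *differs* from the last one, so duplicate trailing times can't produce a zero delay. A standalone demo:

    import numpy as np

    # 5s bars where the final timestamp happens to repeat
    times = np.array([100.0, 105.0, 110.0, 110.0])

    # most recent time differing from the last one is 105.0
    delay_s = times[-1] - times[times != times[-1]][-1]
    assert delay_s == 5.0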