Add multi-symbol-buffer increment support

bar_select
Tyler Goodlet 2020-09-25 15:16:58 -04:00
parent 41e85ccaa9
commit e3e219aa4b
3 changed files with 83 additions and 55 deletions

View File

@ -23,10 +23,13 @@ from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
attach_shm_array, attach_shm_array,
open_shm_array, open_shm_array,
SharedArray, ShmArray,
get_shm_token, get_shm_token,
) )
from ._buffer import incr_buffer from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
)
__all__ = [ __all__ = [
@ -35,7 +38,7 @@ __all__ = [
'attach_shm_array', 'attach_shm_array',
'open_shm_array', 'open_shm_array',
'get_shm_token', 'get_shm_token',
'incr_buffer', 'subscribe_ohlc_for_increment',
] ]
@ -115,7 +118,7 @@ class Feed:
""" """
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: SharedArray shm: ShmArray
_broker_portal: tractor._portal.Portal _broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -129,7 +132,7 @@ class Feed:
# created for all practical purposes # created for all practical purposes
self._index_stream = await self._broker_portal.run( self._index_stream = await self._broker_portal.run(
'piker.data', 'piker.data',
'incr_buffer', 'increment_ohlc_buffer',
shm_token=self.shm.token, shm_token=self.shm.token,
topics=['index'], topics=['index'],
) )

View File

@ -1,54 +1,73 @@
""" """
Data buffers for fast shared humpy. Data buffers for fast shared humpy.
""" """
from typing import Tuple, Callable from typing import Tuple, Callable, Dict
import time # import time
import tractor import tractor
import trio import trio
from ._sharedmem import attach_shm_array from ._sharedmem import ShmArray
_shms: Dict[int, ShmArray] = {}
@tractor.msg.pub @tractor.msg.pub
async def incr_buffer( async def increment_ohlc_buffer(
shm_token: dict, shm_token: dict,
get_topics: Callable[..., Tuple[str]], get_topics: Callable[..., Tuple[str]],
# delay_s: Optional[float] = None, # delay_s: Optional[float] = None,
): ):
"""Task which inserts new bars into the provide shared memory array """Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds. every ``delay_s`` seconds.
This task fulfills 2 purposes:
- it takes the subscribed set of shm arrays and increments them
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented.
""" """
# TODO: right now we'll spin printing bars if the last time # TODO: right now we'll spin printing bars if the last time stamp is
# stamp is before a large period of no market activity. # before a large period of no market activity. Likely the best way
# Likely the best way to solve this is to make this task # to solve this is to make this task aware of the instrument's
# aware of the instrument's tradable hours? # tradable hours?
shm = attach_shm_array(
token=shm_token,
readonly=False,
)
# determine ohlc delay between bars
# to determine time step between datums
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# adjust delay to compensate for trio processing time # adjust delay to compensate for trio processing time
ad = delay_s - 0.002 ad = min(_shms.keys()) - 0.001
async def sleep(): # async def sleep():
"""Sleep until next time frames worth has passed from last bar. # """Sleep until next time frames worth has passed from last bar.
""" # """
# last_ts = shm.array[-1]['time'] # # last_ts = shm.array[-1]['time']
# delay = max((last_ts + ad) - time.time(), 0) # # delay = max((last_ts + ad) - time.time(), 0)
# await trio.sleep(delay) # # await trio.sleep(delay)
await trio.sleep(ad) # await trio.sleep(ad)
total_s = 0 # total seconds counted
lowest = min(_shms.keys())
ad = lowest - 0.001
while True: while True:
# sleep for duration of current bar # TODO: do we want to support dynamically
await sleep() # adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += lowest
# # sleep for duration of current bar
# await sleep()
# increment all subscribed shm arrays
# TODO: this in ``numba``
for delay_s, shms in _shms.items():
if total_s % delay_s != 0:
continue
# TODO: numa this!
for shm in shms:
# TODO: in theory we could make this faster by copying the # TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's # "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of # next value and then incrementing the counter instead of
@ -66,9 +85,15 @@ async def incr_buffer(
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# print('incrementing array')
# print(get_topics())
# broadcast the buffer index step # broadcast the buffer index step
yield {'index': shm._i.value} yield {'index': shm._i.value}
def subscribe_ohlc_for_increment(
shm: ShmArray,
delay: int,
) -> None:
"""Add an OHLC ``ShmArray`` to the increment set.
"""
_shms.setdefault(delay, []).append(shm)

View File

@ -134,7 +134,7 @@ def _make_token(
) )
class SharedArray: class ShmArray:
def __init__( def __init__(
self, self,
shmarr: np.ndarray, shmarr: np.ndarray,
@ -216,7 +216,7 @@ def open_shm_array(
size: int = int(2*60*60*10/5), size: int = int(2*60*60*10/5),
dtype: np.dtype = base_ohlc_dtype, dtype: np.dtype = base_ohlc_dtype,
readonly: bool = False, readonly: bool = False,
) -> SharedArray: ) -> ShmArray:
"""Open a memory shared ``numpy`` using the standard library. """Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown This call unlinks (aka permanently destroys) the buffer on teardown
@ -245,7 +245,7 @@ def open_shm_array(
) )
counter.value = 0 counter.value = 0
shmarr = SharedArray( shmarr = ShmArray(
array, array,
counter, counter,
shm, shm,
@ -268,7 +268,7 @@ def attach_shm_array(
size: int = int(60*60*10/5), size: int = int(60*60*10/5),
# dtype: np.dtype = base_ohlc_dtype, # dtype: np.dtype = base_ohlc_dtype,
readonly: bool = True, readonly: bool = True,
) -> SharedArray: ) -> ShmArray:
"""Load and attach to an existing shared memory array previously """Load and attach to an existing shared memory array previously
created by another process using ``open_shared_array``. created by another process using ``open_shared_array``.
""" """
@ -289,7 +289,7 @@ def attach_shm_array(
# make sure we can read # make sure we can read
counter.value counter.value
sha = SharedArray( sha = ShmArray(
shmarr, shmarr,
counter, counter,
shm, shm,
@ -314,7 +314,7 @@ def maybe_open_shm_array(
key: str, key: str,
dtype: np.dtype = base_ohlc_dtype, dtype: np.dtype = base_ohlc_dtype,
**kwargs, **kwargs,
) -> Tuple[SharedArray, bool]: ) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a """Attempt to attach to a shared memory block by a
"key" determined by the users overall "system" "key" determined by the users overall "system"
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).