Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 8c1905e35a Fix less-then-frame off by one slice, add db write toggle and disable 2022-05-06 13:09:30 -04:00
Tyler Goodlet d9e2666e80 More reliable `marketstored` + container supervision
It turns out (i guess not so shockingly?) that `marketstore` doesn't
always teardown "gracefully" under SIGINT (seems to hang if there are
open client connections which are also in the midst of teardown?) so
this instead first tries the SIGINT and then fails over to a SIGKILL
(destroy loop) which seems to be much more reliable to ensure shutdown
without any downside - in terms of a "hard kill".

Originally i was thinking the issue was root perms related (which get
relegated solely to the `marketstored` daemon actor after spawn) but
actually it was indeed the signalling / application layer causing the
hold-up/latency on teardown. There's a bunch of lingering (now
commented) code which tried to solve this non-problem as well as a bunch
logging/prints to help decipher the root of the issue - this will all
get cleaned out shortly.
2022-05-05 21:04:10 -04:00
Tyler Goodlet 1abe7d87a5 Include epoch timestamp in quote label for now 2022-05-05 17:09:17 -04:00
Tyler Goodlet 2f9d199a7f Handle ``iter_dts()`` already exhausted edge case 2022-05-03 19:07:32 -04:00
Tyler Goodlet 2f3418546f Label "humanized" sample period in window title-bar" 2022-05-03 17:27:38 -04:00
Tyler Goodlet 729c72a48f Add timeframe key to seconds map 2022-05-03 16:30:10 -04:00
Tyler Goodlet 324dcbbfb0 Always write newly pulled frames to tsdb 2022-05-03 16:22:01 -04:00
Tyler Goodlet 112cba43e5 Fix slice logic for less-then-frame tsdb overlap
When the tsdb has a last datum that is in the past less then a "frame's
worth" of sample steps we need to slice out only the data from the
latest frame that doesn't overlap; this fixes that slice logic..
Previously i dunno wth it was doing..
2022-05-03 16:01:02 -04:00
Tyler Goodlet 468cd3a381 Handle no sampler subs case on history broadcasts
When the market isn't open the feed layer won't create a subscriber
entry in the sampler broadcast loop and so if a manual call to
``broadcast()`` is made (like when trying to update a chart from
a history prepend) we need to handle that case and just broadcast
a random `-1` for now..BD
2022-05-03 13:52:23 -04:00
Tyler Goodlet e8aaf42cc6 Only assert if input array actually has a size 2022-05-03 13:51:29 -04:00
7 changed files with 302 additions and 121 deletions

View File

@ -30,7 +30,7 @@ from trio_typing import TaskStatus
import tractor import tractor
import docker import docker
import json import json
from docker.models.containers import Container from docker.models.containers import Container as DockerContainer
from docker.errors import DockerException, APIError from docker.errors import DockerException, APIError
from requests.exceptions import ConnectionError, ReadTimeout from requests.exceptions import ConnectionError, ReadTimeout
@ -133,6 +133,136 @@ async def open_docker(
c.kill() c.kill()
class Container:
'''
Wrapper around a ``docker.models.containers.Container`` to include
log capture and relay through our native logging system and helper
method(s) for cancellation/teardown.
'''
def __init__(
self,
cntr: DockerContainer,
) -> None:
self.cntr = cntr
# log msg de-duplication
self.seen_so_far = set()
async def process_logs_until(
self,
patt: str,
bp_on_msg: bool = False,
) -> bool:
'''
Attempt to capture container log messages and relay through our
native logging system.
'''
seen_so_far = self.seen_so_far
while True:
logs = self.cntr.logs()
entries = logs.decode().split('\n')
for entry in entries:
# ignore null lines
if not entry:
continue
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
raise
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
if patt in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
return False
def try_signal(
self,
signal: str = 'SIGINT',
) -> bool:
try:
# XXX: market store doesn't seem to shutdown nicely all the
# time with this (maybe because there are still open grpc
# connections?) noticably after client connections have been
# made or are in use/teardown. It works just fine if you
# just start and stop the container tho?..
log.cancel(f'SENDING {signal} to {self.cntr.id}')
self.cntr.kill(signal)
return True
except docker.errors.APIError as err:
# _err = err
if 'is not running' in err.explanation:
return False
async def cancel(
self,
) -> None:
cid = self.cntr.id
self.try_signal('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('PROCESSINGN LOGS')
await self.process_logs_until('initiating graceful shutdown')
# print('SHUTDOWN REPORTED BY CONTAINER')
await self.process_logs_until('exiting...',)
for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
# print('waiting on EXITING')
await self.process_logs_until('exiting...',)
# print('got EXITING')
break
if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info('Waiting on container shutdown: {cid}')
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break
except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
else:
raise RuntimeError('Failed to cancel container {cid}')
log.cancel(f'Container stopped: {cid}')
@tractor.context @tractor.context
async def open_marketstored( async def open_marketstored(
ctx: tractor.Context, ctx: tractor.Context,
@ -175,7 +305,7 @@ async def open_marketstored(
type='bind', type='bind',
) )
cntr: Container = client.containers.run( dcntr: DockerContainer = client.containers.run(
'alpacamarkets/marketstore:latest', 'alpacamarkets/marketstore:latest',
# do we need this for cmds? # do we need this for cmds?
# '-i', # '-i',
@ -191,42 +321,10 @@ async def open_marketstored(
init=True, init=True,
# remove=True, # remove=True,
) )
try: cntr = Container(dcntr)
seen_so_far = set()
async def process_logs_until( with trio.move_on_after(1):
match: str, found = await cntr.process_logs_until(
bp_on_msg: bool = False,
):
logs = cntr.logs(stream=True)
for entry in logs:
entry = entry.decode()
try:
record = json.loads(entry.strip())
except json.JSONDecodeError:
if 'Error' in entry:
raise RuntimeError(entry)
msg = record['msg']
level = record['level']
if msg and entry not in seen_so_far:
seen_so_far.add(entry)
if bp_on_msg:
await tractor.breakpoint()
getattr(log, level, log.error)(f'{msg}')
# if "launching tcp listener for all services..." in msg:
if match in msg:
return True
# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0)
return False
with trio.move_on_after(0.5):
found = await process_logs_until(
"launching tcp listener for all services...", "launching tcp listener for all services...",
) )
@ -235,33 +333,47 @@ async def open_marketstored(
'Failed to start `marketstore` check logs deats' 'Failed to start `marketstore` check logs deats'
) )
await ctx.started(cntr.id) await ctx.started((cntr.cntr.id, os.getpid()))
# block for the expected "teardown log msg".. # async with ctx.open_stream() as stream:
await process_logs_until('exiting...',)
try:
# TODO: we might eventually want a proxy-style msg-prot here
# to allow remote control of containers without needing
# callers to have root perms?
await trio.sleep_forever()
# await cntr.cancel()
# with trio.CancelScope(shield=True):
# # block for the expected "teardown log msg"..
# # await cntr.process_logs_until('exiting...',)
# # only msg should be to signal killing the
# # container and this super daemon.
# msg = await stream.receive()
# # print("GOT CANCEL MSG")
# cid = msg['cancel']
# log.cancel(f'Cancelling container {cid}')
# # print("CANCELLING CONTAINER")
# await cntr.cancel()
# # print("SENDING ACK")
# await stream.send('ack')
except ( except (
BaseException, BaseException,
# trio.Cancelled, # trio.Cancelled,
# KeyboardInterrupt, # KeyboardInterrupt,
): ):
cntr.kill('SIGINT')
with trio.move_on_after(0.5) as cs:
cs.shield = True
await process_logs_until('exiting...',)
raise
finally: with trio.CancelScope(shield=True):
try: await cntr.cancel()
cntr.wait( # await stream.send('ack')
timeout=0.5,
condition='not-running', raise
)
except (
ReadTimeout,
ConnectionError,
):
cntr.kill()
async def start_ahab( async def start_ahab(
@ -311,9 +423,18 @@ async def start_ahab(
open_marketstored, open_marketstored,
) as (ctx, first): ) as (ctx, first):
assert str(first) cid, pid = first
# run till cancelled
await trio.sleep_forever() await trio.sleep_forever()
# async with ctx.open_stream() as stream:
# try:
# # run till cancelled
# await trio.sleep_forever()
# finally:
# with trio.CancelScope(shield=True):
# # print('SENDING CANCEL TO MARKETSTORED')
# await stream.send({'cancel': (cid, pid)})
# assert await stream.receive() == 'ack'
# since we demoted root perms in this parent # since we demoted root perms in this parent
# we'll get a perms error on proc cleanup in # we'll get a perms error on proc cleanup in

View File

@ -146,13 +146,21 @@ async def broadcast(
# a given sample period. # a given sample period.
subs = sampler.subscribers.get(delay_s, ()) subs = sampler.subscribers.get(delay_s, ())
last = -1
if shm is None: if shm is None:
lowest = min(sampler.ohlcv_shms.keys()) periods = sampler.ohlcv_shms.keys()
# if this is an update triggered by a history update there
# might not actually be any sampling bus setup since there's
# no "live feed" active yet.
if periods:
lowest = min(periods)
shm = sampler.ohlcv_shms[lowest][0] shm = sampler.ohlcv_shms[lowest][0]
last = shm._last.value
for stream in subs: for stream in subs:
try: try:
await stream.send({'index': shm._last.value}) await stream.send({'index': last})
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
trio.ClosedResourceError trio.ClosedResourceError

View File

@ -351,7 +351,7 @@ class ShmArray:
# tries to access ``.array`` (which due to the index # tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now # overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder. # but leaving this here as a reminder.
if prepend and update_first: if prepend and update_first and length:
assert index < self._first.value assert index < self._first.value
if ( if (

View File

@ -21,6 +21,7 @@ from __future__ import annotations
from typing import Any from typing import Any
import decimal import decimal
from bidict import bidict
import numpy as np import numpy as np
from pydantic import BaseModel from pydantic import BaseModel
# from numba import from_dtype # from numba import from_dtype
@ -47,16 +48,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields)
# https://github.com/numba/numba/issues/4511 # https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype) # numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values # map time frame "keys" to seconds values
tf_in_1m = { tf_in_1s = bidict({
'1m': 1, 1: '1s',
'5m': 5, 60: '1m',
'15m': 15, 60*5: '5m',
'30m': 30, 60*15: '15m',
'1h': 60, 60*30: '30m',
'4h': 240, 60*60: '1h',
'1d': 1440, 60*60*24: '1d',
} })
def mk_fqsn( def mk_fqsn(

View File

@ -31,6 +31,7 @@ from typing import (
AsyncIterator, Optional, AsyncIterator, Optional,
Generator, Generator,
Awaitable, Awaitable,
TYPE_CHECKING,
) )
import trio import trio
@ -74,6 +75,8 @@ from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
if TYPE_CHECKING:
from .marketstore import Storage
log = get_logger(__name__) log = get_logger(__name__)
@ -208,28 +211,28 @@ def diff_history(
) -> np.ndarray: ) -> np.ndarray:
to_push = array
if last_tsdb_dt: if last_tsdb_dt:
s_diff = (start_dt - last_tsdb_dt).seconds s_diff = (start_dt - last_tsdb_dt).seconds
to_push = array[:s_diff]
# if we detect a partial frame's worth of data # if we detect a partial frame's worth of data
# that is new, slice out only that history and # that is new, slice out only that history and
# write to shm. # write to shm.
if abs(s_diff) < len(array): if (
s_diff < 0
and abs(s_diff) < len(array)
):
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retreived
# from the tsdb.
to_push = array[abs(s_diff)+1:]
log.info( log.info(
f'Pushing partial frame {to_push.size} to shm' f'Pushing partial frame {to_push.size} to shm'
) )
# assert last_tsdb_dt > start_dt
# selected = array['time'] > last_tsdb_dt.timestamp()
# to_push = array[selected]
# return to_push
return to_push return to_push
else:
return array
async def start_backfill( async def start_backfill(
mod: ModuleType, mod: ModuleType,
@ -237,6 +240,8 @@ async def start_backfill(
shm: ShmArray, shm: ShmArray,
last_tsdb_dt: Optional[datetime] = None, last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -248,6 +253,17 @@ async def start_backfill(
# back to what is recorded in the tsdb # back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt=None) array, start_dt, end_dt = await hist(end_dt=None)
times = array['time']
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
# "frame"'s worth of sample period steps in seconds
frame_size_s = len(array) * step_size_s
to_push = diff_history( to_push = diff_history(
array, array,
start_dt, start_dt,
@ -267,20 +283,13 @@ async def start_backfill(
# let caller unblock and deliver latest history frame # let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done)) task_status.started((shm, start_dt, end_dt, bf_done))
times = array['time']
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
).seconds
frame_size_s = len(to_push) * step_size_s
if last_tsdb_dt is None: if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!) # maybe a better default (they don't seem to define epoch?!)
# based on the sample step size load a certain amount # based on the sample step size load a certain amount
# history # history
if step_size_s == 1: if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(days=6) last_tsdb_dt = pendulum.now().subtract(days=2)
elif step_size_s == 60: elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2) last_tsdb_dt = pendulum.now().subtract(years=2)
@ -363,7 +372,7 @@ async def start_backfill(
except NoData: except NoData:
log.warning( log.warning(
f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?' f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
) )
return None # discard signal return None # discard signal
@ -402,15 +411,27 @@ async def start_backfill(
) )
# reset dtrange gen to new start point # reset dtrange gen to new start point
try:
next_end = iter_dts_gen.send(start_dt) next_end = iter_dts_gen.send(start_dt)
log.info( log.info(
f'Reset frame index to start at {start_dt}\n' f'Reset frame index to start at {start_dt}\n'
f'Was at {next_end}' f'Was at {next_end}'
) )
# TODO: can we avoid this? # NOTE: manually set "earliest end datetime" index-value
# to avoid the request loop getting confused about
# new frames that are earlier in history - i.e. this
# **is not** the case of out-of-order frames from
# an async batch request.
earliest_end_dt = start_dt earliest_end_dt = start_dt
except StopIteration:
# gen already terminated meaning we probably already
# exhausted it via frame requests.
log.info(
"Datetime index already exhausted, can't reset.."
)
to_push = diff_history( to_push = diff_history(
array, array,
start_dt, start_dt,
@ -519,6 +540,7 @@ async def start_backfill(
break break
to_push, start_dt, end_dt = frames.pop(epoch) to_push, start_dt, end_dt = frames.pop(epoch)
ln = len(to_push)
# bail gracefully on shm allocation overrun/full condition # bail gracefully on shm allocation overrun/full condition
try: try:
@ -527,19 +549,30 @@ async def start_backfill(
log.info( log.info(
f'Shm buffer overrun on: {start_dt} -> {end_dt}?' f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
) )
# await tractor.breakpoint()
break break
log.info( log.info(
f'Shm pushed {len(to_push)} frame:\n' f'Shm pushed {ln} frame:\n'
f'{start_dt} -> {end_dt}' f'{start_dt} -> {end_dt}'
) )
# keep track of most recent "prepended" ``start_dt`` # keep track of most recent "prepended" ``start_dt``
# both for detecting gaps and ensuring async # both for detecting gaps and ensuring async
# frame-result order. # frame-result order.
earliest_end_dt = start_dt earliest_end_dt = start_dt
if (
storage is not None
and write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{start_dt} -> {end_dt}'
)
await storage.write_ohlcv(
f'{bfqsn}.{mod.name}', # lul..
to_push,
)
# TODO: can we only trigger this if the respective # TODO: can we only trigger this if the respective
# history in "in view"?!? # history in "in view"?!?
# XXX: extremely important, there can be no checkpoints # XXX: extremely important, there can be no checkpoints
@ -608,7 +641,7 @@ async def manage_history(
# shm backfiller approach below. # shm backfiller approach below.
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
series, first_dt, last_dt = await storage.load(fqsn) series, _, last_tsdb_dt = await storage.load(fqsn)
broker, symbol, expiry = unpack_fqsn(fqsn) broker, symbol, expiry = unpack_fqsn(fqsn)
( (
@ -622,7 +655,8 @@ async def manage_history(
mod, mod,
bfqsn, bfqsn,
shm, shm,
last_tsdb_dt=last_dt, last_tsdb_dt=last_tsdb_dt,
storage=storage,
) )
) )
@ -643,8 +677,10 @@ async def manage_history(
# do diff against last start frame of history and only fill # do diff against last start frame of history and only fill
# in from the tsdb an allotment that allows for most recent # in from the tsdb an allotment that allows for most recent
# to be loaded into mem *before* tsdb data. # to be loaded into mem *before* tsdb data.
if last_dt: if last_tsdb_dt:
dt_diff_s = (latest_start_dt - last_dt).seconds dt_diff_s = (
latest_start_dt - last_tsdb_dt
).seconds
else: else:
dt_diff_s = 0 dt_diff_s = 0
@ -673,7 +709,7 @@ async def manage_history(
field_map=marketstore.ohlc_key_map, field_map=marketstore.ohlc_key_map,
) )
# load as much from storage into shm as spacec will # load as much from storage into shm as space will
# allow according to user's shm size settings. # allow according to user's shm size settings.
count = 0 count = 0
end = fastest['Epoch'][0] end = fastest['Epoch'][0]
@ -698,15 +734,11 @@ async def manage_history(
prepend=True, prepend=True,
# update_first=False, # update_first=False,
# start=prepend_start, # start=prepend_start,
field_map={ field_map=marketstore.ohlc_key_map,
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
},
) )
# manually trigger step update to update charts/fsps
# which need an incremental update.
for delay_s in sampler.subscribers: for delay_s in sampler.subscribers:
await broadcast(delay_s) await broadcast(delay_s)

View File

@ -191,6 +191,9 @@ class ContentsLabel(pg.LabelItem):
self.setText( self.setText(
"<b>i</b>:{index}<br/>" "<b>i</b>:{index}<br/>"
# NB: these fields must be indexed in the correct order via
# the slice syntax below.
"<b>epoch</b>:{}<br/>"
"<b>O</b>:{}<br/>" "<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>" "<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>" "<b>L</b>:{}<br/>"
@ -198,7 +201,15 @@ class ContentsLabel(pg.LabelItem):
"<b>V</b>:{}<br/>" "<b>V</b>:{}<br/>"
"<b>wap</b>:{}".format( "<b>wap</b>:{}".format(
*array[index - first][ *array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap'] [
'time',
'open',
'high',
'low',
'close',
'volume',
'bar_wap',
]
], ],
name=name, name=name,
index=index, index=index,

View File

@ -29,6 +29,7 @@ from typing import Optional, Any, Callable
import numpy as np import numpy as np
import tractor import tractor
import trio import trio
import pendulum
import pyqtgraph as pg import pyqtgraph as pg
from .. import brokers from .. import brokers
@ -47,6 +48,7 @@ from ._fsp import (
open_vlm_displays, open_vlm_displays,
) )
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ..data._source import tf_in_1s
from ._forms import ( from ._forms import (
FieldsForm, FieldsForm,
mk_order_pane_layout, mk_order_pane_layout,
@ -660,11 +662,17 @@ async def display_symbol_data(
symbol = feed.symbols[sym] symbol = feed.symbols[sym]
fqsn = symbol.front_fqsn() fqsn = symbol.front_fqsn()
times = bars['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
step_size_s = (end - start).seconds
tf_key = tf_in_1s[step_size_s]
# load in symbol's ohlc data # load in symbol's ohlc data
godwidget.window.setWindowTitle( godwidget.window.setWindowTitle(
f'{fqsn} ' f'{fqsn} '
f'tick:{symbol.tick_size} ' f'tick:{symbol.tick_size} '
f'step:1s ' f'step:{tf_key} '
) )
linked = godwidget.linkedsplits linked = godwidget.linkedsplits