Compare commits


No commits in common. "8c1905e35a00a6325753e058182f83c8b050823a" and "1967bc79737cbfc5e5158fe4294e1208fd9ad2d7" have entirely different histories.

7 changed files with 121 additions and 302 deletions

View File

@@ -30,7 +30,7 @@ from trio_typing import TaskStatus
 import tractor
 import docker
 import json
-from docker.models.containers import Container as DockerContainer
+from docker.models.containers import Container
 from docker.errors import DockerException, APIError
 from requests.exceptions import ConnectionError, ReadTimeout
@@ -133,136 +133,6 @@ async def open_docker(
             c.kill()

-class Container:
-    '''
-    Wrapper around a ``docker.models.containers.Container`` to include
-    log capture and relay through our native logging system and helper
-    method(s) for cancellation/teardown.
-
-    '''
-    def __init__(
-        self,
-        cntr: DockerContainer,
-    ) -> None:
-
-        self.cntr = cntr
-        # log msg de-duplication
-        self.seen_so_far = set()
-
-    async def process_logs_until(
-        self,
-        patt: str,
-        bp_on_msg: bool = False,
-    ) -> bool:
-        '''
-        Attempt to capture container log messages and relay through our
-        native logging system.
-
-        '''
-        seen_so_far = self.seen_so_far
-
-        while True:
-            logs = self.cntr.logs()
-            entries = logs.decode().split('\n')
-            for entry in entries:
-
-                # ignore null lines
-                if not entry:
-                    continue
-
-                try:
-                    record = json.loads(entry.strip())
-                except json.JSONDecodeError:
-                    if 'Error' in entry:
-                        raise RuntimeError(entry)
-                    raise
-
-                msg = record['msg']
-                level = record['level']
-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
-
-                    getattr(log, level, log.error)(f'{msg}')
-
-                if patt in msg:
-                    return True
-
-                # do a checkpoint so we don't block if cancelled B)
-                await trio.sleep(0.01)
-
-        return False
-
-    def try_signal(
-        self,
-        signal: str = 'SIGINT',
-    ) -> bool:
-        try:
-            # XXX: market store doesn't seem to shutdown nicely all the
-            # time with this (maybe because there are still open grpc
-            # connections?) noticeably after client connections have been
-            # made or are in use/teardown. It works just fine if you
-            # just start and stop the container tho?..
-            log.cancel(f'SENDING {signal} to {self.cntr.id}')
-            self.cntr.kill(signal)
-            return True
-
-        except docker.errors.APIError as err:
-            # _err = err
-            if 'is not running' in err.explanation:
-                return False
-
-    async def cancel(
-        self,
-    ) -> None:
-
-        cid = self.cntr.id
-        self.try_signal('SIGINT')
-
-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            # print('PROCESSING LOGS')
-            await self.process_logs_until('initiating graceful shutdown')
-            # print('SHUTDOWN REPORTED BY CONTAINER')
-            await self.process_logs_until('exiting...',)
-
-        for _ in range(10):
-            with trio.move_on_after(0.5) as cs:
-                cs.shield = True
-                # print('waiting on EXITING')
-                await self.process_logs_until('exiting...',)
-                # print('got EXITING')
-                break
-
-            if cs.cancelled_caught:
-                # get out the big guns, bc apparently marketstore
-                # doesn't actually know how to terminate gracefully
-                # :eyeroll:...
-                self.try_signal('SIGKILL')
-
-                try:
-                    log.info(f'Waiting on container shutdown: {cid}')
-                    self.cntr.wait(
-                        timeout=0.1,
-                        condition='not-running',
-                    )
-                    break
-
-                except (
-                    ReadTimeout,
-                    ConnectionError,
-                ):
-                    log.error(f'failed to wait on container {cid}')
-                    raise
-
-        else:
-            raise RuntimeError(f'Failed to cancel container {cid}')
-
-        log.cancel(f'Container stopped: {cid}')
-

 @tractor.context
 async def open_marketstored(
     ctx: tractor.Context,
@@ -305,7 +175,7 @@ async def open_marketstored(
         type='bind',
     )

-    dcntr: DockerContainer = client.containers.run(
+    cntr: Container = client.containers.run(
         'alpacamarkets/marketstore:latest',
         # do we need this for cmds?
         # '-i',
@@ -321,60 +191,78 @@
         init=True,
         # remove=True,
     )

-    cntr = Container(dcntr)
-
-    with trio.move_on_after(1):
-        found = await cntr.process_logs_until(
-            "launching tcp listener for all services...",
-        )
-
-    if not found and cntr not in client.containers.list():
-        raise RuntimeError(
-            'Failed to start `marketstore` check logs deats'
-        )
-
-    await ctx.started((cntr.cntr.id, os.getpid()))
-
-    # async with ctx.open_stream() as stream:
-    try:
-
-        # TODO: we might eventually want a proxy-style msg-prot here
-        # to allow remote control of containers without needing
-        # callers to have root perms?
-        await trio.sleep_forever()
-
-        # await cntr.cancel()
-        # with trio.CancelScope(shield=True):
-        #     # block for the expected "teardown log msg"..
-        #     # await cntr.process_logs_until('exiting...',)
-
-        #     # only msg should be to signal killing the
-        #     # container and this super daemon.
-        #     msg = await stream.receive()
-        #     # print("GOT CANCEL MSG")
-
-        #     cid = msg['cancel']
-        #     log.cancel(f'Cancelling container {cid}')
-        #     # print("CANCELLING CONTAINER")
-        #     await cntr.cancel()
-
-        #     # print("SENDING ACK")
-        #     await stream.send('ack')
+    try:
+        seen_so_far = set()
+
+        async def process_logs_until(
+            match: str,
+            bp_on_msg: bool = False,
+        ):
+            logs = cntr.logs(stream=True)
+            for entry in logs:
+                entry = entry.decode()
+
+                try:
+                    record = json.loads(entry.strip())
+                except json.JSONDecodeError:
+                    if 'Error' in entry:
+                        raise RuntimeError(entry)
+
+                msg = record['msg']
+                level = record['level']
+                if msg and entry not in seen_so_far:
+                    seen_so_far.add(entry)
+                    if bp_on_msg:
+                        await tractor.breakpoint()
+
+                    getattr(log, level, log.error)(f'{msg}')
+
+                # if "launching tcp listener for all services..." in msg:
+                if match in msg:
+                    return True
+
+                # do a checkpoint so we don't block if cancelled B)
+                await trio.sleep(0)
+
+            return False
+
+        with trio.move_on_after(0.5):
+            found = await process_logs_until(
+                "launching tcp listener for all services...",
+            )
+
+        if not found and cntr not in client.containers.list():
+            raise RuntimeError(
+                'Failed to start `marketstore` check logs deats'
+            )
+
+        await ctx.started(cntr.id)
+
+        # block for the expected "teardown log msg"..
+        await process_logs_until('exiting...',)

     except (
         BaseException,
         # trio.Cancelled,
         # KeyboardInterrupt,
     ):
+        cntr.kill('SIGINT')

-        with trio.CancelScope(shield=True):
-            await cntr.cancel()
-            # await stream.send('ack')
+        with trio.move_on_after(0.5) as cs:
+            cs.shield = True
+            await process_logs_until('exiting...',)

         raise

+    finally:
+        try:
+            cntr.wait(
+                timeout=0.5,
+                condition='not-running',
+            )
+        except (
+            ReadTimeout,
+            ConnectionError,
+        ):
+            cntr.kill()


 async def start_ahab(
     service_name: str,
@@ -423,18 +311,9 @@ async def start_ahab(
                open_marketstored,

            ) as (ctx, first):

-               cid, pid = first
+               assert str(first)

-               # run till cancelled
                await trio.sleep_forever()

-               # async with ctx.open_stream() as stream:
-               #     try:
-               #         # run till cancelled
-               #         await trio.sleep_forever()
-               #     finally:
-               #         with trio.CancelScope(shield=True):
-               #             # print('SENDING CANCEL TO MARKETSTORED')
-               #             await stream.send({'cancel': (cid, pid)})
-               #             assert await stream.receive() == 'ack'
-
     # since we demoted root perms in this parent
     # we'll get a perms error on proc cleanup in

View File

@@ -146,21 +146,13 @@ async def broadcast(
     # a given sample period.
     subs = sampler.subscribers.get(delay_s, ())

-    last = -1
     if shm is None:
-        periods = sampler.ohlcv_shms.keys()
-        # if this is an update triggered by a history update there
-        # might not actually be any sampling bus setup since there's
-        # no "live feed" active yet.
-        if periods:
-            lowest = min(periods)
-            shm = sampler.ohlcv_shms[lowest][0]
-            last = shm._last.value
+        lowest = min(sampler.ohlcv_shms.keys())
+        shm = sampler.ohlcv_shms[lowest][0]

     for stream in subs:
         try:
-            await stream.send({'index': last})
+            await stream.send({'index': shm._last.value})
         except (
             trio.BrokenResourceError,
             trio.ClosedResourceError

View File

@@ -351,7 +351,7 @@ class ShmArray:
             # tries to access ``.array`` (which due to the index
             # overlap will be empty). Pretty sure we've fixed it now
             # but leaving this here as a reminder.
-            if prepend and update_first and length:
+            if prepend and update_first:
                 assert index < self._first.value

             if (

View File

@@ -21,7 +21,6 @@ from __future__ import annotations
 from typing import Any
 import decimal

-from bidict import bidict
 import numpy as np
 from pydantic import BaseModel
 # from numba import from_dtype
@@ -48,16 +47,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields)
 # https://github.com/numba/numba/issues/4511
 # numba_ohlc_dtype = from_dtype(base_ohlc_dtype)

-# map time frame "keys" to seconds values
-tf_in_1s = bidict({
-    1: '1s',
-    60: '1m',
-    60*5: '5m',
-    60*15: '15m',
-    60*30: '30m',
-    60*60: '1h',
-    60*60*24: '1d',
-})
+# map time frame "keys" to minutes values
+tf_in_1m = {
+    '1m': 1,
+    '5m': 5,
+    '15m': 15,
+    '30m': 30,
+    '1h': 60,
+    '4h': 240,
+    '1d': 1440,
+}


 def mk_fqsn(

View File

@@ -31,7 +31,6 @@ from typing import (
     AsyncIterator, Optional,
     Generator,
     Awaitable,
-    TYPE_CHECKING,
 )

 import trio
@@ -75,8 +74,6 @@ from ..brokers._util import (
     DataUnavailable,
 )

-if TYPE_CHECKING:
-    from .marketstore import Storage

 log = get_logger(__name__)
@@ -211,27 +208,27 @@ def diff_history(

 ) -> np.ndarray:

-    to_push = array
     if last_tsdb_dt:
         s_diff = (start_dt - last_tsdb_dt).seconds

+        to_push = array[:s_diff]
+
         # if we detect a partial frame's worth of data
         # that is new, slice out only that history and
         # write to shm.
-        if (
-            s_diff < 0
-            and abs(s_diff) < len(array)
-        ):
-            # the + 1 is because ``last_tsdb_dt`` is pulled from
-            # the last row entry for the ``'time'`` field retrieved
-            # from the tsdb.
-            to_push = array[abs(s_diff)+1:]
+        if abs(s_diff) < len(array):
             log.info(
                 f'Pushing partial frame {to_push.size} to shm'
             )
+            # assert last_tsdb_dt > start_dt
+            # selected = array['time'] > last_tsdb_dt.timestamp()
+            # to_push = array[selected]
+            # return to_push

-    return to_push
+        return to_push
+
+    else:
+        return array


 async def start_backfill(
@@ -240,8 +237,6 @@ async def start_backfill(
     shm: ShmArray,
     last_tsdb_dt: Optional[datetime] = None,
-    storage: Optional[Storage] = None,
-    write_tsdb: bool = False,

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -253,17 +248,6 @@
     # back to what is recorded in the tsdb
     array, start_dt, end_dt = await hist(end_dt=None)

-    times = array['time']
-
-    # sample period step size in seconds
-    step_size_s = (
-        pendulum.from_timestamp(times[-1]) -
-        pendulum.from_timestamp(times[-2])
-    ).seconds
-
-    # "frame"'s worth of sample period steps in seconds
-    frame_size_s = len(array) * step_size_s
-
     to_push = diff_history(
         array,
         start_dt,
@@ -283,13 +267,20 @@
     # let caller unblock and deliver latest history frame
     task_status.started((shm, start_dt, end_dt, bf_done))

+    times = array['time']
+    step_size_s = (
+        pendulum.from_timestamp(times[-1]) -
+        pendulum.from_timestamp(times[-2])
+    ).seconds
+    frame_size_s = len(to_push) * step_size_s
+
     if last_tsdb_dt is None:
         # maybe a better default (they don't seem to define epoch?!)

         # based on the sample step size load a certain amount
         # history
         if step_size_s == 1:
-            last_tsdb_dt = pendulum.now().subtract(days=2)
+            last_tsdb_dt = pendulum.now().subtract(days=6)

         elif step_size_s == 60:
             last_tsdb_dt = pendulum.now().subtract(years=2)
@@ -372,7 +363,7 @@
             except NoData:
                 log.warning(
-                    f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
+                    f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
                 )
                 return None  # discard signal
@@ -411,26 +402,14 @@
                 )

                 # reset dtrange gen to new start point
-                try:
-                    next_end = iter_dts_gen.send(start_dt)
-                    log.info(
-                        f'Reset frame index to start at {start_dt}\n'
-                        f'Was at {next_end}'
-                    )
-
-                    # NOTE: manually set "earliest end datetime" index-value
-                    # to avoid the request loop getting confused about
-                    # new frames that are earlier in history - i.e. this
-                    # **is not** the case of out-of-order frames from
-                    # an async batch request.
-                    earliest_end_dt = start_dt
-
-                except StopIteration:
-                    # gen already terminated meaning we probably already
-                    # exhausted it via frame requests.
-                    log.info(
-                        "Datetime index already exhausted, can't reset.."
-                    )
+                next_end = iter_dts_gen.send(start_dt)
+                log.info(
+                    f'Reset frame index to start at {start_dt}\n'
+                    f'Was at {next_end}'
+                )
+
+                # TODO: can we avoid this?
+                earliest_end_dt = start_dt

                 to_push = diff_history(
                     array,
@@ -540,7 +519,6 @@
                     break

                 to_push, start_dt, end_dt = frames.pop(epoch)
-                ln = len(to_push)

                 # bail gracefully on shm allocation overrun/full condition
                 try:
@@ -549,30 +527,19 @@
                     log.info(
                         f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
                     )
-                    # await tractor.breakpoint()
                     break

                 log.info(
-                    f'Shm pushed {ln} frame:\n'
+                    f'Shm pushed {len(to_push)} frame:\n'
                     f'{start_dt} -> {end_dt}'
                 )

                 # keep track of most recent "prepended" ``start_dt``
                 # both for detecting gaps and ensuring async
                 # frame-result order.
                 earliest_end_dt = start_dt

-                if (
-                    storage is not None
-                    and write_tsdb
-                ):
-                    log.info(
-                        f'Writing {ln} frame to storage:\n'
-                        f'{start_dt} -> {end_dt}'
-                    )
-                    await storage.write_ohlcv(
-                        f'{bfqsn}.{mod.name}',  # lul..
-                        to_push,
-                    )
-
                 # TODO: can we only trigger this if the respective
                 # history in "in view"?!?
                 # XXX: extremely important, there can be no checkpoints
@@ -641,7 +608,7 @@ async def manage_history(
         # shm backfiller approach below.

         # start history anal and load missing new data via backend.
-        series, _, last_tsdb_dt = await storage.load(fqsn)
+        series, first_dt, last_dt = await storage.load(fqsn)

         broker, symbol, expiry = unpack_fqsn(fqsn)
         (
@@ -655,8 +622,7 @@
                 mod,
                 bfqsn,
                 shm,
-                last_tsdb_dt=last_tsdb_dt,
-                storage=storage,
+                last_tsdb_dt=last_dt,
             )
         )
@@ -677,10 +643,8 @@
         # do diff against last start frame of history and only fill
         # in from the tsdb an allotment that allows for most recent
         # to be loaded into mem *before* tsdb data.
-        if last_tsdb_dt:
-            dt_diff_s = (
-                latest_start_dt - last_tsdb_dt
-            ).seconds
+        if last_dt:
+            dt_diff_s = (latest_start_dt - last_dt).seconds
         else:
             dt_diff_s = 0
@@ -709,7 +673,7 @@
             field_map=marketstore.ohlc_key_map,
         )

-        # load as much from storage into shm as space will
+        # load as much from storage into shm as spacec will
         # allow according to user's shm size settings.
         count = 0
         end = fastest['Epoch'][0]
@@ -734,11 +698,15 @@
                 prepend=True,
                 # update_first=False,
                 # start=prepend_start,
-                field_map=marketstore.ohlc_key_map,
+                field_map={
+                    'Epoch': 'time',
+                    'Open': 'open',
+                    'High': 'high',
+                    'Low': 'low',
+                    'Close': 'close',
+                    'Volume': 'volume',
+                },
             )

-        # manually trigger step update to update charts/fsps
-        # which need an incremental update.
         for delay_s in sampler.subscribers:
             await broadcast(delay_s)

View File

@@ -191,9 +191,6 @@ class ContentsLabel(pg.LabelItem):
         self.setText(
             "<b>i</b>:{index}<br/>"
-            # NB: these fields must be indexed in the correct order via
-            # the slice syntax below.
-            "<b>epoch</b>:{}<br/>"
             "<b>O</b>:{}<br/>"
             "<b>H</b>:{}<br/>"
             "<b>L</b>:{}<br/>"
@@ -201,15 +198,7 @@ class ContentsLabel(pg.LabelItem):
             "<b>V</b>:{}<br/>"
             "<b>wap</b>:{}".format(
                 *array[index - first][
-                    [
-                        'time',
-                        'open',
-                        'high',
-                        'low',
-                        'close',
-                        'volume',
-                        'bar_wap',
-                    ]
+                    ['open', 'high', 'low', 'close', 'volume', 'bar_wap']
                 ],
                 name=name,
                 index=index,

View File

@@ -29,7 +29,6 @@ from typing import Optional, Any, Callable
 import numpy as np
 import tractor
 import trio
-import pendulum
 import pyqtgraph as pg

 from .. import brokers
@@ -48,7 +47,6 @@ from ._fsp import (
     open_vlm_displays,
 )
 from ..data._sharedmem import ShmArray
-from ..data._source import tf_in_1s
 from ._forms import (
     FieldsForm,
     mk_order_pane_layout,
@@ -662,17 +660,11 @@ async def display_symbol_data(
     symbol = feed.symbols[sym]
     fqsn = symbol.front_fqsn()

-    times = bars['time']
-    end = pendulum.from_timestamp(times[-1])
-    start = pendulum.from_timestamp(times[times != times[-1]][-1])
-    step_size_s = (end - start).seconds
-    tf_key = tf_in_1s[step_size_s]
-
     # load in symbol's ohlc data
     godwidget.window.setWindowTitle(
         f'{fqsn} '
         f'tick:{symbol.tick_size} '
-        f'step:{tf_key} '
+        f'step:1s '
     )

     linked = godwidget.linkedsplits