Compare commits
10 Commits
1967bc7973 ... 8c1905e35a
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 8c1905e35a | |
Tyler Goodlet | d9e2666e80 | |
Tyler Goodlet | 1abe7d87a5 | |
Tyler Goodlet | 2f9d199a7f | |
Tyler Goodlet | 2f3418546f | |
Tyler Goodlet | 729c72a48f | |
Tyler Goodlet | 324dcbbfb0 | |
Tyler Goodlet | 112cba43e5 | |
Tyler Goodlet | 468cd3a381 | |
Tyler Goodlet | e8aaf42cc6 | |
@@ -30,7 +30,7 @@ from trio_typing import TaskStatus
 import tractor
 import docker
 import json
-from docker.models.containers import Container
+from docker.models.containers import Container as DockerContainer
 from docker.errors import DockerException, APIError
 from requests.exceptions import ConnectionError, ReadTimeout
 
@@ -133,6 +133,136 @@ async def open_docker(
         c.kill()
 
 
+class Container:
+    '''
+    Wrapper around a ``docker.models.containers.Container`` to include
+    log capture and relay through our native logging system and helper
+    method(s) for cancellation/teardown.
+
+    '''
+    def __init__(
+        self,
+        cntr: DockerContainer,
+    ) -> None:
+
+        self.cntr = cntr
+        # log msg de-duplication
+        self.seen_so_far = set()
+
+    async def process_logs_until(
+        self,
+        patt: str,
+        bp_on_msg: bool = False,
+    ) -> bool:
+        '''
+        Attempt to capture container log messages and relay through our
+        native logging system.
+
+        '''
+        seen_so_far = self.seen_so_far
+
+        while True:
+            logs = self.cntr.logs()
+            entries = logs.decode().split('\n')
+            for entry in entries:
+
+                # ignore null lines
+                if not entry:
+                    continue
+
+                try:
+                    record = json.loads(entry.strip())
+                except json.JSONDecodeError:
+                    if 'Error' in entry:
+                        raise RuntimeError(entry)
+                    raise
+
+                msg = record['msg']
+                level = record['level']
+                if msg and entry not in seen_so_far:
+                    seen_so_far.add(entry)
+                    if bp_on_msg:
+                        await tractor.breakpoint()
+
+                    getattr(log, level, log.error)(f'{msg}')
+
+                if patt in msg:
+                    return True
+
+            # do a checkpoint so we don't block if cancelled B)
+            await trio.sleep(0.01)
+
+        return False
+
+    def try_signal(
+        self,
+        signal: str = 'SIGINT',
+
+    ) -> bool:
+        try:
+            # XXX: market store doesn't seem to shutdown nicely all the
+            # time with this (maybe because there are still open grpc
+            # connections?) noticeably after client connections have been
+            # made or are in use/teardown. It works just fine if you
+            # just start and stop the container tho?..
+            log.cancel(f'SENDING {signal} to {self.cntr.id}')
+            self.cntr.kill(signal)
+            return True
+
+        except docker.errors.APIError as err:
+            # _err = err
+            if 'is not running' in err.explanation:
+                return False
+
+    async def cancel(
+        self,
+    ) -> None:
+
+        cid = self.cntr.id
+        self.try_signal('SIGINT')
+
+        with trio.move_on_after(0.5) as cs:
+            cs.shield = True
+            # print('PROCESSING LOGS')
+            await self.process_logs_until('initiating graceful shutdown')
+            # print('SHUTDOWN REPORTED BY CONTAINER')
+            await self.process_logs_until('exiting...',)
+
+        for _ in range(10):
+            with trio.move_on_after(0.5) as cs:
+                cs.shield = True
+                # print('waiting on EXITING')
+                await self.process_logs_until('exiting...',)
+                # print('got EXITING')
+                break
+
+            if cs.cancelled_caught:
+                # get out the big guns, bc apparently marketstore
+                # doesn't actually know how to terminate gracefully
+                # :eyeroll:...
+                self.try_signal('SIGKILL')
+
+            try:
+                log.info(f'Waiting on container shutdown: {cid}')
+                self.cntr.wait(
+                    timeout=0.1,
+                    condition='not-running',
+                )
+                break
+
+            except (
+                ReadTimeout,
+                ConnectionError,
+            ):
+                log.error(f'failed to wait on container {cid}')
+                raise
+
+        else:
+            raise RuntimeError(f'Failed to cancel container {cid}')
+
+        log.cancel(f'Container stopped: {cid}')
+
+
 @tractor.context
 async def open_marketstored(
     ctx: tractor.Context,
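
A hedged usage sketch of the `Container` wrapper added above: it assumes the class is importable from the patched module (not shown here) and that the image emits JSON log lines carrying 'msg' and 'level' keys, as marketstore does.

```python
import docker
import trio


async def supervise() -> None:
    client = docker.from_env()
    dcntr = client.containers.run(
        'alpacamarkets/marketstore:latest',
        detach=True,
        init=True,
    )
    # wrap the raw docker-py handle with the log-relaying helper
    cntr = Container(dcntr)  # noqa: F821 - class defined in the diff above

    # wait (bounded) for the daemon's readiness line to show up in logs
    with trio.move_on_after(5):
        await cntr.process_logs_until(
            'launching tcp listener for all services...',
        )

    try:
        await trio.sleep_forever()
    finally:
        # graceful SIGINT first; ``.cancel()`` falls back to SIGKILL
        with trio.CancelScope(shield=True):
            await cntr.cancel()


if __name__ == '__main__':
    trio.run(supervise)
```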
@@ -175,7 +305,7 @@ async def open_marketstored(
         type='bind',
     )
 
-    cntr: Container = client.containers.run(
+    dcntr: DockerContainer = client.containers.run(
         'alpacamarkets/marketstore:latest',
         # do we need this for cmds?
         # '-i',
@@ -191,42 +321,10 @@ async def open_marketstored(
         init=True,
         # remove=True,
     )
-    try:
-        seen_so_far = set()
-
-        async def process_logs_until(
-            match: str,
-            bp_on_msg: bool = False,
-        ):
-            logs = cntr.logs(stream=True)
-            for entry in logs:
-                entry = entry.decode()
-
-                try:
-                    record = json.loads(entry.strip())
-                except json.JSONDecodeError:
-                    if 'Error' in entry:
-                        raise RuntimeError(entry)
-
-                msg = record['msg']
-                level = record['level']
-                if msg and entry not in seen_so_far:
-                    seen_so_far.add(entry)
-                    if bp_on_msg:
-                        await tractor.breakpoint()
-                    getattr(log, level, log.error)(f'{msg}')
-
-                # if "launching tcp listener for all services..." in msg:
-                if match in msg:
-                    return True
-
-                # do a checkpoint so we don't block if cancelled B)
-                await trio.sleep(0)
-
-            return False
-
-        with trio.move_on_after(0.5):
-            found = await process_logs_until(
+    cntr = Container(dcntr)
+
+    with trio.move_on_after(1):
+        found = await cntr.process_logs_until(
             "launching tcp listener for all services...",
         )
 
@@ -235,33 +333,47 @@ async def open_marketstored(
             'Failed to start `marketstore` check logs deats'
         )
 
-    await ctx.started(cntr.id)
+    await ctx.started((cntr.cntr.id, os.getpid()))
 
-    # block for the expected "teardown log msg"..
-    await process_logs_until('exiting...',)
+    # async with ctx.open_stream() as stream:
+
+    try:
+
+        # TODO: we might eventually want a proxy-style msg-prot here
+        # to allow remote control of containers without needing
+        # callers to have root perms?
+        await trio.sleep_forever()
+
+        # await cntr.cancel()
+        # with trio.CancelScope(shield=True):
+        #     # block for the expected "teardown log msg"..
+        #     # await cntr.process_logs_until('exiting...',)
+
+        #     # only msg should be to signal killing the
+        #     # container and this super daemon.
+        #     msg = await stream.receive()
+        #     # print("GOT CANCEL MSG")
+
+        #     cid = msg['cancel']
+        #     log.cancel(f'Cancelling container {cid}')
+
+        #     # print("CANCELLING CONTAINER")
+        #     await cntr.cancel()
+
+        #     # print("SENDING ACK")
+        #     await stream.send('ack')
 
     except (
         BaseException,
         # trio.Cancelled,
        # KeyboardInterrupt,
     ):
-        cntr.kill('SIGINT')
-        with trio.move_on_after(0.5) as cs:
-            cs.shield = True
-            await process_logs_until('exiting...',)
-        raise
 
-    finally:
-        try:
-            cntr.wait(
-                timeout=0.5,
-                condition='not-running',
-            )
-        except (
-            ReadTimeout,
-            ConnectionError,
-        ):
-            cntr.kill()
+        with trio.CancelScope(shield=True):
+            await cntr.cancel()
+            # await stream.send('ack')
+
+        raise
 
 
 async def start_ahab(
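
The teardown path above shields the container cancel in a `trio.CancelScope` so cleanup still completes while the supervising task is itself being cancelled. A minimal, self-contained sketch of that pattern (no docker involved; `fake_teardown` is a hypothetical stand-in for `cntr.cancel()`):

```python
import trio


async def fake_teardown() -> None:
    # stand-in for waiting on a container's graceful shutdown
    await trio.sleep(0.1)
    print('teardown complete')


async def worker() -> None:
    try:
        await trio.sleep_forever()
    except BaseException:
        # shield so the cleanup awaits run even under cancellation
        with trio.CancelScope(shield=True):
            await fake_teardown()
        raise


async def main() -> None:
    async with trio.open_nursery() as n:
        n.start_soon(worker)
        await trio.sleep(0.2)
        n.cancel_scope.cancel()


trio.run(main)
```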
@@ -311,9 +423,18 @@ async def start_ahab(
                 open_marketstored,
             ) as (ctx, first):
 
-                assert str(first)
-                # run till cancelled
+                cid, pid = first
+
                 await trio.sleep_forever()
+                # async with ctx.open_stream() as stream:
+                #     try:
+                #         # run till cancelled
+                #         await trio.sleep_forever()
+                #     finally:
+                #         with trio.CancelScope(shield=True):
+                #             # print('SENDING CANCEL TO MARKETSTORED')
+                #             await stream.send({'cancel': (cid, pid)})
+                #             assert await stream.receive() == 'ack'
 
         # since we demoted root perms in this parent
         # we'll get a perms error on proc cleanup in
@@ -146,13 +146,21 @@ async def broadcast(
     # a given sample period.
     subs = sampler.subscribers.get(delay_s, ())
 
+    last = -1
+
     if shm is None:
-        lowest = min(sampler.ohlcv_shms.keys())
-        shm = sampler.ohlcv_shms[lowest][0]
+        periods = sampler.ohlcv_shms.keys()
+        # if this is an update triggered by a history update there
+        # might not actually be any sampling bus setup since there's
+        # no "live feed" active yet.
+        if periods:
+            lowest = min(periods)
+            shm = sampler.ohlcv_shms[lowest][0]
+            last = shm._last.value
 
     for stream in subs:
         try:
-            await stream.send({'index': shm._last.value})
+            await stream.send({'index': last})
         except (
             trio.BrokenResourceError,
             trio.ClosedResourceError
@@ -351,7 +351,7 @@ class ShmArray:
         # tries to access ``.array`` (which due to the index
         # overlap will be empty). Pretty sure we've fixed it now
         # but leaving this here as a reminder.
-        if prepend and update_first:
+        if prepend and update_first and length:
             assert index < self._first.value
 
         if (
@@ -21,6 +21,7 @@ from __future__ import annotations
 from typing import Any
 import decimal
 
+from bidict import bidict
 import numpy as np
 from pydantic import BaseModel
 # from numba import from_dtype
@@ -47,16 +48,16 @@ base_ohlc_dtype = np.dtype(ohlc_fields)
 # https://github.com/numba/numba/issues/4511
 # numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
 
-# map time frame "keys" to minutes values
-tf_in_1m = {
-    '1m': 1,
-    '5m': 5,
-    '15m': 15,
-    '30m': 30,
-    '1h': 60,
-    '4h': 240,
-    '1d': 1440,
-}
+# map time frame "keys" to seconds values
+tf_in_1s = bidict({
+    1: '1s',
+    60: '1m',
+    60*5: '5m',
+    60*15: '15m',
+    60*30: '30m',
+    60*60: '1h',
+    60*60*24: '1d',
+})
 
 
 def mk_fqsn(
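
A small sketch of why `tf_in_1s` becomes a `bidict`: the same table resolves a sample period in seconds to its timeframe key, and the `.inverse` view resolves a key back to seconds (values here copied from the mapping above).

```python
from bidict import bidict

tf_in_1s = bidict({
    1: '1s',
    60: '1m',
    60*5: '5m',
})

assert tf_in_1s[60] == '1m'           # seconds -> timeframe key
assert tf_in_1s.inverse['5m'] == 300  # timeframe key -> seconds
```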
@@ -31,6 +31,7 @@ from typing import (
     AsyncIterator, Optional,
     Generator,
     Awaitable,
+    TYPE_CHECKING,
 )
 
 import trio
@@ -74,6 +75,8 @@ from ..brokers._util import (
     DataUnavailable,
 )
 
+if TYPE_CHECKING:
+    from .marketstore import Storage
 
 log = get_logger(__name__)
 
@@ -208,28 +211,28 @@ def diff_history(
 
 ) -> np.ndarray:
 
+    to_push = array
+
     if last_tsdb_dt:
         s_diff = (start_dt - last_tsdb_dt).seconds
 
-        to_push = array[:s_diff]
-
         # if we detect a partial frame's worth of data
         # that is new, slice out only that history and
         # write to shm.
-        if abs(s_diff) < len(array):
+        if (
+            s_diff < 0
+            and abs(s_diff) < len(array)
+        ):
+            # the + 1 is because ``last_tsdb_dt`` is pulled from
+            # the last row entry for the ``'time'`` field retrieved
+            # from the tsdb.
+            to_push = array[abs(s_diff)+1:]
             log.info(
                 f'Pushing partial frame {to_push.size} to shm'
             )
-            # assert last_tsdb_dt > start_dt
-            # selected = array['time'] > last_tsdb_dt.timestamp()
-            # to_push = array[selected]
-            # return to_push
 
-        return to_push
-
-    else:
-        return array
+    return to_push
 
 
 async def start_backfill(
     mod: ModuleType,
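
A worked sketch of the overlap slice added to `diff_history` above, using plain integers instead of real OHLC rows: with 1s bars, a frame whose `start_dt` sits 3s earlier than `last_tsdb_dt` overlaps by 3 rows, and the `abs(s_diff) + 1` slice keeps only the strictly newer tail.

```python
import numpy as np

array = np.arange(10)   # stand-in for 10 rows of 1s ohlc samples
s_diff = -3             # (start_dt - last_tsdb_dt) in seconds

if s_diff < 0 and abs(s_diff) < len(array):
    # drop the overlapping rows plus the row equal to ``last_tsdb_dt``
    to_push = array[abs(s_diff) + 1:]
else:
    to_push = array

print(to_push)  # [4 5 6 7 8 9] -> only the strictly-newer tail
```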
@@ -237,6 +240,8 @@ async def start_backfill(
     shm: ShmArray,
 
     last_tsdb_dt: Optional[datetime] = None,
+    storage: Optional[Storage] = None,
+    write_tsdb: bool = False,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
@@ -248,6 +253,17 @@ async def start_backfill(
     # back to what is recorded in the tsdb
     array, start_dt, end_dt = await hist(end_dt=None)
 
+    times = array['time']
+
+    # sample period step size in seconds
+    step_size_s = (
+        pendulum.from_timestamp(times[-1]) -
+        pendulum.from_timestamp(times[-2])
+    ).seconds
+
+    # "frame"'s worth of sample period steps in seconds
+    frame_size_s = len(array) * step_size_s
+
     to_push = diff_history(
         array,
         start_dt,
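
A short sketch of the sample-period inference added above: the step size is taken from the gap between the last two epoch values in the frame's 'time' column, and a frame's span in seconds is just row count times step (the timestamps below are made up).

```python
import numpy as np
import pendulum

times = np.array([1649116800, 1649116860, 1649116920])  # 1m bars

# gap between the last two samples, in seconds
step_size_s = (
    pendulum.from_timestamp(times[-1]) -
    pendulum.from_timestamp(times[-2])
).seconds

# total span covered by this frame of samples
frame_size_s = len(times) * step_size_s

print(step_size_s, frame_size_s)  # 60 180
```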
@@ -267,20 +283,13 @@ async def start_backfill(
     # let caller unblock and deliver latest history frame
     task_status.started((shm, start_dt, end_dt, bf_done))
 
-    times = array['time']
-    step_size_s = (
-        pendulum.from_timestamp(times[-1]) -
-        pendulum.from_timestamp(times[-2])
-    ).seconds
-    frame_size_s = len(to_push) * step_size_s
-
     if last_tsdb_dt is None:
         # maybe a better default (they don't seem to define epoch?!)
 
         # based on the sample step size load a certain amount
         # history
         if step_size_s == 1:
-            last_tsdb_dt = pendulum.now().subtract(days=6)
+            last_tsdb_dt = pendulum.now().subtract(days=2)
 
         elif step_size_s == 60:
             last_tsdb_dt = pendulum.now().subtract(years=2)
@@ -363,7 +372,7 @@ async def start_backfill(
 
         except NoData:
             log.warning(
-                f'NO DATA for {frame_size_s}s frame @ {end_dt} ?!?'
+                f'NO DATA for {frame_size_s}s frame @ {input_end_dt} ?!?'
             )
             return None  # discard signal
 
@@ -402,15 +411,27 @@ async def start_backfill(
                 )
 
                 # reset dtrange gen to new start point
-                next_end = iter_dts_gen.send(start_dt)
-                log.info(
-                    f'Reset frame index to start at {start_dt}\n'
-                    f'Was at {next_end}'
-                )
+                try:
+                    next_end = iter_dts_gen.send(start_dt)
+                    log.info(
+                        f'Reset frame index to start at {start_dt}\n'
+                        f'Was at {next_end}'
+                    )
 
-                # TODO: can we avoid this?
-                earliest_end_dt = start_dt
+                    # NOTE: manually set "earliest end datetime" index-value
+                    # to avoid the request loop getting confused about
+                    # new frames that are earlier in history - i.e. this
+                    # **is not** the case of out-of-order frames from
+                    # an async batch request.
+                    earliest_end_dt = start_dt
+
+                except StopIteration:
+                    # gen already terminated meaning we probably already
+                    # exhausted it via frame requests.
+                    log.info(
+                        "Datetime index already exhausted, can't reset.."
+                    )
 
                 to_push = diff_history(
                     array,
                     start_dt,
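
The new `try`/`except StopIteration` above guards the `iter_dts_gen.send(...)` reset: once a generator has finished, `.send()` raises `StopIteration`. A generic sketch with a hypothetical stand-in generator (not piker's actual `iter_dts`):

```python
def iter_dts(start: int, step: int = 60, count: int = 3):
    '''Yield ``count`` "end" values walking back from ``start``; a value
    sent into the generator resets the walk to that new start.'''
    i = 0
    while i < count:
        sent = yield start - i * step
        if sent is not None:
            start, i = sent, 0  # caller reset the index
            continue
        i += 1


gen = iter_dts(1000)
assert next(gen) == 1000      # prime: first "end" value
assert gen.send(500) == 500   # reset the walk to a new start point

for _ in gen:                 # drain whatever remains
    pass

try:
    gen.send(100)             # exhausted -> raises StopIteration
except StopIteration:
    print("can't reset an already exhausted generator")
```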
@@ -519,6 +540,7 @@ async def start_backfill(
                     break
 
                 to_push, start_dt, end_dt = frames.pop(epoch)
+                ln = len(to_push)
 
                 # bail gracefully on shm allocation overrun/full condition
                 try:
@@ -527,19 +549,30 @@ async def start_backfill(
                     log.info(
                         f'Shm buffer overrun on: {start_dt} -> {end_dt}?'
                     )
-                    # await tractor.breakpoint()
                     break
 
                 log.info(
-                    f'Shm pushed {len(to_push)} frame:\n'
+                    f'Shm pushed {ln} frame:\n'
                     f'{start_dt} -> {end_dt}'
                 )
 
                 # keep track of most recent "prepended" ``start_dt``
                 # both for detecting gaps and ensuring async
                 # frame-result order.
                 earliest_end_dt = start_dt
 
+                if (
+                    storage is not None
+                    and write_tsdb
+                ):
+                    log.info(
+                        f'Writing {ln} frame to storage:\n'
+                        f'{start_dt} -> {end_dt}'
+                    )
+                    await storage.write_ohlcv(
+                        f'{bfqsn}.{mod.name}',  # lul..
+                        to_push,
+                    )
+
                 # TODO: can we only trigger this if the respective
                 # history in "in view"?!?
                 # XXX: extremely important, there can be no checkpoints
@@ -608,7 +641,7 @@ async def manage_history(
         # shm backfiller approach below.
 
         # start history anal and load missing new data via backend.
-        series, first_dt, last_dt = await storage.load(fqsn)
+        series, _, last_tsdb_dt = await storage.load(fqsn)
 
         broker, symbol, expiry = unpack_fqsn(fqsn)
         (
@@ -622,7 +655,8 @@ async def manage_history(
                 mod,
                 bfqsn,
                 shm,
-                last_tsdb_dt=last_dt,
+                last_tsdb_dt=last_tsdb_dt,
+                storage=storage,
             )
         )
 
@@ -643,8 +677,10 @@ async def manage_history(
         # do diff against last start frame of history and only fill
         # in from the tsdb an allotment that allows for most recent
         # to be loaded into mem *before* tsdb data.
-        if last_dt:
-            dt_diff_s = (latest_start_dt - last_dt).seconds
+        if last_tsdb_dt:
+            dt_diff_s = (
+                latest_start_dt - last_tsdb_dt
+            ).seconds
         else:
             dt_diff_s = 0
 
@@ -673,7 +709,7 @@ async def manage_history(
             field_map=marketstore.ohlc_key_map,
         )
 
-        # load as much from storage into shm as spacec will
+        # load as much from storage into shm as space will
         # allow according to user's shm size settings.
         count = 0
         end = fastest['Epoch'][0]
@@ -698,15 +734,11 @@ async def manage_history(
                 prepend=True,
                 # update_first=False,
                 # start=prepend_start,
-                field_map={
-                    'Epoch': 'time',
-                    'Open': 'open',
-                    'High': 'high',
-                    'Low': 'low',
-                    'Close': 'close',
-                    'Volume': 'volume',
-                },
+                field_map=marketstore.ohlc_key_map,
             )
 
+            # manually trigger step update to update charts/fsps
+            # which need an incremental update.
             for delay_s in sampler.subscribers:
                 await broadcast(delay_s)
 
@@ -191,6 +191,9 @@ class ContentsLabel(pg.LabelItem):
 
         self.setText(
             "<b>i</b>:{index}<br/>"
+            # NB: these fields must be indexed in the correct order via
+            # the slice syntax below.
+            "<b>epoch</b>:{}<br/>"
             "<b>O</b>:{}<br/>"
             "<b>H</b>:{}<br/>"
             "<b>L</b>:{}<br/>"
@@ -198,7 +201,15 @@ class ContentsLabel(pg.LabelItem):
             "<b>V</b>:{}<br/>"
             "<b>wap</b>:{}".format(
                 *array[index - first][
-                    ['open', 'high', 'low', 'close', 'volume', 'bar_wap']
+                    [
+                        'time',
+                        'open',
+                        'high',
+                        'low',
+                        'close',
+                        'volume',
+                        'bar_wap',
+                    ]
                 ],
                 name=name,
                 index=index,
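
The label change above leans on numpy's multi-field indexing: selecting a structured-array row with a list of field names returns those fields in exactly that order, which is why the new comment stresses ordering. A sketch with a toy dtype:

```python
import numpy as np

ohlc = np.array(
    [(1649116800, 10.0, 12.0, 9.5, 11.0)],
    dtype=[
        ('time', 'i8'),
        ('open', 'f8'),
        ('high', 'f8'),
        ('low', 'f8'),
        ('close', 'f8'),
    ],
)

# a list of field names selects those fields, in that order
row = ohlc[0][['time', 'open', 'close']]
print('epoch:{} O:{} C:{}'.format(*row))  # epoch:1649116800 O:10.0 C:11.0
```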
@@ -29,6 +29,7 @@ from typing import Optional, Any, Callable
 import numpy as np
 import tractor
 import trio
+import pendulum
 import pyqtgraph as pg
 
 from .. import brokers
@@ -47,6 +48,7 @@ from ._fsp import (
     open_vlm_displays,
 )
 from ..data._sharedmem import ShmArray
+from ..data._source import tf_in_1s
 from ._forms import (
     FieldsForm,
     mk_order_pane_layout,
@@ -660,11 +662,17 @@ async def display_symbol_data(
     symbol = feed.symbols[sym]
     fqsn = symbol.front_fqsn()
 
+    times = bars['time']
+    end = pendulum.from_timestamp(times[-1])
+    start = pendulum.from_timestamp(times[times != times[-1]][-1])
+    step_size_s = (end - start).seconds
+    tf_key = tf_in_1s[step_size_s]
+
     # load in symbol's ohlc data
     godwidget.window.setWindowTitle(
         f'{fqsn} '
         f'tick:{symbol.tick_size} '
-        f'step:1s '
+        f'step:{tf_key} '
     )
 
     linked = godwidget.linkedsplits
 