Merge pull request #294 from pikers/broker_bumpz

Broker bumpz
drop_arrow_add_predulum
goodboy 2022-04-13 08:10:44 -04:00 committed by GitHub
commit fbabfb78e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 848 additions and 308 deletions

View File

@ -39,7 +39,9 @@ class NoData(BrokerError):
def resproc( def resproc(
resp: asks.response_objects.Response, resp: asks.response_objects.Response,
log: logging.Logger, log: logging.Logger,
return_json: bool = True return_json: bool = True,
log_resp: bool = False,
) -> asks.response_objects.Response: ) -> asks.response_objects.Response:
"""Process response and return its json content. """Process response and return its json content.
@ -52,7 +54,8 @@ def resproc(
except json.decoder.JSONDecodeError: except json.decoder.JSONDecodeError:
log.exception(f"Failed to process {resp}:\n{resp.text}") log.exception(f"Failed to process {resp}:\n{resp.text}")
raise BrokerError(resp.text) raise BrokerError(resp.text)
else:
if log_resp:
log.debug(f"Received json contents:\n{colorize_json(json)}") log.debug(f"Received json contents:\n{colorize_json(json)}")
return json if return_json else resp return json if return_json else resp

View File

@ -142,15 +142,23 @@ async def symbol_search(
brokermods: list[ModuleType], brokermods: list[ModuleType],
pattern: str, pattern: str,
**kwargs, **kwargs,
) -> Dict[str, Dict[str, Dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
"""Return symbol info from broker. '''
""" Return symbol info from broker.
'''
results = [] results = []
async def search_backend(brokername: str) -> None: async def search_backend(
brokermod: ModuleType
) -> None:
brokername: str = mod.name
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
brokername, mod.name,
infect_asyncio=getattr(mod, '_infect_asyncio', False),
) as portal: ) as portal:
results.append(( results.append((

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,6 @@ NumPy compatible shared memory buffers for real-time IPC streaming.
""" """
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
from typing import Optional from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
@ -30,7 +29,7 @@ if _USE_POSIX:
import tractor import tractor
import numpy as np import numpy as np
from pydantic import BaseModel, validator from pydantic import BaseModel
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
@ -39,6 +38,14 @@ from ._source import base_iohlc_dtype
log = get_logger(__name__) log = get_logger(__name__)
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 10 * _secs_in_day
# where to start the new data append index
_rt_buffer_start = int(9*_secs_in_day)
# Tell the "resource tracker" thing to fuck off. # Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker): class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype): def register(self, name, rtype):
@ -152,7 +159,8 @@ def _make_token(
class ShmArray: class ShmArray:
"""A shared memory ``numpy`` (compatible) array API. '''
A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array a user specified ``numpy.ndarray``. This fixed size array
@ -162,7 +170,7 @@ class ShmArray:
``SharedInt`` interfaces) values such that multiple processes can ``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index. interact with the same array using a synchronized-index.
""" '''
def __init__( def __init__(
self, self,
shmarr: np.ndarray, shmarr: np.ndarray,
@ -209,7 +217,8 @@ class ShmArray:
@property @property
def array(self) -> np.ndarray: def array(self) -> np.ndarray:
'''Return an up-to-date ``np.ndarray`` view of the '''
Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer. so-far-written data to the underlying shm buffer.
''' '''
@ -231,26 +240,34 @@ class ShmArray:
def last( def last(
self, self,
length: int = 1, length: int = 1,
) -> np.ndarray: ) -> np.ndarray:
'''
Return the last ``length``'s worth of ("row") entries from the
array.
'''
return self.array[-length:] return self.array[-length:]
def push( def push(
self, self,
data: np.ndarray, data: np.ndarray,
field_map: Optional[dict[str, str]] = None,
prepend: bool = False, prepend: bool = False,
start: Optional[int] = None, start: Optional[int] = None,
) -> int: ) -> int:
'''Ring buffer like "push" to append data '''
Ring buffer like "push" to append data
into the buffer and return updated "last" index. into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel. condition, lel.
''' '''
self._post_init = True
length = len(data) length = len(data)
index = start or self._last.value index = start if start is not None else self._last.value
if prepend: if prepend:
index = self._first.value - length index = self._first.value - length
@ -263,10 +280,15 @@ class ShmArray:
end = index + length end = index + length
fields = self._write_fields if field_map:
src_names, dst_names = zip(*field_map.items())
else:
dst_names = src_names = self._write_fields
try: try:
self._array[fields][index:end] = data[fields][:] self._array[
list(dst_names)
][index:end] = data[list(src_names)][:]
# NOTE: there was a race here between updating # NOTE: there was a race here between updating
# the first and last indices and when the next reader # the first and last indices and when the next reader
@ -281,9 +303,13 @@ class ShmArray:
else: else:
self._last.value = end self._last.value = end
self._post_init = True
return end return end
except ValueError as err: except ValueError as err:
if field_map:
raise
# should raise if diff detected # should raise if diff detected
self.diff_err_fields(data) self.diff_err_fields(data)
raise err raise err
@ -336,12 +362,6 @@ class ShmArray:
... ...
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 3 * _secs_in_day
def open_shm_array( def open_shm_array(
key: Optional[str] = None, key: Optional[str] = None,
@ -392,7 +412,24 @@ def open_shm_array(
) )
) )
last.value = first.value = int(_secs_in_day) # start the "real-time" updated section after 3-days worth of 1s
# sampled OHLC. this allows appending up to a days worth from
# tick/quote feeds before having to flush to a (tsdb) storage
# backend, and looks something like,
# -------------------------
# | | i
# _________________________
# <-------------> <------->
# history real-time
#
# Once fully "prepended", the history section will leave the
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to 3/4 of the length of the buffer
# leaving a "days worth of second samples" for the real-time
# section.
last.value = first.value = _rt_buffer_start
shmarr = ShmArray( shmarr = ShmArray(
array, array,