Compare commits
2 Commits
fbaaf87d71
...
7d5ac3561e
Author | SHA1 | Date |
---|---|---|
|
7d5ac3561e | |
|
740f081d7e |
|
@ -30,8 +30,7 @@ from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Iterator,
|
Iterator,
|
||||||
Generator,
|
Generator
|
||||||
TYPE_CHECKING,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
import pendulum
|
import pendulum
|
||||||
|
@ -60,10 +59,8 @@ from ..clearing._messages import (
|
||||||
BrokerdPosition,
|
BrokerdPosition,
|
||||||
)
|
)
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
from piker.log import get_logger
|
from piker.data._symcache import SymbologyCache
|
||||||
|
from ..log import get_logger
|
||||||
if TYPE_CHECKING:
|
|
||||||
from piker.data._symcache import SymbologyCache
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
@ -496,17 +493,6 @@ class Account(Struct):
|
||||||
|
|
||||||
_mktmap_table: dict[str, MktPair] | None = None,
|
_mktmap_table: dict[str, MktPair] | None = None,
|
||||||
|
|
||||||
only_require: list[str]|True = True,
|
|
||||||
# ^list of fqmes that are "required" to be processed from
|
|
||||||
# this ledger pass; we often don't care about others and
|
|
||||||
# definitely shouldn't always error in such cases.
|
|
||||||
# (eg. broker backend loaded that doesn't yet supsport the
|
|
||||||
# symcache but also, inside the paper engine we don't ad-hoc
|
|
||||||
# request `get_mkt_info()` for every symbol in the ledger,
|
|
||||||
# only the one for which we're simulating against).
|
|
||||||
# TODO, not sure if there's a better soln for this, ideally
|
|
||||||
# all backends get symcache support afap i guess..
|
|
||||||
|
|
||||||
) -> dict[str, Position]:
|
) -> dict[str, Position]:
|
||||||
'''
|
'''
|
||||||
Update the internal `.pps[str, Position]` table from input
|
Update the internal `.pps[str, Position]` table from input
|
||||||
|
@ -549,32 +535,11 @@ class Account(Struct):
|
||||||
if _mktmap_table is None:
|
if _mktmap_table is None:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
required: bool = (
|
|
||||||
only_require is True
|
|
||||||
or (
|
|
||||||
only_require is not True
|
|
||||||
and
|
|
||||||
fqme in only_require
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# XXX: caller is allowed to provide a fallback
|
# XXX: caller is allowed to provide a fallback
|
||||||
# mktmap table for the case where a new position is
|
# mktmap table for the case where a new position is
|
||||||
# being added and the preloaded symcache didn't
|
# being added and the preloaded symcache didn't
|
||||||
# have this entry prior (eg. with frickin IB..)
|
# have this entry prior (eg. with frickin IB..)
|
||||||
if (
|
mkt = _mktmap_table[fqme]
|
||||||
not (mkt := _mktmap_table.get(fqme))
|
|
||||||
and
|
|
||||||
required
|
|
||||||
):
|
|
||||||
raise
|
|
||||||
|
|
||||||
elif not required:
|
|
||||||
continue
|
|
||||||
|
|
||||||
else:
|
|
||||||
# should be an entry retreived somewhere
|
|
||||||
assert mkt
|
|
||||||
|
|
||||||
|
|
||||||
if not (pos := pps.get(bs_mktid)):
|
if not (pos := pps.get(bs_mktid)):
|
||||||
|
|
||||||
|
@ -691,7 +656,7 @@ class Account(Struct):
|
||||||
def write_config(self) -> None:
|
def write_config(self) -> None:
|
||||||
'''
|
'''
|
||||||
Write the current account state to the user's account TOML file, normally
|
Write the current account state to the user's account TOML file, normally
|
||||||
something like `pps.toml`.
|
something like ``pps.toml``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# TODO: show diff output?
|
# TODO: show diff output?
|
||||||
|
|
|
@ -51,7 +51,6 @@ __brokers__: list[str] = [
|
||||||
'ib',
|
'ib',
|
||||||
'kraken',
|
'kraken',
|
||||||
'kucoin',
|
'kucoin',
|
||||||
'deribit',
|
|
||||||
|
|
||||||
# broken but used to work
|
# broken but used to work
|
||||||
# 'questrade',
|
# 'questrade',
|
||||||
|
@ -62,6 +61,7 @@ __brokers__: list[str] = [
|
||||||
# wstrade
|
# wstrade
|
||||||
# iex
|
# iex
|
||||||
|
|
||||||
|
# deribit
|
||||||
# bitso
|
# bitso
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
from functools import partial
|
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -191,17 +190,14 @@ def broker_init(
|
||||||
|
|
||||||
|
|
||||||
async def spawn_brokerd(
|
async def spawn_brokerd(
|
||||||
|
|
||||||
brokername: str,
|
brokername: str,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
|
||||||
Spawn a `brokerd.<backendname>` subactor service daemon
|
|
||||||
using `pikerd`'s service mngr.
|
|
||||||
|
|
||||||
'''
|
|
||||||
from piker.service._util import log # use service mngr log
|
from piker.service._util import log # use service mngr log
|
||||||
log.info(f'Spawning {brokername} broker daemon')
|
log.info(f'Spawning {brokername} broker daemon')
|
||||||
|
|
||||||
|
@ -221,35 +217,27 @@ async def spawn_brokerd(
|
||||||
|
|
||||||
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
# ask `pikerd` to spawn a new sub-actor and manage it under its
|
||||||
# actor nursery
|
# actor nursery
|
||||||
from piker.service import (
|
from piker.service import Services
|
||||||
get_service_mngr,
|
|
||||||
ServiceMngr,
|
|
||||||
)
|
|
||||||
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
|
||||||
mngr: ServiceMngr = get_service_mngr()
|
|
||||||
ctx: tractor.Context = await mngr.start_service(
|
|
||||||
daemon_name=dname,
|
|
||||||
ctx_ep=partial(
|
|
||||||
# signature of target root-task endpoint
|
|
||||||
daemon_fixture_ep,
|
|
||||||
|
|
||||||
# passed to daemon_fixture_ep(**kwargs)
|
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
||||||
brokername=brokername,
|
portal = await Services.actor_n.start_actor(
|
||||||
loglevel=loglevel,
|
dname,
|
||||||
),
|
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
|
||||||
debug_mode=mngr.debug_mode,
|
debug_mode=Services.debug_mode,
|
||||||
loglevel=loglevel,
|
|
||||||
enable_modules=(
|
|
||||||
_data_mods
|
|
||||||
+
|
|
||||||
tractor_kwargs.pop('enable_modules')
|
|
||||||
),
|
|
||||||
**tractor_kwargs
|
**tractor_kwargs
|
||||||
)
|
)
|
||||||
assert (
|
|
||||||
not ctx.cancel_called
|
# NOTE: the service mngr expects an already spawned actor + its
|
||||||
and ctx.portal # parent side
|
# portal ref in order to do non-blocking setup of brokerd
|
||||||
and dname in ctx.chan.uid # subactor is named as desired
|
# service nursery.
|
||||||
|
await Services.start_service_task(
|
||||||
|
dname,
|
||||||
|
portal,
|
||||||
|
|
||||||
|
# signature of target root-task endpoint
|
||||||
|
daemon_fixture_ep,
|
||||||
|
brokername=brokername,
|
||||||
|
loglevel=loglevel,
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -274,7 +262,8 @@ async def maybe_spawn_brokerd(
|
||||||
from piker.service import maybe_spawn_daemon
|
from piker.service import maybe_spawn_daemon
|
||||||
|
|
||||||
async with maybe_spawn_daemon(
|
async with maybe_spawn_daemon(
|
||||||
service_name=f'brokerd.{brokername}',
|
|
||||||
|
f'brokerd.{brokername}',
|
||||||
service_task_target=spawn_brokerd,
|
service_task_target=spawn_brokerd,
|
||||||
spawn_args={
|
spawn_args={
|
||||||
'brokername': brokername,
|
'brokername': brokername,
|
||||||
|
|
|
@ -567,7 +567,6 @@ class Client:
|
||||||
) -> str:
|
) -> str:
|
||||||
return {
|
return {
|
||||||
'USDTM': 'usdtm_futes',
|
'USDTM': 'usdtm_futes',
|
||||||
'SPOT': 'spot',
|
|
||||||
# 'COINM': 'coin_futes',
|
# 'COINM': 'coin_futes',
|
||||||
# ^-TODO-^ bc someone might want it..?
|
# ^-TODO-^ bc someone might want it..?
|
||||||
}[pair.venue]
|
}[pair.venue]
|
||||||
|
|
|
@ -181,6 +181,7 @@ class FutesPair(Pair):
|
||||||
quoteAsset: str # 'USDT',
|
quoteAsset: str # 'USDT',
|
||||||
quotePrecision: int # 8,
|
quotePrecision: int # 8,
|
||||||
requiredMarginPercent: float # '5.0000',
|
requiredMarginPercent: float # '5.0000',
|
||||||
|
settlePlan: int # 0,
|
||||||
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
|
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
|
||||||
triggerProtect: float # '0.0500',
|
triggerProtect: float # '0.0500',
|
||||||
underlyingSubType: list[str] # ['PoW'],
|
underlyingSubType: list[str] # ['PoW'],
|
||||||
|
|
|
@ -25,7 +25,6 @@ from .api import (
|
||||||
get_client,
|
get_client,
|
||||||
)
|
)
|
||||||
from .feed import (
|
from .feed import (
|
||||||
get_mkt_info,
|
|
||||||
open_history_client,
|
open_history_client,
|
||||||
open_symbol_search,
|
open_symbol_search,
|
||||||
stream_quotes,
|
stream_quotes,
|
||||||
|
@ -35,20 +34,15 @@ from .feed import (
|
||||||
# open_trade_dialog,
|
# open_trade_dialog,
|
||||||
# norm_trade_records,
|
# norm_trade_records,
|
||||||
# )
|
# )
|
||||||
from .venues import (
|
|
||||||
OptionPair,
|
|
||||||
)
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'get_client',
|
'get_client',
|
||||||
# 'trades_dialogue',
|
# 'trades_dialogue',
|
||||||
'get_mkt_info',
|
|
||||||
'open_history_client',
|
'open_history_client',
|
||||||
'open_symbol_search',
|
'open_symbol_search',
|
||||||
'stream_quotes',
|
'stream_quotes',
|
||||||
'OptionPair',
|
|
||||||
# 'norm_trade_records',
|
# 'norm_trade_records',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,59 +18,38 @@
|
||||||
Deribit backend.
|
Deribit backend.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import (
|
from typing import Any, Optional, Callable
|
||||||
# Any,
|
|
||||||
# Optional,
|
|
||||||
Callable,
|
|
||||||
)
|
|
||||||
# from pprint import pformat
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import cryptofeed
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
from pendulum import (
|
import pendulum
|
||||||
from_timestamp,
|
from rapidfuzz import process as fuzzy
|
||||||
)
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
from piker.accounting import (
|
from piker.brokers import open_cached_client
|
||||||
Asset,
|
from piker.log import get_logger, get_console_log
|
||||||
MktPair,
|
from piker.data import ShmArray
|
||||||
unpack_fqme,
|
from piker.brokers._util import (
|
||||||
)
|
BrokerError,
|
||||||
from piker.brokers import (
|
|
||||||
open_cached_client,
|
|
||||||
NoData,
|
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
)
|
)
|
||||||
from piker._cacheables import (
|
|
||||||
async_lifo_cache,
|
|
||||||
)
|
|
||||||
from piker.log import (
|
|
||||||
get_logger,
|
|
||||||
mk_repr,
|
|
||||||
)
|
|
||||||
from piker.data.validate import FeedInit
|
|
||||||
|
|
||||||
|
from cryptofeed import FeedHandler
|
||||||
|
from cryptofeed.defines import (
|
||||||
|
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
|
||||||
|
)
|
||||||
|
from cryptofeed.symbols import Symbol
|
||||||
|
|
||||||
from .api import (
|
from .api import (
|
||||||
Client,
|
Client, Trade,
|
||||||
# get_config,
|
get_config,
|
||||||
piker_sym_to_cb_sym,
|
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||||
cb_sym_to_deribit_inst,
|
|
||||||
str_to_cb_sym,
|
|
||||||
maybe_open_price_feed
|
maybe_open_price_feed
|
||||||
)
|
)
|
||||||
from .venues import (
|
|
||||||
Pair,
|
|
||||||
OptionPair,
|
|
||||||
Trade,
|
|
||||||
)
|
|
||||||
|
|
||||||
_spawn_kwargs = {
|
_spawn_kwargs = {
|
||||||
'infect_asyncio': True,
|
'infect_asyncio': True,
|
||||||
|
@ -85,215 +64,90 @@ async def open_history_client(
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
) -> tuple[Callable, int]:
|
) -> tuple[Callable, int]:
|
||||||
|
|
||||||
|
fnstrument: str = mkt.bs_fqme
|
||||||
# TODO implement history getter for the new storage layer.
|
# TODO implement history getter for the new storage layer.
|
||||||
async with open_cached_client('deribit') as client:
|
async with open_cached_client('deribit') as client:
|
||||||
|
|
||||||
pair: OptionPair = client._pairs[mkt.dst.name]
|
|
||||||
# XXX NOTE, the cuckers use ms !!!
|
|
||||||
creation_time_s: int = pair.creation_timestamp/1000
|
|
||||||
|
|
||||||
async def get_ohlc(
|
async def get_ohlc(
|
||||||
timeframe: float,
|
end_dt: Optional[datetime] = None,
|
||||||
end_dt: datetime | None = None,
|
start_dt: Optional[datetime] = None,
|
||||||
start_dt: datetime | None = None,
|
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
np.ndarray,
|
np.ndarray,
|
||||||
datetime, # start
|
datetime, # start
|
||||||
datetime, # end
|
datetime, # end
|
||||||
]:
|
]:
|
||||||
if timeframe != 60:
|
|
||||||
raise DataUnavailable('Only 1m bars are supported')
|
|
||||||
|
|
||||||
array: np.ndarray = await client.bars(
|
array = await client.bars(
|
||||||
mkt,
|
instrument,
|
||||||
start_dt=start_dt,
|
start_dt=start_dt,
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
)
|
)
|
||||||
if len(array) == 0:
|
if len(array) == 0:
|
||||||
if (
|
raise DataUnavailable
|
||||||
end_dt is None
|
|
||||||
):
|
|
||||||
raise DataUnavailable(
|
|
||||||
'No history seems to exist yet?\n\n'
|
|
||||||
f'{mkt}'
|
|
||||||
)
|
|
||||||
elif (
|
|
||||||
end_dt
|
|
||||||
and
|
|
||||||
end_dt.timestamp() < creation_time_s
|
|
||||||
):
|
|
||||||
# the contract can't have history
|
|
||||||
# before it was created.
|
|
||||||
pair_type_str: str = type(pair).__name__
|
|
||||||
create_dt: datetime = from_timestamp(creation_time_s)
|
|
||||||
raise DataUnavailable(
|
|
||||||
f'No history prior to\n'
|
|
||||||
f'`{pair_type_str}.creation_timestamp: int = '
|
|
||||||
f'{pair.creation_timestamp}\n\n'
|
|
||||||
f'------ deribit sux ------\n'
|
|
||||||
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
|
|
||||||
f'creation_time_s: {creation_time_s}\n'
|
|
||||||
f'create_dt: {create_dt}\n'
|
|
||||||
)
|
|
||||||
raise NoData(
|
|
||||||
f'No frame for {start_dt} -> {end_dt}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
start_dt = from_timestamp(array[0]['time'])
|
start_dt = pendulum.from_timestamp(array[0]['time'])
|
||||||
end_dt = from_timestamp(array[-1]['time'])
|
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
||||||
|
|
||||||
times = array['time']
|
|
||||||
if not times.any():
|
|
||||||
raise ValueError(
|
|
||||||
'Bad frame with null-times?\n\n'
|
|
||||||
f'{times}'
|
|
||||||
)
|
|
||||||
|
|
||||||
if end_dt is None:
|
|
||||||
inow: int = round(time.time())
|
|
||||||
if (inow - times[-1]) > 60:
|
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
return array, start_dt, end_dt
|
return array, start_dt, end_dt
|
||||||
|
|
||||||
yield (
|
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
||||||
get_ohlc,
|
|
||||||
{ # backfill config
|
|
||||||
'erlangs': 3,
|
|
||||||
'rate': 3,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@async_lifo_cache()
|
|
||||||
async def get_mkt_info(
|
|
||||||
fqme: str,
|
|
||||||
|
|
||||||
) -> tuple[MktPair, Pair|OptionPair] | None:
|
|
||||||
|
|
||||||
# uppercase since kraken bs_mktid is always upper
|
|
||||||
if 'deribit' not in fqme.lower():
|
|
||||||
fqme += '.deribit'
|
|
||||||
|
|
||||||
mkt_mode: str = ''
|
|
||||||
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
|
|
||||||
|
|
||||||
# NOTE: we always upper case all tokens to be consistent with
|
|
||||||
# binance's symbology style for pairs, like `BTCUSDT`, but in
|
|
||||||
# theory we could also just keep things lower case; as long as
|
|
||||||
# we're consistent and the symcache matches whatever this func
|
|
||||||
# returns, always!
|
|
||||||
expiry: str = expiry.upper()
|
|
||||||
venue: str = venue.upper()
|
|
||||||
# venue_lower: str = venue.lower()
|
|
||||||
|
|
||||||
mkt_mode: str = 'option'
|
|
||||||
|
|
||||||
async with open_cached_client(
|
|
||||||
'deribit',
|
|
||||||
) as client:
|
|
||||||
|
|
||||||
assets: dict[str, Asset] = await client.get_assets()
|
|
||||||
pair_str: str = mkt_ep.lower()
|
|
||||||
|
|
||||||
pair: Pair = await client.exch_info(
|
|
||||||
sym=pair_str,
|
|
||||||
)
|
|
||||||
mkt_mode = pair.venue
|
|
||||||
client.mkt_mode = mkt_mode
|
|
||||||
|
|
||||||
dst: Asset | None = assets.get(pair.bs_dst_asset)
|
|
||||||
src: Asset | None = assets.get(pair.bs_src_asset)
|
|
||||||
|
|
||||||
mkt = MktPair(
|
|
||||||
dst=dst,
|
|
||||||
src=src,
|
|
||||||
price_tick=pair.price_tick,
|
|
||||||
size_tick=pair.size_tick,
|
|
||||||
bs_mktid=pair.symbol,
|
|
||||||
venue=mkt_mode,
|
|
||||||
broker='deribit',
|
|
||||||
_atype=mkt_mode,
|
|
||||||
_fqme_without_src=True,
|
|
||||||
|
|
||||||
# expiry=pair.expiry,
|
|
||||||
# XXX TODO, currently we don't use it since it's
|
|
||||||
# already "described" in the `OptionPair.symbol: str`
|
|
||||||
# and if we slap in the ISO repr it's kinda hideous..
|
|
||||||
# -[ ] figure out the best either std
|
|
||||||
)
|
|
||||||
return mkt, pair
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
loglevel: str = None,
|
||||||
|
|
||||||
# startup sync
|
# startup sync
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||||
Open a live quote stream for the market set defined by `symbols`.
|
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||||
|
|
||||||
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
|
sym = symbols[0]
|
||||||
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
sym = symbols[0].split('.')[0]
|
|
||||||
init_msgs: list[FeedInit] = []
|
|
||||||
|
|
||||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
|
||||||
# just that).
|
|
||||||
pfmt: Callable[[str], str] = mk_repr(
|
|
||||||
# so we can see `deribit`'s delightfully mega-long bs fields..
|
|
||||||
maxstring=100,
|
|
||||||
)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_cached_client('deribit') as client,
|
open_cached_client('deribit') as client,
|
||||||
send_chan as send_chan
|
send_chan as send_chan
|
||||||
):
|
):
|
||||||
mkt: MktPair
|
|
||||||
pair: Pair
|
|
||||||
mkt, pair = await get_mkt_info(sym)
|
|
||||||
|
|
||||||
# build out init msgs according to latest spec
|
init_msgs = {
|
||||||
init_msgs.append(
|
# pass back token, and bool, signalling if we're the writer
|
||||||
FeedInit(
|
# and that history has been written
|
||||||
mkt_info=mkt,
|
sym: {
|
||||||
)
|
'symbol_info': {
|
||||||
)
|
'asset_type': 'option',
|
||||||
# build `cryptofeed` feed-handle
|
'price_tick_size': 0.0005
|
||||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
},
|
||||||
|
'shm_write_opts': {'sum_tick_vml': False},
|
||||||
|
'fqsn': sym,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
from_cf: tractor.to_asyncio.LinkedTaskChannel
|
nsym = piker_sym_to_cb_sym(sym)
|
||||||
async with maybe_open_price_feed(sym) as from_cf:
|
|
||||||
|
|
||||||
# load the "last trades" summary
|
async with maybe_open_price_feed(sym) as stream:
|
||||||
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
|
|
||||||
cb_sym_to_deribit_inst(cf_sym),
|
|
||||||
count=1,
|
|
||||||
)
|
|
||||||
last_trades: list[Trade] = last_trades_res.trades
|
|
||||||
|
|
||||||
# TODO, do we even need this or will the above always
|
cache = await client.cache_symbols()
|
||||||
# work?
|
|
||||||
# if not last_trades:
|
|
||||||
# await tractor.pause()
|
|
||||||
# async for typ, quote in from_cf:
|
|
||||||
# if typ == 'trade':
|
|
||||||
# last_trade = Trade(**(quote['data']))
|
|
||||||
# break
|
|
||||||
|
|
||||||
# else:
|
last_trades = (await client.last_trades(
|
||||||
last_trade = Trade(
|
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||||
**(last_trades[0])
|
|
||||||
)
|
|
||||||
|
|
||||||
first_quote: dict = {
|
if len(last_trades) == 0:
|
||||||
|
last_trade = None
|
||||||
|
async for typ, quote in stream:
|
||||||
|
if typ == 'trade':
|
||||||
|
last_trade = Trade(**(quote['data']))
|
||||||
|
break
|
||||||
|
|
||||||
|
else:
|
||||||
|
last_trade = Trade(**(last_trades[0]))
|
||||||
|
|
||||||
|
first_quote = {
|
||||||
'symbol': sym,
|
'symbol': sym,
|
||||||
'last': last_trade.price,
|
'last': last_trade.price,
|
||||||
'brokerd_ts': last_trade.timestamp,
|
'brokerd_ts': last_trade.timestamp,
|
||||||
|
@ -304,84 +158,13 @@ async def stream_quotes(
|
||||||
'broker_ts': last_trade.timestamp
|
'broker_ts': last_trade.timestamp
|
||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
task_status.started((
|
task_status.started((init_msgs, first_quote))
|
||||||
init_msgs,
|
|
||||||
first_quote,
|
|
||||||
))
|
|
||||||
|
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
# NOTE XXX, static for now!
|
async for typ, quote in stream:
|
||||||
# => since this only handles ONE mkt feed at a time we
|
topic = quote['symbol']
|
||||||
# don't need a lookup table to map interleaved quotes
|
await send_chan.send({topic: quote})
|
||||||
# from multiple possible mkt-pairs
|
|
||||||
topic: str = mkt.bs_fqme
|
|
||||||
|
|
||||||
# deliver until cancelled
|
|
||||||
async for typ, ref in from_cf:
|
|
||||||
match typ:
|
|
||||||
case 'trade':
|
|
||||||
trade: cryptofeed.types.Trade = ref
|
|
||||||
|
|
||||||
# TODO, re-impl this according to teh ideal
|
|
||||||
# fqme for opts that we choose!!
|
|
||||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
|
||||||
str_to_cb_sym(trade.symbol)
|
|
||||||
).lower()
|
|
||||||
|
|
||||||
piker_quote: dict = {
|
|
||||||
'symbol': bs_fqme,
|
|
||||||
'last': trade.price,
|
|
||||||
'broker_ts': time.time(),
|
|
||||||
# ^TODO, name this `brokerd/datad_ts` and
|
|
||||||
# use `time.time_ns()` ??
|
|
||||||
'ticks': [{
|
|
||||||
'type': 'trade',
|
|
||||||
'price': float(trade.price),
|
|
||||||
'size': float(trade.amount),
|
|
||||||
'broker_ts': trade.timestamp,
|
|
||||||
}],
|
|
||||||
}
|
|
||||||
log.info(
|
|
||||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
|
||||||
f'{trade}\n\n'
|
|
||||||
f'{pfmt(piker_quote)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
case 'l1':
|
|
||||||
book: cryptofeed.types.L1Book = ref
|
|
||||||
|
|
||||||
# TODO, so this is where we can possibly change things
|
|
||||||
# and instead lever the `MktPair.bs_fqme: str` output?
|
|
||||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
|
||||||
str_to_cb_sym(book.symbol)
|
|
||||||
).lower()
|
|
||||||
|
|
||||||
piker_quote: dict = {
|
|
||||||
'symbol': bs_fqme,
|
|
||||||
'ticks': [
|
|
||||||
|
|
||||||
{'type': 'bid',
|
|
||||||
'price': float(book.bid_price),
|
|
||||||
'size': float(book.bid_size)},
|
|
||||||
|
|
||||||
{'type': 'bsize',
|
|
||||||
'price': float(book.bid_price),
|
|
||||||
'size': float(book.bid_size),},
|
|
||||||
|
|
||||||
{'type': 'ask',
|
|
||||||
'price': float(book.ask_price),
|
|
||||||
'size': float(book.ask_size),},
|
|
||||||
|
|
||||||
{'type': 'asize',
|
|
||||||
'price': float(book.ask_price),
|
|
||||||
'size': float(book.ask_size),}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
await send_chan.send({
|
|
||||||
topic: piker_quote,
|
|
||||||
})
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -391,21 +174,12 @@ async def open_symbol_search(
|
||||||
async with open_cached_client('deribit') as client:
|
async with open_cached_client('deribit') as client:
|
||||||
|
|
||||||
# load all symbols locally for fast search
|
# load all symbols locally for fast search
|
||||||
# cache = client._pairs
|
cache = await client.cache_symbols()
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
pattern: str
|
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
|
# repack in dict form
|
||||||
# NOTE: pattern fuzzy-matching is done within
|
await stream.send(
|
||||||
# the methd impl.
|
await client.search_symbols(pattern))
|
||||||
pairs: dict[str, Pair] = await client.search_symbols(
|
|
||||||
pattern,
|
|
||||||
)
|
|
||||||
# repack in fqme-keyed table
|
|
||||||
byfqme: dict[str, Pair] = {}
|
|
||||||
for pair in pairs.values():
|
|
||||||
byfqme[pair.bs_fqme] = pair
|
|
||||||
|
|
||||||
await stream.send(byfqme)
|
|
||||||
|
|
|
@ -1,196 +0,0 @@
|
||||||
# piker: trading gear for hackers
|
|
||||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
Per market data-type definitions and schemas types.
|
|
||||||
|
|
||||||
"""
|
|
||||||
from __future__ import annotations
|
|
||||||
import pendulum
|
|
||||||
from typing import (
|
|
||||||
Literal,
|
|
||||||
Optional,
|
|
||||||
)
|
|
||||||
from decimal import Decimal
|
|
||||||
|
|
||||||
from piker.types import Struct
|
|
||||||
|
|
||||||
|
|
||||||
# API endpoint paths by venue / sub-API
|
|
||||||
_domain: str = 'deribit.com'
|
|
||||||
_url = f'https://www.{_domain}'
|
|
||||||
|
|
||||||
# WEBsocketz
|
|
||||||
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
|
|
||||||
|
|
||||||
# test nets
|
|
||||||
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
|
|
||||||
|
|
||||||
MarketType = Literal[
|
|
||||||
'option'
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def get_api_eps(venue: MarketType) -> tuple[str, str]:
|
|
||||||
'''
|
|
||||||
Return API ep root paths per venue.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return {
|
|
||||||
'option': (
|
|
||||||
_ws_url,
|
|
||||||
),
|
|
||||||
}[venue]
|
|
||||||
|
|
||||||
|
|
||||||
class Pair(Struct, frozen=True, kw_only=True):
|
|
||||||
|
|
||||||
symbol: str
|
|
||||||
|
|
||||||
# src
|
|
||||||
quote_currency: str # 'BTC'
|
|
||||||
|
|
||||||
# dst
|
|
||||||
base_currency: str # "BTC",
|
|
||||||
|
|
||||||
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
|
|
||||||
tick_size_steps: list[dict[str, float]]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def price_tick(self) -> Decimal:
|
|
||||||
return Decimal(str(self.tick_size_steps[0]['above_price']))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def size_tick(self) -> Decimal:
|
|
||||||
return Decimal(str(self.tick_size))
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bs_fqme(self) -> str:
|
|
||||||
return f'{self.symbol}'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bs_mktid(self) -> str:
|
|
||||||
return f'{self.symbol}.{self.venue}'
|
|
||||||
|
|
||||||
|
|
||||||
class OptionPair(Pair, frozen=True):
|
|
||||||
|
|
||||||
taker_commission: float # 0.0003
|
|
||||||
strike: float # 5000.0
|
|
||||||
settlement_period: str # 'day'
|
|
||||||
settlement_currency: str # "BTC",
|
|
||||||
rfq: bool # false
|
|
||||||
price_index: str # 'btc_usd'
|
|
||||||
option_type: str # 'call'
|
|
||||||
min_trade_amount: float # 0.1
|
|
||||||
maker_commission: float # 0.0003
|
|
||||||
kind: str # 'option'
|
|
||||||
is_active: bool # true
|
|
||||||
instrument_type: str # 'reversed'
|
|
||||||
instrument_name: str # 'BTC-1SEP24-55000-C'
|
|
||||||
instrument_id: int # 364671
|
|
||||||
expiration_timestamp: int # 1725177600000
|
|
||||||
creation_timestamp: int # 1724918461000
|
|
||||||
counter_currency: str # 'USD'
|
|
||||||
contract_size: float # '1.0'
|
|
||||||
block_trade_tick_size: float # '0.0001'
|
|
||||||
block_trade_min_trade_amount: int # '25'
|
|
||||||
block_trade_commission: float # '0.003'
|
|
||||||
|
|
||||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
|
||||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
|
||||||
|
|
||||||
# TODO, impl this without the MM:SS part of
|
|
||||||
# the `'THH:MM:SS..'` etc..
|
|
||||||
@property
|
|
||||||
def expiry(self) -> str:
|
|
||||||
iso_date = pendulum.from_timestamp(
|
|
||||||
self.expiration_timestamp / 1000
|
|
||||||
).isoformat()
|
|
||||||
return iso_date
|
|
||||||
|
|
||||||
@property
|
|
||||||
def venue(self) -> str:
|
|
||||||
return f'{self.instrument_type}_option'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bs_fqme(self) -> str:
|
|
||||||
return f'{self.symbol}'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bs_src_asset(self) -> str:
|
|
||||||
return f'{self.quote_currency}'
|
|
||||||
|
|
||||||
@property
|
|
||||||
def bs_dst_asset(self) -> str:
|
|
||||||
return f'{self.symbol}'
|
|
||||||
|
|
||||||
|
|
||||||
PAIRTYPES: dict[MarketType, Pair] = {
|
|
||||||
'option': OptionPair,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class JSONRPCResult(Struct):
|
|
||||||
id: int
|
|
||||||
usIn: int
|
|
||||||
usOut: int
|
|
||||||
usDiff: int
|
|
||||||
testnet: bool
|
|
||||||
jsonrpc: str = '2.0'
|
|
||||||
error: Optional[dict] = None
|
|
||||||
result: Optional[list[dict]] = None
|
|
||||||
|
|
||||||
|
|
||||||
class JSONRPCChannel(Struct):
|
|
||||||
method: str
|
|
||||||
params: dict
|
|
||||||
jsonrpc: str = '2.0'
|
|
||||||
|
|
||||||
|
|
||||||
class KLinesResult(Struct):
|
|
||||||
low: list[float]
|
|
||||||
cost: list[float]
|
|
||||||
high: list[float]
|
|
||||||
open: list[float]
|
|
||||||
close: list[float]
|
|
||||||
ticks: list[int]
|
|
||||||
status: str
|
|
||||||
volume: list[float]
|
|
||||||
|
|
||||||
|
|
||||||
class Trade(Struct):
|
|
||||||
iv: float
|
|
||||||
price: float
|
|
||||||
amount: float
|
|
||||||
trade_id: str
|
|
||||||
contracts: float
|
|
||||||
direction: str
|
|
||||||
trade_seq: int
|
|
||||||
timestamp: int
|
|
||||||
mark_price: float
|
|
||||||
index_price: float
|
|
||||||
tick_direction: int
|
|
||||||
instrument_name: str
|
|
||||||
combo_id: Optional[str] = '',
|
|
||||||
combo_trade_id: Optional[int] = 0,
|
|
||||||
block_trade_id: Optional[str] = '',
|
|
||||||
block_trade_leg_count: Optional[int] = 0,
|
|
||||||
|
|
||||||
|
|
||||||
class LastTradesResult(Struct):
|
|
||||||
trades: list[Trade]
|
|
||||||
has_more: bool
|
|
|
@ -111,10 +111,6 @@ class KucoinMktPair(Struct, frozen=True):
|
||||||
quoteMaxSize: float
|
quoteMaxSize: float
|
||||||
quoteMinSize: float
|
quoteMinSize: float
|
||||||
symbol: str # our bs_mktid, kucoin's internal id
|
symbol: str # our bs_mktid, kucoin's internal id
|
||||||
feeCategory: int
|
|
||||||
makerFeeCoefficient: float
|
|
||||||
takerFeeCoefficient: float
|
|
||||||
st: bool
|
|
||||||
|
|
||||||
|
|
||||||
class AccountTrade(Struct, frozen=True):
|
class AccountTrade(Struct, frozen=True):
|
||||||
|
@ -597,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
|
||||||
'''
|
'''
|
||||||
async with (
|
async with (
|
||||||
httpx.AsyncClient(
|
httpx.AsyncClient(
|
||||||
base_url='https://api.kucoin.com/api',
|
base_url=f'https://api.kucoin.com/api',
|
||||||
) as trio_client,
|
) as trio_client,
|
||||||
):
|
):
|
||||||
client = Client(httpx_client=trio_client)
|
client = Client(httpx_client=trio_client)
|
||||||
|
@ -641,7 +637,7 @@ async def open_ping_task(
|
||||||
await trio.sleep((ping_interval - 1000) / 1000)
|
await trio.sleep((ping_interval - 1000) / 1000)
|
||||||
await ws.send_msg({'id': connect_id, 'type': 'ping'})
|
await ws.send_msg({'id': connect_id, 'type': 'ping'})
|
||||||
|
|
||||||
log.warning('Starting ping task for kucoin ws connection')
|
log.info('Starting ping task for kucoin ws connection')
|
||||||
n.start_soon(ping_server)
|
n.start_soon(ping_server)
|
||||||
|
|
||||||
yield
|
yield
|
||||||
|
@ -653,14 +649,9 @@ async def open_ping_task(
|
||||||
async def get_mkt_info(
|
async def get_mkt_info(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[MktPair, KucoinMktPair]:
|
||||||
MktPair,
|
|
||||||
KucoinMktPair,
|
|
||||||
]:
|
|
||||||
'''
|
'''
|
||||||
Query for and return both a `piker.accounting.MktPair` and
|
Query for and return a `MktPair` and `KucoinMktPair`.
|
||||||
`KucoinMktPair` from provided `fqme: str`
|
|
||||||
(fully-qualified-market-endpoint).
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with open_cached_client('kucoin') as client:
|
async with open_cached_client('kucoin') as client:
|
||||||
|
@ -735,8 +726,6 @@ async def stream_quotes(
|
||||||
|
|
||||||
log.info(f'Starting up quote stream(s) for {symbols}')
|
log.info(f'Starting up quote stream(s) for {symbols}')
|
||||||
for sym_str in symbols:
|
for sym_str in symbols:
|
||||||
mkt: MktPair
|
|
||||||
pair: KucoinMktPair
|
|
||||||
mkt, pair = await get_mkt_info(sym_str)
|
mkt, pair = await get_mkt_info(sym_str)
|
||||||
init_msgs.append(
|
init_msgs.append(
|
||||||
FeedInit(mkt_info=mkt)
|
FeedInit(mkt_info=mkt)
|
||||||
|
@ -744,11 +733,7 @@ async def stream_quotes(
|
||||||
|
|
||||||
ws: NoBsWs
|
ws: NoBsWs
|
||||||
token, ping_interval = await client._get_ws_token()
|
token, ping_interval = await client._get_ws_token()
|
||||||
log.info('API reported ping_interval: {ping_interval}\n')
|
connect_id = str(uuid4())
|
||||||
|
|
||||||
connect_id: str = str(uuid4())
|
|
||||||
typ: str
|
|
||||||
quote: dict
|
|
||||||
async with (
|
async with (
|
||||||
open_autorecon_ws(
|
open_autorecon_ws(
|
||||||
(
|
(
|
||||||
|
@ -762,37 +747,20 @@ async def stream_quotes(
|
||||||
),
|
),
|
||||||
) as ws,
|
) as ws,
|
||||||
open_ping_task(ws, ping_interval, connect_id),
|
open_ping_task(ws, ping_interval, connect_id),
|
||||||
aclosing(
|
aclosing(stream_messages(ws, sym_str)) as msg_gen,
|
||||||
iter_normed_quotes(
|
|
||||||
ws, sym_str
|
|
||||||
)
|
|
||||||
) as iter_quotes,
|
|
||||||
):
|
):
|
||||||
typ, quote = await anext(iter_quotes)
|
typ, quote = await anext(msg_gen)
|
||||||
|
|
||||||
# take care to not unblock here until we get a real
|
while typ != 'trade':
|
||||||
# trade quote?
|
# take care to not unblock here until we get a real
|
||||||
# ^TODO, remove this right?
|
# trade quote
|
||||||
# -[ ] what often blocks chart boot/new-feed switching
|
typ, quote = await anext(msg_gen)
|
||||||
# since we'ere waiting for a live quote instead of just
|
|
||||||
# loading history afap..
|
|
||||||
# |_ XXX, not sure if we require a bit of rework to core
|
|
||||||
# feed init logic or if backends justg gotta be
|
|
||||||
# changed up.. feel like there was some causality
|
|
||||||
# dilema prolly only seen with IB too..
|
|
||||||
# while typ != 'trade':
|
|
||||||
# typ, quote = await anext(iter_quotes)
|
|
||||||
|
|
||||||
task_status.started((init_msgs, quote))
|
task_status.started((init_msgs, quote))
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
# XXX NOTE, DO NOT include the `.<backend>` suffix!
|
async for typ, msg in msg_gen:
|
||||||
# OW the sampling loop will not broadcast correctly..
|
await send_chan.send({sym_str: msg})
|
||||||
# since `bus._subscribers.setdefault(bs_fqme, set())`
|
|
||||||
# is used inside `.data.open_feed_bus()` !!!
|
|
||||||
topic: str = mkt.bs_fqme
|
|
||||||
async for typ, quote in iter_quotes:
|
|
||||||
await send_chan.send({topic: quote})
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -847,7 +815,7 @@ async def subscribe(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def iter_normed_quotes(
|
async def stream_messages(
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
sym: str,
|
sym: str,
|
||||||
|
|
||||||
|
@ -878,9 +846,6 @@ async def iter_normed_quotes(
|
||||||
|
|
||||||
yield 'trade', {
|
yield 'trade', {
|
||||||
'symbol': sym,
|
'symbol': sym,
|
||||||
# TODO, is 'last' even used elsewhere/a-good
|
|
||||||
# semantic? can't we just read the ticks with our
|
|
||||||
# .data.ticktools.frame_ticks()`/
|
|
||||||
'last': trade_data.price,
|
'last': trade_data.price,
|
||||||
'brokerd_ts': last_trade_ts,
|
'brokerd_ts': last_trade_ts,
|
||||||
'ticks': [
|
'ticks': [
|
||||||
|
@ -973,7 +938,7 @@ async def open_history_client(
|
||||||
if end_dt is None:
|
if end_dt is None:
|
||||||
inow = round(time.time())
|
inow = round(time.time())
|
||||||
|
|
||||||
log.debug(
|
print(
|
||||||
f'difference in time between load and processing'
|
f'difference in time between load and processing'
|
||||||
f'{inow - times[-1]}'
|
f'{inow - times[-1]}'
|
||||||
)
|
)
|
||||||
|
|
|
@ -653,11 +653,7 @@ class Router(Struct):
|
||||||
flume = feed.flumes[fqme]
|
flume = feed.flumes[fqme]
|
||||||
first_quote: dict = flume.first_quote
|
first_quote: dict = flume.first_quote
|
||||||
book: DarkBook = self.get_dark_book(broker)
|
book: DarkBook = self.get_dark_book(broker)
|
||||||
|
book.lasts[fqme]: float = float(first_quote['last'])
|
||||||
if not (last := first_quote.get('last')):
|
|
||||||
last: float = flume.rt_shm.array[-1]['close']
|
|
||||||
|
|
||||||
book.lasts[fqme]: float = float(last)
|
|
||||||
|
|
||||||
async with self.maybe_open_brokerd_dialog(
|
async with self.maybe_open_brokerd_dialog(
|
||||||
brokermod=brokermod,
|
brokermod=brokermod,
|
||||||
|
@ -720,7 +716,7 @@ class Router(Struct):
|
||||||
subs = self.subscribers[sub_key]
|
subs = self.subscribers[sub_key]
|
||||||
|
|
||||||
sent_some: bool = False
|
sent_some: bool = False
|
||||||
for client_stream in subs.copy():
|
for client_stream in subs:
|
||||||
try:
|
try:
|
||||||
await client_stream.send(msg)
|
await client_stream.send(msg)
|
||||||
sent_some = True
|
sent_some = True
|
||||||
|
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
|
||||||
status_msg.brokerd_msg = msg
|
status_msg.brokerd_msg = msg
|
||||||
status_msg.src = msg.broker_details['name']
|
status_msg.src = msg.broker_details['name']
|
||||||
|
|
||||||
if not status_msg.req:
|
await router.client_broadcast(
|
||||||
# likely some order change state?
|
status_msg.req.symbol,
|
||||||
await tractor.pause()
|
status_msg,
|
||||||
else:
|
)
|
||||||
await router.client_broadcast(
|
|
||||||
status_msg.req.symbol,
|
|
||||||
status_msg,
|
|
||||||
)
|
|
||||||
|
|
||||||
if status == 'closed':
|
if status == 'closed':
|
||||||
log.info(f'Execution for {oid} is complete!')
|
log.info(f'Execution for {oid} is complete!')
|
||||||
|
|
|
@ -653,7 +653,6 @@ async def open_trade_dialog(
|
||||||
# in) use manually constructed table from calling
|
# in) use manually constructed table from calling
|
||||||
# the `.get_mkt_info()` provider EP above.
|
# the `.get_mkt_info()` provider EP above.
|
||||||
_mktmap_table=mkt_by_fqme,
|
_mktmap_table=mkt_by_fqme,
|
||||||
only_require=list(mkt_by_fqme),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
pp_msgs: list[BrokerdPosition] = []
|
pp_msgs: list[BrokerdPosition] = []
|
||||||
|
|
|
@ -335,7 +335,7 @@ def services(config, tl, ports):
|
||||||
name='service_query',
|
name='service_query',
|
||||||
loglevel=config['loglevel'] if tl else None,
|
loglevel=config['loglevel'] if tl else None,
|
||||||
),
|
),
|
||||||
tractor.get_registry(
|
tractor.get_arbiter(
|
||||||
host=host,
|
host=host,
|
||||||
port=ports[0]
|
port=ports[0]
|
||||||
) as portal
|
) as portal
|
||||||
|
|
|
@ -25,12 +25,10 @@ from collections import (
|
||||||
defaultdict,
|
defaultdict,
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Callable,
|
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,7 +42,7 @@ from tractor.trionics import (
|
||||||
maybe_open_nursery,
|
maybe_open_nursery,
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
|
||||||
from .ticktools import (
|
from .ticktools import (
|
||||||
frame_ticks,
|
frame_ticks,
|
||||||
|
@ -55,9 +53,6 @@ from ._util import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from ..service import maybe_spawn_daemon
|
from ..service import maybe_spawn_daemon
|
||||||
from piker.log import (
|
|
||||||
mk_repr,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
|
@ -75,7 +70,6 @@ if TYPE_CHECKING:
|
||||||
_default_delay_s: float = 1.0
|
_default_delay_s: float = 1.0
|
||||||
|
|
||||||
|
|
||||||
# TODO: use new `tractor.singleton_acm` API for this!
|
|
||||||
class Sampler:
|
class Sampler:
|
||||||
'''
|
'''
|
||||||
Global sampling engine registry.
|
Global sampling engine registry.
|
||||||
|
@ -85,9 +79,9 @@ class Sampler:
|
||||||
|
|
||||||
This non-instantiated type is meant to be a singleton within
|
This non-instantiated type is meant to be a singleton within
|
||||||
a `samplerd` actor-service spawned once by the user wishing to
|
a `samplerd` actor-service spawned once by the user wishing to
|
||||||
time-step-sample a (real-time) quote feeds, see
|
time-step-sample (real-time) quote feeds, see
|
||||||
`.service.maybe_open_samplerd()` and the below
|
``.service.maybe_open_samplerd()`` and the below
|
||||||
`register_with_sampler()`.
|
``register_with_sampler()``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
service_nursery: None | trio.Nursery = None
|
service_nursery: None | trio.Nursery = None
|
||||||
|
@ -381,10 +375,7 @@ async def register_with_sampler(
|
||||||
assert Sampler.ohlcv_shms
|
assert Sampler.ohlcv_shms
|
||||||
|
|
||||||
# unblock caller
|
# unblock caller
|
||||||
await ctx.started(
|
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
||||||
# XXX bc msgpack only allows one array type!
|
|
||||||
list(Sampler.ohlcv_shms.keys())
|
|
||||||
)
|
|
||||||
|
|
||||||
if open_index_stream:
|
if open_index_stream:
|
||||||
try:
|
try:
|
||||||
|
@ -428,6 +419,7 @@ async def register_with_sampler(
|
||||||
|
|
||||||
|
|
||||||
async def spawn_samplerd(
|
async def spawn_samplerd(
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
|
|
||||||
|
@ -437,10 +429,7 @@ async def spawn_samplerd(
|
||||||
update and increment count write and stream broadcasting.
|
update and increment count write and stream broadcasting.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from piker.service import (
|
from piker.service import Services
|
||||||
get_service_mngr,
|
|
||||||
ServiceMngr,
|
|
||||||
)
|
|
||||||
|
|
||||||
dname = 'samplerd'
|
dname = 'samplerd'
|
||||||
log.info(f'Spawning `{dname}`')
|
log.info(f'Spawning `{dname}`')
|
||||||
|
@ -448,33 +437,26 @@ async def spawn_samplerd(
|
||||||
# singleton lock creation of ``samplerd`` since we only ever want
|
# singleton lock creation of ``samplerd`` since we only ever want
|
||||||
# one daemon per ``pikerd`` proc tree.
|
# one daemon per ``pikerd`` proc tree.
|
||||||
# TODO: make this built-into the service api?
|
# TODO: make this built-into the service api?
|
||||||
mngr: ServiceMngr = get_service_mngr()
|
async with Services.locks[dname + '_singleton']:
|
||||||
already_started: bool = dname in mngr.service_tasks
|
|
||||||
|
|
||||||
async with mngr._locks[dname + '_singleton']:
|
if dname not in Services.service_tasks:
|
||||||
ctx: Context = await mngr.start_service(
|
|
||||||
daemon_name=dname,
|
portal = await Services.actor_n.start_actor(
|
||||||
ctx_ep=partial(
|
dname,
|
||||||
|
enable_modules=[
|
||||||
|
'piker.data._sampling',
|
||||||
|
],
|
||||||
|
loglevel=loglevel,
|
||||||
|
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||||
|
**extra_tractor_kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
await Services.start_service_task(
|
||||||
|
dname,
|
||||||
|
portal,
|
||||||
register_with_sampler,
|
register_with_sampler,
|
||||||
period_s=1,
|
period_s=1,
|
||||||
sub_for_broadcasts=False,
|
sub_for_broadcasts=False,
|
||||||
),
|
|
||||||
debug_mode=mngr.debug_mode, # set by pikerd flag
|
|
||||||
|
|
||||||
# proxy-through to tractor
|
|
||||||
enable_modules=[
|
|
||||||
'piker.data._sampling',
|
|
||||||
],
|
|
||||||
loglevel=loglevel,
|
|
||||||
**extra_tractor_kwargs
|
|
||||||
)
|
|
||||||
if not already_started:
|
|
||||||
assert (
|
|
||||||
ctx
|
|
||||||
and
|
|
||||||
ctx.portal
|
|
||||||
and
|
|
||||||
not ctx.cancel_called
|
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -579,6 +561,7 @@ async def open_sample_stream(
|
||||||
|
|
||||||
|
|
||||||
async def sample_and_broadcast(
|
async def sample_and_broadcast(
|
||||||
|
|
||||||
bus: _FeedsBus, # noqa
|
bus: _FeedsBus, # noqa
|
||||||
rt_shm: ShmArray,
|
rt_shm: ShmArray,
|
||||||
hist_shm: ShmArray,
|
hist_shm: ShmArray,
|
||||||
|
@ -599,22 +582,11 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
overruns = Counter()
|
overruns = Counter()
|
||||||
|
|
||||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
|
||||||
# just that).
|
|
||||||
pfmt: Callable[[str], str] = mk_repr()
|
|
||||||
|
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
|
# print(quotes)
|
||||||
|
|
||||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
# TODO: ``numba`` this!
|
||||||
# ALOT of perf with HF-feedz!!!
|
|
||||||
#
|
|
||||||
# log.info(
|
|
||||||
# 'Rx live quotes:\n'
|
|
||||||
# f'{pfmt(quotes)}'
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: `numba` this!
|
|
||||||
for broker_symbol, quote in quotes.items():
|
for broker_symbol, quote in quotes.items():
|
||||||
# TODO: in theory you can send the IPC msg *before* writing
|
# TODO: in theory you can send the IPC msg *before* writing
|
||||||
# to the sharedmem array to decrease latency, however, that
|
# to the sharedmem array to decrease latency, however, that
|
||||||
|
@ -687,18 +659,6 @@ async def sample_and_broadcast(
|
||||||
sub_key: str = broker_symbol.lower()
|
sub_key: str = broker_symbol.lower()
|
||||||
subs: set[Sub] = bus.get_subs(sub_key)
|
subs: set[Sub] = bus.get_subs(sub_key)
|
||||||
|
|
||||||
if not subs:
|
|
||||||
all_bs_fqmes: list[str] = list(
|
|
||||||
bus._subscribers.keys()
|
|
||||||
)
|
|
||||||
log.warning(
|
|
||||||
f'No subscribers for {brokername!r} live-quote ??\n'
|
|
||||||
f'broker_symbol: {broker_symbol}\n\n'
|
|
||||||
|
|
||||||
f'Maybe the backend-sys symbol does not match one of,\n'
|
|
||||||
f'{pfmt(all_bs_fqmes)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: by default the broker backend doesn't append
|
# NOTE: by default the broker backend doesn't append
|
||||||
# it's own "name" into the fqme schema (but maybe it
|
# it's own "name" into the fqme schema (but maybe it
|
||||||
# should?) so we have to manually generate the correct
|
# should?) so we have to manually generate the correct
|
||||||
|
@ -929,7 +889,6 @@ async def uniform_rate_send(
|
||||||
# to consumers which crash or lose network connection.
|
# to consumers which crash or lose network connection.
|
||||||
# I.e. we **DO NOT** want to crash and propagate up to
|
# I.e. we **DO NOT** want to crash and propagate up to
|
||||||
# ``pikerd`` these kinds of errors!
|
# ``pikerd`` these kinds of errors!
|
||||||
trio.EndOfChannel,
|
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
ConnectionResetError,
|
ConnectionResetError,
|
||||||
|
|
|
@ -273,7 +273,7 @@ async def _reconnect_forever(
|
||||||
nobsws._connected.set()
|
nobsws._connected.set()
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
except HandshakeError:
|
except HandshakeError:
|
||||||
log.exception('Retrying connection')
|
log.exception(f'Retrying connection')
|
||||||
|
|
||||||
# ws & nursery block ends
|
# ws & nursery block ends
|
||||||
|
|
||||||
|
@ -359,8 +359,8 @@ async def open_autorecon_ws(
|
||||||
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
JSONRPC response-request style machinery for transparent multiplexing
|
JSONRPC response-request style machinery for transparent multiplexing of msgs
|
||||||
of msgs over a `NoBsWs`.
|
over a NoBsWs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
@ -377,77 +377,43 @@ async def open_jsonrpc_session(
|
||||||
url: str,
|
url: str,
|
||||||
start_id: int = 0,
|
start_id: int = 0,
|
||||||
response_type: type = JSONRPCResult,
|
response_type: type = JSONRPCResult,
|
||||||
msg_recv_timeout: float = float('inf'),
|
request_type: Optional[type] = None,
|
||||||
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
|
request_hook: Optional[Callable] = None,
|
||||||
# and options mkts are generally "slow moving"..
|
error_hook: Optional[Callable] = None,
|
||||||
#
|
|
||||||
# FURTHER if we break the underlying ws connection then since we
|
|
||||||
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
|
|
||||||
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
|
|
||||||
# broken and never restored with wtv init sequence is required to
|
|
||||||
# re-establish a working req-resp session.
|
|
||||||
|
|
||||||
# request_type: Optional[type] = None,
|
|
||||||
# request_hook: Optional[Callable] = None,
|
|
||||||
# error_hook: Optional[Callable] = None,
|
|
||||||
) -> Callable[[str, dict], dict]:
|
) -> Callable[[str, dict], dict]:
|
||||||
|
|
||||||
# NOTE, store all request msgs so we can raise errors on the
|
|
||||||
# caller side!
|
|
||||||
req_msgs: dict[int, dict] = {}
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
open_autorecon_ws(
|
open_autorecon_ws(url) as ws
|
||||||
url=url,
|
|
||||||
msg_recv_timeout=msg_recv_timeout,
|
|
||||||
) as ws
|
|
||||||
):
|
):
|
||||||
rpc_id: Iterable[int] = count(start_id)
|
rpc_id: Iterable = count(start_id)
|
||||||
rpc_results: dict[int, dict] = {}
|
rpc_results: dict[int, dict] = {}
|
||||||
|
|
||||||
async def json_rpc(
|
async def json_rpc(method: str, params: dict) -> dict:
|
||||||
method: str,
|
|
||||||
params: dict,
|
|
||||||
) -> dict:
|
|
||||||
'''
|
'''
|
||||||
perform a json rpc call and wait for the result, raise exception in
|
perform a json rpc call and wait for the result, raise exception in
|
||||||
case of error field present on response
|
case of error field present on response
|
||||||
'''
|
'''
|
||||||
nonlocal req_msgs
|
|
||||||
|
|
||||||
req_id: int = next(rpc_id)
|
|
||||||
msg = {
|
msg = {
|
||||||
'jsonrpc': '2.0',
|
'jsonrpc': '2.0',
|
||||||
'id': req_id,
|
'id': next(rpc_id),
|
||||||
'method': method,
|
'method': method,
|
||||||
'params': params
|
'params': params
|
||||||
}
|
}
|
||||||
_id = msg['id']
|
_id = msg['id']
|
||||||
|
|
||||||
result = rpc_results[_id] = {
|
rpc_results[_id] = {
|
||||||
'result': None,
|
'result': None,
|
||||||
'error': None,
|
'event': trio.Event()
|
||||||
'event': trio.Event(), # signal caller resp arrived
|
|
||||||
}
|
}
|
||||||
req_msgs[_id] = msg
|
|
||||||
|
|
||||||
await ws.send_msg(msg)
|
await ws.send_msg(msg)
|
||||||
|
|
||||||
# wait for reponse before unblocking requester code
|
|
||||||
await rpc_results[_id]['event'].wait()
|
await rpc_results[_id]['event'].wait()
|
||||||
|
|
||||||
if (maybe_result := result['result']):
|
ret = rpc_results[_id]['result']
|
||||||
ret = maybe_result
|
|
||||||
del rpc_results[_id]
|
|
||||||
|
|
||||||
else:
|
del rpc_results[_id]
|
||||||
err = result['error']
|
|
||||||
raise Exception(
|
|
||||||
f'JSONRPC request failed\n'
|
|
||||||
f'req: {msg}\n'
|
|
||||||
f'resp: {err}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if ret.error is not None:
|
if ret.error is not None:
|
||||||
raise Exception(json.dumps(ret.error, indent=4))
|
raise Exception(json.dumps(ret.error, indent=4))
|
||||||
|
@ -462,7 +428,6 @@ async def open_jsonrpc_session(
|
||||||
the server side.
|
the server side.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
nonlocal req_msgs
|
|
||||||
async for msg in ws:
|
async for msg in ws:
|
||||||
match msg:
|
match msg:
|
||||||
case {
|
case {
|
||||||
|
@ -486,29 +451,15 @@ async def open_jsonrpc_session(
|
||||||
'params': _,
|
'params': _,
|
||||||
}:
|
}:
|
||||||
log.debug(f'Recieved\n{msg}')
|
log.debug(f'Recieved\n{msg}')
|
||||||
# if request_hook:
|
if request_hook:
|
||||||
# await request_hook(request_type(**msg))
|
await request_hook(request_type(**msg))
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'error': error
|
'error': error
|
||||||
}:
|
}:
|
||||||
# if error_hook:
|
log.warning(f'Recieved\n{error}')
|
||||||
# await error_hook(response_type(**msg))
|
if error_hook:
|
||||||
|
await error_hook(response_type(**msg))
|
||||||
# retreive orig request msg, set error
|
|
||||||
# response in original "result" msg,
|
|
||||||
# THEN FINALLY set the event to signal caller
|
|
||||||
# to raise the error in the parent task.
|
|
||||||
req_id: int = error['id']
|
|
||||||
req_msg: dict = req_msgs[req_id]
|
|
||||||
result: dict = rpc_results[req_id]
|
|
||||||
result['error'] = error
|
|
||||||
result['event'].set()
|
|
||||||
log.error(
|
|
||||||
f'JSONRPC request failed\n'
|
|
||||||
f'req: {req_msg}\n'
|
|
||||||
f'resp: {error}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
|
||||||
|
|
|
@ -540,10 +540,7 @@ async def open_feed_bus(
|
||||||
# subscription since the backend isn't (yet) expected to
|
# subscription since the backend isn't (yet) expected to
|
||||||
# append it's own name to the fqme, so we filter on keys
|
# append it's own name to the fqme, so we filter on keys
|
||||||
# which *do not* include that name (e.g .ib) .
|
# which *do not* include that name (e.g .ib) .
|
||||||
bus._subscribers.setdefault(
|
bus._subscribers.setdefault(bs_fqme, set())
|
||||||
bs_fqme,
|
|
||||||
set(),
|
|
||||||
)
|
|
||||||
|
|
||||||
# sync feed subscribers with flume handles
|
# sync feed subscribers with flume handles
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,11 +18,7 @@
|
||||||
Log like a forester!
|
Log like a forester!
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
import reprlib
|
|
||||||
import json
|
import json
|
||||||
from typing import (
|
|
||||||
Callable,
|
|
||||||
)
|
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from pygments import (
|
from pygments import (
|
||||||
|
@ -88,27 +84,3 @@ def colorize_json(
|
||||||
# likeable styles: algol_nu, tango, monokai
|
# likeable styles: algol_nu, tango, monokai
|
||||||
formatters.TerminalTrueColorFormatter(style=style)
|
formatters.TerminalTrueColorFormatter(style=style)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def mk_repr(
|
|
||||||
**repr_kws,
|
|
||||||
) -> Callable[[str], str]:
|
|
||||||
'''
|
|
||||||
Allocate and deliver a `repr.Repr` instance with provided input
|
|
||||||
settings using the std-lib's `reprlib` mod,
|
|
||||||
* https://docs.python.org/3/library/reprlib.html
|
|
||||||
|
|
||||||
------ Ex. ------
|
|
||||||
An up to 6-layer-nested `dict` as multi-line:
|
|
||||||
- https://stackoverflow.com/a/79102479
|
|
||||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
|
||||||
|
|
||||||
'''
|
|
||||||
def_kws: dict[str, int] = dict(
|
|
||||||
indent=2,
|
|
||||||
maxlevel=6, # recursion levels
|
|
||||||
maxstring=66, # match editor line-len limit
|
|
||||||
)
|
|
||||||
def_kws |= repr_kws
|
|
||||||
reprr = reprlib.Repr(**def_kws)
|
|
||||||
return reprr.repr
|
|
||||||
|
|
|
@ -30,11 +30,7 @@ Actor runtime primtives and (distributed) service APIs for,
|
||||||
=> TODO: maybe to (re)move elsewhere?
|
=> TODO: maybe to (re)move elsewhere?
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from ._mngr import (
|
from ._mngr import Services as Services
|
||||||
get_service_mngr as get_service_mngr,
|
|
||||||
open_service_mngr as open_service_mngr,
|
|
||||||
ServiceMngr as ServiceMngr,
|
|
||||||
)
|
|
||||||
from ._registry import (
|
from ._registry import (
|
||||||
_tractor_kwargs as _tractor_kwargs,
|
_tractor_kwargs as _tractor_kwargs,
|
||||||
_default_reg_addr as _default_reg_addr,
|
_default_reg_addr as _default_reg_addr,
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
from typing import (
|
from typing import (
|
||||||
|
Optional,
|
||||||
Any,
|
Any,
|
||||||
ClassVar,
|
ClassVar,
|
||||||
)
|
)
|
||||||
|
@ -29,13 +30,13 @@ from contextlib import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
from ._util import (
|
from ._util import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from ._mngr import (
|
from ._mngr import (
|
||||||
open_service_mngr,
|
Services,
|
||||||
ServiceMngr,
|
|
||||||
)
|
)
|
||||||
from ._registry import ( # noqa
|
from ._registry import ( # noqa
|
||||||
_tractor_kwargs,
|
_tractor_kwargs,
|
||||||
|
@ -58,7 +59,7 @@ async def open_piker_runtime(
|
||||||
registry_addrs: list[tuple[str, int]] = [],
|
registry_addrs: list[tuple[str, int]] = [],
|
||||||
|
|
||||||
enable_modules: list[str] = [],
|
enable_modules: list[str] = [],
|
||||||
loglevel: str|None = None,
|
loglevel: Optional[str] = None,
|
||||||
|
|
||||||
# XXX NOTE XXX: you should pretty much never want debug mode
|
# XXX NOTE XXX: you should pretty much never want debug mode
|
||||||
# for data daemons when running in production.
|
# for data daemons when running in production.
|
||||||
|
@ -68,7 +69,7 @@ async def open_piker_runtime(
|
||||||
# and spawn the service tree distributed per that.
|
# and spawn the service tree distributed per that.
|
||||||
start_method: str = 'trio',
|
start_method: str = 'trio',
|
||||||
|
|
||||||
tractor_runtime_overrides: dict|None = None,
|
tractor_runtime_overrides: dict | None = None,
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
|
@ -118,10 +119,6 @@ async def open_piker_runtime(
|
||||||
# spawn other specialized daemons I think?
|
# spawn other specialized daemons I think?
|
||||||
enable_modules=enable_modules,
|
enable_modules=enable_modules,
|
||||||
|
|
||||||
# TODO: how to configure this?
|
|
||||||
# keep it on by default if debug mode is set?
|
|
||||||
# maybe_enable_greenback=debug_mode,
|
|
||||||
|
|
||||||
**tractor_kwargs,
|
**tractor_kwargs,
|
||||||
) as actor,
|
) as actor,
|
||||||
|
|
||||||
|
@ -170,13 +167,12 @@ async def open_pikerd(
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> ServiceMngr:
|
) -> Services:
|
||||||
'''
|
'''
|
||||||
Start a root piker daemon actor (aka `pikerd`) with an indefinite
|
Start a root piker daemon with an indefinite lifetime.
|
||||||
lifetime.
|
|
||||||
|
|
||||||
A root actor-nursery is created which can be used to spawn and
|
A root actor nursery is created which can be used to create and keep
|
||||||
supervise underling service sub-actors (see below).
|
alive underling services (see below).
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# NOTE: for the root daemon we always enable the root
|
# NOTE: for the root daemon we always enable the root
|
||||||
|
@ -203,6 +199,8 @@ async def open_pikerd(
|
||||||
root_actor,
|
root_actor,
|
||||||
reg_addrs,
|
reg_addrs,
|
||||||
),
|
),
|
||||||
|
tractor.open_nursery() as actor_nursery,
|
||||||
|
trio.open_nursery() as service_nursery,
|
||||||
):
|
):
|
||||||
for addr in reg_addrs:
|
for addr in reg_addrs:
|
||||||
if addr not in root_actor.accept_addrs:
|
if addr not in root_actor.accept_addrs:
|
||||||
|
@ -211,17 +209,25 @@ async def open_pikerd(
|
||||||
'Maybe you have another daemon already running?'
|
'Maybe you have another daemon already running?'
|
||||||
)
|
)
|
||||||
|
|
||||||
mngr: ServiceMngr
|
# assign globally for future daemon/task creation
|
||||||
async with open_service_mngr(
|
Services.actor_n = actor_nursery
|
||||||
debug_mode=debug_mode,
|
Services.service_n = service_nursery
|
||||||
) as mngr:
|
Services.debug_mode = debug_mode
|
||||||
yield mngr
|
|
||||||
|
try:
|
||||||
|
yield Services
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# TODO: is this more clever/efficient?
|
||||||
|
# if 'samplerd' in Services.service_tasks:
|
||||||
|
# await Services.cancel_service('samplerd')
|
||||||
|
service_nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
# TODO: do we even need this?
|
# TODO: do we even need this?
|
||||||
# @acm
|
# @acm
|
||||||
# async def maybe_open_runtime(
|
# async def maybe_open_runtime(
|
||||||
# loglevel: str|None = None,
|
# loglevel: Optional[str] = None,
|
||||||
# **kwargs,
|
# **kwargs,
|
||||||
|
|
||||||
# ) -> None:
|
# ) -> None:
|
||||||
|
@ -250,7 +256,7 @@ async def maybe_open_pikerd(
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
|
) -> tractor._portal.Portal | ClassVar[Services]:
|
||||||
'''
|
'''
|
||||||
If no ``pikerd`` daemon-root-actor can be found start it and
|
If no ``pikerd`` daemon-root-actor can be found start it and
|
||||||
yield up (we should probably figure out returning a portal to self
|
yield up (we should probably figure out returning a portal to self
|
||||||
|
|
|
@ -49,7 +49,7 @@ from requests.exceptions import (
|
||||||
ReadTimeout,
|
ReadTimeout,
|
||||||
)
|
)
|
||||||
|
|
||||||
from ._mngr import ServiceMngr
|
from ._mngr import Services
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
get_console_log,
|
get_console_log,
|
||||||
|
@ -453,7 +453,7 @@ async def open_ahabd(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_service(
|
async def start_ahab_service(
|
||||||
services: ServiceMngr,
|
services: Services,
|
||||||
service_name: str,
|
service_name: str,
|
||||||
|
|
||||||
# endpoint config passed as **kwargs
|
# endpoint config passed as **kwargs
|
||||||
|
@ -549,8 +549,7 @@ async def start_ahab_service(
|
||||||
log.warning('Failed to cancel root permsed container')
|
log.warning('Failed to cancel root permsed container')
|
||||||
|
|
||||||
except (
|
except (
|
||||||
# trio.MultiError,
|
trio.MultiError,
|
||||||
ExceptionGroup,
|
|
||||||
) as err:
|
) as err:
|
||||||
for subexc in err.exceptions:
|
for subexc in err.exceptions:
|
||||||
if isinstance(subexc, PermissionError):
|
if isinstance(subexc, PermissionError):
|
||||||
|
|
|
@ -26,17 +26,14 @@ from typing import (
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
from collections import defaultdict
|
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
import trio
|
|
||||||
|
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
)
|
)
|
||||||
from ._mngr import (
|
from ._mngr import (
|
||||||
get_service_mngr,
|
Services,
|
||||||
ServiceMngr,
|
|
||||||
)
|
)
|
||||||
from ._actor_runtime import maybe_open_pikerd
|
from ._actor_runtime import maybe_open_pikerd
|
||||||
from ._registry import find_service
|
from ._registry import find_service
|
||||||
|
@ -44,14 +41,15 @@ from ._registry import find_service
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_spawn_daemon(
|
async def maybe_spawn_daemon(
|
||||||
|
|
||||||
service_name: str,
|
service_name: str,
|
||||||
service_task_target: Callable,
|
service_task_target: Callable,
|
||||||
|
|
||||||
spawn_args: dict[str, Any],
|
spawn_args: dict[str, Any],
|
||||||
|
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
singleton: bool = False,
|
singleton: bool = False,
|
||||||
|
|
||||||
_locks = defaultdict(trio.Lock),
|
|
||||||
**pikerd_kwargs,
|
**pikerd_kwargs,
|
||||||
|
|
||||||
) -> tractor.Portal:
|
) -> tractor.Portal:
|
||||||
|
@ -69,7 +67,7 @@ async def maybe_spawn_daemon(
|
||||||
'''
|
'''
|
||||||
# serialize access to this section to avoid
|
# serialize access to this section to avoid
|
||||||
# 2 or more tasks racing to create a daemon
|
# 2 or more tasks racing to create a daemon
|
||||||
lock = _locks[service_name]
|
lock = Services.locks[service_name]
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
async with find_service(
|
async with find_service(
|
||||||
|
@ -134,65 +132,7 @@ async def maybe_spawn_daemon(
|
||||||
async with tractor.wait_for_actor(service_name) as portal:
|
async with tractor.wait_for_actor(service_name) as portal:
|
||||||
lock.release()
|
lock.release()
|
||||||
yield portal
|
yield portal
|
||||||
# --- ---- ---
|
await portal.cancel_actor()
|
||||||
# XXX NOTE XXX
|
|
||||||
# --- ---- ---
|
|
||||||
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
|
|
||||||
#
|
|
||||||
# Doing so will cause an "out-of-band" ctxc
|
|
||||||
# (`tractor.ContextCancelled`) to be raised inside the
|
|
||||||
# `ServiceMngr.open_context_in_task()`'s call to
|
|
||||||
# `ctx.wait_for_result()` AND the internal self-ctxc
|
|
||||||
# "graceful capture" WILL NOT CATCH IT!
|
|
||||||
#
|
|
||||||
# This can cause certain types of operations to raise
|
|
||||||
# that ctxc BEFORE THEY `return`, resulting in
|
|
||||||
# a "false-negative" ctxc being raised when really
|
|
||||||
# nothing actually failed, other then our semantic
|
|
||||||
# "failure" to suppress an expected, graceful,
|
|
||||||
# self-cancel scenario..
|
|
||||||
#
|
|
||||||
# bUt wHy duZ It WorK lIKe dis..
|
|
||||||
# ------------------------------
|
|
||||||
# from the perspective of the `tractor.Context` this
|
|
||||||
# cancel request was conducted "out of band" since
|
|
||||||
# `Context.cancel()` was never called and thus the
|
|
||||||
# `._cancel_called: bool` was never set. Despite the
|
|
||||||
# remote `.canceller` being set to `pikerd` (i.e. the
|
|
||||||
# same `Actor.uid` of the raising service-mngr task) the
|
|
||||||
# service-task's ctx itself was never marked as having
|
|
||||||
# requested cancellation and thus still raises the ctxc
|
|
||||||
# bc it was unaware of any such request.
|
|
||||||
#
|
|
||||||
# How to make grokin these cases easier tho?
|
|
||||||
# ------------------------------------------
|
|
||||||
# Because `Portal.cancel_actor()` was called it requests
|
|
||||||
# "full-`Actor`-runtime-cancellation" of it's peer
|
|
||||||
# process which IS NOT THE SAME as a single inter-actor
|
|
||||||
# RPC task cancelling its local context with a remote
|
|
||||||
# peer `Task` in that same peer process.
|
|
||||||
#
|
|
||||||
# ?TODO? It might be better if we do one (or all) of the
|
|
||||||
# following:
|
|
||||||
#
|
|
||||||
# -[ ] at least set a special message for the
|
|
||||||
# `ContextCancelled` when raised locally by the
|
|
||||||
# unaware ctx task such that we check for the
|
|
||||||
# `.canceller` being *our `Actor`* and in the case
|
|
||||||
# where `Context._cancel_called == False` we specially
|
|
||||||
# note that this is likely an "out-of-band"
|
|
||||||
# runtime-cancel request triggered by some call to
|
|
||||||
# `Portal.cancel_actor()`, possibly even reporting the
|
|
||||||
# exact LOC of that caller by tracking it inside our
|
|
||||||
# portal-type?
|
|
||||||
# -[ ] possibly add another field `ContextCancelled` like
|
|
||||||
# maybe a,
|
|
||||||
# `.request_type: Literal['os', 'proc', 'actor',
|
|
||||||
# 'ctx']` type thing which would allow immediately
|
|
||||||
# being able to tell what kind of cancellation caused
|
|
||||||
# the unexpected ctxc?
|
|
||||||
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
|
|
||||||
# better augment `tractor` to be more explicit on this!
|
|
||||||
|
|
||||||
|
|
||||||
async def spawn_emsd(
|
async def spawn_emsd(
|
||||||
|
@ -207,22 +147,21 @@ async def spawn_emsd(
|
||||||
"""
|
"""
|
||||||
log.info('Spawning emsd')
|
log.info('Spawning emsd')
|
||||||
|
|
||||||
smngr: ServiceMngr = get_service_mngr()
|
portal = await Services.actor_n.start_actor(
|
||||||
portal = await smngr.actor_n.start_actor(
|
|
||||||
'emsd',
|
'emsd',
|
||||||
enable_modules=[
|
enable_modules=[
|
||||||
'piker.clearing._ems',
|
'piker.clearing._ems',
|
||||||
'piker.clearing._client',
|
'piker.clearing._client',
|
||||||
],
|
],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
debug_mode=smngr.debug_mode, # set by pikerd flag
|
debug_mode=Services.debug_mode, # set by pikerd flag
|
||||||
**extra_tractor_kwargs
|
**extra_tractor_kwargs
|
||||||
)
|
)
|
||||||
|
|
||||||
# non-blocking setup of clearing service
|
# non-blocking setup of clearing service
|
||||||
from ..clearing._ems import _setup_persistent_emsd
|
from ..clearing._ems import _setup_persistent_emsd
|
||||||
|
|
||||||
await smngr.start_service_task(
|
await Services.start_service_task(
|
||||||
'emsd',
|
'emsd',
|
||||||
portal,
|
portal,
|
||||||
|
|
||||||
|
|
|
@ -18,29 +18,16 @@
|
||||||
daemon-service management API.
|
daemon-service management API.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
|
||||||
from contextlib import (
|
|
||||||
asynccontextmanager as acm,
|
|
||||||
# contextmanager as cm,
|
|
||||||
)
|
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from dataclasses import (
|
|
||||||
dataclass,
|
|
||||||
field,
|
|
||||||
)
|
|
||||||
import functools
|
|
||||||
import inspect
|
|
||||||
from typing import (
|
from typing import (
|
||||||
Callable,
|
Callable,
|
||||||
Any,
|
Any,
|
||||||
)
|
)
|
||||||
|
|
||||||
import msgspec
|
|
||||||
import tractor
|
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
import tractor
|
||||||
from tractor import (
|
from tractor import (
|
||||||
ActorNursery,
|
|
||||||
current_actor,
|
current_actor,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
Context,
|
Context,
|
||||||
|
@ -52,130 +39,6 @@ from ._util import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO: implement a singleton deco-API for wrapping the below
|
|
||||||
# factory's impl for general actor-singleton use?
|
|
||||||
#
|
|
||||||
# @singleton
|
|
||||||
# async def open_service_mngr(
|
|
||||||
# **init_kwargs,
|
|
||||||
# ) -> ServiceMngr:
|
|
||||||
# '''
|
|
||||||
# Note this function body is invoke IFF no existing singleton instance already
|
|
||||||
# exists in this proc's memory.
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# # setup
|
|
||||||
# yield ServiceMngr(**init_kwargs)
|
|
||||||
# # teardown
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: singleton factory API instead of a class API
|
|
||||||
@acm
|
|
||||||
async def open_service_mngr(
|
|
||||||
*,
|
|
||||||
debug_mode: bool = False,
|
|
||||||
|
|
||||||
# impl deat which ensures a single global instance
|
|
||||||
_singleton: list[ServiceMngr|None] = [None],
|
|
||||||
**init_kwargs,
|
|
||||||
|
|
||||||
) -> ServiceMngr:
|
|
||||||
'''
|
|
||||||
Open a multi-subactor-as-service-daemon tree supervisor.
|
|
||||||
|
|
||||||
The delivered `ServiceMngr` is a singleton instance for each
|
|
||||||
actor-process and is allocated on first open and never
|
|
||||||
de-allocated unless explicitly deleted by al call to
|
|
||||||
`del_service_mngr()`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# TODO: factor this an allocation into
|
|
||||||
# a `._mngr.open_service_mngr()` and put in the
|
|
||||||
# once-n-only-once setup/`.__aenter__()` part!
|
|
||||||
# -[ ] how to make this only happen on the `mngr == None` case?
|
|
||||||
# |_ use `.trionics.maybe_open_context()` (for generic
|
|
||||||
# async-with-style-only-once of the factory impl, though
|
|
||||||
# what do we do for the allocation case?
|
|
||||||
# / `.maybe_open_nursery()` (since for this specific case
|
|
||||||
# it's simpler?) to activate
|
|
||||||
async with (
|
|
||||||
tractor.open_nursery() as an,
|
|
||||||
trio.open_nursery() as tn,
|
|
||||||
):
|
|
||||||
# impl specific obvi..
|
|
||||||
init_kwargs.update({
|
|
||||||
'actor_n': an,
|
|
||||||
'service_n': tn,
|
|
||||||
})
|
|
||||||
|
|
||||||
mngr: ServiceMngr|None
|
|
||||||
if (mngr := _singleton[0]) is None:
|
|
||||||
|
|
||||||
log.info('Allocating a new service mngr!')
|
|
||||||
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
|
|
||||||
|
|
||||||
# TODO: put into `.__aenter__()` section of
|
|
||||||
# eventual `@singleton_acm` API wrapper.
|
|
||||||
#
|
|
||||||
# assign globally for future daemon/task creation
|
|
||||||
mngr.actor_n = an
|
|
||||||
mngr.service_n = tn
|
|
||||||
|
|
||||||
else:
|
|
||||||
assert (
|
|
||||||
mngr.actor_n
|
|
||||||
and
|
|
||||||
mngr.service_tn
|
|
||||||
)
|
|
||||||
log.info(
|
|
||||||
'Using extant service mngr!\n\n'
|
|
||||||
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# NOTE: this is a singleton factory impl specific detail
|
|
||||||
# which should be supported in the condensed
|
|
||||||
# `@singleton_acm` API?
|
|
||||||
mngr.debug_mode = debug_mode
|
|
||||||
|
|
||||||
yield mngr
|
|
||||||
finally:
|
|
||||||
# TODO: is this more clever/efficient?
|
|
||||||
# if 'samplerd' in mngr.service_tasks:
|
|
||||||
# await mngr.cancel_service('samplerd')
|
|
||||||
tn.cancel_scope.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def get_service_mngr() -> ServiceMngr:
|
|
||||||
'''
|
|
||||||
Try to get the singleton service-mngr for this actor presuming it
|
|
||||||
has already been allocated using,
|
|
||||||
|
|
||||||
.. code:: python
|
|
||||||
|
|
||||||
async with open_<@singleton_acm(func)>() as mngr`
|
|
||||||
... this block kept open ...
|
|
||||||
|
|
||||||
If not yet allocated raise a `ServiceError`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# https://stackoverflow.com/a/12627202
|
|
||||||
# https://docs.python.org/3/library/inspect.html#inspect.Signature
|
|
||||||
maybe_mngr: ServiceMngr|None = inspect.signature(
|
|
||||||
open_service_mngr
|
|
||||||
).parameters['_singleton'].default[0]
|
|
||||||
|
|
||||||
if maybe_mngr is None:
|
|
||||||
raise RuntimeError(
|
|
||||||
'Someone must allocate a `ServiceMngr` using\n\n'
|
|
||||||
'`async with open_service_mngr()` beforehand!!\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
return maybe_mngr
|
|
||||||
|
|
||||||
|
|
||||||
# TODO: we need remote wrapping and a general soln:
|
# TODO: we need remote wrapping and a general soln:
|
||||||
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
# - factor this into a ``tractor.highlevel`` extension # pack for the
|
||||||
# library.
|
# library.
|
||||||
|
@ -183,46 +46,31 @@ def get_service_mngr() -> ServiceMngr:
|
||||||
# to the pikerd actor for starting services remotely!
|
# to the pikerd actor for starting services remotely!
|
||||||
# - prolly rename this to ActorServicesNursery since it spawns
|
# - prolly rename this to ActorServicesNursery since it spawns
|
||||||
# new actors and supervises them to completion?
|
# new actors and supervises them to completion?
|
||||||
@dataclass
|
class Services:
|
||||||
class ServiceMngr:
|
|
||||||
# class ServiceMngr(msgspec.Struct):
|
|
||||||
'''
|
|
||||||
A multi-subactor-as-service manager.
|
|
||||||
|
|
||||||
Spawn, supervise and monitor service/daemon subactors in a SC
|
actor_n: tractor._supervise.ActorNursery
|
||||||
process tree.
|
|
||||||
|
|
||||||
'''
|
|
||||||
actor_n: ActorNursery
|
|
||||||
service_n: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool = False # tractor sub-actor debug mode flag
|
debug_mode: bool # tractor sub-actor debug mode flag
|
||||||
|
|
||||||
service_tasks: dict[
|
service_tasks: dict[
|
||||||
str,
|
str,
|
||||||
tuple[
|
tuple[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
Context,
|
|
||||||
Portal,
|
Portal,
|
||||||
trio.Event,
|
trio.Event,
|
||||||
]
|
]
|
||||||
] = field(default_factory=dict)
|
] = {}
|
||||||
|
locks = defaultdict(trio.Lock)
|
||||||
# internal per-service task mutexs
|
|
||||||
_locks = defaultdict(trio.Lock)
|
|
||||||
|
|
||||||
|
@classmethod
|
||||||
async def start_service_task(
|
async def start_service_task(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
|
|
||||||
# TODO: typevar for the return type of the target and then
|
|
||||||
# use it below for `ctx_res`?
|
|
||||||
target: Callable,
|
target: Callable,
|
||||||
|
|
||||||
allow_overruns: bool = False,
|
allow_overruns: bool = False,
|
||||||
**ctx_kwargs,
|
**ctx_kwargs,
|
||||||
|
|
||||||
) -> (trio.CancelScope, Context, Any):
|
) -> (trio.CancelScope, Context):
|
||||||
'''
|
'''
|
||||||
Open a context in a service sub-actor, add to a stack
|
Open a context in a service sub-actor, add to a stack
|
||||||
that gets unwound at ``pikerd`` teardown.
|
that gets unwound at ``pikerd`` teardown.
|
||||||
|
@ -235,7 +83,6 @@ class ServiceMngr:
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
tuple[
|
tuple[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
Context,
|
|
||||||
trio.Event,
|
trio.Event,
|
||||||
Any,
|
Any,
|
||||||
]
|
]
|
||||||
|
@ -243,87 +90,64 @@ class ServiceMngr:
|
||||||
|
|
||||||
) -> Any:
|
) -> Any:
|
||||||
|
|
||||||
# TODO: use the ctx._scope directly here instead?
|
|
||||||
# -[ ] actually what semantics do we expect for this
|
|
||||||
# usage!?
|
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
try:
|
|
||||||
async with portal.open_context(
|
|
||||||
target,
|
|
||||||
allow_overruns=allow_overruns,
|
|
||||||
**ctx_kwargs,
|
|
||||||
|
|
||||||
) as (ctx, started):
|
async with portal.open_context(
|
||||||
|
target,
|
||||||
|
allow_overruns=allow_overruns,
|
||||||
|
**ctx_kwargs,
|
||||||
|
|
||||||
# unblock once the remote context has started
|
) as (ctx, first):
|
||||||
complete = trio.Event()
|
|
||||||
task_status.started((
|
# unblock once the remote context has started
|
||||||
cs,
|
complete = trio.Event()
|
||||||
ctx,
|
task_status.started((cs, complete, first))
|
||||||
complete,
|
log.info(
|
||||||
started,
|
f'`pikerd` service {name} started with value {first}'
|
||||||
))
|
)
|
||||||
log.info(
|
try:
|
||||||
f'`pikerd` service {name} started with value {started}'
|
|
||||||
)
|
|
||||||
# wait on any context's return value
|
# wait on any context's return value
|
||||||
# and any final portal result from the
|
# and any final portal result from the
|
||||||
# sub-actor.
|
# sub-actor.
|
||||||
ctx_res: Any = await ctx.wait_for_result()
|
ctx_res: Any = await ctx.result()
|
||||||
|
|
||||||
# NOTE: blocks indefinitely until cancelled
|
# NOTE: blocks indefinitely until cancelled
|
||||||
# either by error from the target context
|
# either by error from the target context
|
||||||
# function or by being cancelled here by the
|
# function or by being cancelled here by the
|
||||||
# surrounding cancel scope.
|
# surrounding cancel scope.
|
||||||
return (
|
return (await portal.result(), ctx_res)
|
||||||
await portal.wait_for_result(),
|
except ContextCancelled as ctxe:
|
||||||
ctx_res,
|
canceller: tuple[str, str] = ctxe.canceller
|
||||||
)
|
our_uid: tuple[str, str] = current_actor().uid
|
||||||
|
if (
|
||||||
|
canceller != portal.channel.uid
|
||||||
|
and
|
||||||
|
canceller != our_uid
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
f'Actor-service {name} was remotely cancelled?\n'
|
||||||
|
f'remote canceller: {canceller}\n'
|
||||||
|
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
except ContextCancelled as ctxe:
|
|
||||||
canceller: tuple[str, str] = ctxe.canceller
|
|
||||||
our_uid: tuple[str, str] = current_actor().uid
|
|
||||||
if (
|
|
||||||
canceller != portal.chan.uid
|
|
||||||
and
|
|
||||||
canceller != our_uid
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
|
|
||||||
|
|
||||||
# TODO: this would be a good spot to use
|
|
||||||
# a respawn feature Bo
|
|
||||||
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
|
|
||||||
|
|
||||||
f'cancellee: {portal.chan.uid}\n'
|
finally:
|
||||||
f'canceller: {canceller}\n'
|
await portal.cancel_actor()
|
||||||
)
|
complete.set()
|
||||||
else:
|
self.service_tasks.pop(name)
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
cs, complete, first = await self.service_n.start(open_context_in_task)
|
||||||
# NOTE: the ctx MUST be cancelled first if we
|
|
||||||
# don't want the above `ctx.wait_for_result()` to
|
|
||||||
# raise a self-ctxc. WHY, well since from the ctx's
|
|
||||||
# perspective the cancel request will have
|
|
||||||
# arrived out-out-of-band at the `Actor.cancel()`
|
|
||||||
# level, thus `Context.cancel_called == False`,
|
|
||||||
# meaning `ctx._is_self_cancelled() == False`.
|
|
||||||
# with trio.CancelScope(shield=True):
|
|
||||||
# await ctx.cancel()
|
|
||||||
await portal.cancel_actor()
|
|
||||||
complete.set()
|
|
||||||
self.service_tasks.pop(name)
|
|
||||||
|
|
||||||
cs, sub_ctx, complete, started = await self.service_n.start(
|
|
||||||
open_context_in_task
|
|
||||||
)
|
|
||||||
|
|
||||||
# store the cancel scope and portal for later cancellation or
|
# store the cancel scope and portal for later cancellation or
|
||||||
# retstart if needed.
|
# retstart if needed.
|
||||||
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
|
self.service_tasks[name] = (cs, portal, complete)
|
||||||
return cs, sub_ctx, started
|
|
||||||
|
|
||||||
|
return cs, first
|
||||||
|
|
||||||
|
@classmethod
|
||||||
async def cancel_service(
|
async def cancel_service(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -334,80 +158,8 @@ class ServiceMngr:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
log.info(f'Cancelling `pikerd` service {name}')
|
log.info(f'Cancelling `pikerd` service {name}')
|
||||||
cs, sub_ctx, portal, complete = self.service_tasks[name]
|
cs, portal, complete = self.service_tasks[name]
|
||||||
|
cs.cancel()
|
||||||
# cs.cancel()
|
|
||||||
await sub_ctx.cancel()
|
|
||||||
await complete.wait()
|
await complete.wait()
|
||||||
|
assert name not in self.service_tasks, \
|
||||||
if name in self.service_tasks:
|
f'Serice task for {name} not terminated?'
|
||||||
# TODO: custom err?
|
|
||||||
# raise ServiceError(
|
|
||||||
raise RuntimeError(
|
|
||||||
f'Serice task for {name} not terminated?'
|
|
||||||
)
|
|
||||||
|
|
||||||
# assert name not in self.service_tasks, \
|
|
||||||
# f'Serice task for {name} not terminated?'
|
|
||||||
|
|
||||||
async def start_service(
|
|
||||||
self,
|
|
||||||
daemon_name: str,
|
|
||||||
ctx_ep: Callable, # kwargs must `partial`-ed in!
|
|
||||||
|
|
||||||
debug_mode: bool = False,
|
|
||||||
**tractor_actor_kwargs,
|
|
||||||
|
|
||||||
) -> Context:
|
|
||||||
'''
|
|
||||||
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
|
|
||||||
indefinitely.
|
|
||||||
|
|
||||||
Services can be cancelled/shutdown using `.cancel_service()`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
entry: tuple|None = self.service_tasks.get(daemon_name)
|
|
||||||
if entry:
|
|
||||||
(cs, sub_ctx, portal, complete) = entry
|
|
||||||
return sub_ctx
|
|
||||||
|
|
||||||
if daemon_name not in self.service_tasks:
|
|
||||||
portal = await self.actor_n.start_actor(
|
|
||||||
daemon_name,
|
|
||||||
debug_mode=( # maybe set globally during allocate
|
|
||||||
debug_mode
|
|
||||||
or
|
|
||||||
self.debug_mode
|
|
||||||
),
|
|
||||||
**tractor_actor_kwargs,
|
|
||||||
)
|
|
||||||
ctx_kwargs: dict[str, Any] = {}
|
|
||||||
if isinstance(ctx_ep, functools.partial):
|
|
||||||
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
|
|
||||||
ctx_ep: Callable = ctx_ep.func
|
|
||||||
|
|
||||||
(cs, sub_ctx, started) = await self.start_service_task(
|
|
||||||
daemon_name,
|
|
||||||
portal,
|
|
||||||
ctx_ep,
|
|
||||||
**ctx_kwargs,
|
|
||||||
)
|
|
||||||
|
|
||||||
return sub_ctx
|
|
||||||
|
|
||||||
|
|
||||||
# TODO:
|
|
||||||
# -[ ] factor all the common shit from `.data._sampling`
|
|
||||||
# and `.brokers._daemon` into here / `ServiceMngr`
|
|
||||||
# in terms of allocating the `Portal` as part of the
|
|
||||||
# "service-in-subactor" starting!
|
|
||||||
# -[ ] move to `tractor.hilevel._service`, import and use here!
|
|
||||||
# NOTE: purposely leaks the ref to the mod-scope Bo
|
|
||||||
# import tractor
|
|
||||||
# from tractor.hilevel import (
|
|
||||||
# open_service_mngr,
|
|
||||||
# ServiceMngr,
|
|
||||||
# )
|
|
||||||
# mngr: ServiceMngr|None = None
|
|
||||||
# with tractor.hilevel.open_service_mngr() as mngr:
|
|
||||||
# Services = proxy(mngr)
|
|
||||||
|
|
|
@ -21,13 +21,11 @@ from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: oof, needs to be changed to `httpx`!
|
|
||||||
import asks
|
import asks
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
import docker
|
import docker
|
||||||
from ._ahab import DockerContainer
|
from ._ahab import DockerContainer
|
||||||
from . import ServiceMngr
|
|
||||||
|
|
||||||
from ._util import log # sub-sys logger
|
from ._util import log # sub-sys logger
|
||||||
from ._util import (
|
from ._util import (
|
||||||
|
@ -129,7 +127,7 @@ def start_elasticsearch(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_daemon(
|
async def start_ahab_daemon(
|
||||||
service_mngr: ServiceMngr,
|
service_mngr: Services,
|
||||||
user_config: dict | None = None,
|
user_config: dict | None = None,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ import pendulum
|
||||||
# import purerpc
|
# import purerpc
|
||||||
|
|
||||||
from ..data.feed import maybe_open_feed
|
from ..data.feed import maybe_open_feed
|
||||||
from . import ServiceMngr
|
from . import Services
|
||||||
from ._util import (
|
from ._util import (
|
||||||
log, # sub-sys logger
|
log, # sub-sys logger
|
||||||
get_console_log,
|
get_console_log,
|
||||||
|
@ -233,7 +233,7 @@ def start_marketstore(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def start_ahab_daemon(
|
async def start_ahab_daemon(
|
||||||
service_mngr: ServiceMngr,
|
service_mngr: Services,
|
||||||
user_config: dict | None = None,
|
user_config: dict | None = None,
|
||||||
loglevel: str | None = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
|
|
|
@ -161,12 +161,7 @@ class NativeStorageClient:
|
||||||
|
|
||||||
def index_files(self):
|
def index_files(self):
|
||||||
for path in self._datadir.iterdir():
|
for path in self._datadir.iterdir():
|
||||||
if (
|
if path.name in {'borked', 'expired',}:
|
||||||
path.name in {'borked', 'expired',}
|
|
||||||
or
|
|
||||||
'.parquet' not in str(path)
|
|
||||||
):
|
|
||||||
# ignore all non-apache files (for now)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
key: str = path.name.rstrip('.parquet')
|
key: str = path.name.rstrip('.parquet')
|
||||||
|
|
|
@ -44,10 +44,8 @@ import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
Interval,
|
|
||||||
DateTime,
|
DateTime,
|
||||||
Duration,
|
Duration,
|
||||||
duration as mk_duration,
|
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
)
|
)
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
|
||||||
# pair, immediately stop backfilling?
|
# pair, immediately stop backfilling?
|
||||||
if (
|
if (
|
||||||
start_dt
|
start_dt
|
||||||
and
|
and end_dt < start_dt
|
||||||
end_dt < start_dt
|
|
||||||
):
|
):
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
break
|
break
|
||||||
|
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
|
||||||
except tractor.ContextCancelled:
|
except tractor.ContextCancelled:
|
||||||
# log.exception
|
# log.exception
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
raise
|
|
||||||
|
|
||||||
null_segs_detected.set()
|
null_segs_detected.set()
|
||||||
# RECHECK for more null-gaps
|
# RECHECK for more null-gaps
|
||||||
|
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
|
||||||
|
|
||||||
async def start_backfill(
|
async def start_backfill(
|
||||||
get_hist,
|
get_hist,
|
||||||
def_frame_duration: Duration,
|
frame_types: dict[str, Duration] | None,
|
||||||
mod: ModuleType,
|
mod: ModuleType,
|
||||||
mkt: MktPair,
|
mkt: MktPair,
|
||||||
shm: ShmArray,
|
shm: ShmArray,
|
||||||
|
@ -383,23 +379,22 @@ async def start_backfill(
|
||||||
update_start_on_prepend: bool = False
|
update_start_on_prepend: bool = False
|
||||||
if backfill_until_dt is None:
|
if backfill_until_dt is None:
|
||||||
|
|
||||||
# TODO: per-provider default history-durations?
|
# TODO: drop this right and just expose the backfill
|
||||||
# -[ ] inside the `open_history_client()` config allow
|
# limits inside a [storage] section in conf.toml?
|
||||||
# declaring the history duration limits instead of
|
# when no tsdb "last datum" is provided, we just load
|
||||||
# guessing and/or applying the same limits to all?
|
# some near-term history.
|
||||||
#
|
# periods = {
|
||||||
# -[ ] allow declaring (default) per-provider backfill
|
# 1: {'days': 1},
|
||||||
# limits inside a [storage] sub-section in conf.toml?
|
# 60: {'days': 14},
|
||||||
#
|
# }
|
||||||
# NOTE, when no tsdb "last datum" is provided, we just
|
|
||||||
# load some near-term history by presuming a "decently
|
# do a decently sized backfill and load it into storage.
|
||||||
# large" 60s duration limit and a much shorter 1s range.
|
|
||||||
periods = {
|
periods = {
|
||||||
1: {'days': 2},
|
1: {'days': 2},
|
||||||
60: {'years': 6},
|
60: {'years': 6},
|
||||||
}
|
}
|
||||||
period_duration: int = periods[timeframe]
|
period_duration: int = periods[timeframe]
|
||||||
update_start_on_prepend: bool = True
|
update_start_on_prepend = True
|
||||||
|
|
||||||
# NOTE: manually set the "latest" datetime which we intend to
|
# NOTE: manually set the "latest" datetime which we intend to
|
||||||
# backfill history "until" so as to adhere to the history
|
# backfill history "until" so as to adhere to the history
|
||||||
|
@ -421,6 +416,7 @@ async def start_backfill(
|
||||||
f'backfill_until_dt: {backfill_until_dt}\n'
|
f'backfill_until_dt: {backfill_until_dt}\n'
|
||||||
f'last_start_dt: {last_start_dt}\n'
|
f'last_start_dt: {last_start_dt}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
(
|
(
|
||||||
array,
|
array,
|
||||||
|
@ -430,114 +426,71 @@ async def start_backfill(
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=last_start_dt,
|
end_dt=last_start_dt,
|
||||||
)
|
)
|
||||||
|
|
||||||
except NoData as _daterr:
|
except NoData as _daterr:
|
||||||
orig_last_start_dt: datetime = last_start_dt
|
# 3 cases:
|
||||||
gap_report: str = (
|
# - frame in the middle of a legit venue gap
|
||||||
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
# - history actually began at the `last_start_dt`
|
||||||
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
|
# - some other unknown error (ib blocking the
|
||||||
f'last_start_dt: {orig_last_start_dt}\n\n'
|
# history bc they don't want you seeing how they
|
||||||
f'bf_until: {backfill_until_dt}\n'
|
# cucked all the tinas..)
|
||||||
)
|
if dur := frame_types.get(timeframe):
|
||||||
# EMPTY FRAME signal with 3 (likely) causes:
|
# decrement by a frame's worth of duration and
|
||||||
#
|
# retry a few times.
|
||||||
# 1. range contains legit gap in venue history
|
last_start_dt.subtract(
|
||||||
# 2. history actually (edge case) **began** at the
|
seconds=dur.total_seconds()
|
||||||
# value `last_start_dt`
|
|
||||||
# 3. some other unknown error (ib blocking the
|
|
||||||
# history-query bc they don't want you seeing how
|
|
||||||
# they cucked all the tinas.. like with options
|
|
||||||
# hist)
|
|
||||||
#
|
|
||||||
if def_frame_duration:
|
|
||||||
# decrement by a duration's (frame) worth of time
|
|
||||||
# as maybe indicated by the backend to see if we
|
|
||||||
# can get older data before this possible
|
|
||||||
# "history gap".
|
|
||||||
last_start_dt: datetime = last_start_dt.subtract(
|
|
||||||
seconds=def_frame_duration.total_seconds()
|
|
||||||
)
|
)
|
||||||
gap_report += (
|
log.warning(
|
||||||
f'Decrementing `end_dt` and retrying with,\n'
|
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
|
||||||
f'def_frame_duration: {def_frame_duration}\n'
|
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||||
f'(new) last_start_dt: {last_start_dt}\n'
|
'bf_until <- last_start_dt:\n'
|
||||||
|
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||||
|
f'Decrementing `end_dt` by {dur} and retry..\n'
|
||||||
)
|
)
|
||||||
log.warning(gap_report)
|
|
||||||
# skip writing to shm/tsdb and try the next
|
|
||||||
# duration's worth of prior history.
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
else:
|
|
||||||
# await tractor.pause()
|
|
||||||
raise DataUnavailable(gap_report)
|
|
||||||
|
|
||||||
# broker says there never was or is no more history to pull
|
# broker says there never was or is no more history to pull
|
||||||
except DataUnavailable as due:
|
except DataUnavailable:
|
||||||
message: str = due.args[0]
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Provider {mod.name!r} halted backfill due to,\n\n'
|
f'NO-MORE-DATA in range?\n'
|
||||||
|
f'`{mod.name}` halted history:\n'
|
||||||
f'{message}\n'
|
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||||
|
'bf_until <- last_start_dt:\n'
|
||||||
f'fqme: {mkt.fqme}\n'
|
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||||
f'timeframe: {timeframe}\n'
|
|
||||||
f'last_start_dt: {last_start_dt}\n'
|
|
||||||
f'bf_until: {backfill_until_dt}\n'
|
|
||||||
)
|
)
|
||||||
# UGH: what's a better way?
|
|
||||||
# TODO: backends are responsible for being correct on
|
# ugh, what's a better way?
|
||||||
# this right!?
|
# TODO: fwiw, we probably want a way to signal a throttle
|
||||||
# -[ ] in the `ib` case we could maybe offer some way
|
# condition (eg. with ib) so that we can halt the
|
||||||
# to halt the request loop until the condition is
|
# request loop until the condition is resolved?
|
||||||
# resolved or should the backend be entirely in
|
if timeframe > 1:
|
||||||
# charge of solving such faults? yes, right?
|
await tractor.pause()
|
||||||
return
|
return
|
||||||
|
|
||||||
time: np.ndarray = array['time']
|
|
||||||
assert (
|
assert (
|
||||||
time[0]
|
array['time'][0]
|
||||||
==
|
==
|
||||||
next_start_dt.timestamp()
|
next_start_dt.timestamp()
|
||||||
)
|
)
|
||||||
|
|
||||||
assert time[-1] == next_end_dt.timestamp()
|
diff = last_start_dt - next_start_dt
|
||||||
|
frame_time_diff_s = diff.seconds
|
||||||
expected_dur: Interval = last_start_dt - next_start_dt
|
|
||||||
|
|
||||||
# frame's worth of sample-period-steps, in seconds
|
# frame's worth of sample-period-steps, in seconds
|
||||||
frame_size_s: float = len(array) * timeframe
|
frame_size_s: float = len(array) * timeframe
|
||||||
recv_frame_dur: Duration = (
|
expected_frame_size_s: float = frame_size_s + timeframe
|
||||||
from_timestamp(array[-1]['time'])
|
if frame_time_diff_s > expected_frame_size_s:
|
||||||
-
|
|
||||||
from_timestamp(array[0]['time'])
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
(lt_frame := (recv_frame_dur < expected_dur))
|
|
||||||
or
|
|
||||||
(null_frame := (frame_size_s == 0))
|
|
||||||
# ^XXX, should NEVER hit now!
|
|
||||||
):
|
|
||||||
# XXX: query result includes a start point prior to our
|
# XXX: query result includes a start point prior to our
|
||||||
# expected "frame size" and thus is likely some kind of
|
# expected "frame size" and thus is likely some kind of
|
||||||
# history gap (eg. market closed period, outage, etc.)
|
# history gap (eg. market closed period, outage, etc.)
|
||||||
# so just report it to console for now.
|
# so just report it to console for now.
|
||||||
if lt_frame:
|
|
||||||
reason = 'Possible GAP (or first-datum)'
|
|
||||||
else:
|
|
||||||
assert null_frame
|
|
||||||
reason = 'NULL-FRAME'
|
|
||||||
|
|
||||||
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'{timeframe}s-series {reason} detected!\n'
|
'GAP DETECTED:\n'
|
||||||
f'fqme: {mkt.fqme}\n'
|
f'last_start_dt: {last_start_dt}\n'
|
||||||
f'last_start_dt: {last_start_dt}\n\n'
|
f'diff: {diff}\n'
|
||||||
f'recv interval: {recv_frame_dur}\n'
|
f'frame_time_diff_s: {frame_time_diff_s}\n'
|
||||||
f'expected interval: {expected_dur}\n\n'
|
|
||||||
|
|
||||||
f'Missing duration of history of {missing_dur.in_words()!r}\n'
|
|
||||||
f'{missing_dur}\n'
|
|
||||||
)
|
)
|
||||||
# await tractor.pause()
|
|
||||||
|
|
||||||
to_push = diff_history(
|
to_push = diff_history(
|
||||||
array,
|
array,
|
||||||
|
@ -612,8 +565,7 @@ async def start_backfill(
|
||||||
# long-term storage.
|
# long-term storage.
|
||||||
if (
|
if (
|
||||||
storage is not None
|
storage is not None
|
||||||
and
|
and write_tsdb
|
||||||
write_tsdb
|
|
||||||
):
|
):
|
||||||
log.info(
|
log.info(
|
||||||
f'Writing {ln} frame to storage:\n'
|
f'Writing {ln} frame to storage:\n'
|
||||||
|
@ -626,7 +578,6 @@ async def start_backfill(
|
||||||
'crypto',
|
'crypto',
|
||||||
'crypto_currency',
|
'crypto_currency',
|
||||||
'fiat', # a "forex pair"
|
'fiat', # a "forex pair"
|
||||||
'perpetual_future', # stupid "perps" from cex land
|
|
||||||
}:
|
}:
|
||||||
# for now, our table key schema is not including
|
# for now, our table key schema is not including
|
||||||
# the dst[/src] source asset token.
|
# the dst[/src] source asset token.
|
||||||
|
@ -734,7 +685,7 @@ async def back_load_from_tsdb(
|
||||||
last_tsdb_dt
|
last_tsdb_dt
|
||||||
and latest_start_dt
|
and latest_start_dt
|
||||||
):
|
):
|
||||||
backfilled_size_s: Duration = (
|
backfilled_size_s = (
|
||||||
latest_start_dt - last_tsdb_dt
|
latest_start_dt - last_tsdb_dt
|
||||||
).seconds
|
).seconds
|
||||||
# if the shm buffer len is not large enough to contain
|
# if the shm buffer len is not large enough to contain
|
||||||
|
@ -957,8 +908,6 @@ async def tsdb_backfill(
|
||||||
f'{pformat(config)}\n'
|
f'{pformat(config)}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# concurrently load the provider's most-recent-frame AND any
|
|
||||||
# pre-existing tsdb history already saved in `piker` storage.
|
|
||||||
dt_eps: list[DateTime, DateTime] = []
|
dt_eps: list[DateTime, DateTime] = []
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
|
@ -969,6 +918,7 @@ async def tsdb_backfill(
|
||||||
timeframe,
|
timeframe,
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
|
|
||||||
tsdb_entry: tuple = await load_tsdb_hist(
|
tsdb_entry: tuple = await load_tsdb_hist(
|
||||||
storage,
|
storage,
|
||||||
mkt,
|
mkt,
|
||||||
|
@ -997,25 +947,6 @@ async def tsdb_backfill(
|
||||||
mr_end_dt,
|
mr_end_dt,
|
||||||
) = dt_eps
|
) = dt_eps
|
||||||
|
|
||||||
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
|
|
||||||
calced_frame_size: Duration = mk_duration(
|
|
||||||
seconds=first_frame_dur_s,
|
|
||||||
)
|
|
||||||
# NOTE, attempt to use the backend declared default frame
|
|
||||||
# sizing (as allowed by their time-series query APIs) and
|
|
||||||
# if not provided try to construct a default from the
|
|
||||||
# first frame received above.
|
|
||||||
def_frame_durs: dict[
|
|
||||||
int,
|
|
||||||
Duration,
|
|
||||||
]|None = config.get('frame_types', None)
|
|
||||||
if def_frame_durs:
|
|
||||||
def_frame_size: Duration = def_frame_durs[timeframe]
|
|
||||||
assert def_frame_size == calced_frame_size
|
|
||||||
else:
|
|
||||||
# use what we calced from first frame above.
|
|
||||||
def_frame_size = calced_frame_size
|
|
||||||
|
|
||||||
# NOTE: when there's no offline data, there's 2 cases:
|
# NOTE: when there's no offline data, there's 2 cases:
|
||||||
# - data backend doesn't support timeframe/sample
|
# - data backend doesn't support timeframe/sample
|
||||||
# period (in which case `dt_eps` should be `None` and
|
# period (in which case `dt_eps` should be `None` and
|
||||||
|
@ -1046,7 +977,7 @@ async def tsdb_backfill(
|
||||||
partial(
|
partial(
|
||||||
start_backfill,
|
start_backfill,
|
||||||
get_hist=get_hist,
|
get_hist=get_hist,
|
||||||
def_frame_duration=def_frame_size,
|
frame_types=config.get('frame_types', None),
|
||||||
mod=mod,
|
mod=mod,
|
||||||
mkt=mkt,
|
mkt=mkt,
|
||||||
shm=shm,
|
shm=shm,
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -25,11 +25,11 @@ build-backend = "hatchling.build"
|
||||||
ignore = []
|
ignore = []
|
||||||
|
|
||||||
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
|
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
|
||||||
# "piker/ui/qt.py" = [
|
"piker/ui/qt.py" = [
|
||||||
# "E402",
|
"E402",
|
||||||
# 'F401', # unused imports (without __all__ or blah as blah)
|
'F401', # unused imports (without __all__ or blah as blah)
|
||||||
# # "F841", # unused variable rules
|
# "F841", # unused variable rules
|
||||||
# ]
|
]
|
||||||
# ignore-init-module-imports = false
|
# ignore-init-module-imports = false
|
||||||
|
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
|
|
|
@ -10,7 +10,7 @@ from piker import (
|
||||||
config,
|
config,
|
||||||
)
|
)
|
||||||
from piker.service import (
|
from piker.service import (
|
||||||
get_service_mngr,
|
Services,
|
||||||
)
|
)
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
|
|
||||||
|
@ -129,7 +129,7 @@ async def _open_test_pikerd(
|
||||||
) as service_manager,
|
) as service_manager,
|
||||||
):
|
):
|
||||||
# this proc/actor is the pikerd
|
# this proc/actor is the pikerd
|
||||||
assert service_manager is get_service_mngr()
|
assert service_manager is Services
|
||||||
|
|
||||||
async with tractor.wait_for_actor(
|
async with tractor.wait_for_actor(
|
||||||
'pikerd',
|
'pikerd',
|
||||||
|
|
|
@ -26,7 +26,7 @@ import pytest
|
||||||
import tractor
|
import tractor
|
||||||
from uuid import uuid4
|
from uuid import uuid4
|
||||||
|
|
||||||
from piker.service import ServiceMngr
|
from piker.service import Services
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
Order,
|
Order,
|
||||||
|
@ -158,7 +158,7 @@ def load_and_check_pos(
|
||||||
|
|
||||||
|
|
||||||
def test_ems_err_on_bad_broker(
|
def test_ems_err_on_bad_broker(
|
||||||
open_test_pikerd: ServiceMngr,
|
open_test_pikerd: Services,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
):
|
):
|
||||||
async def load_bad_fqme():
|
async def load_bad_fqme():
|
||||||
|
|
|
@ -15,7 +15,7 @@ import tractor
|
||||||
|
|
||||||
from piker.service import (
|
from piker.service import (
|
||||||
find_service,
|
find_service,
|
||||||
ServiceMngr,
|
Services,
|
||||||
)
|
)
|
||||||
from piker.data import (
|
from piker.data import (
|
||||||
open_feed,
|
open_feed,
|
||||||
|
@ -44,7 +44,7 @@ def test_runtime_boot(
|
||||||
async def main():
|
async def main():
|
||||||
port = 6666
|
port = 6666
|
||||||
daemon_addr = ('127.0.0.1', port)
|
daemon_addr = ('127.0.0.1', port)
|
||||||
services: ServiceMngr
|
services: Services
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_test_pikerd(
|
open_test_pikerd(
|
||||||
|
|
Loading…
Reference in New Issue