Compare commits

..

2 Commits

Author SHA1 Message Date
Nelson Torres 7d5ac3561e uv migration 2025-01-30 14:50:31 -03:00
Nelson Torres 740f081d7e Default.nix fix:
Since the last `poetry` update the command `poetry shell` have been
moved to a plugin, this affects the entire instalation, so now we
need to add more deps to the `buildInputs` and added to the
`LD_LIBRARY_PATH`

To run a command now you need to do something like this:

`poetry run piker ...`
2025-01-30 14:36:30 -03:00
31 changed files with 1236 additions and 4045 deletions

View File

@ -30,8 +30,7 @@ from types import ModuleType
from typing import ( from typing import (
Any, Any,
Iterator, Iterator,
Generator, Generator
TYPE_CHECKING,
) )
import pendulum import pendulum
@ -60,10 +59,8 @@ from ..clearing._messages import (
BrokerdPosition, BrokerdPosition,
) )
from piker.types import Struct from piker.types import Struct
from piker.log import get_logger from piker.data._symcache import SymbologyCache
from ..log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(__name__) log = get_logger(__name__)
@ -496,17 +493,6 @@ class Account(Struct):
_mktmap_table: dict[str, MktPair] | None = None, _mktmap_table: dict[str, MktPair] | None = None,
only_require: list[str]|True = True,
# ^list of fqmes that are "required" to be processed from
# this ledger pass; we often don't care about others and
# definitely shouldn't always error in such cases.
# (eg. broker backend loaded that doesn't yet supsport the
# symcache but also, inside the paper engine we don't ad-hoc
# request `get_mkt_info()` for every symbol in the ledger,
# only the one for which we're simulating against).
# TODO, not sure if there's a better soln for this, ideally
# all backends get symcache support afap i guess..
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
Update the internal `.pps[str, Position]` table from input Update the internal `.pps[str, Position]` table from input
@ -549,32 +535,11 @@ class Account(Struct):
if _mktmap_table is None: if _mktmap_table is None:
raise raise
required: bool = (
only_require is True
or (
only_require is not True
and
fqme in only_require
)
)
# XXX: caller is allowed to provide a fallback # XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is # mktmap table for the case where a new position is
# being added and the preloaded symcache didn't # being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..) # have this entry prior (eg. with frickin IB..)
if ( mkt = _mktmap_table[fqme]
not (mkt := _mktmap_table.get(fqme))
and
required
):
raise
elif not required:
continue
else:
# should be an entry retreived somewhere
assert mkt
if not (pos := pps.get(bs_mktid)): if not (pos := pps.get(bs_mktid)):
@ -691,7 +656,7 @@ class Account(Struct):
def write_config(self) -> None: def write_config(self) -> None:
''' '''
Write the current account state to the user's account TOML file, normally Write the current account state to the user's account TOML file, normally
something like `pps.toml`. something like ``pps.toml``.
''' '''
# TODO: show diff output? # TODO: show diff output?

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -23,7 +23,6 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -191,17 +190,14 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -221,35 +217,27 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import ( from piker.service import Services
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
brokername=brokername, portal = await Services.actor_n.start_actor(
loglevel=loglevel, dname,
), enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=mngr.debug_mode, debug_mode=Services.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs **tractor_kwargs
) )
assert (
not ctx.cancel_called # NOTE: the service mngr expects an already spawned actor + its
and ctx.portal # parent side # portal ref in order to do non-blocking setup of brokerd
and dname in ctx.chan.uid # subactor is named as desired # service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
) )
return True return True
@ -274,7 +262,8 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -567,7 +567,6 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -181,6 +181,7 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT', quoteAsset: str # 'USDT',
quotePrecision: int # 8, quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000', requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500', triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'], underlyingSubType: list[str] # ['PoW'],

View File

@ -25,7 +25,6 @@ from .api import (
get_client, get_client,
) )
from .feed import ( from .feed import (
get_mkt_info,
open_history_client, open_history_client,
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog, # open_trade_dialog,
# norm_trade_records, # norm_trade_records,
# ) # )
from .venues import (
OptionPair,
)
log = get_logger(__name__) log = get_logger(__name__)
__all__ = [ __all__ = [
'get_client', 'get_client',
# 'trades_dialogue', # 'trades_dialogue',
'get_mkt_info',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',
'OptionPair',
# 'norm_trade_records', # 'norm_trade_records',
] ]

File diff suppressed because it is too large Load Diff

View File

@ -18,59 +18,38 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import ( from typing import Any, Optional, Callable
# Any,
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import cryptofeed
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( import pendulum
from_timestamp, from rapidfuzz import process as fuzzy
)
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.brokers import open_cached_client
Asset, from piker.log import get_logger, get_console_log
MktPair, from piker.data import ShmArray
unpack_fqme, from piker.brokers._util import (
) BrokerError,
from piker.brokers import (
open_cached_client,
NoData,
DataUnavailable, DataUnavailable,
) )
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Client, Trade,
# get_config, get_config,
piker_sym_to_cb_sym, str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
@ -85,215 +64,90 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, end_dt: Optional[datetime] = None,
end_dt: datetime | None = None, start_dt: Optional[datetime] = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars( array = await client.bars(
mkt, instrument,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if ( raise DataUnavailable
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = from_timestamp(array[0]['time']) start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time']) end_dt = pendulum.from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
yield ( yield get_ohlc, {'erlangs': 3, 'rate': 3}
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
''' # XXX: required to propagate ``tractor`` loglevel to piker logging
Open a live quote stream for the market set defined by `symbols`. get_console_log(loglevel or tractor.current_actor().loglevel)
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side sym = symbols[0]
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec init_msgs = {
init_msgs.append( # pass back token, and bool, signalling if we're the writer
FeedInit( # and that history has been written
mkt_info=mkt, sym: {
) 'symbol_info': {
) 'asset_type': 'option',
# build `cryptofeed` feed-handle 'price_tick_size': 0.0005
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) },
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
from_cf: tractor.to_asyncio.LinkedTaskChannel nsym = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as from_cf:
# load the "last trades" summary async with maybe_open_price_feed(sym) as stream:
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
# TODO, do we even need this or will the above always cache = await client.cache_symbols()
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
# else: last_trades = (await client.last_trades(
last_trade = Trade( cb_sym_to_deribit_inst(nsym), count=1)).trades
**(last_trades[0])
)
first_quote: dict = { if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -304,84 +158,13 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp 'broker_ts': last_trade.timestamp
}] }]
} }
task_status.started(( task_status.started((init_msgs, first_quote))
init_msgs,
first_quote,
))
feed_is_live.set() feed_is_live.set()
# NOTE XXX, static for now! async for typ, quote in stream:
# => since this only handles ONE mkt feed at a time we topic = quote['symbol']
# don't need a lookup table to map interleaved quotes await send_chan.send({topic: quote})
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context @tractor.context
@ -391,21 +174,12 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
# cache = client._pairs cache = await client.cache_symbols()
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream: async for pattern in stream:
# repack in dict form
# NOTE: pattern fuzzy-matching is done within await stream.send(
# the methd impl. await client.search_symbols(pattern))
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -1,196 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -111,10 +111,6 @@ class KucoinMktPair(Struct, frozen=True):
quoteMaxSize: float quoteMaxSize: float
quoteMinSize: float quoteMinSize: float
symbol: str # our bs_mktid, kucoin's internal id symbol: str # our bs_mktid, kucoin's internal id
feeCategory: int
makerFeeCoefficient: float
takerFeeCoefficient: float
st: bool
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
@ -597,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
''' '''
async with ( async with (
httpx.AsyncClient( httpx.AsyncClient(
base_url='https://api.kucoin.com/api', base_url=f'https://api.kucoin.com/api',
) as trio_client, ) as trio_client,
): ):
client = Client(httpx_client=trio_client) client = Client(httpx_client=trio_client)
@ -641,7 +637,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection') log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -653,14 +649,9 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[ ) -> tuple[MktPair, KucoinMktPair]:
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return both a `piker.accounting.MktPair` and Query for and return a `MktPair` and `KucoinMktPair`.
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -735,8 +726,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -744,11 +733,7 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n') connect_id = str(uuid4())
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -762,37 +747,20 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing( aclosing(stream_messages(ws, sym_str)) as msg_gen,
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(iter_quotes) typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real while typ != 'trade':
# trade quote? # take care to not unblock here until we get a real
# ^TODO, remove this right? # trade quote
# -[ ] what often blocks chart boot/new-feed switching typ, quote = await anext(msg_gen)
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix! async for typ, msg in msg_gen:
# OW the sampling loop will not broadcast correctly.. await send_chan.send({sym_str: msg})
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -847,7 +815,7 @@ async def subscribe(
) )
async def iter_normed_quotes( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -878,9 +846,6 @@ async def iter_normed_quotes(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -973,7 +938,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
log.debug( print(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -653,11 +653,7 @@ class Router(Struct):
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -720,7 +716,7 @@ class Router(Struct):
subs = self.subscribers[sub_key] subs = self.subscribers[sub_key]
sent_some: bool = False sent_some: bool = False
for client_stream in subs.copy(): for client_stream in subs:
try: try:
await client_stream.send(msg) await client_stream.send(msg)
sent_some = True sent_some = True
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req: await router.client_broadcast(
# likely some order change state? status_msg.req.symbol,
await tractor.pause() status_msg,
else: )
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')

View File

@ -653,7 +653,6 @@ async def open_trade_dialog(
# in) use manually constructed table from calling # in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above. # the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme, _mktmap_table=mkt_by_fqme,
only_require=list(mkt_by_fqme),
) )
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_registry( tractor.get_arbiter(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,12 +25,10 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -44,7 +42,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio import TaskStatus from trio_typing import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -55,9 +53,6 @@ from ._util import (
get_console_log, get_console_log,
) )
from ..service import maybe_spawn_daemon from ..service import maybe_spawn_daemon
from piker.log import (
mk_repr,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
@ -75,7 +70,6 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -85,9 +79,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample a (real-time) quote feeds, see time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below ``.service.maybe_open_samplerd()`` and the below
`register_with_sampler()`. ``register_with_sampler()``.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -381,10 +375,7 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started( await ctx.started(set(Sampler.ohlcv_shms.keys()))
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -428,6 +419,7 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -437,10 +429,7 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import ( from piker.service import Services
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -448,33 +437,26 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
mngr: ServiceMngr = get_service_mngr() async with Services.locks[dname + '_singleton']:
already_started: bool = dname in mngr.service_tasks
async with mngr._locks[dname + '_singleton']: if dname not in Services.service_tasks:
ctx: Context = await mngr.start_service(
daemon_name=dname, portal = await Services.actor_n.start_actor(
ctx_ep=partial( dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler, register_with_sampler,
period_s=1, period_s=1,
sub_for_broadcasts=False, sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
) )
return True return True
@ -579,6 +561,7 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus, # noqa bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
@ -599,22 +582,11 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# print(quotes)
# XXX WARNING XXX only enable for debugging bc ow can cost # TODO: ``numba`` this!
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO: `numba` this!
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -687,18 +659,6 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
if not subs:
all_bs_fqmes: list[str] = list(
bus._subscribers.keys()
)
log.warning(
f'No subscribers for {brokername!r} live-quote ??\n'
f'broker_symbol: {broker_symbol}\n\n'
f'Maybe the backend-sys symbol does not match one of,\n'
f'{pfmt(all_bs_fqmes)}\n'
)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct
@ -929,7 +889,6 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception('Retrying connection') log.exception(f'Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing of msgs
of msgs over a `NoBsWs`. over a NoBsWs.
''' '''
@ -377,77 +377,43 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'), request_type: Optional[type] = None,
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm request_hook: Optional[Callable] = None,
# and options mkts are generally "slow moving".. error_hook: Optional[Callable] = None,
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws( open_autorecon_ws(url) as ws
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc( async def json_rpc(method: str, params: dict) -> dict:
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': req_id, 'id': next(rpc_id),
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
result = rpc_results[_id] = { rpc_results[_id] = {
'result': None, 'result': None,
'error': None, 'event': trio.Event()
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
if (maybe_result := result['result']): ret = rpc_results[_id]['result']
ret = maybe_result
del rpc_results[_id]
else: del rpc_results[_id]
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -462,7 +428,6 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -486,29 +451,15 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
# if request_hook: if request_hook:
# await request_hook(request_type(**msg)) await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
# if error_hook: log.warning(f'Recieved\n{error}')
# await error_hook(response_type(**msg)) if error_hook:
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = error['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

View File

@ -540,10 +540,7 @@ async def open_feed_bus(
# subscription since the backend isn't (yet) expected to # subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys # append it's own name to the fqme, so we filter on keys
# which *do not* include that name (e.g .ib) . # which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault( bus._subscribers.setdefault(bs_fqme, set())
bs_fqme,
set(),
)
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(

View File

@ -18,11 +18,7 @@
Log like a forester! Log like a forester!
""" """
import logging import logging
import reprlib
import json import json
from typing import (
Callable,
)
import tractor import tractor
from pygments import ( from pygments import (
@ -88,27 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr

View File

@ -30,11 +30,7 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import ( from ._mngr import Services as Services
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,6 +21,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -29,13 +30,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
open_service_mngr, Services,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -58,7 +59,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: str|None = None, loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -68,7 +69,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
tractor_runtime_overrides: dict|None = None, tractor_runtime_overrides: dict | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> tuple[ ) -> tuple[
@ -118,10 +119,6 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -170,13 +167,12 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> ServiceMngr: ) -> Services:
''' '''
Start a root piker daemon actor (aka `pikerd`) with an indefinite Start a root piker daemon with an indefinite lifetime.
lifetime.
A root actor-nursery is created which can be used to spawn and A root actor nursery is created which can be used to create and keep
supervise underling service sub-actors (see below). alive underling services (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -203,6 +199,8 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -211,17 +209,25 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
mngr: ServiceMngr # assign globally for future daemon/task creation
async with open_service_mngr( Services.actor_n = actor_nursery
debug_mode=debug_mode, Services.service_n = service_nursery
) as mngr: Services.debug_mode = debug_mode
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: str|None = None, # loglevel: Optional[str] = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -250,7 +256,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[ServiceMngr]: ) -> tractor._portal.Portal | ClassVar[Services]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import ServiceMngr from ._mngr import Services
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: ServiceMngr, services: Services,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,8 +549,7 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
# trio.MultiError, trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,17 +26,14 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
get_service_mngr, Services,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -44,14 +41,15 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -69,7 +67,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = _locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -134,65 +132,7 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
# --- ---- --- await portal.cancel_actor()
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -207,22 +147,21 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
smngr: ServiceMngr = get_service_mngr() portal = await Services.actor_n.start_actor(
portal = await smngr.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=smngr.debug_mode, # set by pikerd flag debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await smngr.start_service_task( await Services.start_service_task(
'emsd', 'emsd',
portal, portal,

View File

@ -18,29 +18,16 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import msgspec
import tractor
import trio import trio
from trio import TaskStatus from trio_typing import TaskStatus
import tractor
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -52,130 +39,6 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -183,46 +46,31 @@ def get_service_mngr() -> ServiceMngr:
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
@dataclass class Services:
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC actor_n: tractor._supervise.ActorNursery
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
str, str,
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
Portal, Portal,
trio.Event, trio.Event,
] ]
] = field(default_factory=dict) ] = {}
locks = defaultdict(trio.Lock)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
portal: Portal, portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable, target: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
**ctx_kwargs, **ctx_kwargs,
) -> (trio.CancelScope, Context, Any): ) -> (trio.CancelScope, Context):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` teardown.
@ -235,7 +83,6 @@ class ServiceMngr:
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
trio.Event, trio.Event,
Any, Any,
] ]
@ -243,87 +90,64 @@ class ServiceMngr:
) -> Any: ) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started): async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
# unblock once the remote context has started ) as (ctx, first):
complete = trio.Event()
task_status.started(( # unblock once the remote context has started
cs, complete = trio.Event()
ctx, task_status.started((cs, complete, first))
complete, log.info(
started, f'`pikerd` service {name} started with value {first}'
)) )
log.info( try:
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.wait_for_result() ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context
# function or by being cancelled here by the # function or by being cancelled here by the
# surrounding cancel scope. # surrounding cancel scope.
return ( return (await portal.result(), ctx_res)
await portal.wait_for_result(), except ContextCancelled as ctxe:
ctx_res, canceller: tuple[str, str] = ctxe.canceller
) our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n' finally:
f'canceller: {canceller}\n' await portal.cancel_actor()
) complete.set()
else: self.service_tasks.pop(name)
raise
finally: cs, complete, first = await self.service_n.start(open_context_in_task)
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
self.service_tasks[name] = (cs, sub_ctx, portal, complete) self.service_tasks[name] = (cs, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
@ -334,80 +158,8 @@ class ServiceMngr:
''' '''
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_tasks[name] cs, portal, complete = self.service_tasks[name]
cs.cancel()
# cs.cancel()
await sub_ctx.cancel()
await complete.wait() await complete.wait()
assert name not in self.service_tasks, \
if name in self.service_tasks: f'Serice task for {name} not terminated?'
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,13 +21,11 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -129,7 +127,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: ServiceMngr, service_mngr: Services,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import ServiceMngr from . import Services
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: ServiceMngr, service_mngr: Services,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -161,12 +161,7 @@ class NativeStorageClient:
def index_files(self): def index_files(self):
for path in self._datadir.iterdir(): for path in self._datadir.iterdir():
if ( if path.name in {'borked', 'expired',}:
path.name in {'borked', 'expired',}
or
'.parquet' not in str(path)
):
# ignore all non-apache files (for now)
continue continue
key: str = path.name.rstrip('.parquet') key: str = path.name.rstrip('.parquet')

View File

@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and and end_dt < start_dt
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
def_frame_duration: Duration, frame_types: dict[str, Duration] | None,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: per-provider default history-durations? # TODO: drop this right and just expose the backfill
# -[ ] inside the `open_history_client()` config allow # limits inside a [storage] section in conf.toml?
# declaring the history duration limits instead of # when no tsdb "last datum" is provided, we just load
# guessing and/or applying the same limits to all? # some near-term history.
# # periods = {
# -[ ] allow declaring (default) per-provider backfill # 1: {'days': 1},
# limits inside a [storage] sub-section in conf.toml? # 60: {'days': 14},
# # }
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently # do a decently sized backfill and load it into storage.
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -430,114 +426,71 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt # 3 cases:
gap_report: str = ( # - frame in the middle of a legit venue gap
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' # - history actually began at the `last_start_dt`
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' # - some other unknown error (ib blocking the
f'last_start_dt: {orig_last_start_dt}\n\n' # history bc they don't want you seeing how they
f'bf_until: {backfill_until_dt}\n' # cucked all the tinas..)
) if dur := frame_types.get(timeframe):
# EMPTY FRAME signal with 3 (likely) causes: # decrement by a frame's worth of duration and
# # retry a few times.
# 1. range contains legit gap in venue history last_start_dt.subtract(
# 2. history actually (edge case) **began** at the seconds=dur.total_seconds()
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
) )
gap_report += ( log.warning(
f'Decrementing `end_dt` and retrying with,\n' f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'def_frame_duration: {def_frame_duration}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
f'(new) last_start_dt: {last_start_dt}\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
) )
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable as due: except DataUnavailable:
message: str = due.args[0]
log.warning( log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n' f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'{message}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'fqme: {mkt.fqme}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
# UGH: what's a better way?
# TODO: backends are responsible for being correct on # ugh, what's a better way?
# this right!? # TODO: fwiw, we probably want a way to signal a throttle
# -[ ] in the `ib` case we could maybe offer some way # condition (eg. with ib) so that we can halt the
# to halt the request loop until the condition is # request loop until the condition is resolved?
# resolved or should the backend be entirely in if timeframe > 1:
# charge of solving such faults? yes, right? await tractor.pause()
return return
time: np.ndarray = array['time']
assert ( assert (
time[0] array['time'][0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert time[-1] == next_end_dt.timestamp() diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
recv_frame_dur: Duration = ( expected_frame_size_s: float = frame_size_s + timeframe
from_timestamp(array[-1]['time']) if frame_time_diff_s > expected_frame_size_s:
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
f'{timeframe}s-series {reason} detected!\n' 'GAP DETECTED:\n'
f'fqme: {mkt.fqme}\n' f'last_start_dt: {last_start_dt}\n'
f'last_start_dt: {last_start_dt}\n\n' f'diff: {diff}\n'
f'recv interval: {recv_frame_dur}\n' f'frame_time_diff_s: {frame_time_diff_s}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -612,8 +565,7 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and and write_tsdb
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
@ -626,7 +578,6 @@ async def start_backfill(
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including # for now, our table key schema is not including
# the dst[/src] source asset token. # the dst[/src] source asset token.
@ -734,7 +685,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s: Duration = ( backfilled_size_s = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -957,8 +908,6 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -969,6 +918,7 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -997,25 +947,6 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -1046,7 +977,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
def_frame_duration=def_frame_size, frame_types=config.get('frame_types', None),
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,

2850
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -25,11 +25,11 @@ build-backend = "hatchling.build"
ignore = [] ignore = []
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores # https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
# "piker/ui/qt.py" = [ "piker/ui/qt.py" = [
# "E402", "E402",
# 'F401', # unused imports (without __all__ or blah as blah) 'F401', # unused imports (without __all__ or blah as blah)
# # "F841", # unused variable rules # "F841", # unused variable rules
# ] ]
# ignore-init-module-imports = false # ignore-init-module-imports = false
# ------ - ------ # ------ - ------

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
get_service_mngr, Services,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is get_service_mngr() assert service_manager is Services
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import ServiceMngr from piker.service import Services
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: ServiceMngr, open_test_pikerd: Services,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
ServiceMngr, Services,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: ServiceMngr services: Services
async with ( async with (
open_test_pikerd( open_test_pikerd(