Compare commits
10 Commits
b734245183
...
9ad25ef2e6
Author | SHA1 | Date |
---|---|---|
|
9ad25ef2e6 | |
|
5cefe8bcdb | |
|
d96e9d4f11 | |
|
a0dcf14aba | |
|
1705afb607 | |
|
dafd5a3ca5 | |
|
b9dde98d1e | |
|
1616cc0e82 | |
|
0a2ed195a7 | |
|
28e8628c61 |
|
@ -28,8 +28,9 @@ from decimal import (
|
|||
Decimal,
|
||||
)
|
||||
from functools import partial
|
||||
import time
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
|
@ -54,9 +55,10 @@ from cryptofeed.defines import (
|
|||
OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
# types for managing the cb callbacks.
|
||||
# from cryptofeed.types import L1Book
|
||||
from cryptofeed.types import (
|
||||
L1Book,
|
||||
Trade,
|
||||
)
|
||||
from piker.brokers import SymbolNotFound
|
||||
from .venues import (
|
||||
_ws_url,
|
||||
|
@ -65,9 +67,7 @@ from .venues import (
|
|||
Pair,
|
||||
OptionPair,
|
||||
JSONRPCResult,
|
||||
# JSONRPCChannel,
|
||||
KLinesResult,
|
||||
# Trade,
|
||||
LastTradesResult,
|
||||
)
|
||||
from piker.accounting import (
|
||||
|
@ -97,9 +97,17 @@ _spawn_kwargs = {
|
|||
}
|
||||
|
||||
|
||||
# convert datetime obj timestamp to unixtime in milliseconds
|
||||
def deribit_timestamp(when) -> int:
|
||||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||
def deribit_timestamp(when: datetime) -> int:
|
||||
'''
|
||||
Convert conventional epoch timestamp, in secs, to unixtime in
|
||||
milliseconds.
|
||||
|
||||
'''
|
||||
return int(
|
||||
(when.timestamp() * 1000)
|
||||
+
|
||||
(when.microsecond / 1000)
|
||||
)
|
||||
|
||||
|
||||
def str_to_cb_sym(name: str) -> Symbol:
|
||||
|
@ -122,14 +130,20 @@ def str_to_cb_sym(name: str) -> Symbol:
|
|||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=new_expiry_date)
|
||||
expiry_date=new_expiry_date
|
||||
)
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
base, expiry_date, strike_price, option_type = tuple(
|
||||
(
|
||||
base,
|
||||
expiry_date,
|
||||
strike_price,
|
||||
option_type,
|
||||
)= tuple(
|
||||
name.upper().split('-'))
|
||||
|
||||
quote = base
|
||||
quote: str = base
|
||||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
|
@ -144,14 +158,32 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
|||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date)
|
||||
expiry_date=expiry_date
|
||||
)
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# TODO, instead can't we just lookup the `MktPair` directly
|
||||
# and pass it upward to `stream_quotes()`??
|
||||
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
|
||||
'''
|
||||
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
|
||||
uniquely from its fields.
|
||||
|
||||
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
|
||||
for now i suppose..?
|
||||
|
||||
'''
|
||||
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
|
||||
otype = (
|
||||
'C' if sym.option_type == CALL
|
||||
else 'P'
|
||||
)
|
||||
return (
|
||||
f'{sym.base}-'
|
||||
f'{new_expiry_date}-'
|
||||
f'{sym.strike_price}-'
|
||||
f'{otype}'
|
||||
)
|
||||
|
||||
|
||||
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
|
||||
|
@ -207,7 +239,10 @@ def get_config() -> dict[str, Any]:
|
|||
|
||||
|
||||
class Client:
|
||||
'''
|
||||
Hi-level interface for the jsron-RPC over websocket API.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
|
@ -369,6 +404,19 @@ class Client:
|
|||
return cached_pair
|
||||
|
||||
if sym:
|
||||
opt: OptionPair|None = pair_table.get(sym)
|
||||
if not opt:
|
||||
closest_matches: dict[str, Pair] = match_from_pairs(
|
||||
pairs=pair_table,
|
||||
query=sym,
|
||||
score_cutoff=40,
|
||||
)
|
||||
closest_syms: list[str] = list(closest_matches.keys())
|
||||
raise ValueError(
|
||||
f'No contract found for {sym!r}\n\n'
|
||||
f'Closest {len(closest_syms)} available contracts:\n\n'
|
||||
f'{pformat(closest_syms)}\n'
|
||||
)
|
||||
return pair_table[sym]
|
||||
else:
|
||||
return self._pairs
|
||||
|
@ -574,7 +622,7 @@ async def get_client(
|
|||
|
||||
|
||||
@acm
|
||||
async def open_feed_handler():
|
||||
async def open_feed_handler() -> FeedHandler:
|
||||
fh = FeedHandler(config=get_config())
|
||||
yield fh
|
||||
await to_asyncio.run_task(fh.stop_async)
|
||||
|
@ -595,43 +643,37 @@ async def aio_price_feed_relay(
|
|||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp
|
||||
}))
|
||||
'''
|
||||
Relay price feed quotes from the `cryptofeed.FeedHandler` to
|
||||
the `piker`-side `trio.task` consumers for delivery to consumer
|
||||
sub-actors for various subsystems.
|
||||
|
||||
'''
|
||||
async def _trade(
|
||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
'''
|
||||
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
|
||||
|
||||
'''
|
||||
to_trio.send_nowait(('trade', trade))
|
||||
|
||||
async def _l1(
|
||||
book: L1Book,
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
'''
|
||||
Relay-thru "l1 book" updates.
|
||||
|
||||
'''
|
||||
|
||||
to_trio.send_nowait(('l1', book))
|
||||
|
||||
# TODO, make this work!
|
||||
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
|
||||
# breakpoint()
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{
|
||||
'type': 'bid',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'bsize',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'ask',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
},
|
||||
{
|
||||
'type': 'asize',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
}
|
||||
]
|
||||
}))
|
||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
|
@ -645,27 +687,35 @@ async def aio_price_feed_relay(
|
|||
if not fh.running:
|
||||
fh.run(
|
||||
start_loop=False,
|
||||
install_signal_handlers=False)
|
||||
install_signal_handlers=False
|
||||
)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
# run until cancelled
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
@acm
|
||||
async def open_price_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
async with to_asyncio.open_channel_from(
|
||||
) -> to_asyncio.LinkedTaskChannel:
|
||||
|
||||
fh: FeedHandler
|
||||
first: None
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
async with (
|
||||
maybe_open_feed_handler() as fh,
|
||||
to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_price_feed_relay,
|
||||
fh,
|
||||
instrument
|
||||
)
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
) as (first, chan)
|
||||
):
|
||||
yield chan
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -674,6 +724,7 @@ async def maybe_open_price_feed(
|
|||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
feed: to_asyncio.LinkedTaskChannel
|
||||
async with maybe_open_context(
|
||||
acm_func=open_price_feed,
|
||||
kwargs={
|
||||
|
|
|
@ -29,6 +29,7 @@ from typing import (
|
|||
# from pprint import pformat
|
||||
import time
|
||||
|
||||
import cryptofeed
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from pendulum import (
|
||||
|
@ -52,25 +53,17 @@ from piker._cacheables import (
|
|||
)
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
mk_repr,
|
||||
)
|
||||
from piker.data.validate import FeedInit
|
||||
|
||||
# from cryptofeed import FeedHandler
|
||||
# from cryptofeed.defines import (
|
||||
# DERIBIT,
|
||||
# L1_BOOK,
|
||||
# TRADES,
|
||||
# OPTION,
|
||||
# CALL,
|
||||
# PUT,
|
||||
# )
|
||||
# from cryptofeed.symbols import Symbol
|
||||
|
||||
from .api import (
|
||||
Client,
|
||||
# get_config,
|
||||
piker_sym_to_cb_sym,
|
||||
cb_sym_to_deribit_inst,
|
||||
str_to_cb_sym,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
from .venues import (
|
||||
|
@ -219,63 +212,88 @@ async def get_mkt_info(
|
|||
price_tick=pair.price_tick,
|
||||
size_tick=pair.size_tick,
|
||||
bs_mktid=pair.symbol,
|
||||
expiry=pair.expiry,
|
||||
venue=mkt_mode,
|
||||
broker='deribit',
|
||||
_atype=mkt_mode,
|
||||
_fqme_without_src=True,
|
||||
|
||||
# expiry=pair.expiry,
|
||||
# XXX TODO, currently we don't use it since it's
|
||||
# already "described" in the `OptionPair.symbol: str`
|
||||
# and if we slap in the ISO repr it's kinda hideous..
|
||||
# -[ ] figure out the best either std
|
||||
)
|
||||
return mkt, pair
|
||||
|
||||
|
||||
async def stream_quotes(
|
||||
|
||||
send_chan: trio.abc.SendChannel,
|
||||
symbols: list[str],
|
||||
feed_is_live: trio.Event,
|
||||
loglevel: str = None,
|
||||
|
||||
# startup sync
|
||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Open a live quote stream for the market set defined by `symbols`.
|
||||
|
||||
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
|
||||
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
|
||||
|
||||
'''
|
||||
sym = symbols[0].split('.')[0]
|
||||
|
||||
init_msgs: list[FeedInit] = []
|
||||
|
||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||
# just that).
|
||||
pfmt: Callable[[str], str] = mk_repr(
|
||||
# so we can see `deribit`'s delightfully mega-long bs fields..
|
||||
maxstring=100,
|
||||
)
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
):
|
||||
|
||||
mkt: MktPair
|
||||
pair: Pair
|
||||
mkt, pair = await get_mkt_info(sym)
|
||||
|
||||
# build out init msgs according to latest spec
|
||||
init_msgs.append(
|
||||
FeedInit(mkt_info=mkt)
|
||||
FeedInit(
|
||||
mkt_info=mkt,
|
||||
)
|
||||
)
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
# build `cryptofeed` feed-handle
|
||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
from_cf: tractor.to_asyncio.LinkedTaskChannel
|
||||
async with maybe_open_price_feed(sym) as from_cf:
|
||||
|
||||
# TODO, uhh use it ?? XD
|
||||
# cache = client._pairs
|
||||
# load the "last trades" summary
|
||||
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
|
||||
cb_sym_to_deribit_inst(cf_sym),
|
||||
count=1,
|
||||
)
|
||||
last_trades: list[Trade] = last_trades_res.trades
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
# TODO, do we even need this or will the above always
|
||||
# work?
|
||||
# if not last_trades:
|
||||
# await tractor.pause()
|
||||
# async for typ, quote in from_cf:
|
||||
# if typ == 'trade':
|
||||
# last_trade = Trade(**(quote['data']))
|
||||
# break
|
||||
|
||||
if len(last_trades) == 0:
|
||||
last_trade = None
|
||||
async for typ, quote in stream:
|
||||
if typ == 'trade':
|
||||
last_trade = Trade(**(quote['data']))
|
||||
break
|
||||
# else:
|
||||
last_trade = Trade(
|
||||
**(last_trades[0])
|
||||
)
|
||||
|
||||
else:
|
||||
last_trade = Trade(**(last_trades[0]))
|
||||
|
||||
first_quote = {
|
||||
first_quote: dict = {
|
||||
'symbol': sym,
|
||||
'last': last_trade.price,
|
||||
'brokerd_ts': last_trade.timestamp,
|
||||
|
@ -286,19 +304,83 @@ async def stream_quotes(
|
|||
'broker_ts': last_trade.timestamp
|
||||
}]
|
||||
}
|
||||
task_status.started((init_msgs, first_quote))
|
||||
task_status.started((
|
||||
init_msgs,
|
||||
first_quote,
|
||||
))
|
||||
|
||||
feed_is_live.set()
|
||||
|
||||
# NOTE XXX, static for now!
|
||||
# => since this only handles ONE mkt feed at a time we
|
||||
# don't need a lookup table to map interleaved quotes
|
||||
# from multiple possible mkt-pairs
|
||||
topic: str = mkt.bs_fqme
|
||||
|
||||
# deliver until cancelled
|
||||
async for typ, quote in stream:
|
||||
topic: str = quote['symbol']
|
||||
log.info(
|
||||
f'deribit {typ!r} quote\n\n'
|
||||
f'{quote}\n'
|
||||
)
|
||||
async for typ, ref in from_cf:
|
||||
match typ:
|
||||
case 'trade':
|
||||
trade: cryptofeed.types.Trade = ref
|
||||
|
||||
# TODO, re-impl this according to teh ideal
|
||||
# fqme for opts that we choose!!
|
||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(trade.symbol)
|
||||
).lower()
|
||||
|
||||
piker_quote: dict = {
|
||||
'symbol': bs_fqme,
|
||||
'last': trade.price,
|
||||
'broker_ts': time.time(),
|
||||
# ^TODO, name this `brokerd/datad_ts` and
|
||||
# use `time.time_ns()` ??
|
||||
'ticks': [{
|
||||
'type': 'trade',
|
||||
'price': float(trade.price),
|
||||
'size': float(trade.amount),
|
||||
'broker_ts': trade.timestamp,
|
||||
}],
|
||||
}
|
||||
log.info(
|
||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
||||
f'{trade}\n\n'
|
||||
f'{pfmt(piker_quote)}\n'
|
||||
)
|
||||
|
||||
case 'l1':
|
||||
book: cryptofeed.types.L1Book = ref
|
||||
|
||||
# TODO, so this is where we can possibly change things
|
||||
# and instead lever the `MktPair.bs_fqme: str` output?
|
||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(book.symbol)
|
||||
).lower()
|
||||
|
||||
piker_quote: dict = {
|
||||
'symbol': bs_fqme,
|
||||
'ticks': [
|
||||
|
||||
{'type': 'bid',
|
||||
'price': float(book.bid_price),
|
||||
'size': float(book.bid_size)},
|
||||
|
||||
{'type': 'bsize',
|
||||
'price': float(book.bid_price),
|
||||
'size': float(book.bid_size),},
|
||||
|
||||
{'type': 'ask',
|
||||
'price': float(book.ask_price),
|
||||
'size': float(book.ask_size),},
|
||||
|
||||
{'type': 'asize',
|
||||
'price': float(book.ask_price),
|
||||
'size': float(book.ask_size),}
|
||||
]
|
||||
}
|
||||
|
||||
await send_chan.send({
|
||||
topic: quote,
|
||||
topic: piker_quote,
|
||||
})
|
||||
|
||||
|
||||
|
@ -313,7 +395,6 @@ async def open_symbol_search(
|
|||
await ctx.started()
|
||||
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
pattern: str
|
||||
async for pattern in stream:
|
||||
|
||||
|
|
|
@ -26,8 +26,6 @@ from typing import (
|
|||
)
|
||||
from decimal import Decimal
|
||||
|
||||
from msgspec import field
|
||||
|
||||
from piker.types import Struct
|
||||
|
||||
|
||||
|
@ -115,9 +113,13 @@ class OptionPair(Pair, frozen=True):
|
|||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
# TODO, impl this without the MM:SS part of
|
||||
# the `'THH:MM:SS..'` etc..
|
||||
@property
|
||||
def expiry(self) -> str:
|
||||
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
|
||||
iso_date = pendulum.from_timestamp(
|
||||
self.expiration_timestamp / 1000
|
||||
).isoformat()
|
||||
return iso_date
|
||||
|
||||
@property
|
||||
|
@ -152,6 +154,7 @@ class JSONRPCResult(Struct):
|
|||
error: Optional[dict] = None
|
||||
result: Optional[list[dict]] = None
|
||||
|
||||
|
||||
class JSONRPCChannel(Struct):
|
||||
method: str
|
||||
params: dict
|
||||
|
@ -168,6 +171,7 @@ class KLinesResult(Struct):
|
|||
status: str
|
||||
volume: list[float]
|
||||
|
||||
|
||||
class Trade(Struct):
|
||||
iv: float
|
||||
price: float
|
||||
|
@ -186,6 +190,7 @@ class Trade(Struct):
|
|||
block_trade_id: Optional[str] = '',
|
||||
block_trade_leg_count: Optional[int] = 0,
|
||||
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
trades: list[Trade]
|
||||
has_more: bool
|
||||
|
|
|
@ -30,6 +30,7 @@ import time
|
|||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
|
@ -54,6 +55,9 @@ from ._util import (
|
|||
get_console_log,
|
||||
)
|
||||
from ..service import maybe_spawn_daemon
|
||||
from piker.log import (
|
||||
mk_repr,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._sharedmem import (
|
||||
|
@ -575,7 +579,6 @@ async def open_sample_stream(
|
|||
|
||||
|
||||
async def sample_and_broadcast(
|
||||
|
||||
bus: _FeedsBus, # noqa
|
||||
rt_shm: ShmArray,
|
||||
hist_shm: ShmArray,
|
||||
|
@ -596,11 +599,22 @@ async def sample_and_broadcast(
|
|||
|
||||
overruns = Counter()
|
||||
|
||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||
# just that).
|
||||
pfmt: Callable[[str], str] = mk_repr()
|
||||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
# print(quotes)
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
||||
# ALOT of perf with HF-feedz!!!
|
||||
#
|
||||
# log.info(
|
||||
# 'Rx live quotes:\n'
|
||||
# f'{pfmt(quotes)}'
|
||||
# )
|
||||
|
||||
# TODO: `numba` this!
|
||||
for broker_symbol, quote in quotes.items():
|
||||
# TODO: in theory you can send the IPC msg *before* writing
|
||||
# to the sharedmem array to decrease latency, however, that
|
||||
|
@ -673,6 +687,18 @@ async def sample_and_broadcast(
|
|||
sub_key: str = broker_symbol.lower()
|
||||
subs: set[Sub] = bus.get_subs(sub_key)
|
||||
|
||||
if not subs:
|
||||
all_bs_fqmes: list[str] = list(
|
||||
bus._subscribers.keys()
|
||||
)
|
||||
log.warning(
|
||||
f'No subscribers for {brokername!r} live-quote ??\n'
|
||||
f'broker_symbol: {broker_symbol}\n\n'
|
||||
|
||||
f'Maybe the backend-sys symbol does not match one of,\n'
|
||||
f'{pfmt(all_bs_fqmes)}\n'
|
||||
)
|
||||
|
||||
# NOTE: by default the broker backend doesn't append
|
||||
# it's own "name" into the fqme schema (but maybe it
|
||||
# should?) so we have to manually generate the correct
|
||||
|
|
|
@ -540,7 +540,10 @@ async def open_feed_bus(
|
|||
# subscription since the backend isn't (yet) expected to
|
||||
# append it's own name to the fqme, so we filter on keys
|
||||
# which *do not* include that name (e.g .ib) .
|
||||
bus._subscribers.setdefault(bs_fqme, set())
|
||||
bus._subscribers.setdefault(
|
||||
bs_fqme,
|
||||
set(),
|
||||
)
|
||||
|
||||
# sync feed subscribers with flume handles
|
||||
await ctx.started(
|
||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,7 +18,11 @@
|
|||
Log like a forester!
|
||||
"""
|
||||
import logging
|
||||
import reprlib
|
||||
import json
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import tractor
|
||||
from pygments import (
|
||||
|
@ -84,3 +88,27 @@ def colorize_json(
|
|||
# likeable styles: algol_nu, tango, monokai
|
||||
formatters.TerminalTrueColorFormatter(style=style)
|
||||
)
|
||||
|
||||
|
||||
def mk_repr(
|
||||
**repr_kws,
|
||||
) -> Callable[[str], str]:
|
||||
'''
|
||||
Allocate and deliver a `repr.Repr` instance with provided input
|
||||
settings using the std-lib's `reprlib` mod,
|
||||
* https://docs.python.org/3/library/reprlib.html
|
||||
|
||||
------ Ex. ------
|
||||
An up to 6-layer-nested `dict` as multi-line:
|
||||
- https://stackoverflow.com/a/79102479
|
||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
||||
|
||||
'''
|
||||
def_kws: dict[str, int] = dict(
|
||||
indent=2,
|
||||
maxlevel=6, # recursion levels
|
||||
maxstring=66, # match editor line-len limit
|
||||
)
|
||||
def_kws |= repr_kws
|
||||
reprr = reprlib.Repr(**def_kws)
|
||||
return reprr.repr
|
||||
|
|
|
@ -161,7 +161,12 @@ class NativeStorageClient:
|
|||
|
||||
def index_files(self):
|
||||
for path in self._datadir.iterdir():
|
||||
if path.name in {'borked', 'expired',}:
|
||||
if (
|
||||
path.name in {'borked', 'expired',}
|
||||
or
|
||||
'.parquet' not in str(path)
|
||||
):
|
||||
# ignore all non-apache files (for now)
|
||||
continue
|
||||
|
||||
key: str = path.name.rstrip('.parquet')
|
||||
|
|
|
@ -500,7 +500,6 @@ async def start_backfill(
|
|||
)
|
||||
|
||||
assert time[-1] == next_end_dt.timestamp()
|
||||
|
||||
expected_dur: Interval = last_start_dt - next_start_dt
|
||||
|
||||
# frame's worth of sample-period-steps, in seconds
|
||||
|
@ -1011,7 +1010,15 @@ async def tsdb_backfill(
|
|||
]|None = config.get('frame_types', None)
|
||||
if def_frame_durs:
|
||||
def_frame_size: Duration = def_frame_durs[timeframe]
|
||||
assert def_frame_size == calced_frame_size
|
||||
if def_frame_size != calced_frame_size:
|
||||
log.warning(
|
||||
f'Provider {mod.name!r} frame diff then declared ?\n\n'
|
||||
f'fqme: {mkt.fqme}\n'
|
||||
f'def_frame_size: {def_frame_size}\n'
|
||||
f'calced_frame_size: {calced_frame_size}\n\n'
|
||||
|
||||
f'=> possibly the frame contains a GAP ?\n'
|
||||
)
|
||||
else:
|
||||
# use what we calced from first frame above.
|
||||
def_frame_size = calced_frame_size
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -25,11 +25,11 @@ build-backend = "poetry.core.masonry.api"
|
|||
ignore = []
|
||||
|
||||
# https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
|
||||
"piker/ui/qt.py" = [
|
||||
"E402",
|
||||
'F401', # unused imports (without __all__ or blah as blah)
|
||||
# "F841", # unused variable rules
|
||||
]
|
||||
# "piker/ui/qt.py" = [
|
||||
# "E402",
|
||||
# 'F401', # unused imports (without __all__ or blah as blah)
|
||||
# # "F841", # unused variable rules
|
||||
# ]
|
||||
# ignore-init-module-imports = false
|
||||
|
||||
# ------ - ------
|
||||
|
@ -72,12 +72,8 @@ httpx = "^0.27.0"
|
|||
cryptofeed = "^2.4.0"
|
||||
pyarrow = "^17.0.0"
|
||||
|
||||
[tool.poetry.dependencies.tractor]
|
||||
develop = true
|
||||
git = 'https://pikers.dev/goodboy/tractor.git'
|
||||
branch = 'aio_abandons'
|
||||
# path = "../tractor"
|
||||
|
||||
tractor = {path = "../tractor", develop = true}
|
||||
websockets = "12.0"
|
||||
[tool.poetry.dependencies.asyncvnc]
|
||||
git = 'https://github.com/pikers/asyncvnc.git'
|
||||
branch = 'main'
|
||||
|
|
Loading…
Reference in New Issue