Compare commits

..

1 Commits

Author SHA1 Message Date
Nelson Torres 4b64ded255 necessary libraries for qt6 2025-01-28 15:49:08 -03:00
3 changed files with 86 additions and 167 deletions

View File

@ -55,10 +55,9 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
from cryptofeed.types import (
L1Book, # types for managing the cb callbacks.
Trade, # from cryptofeed.types import L1Book
)
from piker.brokers import SymbolNotFound from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
@ -67,7 +66,9 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
# JSONRPCChannel,
KLinesResult, KLinesResult,
# Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -97,17 +98,9 @@ _spawn_kwargs = {
} }
def deribit_timestamp(when: datetime) -> int: # convert datetime obj timestamp to unixtime in milliseconds
''' def deribit_timestamp(when) -> int:
Convert conventional epoch timestamp, in secs, to unixtime in return int((when.timestamp() * 1000) + (when.microsecond / 1000))
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def str_to_cb_sym(name: str) -> Symbol: def str_to_cb_sym(name: str) -> Symbol:
@ -162,28 +155,11 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
) )
# TODO, instead can't we just lookup the `MktPair` directly def cb_sym_to_deribit_inst(sym: Symbol):
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = ( otype = 'C' if sym.option_type == CALL else 'P'
'C' if sym.option_type == CALL
else 'P' return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str: def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -622,7 +598,7 @@ async def get_client(
@acm @acm
async def open_feed_handler() -> FeedHandler: async def open_feed_handler():
fh = FeedHandler(config=get_config()) fh = FeedHandler(config=get_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -643,37 +619,59 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
'''
Relay price feed quotes from the `cryptofeed.FeedHandler` to
the `piker`-side `trio.task` consumers for delivery to consumer
sub-actors for various subsystems.
'''
async def _trade( async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`! data: dict,
receipt_timestamp: int, receipt_timestamp: int,
) -> None: ) -> None:
''' '''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. Send `cryptofeed.FeedHandler` quotes to `piker`-side
`trio.Task`.
''' '''
to_trio.send_nowait(('trade', trade)) to_trio.send_nowait((
'trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp,
},
))
async def _l1( async def _l1(
book: L1Book, data: dict,
receipt_timestamp: int, receipt_timestamp: int,
) -> None: ) -> None:
''' to_trio.send_nowait((
Relay-thru "l1 book" updates. 'l1', {
'symbol': cb_sym_to_deribit_inst(
''' str_to_cb_sym(data.symbol)).lower(),
'ticks': [
to_trio.send_nowait(('l1', book)) {
'type': 'bid',
# TODO, make this work! 'price': float(data.bid_price),
# -[ ] why isn't this working in `tractor.pause_from_sync()`?? 'size': float(data.bid_size)
# breakpoint() },
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
},
))
sym: Symbol = piker_sym_to_cb_sym(instrument) sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
@ -687,35 +685,27 @@ async def aio_price_feed_relay(
if not fh.running: if not fh.running:
fh.run( fh.run(
start_loop=False, start_loop=False,
install_signal_handlers=False install_signal_handlers=False)
)
# sync with trio # sync with trio
to_trio.send_nowait(None) to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str
) -> to_asyncio.LinkedTaskChannel: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
fh: FeedHandler async with to_asyncio.open_channel_from(
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial( partial(
aio_price_feed_relay, aio_price_feed_relay,
fh, fh,
instrument instrument
) )
) as (first, chan) ) as (first, chan):
): yield chan
yield chan
@acm @acm
@ -724,7 +714,6 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={

View File

@ -63,7 +63,6 @@ from .api import (
# get_config, # get_config,
piker_sym_to_cb_sym, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst, cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
@ -238,19 +237,13 @@ async def stream_quotes(
''' '''
Open a live quote stream for the market set defined by `symbols`. Open a live quote stream for the market set defined by `symbols`.
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
''' '''
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are # multiline nested `dict` formatter (since rn quote-msgs are
# just that). # just that).
pfmt: Callable[[str], str] = mk_repr( pfmt: Callable[[str], str] = mk_repr()
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
@ -269,31 +262,25 @@ async def stream_quotes(
# build `cryptofeed` feed-handle # build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
from_cf: tractor.to_asyncio.LinkedTaskChannel async with maybe_open_price_feed(sym) as stream:
async with maybe_open_price_feed(sym) as from_cf: last_trades = (
await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
).trades
# load the "last trades" summary if len(last_trades) == 0:
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( last_trade = None
cb_sym_to_deribit_inst(cf_sym), async for typ, quote in stream:
count=1, if typ == 'trade':
) last_trade = Trade(**(quote['data']))
last_trades: list[Trade] = last_trades_res.trades break
# TODO, do we even need this or will the above always else:
# work? last_trade = Trade(**(last_trades[0]))
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
# else: first_quote = {
last_trade = Trade(
**(last_trades[0])
)
first_quote: dict = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -318,69 +305,14 @@ async def stream_quotes(
topic: str = mkt.bs_fqme topic: str = mkt.bs_fqme
# deliver until cancelled # deliver until cancelled
async for typ, ref in from_cf: async for typ, quote in stream:
match typ: sym: str = quote['symbol']
case 'trade': log.info(
trade: cryptofeed.types.Trade = ref f'deribit {typ!r} quote for {sym!r}\n\n'
f'{pfmt(quote)}\n'
# TODO, re-impl this according to teh ideal )
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({ await send_chan.send({
topic: piker_quote, topic: quote,
}) })
@ -395,6 +327,7 @@ async def open_symbol_search(
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:

View File

@ -154,7 +154,6 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None error: Optional[dict] = None
result: Optional[list[dict]] = None result: Optional[list[dict]] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
method: str method: str
params: dict params: dict
@ -171,7 +170,6 @@ class KLinesResult(Struct):
status: str status: str
volume: list[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
iv: float iv: float
price: float price: float
@ -190,7 +188,6 @@ class Trade(Struct):
block_trade_id: Optional[str] = '', block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0, block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
has_more: bool has_more: bool