Compare commits
1 Commits
42e442c36a
...
4b64ded255
Author | SHA1 | Date |
---|---|---|
|
4b64ded255 |
|
@ -55,10 +55,9 @@ from cryptofeed.defines import (
|
||||||
OPTION, CALL, PUT
|
OPTION, CALL, PUT
|
||||||
)
|
)
|
||||||
from cryptofeed.symbols import Symbol
|
from cryptofeed.symbols import Symbol
|
||||||
from cryptofeed.types import (
|
|
||||||
L1Book,
|
# types for managing the cb callbacks.
|
||||||
Trade,
|
# from cryptofeed.types import L1Book
|
||||||
)
|
|
||||||
from piker.brokers import SymbolNotFound
|
from piker.brokers import SymbolNotFound
|
||||||
from .venues import (
|
from .venues import (
|
||||||
_ws_url,
|
_ws_url,
|
||||||
|
@ -67,7 +66,9 @@ from .venues import (
|
||||||
Pair,
|
Pair,
|
||||||
OptionPair,
|
OptionPair,
|
||||||
JSONRPCResult,
|
JSONRPCResult,
|
||||||
|
# JSONRPCChannel,
|
||||||
KLinesResult,
|
KLinesResult,
|
||||||
|
# Trade,
|
||||||
LastTradesResult,
|
LastTradesResult,
|
||||||
)
|
)
|
||||||
from piker.accounting import (
|
from piker.accounting import (
|
||||||
|
@ -97,17 +98,9 @@ _spawn_kwargs = {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def deribit_timestamp(when: datetime) -> int:
|
# convert datetime obj timestamp to unixtime in milliseconds
|
||||||
'''
|
def deribit_timestamp(when) -> int:
|
||||||
Convert conventional epoch timestamp, in secs, to unixtime in
|
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||||
milliseconds.
|
|
||||||
|
|
||||||
'''
|
|
||||||
return int(
|
|
||||||
(when.timestamp() * 1000)
|
|
||||||
+
|
|
||||||
(when.microsecond / 1000)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def str_to_cb_sym(name: str) -> Symbol:
|
def str_to_cb_sym(name: str) -> Symbol:
|
||||||
|
@ -162,28 +155,11 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# TODO, instead can't we just lookup the `MktPair` directly
|
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||||
# and pass it upward to `stream_quotes()`??
|
|
||||||
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
|
|
||||||
'''
|
|
||||||
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
|
|
||||||
uniquely from its fields.
|
|
||||||
|
|
||||||
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
|
|
||||||
for now i suppose..?
|
|
||||||
|
|
||||||
'''
|
|
||||||
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
|
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
|
||||||
otype = (
|
otype = 'C' if sym.option_type == CALL else 'P'
|
||||||
'C' if sym.option_type == CALL
|
|
||||||
else 'P'
|
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
|
||||||
)
|
|
||||||
return (
|
|
||||||
f'{sym.base}-'
|
|
||||||
f'{new_expiry_date}-'
|
|
||||||
f'{sym.strike_price}-'
|
|
||||||
f'{otype}'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
|
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
|
||||||
|
@ -622,7 +598,7 @@ async def get_client(
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_feed_handler() -> FeedHandler:
|
async def open_feed_handler():
|
||||||
fh = FeedHandler(config=get_config())
|
fh = FeedHandler(config=get_config())
|
||||||
yield fh
|
yield fh
|
||||||
await to_asyncio.run_task(fh.stop_async)
|
await to_asyncio.run_task(fh.stop_async)
|
||||||
|
@ -643,37 +619,59 @@ async def aio_price_feed_relay(
|
||||||
from_trio: asyncio.Queue,
|
from_trio: asyncio.Queue,
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
|
||||||
Relay price feed quotes from the `cryptofeed.FeedHandler` to
|
|
||||||
the `piker`-side `trio.task` consumers for delivery to consumer
|
|
||||||
sub-actors for various subsystems.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def _trade(
|
async def _trade(
|
||||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
data: dict,
|
||||||
receipt_timestamp: int,
|
receipt_timestamp: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
|
Send `cryptofeed.FeedHandler` quotes to `piker`-side
|
||||||
|
`trio.Task`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
to_trio.send_nowait(('trade', trade))
|
to_trio.send_nowait((
|
||||||
|
'trade', {
|
||||||
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
|
'last': data,
|
||||||
|
'broker_ts': time.time(),
|
||||||
|
'data': data.to_dict(),
|
||||||
|
'receipt': receipt_timestamp,
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
async def _l1(
|
async def _l1(
|
||||||
book: L1Book,
|
data: dict,
|
||||||
receipt_timestamp: int,
|
receipt_timestamp: int,
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
to_trio.send_nowait((
|
||||||
Relay-thru "l1 book" updates.
|
'l1', {
|
||||||
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
'''
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
|
'ticks': [
|
||||||
to_trio.send_nowait(('l1', book))
|
{
|
||||||
|
'type': 'bid',
|
||||||
# TODO, make this work!
|
'price': float(data.bid_price),
|
||||||
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
|
'size': float(data.bid_size)
|
||||||
# breakpoint()
|
},
|
||||||
|
{
|
||||||
|
'type': 'bsize',
|
||||||
|
'price': float(data.bid_price),
|
||||||
|
'size': float(data.bid_size)
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'ask',
|
||||||
|
'price': float(data.ask_price),
|
||||||
|
'size': float(data.ask_size)
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'type': 'asize',
|
||||||
|
'price': float(data.ask_price),
|
||||||
|
'size': float(data.ask_size)
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
))
|
||||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||||
fh.add_feed(
|
fh.add_feed(
|
||||||
DERIBIT,
|
DERIBIT,
|
||||||
|
@ -687,35 +685,27 @@ async def aio_price_feed_relay(
|
||||||
if not fh.running:
|
if not fh.running:
|
||||||
fh.run(
|
fh.run(
|
||||||
start_loop=False,
|
start_loop=False,
|
||||||
install_signal_handlers=False
|
install_signal_handlers=False)
|
||||||
)
|
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
to_trio.send_nowait(None)
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
# run until cancelled
|
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_price_feed(
|
async def open_price_feed(
|
||||||
instrument: str
|
instrument: str
|
||||||
) -> to_asyncio.LinkedTaskChannel:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
async with maybe_open_feed_handler() as fh:
|
||||||
fh: FeedHandler
|
async with to_asyncio.open_channel_from(
|
||||||
first: None
|
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
|
||||||
async with (
|
|
||||||
maybe_open_feed_handler() as fh,
|
|
||||||
to_asyncio.open_channel_from(
|
|
||||||
partial(
|
partial(
|
||||||
aio_price_feed_relay,
|
aio_price_feed_relay,
|
||||||
fh,
|
fh,
|
||||||
instrument
|
instrument
|
||||||
)
|
)
|
||||||
) as (first, chan)
|
) as (first, chan):
|
||||||
):
|
yield chan
|
||||||
yield chan
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -724,7 +714,6 @@ async def maybe_open_price_feed(
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
|
||||||
# TODO: add a predicate to maybe_open_context
|
# TODO: add a predicate to maybe_open_context
|
||||||
feed: to_asyncio.LinkedTaskChannel
|
|
||||||
async with maybe_open_context(
|
async with maybe_open_context(
|
||||||
acm_func=open_price_feed,
|
acm_func=open_price_feed,
|
||||||
kwargs={
|
kwargs={
|
||||||
|
|
|
@ -63,7 +63,6 @@ from .api import (
|
||||||
# get_config,
|
# get_config,
|
||||||
piker_sym_to_cb_sym,
|
piker_sym_to_cb_sym,
|
||||||
cb_sym_to_deribit_inst,
|
cb_sym_to_deribit_inst,
|
||||||
str_to_cb_sym,
|
|
||||||
maybe_open_price_feed
|
maybe_open_price_feed
|
||||||
)
|
)
|
||||||
from .venues import (
|
from .venues import (
|
||||||
|
@ -238,19 +237,13 @@ async def stream_quotes(
|
||||||
'''
|
'''
|
||||||
Open a live quote stream for the market set defined by `symbols`.
|
Open a live quote stream for the market set defined by `symbols`.
|
||||||
|
|
||||||
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
|
|
||||||
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
sym = symbols[0].split('.')[0]
|
sym = symbols[0].split('.')[0]
|
||||||
init_msgs: list[FeedInit] = []
|
init_msgs: list[FeedInit] = []
|
||||||
|
|
||||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||||
# just that).
|
# just that).
|
||||||
pfmt: Callable[[str], str] = mk_repr(
|
pfmt: Callable[[str], str] = mk_repr()
|
||||||
# so we can see `deribit`'s delightfully mega-long bs fields..
|
|
||||||
maxstring=100,
|
|
||||||
)
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_cached_client('deribit') as client,
|
open_cached_client('deribit') as client,
|
||||||
|
@ -269,31 +262,25 @@ async def stream_quotes(
|
||||||
# build `cryptofeed` feed-handle
|
# build `cryptofeed` feed-handle
|
||||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
||||||
|
|
||||||
from_cf: tractor.to_asyncio.LinkedTaskChannel
|
async with maybe_open_price_feed(sym) as stream:
|
||||||
async with maybe_open_price_feed(sym) as from_cf:
|
last_trades = (
|
||||||
|
await client.last_trades(
|
||||||
|
cb_sym_to_deribit_inst(cf_sym),
|
||||||
|
count=1,
|
||||||
|
)
|
||||||
|
).trades
|
||||||
|
|
||||||
# load the "last trades" summary
|
if len(last_trades) == 0:
|
||||||
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
|
last_trade = None
|
||||||
cb_sym_to_deribit_inst(cf_sym),
|
async for typ, quote in stream:
|
||||||
count=1,
|
if typ == 'trade':
|
||||||
)
|
last_trade = Trade(**(quote['data']))
|
||||||
last_trades: list[Trade] = last_trades_res.trades
|
break
|
||||||
|
|
||||||
# TODO, do we even need this or will the above always
|
else:
|
||||||
# work?
|
last_trade = Trade(**(last_trades[0]))
|
||||||
# if not last_trades:
|
|
||||||
# await tractor.pause()
|
|
||||||
# async for typ, quote in from_cf:
|
|
||||||
# if typ == 'trade':
|
|
||||||
# last_trade = Trade(**(quote['data']))
|
|
||||||
# break
|
|
||||||
|
|
||||||
# else:
|
first_quote = {
|
||||||
last_trade = Trade(
|
|
||||||
**(last_trades[0])
|
|
||||||
)
|
|
||||||
|
|
||||||
first_quote: dict = {
|
|
||||||
'symbol': sym,
|
'symbol': sym,
|
||||||
'last': last_trade.price,
|
'last': last_trade.price,
|
||||||
'brokerd_ts': last_trade.timestamp,
|
'brokerd_ts': last_trade.timestamp,
|
||||||
|
@ -318,69 +305,14 @@ async def stream_quotes(
|
||||||
topic: str = mkt.bs_fqme
|
topic: str = mkt.bs_fqme
|
||||||
|
|
||||||
# deliver until cancelled
|
# deliver until cancelled
|
||||||
async for typ, ref in from_cf:
|
async for typ, quote in stream:
|
||||||
match typ:
|
sym: str = quote['symbol']
|
||||||
case 'trade':
|
log.info(
|
||||||
trade: cryptofeed.types.Trade = ref
|
f'deribit {typ!r} quote for {sym!r}\n\n'
|
||||||
|
f'{pfmt(quote)}\n'
|
||||||
# TODO, re-impl this according to teh ideal
|
)
|
||||||
# fqme for opts that we choose!!
|
|
||||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
|
||||||
str_to_cb_sym(trade.symbol)
|
|
||||||
).lower()
|
|
||||||
|
|
||||||
piker_quote: dict = {
|
|
||||||
'symbol': bs_fqme,
|
|
||||||
'last': trade.price,
|
|
||||||
'broker_ts': time.time(),
|
|
||||||
# ^TODO, name this `brokerd/datad_ts` and
|
|
||||||
# use `time.time_ns()` ??
|
|
||||||
'ticks': [{
|
|
||||||
'type': 'trade',
|
|
||||||
'price': float(trade.price),
|
|
||||||
'size': float(trade.amount),
|
|
||||||
'broker_ts': trade.timestamp,
|
|
||||||
}],
|
|
||||||
}
|
|
||||||
log.info(
|
|
||||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
|
||||||
f'{trade}\n\n'
|
|
||||||
f'{pfmt(piker_quote)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
case 'l1':
|
|
||||||
book: cryptofeed.types.L1Book = ref
|
|
||||||
|
|
||||||
# TODO, so this is where we can possibly change things
|
|
||||||
# and instead lever the `MktPair.bs_fqme: str` output?
|
|
||||||
bs_fqme: str = cb_sym_to_deribit_inst(
|
|
||||||
str_to_cb_sym(book.symbol)
|
|
||||||
).lower()
|
|
||||||
|
|
||||||
piker_quote: dict = {
|
|
||||||
'symbol': bs_fqme,
|
|
||||||
'ticks': [
|
|
||||||
|
|
||||||
{'type': 'bid',
|
|
||||||
'price': float(book.bid_price),
|
|
||||||
'size': float(book.bid_size)},
|
|
||||||
|
|
||||||
{'type': 'bsize',
|
|
||||||
'price': float(book.bid_price),
|
|
||||||
'size': float(book.bid_size),},
|
|
||||||
|
|
||||||
{'type': 'ask',
|
|
||||||
'price': float(book.ask_price),
|
|
||||||
'size': float(book.ask_size),},
|
|
||||||
|
|
||||||
{'type': 'asize',
|
|
||||||
'price': float(book.ask_price),
|
|
||||||
'size': float(book.ask_size),}
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
await send_chan.send({
|
await send_chan.send({
|
||||||
topic: piker_quote,
|
topic: quote,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
@ -395,6 +327,7 @@ async def open_symbol_search(
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
pattern: str
|
pattern: str
|
||||||
async for pattern in stream:
|
async for pattern in stream:
|
||||||
|
|
||||||
|
|
|
@ -154,7 +154,6 @@ class JSONRPCResult(Struct):
|
||||||
error: Optional[dict] = None
|
error: Optional[dict] = None
|
||||||
result: Optional[list[dict]] = None
|
result: Optional[list[dict]] = None
|
||||||
|
|
||||||
|
|
||||||
class JSONRPCChannel(Struct):
|
class JSONRPCChannel(Struct):
|
||||||
method: str
|
method: str
|
||||||
params: dict
|
params: dict
|
||||||
|
@ -171,7 +170,6 @@ class KLinesResult(Struct):
|
||||||
status: str
|
status: str
|
||||||
volume: list[float]
|
volume: list[float]
|
||||||
|
|
||||||
|
|
||||||
class Trade(Struct):
|
class Trade(Struct):
|
||||||
iv: float
|
iv: float
|
||||||
price: float
|
price: float
|
||||||
|
@ -190,7 +188,6 @@ class Trade(Struct):
|
||||||
block_trade_id: Optional[str] = '',
|
block_trade_id: Optional[str] = '',
|
||||||
block_trade_leg_count: Optional[int] = 0,
|
block_trade_leg_count: Optional[int] = 0,
|
||||||
|
|
||||||
|
|
||||||
class LastTradesResult(Struct):
|
class LastTradesResult(Struct):
|
||||||
trades: list[Trade]
|
trades: list[Trade]
|
||||||
has_more: bool
|
has_more: bool
|
||||||
|
|
Loading…
Reference in New Issue