Compare commits
2 Commits
b8512edd1b
...
2cb7b505e1
Author | SHA1 | Date |
---|---|---|
|
2cb7b505e1 | |
|
33a37f24c7 |
|
@ -122,7 +122,7 @@ def str_to_cb_sym(name: str) -> Symbol:
|
|||
quote = base
|
||||
|
||||
if option_type == 'put':
|
||||
option_type = PUT
|
||||
option_type = PUT
|
||||
elif option_type == 'call':
|
||||
option_type = CALL
|
||||
else:
|
||||
|
@ -154,7 +154,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
|||
quote: str = base
|
||||
|
||||
if option_type == 'P' or option_type == 'PUT':
|
||||
option_type = PUT
|
||||
option_type = PUT
|
||||
elif option_type == 'C' or option_type == 'CALL':
|
||||
option_type = CALL
|
||||
else:
|
||||
|
@ -420,7 +420,7 @@ class Client:
|
|||
"""
|
||||
Get a dict with all expiration dates listed as value and currency as key.
|
||||
"""
|
||||
|
||||
|
||||
params: dict[str, str] = {
|
||||
'currency': currency.upper(),
|
||||
'kind': kind,
|
||||
|
@ -744,38 +744,6 @@ async def aio_price_feed_relay(
|
|||
the `piker`-side `trio.task` consumers for delivery to consumer
|
||||
sub-actors for various subsystems.
|
||||
|
||||
'''
|
||||
async def _trade(
|
||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
'''
|
||||
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
|
||||
|
||||
'''
|
||||
to_trio.send_nowait(('trade', trade))
|
||||
|
||||
async def _l1(
|
||||
book: L1Book,
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
'''
|
||||
Relay-thru "l1 book" updates.
|
||||
|
||||
'''
|
||||
|
||||
to_trio.send_nowait(('l1', book))
|
||||
|
||||
# TODO, make this work!
|
||||
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
|
||||
# breakpoint()
|
||||
|
||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||
'''
|
||||
Relay price feed quotes from the `cryptofeed.FeedHandler` to
|
||||
the `piker`-side `trio.task` consumers for delivery to consumer
|
||||
sub-actors for various subsystems.
|
||||
|
||||
'''
|
||||
async def _trade(
|
||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
||||
|
@ -807,7 +775,6 @@ async def aio_price_feed_relay(
|
|||
DERIBIT,
|
||||
channels=[TRADES, L1_BOOK],
|
||||
symbols=[sym],
|
||||
symbols=[sym],
|
||||
callbacks={
|
||||
TRADES: _trade,
|
||||
L1_BOOK: _l1
|
||||
|
@ -818,13 +785,10 @@ async def aio_price_feed_relay(
|
|||
start_loop=False,
|
||||
install_signal_handlers=False
|
||||
)
|
||||
install_signal_handlers=False
|
||||
)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
# run until cancelled
|
||||
# run until cancelled
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
@ -834,14 +798,6 @@ async def open_price_feed(
|
|||
instrument: str
|
||||
) -> to_asyncio.LinkedTaskChannel:
|
||||
|
||||
fh: FeedHandler
|
||||
first: None
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
async with (
|
||||
maybe_open_feed_handler() as fh,
|
||||
to_asyncio.open_channel_from(
|
||||
) -> to_asyncio.LinkedTaskChannel:
|
||||
|
||||
fh: FeedHandler
|
||||
first: None
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
|
@ -849,34 +805,28 @@ async def open_price_feed(
|
|||
maybe_open_feed_handler() as fh,
|
||||
to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_open_interest_feed_relay,
|
||||
aio_price_feed_relay,
|
||||
fh,
|
||||
instruments,
|
||||
instrument
|
||||
)
|
||||
) as (first, chan)
|
||||
):
|
||||
yield chan
|
||||
) as (first, chan)
|
||||
):
|
||||
yield chan
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_oi_feed(
|
||||
instruments: list[Symbol],
|
||||
async def maybe_open_price_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
feed: to_asyncio.LinkedTaskChannel
|
||||
feed: to_asyncio.LinkedTaskChannel
|
||||
async with maybe_open_context(
|
||||
acm_func=open_oi_feed,
|
||||
acm_func=open_price_feed,
|
||||
kwargs={
|
||||
'instrument': instrument.split('.')[0]
|
||||
'instrument': instrument.split('.')[0]
|
||||
},
|
||||
key=f'{instrument.split('.')[0]}-price',
|
||||
key=f'{instrument.split('.')[0]}-price',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
|
@ -884,6 +834,7 @@ async def maybe_open_oi_feed(
|
|||
yield feed
|
||||
|
||||
|
||||
|
||||
async def aio_open_interest_feed_relay(
|
||||
fh: FeedHandler,
|
||||
instruments: list[Symbol],
|
||||
|
@ -955,7 +906,7 @@ async def aio_open_interest_feed_relay(
|
|||
|
||||
@acm
|
||||
async def open_oi_feed(
|
||||
instruments: list[Symbol],
|
||||
instruments: list[Symbol],
|
||||
) -> to_asyncio.LinkedTaskChannel:
|
||||
|
||||
fh: FeedHandler
|
||||
|
@ -976,7 +927,7 @@ async def open_oi_feed(
|
|||
|
||||
@acm
|
||||
async def maybe_open_oi_feed(
|
||||
instruments: list[Symbol],
|
||||
instruments: list[Symbol],
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
|
|
Loading…
Reference in New Issue