Compare commits
2 Commits
b8512edd1b
...
2cb7b505e1
Author | SHA1 | Date |
---|---|---|
|
2cb7b505e1 | |
|
33a37f24c7 |
|
@ -122,7 +122,7 @@ def str_to_cb_sym(name: str) -> Symbol:
|
||||||
quote = base
|
quote = base
|
||||||
|
|
||||||
if option_type == 'put':
|
if option_type == 'put':
|
||||||
option_type = PUT
|
option_type = PUT
|
||||||
elif option_type == 'call':
|
elif option_type == 'call':
|
||||||
option_type = CALL
|
option_type = CALL
|
||||||
else:
|
else:
|
||||||
|
@ -154,7 +154,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||||
quote: str = base
|
quote: str = base
|
||||||
|
|
||||||
if option_type == 'P' or option_type == 'PUT':
|
if option_type == 'P' or option_type == 'PUT':
|
||||||
option_type = PUT
|
option_type = PUT
|
||||||
elif option_type == 'C' or option_type == 'CALL':
|
elif option_type == 'C' or option_type == 'CALL':
|
||||||
option_type = CALL
|
option_type = CALL
|
||||||
else:
|
else:
|
||||||
|
@ -420,7 +420,7 @@ class Client:
|
||||||
"""
|
"""
|
||||||
Get a dict with all expiration dates listed as value and currency as key.
|
Get a dict with all expiration dates listed as value and currency as key.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
params: dict[str, str] = {
|
params: dict[str, str] = {
|
||||||
'currency': currency.upper(),
|
'currency': currency.upper(),
|
||||||
'kind': kind,
|
'kind': kind,
|
||||||
|
@ -744,38 +744,6 @@ async def aio_price_feed_relay(
|
||||||
the `piker`-side `trio.task` consumers for delivery to consumer
|
the `piker`-side `trio.task` consumers for delivery to consumer
|
||||||
sub-actors for various subsystems.
|
sub-actors for various subsystems.
|
||||||
|
|
||||||
'''
|
|
||||||
async def _trade(
|
|
||||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
|
||||||
receipt_timestamp: int,
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
|
|
||||||
|
|
||||||
'''
|
|
||||||
to_trio.send_nowait(('trade', trade))
|
|
||||||
|
|
||||||
async def _l1(
|
|
||||||
book: L1Book,
|
|
||||||
receipt_timestamp: int,
|
|
||||||
) -> None:
|
|
||||||
'''
|
|
||||||
Relay-thru "l1 book" updates.
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
to_trio.send_nowait(('l1', book))
|
|
||||||
|
|
||||||
# TODO, make this work!
|
|
||||||
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
|
|
||||||
# breakpoint()
|
|
||||||
|
|
||||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
|
||||||
'''
|
|
||||||
Relay price feed quotes from the `cryptofeed.FeedHandler` to
|
|
||||||
the `piker`-side `trio.task` consumers for delivery to consumer
|
|
||||||
sub-actors for various subsystems.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def _trade(
|
async def _trade(
|
||||||
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
trade: Trade, # cryptofeed, NOT ours from `.venues`!
|
||||||
|
@ -807,7 +775,6 @@ async def aio_price_feed_relay(
|
||||||
DERIBIT,
|
DERIBIT,
|
||||||
channels=[TRADES, L1_BOOK],
|
channels=[TRADES, L1_BOOK],
|
||||||
symbols=[sym],
|
symbols=[sym],
|
||||||
symbols=[sym],
|
|
||||||
callbacks={
|
callbacks={
|
||||||
TRADES: _trade,
|
TRADES: _trade,
|
||||||
L1_BOOK: _l1
|
L1_BOOK: _l1
|
||||||
|
@ -818,13 +785,10 @@ async def aio_price_feed_relay(
|
||||||
start_loop=False,
|
start_loop=False,
|
||||||
install_signal_handlers=False
|
install_signal_handlers=False
|
||||||
)
|
)
|
||||||
install_signal_handlers=False
|
|
||||||
)
|
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
to_trio.send_nowait(None)
|
to_trio.send_nowait(None)
|
||||||
|
|
||||||
# run until cancelled
|
|
||||||
# run until cancelled
|
# run until cancelled
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
@ -834,14 +798,6 @@ async def open_price_feed(
|
||||||
instrument: str
|
instrument: str
|
||||||
) -> to_asyncio.LinkedTaskChannel:
|
) -> to_asyncio.LinkedTaskChannel:
|
||||||
|
|
||||||
fh: FeedHandler
|
|
||||||
first: None
|
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
|
||||||
async with (
|
|
||||||
maybe_open_feed_handler() as fh,
|
|
||||||
to_asyncio.open_channel_from(
|
|
||||||
) -> to_asyncio.LinkedTaskChannel:
|
|
||||||
|
|
||||||
fh: FeedHandler
|
fh: FeedHandler
|
||||||
first: None
|
first: None
|
||||||
chan: to_asyncio.LinkedTaskChannel
|
chan: to_asyncio.LinkedTaskChannel
|
||||||
|
@ -849,34 +805,28 @@ async def open_price_feed(
|
||||||
maybe_open_feed_handler() as fh,
|
maybe_open_feed_handler() as fh,
|
||||||
to_asyncio.open_channel_from(
|
to_asyncio.open_channel_from(
|
||||||
partial(
|
partial(
|
||||||
aio_open_interest_feed_relay,
|
aio_price_feed_relay,
|
||||||
fh,
|
fh,
|
||||||
instruments,
|
instrument
|
||||||
)
|
)
|
||||||
) as (first, chan)
|
) as (first, chan)
|
||||||
):
|
):
|
||||||
yield chan
|
yield chan
|
||||||
) as (first, chan)
|
|
||||||
):
|
|
||||||
yield chan
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_oi_feed(
|
async def maybe_open_price_feed(
|
||||||
instruments: list[Symbol],
|
instrument: str
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
|
||||||
# TODO: add a predicate to maybe_open_context
|
# TODO: add a predicate to maybe_open_context
|
||||||
feed: to_asyncio.LinkedTaskChannel
|
feed: to_asyncio.LinkedTaskChannel
|
||||||
feed: to_asyncio.LinkedTaskChannel
|
|
||||||
async with maybe_open_context(
|
async with maybe_open_context(
|
||||||
acm_func=open_oi_feed,
|
acm_func=open_price_feed,
|
||||||
kwargs={
|
kwargs={
|
||||||
'instrument': instrument.split('.')[0]
|
'instrument': instrument.split('.')[0]
|
||||||
'instrument': instrument.split('.')[0]
|
|
||||||
},
|
},
|
||||||
key=f'{instrument.split('.')[0]}-price',
|
key=f'{instrument.split('.')[0]}-price',
|
||||||
key=f'{instrument.split('.')[0]}-price',
|
|
||||||
) as (cache_hit, feed):
|
) as (cache_hit, feed):
|
||||||
if cache_hit:
|
if cache_hit:
|
||||||
yield broadcast_receiver(feed, 10)
|
yield broadcast_receiver(feed, 10)
|
||||||
|
@ -884,6 +834,7 @@ async def maybe_open_oi_feed(
|
||||||
yield feed
|
yield feed
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def aio_open_interest_feed_relay(
|
async def aio_open_interest_feed_relay(
|
||||||
fh: FeedHandler,
|
fh: FeedHandler,
|
||||||
instruments: list[Symbol],
|
instruments: list[Symbol],
|
||||||
|
@ -955,7 +906,7 @@ async def aio_open_interest_feed_relay(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def open_oi_feed(
|
async def open_oi_feed(
|
||||||
instruments: list[Symbol],
|
instruments: list[Symbol],
|
||||||
) -> to_asyncio.LinkedTaskChannel:
|
) -> to_asyncio.LinkedTaskChannel:
|
||||||
|
|
||||||
fh: FeedHandler
|
fh: FeedHandler
|
||||||
|
@ -976,7 +927,7 @@ async def open_oi_feed(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def maybe_open_oi_feed(
|
async def maybe_open_oi_feed(
|
||||||
instruments: list[Symbol],
|
instruments: list[Symbol],
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
|
|
||||||
# TODO: add a predicate to maybe_open_context
|
# TODO: add a predicate to maybe_open_context
|
||||||
|
|
Loading…
Reference in New Issue