fix syms for venues.

little refactor in get_config, and created get_fh_config for cryptofeed.
jsonrpc_err_in_rent_task
Nelson Torres 2024-11-04 13:37:05 +00:00
parent 8b0f1e7045
commit 499b2d0090
2 changed files with 30 additions and 13 deletions

View File

@ -219,9 +219,11 @@ def get_config() -> dict[str, Any]:
touch_if_dne=True, touch_if_dne=True,
) )
section: dict = {} section: dict = {}
section['deribit'] = conf.get('deribit') section = conf.get('deribit')
section['log'] = {} section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
section['log']['disabled'] = True section['log']['disabled'] = True
if section is None: if section is None:
@ -230,6 +232,22 @@ def get_config() -> dict[str, Any]:
return section return section
def get_fh_config() -> dict[str, Any]:
conf_option = get_config().get('option', {})
conf_log = get_config().get('log', {})
return {
'log': {
'filename': conf_log.get('filename'),
'level': conf_log.get('level'),
'disabled': conf_log.get('disabled')
},
'deribit': {
'key_id': conf_option.get('api_key'),
'key_secret': conf_option.get('api_secret')
}
}
class Client: class Client:
@ -241,10 +259,10 @@ class Client:
) -> None: ) -> None:
self._pairs: dict[str, Any] = None self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {}) config = get_config().get('option', {})
self._key_id = config.get('key_id') self._key_id = config.get('api_key')
self._key_secret = config.get('key_secret') self._key_secret = config.get('api_secret')
self.json_rpc = json_rpc self.json_rpc = json_rpc
@ -466,7 +484,6 @@ async def get_client(
) as json_rpc ) as json_rpc
): ):
client = Client(json_rpc) client = Client(json_rpc)
_refresh_token: Optional[str] = None _refresh_token: Optional[str] = None
_access_token: Optional[str] = None _access_token: Optional[str] = None
@ -536,7 +553,7 @@ async def get_client(
@acm @acm
async def open_feed_handler(): async def open_feed_handler():
fh = FeedHandler(config=get_config()) fh = FeedHandler(config=get_fh_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -581,11 +598,11 @@ async def aio_price_feed_relay(
'price': float(data.ask_price), 'size': float(data.ask_size)} 'price': float(data.ask_price), 'size': float(data.ask_size)}
] ]
})) }))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
channels=[TRADES, L1_BOOK], channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)], symbols=[sym],
callbacks={ callbacks={
TRADES: _trade, TRADES: _trade,
L1_BOOK: _l1 L1_BOOK: _l1
@ -626,9 +643,9 @@ async def maybe_open_price_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={
'instrument': instrument 'instrument': instrument.split('.')[0]
}, },
key=f'{instrument}-price', key=f'{instrument.split('.')[0]}-price',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)
@ -693,10 +710,10 @@ async def maybe_open_order_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_order_feed, acm_func=open_order_feed,
kwargs={ kwargs={
'instrument': instrument, 'instrument': instrument.split('.')[0],
'fh': fh 'fh': fh
}, },
key=f'{instrument}-order', key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)

View File

@ -156,7 +156,7 @@ async def stream_quotes(
}, },
} }
nsym = piker_sym_to_cb_sym(sym) nsym = piker_sym_to_cb_sym(sym.split('.')[0])
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream: