From 77fbc7eb86401844b0a644835b13fec3051e9098 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Fri, 10 Mar 2023 13:25:40 -0300 Subject: [PATCH] Fruther generalize json_rpc hook mechanic to allow for multi hook, Add new maybe_open_ticker_feed to stream greeks, iv, open interest of an instrument --- piker/brokers/deribit/api.py | 122 +++++++++++++++++------------------ piker/data/_web_bs.py | 35 +++++++--- 2 files changed, 86 insertions(+), 71 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 321908c9..628a37f8 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -152,7 +152,7 @@ class Client: def __init__( self, json_rpc: Callable, - update_hooks: Callable, + append_hooks: Callable, update_types: Callable, ) -> None: @@ -169,7 +169,7 @@ class Client: self._key_secret = None self.json_rpc = json_rpc - self.update_hooks = update_hooks + self.append_hooks = append_hooks self.update_types = update_types @property @@ -490,6 +490,7 @@ async def open_price_feed( }] } )) + return True elif chan == book_chan: bid, bsize = data['bids'][0] @@ -504,11 +505,14 @@ async def open_price_feed( {'type': 'asize', 'price': ask, 'size': asize} ]} )) + return True + + return False async with open_cached_client('deribit') as client: - client.update_hooks({ - 'request': sub_hook + client.append_hooks({ + 'request': [sub_hook] }) client.update_types({ 'request': JSONRPCSubRequest @@ -517,12 +521,15 @@ async def open_price_feed( resp = await client.json_rpc( 'private/subscribe', {'channels': channels}) - assert resp.result == channels + assert not resp.error log.info(f'Subscribed to {channels}') yield recv_chann + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + + assert not resp.error @acm async def maybe_open_price_feed( @@ -543,71 +550,64 @@ async def maybe_open_price_feed( yield feed -# TODO: order broker support: this is all draft code from @guilledk B) +@acm +async def open_ticker_feed( + instrument: str +) -> trio.abc.ReceiveStream: -# async def aio_order_feed_relay( -# fh: FeedHandler, -# instrument: Symbol, -# from_trio: asyncio.Queue, -# to_trio: trio.abc.SendChannel, + instrument_db = sym_fmt_piker_to_deribit(instrument) -# ) -> None: -# async def _fill(data: dict, receipt_timestamp): -# breakpoint() + ticker_chan = f'incremental_ticker.{instrument_db}' -# async def _order_info(data: dict, receipt_timestamp): -# breakpoint() + channels = [ticker_chan] -# fh.add_feed( -# DERIBIT, -# channels=[FILLS, ORDER_INFO], -# symbols=[instrument.upper()], -# callbacks={ -# FILLS: _fill, -# ORDER_INFO: _order_info, -# }) + send_chann, recv_chann = trio.open_memory_channel(0) + async def sub_hook(msg): + chann = msg.params['channel'] + if chann == ticker_chan: + data = msg.params['data'] + await send_chann.send(( + 'ticker', { + 'symbol': instrument, + 'data': data + } + )) + return True -# if not fh.running: -# fh.run( -# start_loop=False, -# install_signal_handlers=False) + return False -# # sync with trio -# to_trio.send_nowait(None) + async with open_cached_client('deribit') as client: -# await asyncio.sleep(float('inf')) + client.append_hooks({ + 'request': [sub_hook] + }) + resp = await client.json_rpc( + 'private/subscribe', {'channels': channels}) -# @acm -# async def open_order_feed( -# instrument: list[str] -# ) -> trio.abc.ReceiveStream: -# async with maybe_open_feed_handler() as fh: -# async with to_asyncio.open_channel_from( -# partial( -# aio_order_feed_relay, -# fh, -# instrument -# ) -# ) as (first, chan): -# yield chan + assert not resp.error + log.info(f'Subscribed to {channels}') -# @acm -# async def maybe_open_order_feed( -# instrument: str -# ) -> trio.abc.ReceiveStream: + yield recv_chann -# # TODO: add a predicate to maybe_open_context -# async with maybe_open_context( -# acm_func=open_order_feed, -# kwargs={ -# 'instrument': instrument, -# 'fh': fh -# }, -# key=f'{instrument}-order', -# ) as (cache_hit, feed): -# if cache_hit: -# yield broadcast_receiver(feed, 10) -# else: -# yield feed + resp = await client.json_rpc('private/unsubscribe', {'channels': channels}) + + assert not resp.error + +@acm +async def maybe_open_ticker_feed( + instrument: str +) -> trio.abc.ReceiveStream: + + async with maybe_open_context( + acm_func=open_ticker_feed, + kwargs={ + 'instrument': instrument + }, + key=f'{instrument}-ticker', + ) as (cache_hit, feed): + if cache_hit: + yield broadcast_receiver(feed, 10) + else: + yield feed diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 41aab6ac..2780a75a 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -209,9 +209,17 @@ async def open_jsonrpc_session( raise ValueError( 'Need to path both a request_type and request_hook') + req_hooks = [] + if request_hook: + req_hooks.append(request_hook) + + err_hooks = [] + if error_hook: + err_hooks.append(error_hook) + hook_table = { - 'request': request_hook, - 'error': error_hook + 'request': req_hooks, + 'error': err_hooks } types_table = { @@ -219,9 +227,10 @@ async def open_jsonrpc_session( 'request': request_type } - def update_hooks(new_hooks: dict): + def append_hooks(new_hooks: dict): nonlocal hook_table - hook_table.update(new_hooks) + for htype, hooks in new_hooks.items(): + hook_table[htype] += hooks def update_types(new_types: dict): nonlocal types_table @@ -234,7 +243,7 @@ async def open_jsonrpc_session( rpc_id: Iterable = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc(method: str, params: dict = {}) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response @@ -303,19 +312,25 @@ async def open_jsonrpc_session( 'params': _, }: log.info(f'Recieved\n{msg}') - if hook_table['request']: - await hook_table['request'](types_table['request'](**msg)) + if len(hook_table['request']) > 0: + for hook in hook_table['request']: + result = await hook(types_table['request'](**msg)) + if result: + break case { 'error': error, }: log.warning(f'Recieved\n{error}') - if hook_table['error']: - await hook_table['error'](types_table['response'](**msg)) + if len(hook_table['error']) > 0: + for hook in hook_table['error']: + result = await hook(types_table['response'](**msg)) + if result: + break case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') n.start_soon(recv_task) - yield json_rpc, update_hooks, update_types + yield json_rpc, append_hooks, update_types n.cancel_scope.cancel()