Fruther generalize json_rpc hook mechanic to allow for multi hook, Add new maybe_open_ticker_feed to stream greeks, iv, open interest of an instrument

deribit_updates
Guillermo Rodriguez 2023-03-10 13:25:40 -03:00
parent fef8073113
commit 77fbc7eb86
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
2 changed files with 86 additions and 71 deletions

View File

@ -152,7 +152,7 @@ class Client:
def __init__(
self,
json_rpc: Callable,
update_hooks: Callable,
append_hooks: Callable,
update_types: Callable,
) -> None:
@ -169,7 +169,7 @@ class Client:
self._key_secret = None
self.json_rpc = json_rpc
self.update_hooks = update_hooks
self.append_hooks = append_hooks
self.update_types = update_types
@property
@ -490,6 +490,7 @@ async def open_price_feed(
}]
}
))
return True
elif chan == book_chan:
bid, bsize = data['bids'][0]
@ -504,11 +505,14 @@ async def open_price_feed(
{'type': 'asize', 'price': ask, 'size': asize}
]}
))
return True
return False
async with open_cached_client('deribit') as client:
client.update_hooks({
'request': sub_hook
client.append_hooks({
'request': [sub_hook]
})
client.update_types({
'request': JSONRPCSubRequest
@ -517,12 +521,15 @@ async def open_price_feed(
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
assert resp.result == channels
assert not resp.error
log.info(f'Subscribed to {channels}')
yield recv_chann
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm
async def maybe_open_price_feed(
@ -543,71 +550,64 @@ async def maybe_open_price_feed(
yield feed
# TODO: order broker support: this is all draft code from @guilledk B)
@acm
async def open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
instrument_db = sym_fmt_piker_to_deribit(instrument)
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
ticker_chan = f'incremental_ticker.{instrument_db}'
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
channels = [ticker_chan]
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
send_chann, recv_chann = trio.open_memory_channel(0)
async def sub_hook(msg):
chann = msg.params['channel']
if chann == ticker_chan:
data = msg.params['data']
await send_chann.send((
'ticker', {
'symbol': instrument,
'data': data
}
))
return True
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
return False
# # sync with trio
# to_trio.send_nowait(None)
async with open_cached_client('deribit') as client:
# await asyncio.sleep(float('inf'))
client.append_hooks({
'request': [sub_hook]
})
resp = await client.json_rpc(
'private/subscribe', {'channels': channels})
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
assert not resp.error
log.info(f'Subscribed to {channels}')
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
yield recv_chann
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument,
# 'fh': fh
# },
# key=f'{instrument}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed
resp = await client.json_rpc('private/unsubscribe', {'channels': channels})
assert not resp.error
@acm
async def maybe_open_ticker_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_ticker_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-ticker',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed

View File

@ -209,9 +209,17 @@ async def open_jsonrpc_session(
raise ValueError(
'Need to path both a request_type and request_hook')
req_hooks = []
if request_hook:
req_hooks.append(request_hook)
err_hooks = []
if error_hook:
err_hooks.append(error_hook)
hook_table = {
'request': request_hook,
'error': error_hook
'request': req_hooks,
'error': err_hooks
}
types_table = {
@ -219,9 +227,10 @@ async def open_jsonrpc_session(
'request': request_type
}
def update_hooks(new_hooks: dict):
def append_hooks(new_hooks: dict):
nonlocal hook_table
hook_table.update(new_hooks)
for htype, hooks in new_hooks.items():
hook_table[htype] += hooks
def update_types(new_types: dict):
nonlocal types_table
@ -234,7 +243,7 @@ async def open_jsonrpc_session(
rpc_id: Iterable = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
async def json_rpc(method: str, params: dict = {}) -> dict:
'''
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
@ -303,19 +312,25 @@ async def open_jsonrpc_session(
'params': _,
}:
log.info(f'Recieved\n{msg}')
if hook_table['request']:
await hook_table['request'](types_table['request'](**msg))
if len(hook_table['request']) > 0:
for hook in hook_table['request']:
result = await hook(types_table['request'](**msg))
if result:
break
case {
'error': error,
}:
log.warning(f'Recieved\n{error}')
if hook_table['error']:
await hook_table['error'](types_table['response'](**msg))
if len(hook_table['error']) > 0:
for hook in hook_table['error']:
result = await hook(types_table['response'](**msg))
if result:
break
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
n.start_soon(recv_task)
yield json_rpc, update_hooks, update_types
yield json_rpc, append_hooks, update_types
n.cancel_scope.cancel()