Refactor cryptofeed relay api and move it to client

Added submit_limit and submit_cancel
Cache syms correctly
Lowercase search results
size_in_shm_token
Guillermo Rodriguez 2022-08-23 14:00:52 -03:00
parent d60f222bb7
commit 80a1a58bfc
No known key found for this signature in database
GPG Key ID: EC3AB66D5D83B392
3 changed files with 279 additions and 167 deletions

View File

@ -32,8 +32,8 @@ from .feed import (
stream_quotes,
)
# from .broker import (
# trades_dialogue,
# norm_trade_records,
# trades_dialogue,
# norm_trade_records,
# )
__all__ = [
@ -50,7 +50,7 @@ __all__ = [
__enable_modules__: list[str] = [
'api',
'feed',
# 'broker',
# 'broker',
]
# passed to ``tractor.ActorNursery.start_actor()``

View File

@ -20,9 +20,11 @@ Deribit backend.
'''
import json
import time
import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from functools import partial
from datetime import datetime
from typing import Any, List, Dict, Optional, Iterable
@ -41,11 +43,24 @@ from .._util import resproc
from piker import config
from piker.log import get_logger
from tractor.trionics import broadcast_receiver, BroadcastReceiver
from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
@ -96,6 +111,8 @@ class Trade(Struct):
instrument_name: str
index_price: float
direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct):
@ -108,6 +125,67 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
@ -126,7 +204,7 @@ def get_config() -> dict[str, Any]:
class Client:
def __init__(self, n: Nursery, ws: NoBsWs) -> None:
self._pairs: dict[str, Any] = {}
self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {})
@ -148,6 +226,8 @@ class Client:
self._access_token: Optional[str] = None
self._refresh_token: Optional[str] = None
self.feeds = CryptoFeedRelay()
@property
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
@ -298,6 +378,33 @@ class Client:
return balances
async def submit_limit(
self,
symbol: str,
price: float,
action: str,
size: float
) -> dict:
"""Place an order
"""
params = {
'instrument_name': symbol.upper(),
'amount': size,
'type': 'limit',
'price': price,
}
resp = await self.json_rpc(
f'private/{action}', params)
return resp.result
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
"""
resp = await self.json_rpc(
'private/cancel', {'order_id': oid})
return resp.result
async def symbol_info(
self,
instrument: Optional[str] = None,
@ -308,8 +415,8 @@ class Client:
"""Get symbol info for the exchange.
"""
# TODO: we can load from our self._pairs cache
# on repeat calls...
if self._pairs:
return self._pairs
# will retrieve all symbols by default
params = {
@ -322,7 +429,9 @@ class Client:
results = resp.result
instruments = {
item['instrument_name']: item for item in results}
item['instrument_name'].lower(): item
for item in results
}
if instrument is not None:
return instruments[instrument]
@ -342,10 +451,7 @@ class Client:
pattern: str,
limit: int = 30,
) -> dict[str, Any]:
if self._pairs is not None:
data = self._pairs
else:
data = await self.symbol_info()
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
@ -354,7 +460,7 @@ class Client:
limit=limit
)
# repack in dict form
return {item[0]['instrument_name']: item[0]
return {item[0]['instrument_name'].lower(): item[0]
for item in matches}
async def bars(
@ -437,3 +543,141 @@ async def get_client() -> Client:
await client.start_rpc()
await client.cache_symbols()
yield client
await client.feeds.stop()
class CryptoFeedRelay:
def __init__(self):
self._fh = FeedHandler(config=get_config())
self._price_streams: dict[str, BroadcastReceiver] = {}
self._order_stream: Optional[BroadcastReceiver] = None
self._loop = None
async def stop(self):
await to_asyncio.run_task(
partial(self._fh.stop_async, loop=self._loop))
@acm
async def open_price_feed(
self,
instruments: List[str]
) -> trio.abc.ReceiveStream:
inst_str = ','.join(instruments)
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
if inst_str in self._price_streams:
# TODO: a good value for maxlen?
yield broadcast_receiver(self._price_streams[inst_str], 10)
else:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
self._fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=instruments,
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not self._fh.running:
self._fh.run(start_loop=False)
self._loop = asyncio.get_event_loop()
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from(
relay
) as (first, chan):
self._price_streams[inst_str] = chan
yield self._price_streams[inst_str]
@acm
async def open_order_feed(
self,
instruments: List[str]
) -> trio.abc.ReceiveStream:
inst_str = ','.join(instruments)
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
if self._order_stream:
yield broadcast_receiver(self._order_streams[inst_str], 10)
else:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
self._fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=instruments,
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not self._fh.running:
self._fh.run(start_loop=False)
self._loop = asyncio.get_event_loop()
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from(
relay
) as (first, chan):
self._order_stream = chan
yield self._order_stream

View File

@ -18,9 +18,6 @@
Deribit backend.
'''
import asyncio
from async_generator import aclosing
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, List, Callable
@ -32,7 +29,6 @@ import pendulum
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from tractor import to_asyncio
from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log
@ -49,7 +45,11 @@ from cryptofeed.defines import (
)
from cryptofeed.symbols import Symbol
from .api import Client, Trade, get_config
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -59,145 +59,6 @@ _spawn_kwargs = {
log = get_logger(__name__)
_url = 'https://www.deribit.com'
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
# inside here we are in an asyncio context
async def open_aio_cryptofeed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
instruments: List[str] = []
) -> None:
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
async def trade_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def l1_book_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh = FeedHandler(config=get_config())
fh.run(start_loop=False)
fh.add_feed(
DERIBIT,
channels=[L1_BOOK],
symbols=instruments,
callbacks={L1_BOOK: l1_book_cb})
fh.add_feed(
DERIBIT,
channels=[TRADES],
symbols=instruments,
callbacks={TRADES: trade_cb})
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
@acm
async def open_cryptofeeds(
instruments: List[str]
) -> trio.abc.ReceiveStream:
async with to_asyncio.open_channel_from(
open_aio_cryptofeed_relay,
instruments=instruments,
) as (first, chan):
yield chan
@acm
async def open_history_client(
instrument: str,
@ -265,8 +126,7 @@ async def stream_quotes(
async with (
open_cached_client('deribit') as client,
send_chan as send_chan,
open_cryptofeeds(symbols) as stream
send_chan as send_chan
):
init_msgs = {
@ -284,20 +144,23 @@ async def stream_quotes(
nsym = piker_sym_to_cb_sym(sym)
# keep client cached for real-time section
cache = await client.cache_symbols()
async with client.feeds.open_price_feed(
symbols) as stream:
cache = await client.cache_symbols()
async with aclosing(stream):
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
if len(last_trades) == 0:
async for typ, quote in stream:
last_trade = None
while not last_trade:
typ, quote = await stream.receive()
if typ == 'trade':
last_trade = Trade(**quote['data'])
last_trade = Trade(**(quote['data']))
else:
last_trade = Trade(**last_trades[0])
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
@ -314,9 +177,14 @@ async def stream_quotes(
feed_is_live.set()
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
try:
while True:
typ, quote = await stream.receive()
topic = quote['symbol']
await send_chan.send({topic: quote})
except trio.ClosedResourceError:
...
@tractor.context