diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index 6794482e..4ce8f212 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -32,8 +32,8 @@ from .feed import ( stream_quotes, ) # from .broker import ( -# trades_dialogue, -# norm_trade_records, + # trades_dialogue, + # norm_trade_records, # ) __all__ = [ @@ -50,7 +50,7 @@ __all__ = [ __enable_modules__: list[str] = [ 'api', 'feed', -# 'broker', + 'broker', ] # passed to ``tractor.ActorNursery.start_actor()`` diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 1e783b15..8febfb33 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -20,9 +20,11 @@ Deribit backend. ''' import json import time +import asyncio from contextlib import asynccontextmanager as acm, AsyncExitStack from itertools import count +from functools import partial from datetime import datetime from typing import Any, List, Dict, Optional, Iterable @@ -41,11 +43,24 @@ from .._util import resproc from piker import config from piker.log import get_logger +from tractor.trionics import broadcast_receiver, BroadcastReceiver +from tractor import to_asyncio + +from cryptofeed import FeedHandler + +from cryptofeed.defines import ( + DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT +) from cryptofeed.symbols import Symbol log = get_logger(__name__) +_spawn_kwargs = { + 'infect_asyncio': True, +} + + _url = 'https://www.deribit.com' _ws_url = 'wss://www.deribit.com/ws/api/v2' _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' @@ -96,6 +111,8 @@ class Trade(Struct): instrument_name: str index_price: float direction: str + combo_trade_id: Optional[int] = 0, + combo_id: Optional[str] = '', amount: float class LastTradesResult(Struct): @@ -108,6 +125,67 @@ def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def str_to_cb_sym(name: str) -> Symbol: + base, strike_price, expiry_date, option_type = name.split('-') + + quote = base + + if option_type == 'put': + option_type = PUT + elif option_type == 'call': + option_type = CALL + else: + raise Exception("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date, + expiry_normalize=False) + + +def piker_sym_to_cb_sym(name: str) -> Symbol: + base, expiry_date, strike_price, option_type = tuple( + name.upper().split('-')) + + quote = base + + if option_type == 'P': + option_type = PUT + elif option_type == 'C': + option_type = CALL + else: + raise Exception("Couldn\'t parse option type") + + return Symbol( + base, quote, + type=OPTION, + strike_price=strike_price, + option_type=option_type, + expiry_date=expiry_date.upper()) + + +def cb_sym_to_deribit_inst(sym: Symbol): + # cryptofeed normalized + cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] + + # deribit specific + months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + + exp = sym.expiry_date + + # YYMDD + # 01234 + year, month, day = ( + exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) + + otype = 'C' if sym.option_type == CALL else 'P' + + return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + + def get_config() -> dict[str, Any]: conf, path = config.load() @@ -126,7 +204,7 @@ def get_config() -> dict[str, Any]: class Client: def __init__(self, n: Nursery, ws: NoBsWs) -> None: - self._pairs: dict[str, Any] = {} + self._pairs: dict[str, Any] = None config = get_config().get('deribit', {}) @@ -148,6 +226,8 @@ class Client: self._access_token: Optional[str] = None self._refresh_token: Optional[str] = None + self.feeds = CryptoFeedRelay() + @property def currencies(self): return ['btc', 'eth', 'sol', 'usd'] @@ -298,6 +378,33 @@ class Client: return balances + async def submit_limit( + self, + symbol: str, + price: float, + action: str, + size: float + ) -> dict: + """Place an order + """ + params = { + 'instrument_name': symbol.upper(), + 'amount': size, + 'type': 'limit', + 'price': price, + } + resp = await self.json_rpc( + f'private/{action}', params) + + return resp.result + + async def submit_cancel(self, oid: str): + """Send cancel request for order id + """ + resp = await self.json_rpc( + 'private/cancel', {'order_id': oid}) + return resp.result + async def symbol_info( self, instrument: Optional[str] = None, @@ -308,8 +415,8 @@ class Client: """Get symbol info for the exchange. """ - # TODO: we can load from our self._pairs cache - # on repeat calls... + if self._pairs: + return self._pairs # will retrieve all symbols by default params = { @@ -322,7 +429,9 @@ class Client: results = resp.result instruments = { - item['instrument_name']: item for item in results} + item['instrument_name'].lower(): item + for item in results + } if instrument is not None: return instruments[instrument] @@ -342,10 +451,7 @@ class Client: pattern: str, limit: int = 30, ) -> dict[str, Any]: - if self._pairs is not None: - data = self._pairs - else: - data = await self.symbol_info() + data = await self.symbol_info() matches = fuzzy.extractBests( pattern, @@ -354,7 +460,7 @@ class Client: limit=limit ) # repack in dict form - return {item[0]['instrument_name']: item[0] + return {item[0]['instrument_name'].lower(): item[0] for item in matches} async def bars( @@ -437,3 +543,141 @@ async def get_client() -> Client: await client.start_rpc() await client.cache_symbols() yield client + await client.feeds.stop() + + +class CryptoFeedRelay: + + def __init__(self): + self._fh = FeedHandler(config=get_config()) + + self._price_streams: dict[str, BroadcastReceiver] = {} + self._order_stream: Optional[BroadcastReceiver] = None + + self._loop = None + + async def stop(self): + await to_asyncio.run_task( + partial(self._fh.stop_async, loop=self._loop)) + + @acm + async def open_price_feed( + self, + instruments: List[str] + ) -> trio.abc.ReceiveStream: + inst_str = ','.join(instruments) + instruments = [piker_sym_to_cb_sym(i) for i in instruments] + + if inst_str in self._price_streams: + # TODO: a good value for maxlen? + yield broadcast_receiver(self._price_streams[inst_str], 10) + + else: + async def relay( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + ) -> None: + async def _trade(data: dict, receipt_timestamp): + to_trio.send_nowait(('trade', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'last': data, + 'broker_ts': time.time(), + 'data': data.to_dict(), + 'receipt': receipt_timestamp + })) + + async def _l1(data: dict, receipt_timestamp): + to_trio.send_nowait(('l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + {'type': 'bid', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'bsize', + 'price': float(data.bid_price), 'size': float(data.bid_size)}, + {'type': 'ask', + 'price': float(data.ask_price), 'size': float(data.ask_size)}, + {'type': 'asize', + 'price': float(data.ask_price), 'size': float(data.ask_size)} + ] + })) + + self._fh.add_feed( + DERIBIT, + channels=[TRADES, L1_BOOK], + symbols=instruments, + callbacks={ + TRADES: _trade, + L1_BOOK: _l1 + }) + + if not self._fh.running: + self._fh.run(start_loop=False) + self._loop = asyncio.get_event_loop() + + # sync with trio + to_trio.send_nowait(None) + + try: + await asyncio.sleep(float('inf')) + + except asyncio.exceptions.CancelledError: + ... + + async with to_asyncio.open_channel_from( + relay + ) as (first, chan): + self._price_streams[inst_str] = chan + yield self._price_streams[inst_str] + + @acm + async def open_order_feed( + self, + instruments: List[str] + ) -> trio.abc.ReceiveStream: + + inst_str = ','.join(instruments) + instruments = [piker_sym_to_cb_sym(i) for i in instruments] + + if self._order_stream: + yield broadcast_receiver(self._order_streams[inst_str], 10) + + else: + async def relay( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + ) -> None: + async def _fill(data: dict, receipt_timestamp): + breakpoint() + + async def _order_info(data: dict, receipt_timestamp): + breakpoint() + + self._fh.add_feed( + DERIBIT, + channels=[FILLS, ORDER_INFO], + symbols=instruments, + callbacks={ + FILLS: _fill, + ORDER_INFO: _order_info, + }) + + if not self._fh.running: + self._fh.run(start_loop=False) + self._loop = asyncio.get_event_loop() + + # sync with trio + to_trio.send_nowait(None) + + try: + await asyncio.sleep(float('inf')) + + except asyncio.exceptions.CancelledError: + ... + + async with to_asyncio.open_channel_from( + relay + ) as (first, chan): + self._order_stream = chan + yield self._order_stream diff --git a/piker/brokers/deribit/broker.py b/piker/brokers/deribit/broker.py index e36a06eb..8dd77540 100644 --- a/piker/brokers/deribit/broker.py +++ b/piker/brokers/deribit/broker.py @@ -18,6 +18,10 @@ Order api and machinery ''' +from typing import Any, AsyncIterator + +import tractor + @tractor.context async def trades_dialogue( @@ -29,9 +33,12 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - async with get_client() as client: - + async with open_cached_client('deribit') as client: if not client._key_id: raise RuntimeError('Missing Deribit API key in `brokers.toml`!?!?') - ... + acc_name = f'deribit.{client._key_id}' + + await client.cache_symbols() + + breakpoint() diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index a4ff9ae1..76f99ec1 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -18,9 +18,6 @@ Deribit backend. ''' - -import asyncio -from async_generator import aclosing from contextlib import asynccontextmanager as acm from datetime import datetime from typing import Any, Optional, List, Callable @@ -32,7 +29,6 @@ import pendulum from fuzzywuzzy import process as fuzzy import numpy as np import tractor -from tractor import to_asyncio from piker._cacheables import open_cached_client from piker.log import get_logger, get_console_log @@ -49,7 +45,11 @@ from cryptofeed.defines import ( ) from cryptofeed.symbols import Symbol -from .api import Client, Trade, get_config +from .api import ( + Client, Trade, + get_config, + str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst +) _spawn_kwargs = { 'infect_asyncio': True, @@ -59,145 +59,6 @@ _spawn_kwargs = { log = get_logger(__name__) -_url = 'https://www.deribit.com' - - -def str_to_cb_sym(name: str) -> Symbol: - base, strike_price, expiry_date, option_type = name.split('-') - - quote = base - - if option_type == 'put': - option_type = PUT - elif option_type == 'call': - option_type = CALL - else: - raise BaseException("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) - - - -def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( - name.upper().split('-')) - - quote = base - - if option_type == 'P': - option_type = PUT - elif option_type == 'C': - option_type = CALL - else: - raise BaseException("Couldn\'t parse option type") - - return Symbol( - base, quote, - type=OPTION, - strike_price=strike_price, - option_type=option_type, - expiry_date=expiry_date.upper()) - - -def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - - otype = 'C' if sym.option_type == CALL else 'P' - - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' - - -# inside here we are in an asyncio context -async def open_aio_cryptofeed_relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - instruments: List[str] = [] -) -> None: - - instruments = [piker_sym_to_cb_sym(i) for i in instruments] - - async def trade_cb(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - - async def l1_book_cb(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) - - fh = FeedHandler(config=get_config()) - fh.run(start_loop=False) - - fh.add_feed( - DERIBIT, - channels=[L1_BOOK], - symbols=instruments, - callbacks={L1_BOOK: l1_book_cb}) - - fh.add_feed( - DERIBIT, - channels=[TRADES], - symbols=instruments, - callbacks={TRADES: trade_cb}) - - # sync with trio - to_trio.send_nowait(None) - - try: - await asyncio.sleep(float('inf')) - - except asyncio.exceptions.CancelledError: - ... - - -@acm -async def open_cryptofeeds( - - instruments: List[str] - -) -> trio.abc.ReceiveStream: - - async with to_asyncio.open_channel_from( - open_aio_cryptofeed_relay, - instruments=instruments, - ) as (first, chan): - yield chan - - @acm async def open_history_client( instrument: str, @@ -265,8 +126,7 @@ async def stream_quotes( async with ( open_cached_client('deribit') as client, - send_chan as send_chan, - open_cryptofeeds(symbols) as stream + send_chan as send_chan ): init_msgs = { @@ -284,20 +144,23 @@ async def stream_quotes( nsym = piker_sym_to_cb_sym(sym) - # keep client cached for real-time section - cache = await client.cache_symbols() + async with client.feeds.open_price_feed( + symbols) as stream: + + cache = await client.cache_symbols() - async with aclosing(stream): last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades if len(last_trades) == 0: - async for typ, quote in stream: + last_trade = None + while not last_trade: + typ, quote = await stream.receive() if typ == 'trade': - last_trade = Trade(**quote['data']) + last_trade = Trade(**(quote['data'])) else: - last_trade = Trade(**last_trades[0]) + last_trade = Trade(**(last_trades[0])) first_quote = { 'symbol': sym, @@ -314,9 +177,14 @@ async def stream_quotes( feed_is_live.set() - async for typ, quote in stream: - topic = quote['symbol'] - await send_chan.send({topic: quote}) + try: + while True: + typ, quote = await stream.receive() + topic = quote['symbol'] + await send_chan.send({topic: quote}) + + except trio.ClosedResourceError: + ... @tractor.context