Compare commits
8 Commits
9073fbc317
...
5c02dc6cd7
Author | SHA1 | Date |
---|---|---|
Guillermo Rodriguez | 5c02dc6cd7 | |
Guillermo Rodriguez | a5481e6746 | |
Guillermo Rodriguez | 73fcd72256 | |
Guillermo Rodriguez | fc2ceb5964 | |
Guillermo Rodriguez | d0dbb44092 | |
Guillermo Rodriguez | b20500c0d9 | |
Guillermo Rodriguez | 5872095b09 | |
Guillermo Rodriguez | 5f60923ac1 |
|
@ -50,3 +50,8 @@ prefer_data_account = [
|
|||
paper = "XX0000000"
|
||||
margin = "X0000000"
|
||||
ira = "X0000000"
|
||||
|
||||
|
||||
[deribit]
|
||||
key_id = 'XXXXXXXX'
|
||||
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
``deribit`` backend
|
||||
------------------
|
||||
pretty good liquidity crypto derivatives, uses custom json rpc over ws for
|
||||
client methods, then `cryptofeed` for data streams.
|
||||
|
||||
status
|
||||
******
|
||||
- supports option charts
|
||||
- no order support yet
|
||||
|
||||
|
||||
config
|
||||
******
|
||||
In order to get order mode support your ``brokers.toml``
|
||||
needs to have something like the following:
|
||||
|
||||
.. code:: toml
|
||||
|
||||
[deribit]
|
||||
key_id = 'XXXXXXXX'
|
||||
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'
|
||||
|
||||
To obtain an api id and secret you need to create an account, which can be a
|
||||
real market account over at:
|
||||
|
||||
- deribit.com (requires KYC for deposit address)
|
||||
|
||||
Or a testnet account over at:
|
||||
|
||||
- test.deribit.com
|
||||
|
||||
For testnet once the account is created here is how you deposit fake crypto to
|
||||
try it out:
|
||||
|
||||
1) Go to Wallet:
|
||||
|
||||
.. figure:: assets/0_wallet.png
|
||||
:align: center
|
||||
:target: assets/0_wallet.png
|
||||
:alt: wallet page
|
||||
|
||||
2) Then click on the elipsis menu and select deposit
|
||||
|
||||
.. figure:: assets/1_wallet_select_deposit.png
|
||||
:align: center
|
||||
:target: assets/1_wallet_select_deposit.png
|
||||
:alt: wallet deposit page
|
||||
|
||||
3) This will take you to the deposit address page
|
||||
|
||||
.. figure:: assets/2_gen_deposit_addr.png
|
||||
:align: center
|
||||
:target: assets/2_gen_deposit_addr.png
|
||||
:alt: generate deposit address page
|
||||
|
||||
4) After clicking generate you should see the address, copy it and go to the
|
||||
[coin faucet](https://test.deribit.com/dericoin/BTC/deposit) and send fake
|
||||
coins to that address.
|
||||
|
||||
.. figure:: assets/3_deposit_address.png
|
||||
:align: center
|
||||
:target: assets/3_deposit_address.png
|
||||
:alt: generated address
|
||||
|
||||
5) Back in the deposit address page you should see the deposit in your history
|
||||
|
||||
.. figure:: assets/4_wallet_deposit_history.png
|
||||
:align: center
|
||||
:target: assets/4_wallet_deposit_history.png
|
||||
:alt: wallet deposit history
|
|
@ -32,8 +32,8 @@ from .feed import (
|
|||
stream_quotes,
|
||||
)
|
||||
# from .broker import (
|
||||
# trades_dialogue,
|
||||
# norm_trade_records,
|
||||
# trades_dialogue,
|
||||
# norm_trade_records,
|
||||
# )
|
||||
|
||||
__all__ = [
|
||||
|
@ -50,7 +50,7 @@ __all__ = [
|
|||
__enable_modules__: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
# 'broker',
|
||||
'broker',
|
||||
]
|
||||
|
||||
# passed to ``tractor.ActorNursery.start_actor()``
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
|
@ -15,29 +18,52 @@
|
|||
Deribit backend.
|
||||
|
||||
'''
|
||||
import json
|
||||
import time
|
||||
import asyncio
|
||||
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from contextlib import asynccontextmanager as acm, AsyncExitStack
|
||||
from itertools import count
|
||||
from functools import partial
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, List
|
||||
from typing import Any, List, Dict, Optional, Iterable, Callable
|
||||
|
||||
import pendulum
|
||||
import asks
|
||||
import trio
|
||||
from trio_typing import Nursery, TaskStatus
|
||||
from fuzzywuzzy import process as fuzzy
|
||||
import numpy as np
|
||||
|
||||
from piker.data.types import Struct
|
||||
from piker.data._web_bs import NoBsWs, open_autorecon_ws
|
||||
|
||||
from .._util import resproc
|
||||
|
||||
from piker import config
|
||||
from piker.log import get_logger
|
||||
|
||||
from tractor.trionics import broadcast_receiver, BroadcastReceiver, maybe_open_context
|
||||
from tractor import to_asyncio
|
||||
|
||||
from cryptofeed import FeedHandler
|
||||
|
||||
from cryptofeed.defines import (
|
||||
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
|
||||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
}
|
||||
|
||||
|
||||
_url = 'https://www.deribit.com'
|
||||
_ws_url = 'wss://www.deribit.com/ws/api/v2'
|
||||
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
|
||||
|
||||
|
||||
# Broker specific ohlc schema (rest)
|
||||
|
@ -55,7 +81,9 @@ _ohlc_dtype = [
|
|||
|
||||
class JSONRPCResult(Struct):
|
||||
jsonrpc: str = '2.0'
|
||||
result: dict
|
||||
id: int
|
||||
result: Optional[dict] = None
|
||||
error: Optional[dict] = None
|
||||
usIn: int
|
||||
usOut: int
|
||||
usDiff: int
|
||||
|
@ -83,6 +111,8 @@ class Trade(Struct):
|
|||
instrument_name: str
|
||||
index_price: float
|
||||
direction: str
|
||||
combo_trade_id: Optional[int] = 0,
|
||||
combo_id: Optional[str] = '',
|
||||
amount: float
|
||||
|
||||
class LastTradesResult(Struct):
|
||||
|
@ -95,24 +125,161 @@ def deribit_timestamp(when):
|
|||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
||||
|
||||
|
||||
def str_to_cb_sym(name: str) -> Symbol:
|
||||
base, strike_price, expiry_date, option_type = name.split('-')
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'put':
|
||||
option_type = PUT
|
||||
elif option_type == 'call':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date,
|
||||
expiry_normalize=False)
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
base, expiry_date, strike_price, option_type = tuple(
|
||||
name.upper().split('-'))
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
elif option_type == 'C':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise Exception("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date.upper())
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# cryptofeed normalized
|
||||
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||
|
||||
# deribit specific
|
||||
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
|
||||
|
||||
exp = sym.expiry_date
|
||||
|
||||
# YYMDD
|
||||
# 01234
|
||||
year, month, day = (
|
||||
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
|
||||
conf, path = config.load()
|
||||
|
||||
section = conf.get('deribit')
|
||||
|
||||
# TODO: document why we send this, basically because logging params for cryptofeed
|
||||
conf['log'] = {}
|
||||
conf['log']['disabled'] = True
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
|
||||
return conf
|
||||
|
||||
|
||||
class Client:
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._sesh = asks.Session(connections=4)
|
||||
self._sesh.base_location = _url
|
||||
self._pairs: dict[str, Any] = {}
|
||||
def __init__(self, json_rpc: Callable) -> None:
|
||||
self._pairs: dict[str, Any] = None
|
||||
|
||||
async def _api(
|
||||
config = get_config().get('deribit', {})
|
||||
|
||||
if ('key_id' in config) and ('key_secret' in config):
|
||||
self._key_id = config['key_id']
|
||||
self._key_secret = config['key_secret']
|
||||
|
||||
else:
|
||||
self._key_id = None
|
||||
self._key_secret = None
|
||||
|
||||
self.json_rpc = json_rpc
|
||||
|
||||
@property
|
||||
def currencies(self):
|
||||
return ['btc', 'eth', 'sol', 'usd']
|
||||
|
||||
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
|
||||
"""Return the set of positions for this account
|
||||
by symbol.
|
||||
"""
|
||||
balances = {}
|
||||
|
||||
for currency in self.currencies:
|
||||
resp = await self.json_rpc(
|
||||
'private/get_positions', params={
|
||||
'currency': currency.upper(),
|
||||
'kind': kind})
|
||||
|
||||
balances[currency] = resp.result
|
||||
|
||||
return balances
|
||||
|
||||
async def get_assets(self) -> dict[str, float]:
|
||||
"""Return the set of asset balances for this account
|
||||
by symbol.
|
||||
"""
|
||||
balances = {}
|
||||
|
||||
for currency in self.currencies:
|
||||
resp = await self.json_rpc(
|
||||
'private/get_account_summary', params={
|
||||
'currency': currency.upper()})
|
||||
|
||||
balances[currency] = resp.result['balance']
|
||||
|
||||
return balances
|
||||
|
||||
async def submit_limit(
|
||||
self,
|
||||
method: str,
|
||||
params: dict,
|
||||
) -> dict[str, Any]:
|
||||
resp = await self._sesh.get(
|
||||
path=f'/api/v2/public/{method}',
|
||||
params=params,
|
||||
timeout=float('inf')
|
||||
)
|
||||
return resproc(resp, log)
|
||||
symbol: str,
|
||||
price: float,
|
||||
action: str,
|
||||
size: float
|
||||
) -> dict:
|
||||
"""Place an order
|
||||
"""
|
||||
params = {
|
||||
'instrument_name': symbol.upper(),
|
||||
'amount': size,
|
||||
'type': 'limit',
|
||||
'price': price,
|
||||
}
|
||||
resp = await self.json_rpc(
|
||||
f'private/{action}', params)
|
||||
|
||||
return resp.result
|
||||
|
||||
async def submit_cancel(self, oid: str):
|
||||
"""Send cancel request for order id
|
||||
"""
|
||||
resp = await self.json_rpc(
|
||||
'private/cancel', {'order_id': oid})
|
||||
return resp.result
|
||||
|
||||
async def symbol_info(
|
||||
self,
|
||||
|
@ -121,11 +288,11 @@ class Client:
|
|||
kind: str = 'option',
|
||||
expired: bool = False
|
||||
) -> dict[str, Any]:
|
||||
'''Get symbol info for the exchange.
|
||||
"""Get symbol info for the exchange.
|
||||
|
||||
'''
|
||||
# TODO: we can load from our self._pairs cache
|
||||
# on repeat calls...
|
||||
"""
|
||||
if self._pairs:
|
||||
return self._pairs
|
||||
|
||||
# will retrieve all symbols by default
|
||||
params = {
|
||||
|
@ -134,13 +301,13 @@ class Client:
|
|||
'expired': str(expired).lower()
|
||||
}
|
||||
|
||||
resp = await self._api(
|
||||
'get_instruments', params=params)
|
||||
|
||||
results = resp['result']
|
||||
resp = await self.json_rpc('public/get_instruments', params)
|
||||
results = resp.result
|
||||
|
||||
instruments = {
|
||||
item['instrument_name']: item for item in results}
|
||||
item['instrument_name'].lower(): item
|
||||
for item in results
|
||||
}
|
||||
|
||||
if instrument is not None:
|
||||
return instruments[instrument]
|
||||
|
@ -158,20 +325,18 @@ class Client:
|
|||
async def search_symbols(
|
||||
self,
|
||||
pattern: str,
|
||||
limit: int = None,
|
||||
limit: int = 30,
|
||||
) -> dict[str, Any]:
|
||||
if self._pairs is not None:
|
||||
data = self._pairs
|
||||
else:
|
||||
data = await self.symbol_info()
|
||||
|
||||
matches = fuzzy.extractBests(
|
||||
pattern,
|
||||
data,
|
||||
score_cutoff=50,
|
||||
score_cutoff=35,
|
||||
limit=limit
|
||||
)
|
||||
# repack in dict form
|
||||
return {item[0]['instrument_name']: item[0]
|
||||
return {item[0]['instrument_name'].lower(): item[0]
|
||||
for item in matches}
|
||||
|
||||
async def bars(
|
||||
|
@ -195,19 +360,16 @@ class Client:
|
|||
end_time = deribit_timestamp(end_dt)
|
||||
|
||||
# https://docs.deribit.com/#public-get_tradingview_chart_data
|
||||
response = await self._api(
|
||||
'get_tradingview_chart_data',
|
||||
resp = await self.json_rpc(
|
||||
'public/get_tradingview_chart_data',
|
||||
params={
|
||||
'instrument_name': instrument.upper(),
|
||||
'start_timestamp': start_time,
|
||||
'end_timestamp': end_time,
|
||||
'resolution': '1'
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
klines = JSONRPCResult(**response)
|
||||
|
||||
result = KLinesResult(**klines.result)
|
||||
result = KLinesResult(**resp.result)
|
||||
new_bars = []
|
||||
for i in range(len(result.close)):
|
||||
|
||||
|
@ -237,19 +399,308 @@ class Client:
|
|||
instrument: str,
|
||||
count: int = 10
|
||||
):
|
||||
response = await self._api(
|
||||
'get_last_trades_by_instrument',
|
||||
resp = await self.json_rpc(
|
||||
'public/get_last_trades_by_instrument',
|
||||
params={
|
||||
'instrument_name': instrument,
|
||||
'count': count
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
return LastTradesResult(**(JSONRPCResult(**response).result))
|
||||
return LastTradesResult(**resp.result)
|
||||
|
||||
|
||||
@acm
|
||||
async def get_client() -> Client:
|
||||
client = Client()
|
||||
async with (
|
||||
trio.open_nursery() as n,
|
||||
open_autorecon_ws(_testnet_ws_url) as ws
|
||||
):
|
||||
|
||||
_rpc_id: Iterable = count(0)
|
||||
_rpc_results: Dict[int, Dict] = {}
|
||||
|
||||
_expiry_time: int = float('inf')
|
||||
_access_token: Optional[str] = None
|
||||
_refresh_token: Optional[str] = None
|
||||
|
||||
def _next_json_body(method: str, params: Dict):
|
||||
"""get the typical json rpc 2.0 msg body and increment the req id
|
||||
"""
|
||||
return {
|
||||
'jsonrpc': '2.0',
|
||||
'id': next(_rpc_id),
|
||||
'method': method,
|
||||
'params': params
|
||||
}
|
||||
|
||||
async def json_rpc(method: str, params: Dict) -> Dict:
|
||||
"""perform a json rpc call and wait for the result, raise exception in
|
||||
case of error field present on response
|
||||
"""
|
||||
msg = _next_json_body(method, params)
|
||||
_id = msg['id']
|
||||
|
||||
_rpc_results[_id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
}
|
||||
|
||||
await ws.send_msg(msg)
|
||||
|
||||
await _rpc_results[_id]['event'].wait()
|
||||
|
||||
ret = _rpc_results[_id]['result']
|
||||
|
||||
del _rpc_results[_id]
|
||||
|
||||
if ret.error is not None:
|
||||
raise Exception(json.dumps(ret.error, indent=4))
|
||||
|
||||
return ret
|
||||
|
||||
async def _recv_task():
|
||||
"""receives every ws message and stores it in its corresponding result
|
||||
field, then sets the event to wakeup original sender tasks.
|
||||
"""
|
||||
async for msg in ws:
|
||||
msg = JSONRPCResult(**msg)
|
||||
|
||||
if msg.id not in _rpc_results:
|
||||
# in case this message wasn't beign accounted for store it
|
||||
_rpc_results[msg.id] = {
|
||||
'result': None,
|
||||
'event': trio.Event()
|
||||
}
|
||||
|
||||
_rpc_results[msg.id]['result'] = msg
|
||||
_rpc_results[msg.id]['event'].set()
|
||||
|
||||
client = Client(json_rpc)
|
||||
|
||||
async def _auth_loop(
|
||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
|
||||
):
|
||||
"""Background task that adquires a first access token and then will
|
||||
refresh the access token while the nursery isn't cancelled.
|
||||
|
||||
https://docs.deribit.com/?python#authentication-2
|
||||
"""
|
||||
renew_time = 10
|
||||
access_scope = 'trade:read_write'
|
||||
_expiry_time = time.time()
|
||||
got_access = False
|
||||
nonlocal _refresh_token
|
||||
nonlocal _access_token
|
||||
|
||||
while True:
|
||||
if time.time() - _expiry_time < renew_time:
|
||||
# if we are close to token expiry time
|
||||
|
||||
if _refresh_token != None:
|
||||
# if we have a refresh token already dont need to send
|
||||
# secret
|
||||
params = {
|
||||
'grant_type': 'refresh_token',
|
||||
'refresh_token': _refresh_token,
|
||||
'scope': access_scope
|
||||
}
|
||||
|
||||
else:
|
||||
# we don't have refresh token, send secret to initialize
|
||||
params = {
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': client._key_id,
|
||||
'client_secret': client._key_secret,
|
||||
'scope': access_scope
|
||||
}
|
||||
|
||||
resp = await json_rpc('public/auth', params)
|
||||
result = resp.result
|
||||
|
||||
_expiry_time = time.time() + result['expires_in']
|
||||
_refresh_token = result['refresh_token']
|
||||
|
||||
if 'access_token' in result:
|
||||
_access_token = result['access_token']
|
||||
|
||||
if not got_access:
|
||||
# first time this loop runs we must indicate task is
|
||||
# started, we have auth
|
||||
got_access = True
|
||||
task_status.started()
|
||||
|
||||
else:
|
||||
await trio.sleep(renew_time / 2)
|
||||
|
||||
n.start_soon(_recv_task)
|
||||
# if we have client creds launch auth loop
|
||||
if client._key_id is not None:
|
||||
await n.start(_auth_loop)
|
||||
|
||||
await client.cache_symbols()
|
||||
yield client
|
||||
|
||||
|
||||
@acm
|
||||
async def open_feed_handler():
|
||||
fh = FeedHandler(config=get_config())
|
||||
yield fh
|
||||
await to_asyncio.run_task(fh.stop_async)
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_context(
|
||||
acm_func=open_feed_handler,
|
||||
key='feedhandler',
|
||||
) as (cache_hit, fh):
|
||||
yield fh
|
||||
|
||||
|
||||
@acm
|
||||
async def open_price_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# XXX: hangs when going into this ctx mngr
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
|
||||
async def relay(
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp
|
||||
}))
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'bsize',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'ask',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)},
|
||||
{'type': 'asize',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)}
|
||||
]
|
||||
}))
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[TRADES, L1_BOOK],
|
||||
symbols=[instrument],
|
||||
callbacks={
|
||||
TRADES: _trade,
|
||||
L1_BOOK: _l1
|
||||
})
|
||||
|
||||
if not fh.running:
|
||||
fh.run(
|
||||
start_loop=False,
|
||||
install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
except asyncio.exceptions.CancelledError:
|
||||
...
|
||||
|
||||
async with to_asyncio.open_channel_from(
|
||||
relay
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_price_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
async with maybe_open_context(
|
||||
acm_func=open_price_feed,
|
||||
kwargs={
|
||||
'instrument': instrument
|
||||
},
|
||||
key=f'{instrument}-price',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
else:
|
||||
yield feed
|
||||
|
||||
@acm
|
||||
async def open_order_feed(
|
||||
instrument: List[str]
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
|
||||
async def relay(
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _fill(data: dict, receipt_timestamp):
|
||||
breakpoint()
|
||||
|
||||
async def _order_info(data: dict, receipt_timestamp):
|
||||
breakpoint()
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[FILLS, ORDER_INFO],
|
||||
symbols=[instrument],
|
||||
callbacks={
|
||||
FILLS: _fill,
|
||||
ORDER_INFO: _order_info,
|
||||
})
|
||||
|
||||
if not fh.running:
|
||||
fh.run(
|
||||
start_loop=False,
|
||||
install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
try:
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
except asyncio.exceptions.CancelledError:
|
||||
...
|
||||
|
||||
async with to_asyncio.open_channel_from(
|
||||
relay
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
|
||||
@acm
|
||||
async def maybe_open_order_feed(
|
||||
instrument: str
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
# TODO: add a predicate to maybe_open_context
|
||||
async with maybe_open_context(
|
||||
acm_func=open_order_feed,
|
||||
kwargs={
|
||||
'instrument': instrument
|
||||
},
|
||||
key=f'{instrument}-order',
|
||||
) as (cache_hit, feed):
|
||||
if cache_hit:
|
||||
yield broadcast_receiver(feed, 10)
|
||||
else:
|
||||
yield feed
|
||||
|
|
Binary file not shown.
After Width: | Height: | Size: 169 KiB |
Binary file not shown.
After Width: | Height: | Size: 106 KiB |
Binary file not shown.
After Width: | Height: | Size: 59 KiB |
Binary file not shown.
After Width: | Height: | Size: 70 KiB |
Binary file not shown.
After Width: | Height: | Size: 132 KiB |
|
@ -0,0 +1,44 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
'''
|
||||
Order api and machinery
|
||||
|
||||
'''
|
||||
|
||||
from typing import Any, AsyncIterator
|
||||
|
||||
import tractor
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def trades_dialogue(
|
||||
ctx: tractor.Context,
|
||||
loglevel: str = None,
|
||||
|
||||
) -> AsyncIterator[dict[str, Any]]:
|
||||
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
async with open_cached_client('deribit') as client:
|
||||
if not client._key_id:
|
||||
raise RuntimeError('Missing Deribit API key in `brokers.toml`!?!?')
|
||||
|
||||
acc_name = f'deribit.{client._key_id}'
|
||||
|
||||
await client.cache_symbols()
|
||||
|
||||
breakpoint()
|
|
@ -1,3 +1,6 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
|
@ -15,9 +18,6 @@
|
|||
Deribit backend.
|
||||
|
||||
'''
|
||||
|
||||
import asyncio
|
||||
from async_generator import aclosing
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional, List, Callable
|
||||
|
@ -29,9 +29,7 @@ import pendulum
|
|||
from fuzzywuzzy import process as fuzzy
|
||||
import numpy as np
|
||||
import tractor
|
||||
from tractor import to_asyncio
|
||||
|
||||
from piker import config
|
||||
from piker._cacheables import open_cached_client
|
||||
from piker.log import get_logger, get_console_log
|
||||
from piker.data import ShmArray
|
||||
|
@ -47,170 +45,21 @@ from cryptofeed.defines import (
|
|||
)
|
||||
from cryptofeed.symbols import Symbol
|
||||
|
||||
from .api import Client, Trade
|
||||
from .api import (
|
||||
Client, Trade,
|
||||
get_config,
|
||||
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
|
||||
maybe_open_price_feed
|
||||
)
|
||||
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
}
|
||||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
|
||||
conf, path = config.load()
|
||||
|
||||
section = conf.get('deribit')
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for deribit in {path}')
|
||||
return {}
|
||||
|
||||
conf['log'] = {}
|
||||
conf['log']['disabled'] = True
|
||||
|
||||
# conf['log']['filename'] = '/tmp/feedhandler.log'
|
||||
# conf['log']['level'] = 'WARNING'
|
||||
|
||||
return conf
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_url = 'https://www.deribit.com'
|
||||
|
||||
|
||||
def str_to_cb_sym(name: str) -> Symbol:
|
||||
base, strike_price, expiry_date, option_type = name.split('-')
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'put':
|
||||
option_type = PUT
|
||||
elif option_type == 'call':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise BaseException("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date,
|
||||
expiry_normalize=False)
|
||||
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
base, expiry_date, strike_price, option_type = tuple(
|
||||
name.upper().split('-'))
|
||||
|
||||
quote = base
|
||||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
elif option_type == 'C':
|
||||
option_type = CALL
|
||||
else:
|
||||
raise BaseException("Couldn\'t parse option type")
|
||||
|
||||
return Symbol(
|
||||
base, quote,
|
||||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date.upper())
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
# cryptofeed normalized
|
||||
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
|
||||
|
||||
# deribit specific
|
||||
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
|
||||
|
||||
exp = sym.expiry_date
|
||||
|
||||
# YYMDD
|
||||
# 01234
|
||||
year, month, day = (
|
||||
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
|
||||
|
||||
otype = 'C' if sym.option_type == CALL else 'P'
|
||||
|
||||
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
|
||||
|
||||
|
||||
# inside here we are in an asyncio context
|
||||
async def open_aio_cryptofeed_relay(
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
instruments: List[str] = []
|
||||
) -> None:
|
||||
|
||||
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
|
||||
|
||||
async def trade_cb(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp
|
||||
}))
|
||||
|
||||
async def l1_book_cb(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{'type': 'bid',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'bsize',
|
||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
||||
{'type': 'ask',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)},
|
||||
{'type': 'asize',
|
||||
'price': float(data.ask_price), 'size': float(data.ask_size)}
|
||||
]
|
||||
}))
|
||||
|
||||
fh = FeedHandler(config=get_config())
|
||||
fh.run(start_loop=False)
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[L1_BOOK],
|
||||
symbols=instruments,
|
||||
callbacks={L1_BOOK: l1_book_cb})
|
||||
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
channels=[TRADES],
|
||||
symbols=instruments,
|
||||
callbacks={TRADES: trade_cb})
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
||||
@acm
|
||||
async def open_cryptofeeds(
|
||||
|
||||
instruments: List[str]
|
||||
|
||||
) -> trio.abc.ReceiveStream:
|
||||
|
||||
async with to_asyncio.open_channel_from(
|
||||
open_aio_cryptofeed_relay,
|
||||
instruments=instruments,
|
||||
) as (first, chan):
|
||||
yield chan
|
||||
|
||||
|
||||
@acm
|
||||
async def open_history_client(
|
||||
instrument: str,
|
||||
|
@ -278,9 +127,7 @@ async def stream_quotes(
|
|||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan,
|
||||
trio.open_nursery() as n,
|
||||
open_cryptofeeds(symbols) as stream
|
||||
send_chan as send_chan
|
||||
):
|
||||
|
||||
init_msgs = {
|
||||
|
@ -298,11 +145,22 @@ async def stream_quotes(
|
|||
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
|
||||
# keep client cached for real-time section
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
cache = await client.cache_symbols()
|
||||
|
||||
last_trade = Trade(**(await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades[0])
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
|
||||
if len(last_trades) == 0:
|
||||
last_trade = None
|
||||
async for typ, quote in stream:
|
||||
if typ == 'trade':
|
||||
last_trade = Trade(**(quote['data']))
|
||||
break
|
||||
|
||||
else:
|
||||
last_trade = Trade(**(last_trades[0]))
|
||||
|
||||
first_quote = {
|
||||
'symbol': sym,
|
||||
|
@ -317,7 +175,6 @@ async def stream_quotes(
|
|||
}
|
||||
task_status.started((init_msgs, first_quote))
|
||||
|
||||
async with aclosing(stream):
|
||||
feed_is_live.set()
|
||||
|
||||
async for typ, quote in stream:
|
||||
|
@ -338,15 +195,6 @@ async def open_symbol_search(
|
|||
async with ctx.open_stream() as stream:
|
||||
|
||||
async for pattern in stream:
|
||||
# results = await client.symbol_info(sym=pattern.upper())
|
||||
|
||||
matches = fuzzy.extractBests(
|
||||
pattern,
|
||||
cache,
|
||||
score_cutoff=30,
|
||||
)
|
||||
# repack in dict form
|
||||
await stream.send(
|
||||
{item[0]['instrument_name']: item[0]
|
||||
for item in matches}
|
||||
)
|
||||
await client.search_symbols(pattern))
|
||||
|
|
|
@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
|
|||
"""
|
||||
from contextlib import asynccontextmanager, AsyncExitStack
|
||||
from types import ModuleType
|
||||
from typing import Any, Callable, AsyncGenerator
|
||||
from typing import Any, Optional, Callable, AsyncGenerator
|
||||
import json
|
||||
|
||||
import trio
|
||||
|
@ -54,8 +54,8 @@ class NoBsWs:
|
|||
self,
|
||||
url: str,
|
||||
stack: AsyncExitStack,
|
||||
fixture: Callable,
|
||||
serializer: ModuleType = json,
|
||||
fixture: Optional[Callable] = None,
|
||||
serializer: ModuleType = json
|
||||
):
|
||||
self.url = url
|
||||
self.fixture = fixture
|
||||
|
@ -80,6 +80,8 @@ class NoBsWs:
|
|||
self._ws = await self._stack.enter_async_context(
|
||||
trio_websocket.open_websocket_url(self.url)
|
||||
)
|
||||
|
||||
if self.fixture is not None:
|
||||
# rerun user code fixture
|
||||
ret = await self._stack.enter_async_context(
|
||||
self.fixture(self)
|
||||
|
@ -121,13 +123,19 @@ class NoBsWs:
|
|||
except self.recon_errors:
|
||||
await self._connect()
|
||||
|
||||
def __aiter__(self):
|
||||
return self
|
||||
|
||||
async def __anext__(self):
|
||||
return await self.recv_msg()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def open_autorecon_ws(
|
||||
url: str,
|
||||
|
||||
# TODO: proper type annot smh
|
||||
fixture: Callable,
|
||||
fixture: Optional[Callable] = None,
|
||||
|
||||
) -> AsyncGenerator[tuple[...], NoBsWs]:
|
||||
"""Apparently we can QoS for all sorts of reasons..so catch em.
|
||||
|
|
|
@ -20,4 +20,4 @@
|
|||
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
|
||||
|
||||
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
|
||||
-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed
|
||||
-e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed
|
||||
|
|
Loading…
Reference in New Issue