Compare commits

...

8 Commits

Author SHA1 Message Date
Guillermo Rodriguez 5c02dc6cd7
Add aiter api to NoBsWs and rework cryptofeed relay to not be OOPy 2022-08-23 22:21:27 -03:00
Guillermo Rodriguez a5481e6746
Switch back to using async for and dont install signal handlers on cryptofeed 2022-08-23 16:15:35 -03:00
Guillermo Rodriguez 73fcd72256
Add README.rst and brokers.toml section in config example 2022-08-23 16:00:49 -03:00
Guillermo Rodriguez fc2ceb5964
Refactor cryptofeed relay api and move it to client
Added submit_limit and submit_cancel
Cache syms correctly
Lowercase search results
2022-08-23 15:18:45 -03:00
Guillermo Rodriguez d0dbb44092
Add get_balances, and get_assets rpc to deribit.api.Client
Improve symbol_info search results
Expect cancellation on cryptofeeds asyncio task
Fix the no trades on instrument bug that we had on startup
2022-08-23 15:18:45 -03:00
Guillermo Rodriguez b20500c0d9
Add comments and update cryptofeed fork url in requirements 2022-08-23 15:18:45 -03:00
Guillermo Rodriguez 5872095b09
Tweaks on Client init to make api credentials optional 2022-08-23 15:18:45 -03:00
Guillermo Rodriguez 5f60923ac1
Begin jsonrpc over ws refactor 2022-08-23 15:18:45 -03:00
13 changed files with 678 additions and 252 deletions

View File

@ -50,3 +50,8 @@ prefer_data_account = [
paper = "XX0000000" paper = "XX0000000"
margin = "X0000000" margin = "X0000000"
ira = "X0000000" ira = "X0000000"
[deribit]
key_id = 'XXXXXXXX'
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'

View File

@ -0,0 +1,70 @@
``deribit`` backend
------------------
pretty good liquidity crypto derivatives, uses custom json rpc over ws for
client methods, then `cryptofeed` for data streams.
status
******
- supports option charts
- no order support yet
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[deribit]
key_id = 'XXXXXXXX'
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'
To obtain an api id and secret you need to create an account, which can be a
real market account over at:
- deribit.com (requires KYC for deposit address)
Or a testnet account over at:
- test.deribit.com
For testnet once the account is created here is how you deposit fake crypto to
try it out:
1) Go to Wallet:
.. figure:: assets/0_wallet.png
:align: center
:target: assets/0_wallet.png
:alt: wallet page
2) Then click on the elipsis menu and select deposit
.. figure:: assets/1_wallet_select_deposit.png
:align: center
:target: assets/1_wallet_select_deposit.png
:alt: wallet deposit page
3) This will take you to the deposit address page
.. figure:: assets/2_gen_deposit_addr.png
:align: center
:target: assets/2_gen_deposit_addr.png
:alt: generate deposit address page
4) After clicking generate you should see the address, copy it and go to the
[coin faucet](https://test.deribit.com/dericoin/BTC/deposit) and send fake
coins to that address.
.. figure:: assets/3_deposit_address.png
:align: center
:target: assets/3_deposit_address.png
:alt: generated address
5) Back in the deposit address page you should see the deposit in your history
.. figure:: assets/4_wallet_deposit_history.png
:align: center
:target: assets/4_wallet_deposit_history.png
:alt: wallet deposit history

View File

@ -32,8 +32,8 @@ from .feed import (
stream_quotes, stream_quotes,
) )
# from .broker import ( # from .broker import (
# trades_dialogue, # trades_dialogue,
# norm_trade_records, # norm_trade_records,
# ) # )
__all__ = [ __all__ = [
@ -50,7 +50,7 @@ __all__ = [
__enable_modules__: list[str] = [ __enable_modules__: list[str] = [
'api', 'api',
'feed', 'feed',
# 'broker', 'broker',
] ]
# passed to ``tractor.ActorNursery.start_actor()`` # passed to ``tractor.ActorNursery.start_actor()``

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -15,29 +18,52 @@
Deribit backend. Deribit backend.
''' '''
import json
import time
import asyncio
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from functools import partial
from datetime import datetime from datetime import datetime
from typing import Any, Optional, List from typing import Any, List, Dict, Optional, Iterable, Callable
import pendulum import pendulum
import asks import asks
import trio
from trio_typing import Nursery, TaskStatus
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
from piker.data.types import Struct from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws
from .._util import resproc from .._util import resproc
from piker import config from piker import config
from piker.log import get_logger from piker.log import get_logger
from tractor.trionics import broadcast_receiver, BroadcastReceiver, maybe_open_context
from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
log = get_logger(__name__) log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
_url = 'https://www.deribit.com' _url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
# Broker specific ohlc schema (rest) # Broker specific ohlc schema (rest)
@ -55,7 +81,9 @@ _ohlc_dtype = [
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0' jsonrpc: str = '2.0'
result: dict id: int
result: Optional[dict] = None
error: Optional[dict] = None
usIn: int usIn: int
usOut: int usOut: int
usDiff: int usDiff: int
@ -83,6 +111,8 @@ class Trade(Struct):
instrument_name: str instrument_name: str
index_price: float index_price: float
direction: str direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float amount: float
class LastTradesResult(Struct): class LastTradesResult(Struct):
@ -95,24 +125,161 @@ def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return conf
class Client: class Client:
def __init__(self) -> None: def __init__(self, json_rpc: Callable) -> None:
self._sesh = asks.Session(connections=4) self._pairs: dict[str, Any] = None
self._sesh.base_location = _url
self._pairs: dict[str, Any] = {}
async def _api( config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc
@property
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_positions', params={
'currency': currency.upper(),
'kind': kind})
balances[currency] = resp.result
return balances
async def get_assets(self) -> dict[str, float]:
"""Return the set of asset balances for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_account_summary', params={
'currency': currency.upper()})
balances[currency] = resp.result['balance']
return balances
async def submit_limit(
self, self,
method: str, symbol: str,
params: dict, price: float,
) -> dict[str, Any]: action: str,
resp = await self._sesh.get( size: float
path=f'/api/v2/public/{method}', ) -> dict:
params=params, """Place an order
timeout=float('inf') """
) params = {
return resproc(resp, log) 'instrument_name': symbol.upper(),
'amount': size,
'type': 'limit',
'price': price,
}
resp = await self.json_rpc(
f'private/{action}', params)
return resp.result
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
"""
resp = await self.json_rpc(
'private/cancel', {'order_id': oid})
return resp.result
async def symbol_info( async def symbol_info(
self, self,
@ -121,11 +288,11 @@ class Client:
kind: str = 'option', kind: str = 'option',
expired: bool = False expired: bool = False
) -> dict[str, Any]: ) -> dict[str, Any]:
'''Get symbol info for the exchange. """Get symbol info for the exchange.
''' """
# TODO: we can load from our self._pairs cache if self._pairs:
# on repeat calls... return self._pairs
# will retrieve all symbols by default # will retrieve all symbols by default
params = { params = {
@ -134,13 +301,13 @@ class Client:
'expired': str(expired).lower() 'expired': str(expired).lower()
} }
resp = await self._api( resp = await self.json_rpc('public/get_instruments', params)
'get_instruments', params=params) results = resp.result
results = resp['result']
instruments = { instruments = {
item['instrument_name']: item for item in results} item['instrument_name'].lower(): item
for item in results
}
if instrument is not None: if instrument is not None:
return instruments[instrument] return instruments[instrument]
@ -158,20 +325,18 @@ class Client:
async def search_symbols( async def search_symbols(
self, self,
pattern: str, pattern: str,
limit: int = None, limit: int = 30,
) -> dict[str, Any]: ) -> dict[str, Any]:
if self._pairs is not None: data = await self.symbol_info()
data = self._pairs
else:
data = await self.symbol_info()
matches = fuzzy.extractBests( matches = fuzzy.extractBests(
pattern, pattern,
data, data,
score_cutoff=50, score_cutoff=35,
limit=limit
) )
# repack in dict form # repack in dict form
return {item[0]['instrument_name']: item[0] return {item[0]['instrument_name'].lower(): item[0]
for item in matches} for item in matches}
async def bars( async def bars(
@ -195,19 +360,16 @@ class Client:
end_time = deribit_timestamp(end_dt) end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
response = await self._api( resp = await self.json_rpc(
'get_tradingview_chart_data', 'public/get_tradingview_chart_data',
params={ params={
'instrument_name': instrument.upper(), 'instrument_name': instrument.upper(),
'start_timestamp': start_time, 'start_timestamp': start_time,
'end_timestamp': end_time, 'end_timestamp': end_time,
'resolution': '1' 'resolution': '1'
} })
)
klines = JSONRPCResult(**response) result = KLinesResult(**resp.result)
result = KLinesResult(**klines.result)
new_bars = [] new_bars = []
for i in range(len(result.close)): for i in range(len(result.close)):
@ -237,19 +399,308 @@ class Client:
instrument: str, instrument: str,
count: int = 10 count: int = 10
): ):
response = await self._api( resp = await self.json_rpc(
'get_last_trades_by_instrument', 'public/get_last_trades_by_instrument',
params={ params={
'instrument_name': instrument, 'instrument_name': instrument,
'count': count 'count': count
} })
)
return LastTradesResult(**(JSONRPCResult(**response).result)) return LastTradesResult(**resp.result)
@acm @acm
async def get_client() -> Client: async def get_client() -> Client:
client = Client() async with (
await client.cache_symbols() trio.open_nursery() as n,
yield client open_autorecon_ws(_testnet_ws_url) as ws
):
_rpc_id: Iterable = count(0)
_rpc_results: Dict[int, Dict] = {}
_expiry_time: int = float('inf')
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
def _next_json_body(method: str, params: Dict):
"""get the typical json rpc 2.0 msg body and increment the req id
"""
return {
'jsonrpc': '2.0',
'id': next(_rpc_id),
'method': method,
'params': params
}
async def json_rpc(method: str, params: Dict) -> Dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = _next_json_body(method, params)
_id = msg['id']
_rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await _rpc_results[_id]['event'].wait()
ret = _rpc_results[_id]['result']
del _rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def _recv_task():
"""receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
"""
async for msg in ws:
msg = JSONRPCResult(**msg)
if msg.id not in _rpc_results:
# in case this message wasn't beign accounted for store it
_rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
_rpc_results[msg.id]['result'] = msg
_rpc_results[msg.id]['event'].set()
client = Client(json_rpc)
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
"""Background task that adquires a first access token and then will
refresh the access token while the nursery isn't cancelled.
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
nonlocal _access_token
while True:
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = {
'grant_type': 'refresh_token',
'refresh_token': _refresh_token,
'scope': access_scope
}
else:
# we don't have refresh token, send secret to initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
'client_secret': client._key_secret,
'scope': access_scope
}
resp = await json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
_refresh_token = result['refresh_token']
if 'access_token' in result:
_access_token = result['access_token']
if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
n.start_soon(_recv_task)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)
await client.cache_symbols()
yield client
@acm
async def open_feed_handler():
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
key='feedhandler',
) as (cache_hit, fh):
yield fh
@acm
async def open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# XXX: hangs when going into this ctx mngr
async with maybe_open_feed_handler() as fh:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[instrument],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from(
relay
) as (first, chan):
yield chan
@acm
async def maybe_open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-price',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
@acm
async def open_order_feed(
instrument: List[str]
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async def relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
try:
await asyncio.sleep(float('inf'))
except asyncio.exceptions.CancelledError:
...
async with to_asyncio.open_channel_from(
relay
) as (first, chan):
yield chan
@acm
async def maybe_open_order_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed

Binary file not shown.

After

Width:  |  Height:  |  Size: 169 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

View File

@ -0,0 +1,44 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Order api and machinery
'''
from typing import Any, AsyncIterator
import tractor
@tractor.context
async def trades_dialogue(
ctx: tractor.Context,
loglevel: str = None,
) -> AsyncIterator[dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
async with open_cached_client('deribit') as client:
if not client._key_id:
raise RuntimeError('Missing Deribit API key in `brokers.toml`!?!?')
acc_name = f'deribit.{client._key_id}'
await client.cache_symbols()
breakpoint()

View File

@ -1,3 +1,6 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or # the Free Software Foundation, either version 3 of the License, or
@ -15,9 +18,6 @@
Deribit backend. Deribit backend.
''' '''
import asyncio
from async_generator import aclosing
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, List, Callable from typing import Any, Optional, List, Callable
@ -29,9 +29,7 @@ import pendulum
from fuzzywuzzy import process as fuzzy from fuzzywuzzy import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from tractor import to_asyncio
from piker import config
from piker._cacheables import open_cached_client from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
@ -47,170 +45,21 @@ from cryptofeed.defines import (
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
from .api import Client, Trade from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
} }
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
conf['log'] = {}
conf['log']['disabled'] = True
# conf['log']['filename'] = '/tmp/feedhandler.log'
# conf['log']['level'] = 'WARNING'
return conf
log = get_logger(__name__) log = get_logger(__name__)
_url = 'https://www.deribit.com'
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise BaseException("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
# inside here we are in an asyncio context
async def open_aio_cryptofeed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
instruments: List[str] = []
) -> None:
instruments = [piker_sym_to_cb_sym(i) for i in instruments]
async def trade_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def l1_book_cb(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh = FeedHandler(config=get_config())
fh.run(start_loop=False)
fh.add_feed(
DERIBIT,
channels=[L1_BOOK],
symbols=instruments,
callbacks={L1_BOOK: l1_book_cb})
fh.add_feed(
DERIBIT,
channels=[TRADES],
symbols=instruments,
callbacks={TRADES: trade_cb})
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm
async def open_cryptofeeds(
instruments: List[str]
) -> trio.abc.ReceiveStream:
async with to_asyncio.open_channel_from(
open_aio_cryptofeed_relay,
instruments=instruments,
) as (first, chan):
yield chan
@acm @acm
async def open_history_client( async def open_history_client(
instrument: str, instrument: str,
@ -278,9 +127,7 @@ async def stream_quotes(
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan, send_chan as send_chan
trio.open_nursery() as n,
open_cryptofeeds(symbols) as stream
): ):
init_msgs = { init_msgs = {
@ -298,26 +145,36 @@ async def stream_quotes(
nsym = piker_sym_to_cb_sym(sym) nsym = piker_sym_to_cb_sym(sym)
# keep client cached for real-time section async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols()
last_trade = Trade(**(await client.last_trades( cache = await client.cache_symbols()
cb_sym_to_deribit_inst(nsym), count=1)).trades[0])
first_quote = { last_trades = (await client.last_trades(
'symbol': sym, cb_sym_to_deribit_inst(nsym), count=1)).trades
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, if len(last_trades) == 0:
'ticks': [{ last_trade = None
'type': 'trade', async for typ, quote in stream:
'price': last_trade.price, if typ == 'trade':
'size': last_trade.amount, last_trade = Trade(**(quote['data']))
'broker_ts': last_trade.timestamp break
}]
} else:
task_status.started((init_msgs, first_quote)) last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
'ticks': [{
'type': 'trade',
'price': last_trade.price,
'size': last_trade.amount,
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
feed_is_live.set() feed_is_live.set()
async for typ, quote in stream: async for typ, quote in stream:
@ -338,15 +195,6 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
async for pattern in stream: async for pattern in stream:
# results = await client.symbol_info(sym=pattern.upper())
matches = fuzzy.extractBests(
pattern,
cache,
score_cutoff=30,
)
# repack in dict form # repack in dict form
await stream.send( await stream.send(
{item[0]['instrument_name']: item[0] await client.search_symbols(pattern))
for item in matches}
)

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType from types import ModuleType
from typing import Any, Callable, AsyncGenerator from typing import Any, Optional, Callable, AsyncGenerator
import json import json
import trio import trio
@ -54,8 +54,8 @@ class NoBsWs:
self, self,
url: str, url: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Optional[Callable] = None,
serializer: ModuleType = json, serializer: ModuleType = json
): ):
self.url = url self.url = url
self.fixture = fixture self.fixture = fixture
@ -80,12 +80,14 @@ class NoBsWs:
self._ws = await self._stack.enter_async_context( self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None if self.fixture is not None:
# rerun user code fixture
ret = await self._stack.enter_async_context(
self.fixture(self)
)
assert ret is None
log.info(f'Connection success: {self.url}') log.info(f'Connection success: {self.url}')
return self._ws return self._ws
@ -121,13 +123,19 @@ class NoBsWs:
except self.recon_errors: except self.recon_errors:
await self._connect() await self._connect()
def __aiter__(self):
return self
async def __anext__(self):
return await self.recv_msg()
@asynccontextmanager @asynccontextmanager
async def open_autorecon_ws( async def open_autorecon_ws(
url: str, url: str,
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.

View File

@ -20,4 +20,4 @@
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes # ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
-e git+https://github.com/guilledk/cryptofeed.git@date_parsing#egg=cryptofeed -e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed