Merge pull request #331 from pikers/deribit

Deribit backend & minimal broker check tool
size_in_shm_token
goodboy 2022-08-25 10:08:29 -04:00 committed by GitHub
commit b5499b8225
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1222 additions and 10 deletions

View File

@ -50,3 +50,8 @@ prefer_data_account = [
paper = "XX0000000" paper = "XX0000000"
margin = "X0000000" margin = "X0000000"
ira = "X0000000" ira = "X0000000"
[deribit]
key_id = 'XXXXXXXX'
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'

View File

@ -39,6 +39,148 @@ _config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
OK = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
def print_ok(s: str, **kwargs):
print(OK + s + ENDC, **kwargs)
def print_error(s: str, **kwargs):
print(FAIL + s + ENDC, **kwargs)
def get_method(client, meth_name: str):
print(f'checking client for method \'{meth_name}\'...', end='', flush=True)
method = getattr(client, meth_name, None)
assert method
print_ok('found!.')
return method
async def run_method(client, meth_name: str, **kwargs):
method = get_method(client, meth_name)
print('running...', end='', flush=True)
result = await method(**kwargs)
print_ok(f'done! result: {type(result)}')
return result
async def run_test(broker_name: str):
brokermod = get_brokermod(broker_name)
total = 0
passed = 0
failed = 0
print(f'getting client...', end='', flush=True)
if not hasattr(brokermod, 'get_client'):
print_error('fail! no \'get_client\' context manager found.')
return
async with brokermod.get_client(is_brokercheck=True) as client:
print_ok(f'done! inside client context.')
# check for methods present on brokermod
method_list = [
'backfill_bars',
'get_client',
'trades_dialogue',
'open_history_client',
'open_symbol_search',
'stream_quotes',
]
for method in method_list:
print(
f'checking brokermod for method \'{method}\'...',
end='', flush=True)
if not hasattr(brokermod, method):
print_error(f'fail! method \'{method}\' not found.')
failed += 1
else:
print_ok('done!')
passed += 1
total += 1
# check for methods present con brokermod.Client and their
# results
# for private methods only check is present
method_list = [
'get_balances',
'get_assets',
'get_trades',
'get_xfers',
'submit_limit',
'submit_cancel',
'search_symbols',
]
for method_name in method_list:
try:
get_method(client, method_name)
passed += 1
except AssertionError:
print_error(f'fail! method \'{method_name}\' not found.')
failed += 1
total += 1
# check for methods present con brokermod.Client and their
# results
syms = await run_method(client, 'symbol_info')
total += 1
if len(syms) == 0:
raise BaseException('Empty Symbol list?')
passed += 1
first_sym = tuple(syms.keys())[0]
method_list = [
('cache_symbols', {}),
('search_symbols', {'pattern': first_sym[:-1]}),
('bars', {'symbol': first_sym})
]
for method_name, method_kwargs in method_list:
try:
await run_method(client, method_name, **method_kwargs)
passed += 1
except AssertionError:
print_error(f'fail! method \'{method_name}\' not found.')
failed += 1
total += 1
print(f'total: {total}, passed: {passed}, failed: {failed}')
@cli.command()
@click.argument('broker', nargs=1, required=True)
@click.pass_obj
def brokercheck(config, broker):
'''
Test broker apis for completeness.
'''
async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal:
await portal.run(run_test, broker)
await portal.cancel_actor()
trio.run(run_test, broker)
@cli.command() @cli.command()
@click.option('--keys', '-k', multiple=True, @click.option('--keys', '-k', multiple=True,
help='Return results only for these keys') help='Return results only for these keys')
@ -193,6 +335,8 @@ def contracts(ctx, loglevel, broker, symbol, ids):
brokermod = get_brokermod(broker) brokermod = get_brokermod(broker)
get_console_log(loglevel) get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol)) contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids: if not ids:
# just print out expiry dates which can be used with # just print out expiry dates which can be used with

View File

@ -0,0 +1,70 @@
``deribit`` backend
------------------
pretty good liquidity crypto derivatives, uses custom json rpc over ws for
client methods, then `cryptofeed` for data streams.
status
******
- supports option charts
- no order support yet
config
******
In order to get order mode support your ``brokers.toml``
needs to have something like the following:
.. code:: toml
[deribit]
key_id = 'XXXXXXXX'
key_secret = 'Xx_XxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXxXx'
To obtain an api id and secret you need to create an account, which can be a
real market account over at:
- deribit.com (requires KYC for deposit address)
Or a testnet account over at:
- test.deribit.com
For testnet once the account is created here is how you deposit fake crypto to
try it out:
1) Go to Wallet:
.. figure:: assets/0_wallet.png
:align: center
:target: assets/0_wallet.png
:alt: wallet page
2) Then click on the elipsis menu and select deposit
.. figure:: assets/1_wallet_select_deposit.png
:align: center
:target: assets/1_wallet_select_deposit.png
:alt: wallet deposit page
3) This will take you to the deposit address page
.. figure:: assets/2_gen_deposit_addr.png
:align: center
:target: assets/2_gen_deposit_addr.png
:alt: generate deposit address page
4) After clicking generate you should see the address, copy it and go to the
`coin faucet <https://test.deribit.com/dericoin/BTC/deposit>`_ and send fake
coins to that address.
.. figure:: assets/3_deposit_address.png
:align: center
:target: assets/3_deposit_address.png
:alt: generated address
5) Back in the deposit address page you should see the deposit in your history
.. figure:: assets/4_wallet_deposit_history.png
:align: center
:target: assets/4_wallet_deposit_history.png
:alt: wallet deposit history

View File

@ -0,0 +1,65 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Deribit backend.
'''
from piker.log import get_logger
log = get_logger(__name__)
from .api import (
get_client,
)
from .feed import (
open_history_client,
open_symbol_search,
stream_quotes,
backfill_bars
)
# from .broker import (
# trades_dialogue,
# norm_trade_records,
# )
__all__ = [
'get_client',
# 'trades_dialogue',
'open_history_client',
'open_symbol_search',
'stream_quotes',
# 'norm_trade_records',
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
'api',
'feed',
# 'broker',
]
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,
}
# annotation to let backend agnostic code
# know if ``brokerd`` should be spawned with
# ``tractor``'s aio mode.
_infect_asyncio: bool = True

View File

@ -0,0 +1,716 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Deribit backend.
'''
import json
import time
import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack
from itertools import count
from functools import partial
from datetime import datetime
from typing import Any, Optional, Iterable, Callable
import pendulum
import asks
import trio
from trio_typing import Nursery, TaskStatus
from fuzzywuzzy import process as fuzzy
import numpy as np
from piker.data.types import Struct
from piker.data._web_bs import NoBsWs, open_autorecon_ws
from .._util import resproc
from piker import config
from piker.log import get_logger
from tractor.trionics import (
broadcast_receiver,
BroadcastReceiver,
maybe_open_context
)
from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
_spawn_kwargs = {
'infect_asyncio': True,
}
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
# Broker specific ohlc schema (rest)
_ohlc_dtype = [
('index', int),
('time', int),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', float),
('bar_wap', float), # will be zeroed by sampler if not filled
]
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[dict] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
class KLinesResult(Struct):
close: list[float]
cost: list[float]
high: list[float]
low: list[float]
open: list[float]
status: str
ticks: list[int]
volume: list[float]
class Trade(Struct):
trade_seq: int
trade_id: str
timestamp: int
tick_direction: int
price: float
mark_price: float
iv: float
instrument_name: str
index_price: float
direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def str_to_cb_sym(name: str) -> Symbol:
base, strike_price, expiry_date, option_type = name.split('-')
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return conf
class Client:
def __init__(self, json_rpc: Callable) -> None:
self._pairs: dict[str, Any] = None
config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc
@property
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_positions', params={
'currency': currency.upper(),
'kind': kind})
balances[currency] = resp.result
return balances
async def get_assets(self) -> dict[str, float]:
"""Return the set of asset balances for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_account_summary', params={
'currency': currency.upper()})
balances[currency] = resp.result['balance']
return balances
async def submit_limit(
self,
symbol: str,
price: float,
action: str,
size: float
) -> dict:
"""Place an order
"""
params = {
'instrument_name': symbol.upper(),
'amount': size,
'type': 'limit',
'price': price,
}
resp = await self.json_rpc(
f'private/{action}', params)
return resp.result
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
"""
resp = await self.json_rpc(
'private/cancel', {'order_id': oid})
return resp.result
async def symbol_info(
self,
instrument: Optional[str] = None,
currency: str = 'btc', # BTC, ETH, SOL, USDC
kind: str = 'option',
expired: bool = False
) -> dict[str, Any]:
"""Get symbol info for the exchange.
"""
if self._pairs:
return self._pairs
# will retrieve all symbols by default
params = {
'currency': currency.upper(),
'kind': kind,
'expired': str(expired).lower()
}
resp = await self.json_rpc('public/get_instruments', params)
results = resp.result
instruments = {
item['instrument_name'].lower(): item
for item in results
}
if instrument is not None:
return instruments[instrument]
else:
return instruments
async def cache_symbols(
self,
) -> dict:
if not self._pairs:
self._pairs = await self.symbol_info()
return self._pairs
async def search_symbols(
self,
pattern: str,
limit: int = 30,
) -> dict[str, Any]:
data = await self.symbol_info()
matches = fuzzy.extractBests(
pattern,
data,
score_cutoff=35,
limit=limit
)
# repack in dict form
return {item[0]['instrument_name'].lower(): item[0]
for item in matches}
async def bars(
self,
symbol: str,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
if end_dt is None:
end_dt = pendulum.now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
start_time = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self.json_rpc(
'public/get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
'start_timestamp': start_time,
'end_timestamp': end_time,
'resolution': '1'
})
result = KLinesResult(**resp.result)
new_bars = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i],
0
]
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines
return array
async def last_trades(
self,
instrument: str,
count: int = 10
):
resp = await self.json_rpc(
'public/get_last_trades_by_instrument',
params={
'instrument_name': instrument,
'count': count
})
return LastTradesResult(**resp.result)
@acm
async def get_client(
is_brokercheck: bool = False
) -> Client:
async with (
trio.open_nursery() as n,
open_autorecon_ws(_testnet_ws_url) as ws
):
_rpc_id: Iterable = count(0)
_rpc_results: dict[int, dict] = {}
_expiry_time: int = float('inf')
_access_token: Optional[str] = None
_refresh_token: Optional[str] = None
async def json_rpc(method: str, params: dict) -> dict:
"""perform a json rpc call and wait for the result, raise exception in
case of error field present on response
"""
msg = {
'jsonrpc': '2.0',
'id': next(_rpc_id),
'method': method,
'params': params
}
_id = msg['id']
_rpc_results[_id] = {
'result': None,
'event': trio.Event()
}
await ws.send_msg(msg)
await _rpc_results[_id]['event'].wait()
ret = _rpc_results[_id]['result']
del _rpc_results[_id]
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
return ret
async def _recv_task():
"""receives every ws message and stores it in its corresponding result
field, then sets the event to wakeup original sender tasks.
"""
async for msg in ws:
msg = JSONRPCResult(**msg)
if msg.id not in _rpc_results:
# in case this message wasn't beign accounted for store it
_rpc_results[msg.id] = {
'result': None,
'event': trio.Event()
}
_rpc_results[msg.id]['result'] = msg
_rpc_results[msg.id]['event'].set()
client = Client(json_rpc)
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
"""Background task that adquires a first access token and then will
refresh the access token while the nursery isn't cancelled.
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
nonlocal _access_token
while True:
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = {
'grant_type': 'refresh_token',
'refresh_token': _refresh_token,
'scope': access_scope
}
else:
# we don't have refresh token, send secret to initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
'client_secret': client._key_secret,
'scope': access_scope
}
resp = await json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
_refresh_token = result['refresh_token']
if 'access_token' in result:
_access_token = result['access_token']
if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
n.start_soon(_recv_task)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)
await client.cache_symbols()
yield client
n.cancel_scope.cancel()
@acm
async def open_feed_handler():
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@acm
async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async with maybe_open_context(
acm_func=open_feed_handler,
key='feedhandler',
) as (cache_hit, fh):
yield fh
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
]
}))
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm
async def open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
@acm
async def maybe_open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
'instrument': instrument
},
key=f'{instrument}-price',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@acm
async def open_order_feed(
instrument: list[str]
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
@acm
async def maybe_open_order_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument,
'fh': fh
},
key=f'{instrument}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed

Binary file not shown.

After

Width:  |  Height:  |  Size: 169 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 106 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 59 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 70 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 132 KiB

View File

@ -0,0 +1,200 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Deribit backend.
'''
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
import time
import trio
from trio_typing import TaskStatus
import pendulum
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
_spawn_kwargs = {
'infect_asyncio': True,
}
log = get_logger(__name__)
@acm
async def open_history_client(
instrument: str,
) -> tuple[Callable, int]:
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
array = await client.bars(
instrument,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
async def backfill_bars(
symbol: str,
shm: ShmArray, # type: ignore # noqa
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
instrument = symbol
with trio.CancelScope() as cs:
async with open_cached_client('deribit') as client:
bars = await client.bars(instrument)
shm.push(bars)
task_status.started(cs)
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0]
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
nsym = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols()
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
'ticks': [{
'type': 'trade',
'price': last_trade.price,
'size': last_trade.amount,
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
feed_is_live.set()
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
@tractor.context
async def open_symbol_search(
ctx: tractor.Context,
) -> Client:
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream:
async for pattern in stream:
# repack in dict form
await stream.send(
await client.search_symbols(pattern))

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType from types import ModuleType
from typing import Any, Callable, AsyncGenerator from typing import Any, Optional, Callable, AsyncGenerator
import json import json
import trio import trio
@ -54,8 +54,8 @@ class NoBsWs:
self, self,
url: str, url: str,
stack: AsyncExitStack, stack: AsyncExitStack,
fixture: Callable, fixture: Optional[Callable] = None,
serializer: ModuleType = json, serializer: ModuleType = json
): ):
self.url = url self.url = url
self.fixture = fixture self.fixture = fixture
@ -80,6 +80,8 @@ class NoBsWs:
self._ws = await self._stack.enter_async_context( self._ws = await self._stack.enter_async_context(
trio_websocket.open_websocket_url(self.url) trio_websocket.open_websocket_url(self.url)
) )
if self.fixture is not None:
# rerun user code fixture # rerun user code fixture
ret = await self._stack.enter_async_context( ret = await self._stack.enter_async_context(
self.fixture(self) self.fixture(self)
@ -121,13 +123,19 @@ class NoBsWs:
except self.recon_errors: except self.recon_errors:
await self._connect() await self._connect()
def __aiter__(self):
return self
async def __anext__(self):
return await self.recv_msg()
@asynccontextmanager @asynccontextmanager
async def open_autorecon_ws( async def open_autorecon_ws(
url: str, url: str,
# TODO: proper type annot smh # TODO: proper type annot smh
fixture: Callable, fixture: Optional[Callable] = None,
) -> AsyncGenerator[tuple[...], NoBsWs]: ) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em. """Apparently we can QoS for all sorts of reasons..so catch em.

View File

@ -1,7 +1,7 @@
# we require a pinned dev branch to get some edge features that # we require a pinned dev branch to get some edge features that
# are often untested in tractor's CI and/or being tested by us # are often untested in tractor's CI and/or being tested by us
# first before committing as core features in tractor's base. # first before committing as core features in tractor's base.
-e git+https://github.com/goodboy/tractor.git@master#egg=tractor -e git+https://github.com/goodboy/tractor.git@reentrant_moc#egg=tractor
# `pyqtgraph` peeps keep breaking, fixing, improving so might as well # `pyqtgraph` peeps keep breaking, fixing, improving so might as well
# pin this to a dev branch that we have more control over especially # pin this to a dev branch that we have more control over especially
@ -18,3 +18,6 @@
# ``asyncvnc`` for sending interactions to ib-gw inside docker # ``asyncvnc`` for sending interactions to ib-gw inside docker
-e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc
# ``cryptofeed`` for connecting to various crypto exchanges + custom fixes
-e git+https://github.com/pikers/cryptofeed.git@date_parsing#egg=cryptofeed

View File

@ -58,6 +58,7 @@ setup(
# 'trimeter', # not released yet.. # 'trimeter', # not released yet..
# 'tractor', # 'tractor',
# asyncvnc, # asyncvnc,
# 'cryptofeed',
# brokers # brokers
'asks==2.4.8', 'asks==2.4.8',