Compare commits
No commits in common. "558d2564c512e2b20a27f0f8201ff7cd36f4f53f" and "f6b54f02c0233cd5ab619ae0e4e53071261b8831" have entirely different histories.
558d2564c5
...
f6b54f02c0
|
@ -21,15 +21,11 @@ import os
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from operator import attrgetter
|
from operator import attrgetter
|
||||||
from operator import itemgetter
|
from operator import itemgetter
|
||||||
from typing import (
|
|
||||||
Any,
|
|
||||||
)
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
# from .._daemon import maybe_open_runtime
|
|
||||||
from ..cli import cli
|
from ..cli import cli
|
||||||
from .. import watchlists as wl
|
from .. import watchlists as wl
|
||||||
from ..log import get_console_log, colorize_json, get_logger
|
from ..log import get_console_log, colorize_json, get_logger
|
||||||
|
@ -43,142 +39,6 @@ _config_dir = click.get_app_dir('piker')
|
||||||
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
|
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.argument('broker', nargs=1, required=True)
|
|
||||||
@click.pass_obj
|
|
||||||
def brokercheck(config, broker):
|
|
||||||
'''
|
|
||||||
Test broker apis for completeness.
|
|
||||||
|
|
||||||
'''
|
|
||||||
OK = '\033[92m'
|
|
||||||
WARNING = '\033[93m'
|
|
||||||
FAIL = '\033[91m'
|
|
||||||
ENDC = '\033[0m'
|
|
||||||
|
|
||||||
def print_ok(s: str, **kwargs):
|
|
||||||
print(OK + s + ENDC, **kwargs)
|
|
||||||
|
|
||||||
def print_error(s: str, **kwargs):
|
|
||||||
print(FAIL + s + ENDC, **kwargs)
|
|
||||||
|
|
||||||
async def run_method(
|
|
||||||
client,
|
|
||||||
meth_name: str,
|
|
||||||
**kwargs,
|
|
||||||
|
|
||||||
) -> Any:
|
|
||||||
print(f'checking client for method \'{meth_name}\'...', end='', flush=True)
|
|
||||||
method = getattr(client, meth_name, None)
|
|
||||||
assert method, f'.{meth_name} does not exist for {client}!'
|
|
||||||
print_ok('found!, running...', end='', flush=True)
|
|
||||||
result = await method(**kwargs)
|
|
||||||
print_ok(f'done! result: {type(result)}')
|
|
||||||
return result
|
|
||||||
|
|
||||||
async def run_test(broker_name: str):
|
|
||||||
|
|
||||||
brokermod = get_brokermod(broker_name)
|
|
||||||
total = 0
|
|
||||||
passed = 0
|
|
||||||
failed = 0
|
|
||||||
|
|
||||||
print('getting client...', end='', flush=True)
|
|
||||||
if not hasattr(brokermod, 'get_client'):
|
|
||||||
print_error('fail! no \'get_client\' context manager found.')
|
|
||||||
return
|
|
||||||
|
|
||||||
# extra_tractor_kwargs = getattr(
|
|
||||||
# brokermod,
|
|
||||||
# '_spawn_kwargs',
|
|
||||||
# {},
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: eventually avoid this hack for `ib` XD
|
|
||||||
import inspect
|
|
||||||
get_client = brokermod.get_client
|
|
||||||
if 'is_brokercheck' in inspect.signature(get_client).parameters:
|
|
||||||
kwargs = {'is_brokercheck': True}
|
|
||||||
else:
|
|
||||||
kwargs = {}
|
|
||||||
|
|
||||||
async with (
|
|
||||||
# TODO: in theory we can actually spawn a local `brokerd`
|
|
||||||
# and then try to make some basic feed queries?
|
|
||||||
# maybe_open_runtime(**extra_tractor_kwargs),
|
|
||||||
brokermod.get_client(**kwargs) as client,
|
|
||||||
):
|
|
||||||
print_ok('done! inside client context.')
|
|
||||||
|
|
||||||
# check for methods present on brokermod
|
|
||||||
method_list = [
|
|
||||||
# not required eps i'm pretty sure?
|
|
||||||
# 'backfill_bars',
|
|
||||||
# 'stream_messages',
|
|
||||||
|
|
||||||
'open_history_client',
|
|
||||||
'stream_quotes',
|
|
||||||
'open_symbol_search',
|
|
||||||
'trades_dialogue',
|
|
||||||
]
|
|
||||||
|
|
||||||
for method in method_list:
|
|
||||||
print(
|
|
||||||
f"checking brokermod for method '{method}'...",
|
|
||||||
end='',
|
|
||||||
flush=True,
|
|
||||||
)
|
|
||||||
if not hasattr(brokermod, method):
|
|
||||||
print_error(f"fail! method '{method}' not found.")
|
|
||||||
failed += 1
|
|
||||||
else:
|
|
||||||
print_ok('done!')
|
|
||||||
passed += 1
|
|
||||||
|
|
||||||
total += 1
|
|
||||||
|
|
||||||
# check for methods present con brokermod.Client and attempt
|
|
||||||
# to use them and gather output results.
|
|
||||||
|
|
||||||
symbol_info = getattr(client, 'symbol_info', None)
|
|
||||||
if symbol_info:
|
|
||||||
syms = await run_method(
|
|
||||||
client,
|
|
||||||
'symbol_info',
|
|
||||||
)
|
|
||||||
|
|
||||||
total += 1
|
|
||||||
|
|
||||||
if len(syms) == 0:
|
|
||||||
raise BaseException('Empty Symbol list?')
|
|
||||||
|
|
||||||
passed += 1
|
|
||||||
|
|
||||||
first_sym = tuple(syms.keys())[0]
|
|
||||||
|
|
||||||
method_list = [
|
|
||||||
('cache_symbols', {}),
|
|
||||||
('search_symbols', {'pattern': first_sym[:-1]}),
|
|
||||||
('bars', {'symbol': first_sym})
|
|
||||||
]
|
|
||||||
|
|
||||||
for method_name, method_kwargs in method_list:
|
|
||||||
try:
|
|
||||||
await run_method(client, method_name, **method_kwargs)
|
|
||||||
passed += 1
|
|
||||||
|
|
||||||
except AssertionError:
|
|
||||||
print_error(f'fail! method \'{method_name}\' not found.')
|
|
||||||
failed += 1
|
|
||||||
|
|
||||||
total += 1
|
|
||||||
|
|
||||||
print(f'total: {total}, passed: {passed}, failed: {failed}')
|
|
||||||
|
|
||||||
trio.run(run_test, broker)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
@cli.command()
|
||||||
@click.option('--keys', '-k', multiple=True,
|
@click.option('--keys', '-k', multiple=True,
|
||||||
help='Return results only for these keys')
|
help='Return results only for these keys')
|
||||||
|
@ -333,8 +193,6 @@ def contracts(ctx, loglevel, broker, symbol, ids):
|
||||||
brokermod = get_brokermod(broker)
|
brokermod = get_brokermod(broker)
|
||||||
get_console_log(loglevel)
|
get_console_log(loglevel)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
contracts = trio.run(partial(core.contracts, brokermod, symbol))
|
contracts = trio.run(partial(core.contracts, brokermod, symbol))
|
||||||
if not ids:
|
if not ids:
|
||||||
# just print out expiry dates which can be used with
|
# just print out expiry dates which can be used with
|
||||||
|
|
|
@ -1,483 +0,0 @@
|
||||||
# piker: trading gear for hackers
|
|
||||||
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
|
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU Affero General Public License as published by
|
|
||||||
# the Free Software Foundation, either version 3 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public License
|
|
||||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
"""
|
|
||||||
Deribit backend
|
|
||||||
|
|
||||||
"""
|
|
||||||
import asyncio
|
|
||||||
from contextlib import asynccontextmanager as acm
|
|
||||||
from datetime import datetime
|
|
||||||
from typing import (
|
|
||||||
Any, Union, Optional, List,
|
|
||||||
AsyncGenerator, Callable,
|
|
||||||
)
|
|
||||||
import time
|
|
||||||
|
|
||||||
import trio
|
|
||||||
from trio_typing import TaskStatus
|
|
||||||
import pendulum
|
|
||||||
import asks
|
|
||||||
from fuzzywuzzy import process as fuzzy
|
|
||||||
import numpy as np
|
|
||||||
import tractor
|
|
||||||
from tractor import to_asyncio
|
|
||||||
from pydantic.dataclasses import dataclass
|
|
||||||
from pydantic import BaseModel
|
|
||||||
import wsproto
|
|
||||||
|
|
||||||
from .. import config
|
|
||||||
from .._cacheables import open_cached_client
|
|
||||||
from ._util import resproc, SymbolNotFound
|
|
||||||
from ..log import get_logger, get_console_log
|
|
||||||
from ..data import ShmArray
|
|
||||||
from ..data._web_bs import open_autorecon_ws, NoBsWs
|
|
||||||
|
|
||||||
|
|
||||||
from cryptofeed import FeedHandler
|
|
||||||
|
|
||||||
from cryptofeed.callback import (
|
|
||||||
L1BookCallback,
|
|
||||||
TradeCallback
|
|
||||||
)
|
|
||||||
from cryptofeed.defines import (
|
|
||||||
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
|
|
||||||
)
|
|
||||||
from cryptofeed.symbols import Symbol
|
|
||||||
|
|
||||||
_spawn_kwargs = {
|
|
||||||
'infect_asyncio': True,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def get_config() -> dict[str, Any]:
|
|
||||||
|
|
||||||
conf, path = config.load()
|
|
||||||
|
|
||||||
section = conf.get('deribit')
|
|
||||||
|
|
||||||
if section is None:
|
|
||||||
log.warning(f'No config section found for deribit in {path}')
|
|
||||||
return {}
|
|
||||||
|
|
||||||
conf['log'] = {}
|
|
||||||
conf['log']['filename'] = 'feedhandler.log'
|
|
||||||
conf['log']['level'] = 'WARNING'
|
|
||||||
|
|
||||||
return conf
|
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
_url = 'https://www.deribit.com'
|
|
||||||
|
|
||||||
|
|
||||||
# Broker specific ohlc schema (rest)
|
|
||||||
_ohlc_dtype = [
|
|
||||||
('index', int),
|
|
||||||
('time', int),
|
|
||||||
('open', float),
|
|
||||||
('high', float),
|
|
||||||
('low', float),
|
|
||||||
('close', float),
|
|
||||||
('volume', float),
|
|
||||||
('bar_wap', float), # will be zeroed by sampler if not filled
|
|
||||||
]
|
|
||||||
|
|
||||||
class KLinesResult(BaseModel):
|
|
||||||
close: List[float]
|
|
||||||
cost: List[float]
|
|
||||||
high: List[float]
|
|
||||||
low: List[float]
|
|
||||||
open: List[float]
|
|
||||||
status: str
|
|
||||||
ticks: List[int]
|
|
||||||
volume: List[float]
|
|
||||||
|
|
||||||
class KLines(BaseModel):
|
|
||||||
jsonrpc: str = '2.0'
|
|
||||||
result: KLinesResult
|
|
||||||
usIn: int
|
|
||||||
usOut: int
|
|
||||||
usDiff: int
|
|
||||||
testnet: bool
|
|
||||||
|
|
||||||
|
|
||||||
# convert datetime obj timestamp to unixtime in milliseconds
|
|
||||||
def deribit_timestamp(when):
|
|
||||||
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
|
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self._sesh = asks.Session(connections=4)
|
|
||||||
self._sesh.base_location = _url
|
|
||||||
self._pairs: dict[str, Any] = {}
|
|
||||||
|
|
||||||
async def _api(
|
|
||||||
self,
|
|
||||||
method: str,
|
|
||||||
params: dict,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
resp = await self._sesh.get(
|
|
||||||
path=f'/api/v2/public/{method}',
|
|
||||||
params=params,
|
|
||||||
timeout=float('inf')
|
|
||||||
)
|
|
||||||
return resproc(resp, log)
|
|
||||||
|
|
||||||
async def symbol_info(
|
|
||||||
self,
|
|
||||||
instrument: Optional[str] = None,
|
|
||||||
currency: str = 'btc', # BTC, ETH, SOL, USDC
|
|
||||||
kind: str = 'option',
|
|
||||||
expired: bool = False
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
'''Get symbol info for the exchange.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# TODO: we can load from our self._pairs cache
|
|
||||||
# on repeat calls...
|
|
||||||
|
|
||||||
# will retrieve all symbols by default
|
|
||||||
params = {
|
|
||||||
'currency': currency.upper(),
|
|
||||||
'kind': kind,
|
|
||||||
'expired': str(expired).lower()
|
|
||||||
}
|
|
||||||
|
|
||||||
resp = await self._api(
|
|
||||||
'get_instruments', params=params)
|
|
||||||
|
|
||||||
results = resp['result']
|
|
||||||
|
|
||||||
instruments = {
|
|
||||||
item['instrument_name']: item for item in results}
|
|
||||||
|
|
||||||
if instrument is not None:
|
|
||||||
return instruments[instrument]
|
|
||||||
else:
|
|
||||||
return instruments
|
|
||||||
|
|
||||||
async def cache_symbols(
|
|
||||||
self,
|
|
||||||
) -> dict:
|
|
||||||
if not self._pairs:
|
|
||||||
self._pairs = await self.symbol_info()
|
|
||||||
|
|
||||||
return self._pairs
|
|
||||||
|
|
||||||
async def search_symbols(
|
|
||||||
self,
|
|
||||||
pattern: str,
|
|
||||||
limit: int = None,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
if self._pairs is not None:
|
|
||||||
data = self._pairs
|
|
||||||
else:
|
|
||||||
data = await self.symbol_info()
|
|
||||||
|
|
||||||
matches = fuzzy.extractBests(
|
|
||||||
pattern,
|
|
||||||
data,
|
|
||||||
score_cutoff=50,
|
|
||||||
)
|
|
||||||
# repack in dict form
|
|
||||||
return {item[0]['instrument_name']: item[0]
|
|
||||||
for item in matches}
|
|
||||||
|
|
||||||
async def bars(
|
|
||||||
self,
|
|
||||||
symbol: str,
|
|
||||||
start_dt: Optional[datetime] = None,
|
|
||||||
end_dt: Optional[datetime] = None,
|
|
||||||
limit: int = 1000,
|
|
||||||
as_np: bool = True,
|
|
||||||
) -> dict:
|
|
||||||
instrument = symbol
|
|
||||||
|
|
||||||
if end_dt is None:
|
|
||||||
end_dt = pendulum.now('UTC')
|
|
||||||
|
|
||||||
if start_dt is None:
|
|
||||||
start_dt = end_dt.start_of(
|
|
||||||
'minute').subtract(minutes=limit)
|
|
||||||
|
|
||||||
start_time = deribit_timestamp(start_dt)
|
|
||||||
end_time = deribit_timestamp(end_dt)
|
|
||||||
|
|
||||||
# https://docs.deribit.com/#public-get_tradingview_chart_data
|
|
||||||
response = await self._api(
|
|
||||||
'get_tradingview_chart_data',
|
|
||||||
params={
|
|
||||||
'instrument_name': instrument.upper(),
|
|
||||||
'start_timestamp': start_time,
|
|
||||||
'end_timestamp': end_time,
|
|
||||||
'resolution': '1'
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
klines = KLines(**response)
|
|
||||||
|
|
||||||
result = klines.result
|
|
||||||
new_bars = []
|
|
||||||
for i in range(len(result.close)):
|
|
||||||
|
|
||||||
_open = result.open[i]
|
|
||||||
high = result.high[i]
|
|
||||||
low = result.low[i]
|
|
||||||
close = result.close[i]
|
|
||||||
volume = result.volume[i]
|
|
||||||
|
|
||||||
row = [
|
|
||||||
(start_time + (i * (60 * 1000))) / 1000.0, # time
|
|
||||||
result.open[i],
|
|
||||||
result.high[i],
|
|
||||||
result.low[i],
|
|
||||||
result.close[i],
|
|
||||||
result.volume[i]
|
|
||||||
]
|
|
||||||
|
|
||||||
new_bars.append((i,) + tuple(row))
|
|
||||||
|
|
||||||
array = np.array(
|
|
||||||
[i, ], dtype=_ohlc_dtype) if as_np else klines
|
|
||||||
return array
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def get_client() -> Client:
|
|
||||||
client = Client()
|
|
||||||
await client.cache_symbols()
|
|
||||||
yield client
|
|
||||||
|
|
||||||
|
|
||||||
# inside here we are in an asyncio context
|
|
||||||
async def open_aio_cryptofeed_relay(
|
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
instruments: List[str] = []
|
|
||||||
) -> None:
|
|
||||||
|
|
||||||
conf = get_config()
|
|
||||||
|
|
||||||
def format_sym(name: str) -> str:
|
|
||||||
base, expiry_date, strike_price, option_type = tuple(
|
|
||||||
name.upper().split('-'))
|
|
||||||
|
|
||||||
quote = base
|
|
||||||
|
|
||||||
if option_type == 'P':
|
|
||||||
option_type = PUT
|
|
||||||
elif option_type == 'C':
|
|
||||||
option_type = CALL
|
|
||||||
else:
|
|
||||||
raise BaseException("Instrument name must end in 'c' for calls or 'p' for puts")
|
|
||||||
|
|
||||||
return Symbol(
|
|
||||||
base, quote,
|
|
||||||
type=OPTION,
|
|
||||||
strike_price=strike_price,
|
|
||||||
option_type=option_type,
|
|
||||||
expiry_date=expiry_date.upper()).normalized
|
|
||||||
|
|
||||||
instruments = [format_sym(i) for i in instruments]
|
|
||||||
|
|
||||||
async def trade_cb(data: dict, receipt_timestamp):
|
|
||||||
breakpoint()
|
|
||||||
# to_trio.send_nowait(('trade', {
|
|
||||||
# 'symbol': data.symbol.lower(),
|
|
||||||
# 'last': data.
|
|
||||||
# 'broker_ts': time.time(),
|
|
||||||
# 'data': data.to_dict(),
|
|
||||||
# 'receipt': receipt_timestamp}))
|
|
||||||
|
|
||||||
async def l1_book_cb(data: dict, receipt_timestamp):
|
|
||||||
to_trio.send_nowait(('l1', {
|
|
||||||
'symbol': data.symbol.lower(),
|
|
||||||
'ticks': [
|
|
||||||
{'type': 'bid',
|
|
||||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
|
||||||
{'type': 'bsize',
|
|
||||||
'price': float(data.bid_price), 'size': float(data.bid_size)},
|
|
||||||
{'type': 'ask',
|
|
||||||
'price': float(data.ask_price), 'size': float(data.ask_size)},
|
|
||||||
{'type': 'asize',
|
|
||||||
'price': float(data.ask_price), 'size': float(data.ask_size)}
|
|
||||||
]}))
|
|
||||||
|
|
||||||
fh = FeedHandler(config=conf)
|
|
||||||
fh.run(start_loop=False)
|
|
||||||
|
|
||||||
fh.add_feed(
|
|
||||||
DERIBIT,
|
|
||||||
channels=[L1_BOOK, TRADES],
|
|
||||||
symbols=instruments,
|
|
||||||
callbacks={
|
|
||||||
L1_BOOK: L1BookCallback(l1_book_cb),
|
|
||||||
TRADES: TradeCallback(trade_cb)
|
|
||||||
})
|
|
||||||
|
|
||||||
# sync with trio
|
|
||||||
to_trio.send_nowait(None)
|
|
||||||
|
|
||||||
await from_trio.get()
|
|
||||||
|
|
||||||
|
|
||||||
async def open_cryptofeeds(
|
|
||||||
instruments: List[str],
|
|
||||||
to_chart: trio.abc.SendChannel,
|
|
||||||
|
|
||||||
# startup sync
|
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
|
||||||
):
|
|
||||||
async with to_asyncio.open_channel_from(
|
|
||||||
open_aio_cryptofeed_relay,
|
|
||||||
instruments=instruments,
|
|
||||||
) as (first, chan):
|
|
||||||
assert first is None
|
|
||||||
|
|
||||||
await chan.send(None)
|
|
||||||
|
|
||||||
async with chan.subscribe() as msg_stream:
|
|
||||||
task_status.started()
|
|
||||||
async for msg in msg_stream:
|
|
||||||
await to_chart.send(msg)
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def open_history_client(
|
|
||||||
instrument: str,
|
|
||||||
) -> tuple[Callable, int]:
|
|
||||||
|
|
||||||
# TODO implement history getter for the new storage layer.
|
|
||||||
async with open_cached_client('deribit') as client:
|
|
||||||
|
|
||||||
async def get_ohlc(
|
|
||||||
end_dt: Optional[datetime] = None,
|
|
||||||
start_dt: Optional[datetime] = None,
|
|
||||||
|
|
||||||
) -> tuple[
|
|
||||||
np.ndarray,
|
|
||||||
datetime, # start
|
|
||||||
datetime, # end
|
|
||||||
]:
|
|
||||||
|
|
||||||
array = await client.bars(
|
|
||||||
instrument,
|
|
||||||
start_dt=start_dt,
|
|
||||||
end_dt=end_dt,
|
|
||||||
)
|
|
||||||
start_dt = pendulum.from_timestamp(array[0]['time'])
|
|
||||||
end_dt = pendulum.from_timestamp(array[-1]['time'])
|
|
||||||
return array, start_dt, end_dt
|
|
||||||
|
|
||||||
yield get_ohlc, {'erlangs': 3, 'rate': 3}
|
|
||||||
|
|
||||||
|
|
||||||
async def backfill_bars(
|
|
||||||
symbol: str,
|
|
||||||
shm: ShmArray, # type: ignore # noqa
|
|
||||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
|
||||||
) -> None:
|
|
||||||
"""Fill historical bars into shared mem / storage afap.
|
|
||||||
"""
|
|
||||||
instrument = symbol
|
|
||||||
with trio.CancelScope() as cs:
|
|
||||||
async with open_cached_client('deribit') as client:
|
|
||||||
bars = await client.bars(instrument)
|
|
||||||
shm.push(bars)
|
|
||||||
task_status.started(cs)
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
|
||||||
symbols: list[str],
|
|
||||||
feed_is_live: trio.Event,
|
|
||||||
loglevel: str = None,
|
|
||||||
|
|
||||||
# startup sync
|
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
|
||||||
|
|
||||||
) -> None:
|
|
||||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
|
||||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
|
||||||
|
|
||||||
sym = symbols[0]
|
|
||||||
to_chart, from_feed = trio.open_memory_channel(1)
|
|
||||||
|
|
||||||
async with (
|
|
||||||
open_cached_client('deribit') as client,
|
|
||||||
send_chan as send_chan,
|
|
||||||
trio.open_nursery() as n
|
|
||||||
):
|
|
||||||
await n.start(
|
|
||||||
open_cryptofeeds, symbols, to_chart)
|
|
||||||
|
|
||||||
init_msgs = {
|
|
||||||
# pass back token, and bool, signalling if we're the writer
|
|
||||||
# and that history has been written
|
|
||||||
sym: {
|
|
||||||
'symbol_info': {},
|
|
||||||
'shm_write_opts': {'sum_tick_vml': False},
|
|
||||||
'fqsn': sym,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
# keep client cached for real-time section
|
|
||||||
cache = await client.cache_symbols()
|
|
||||||
|
|
||||||
async with from_feed:
|
|
||||||
typ, quote = await anext(from_feed)
|
|
||||||
|
|
||||||
while typ != 'trade':
|
|
||||||
typ, quote = await anext(from_feed)
|
|
||||||
|
|
||||||
task_status.started((init_msgs, quote))
|
|
||||||
|
|
||||||
async for typ, msg in from_feed:
|
|
||||||
topic = msg['symbol']
|
|
||||||
await send_chan.send({topic: msg})
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
|
||||||
async def open_symbol_search(
|
|
||||||
ctx: tractor.Context,
|
|
||||||
) -> Client:
|
|
||||||
async with open_cached_client('deribit') as client:
|
|
||||||
|
|
||||||
# load all symbols locally for fast search
|
|
||||||
cache = await client.cache_symbols()
|
|
||||||
await ctx.started()
|
|
||||||
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
async for pattern in stream:
|
|
||||||
# results = await client.symbol_info(sym=pattern.upper())
|
|
||||||
|
|
||||||
matches = fuzzy.extractBests(
|
|
||||||
pattern,
|
|
||||||
cache,
|
|
||||||
score_cutoff=50,
|
|
||||||
)
|
|
||||||
# repack in dict form
|
|
||||||
await stream.send(
|
|
||||||
{item[0]['instrument_name']: item[0]
|
|
||||||
for item in matches}
|
|
||||||
)
|
|
|
@ -1353,7 +1353,6 @@ async def open_client_proxy(
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_client(
|
async def get_client(
|
||||||
is_brokercheck: bool = False,
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> Client:
|
) -> Client:
|
||||||
|
@ -1362,10 +1361,6 @@ async def get_client(
|
||||||
a method proxy to it.
|
a method proxy to it.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if is_brokercheck:
|
|
||||||
yield Client
|
|
||||||
return
|
|
||||||
|
|
||||||
# TODO: the IPC via portal relay layer for when this current
|
# TODO: the IPC via portal relay layer for when this current
|
||||||
# actor isn't in aio mode.
|
# actor isn't in aio mode.
|
||||||
async with open_data_client() as proxy:
|
async with open_data_client() as proxy:
|
||||||
|
|
Loading…
Reference in New Issue