`flake8` linter cleanup and comment out order ctl draft code

deribit_updates
Tyler Goodlet 2023-03-03 18:32:24 -05:00
parent 1bd421a0f3
commit 4c838474be
2 changed files with 128 additions and 125 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Guillermo Rodriguez (in stewardship for piker0)
# Copyright (C) Guillermo Rodriguez (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -18,49 +18,48 @@
Deribit backend.
'''
import json
from __future__ import annotations
import time
import asyncio
from contextlib import asynccontextmanager as acm, AsyncExitStack
from contextlib import asynccontextmanager as acm
from functools import partial
from datetime import datetime
from typing import Any, Optional, Iterable, Callable
from typing import (
Any,
Optional,
Callable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK,
TRADES,
OPTION,
CALL,
PUT,
)
import pendulum
import asks
import trio
from trio_typing import Nursery, TaskStatus
from trio_typing import TaskStatus
from fuzzywuzzy import process as fuzzy
import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
)
from tractor import to_asyncio
from cryptofeed.symbols import Symbol
from piker.data.types import Struct
from piker.data._web_bs import (
NoBsWs,
open_autorecon_ws,
open_jsonrpc_session
)
from .._util import resproc
from piker import config
from piker.log import get_logger
from tractor.trionics import (
broadcast_receiver,
BroadcastReceiver,
maybe_open_context
)
from tractor import to_asyncio
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
log = get_logger(__name__)
@ -190,7 +189,12 @@ def cb_sym_to_deribit_inst(sym: Symbol):
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
months = [
'JAN', 'FEB', 'MAR',
'APR', 'MAY', 'JUN',
'JUL', 'AUG', 'SEP',
'OCT', 'NOV', 'DEC',
]
exp = sym.expiry_date
@ -210,7 +214,8 @@ def get_config() -> dict[str, Any]:
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
# TODO: document why we send this, basically because logging params
# for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
@ -364,6 +369,7 @@ class Client:
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
@ -389,14 +395,8 @@ class Client:
result = KLinesResult(**resp.result)
new_bars = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
@ -409,7 +409,7 @@ class Client:
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines
array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else new_bars
return array
async def last_trades(
@ -463,7 +463,7 @@ async def get_client(
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
if _refresh_token is not None:
# if we have a refresh token already dont need to send
# secret
params = {
@ -473,7 +473,8 @@ async def get_client(
}
else:
# we don't have refresh token, send secret to initialize
# we don't have refresh token, send secret to
# initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
@ -541,20 +542,30 @@ async def aio_price_feed_relay(
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
to_trio.send_nowait(
('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
'price': float(data.bid_price),
'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
'price': float(data.bid_price),
'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
'price': float(data.ask_price),
'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
'price': float(data.ask_price),
'size': float(data.ask_size)}
]
}))
})
)
fh.add_feed(
DERIBIT,
@ -610,69 +621,71 @@ async def maybe_open_price_feed(
yield feed
# TODO: order broker support: this is all draft code from @guilledk B)
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
# sync with trio
to_trio.send_nowait(None)
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
await asyncio.sleep(float('inf'))
# # sync with trio
# to_trio.send_nowait(None)
# await asyncio.sleep(float('inf'))
@acm
async def open_order_feed(
instrument: list[str]
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
@acm
async def maybe_open_order_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument,
'fh': fh
},
key=f'{instrument}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument,
# 'fh': fh
# },
# key=f'{instrument}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed

View File

@ -20,36 +20,26 @@ Deribit backend.
'''
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
import time
from typing import (
Callable,
)
import trio
from trio_typing import TaskStatus
import pendulum
from fuzzywuzzy import process as fuzzy
import numpy as np
import tractor
from piker._cacheables import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client,
Trade,
get_config,
str_to_cb_sym,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
maybe_open_price_feed
@ -73,8 +63,8 @@ async def open_history_client(
async def get_ohlc(
timeframe: float,
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
@ -139,7 +129,7 @@ async def stream_quotes(
async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols()
await client.cache_symbols()
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -181,7 +171,7 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
await client.cache_symbols()
await ctx.started()
async with ctx.open_stream() as stream: