Merge pull request #442 from pikers/misc_brokerd_backend_repairs

Misc brokerd backend repairs
goodboy 2023-01-23 18:44:00 -05:00 committed by GitHub
commit 4833d56ecb
12 changed files with 158 additions and 98 deletions

View File

@@ -3,9 +3,8 @@ name: CI

 on:
   # Triggers the workflow on push or pull request events but only for the master branch
-  push:
-    branches: [ master ]
   pull_request:
-    branches: [ master ]
+  push:
+    branches: [ master ]

   # Allows you to run this workflow manually from the Actions tab

View File

@@ -8,7 +8,7 @@ services:
     # https://github.com/waytrade/ib-gateway-docker#supported-tags
     # image: waytrade/ib-gateway:981.3j
     image: waytrade/ib-gateway:1012.2i
-    restart: always  # restart whenev there's a crash or user clicks
+    restart: 'no'  # restart on boot whenev there's a crash or user clicks
     network_mode: 'host'
     volumes:

@@ -64,7 +64,7 @@ services:
   # ib_gw_live:
   #   image: waytrade/ib-gateway:1012.2i
-  #   restart: always
+  #   restart: no
   #   network_mode: 'host'
   #   volumes:

View File

@@ -519,14 +519,15 @@ async def stream_quotes(
         subs.append("{sym}@bookTicker")

         # unsub from all pairs on teardown
-        await ws.send_msg({
-            "method": "UNSUBSCRIBE",
-            "params": subs,
-            "id": uid,
-        })
+        if ws.connected():
+            await ws.send_msg({
+                "method": "UNSUBSCRIBE",
+                "params": subs,
+                "id": uid,
+            })

         # XXX: do we need to ack the unsub?
         # await ws.recv_msg()

     async with open_autorecon_ws(
         'wss://stream.binance.com/ws',
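
The guard is needed because teardown can race a dropped connection: the autorecon wrapper may be between sockets exactly when the subscribe fixture unwinds, and a send on a dead socket raises. A minimal sketch of the pattern (names taken from the diff; the wrapper helper itself is hypothetical):

# sketch: only attempt the protocol-level unsubscribe when the
# underlying socket is still alive; `ws` is the `NoBsWs` wrapper
# which grows a `.connected()` predicate in this PR.
async def maybe_unsub(ws, subs: list[str], uid: int) -> None:
    if ws.connected():
        await ws.send_msg({
            'method': 'UNSUBSCRIBE',
            'params': subs,
            'id': uid,
        })

Skipping the unsub on a dead socket is harmless here since the venue drops subscriptions together with the connection anyway.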

View File

@@ -94,21 +94,6 @@ async def open_history_client(
     yield get_ohlc, {'erlangs': 3, 'rate': 3}


-async def backfill_bars(
-    symbol: str,
-    shm: ShmArray,  # type: ignore # noqa
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-) -> None:
-    """Fill historical bars into shared mem / storage afap.
-    """
-    instrument = symbol
-    with trio.CancelScope() as cs:
-        async with open_cached_client('deribit') as client:
-            bars = await client.bars(instrument)
-            shm.push(bars)
-            task_status.started(cs)
-

 async def stream_quotes(
     send_chan: trio.abc.SendChannel,
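
With `backfill_bars()` deleted, deribit history flows only through the `open_history_client()` endpoint above, which yields a fetcher plus throttle hints instead of pushing into shared memory itself. A rough sketch of the surviving shape; the `get_ohlc` body, the `client.bars()` kwargs and the import path are assumptions, only the yielded tuple comes from the diff:

from contextlib import asynccontextmanager

from piker.brokers import open_cached_client  # import path assumed

@asynccontextmanager
async def open_history_client(instrument: str):
    async with open_cached_client('deribit') as client:

        async def get_ohlc(end_dt=None, start_dt=None):
            # fetch one frame of bars from the venue on demand
            return await client.bars(instrument, end_dt=end_dt, start_dt=start_dt)

        # throttle hints: max 3 concurrent requests at 3 req/s
        yield get_ohlc, {'erlangs': 3, 'rate': 3}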

View File

@@ -162,6 +162,7 @@ _futes_venues = (
     'CMECRYPTO',
     'COMEX',
     'CMDTY',  # special name case..
+    'CBOT',  # (treasury) yield futures
 )

 _adhoc_futes_set = {

@@ -197,6 +198,21 @@ _adhoc_futes_set = {
     'xagusd.cmdty',  # silver spot
     'ni.comex',  # silver futes
     'qi.comex',  # mini-silver futes
+
+    # treasury yields
+    # etfs by duration:
+    # SHY -> IEI -> IEF -> TLT
+    'zt.cbot',  # 2y
+    'z3n.cbot',  # 3y
+    'zf.cbot',  # 5y
+    'zn.cbot',  # 10y
+    'zb.cbot',  # 30y
+
+    # (micros of above)
+    '2yy.cbot',
+    '5yy.cbot',
+    '10y.cbot',
+    '30y.cbot',
 }
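
For reference, here is how one of the new entries would resolve, using a hypothetical lookup helper (the real matching logic inside the `ib` backend is more involved):

def is_known_adhoc_fute(fqsn: str) -> bool:
    # split 'zn.cbot' -> ('zn', 'cbot') and check both tables
    sym, _, venue = fqsn.partition('.')
    return (
        venue.upper() in _futes_venues
        and fqsn.lower() in _adhoc_futes_set
    )

assert is_known_adhoc_fute('zn.cbot')   # 10y treasury yield future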

View File

@@ -611,7 +611,7 @@ async def trades_dialogue(
             pp = table.pps[bsuid]
             if msg.size != pp.size:
                 log.error(
-                    'Position mismatch {pp.symbol.front_fqsn()}:\n'
+                    f'Position mismatch {pp.symbol.front_fqsn()}:\n'
                     f'ib: {msg.size}\n'
                     f'piker: {pp.size}\n'
                 )
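
A one-character bug worth spelling out: without the `f` prefix the braces are logged literally instead of interpolated. Standalone repro (symbol name made up):

sym = 'mnq.globex'
print('Position mismatch {sym}:')    # -> Position mismatch {sym}:
print(f'Position mismatch {sym}:')   # -> Position mismatch mnq.globex: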

View File

@@ -135,7 +135,10 @@ async def open_history_client(
         # fx cons seem to not provide this endpoint?
         'idealpro' not in fqsn
     ):
-        head_dt = await proxy.get_head_time(fqsn=fqsn)
+        try:
+            head_dt = await proxy.get_head_time(fqsn=fqsn)
+        except RequestError:
+            head_dt = None

     async def get_hist(
         timeframe: float,
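
A `None` head-datetime presumably signals "earliest available history unknown" to the backfiller, rather than hard-failing on contracts that don't serve the endpoint. A hypothetical downstream clamp, not part of this diff:

from datetime import datetime

def clamp_start(start_dt: datetime, head_dt: datetime | None) -> datetime:
    # with no known history floor, try the requested start as-is and
    # let the venue return an empty frame if it's too early
    if head_dt is None:
        return start_dt
    return max(start_dt, head_dt)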

View File

@@ -510,10 +510,6 @@ class Client:
         '''
         ticker = cls._ntable[ticker]

-        symlen = len(ticker)
-        if symlen != 6:
-            raise ValueError(f'Unhandled symbol: {ticker}')
-
         return ticker.lower()
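
The dropped check assumed every normalized ticker is exactly 6 characters, which isn't true of kraken pair names. Illustrative examples (tickers chosen for length variety, not taken from the diff):

for ticker in ('XBTUSD', 'DOTUSD', 'MATICUSD'):
    print(ticker.lower(), len(ticker))   # 6, 6 and 8 chars -> no fixed width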

View File

@@ -413,20 +413,27 @@ async def trades_dialogue(

 ) -> AsyncIterator[dict[str, Any]]:

-    # XXX: required to propagate ``tractor`` loglevel to piker logging
+    # XXX: required to propagate ``tractor`` loglevel to ``piker`` logging
     get_console_log(loglevel or tractor.current_actor().loglevel)

     async with get_client() as client:

-        # TODO: make ems flip to paper mode via
-        # some returned signal if the user only wants to use
-        # the data feed or we return this?
-        # await ctx.started(({}, ['paper']))
-
         if not client._api_key:
             raise RuntimeError(
                 'Missing Kraken API key in `brokers.toml`!?!?')

+        # TODO: make ems flip to paper mode via
+        # some returned signal if the user only wants to use
+        # the data feed or we return this?
+        # else:
+        #     await ctx.started(({}, ['paper']))
+
+        # NOTE: currently we expect the user to define a "source fiat"
+        # (much like the web UI lets you set an "account currency")
+        # such that all positions (nested or flat) will be translated to
+        # this source currency's terms.
+        src_fiat = client.conf['src_fiat']
+
         # auth required block
         acctid = client._name
         acc_name = 'kraken.' + acctid
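
The new `src_fiat` value comes straight from the user's `brokers.toml`. A plausible stanza, with section and key names inferred only from `client.conf['src_fiat']`:

import tomllib  # py3.11+; piker's real config loader differs

# assumed file shape:
#   [kraken]
#   api_key = '...'
#   secret = '...'
#   src_fiat = 'EUR'
with open('brokers.toml', 'rb') as f:
    src_fiat = tomllib.load(f)['kraken']['src_fiat']   # e.g. 'EUR'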
@@ -444,10 +451,9 @@ async def trades_dialogue(
         # NOTE: testing code for making sure the rt incremental update
         # of positions, via newly generated msgs works. In order to test
         # this,
-        # - delete the *ABSOLUTE LAST* entry from accont's corresponding
+        # - delete the *ABSOLUTE LAST* entry from account's corresponding
         #   trade ledgers file (NOTE this MUST be the last record
-        #   delivered from the
-        #   api ledger),
+        #   delivered from the api ledger),
         # - open you ``pps.toml`` and find that same tid and delete it
         #   from the pp's clears table,
         # - set this flag to `True`
@@ -486,27 +492,68 @@ async def trades_dialogue(
             # and do diff with ledger to determine
             # what amount of trades-transactions need
             # to be reloaded.
-            sizes = await client.get_balances()
-            for dst, size in sizes.items():
+            balances = await client.get_balances()
+            for dst, size in balances.items():

                 # we don't care about tracking positions
                 # in the user's source fiat currency.
-                if dst == client.conf['src_fiat']:
+                if dst == src_fiat:
                     continue

-                def has_pp(dst: str) -> Position | bool:
-                    pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps}
-                    pair = pps_dst_assets.get(dst)
-                    pp = table.pps.get(pair)
-                    if (
-                        not pair or not pp
-                        or not math.isclose(pp.size, size)
-                    ):
-                        return False
-                    return pp
-
-                pos = has_pp(dst)
+                def has_pp(
+                    dst: str,
+                    size: float,
+
+                ) -> Position | bool:
+
+                    src2dst: dict[str, str] = {}
+                    for bsuid in table.pps:
+                        try:
+                            dst_name_start = bsuid.rindex(src_fiat)
+                        except (
+                            ValueError,   # substr not found
+                        ):
+                            # TODO: handle nested positions..(i.e.
+                            # positions where the src fiat was used to
+                            # buy some other dst which was further used
+                            # to buy another dst..)
+                            log.warning(
+                                f'No src fiat {src_fiat} found in {bsuid}?'
+                            )
+                            continue
+
+                        _dst = bsuid[:dst_name_start]
+                        if _dst != dst:
+                            continue
+
+                        src2dst[src_fiat] = dst
+
+                    for src, dst in src2dst.items():
+                        pair = f'{dst}{src_fiat}'
+                        pp = table.pps.get(pair)
+                        if (
+                            pp
+                            and math.isclose(pp.size, size)
+                        ):
+                            return pp
+
+                        elif (
+                            size == 0
+                            and pp.size
+                        ):
+                            log.warning(
+                                f'`kraken` account says you have a ZERO '
+                                f'balance for {bsuid}:{pair}\n'
+                                f'but piker seems to think `{pp.size}`\n'
+                                'This is likely a discrepancy in piker '
+                                'accounting if the above number is '
+                                "large, though it's likely due to a "
+                                'lack of tracking xfer fees..'
+                            )
+                            return pp
+
+                    return False
+
+                pos = has_pp(dst, size)

                 if not pos:
                     # we have a balance for which there is no pp
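
A worked example of the new dst-name extraction, assuming a source fiat of 'EUR' and a flat pair key (real kraken bsuids may be formatted differently):

src_fiat = 'EUR'
bsuid = 'XBTEUR'
i = bsuid.rindex(src_fiat)     # 3; raises ValueError when no fiat suffix
dst = bsuid[:i]                # 'XBT'
pair = f'{dst}{src_fiat}'      # 'XBTEUR' -> the key looked up in table.pps

`rindex()` (rather than `index()`) anchors on the right-most occurrence, so a fiat code that happens to appear earlier in the symbol can't truncate the dst name.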
@@ -514,12 +561,15 @@ async def trades_dialogue(
                     # ledger.
                     updated = table.update_from_trans(ledger_trans)
                     log.info(f'Updated pps from ledger:\n{pformat(updated)}')
-                    pos = has_pp(dst)
+                    pos = has_pp(dst, size)

-                    if not pos and not simulate_pp_update:
+                    if (
+                        not pos
+                        and not simulate_pp_update
+                    ):
                         # try reloading from API
                         table.update_from_trans(api_trans)
-                        pos = has_pp(dst)
+                        pos = has_pp(dst, size)

                         if not pos:
                             # get transfers to make sense of abs balances.
@@ -557,7 +607,7 @@ async def trades_dialogue(
                                 f'{pformat(updated)}'
                             )

-                            if not has_pp(dst):
+                            if not has_pp(dst, size):
                                 raise ValueError(
                                     'Could not reproduce balance:\n'
                                     f'dst: {dst}, {size}\n'
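
Stepping back, the flow above is a staged reconciliation; condensed (not verbatim) it reads:

pos = has_pp(dst, size)                    # 1. check the in-memory pps table
if not pos:
    table.update_from_trans(ledger_trans)  # 2. replay the local trade ledger
    pos = has_pp(dst, size)
if not pos and not simulate_pp_update:
    table.update_from_trans(api_trans)     # 3. re-pull trade history via the API
    pos = has_pp(dst, size)
if not pos:
    # 4. last resort: fold in transfers, then raise if the venue's
    # balance still can't be reproduced from known transactions
    raise ValueError(f'Could not reproduce balance: {dst}, {size}')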

View File

@@ -303,24 +303,6 @@ async def open_history_client(
     yield get_ohlc, {'erlangs': 1, 'rate': 1}


-async def backfill_bars(
-    sym: str,
-    shm: ShmArray,  # type: ignore # noqa
-    count: int = 10,  # NOTE: any more and we'll overrun the underlying buffer
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
-    '''
-    Fill historical bars into shared mem / storage afap.
-
-    '''
-    with trio.CancelScope() as cs:
-        async with open_cached_client('kraken') as client:
-            bars = await client.bars(symbol=sym)
-            shm.push(bars)
-            task_status.started(cs)
-

 async def stream_quotes(
     send_chan: trio.abc.SendChannel,

@@ -419,14 +401,15 @@ async def stream_quotes(
             yield

             # unsub from all pairs on teardown
-            await ws.send_msg({
-                'pair': list(ws_pairs.values()),
-                'event': 'unsubscribe',
-                'subscription': ['ohlc', 'spread'],
-            })
+            if ws.connected():
+                await ws.send_msg({
+                    'pair': list(ws_pairs.values()),
+                    'event': 'unsubscribe',
+                    'subscription': ['ohlc', 'spread'],
+                })

             # XXX: do we need to ack the unsub?
             # await ws.recv_msg()

     # see the tips on reconnection logic:
     # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds

View File

@@ -18,16 +18,24 @@
 ToOlS fOr CoPInG wITh "tHE wEB" protocols.

 """
-from contextlib import asynccontextmanager, AsyncExitStack
+from contextlib import (
+    asynccontextmanager,
+    AsyncExitStack,
+)
 from itertools import count
 from types import ModuleType
-from typing import Any, Optional, Callable, AsyncGenerator
+from typing import (
+    Any,
+    Optional,
+    Callable,
+    AsyncGenerator,
+    Iterable,
+)
 import json
-import sys

 import trio
 import trio_websocket
+from wsproto.utilities import LocalProtocolError
 from trio_websocket._impl import (
     ConnectionClosed,
     DisconnectionTimeout,

@@ -44,9 +52,12 @@ log = get_logger(__name__)

 class NoBsWs:
-    """Make ``trio_websocket`` sockets stay up no matter the bs.
+    '''
+    Make ``trio_websocket`` sockets stay up no matter the bs.

-    """
+    You can provide a ``fixture`` async-context-manager which will be
+    entered/exited around each reconnect operation.
+    '''
     recon_errors = (
         ConnectionClosed,
         DisconnectionTimeout,
@@ -68,10 +79,16 @@ class NoBsWs:
         self._stack = stack
         self._ws: 'WebSocketConnection' = None  # noqa

+        # TODO: is there some method we can call
+        # on the underlying `._ws` to get this?
+        self._connected: bool = False
+
     async def _connect(
         self,
         tries: int = 1000,

     ) -> None:
+
+        self._connected = False
         while True:
             try:
                 await self._stack.aclose()

@@ -96,6 +113,8 @@ class NoBsWs:
                     assert ret is None

                 log.info(f'Connection success: {self.url}')
+
+                self._connected = True
                 return self._ws

             except self.recon_errors as err:

@@ -105,11 +124,15 @@ class NoBsWs:
                     f'{type(err)}...retry attempt {i}'
                 )
                 await trio.sleep(0.5)
+                self._connected = False
                 continue

             else:
                 log.exception('ws connection fail...')
                 raise last_err

+    def connected(self) -> bool:
+        return self._connected
+
     async def send_msg(
         self,
         data: Any,
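
Note the flag reflects the outcome of the last (re)connect attempt rather than probing the live socket, hence the TODO above. A usage sketch (url, payload and fixture are assumed):

# the fixture is re-entered on every reconnect, so subscriptions lost
# with the old socket are re-established there; teardown only needs
# to unsub when the *current* socket is up.
async with open_autorecon_ws(
    'wss://example.com/ws',
    fixture=subscribe,   # some caller-defined subscribe/teardown acm
) as ws:
    ...
    if ws.connected():
        await ws.send_msg({'event': 'unsubscribe'})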
@@ -161,6 +184,7 @@ async def open_autorecon_ws(

     '''
     JSONRPC response-request style machinery for transparent multiplexing of msgs
     over a NoBsWs.
+
     '''

@@ -170,6 +194,7 @@ class JSONRPCResult(Struct):
     result: Optional[dict] = None
     error: Optional[dict] = None

+
 @asynccontextmanager
 async def open_jsonrpc_session(
     url: str,

@@ -220,15 +245,16 @@ async def open_jsonrpc_session(
     async def recv_task():
         '''
-        receives every ws message and stores it in its corresponding result
-        field, then sets the event to wakeup original sender tasks.
-        also recieves responses to requests originated from the server side.
+        receives every ws message and stores it in its corresponding
+        result field, then sets the event to wake up original sender
+        tasks. also receives responses to requests originating from
+        the server side.

         '''
         async for msg in ws:
             match msg:
                 case {
-                    'result': result,
+                    'result': _,
                     'id': mid,
                 } if res_entry := rpc_results.get(mid):

@@ -239,7 +265,9 @@ async def open_jsonrpc_session(
                     'result': _,
                     'id': mid,
                 } if not rpc_results.get(mid):
-                    log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}')
+                    log.warning(
+                        f'Unexpected ws msg: {json.dumps(msg, indent=4)}'
+                    )

                 case {
                     'method': _,

@@ -259,7 +287,6 @@ async def open_jsonrpc_session(
                 case _:
                     log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

-
     n.start_soon(recv_task)
     yield json_rpc
     n.cancel_scope.cancel()
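
For context, a hypothetical caller of `open_jsonrpc_session` (the url and method are deribit-flavored guesses, and the exact `json_rpc` call signature is assumed):

# the yielded `json_rpc` callable sends a request, parks on a per-id
# `trio.Event`, and is woken when `recv_task` matches the response
# `id` and fills in the corresponding `JSONRPCResult`.
async with open_jsonrpc_session('wss://www.deribit.com/ws/api/v2') as json_rpc:
    res = await json_rpc('public/get_time', params={})
    if res.error:
        log.error(f'rpc error: {res.error}')
    else:
        print(res.result)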

View File

@@ -439,7 +439,7 @@ async def cascade(
         profiler.finish()

     async for i in istream:
-        # log.runtime(f'FSP incrementing {i}')
+        # print(f'FSP incrementing {i}')

         # respawn the compute task if the source
         # array has been updated such that we compute