diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6af351ec..ca18f2c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,9 +3,8 @@ name: CI on: # Triggers the workflow on push or pull request events but only for the master branch - push: - branches: [ master ] pull_request: + push: branches: [ master ] # Allows you to run this workflow manually from the Actions tab diff --git a/dockering/ib/docker-compose.yml b/dockering/ib/docker-compose.yml index f8be5684..8c676623 100644 --- a/dockering/ib/docker-compose.yml +++ b/dockering/ib/docker-compose.yml @@ -8,7 +8,7 @@ services: # https://github.com/waytrade/ib-gateway-docker#supported-tags # image: waytrade/ib-gateway:981.3j image: waytrade/ib-gateway:1012.2i - restart: always # restart whenev there's a crash or user clicsk + restart: 'no' # restart on boot whenev there's a crash or user clicsk network_mode: 'host' volumes: @@ -64,7 +64,7 @@ services: # ib_gw_live: # image: waytrade/ib-gateway:1012.2i - # restart: always + # restart: no # network_mode: 'host' # volumes: diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 5ea7860a..0c177db8 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -519,14 +519,15 @@ async def stream_quotes( subs.append("{sym}@bookTicker") # unsub from all pairs on teardown - await ws.send_msg({ - "method": "UNSUBSCRIBE", - "params": subs, - "id": uid, - }) + if ws.connected(): + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": uid, + }) - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # XXX: do we need to ack the unsub? + # await ws.recv_msg() async with open_autorecon_ws( 'wss://stream.binance.com/ws', diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 714ef61b..deb0422f 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -94,21 +94,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 3, 'rate': 3} -async def backfill_bars( - symbol: str, - shm: ShmArray, # type: ignore # noqa - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> None: - """Fill historical bars into shared mem / storage afap. - """ - instrument = symbol - with trio.CancelScope() as cs: - async with open_cached_client('deribit') as client: - bars = await client.bars(instrument) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel, diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 16fbe5e1..dda79d4e 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -162,6 +162,7 @@ _futes_venues = ( 'CMECRYPTO', 'COMEX', 'CMDTY', # special name case.. + 'CBOT', # (treasury) yield futures ) _adhoc_futes_set = { @@ -197,6 +198,21 @@ _adhoc_futes_set = { 'xagusd.cmdty', # silver spot 'ni.comex', # silver futes 'qi.comex', # mini-silver futes + + # treasury yields + # etfs by duration: + # SHY -> IEI -> IEF -> TLT + 'zt.cbot', # 2y + 'z3n.cbot', # 3y + 'zf.cbot', # 5y + 'zn.cbot', # 10y + 'zb.cbot', # 30y + + # (micros of above) + '2yy.cbot', + '5yy.cbot', + '10y.cbot', + '30y.cbot', } diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 52b26970..8ceb1602 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -611,7 +611,7 @@ async def trades_dialogue( pp = table.pps[bsuid] if msg.size != pp.size: log.error( - 'Position mismatch {pp.symbol.front_fqsn()}:\n' + f'Position mismatch {pp.symbol.front_fqsn()}:\n' f'ib: {msg.size}\n' f'piker: {pp.size}\n' ) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 442cbdeb..bee69ae6 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -135,7 +135,10 @@ async def open_history_client( # fx cons seem to not provide this endpoint? 'idealpro' not in fqsn ): - head_dt = await proxy.get_head_time(fqsn=fqsn) + try: + head_dt = await proxy.get_head_time(fqsn=fqsn) + except RequestError: + head_dt = None async def get_hist( timeframe: float, diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 80feab49..802fb699 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -510,10 +510,6 @@ class Client: ''' ticker = cls._ntable[ticker] - symlen = len(ticker) - if symlen != 6: - raise ValueError(f'Unhandled symbol: {ticker}') - return ticker.lower() diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c77f2140..b33050ff 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -413,20 +413,27 @@ async def trades_dialogue( ) -> AsyncIterator[dict[str, Any]]: - # XXX: required to propagate ``tractor`` loglevel to piker logging + # XXX: required to propagate ``tractor`` loglevel to ``piker`` logging get_console_log(loglevel or tractor.current_actor().loglevel) async with get_client() as client: - # TODO: make ems flip to paper mode via - # some returned signal if the user only wants to use - # the data feed or we return this? - # await ctx.started(({}, ['paper'])) - if not client._api_key: raise RuntimeError( 'Missing Kraken API key in `brokers.toml`!?!?') + # TODO: make ems flip to paper mode via + # some returned signal if the user only wants to use + # the data feed or we return this? + # else: + # await ctx.started(({}, ['paper'])) + + # NOTE: currently we expect the user to define a "source fiat" + # (much like the web UI let's you set an "account currency") + # such that all positions (nested or flat) will be translated to + # this source currency's terms. + src_fiat = client.conf['src_fiat'] + # auth required block acctid = client._name acc_name = 'kraken.' + acctid @@ -444,10 +451,9 @@ async def trades_dialogue( # NOTE: testing code for making sure the rt incremental update # of positions, via newly generated msgs works. In order to test # this, - # - delete the *ABSOLUTE LAST* entry from accont's corresponding + # - delete the *ABSOLUTE LAST* entry from account's corresponding # trade ledgers file (NOTE this MUST be the last record - # delivered from the - # api ledger), + # delivered from the api ledger), # - open you ``pps.toml`` and find that same tid and delete it # from the pp's clears table, # - set this flag to `True` @@ -486,27 +492,68 @@ async def trades_dialogue( # and do diff with ledger to determine # what amount of trades-transactions need # to be reloaded. - sizes = await client.get_balances() - for dst, size in sizes.items(): + balances = await client.get_balances() + for dst, size in balances.items(): # we don't care about tracking positions # in the user's source fiat currency. - if dst == client.conf['src_fiat']: + if dst == src_fiat: continue - def has_pp(dst: str) -> Position | bool: - pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} - pair = pps_dst_assets.get(dst) - pp = table.pps.get(pair) + def has_pp( + dst: str, + size: float, - if ( - not pair or not pp - or not math.isclose(pp.size, size) - ): - return False + ) -> Position | bool: - return pp + src2dst: dict[str, str] = {} + for bsuid in table.pps: + try: + dst_name_start = bsuid.rindex(src_fiat) + except ( + ValueError, # substr not found + ): + # TODO: handle nested positions..(i.e. + # positions where the src fiat was used to + # buy some other dst which was furhter used + # to buy another dst..) + log.warning( + f'No src fiat {src_fiat} found in {bsuid}?' + ) + continue - pos = has_pp(dst) + _dst = bsuid[:dst_name_start] + if _dst != dst: + continue + + src2dst[src_fiat] = dst + + for src, dst in src2dst.items(): + pair = f'{dst}{src_fiat}' + pp = table.pps.get(pair) + if ( + pp + and math.isclose(pp.size, size) + ): + return pp + + elif ( + size == 0 + and pp.size + ): + log.warning( + f'`kraken` account says you have a ZERO ' + f'balance for {bsuid}:{pair}\n' + f'but piker seems to think `{pp.size}`\n' + 'This is likely a discrepancy in piker ' + 'accounting if the above number is' + "large,' though it's likely to due lack" + "f tracking xfers fees.." + ) + return pp + + return False + + pos = has_pp(dst, size) if not pos: # we have a balance for which there is no pp @@ -514,12 +561,15 @@ async def trades_dialogue( # ledger. updated = table.update_from_trans(ledger_trans) log.info(f'Updated pps from ledger:\n{pformat(updated)}') - pos = has_pp(dst) + pos = has_pp(dst, size) - if not pos and not simulate_pp_update: + if ( + not pos + and not simulate_pp_update + ): # try reloading from API table.update_from_trans(api_trans) - pos = has_pp(dst) + pos = has_pp(dst, size) if not pos: # get transfers to make sense of abs balances. @@ -557,7 +607,7 @@ async def trades_dialogue( f'{pformat(updated)}' ) - if not has_pp(dst): + if has_pp(dst, size): raise ValueError( 'Could not reproduce balance:\n' f'dst: {dst}, {size}\n' diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 7088011d..57fc0126 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -303,24 +303,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -419,14 +401,15 @@ async def stream_quotes( yield # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) + if ws.connected(): + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # XXX: do we need to ack the unsub? + # await ws.recv_msg() # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 8af82d61..1577a678 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -18,16 +18,24 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import ( + asynccontextmanager, + AsyncExitStack, +) from itertools import count from types import ModuleType -from typing import Any, Optional, Callable, AsyncGenerator +from typing import ( + Any, + Optional, + Callable, + AsyncGenerator, + Iterable, +) import json -import sys import trio import trio_websocket -from wsproto.utilities import LocalProtocolError +from wsproto.utilities import LocalProtocolError from trio_websocket._impl import ( ConnectionClosed, DisconnectionTimeout, @@ -44,9 +52,12 @@ log = get_logger(__name__) class NoBsWs: - """Make ``trio_websocket`` sockets stay up no matter the bs. + ''' + Make ``trio_websocket`` sockets stay up no matter the bs. - """ + You can provide a ``fixture`` async-context-manager which will be + enter/exitted around each reconnect operation. + ''' recon_errors = ( ConnectionClosed, DisconnectionTimeout, @@ -68,10 +79,16 @@ class NoBsWs: self._stack = stack self._ws: 'WebSocketConnection' = None # noqa + # TODO: is there some method we can call + # on the underlying `._ws` to get this? + self._connected: bool = False + async def _connect( self, tries: int = 1000, ) -> None: + + self._connected = False while True: try: await self._stack.aclose() @@ -96,6 +113,8 @@ class NoBsWs: assert ret is None log.info(f'Connection success: {self.url}') + + self._connected = True return self._ws except self.recon_errors as err: @@ -105,11 +124,15 @@ class NoBsWs: f'{type(err)}...retry attempt {i}' ) await trio.sleep(0.5) + self._connected = False continue else: log.exception('ws connection fail...') raise last_err + def connected(self) -> bool: + return self._connected + async def send_msg( self, data: Any, @@ -161,6 +184,7 @@ async def open_autorecon_ws( ''' JSONRPC response-request style machinery for transparent multiplexing of msgs over a NoBsWs. + ''' @@ -170,6 +194,7 @@ class JSONRPCResult(Struct): result: Optional[dict] = None error: Optional[dict] = None + @asynccontextmanager async def open_jsonrpc_session( url: str, @@ -220,15 +245,16 @@ async def open_jsonrpc_session( async def recv_task(): ''' - receives every ws message and stores it in its corresponding result - field, then sets the event to wakeup original sender tasks. - also recieves responses to requests originated from the server side. - ''' + receives every ws message and stores it in its corresponding + result field, then sets the event to wakeup original sender + tasks. also recieves responses to requests originated from + the server side. + ''' async for msg in ws: match msg: case { - 'result': result, + 'result': _, 'id': mid, } if res_entry := rpc_results.get(mid): @@ -239,7 +265,9 @@ async def open_jsonrpc_session( 'result': _, 'id': mid, } if not rpc_results.get(mid): - log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') + log.warning( + f'Unexpected ws msg: {json.dumps(msg, indent=4)}' + ) case { 'method': _, @@ -259,7 +287,6 @@ async def open_jsonrpc_session( case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') - n.start_soon(recv_task) yield json_rpc n.cancel_scope.cancel() diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index eb5eaff4..5d389e29 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -439,7 +439,7 @@ async def cascade( profiler.finish() async for i in istream: - # log.runtime(f'FSP incrementing {i}') + # print(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute