From 800773e585d2903bbf22e2073d04f55dd8c7573b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Dec 2022 15:46:27 -0500 Subject: [PATCH 01/14] ib: ignore throttles on `.get_head_time()` --- piker/brokers/ib/feed.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 442cbdeb..bee69ae6 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -135,7 +135,10 @@ async def open_history_client( # fx cons seem to not provide this endpoint? 'idealpro' not in fqsn ): - head_dt = await proxy.get_head_time(fqsn=fqsn) + try: + head_dt = await proxy.get_head_time(fqsn=fqsn) + except RequestError: + head_dt = None async def get_hist( timeframe: float, From 9be245e955b863ac9dfd0a3b08b8bb101fc421b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 14 Dec 2022 21:29:34 -0500 Subject: [PATCH 02/14] `ib`: Add treasury yield futs to adhoc fqsn set --- piker/brokers/ib/api.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 16fbe5e1..dda79d4e 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -162,6 +162,7 @@ _futes_venues = ( 'CMECRYPTO', 'COMEX', 'CMDTY', # special name case.. + 'CBOT', # (treasury) yield futures ) _adhoc_futes_set = { @@ -197,6 +198,21 @@ _adhoc_futes_set = { 'xagusd.cmdty', # silver spot 'ni.comex', # silver futes 'qi.comex', # mini-silver futes + + # treasury yields + # etfs by duration: + # SHY -> IEI -> IEF -> TLT + 'zt.cbot', # 2y + 'z3n.cbot', # 3y + 'zf.cbot', # 5y + 'zn.cbot', # 10y + 'zb.cbot', # 30y + + # (micros of above) + '2yy.cbot', + '5yy.cbot', + '10y.cbot', + '30y.cbot', } From a9832dc0cb0eacf0461a0c7a41429e391fca6812 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 16 Dec 2022 20:59:31 -0500 Subject: [PATCH 03/14] `ib`: fix position log msg --- piker/brokers/ib/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 52b26970..8ceb1602 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -611,7 +611,7 @@ async def trades_dialogue( pp = table.pps[bsuid] if msg.size != pp.size: log.error( - 'Position mismatch {pp.symbol.front_fqsn()}:\n' + f'Position mismatch {pp.symbol.front_fqsn()}:\n' f'ib: {msg.size}\n' f'piker: {pp.size}\n' ) From f3ef73ef41c737fd3fceca3e29e951b8d201b650 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Dec 2022 10:50:52 -0500 Subject: [PATCH 04/14] `kraken`: drop symbol token size =6 check --- piker/brokers/kraken/api.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/piker/brokers/kraken/api.py b/piker/brokers/kraken/api.py index 80feab49..802fb699 100644 --- a/piker/brokers/kraken/api.py +++ b/piker/brokers/kraken/api.py @@ -510,10 +510,6 @@ class Client: ''' ticker = cls._ntable[ticker] - symlen = len(ticker) - if symlen != 6: - raise ValueError(f'Unhandled symbol: {ticker}') - return ticker.lower() From 70ad1a186083e8b17b9d3b0e693b73a8be33f5f3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 21 Dec 2022 11:08:05 -0500 Subject: [PATCH 05/14] `kraken`: don't presume src fiat symbol size in pos predicate --- piker/brokers/kraken/broker.py | 87 +++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index c77f2140..bd992c95 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -413,20 +413,27 @@ async def trades_dialogue( ) -> AsyncIterator[dict[str, Any]]: - # XXX: required to propagate ``tractor`` loglevel to piker logging + # XXX: required to propagate ``tractor`` loglevel to ``piker`` logging get_console_log(loglevel or tractor.current_actor().loglevel) async with get_client() as client: - # TODO: make ems flip to paper mode via - # some returned signal if the user only wants to use - # the data feed or we return this? - # await ctx.started(({}, ['paper'])) - if not client._api_key: raise RuntimeError( 'Missing Kraken API key in `brokers.toml`!?!?') + # TODO: make ems flip to paper mode via + # some returned signal if the user only wants to use + # the data feed or we return this? + # else: + # await ctx.started(({}, ['paper'])) + + # NOTE: currently we expect the user to define a "source fiat" + # (much like the web UI let's you set an "account currency") + # such that all positions (nested or flat) will be translated to + # this source currency's terms. + src_fiat = client.conf['src_fiat'] + # auth required block acctid = client._name acc_name = 'kraken.' + acctid @@ -444,10 +451,9 @@ async def trades_dialogue( # NOTE: testing code for making sure the rt incremental update # of positions, via newly generated msgs works. In order to test # this, - # - delete the *ABSOLUTE LAST* entry from accont's corresponding + # - delete the *ABSOLUTE LAST* entry from account's corresponding # trade ledgers file (NOTE this MUST be the last record - # delivered from the - # api ledger), + # delivered from the api ledger), # - open you ``pps.toml`` and find that same tid and delete it # from the pp's clears table, # - set this flag to `True` @@ -486,27 +492,51 @@ async def trades_dialogue( # and do diff with ledger to determine # what amount of trades-transactions need # to be reloaded. - sizes = await client.get_balances() - for dst, size in sizes.items(): + balances = await client.get_balances() + for dst, size in balances.items(): # we don't care about tracking positions # in the user's source fiat currency. - if dst == client.conf['src_fiat']: + if dst == src_fiat: continue - def has_pp(dst: str) -> Position | bool: - pps_dst_assets = {bsuid[:3]: bsuid for bsuid in table.pps} - pair = pps_dst_assets.get(dst) - pp = table.pps.get(pair) + def has_pp( + dst: str, + size: float, - if ( - not pair or not pp - or not math.isclose(pp.size, size) - ): - return False + ) -> Position | bool: - return pp + src2dst: dict[str, str] = {} + for bsuid in table.pps: + try: + dst_name_start = bsuid.rindex(src_fiat) + except IndexError: + # TODO: handle nested positions..(i.e. + # positions where the src fiat was used to + # buy some other dst which was furhter used + # to buy another dst..) + log.warning( + f'No src fiat {src_fiat} found in {bsuid}?' + ) + continue - pos = has_pp(dst) + _dst = bsuid[:dst_name_start] + if _dst != dst: + continue + + src2dst[src_fiat] = dst + + for src, dst in src2dst.items(): + pair = f'{dst}{src_fiat}' + pp = table.pps.get(pair) + if ( + pp + and math.isclose(pp.size, size) + ): + return pp + + return False + + pos = has_pp(dst, size) if not pos: # we have a balance for which there is no pp @@ -514,12 +544,15 @@ async def trades_dialogue( # ledger. updated = table.update_from_trans(ledger_trans) log.info(f'Updated pps from ledger:\n{pformat(updated)}') - pos = has_pp(dst) + pos = has_pp(dst, size) - if not pos and not simulate_pp_update: + if ( + not pos + and not simulate_pp_update + ): # try reloading from API table.update_from_trans(api_trans) - pos = has_pp(dst) + pos = has_pp(dst, size) if not pos: # get transfers to make sense of abs balances. @@ -557,7 +590,7 @@ async def trades_dialogue( f'{pformat(updated)}' ) - if not has_pp(dst): + if not has_pp(dst, size): raise ValueError( 'Could not reproduce balance:\n' f'dst: {dst}, {size}\n' From a146ad9e690ae41828186fe4449edf1dd8e0c324 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 4 Jan 2023 23:18:09 -0500 Subject: [PATCH 06/14] Never restart `ib-gw` containers on boot --- dockering/ib/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dockering/ib/docker-compose.yml b/dockering/ib/docker-compose.yml index f8be5684..8c676623 100644 --- a/dockering/ib/docker-compose.yml +++ b/dockering/ib/docker-compose.yml @@ -8,7 +8,7 @@ services: # https://github.com/waytrade/ib-gateway-docker#supported-tags # image: waytrade/ib-gateway:981.3j image: waytrade/ib-gateway:1012.2i - restart: always # restart whenev there's a crash or user clicsk + restart: 'no' # restart on boot whenev there's a crash or user clicsk network_mode: 'host' volumes: @@ -64,7 +64,7 @@ services: # ib_gw_live: # image: waytrade/ib-gateway:1012.2i - # restart: always + # restart: no # network_mode: 'host' # volumes: From cf6e44cb9c7fc0f34368e38340ecdf1c55afb124 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 17:28:10 -0500 Subject: [PATCH 07/14] Add `NoBsWs.connected()` predicate --- piker/data/_web_bs.py | 53 ++++++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 8af82d61..1577a678 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -18,16 +18,24 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import ( + asynccontextmanager, + AsyncExitStack, +) from itertools import count from types import ModuleType -from typing import Any, Optional, Callable, AsyncGenerator +from typing import ( + Any, + Optional, + Callable, + AsyncGenerator, + Iterable, +) import json -import sys import trio import trio_websocket -from wsproto.utilities import LocalProtocolError +from wsproto.utilities import LocalProtocolError from trio_websocket._impl import ( ConnectionClosed, DisconnectionTimeout, @@ -44,9 +52,12 @@ log = get_logger(__name__) class NoBsWs: - """Make ``trio_websocket`` sockets stay up no matter the bs. + ''' + Make ``trio_websocket`` sockets stay up no matter the bs. - """ + You can provide a ``fixture`` async-context-manager which will be + enter/exitted around each reconnect operation. + ''' recon_errors = ( ConnectionClosed, DisconnectionTimeout, @@ -68,10 +79,16 @@ class NoBsWs: self._stack = stack self._ws: 'WebSocketConnection' = None # noqa + # TODO: is there some method we can call + # on the underlying `._ws` to get this? + self._connected: bool = False + async def _connect( self, tries: int = 1000, ) -> None: + + self._connected = False while True: try: await self._stack.aclose() @@ -96,6 +113,8 @@ class NoBsWs: assert ret is None log.info(f'Connection success: {self.url}') + + self._connected = True return self._ws except self.recon_errors as err: @@ -105,11 +124,15 @@ class NoBsWs: f'{type(err)}...retry attempt {i}' ) await trio.sleep(0.5) + self._connected = False continue else: log.exception('ws connection fail...') raise last_err + def connected(self) -> bool: + return self._connected + async def send_msg( self, data: Any, @@ -161,6 +184,7 @@ async def open_autorecon_ws( ''' JSONRPC response-request style machinery for transparent multiplexing of msgs over a NoBsWs. + ''' @@ -170,6 +194,7 @@ class JSONRPCResult(Struct): result: Optional[dict] = None error: Optional[dict] = None + @asynccontextmanager async def open_jsonrpc_session( url: str, @@ -220,15 +245,16 @@ async def open_jsonrpc_session( async def recv_task(): ''' - receives every ws message and stores it in its corresponding result - field, then sets the event to wakeup original sender tasks. - also recieves responses to requests originated from the server side. - ''' + receives every ws message and stores it in its corresponding + result field, then sets the event to wakeup original sender + tasks. also recieves responses to requests originated from + the server side. + ''' async for msg in ws: match msg: case { - 'result': result, + 'result': _, 'id': mid, } if res_entry := rpc_results.get(mid): @@ -239,7 +265,9 @@ async def open_jsonrpc_session( 'result': _, 'id': mid, } if not rpc_results.get(mid): - log.warning(f'Wasn\'t expecting ws msg: {json.dumps(msg, indent=4)}') + log.warning( + f'Unexpected ws msg: {json.dumps(msg, indent=4)}' + ) case { 'method': _, @@ -259,7 +287,6 @@ async def open_jsonrpc_session( case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') - n.start_soon(recv_task) yield json_rpc n.cancel_scope.cancel() From d2aee00a566df9d61c623fad44b307f9f6868834 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 17:46:40 -0500 Subject: [PATCH 08/14] `kraken`: only do unsub if connected Trying to send a message in the `NoBsWs.fixture()` exit when the ws is not currently disconnected causes a double `._stack.close()` call which will corrupt `trio`'s coro stack. Instead only do the unsub if we detect the ws is still up. Also drops the legacy `backfill_bars()` module endpoint. Fixes #437 --- piker/brokers/kraken/feed.py | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 7088011d..57fc0126 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -303,24 +303,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} -async def backfill_bars( - - sym: str, - shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - ''' - with trio.CancelScope() as cs: - async with open_cached_client('kraken') as client: - bars = await client.bars(symbol=sym) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -419,14 +401,15 @@ async def stream_quotes( yield # unsub from all pairs on teardown - await ws.send_msg({ - 'pair': list(ws_pairs.values()), - 'event': 'unsubscribe', - 'subscription': ['ohlc', 'spread'], - }) + if ws.connected(): + await ws.send_msg({ + 'pair': list(ws_pairs.values()), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # XXX: do we need to ack the unsub? + # await ws.recv_msg() # see the tips on reconnection logic: # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds From 23835f2c088608929d5f30f41a2a09291c9721ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 17:50:15 -0500 Subject: [PATCH 09/14] `deribit`: drop old `backfill_bars()` ep --- piker/brokers/deribit/feed.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 714ef61b..deb0422f 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -94,21 +94,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 3, 'rate': 3} -async def backfill_bars( - symbol: str, - shm: ShmArray, # type: ignore # noqa - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> None: - """Fill historical bars into shared mem / storage afap. - """ - instrument = symbol - with trio.CancelScope() as cs: - async with open_cached_client('deribit') as client: - bars = await client.bars(instrument) - shm.push(bars) - task_status.started(cs) - - async def stream_quotes( send_chan: trio.abc.SendChannel, From 73379d362737ca41d70630fe522c2aaa9d2ee21f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 Jan 2023 13:13:52 -0500 Subject: [PATCH 10/14] Run CI on all PRs --- .github/workflows/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6af351ec..ca18f2c4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,9 +3,8 @@ name: CI on: # Triggers the workflow on push or pull request events but only for the master branch - push: - branches: [ master ] pull_request: + push: branches: [ master ] # Allows you to run this workflow manually from the Actions tab From 94290c7d8bc59fa4ceaeb50c13c59b8b469ebd83 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 12 Jan 2023 15:59:37 -0500 Subject: [PATCH 11/14] `kraken`: ignore mismatched zero-ed pps (for now) See more details in the GH comment: https://github.com/pikers/piker/issues/373#issuecomment-1380988581 More or less we need to pull and include the transfer fees for withdrawals in our ledger tracking but this serves as a sloppy workaround for the moment. --- piker/brokers/kraken/broker.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index bd992c95..6432ee57 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -534,6 +534,21 @@ async def trades_dialogue( ): return pp + elif ( + size == 0 + and pp.size + ): + log.warning( + f'`kraken` account says you have a ZERO ' + f'balance for {bsuid}:{pair}\n' + f'but piker seems to think `{pp.size}`\n' + 'This is likely a discrepancy in piker ' + 'accounting if the above number is' + "large,' though it's likely to due lack" + "f tracking xfers fees.." + ) + return pp + return False pos = has_pp(dst, size) @@ -590,7 +605,7 @@ async def trades_dialogue( f'{pformat(updated)}' ) - if not has_pp(dst, size): + if has_pp(dst, size): raise ValueError( 'Could not reproduce balance:\n' f'dst: {dst}, {size}\n' From b89fd9652ce815ee3411ee1f6a54f4f9705216c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 5 Jan 2023 12:44:58 -0500 Subject: [PATCH 12/14] `binance`: always request an extra 1min OHLC bar Seems that by default their history indexing rounds down/back to the previous time step, so make sure we add a minute inside `Client.bars()` when the `end_dt=None`, indicating "get the latest bar". Add a breakpoint block that should trigger whenever the latest bar vs. the latest epoch time is mismatched; we'll remove this after some testing verifying the history bars issue is resolved. Further this drops the legacy `backfill_bars()` endpoint which has been deprecated and unused for a while. --- piker/fsp/_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index eb5eaff4..5d389e29 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -439,7 +439,7 @@ async def cascade( profiler.finish() async for i in istream: - # log.runtime(f'FSP incrementing {i}') + # print(f'FSP incrementing {i}') # respawn the compute task if the source # array has been updated such that we compute From afc45a8e16b78eede2293ec4613aaafebfa4441a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Jan 2023 10:35:39 -0500 Subject: [PATCH 13/14] `binance`: same thing, only unsub when connected --- piker/brokers/binance.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 5ea7860a..0c177db8 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -519,14 +519,15 @@ async def stream_quotes( subs.append("{sym}@bookTicker") # unsub from all pairs on teardown - await ws.send_msg({ - "method": "UNSUBSCRIBE", - "params": subs, - "id": uid, - }) + if ws.connected(): + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": uid, + }) - # XXX: do we need to ack the unsub? - # await ws.recv_msg() + # XXX: do we need to ack the unsub? + # await ws.recv_msg() async with open_autorecon_ws( 'wss://stream.binance.com/ws', From 090d1ba524c1baf112f55b3f610655f2d5d0866c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Jan 2023 16:40:22 -0500 Subject: [PATCH 14/14] `kraken`: catch value error not index on missing `src_fiat` in pair --- piker/brokers/kraken/broker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 6432ee57..b33050ff 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -509,7 +509,9 @@ async def trades_dialogue( for bsuid in table.pps: try: dst_name_start = bsuid.rindex(src_fiat) - except IndexError: + except ( + ValueError, # substr not found + ): # TODO: handle nested positions..(i.e. # positions where the src fiat was used to # buy some other dst which was furhter used