diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 0c328d9f..94e4cbe1 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -98,13 +98,14 @@ async def open_cached_client( If one has not been setup do it and cache it. ''' - brokermod = get_brokermod(brokername) + brokermod: ModuleType = get_brokermod(brokername) + + # TODO: make abstract or `typing.Protocol` + # client: Client async with maybe_open_context( acm_func=brokermod.get_client, kwargs=kwargs, - ) as (cache_hit, client): - if cache_hit: log.runtime(f'Reusing existing {client}') diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index d54b2203..626b4ff8 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -471,11 +471,15 @@ def search( ''' # global opts - brokermods = list(config['brokermods'].values()) + brokermods: list[ModuleType] = list(config['brokermods'].values()) + + # TODO: this is coming from the `search --pdb` NOT from + # the `piker --pdb` XD .. + # -[ ] pull from the parent click ctx's values..dumdum + # assert pdb # define tractor entrypoint async def main(func): - async with maybe_open_pikerd( loglevel=config['loglevel'], debug_mode=pdb, diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6111d307..c1aa88ac 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -22,7 +22,9 @@ routines should be primitive data types where possible. """ import inspect from types import ModuleType -from typing import List, Dict, Any, Optional +from typing import ( + Any, +) import trio @@ -34,8 +36,10 @@ from ..accounting import MktPair async def api(brokername: str, methname: str, **kwargs) -> dict: - """Make (proxy through) a broker API call by name and return its result. - """ + ''' + Make (proxy through) a broker API call by name and return its result. + + ''' brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: meth = getattr(client, methname, None) @@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: async def stocks_quote( brokermod: ModuleType, - tickers: List[str] -) -> Dict[str, Dict[str, Any]]: - """Return quotes dict for ``tickers``. - """ + tickers: list[str] + +) -> dict[str, dict[str, Any]]: + ''' + Return a `dict` of snapshot quotes for the provided input + `tickers`: a `list` of fqmes. + + ''' async with brokermod.get_client() as client: return await client.quote(tickers) @@ -74,13 +82,15 @@ async def stocks_quote( async def option_chain( brokermod: ModuleType, symbol: str, - date: Optional[str] = None, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option chain for ``symbol`` for ``date``. + date: str|None = None, +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option chain for ``symbol`` for ``date``. By default all expiries are returned. If ``date`` is provided then contract quotes for that single expiry are returned. - """ + + ''' async with brokermod.get_client() as client: if date: id = int((await client.tickers2ids([symbol]))[symbol]) @@ -98,7 +108,7 @@ async def option_chain( # async def contracts( # brokermod: ModuleType, # symbol: str, -# ) -> Dict[str, Dict[str, Dict[str, Any]]]: +# ) -> dict[str, dict[str, dict[str, Any]]]: # """Return option contracts (all expiries) for ``symbol``. # """ # async with brokermod.get_client() as client: @@ -110,15 +120,24 @@ async def bars( brokermod: ModuleType, symbol: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option contracts (all expiries) for ``symbol``. - """ +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option contracts (all expiries) for ``symbol``. + + ''' async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) -async def search_w_brokerd(name: str, pattern: str) -> dict: +async def search_w_brokerd( + name: str, + pattern: str, +) -> dict: + # TODO: WHY NOT WORK!?! + # when we `step` through the next block? + # import tractor + # await tractor.pause() async with open_cached_client(name) as client: # TODO: support multiple asset type concurrent searches. @@ -130,12 +149,12 @@ async def symbol_search( pattern: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: +) -> dict[str, dict[str, dict[str, Any]]]: ''' Return symbol info from broker. ''' - results = [] + results: list[str] = [] async def search_backend( brokermod: ModuleType @@ -143,6 +162,13 @@ async def symbol_search( brokername: str = mod.name + # TODO: figure this the FUCK OUT + # -> ok so obvi in the root actor any async task that's + # spawned outside the main tractor-root-actor task needs to + # call this.. + # await tractor.devx._debug.maybe_init_greenback() + # tractor.pause_from_sync() + async with maybe_spawn_brokerd( mod.name, infect_asyncio=getattr( @@ -162,7 +188,6 @@ async def symbol_search( )) async with trio.open_nursery() as n: - for mod in brokermods: n.start_soon(search_backend, mod.name) @@ -172,11 +197,13 @@ async def symbol_search( async def mkt_info( brokermod: ModuleType, fqme: str, + **kwargs, ) -> MktPair: ''' - Return MktPair info from broker including src and dst assets. + Return the `piker.accounting.MktPair` info struct from a given + backend broker tradable src/dst asset pair. ''' async with open_cached_client(brokermod.name) as client: diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 2c1a9224..062b2c2e 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -587,7 +587,7 @@ async def get_bars( data_cs.cancel() # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -607,11 +607,11 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as nurse: + async with trio.open_nursery() as tn: # start history request that we allow # to run indefinitely until a result is acquired - nurse.start_soon(query) + tn.start_soon(query) # start history reset loop which waits up to the timeout # for a result before triggering a data feed reset. @@ -631,7 +631,7 @@ async def get_bars( unset_resetter: bool = True # spawn new data reset task - data_cs, reset_done = await nurse.start( + data_cs, reset_done = await tn.start( partial( wait_on_data_reset, proxy, @@ -705,7 +705,9 @@ async def _setup_quote_stream( # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore def teardown(): ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") + log.error( + f'Disconnected stream for `{symbol}`' + ) client.ib.cancelMktData(contract) # decouple broadcast mem chan @@ -761,7 +763,10 @@ async def open_aio_quote_stream( symbol: str, contract: Contract | None = None, -) -> trio.abc.ReceiveStream: +) -> ( + trio.abc.Channel| # iface + tractor.to_asyncio.LinkedTaskChannel # actually +): from tractor.trionics import broadcast_receiver global _quote_streams @@ -778,6 +783,7 @@ async def open_aio_quote_stream( yield from_aio return + from_aio: tractor.to_asyncio.LinkedTaskChannel async with tractor.to_asyncio.open_channel_from( _setup_quote_stream, symbol=symbol, @@ -983,17 +989,18 @@ async def stream_quotes( ) cs: trio.CancelScope | None = None startup: bool = True + iter_quotes: trio.abc.Channel while ( startup or cs.cancel_called ): with trio.CancelScope() as cs: async with ( - trio.open_nursery() as nurse, + trio.open_nursery() as tn, open_aio_quote_stream( symbol=sym, contract=con, - ) as stream, + ) as iter_quotes, ): # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) @@ -1021,9 +1028,9 @@ async def stream_quotes( await rt_ev.wait() cs.cancel() # cancel called should now be set - nurse.start_soon(reset_on_feed) + tn.start_soon(reset_on_feed) - async with aclosing(stream): + async with aclosing(iter_quotes): # if syminfo.get('no_vlm', False): if not init_msg.shm_write_opts['has_vlm']: @@ -1038,19 +1045,21 @@ async def stream_quotes( # wait for real volume on feed (trading might be # closed) while True: - ticker = await stream.receive() + ticker = await iter_quotes.receive() # for a real volume contract we rait for # the first "real" trade to take place if ( # not calc_price # and not ticker.rtTime - not ticker.rtTime + False + # not ticker.rtTime ): # spin consuming tickers until we # get a real market datum log.debug(f"New unsent ticker: {ticker}") continue + else: log.debug("Received first volume tick") # ugh, clear ticks since we've @@ -1066,13 +1075,18 @@ async def stream_quotes( log.debug(f"First ticker received {quote}") # tell data-layer spawner-caller that live - # quotes are now streaming. + # quotes are now active desptie not having + # necessarily received a first vlm/clearing + # tick. + ticker = await iter_quotes.receive() feed_is_live.set() + fqme: str = quote['fqme'] + await send_chan.send({fqme: quote}) # last = time.time() - async for ticker in stream: + async for ticker in iter_quotes: quote = normalize(ticker) - fqme = quote['fqme'] + fqme: str = quote['fqme'] await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index eb5963cd..3c571747 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -544,7 +544,7 @@ async def open_trade_dialog( # to be reloaded. balances: dict[str, float] = await client.get_balances() - verify_balances( + await verify_balances( acnt, src_fiat, balances, diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 31133f23..97ef5a3a 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -37,6 +37,12 @@ import tractor from async_generator import asynccontextmanager import numpy as np import wrapt + +# TODO, port to `httpx`/`trio-websocket` whenver i get back to +# writing a proper ws-api streamer for this backend (since the data +# feeds are free now) as per GH feat-req: +# https://github.com/pikers/piker/issues/509 +# import asks from ..calc import humanize, percent_change