Compare commits
	
		
			6 Commits 
		
	
	
		
			30483c3218
			...
			ef9bc7d1ed
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | ef9bc7d1ed | |
|  | 0aa252507f | |
|  | 9ac1745271 | |
|  | ded2705397 | |
|  | 65fbd435a8 | |
|  | 7a21e87688 | 
|  | @ -98,13 +98,14 @@ async def open_cached_client( | |||
|     If one has not been setup do it and cache it. | ||||
| 
 | ||||
|     ''' | ||||
|     brokermod = get_brokermod(brokername) | ||||
|     brokermod: ModuleType = get_brokermod(brokername) | ||||
| 
 | ||||
|     # TODO: make abstract or `typing.Protocol` | ||||
|     # client: Client | ||||
|     async with maybe_open_context( | ||||
|         acm_func=brokermod.get_client, | ||||
|         kwargs=kwargs, | ||||
| 
 | ||||
|     ) as (cache_hit, client): | ||||
| 
 | ||||
|         if cache_hit: | ||||
|             log.runtime(f'Reusing existing {client}') | ||||
| 
 | ||||
|  |  | |||
|  | @ -471,11 +471,15 @@ def search( | |||
| 
 | ||||
|     ''' | ||||
|     # global opts | ||||
|     brokermods = list(config['brokermods'].values()) | ||||
|     brokermods: list[ModuleType] = list(config['brokermods'].values()) | ||||
| 
 | ||||
|     # TODO: this is coming from the `search --pdb` NOT from | ||||
|     # the `piker --pdb` XD .. | ||||
|     # -[ ] pull from the parent click ctx's values..dumdum | ||||
|     # assert pdb | ||||
| 
 | ||||
|     # define tractor entrypoint | ||||
|     async def main(func): | ||||
| 
 | ||||
|         async with maybe_open_pikerd( | ||||
|             loglevel=config['loglevel'], | ||||
|             debug_mode=pdb, | ||||
|  |  | |||
|  | @ -22,7 +22,9 @@ routines should be primitive data types where possible. | |||
| """ | ||||
| import inspect | ||||
| from types import ModuleType | ||||
| from typing import List, Dict, Any, Optional | ||||
| from typing import ( | ||||
|     Any, | ||||
| ) | ||||
| 
 | ||||
| import trio | ||||
| 
 | ||||
|  | @ -34,8 +36,10 @@ from ..accounting import MktPair | |||
| 
 | ||||
| 
 | ||||
| async def api(brokername: str, methname: str, **kwargs) -> dict: | ||||
|     """Make (proxy through) a broker API call by name and return its result. | ||||
|     """ | ||||
|     ''' | ||||
|     Make (proxy through) a broker API call by name and return its result. | ||||
| 
 | ||||
|     ''' | ||||
|     brokermod = get_brokermod(brokername) | ||||
|     async with brokermod.get_client() as client: | ||||
|         meth = getattr(client, methname, None) | ||||
|  | @ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: | |||
| 
 | ||||
| async def stocks_quote( | ||||
|     brokermod: ModuleType, | ||||
|     tickers: List[str] | ||||
| ) -> Dict[str, Dict[str, Any]]: | ||||
|     """Return quotes dict for ``tickers``. | ||||
|     """ | ||||
|     tickers: list[str] | ||||
| 
 | ||||
| ) -> dict[str, dict[str, Any]]: | ||||
|     ''' | ||||
|     Return a `dict` of snapshot quotes for the provided input | ||||
|     `tickers`: a `list` of fqmes. | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         return await client.quote(tickers) | ||||
| 
 | ||||
|  | @ -74,13 +82,15 @@ async def stocks_quote( | |||
| async def option_chain( | ||||
|     brokermod: ModuleType, | ||||
|     symbol: str, | ||||
|     date: Optional[str] = None, | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
|     """Return option chain for ``symbol`` for ``date``. | ||||
|     date: str|None = None, | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return option chain for ``symbol`` for ``date``. | ||||
| 
 | ||||
|     By default all expiries are returned. If ``date`` is provided | ||||
|     then contract quotes for that single expiry are returned. | ||||
|     """ | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         if date: | ||||
|             id = int((await client.tickers2ids([symbol]))[symbol]) | ||||
|  | @ -98,7 +108,7 @@ async def option_chain( | |||
| # async def contracts( | ||||
| #     brokermod: ModuleType, | ||||
| #     symbol: str, | ||||
| # ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
| # ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
| #     """Return option contracts (all expiries) for ``symbol``. | ||||
| #     """ | ||||
| #     async with brokermod.get_client() as client: | ||||
|  | @ -110,15 +120,24 @@ async def bars( | |||
|     brokermod: ModuleType, | ||||
|     symbol: str, | ||||
|     **kwargs, | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
|     """Return option contracts (all expiries) for ``symbol``. | ||||
|     """ | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return option contracts (all expiries) for ``symbol``. | ||||
| 
 | ||||
|     ''' | ||||
|     async with brokermod.get_client() as client: | ||||
|         return await client.bars(symbol, **kwargs) | ||||
| 
 | ||||
| 
 | ||||
| async def search_w_brokerd(name: str, pattern: str) -> dict: | ||||
| async def search_w_brokerd( | ||||
|     name: str, | ||||
|     pattern: str, | ||||
| ) -> dict: | ||||
| 
 | ||||
|     # TODO: WHY NOT WORK!?! | ||||
|     # when we `step` through the next block? | ||||
|     # import tractor | ||||
|     # await tractor.pause() | ||||
|     async with open_cached_client(name) as client: | ||||
| 
 | ||||
|         # TODO: support multiple asset type concurrent searches. | ||||
|  | @ -130,12 +149,12 @@ async def symbol_search( | |||
|     pattern: str, | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> Dict[str, Dict[str, Dict[str, Any]]]: | ||||
| ) -> dict[str, dict[str, dict[str, Any]]]: | ||||
|     ''' | ||||
|     Return symbol info from broker. | ||||
| 
 | ||||
|     ''' | ||||
|     results = [] | ||||
|     results: list[str] = [] | ||||
| 
 | ||||
|     async def search_backend( | ||||
|         brokermod: ModuleType | ||||
|  | @ -143,6 +162,13 @@ async def symbol_search( | |||
| 
 | ||||
|         brokername: str = mod.name | ||||
| 
 | ||||
|         # TODO: figure this the FUCK OUT | ||||
|         # -> ok so obvi in the root actor any async task that's | ||||
|         # spawned outside the main tractor-root-actor task needs to | ||||
|         # call this.. | ||||
|         # await tractor.devx._debug.maybe_init_greenback() | ||||
|         # tractor.pause_from_sync() | ||||
| 
 | ||||
|         async with maybe_spawn_brokerd( | ||||
|             mod.name, | ||||
|             infect_asyncio=getattr( | ||||
|  | @ -162,7 +188,6 @@ async def symbol_search( | |||
|             )) | ||||
| 
 | ||||
|     async with trio.open_nursery() as n: | ||||
| 
 | ||||
|         for mod in brokermods: | ||||
|             n.start_soon(search_backend, mod.name) | ||||
| 
 | ||||
|  | @ -172,11 +197,13 @@ async def symbol_search( | |||
| async def mkt_info( | ||||
|     brokermod: ModuleType, | ||||
|     fqme: str, | ||||
| 
 | ||||
|     **kwargs, | ||||
| 
 | ||||
| ) -> MktPair: | ||||
|     ''' | ||||
|     Return MktPair info from broker including src and dst assets. | ||||
|     Return the `piker.accounting.MktPair` info struct from a given | ||||
|     backend broker tradable src/dst asset pair. | ||||
| 
 | ||||
|     ''' | ||||
|     async with open_cached_client(brokermod.name) as client: | ||||
|  |  | |||
|  | @ -587,7 +587,7 @@ async def get_bars( | |||
|                         data_cs.cancel() | ||||
| 
 | ||||
|                     # spawn new data reset task | ||||
|                     data_cs, reset_done = await nurse.start( | ||||
|                     data_cs, reset_done = await tn.start( | ||||
|                         partial( | ||||
|                             wait_on_data_reset, | ||||
|                             proxy, | ||||
|  | @ -607,11 +607,11 @@ async def get_bars( | |||
|     # such that simultaneous symbol queries don't try data resettingn | ||||
|     # too fast.. | ||||
|     unset_resetter: bool = False | ||||
|     async with trio.open_nursery() as nurse: | ||||
|     async with trio.open_nursery() as tn: | ||||
| 
 | ||||
|         # start history request that we allow | ||||
|         # to run indefinitely until a result is acquired | ||||
|         nurse.start_soon(query) | ||||
|         tn.start_soon(query) | ||||
| 
 | ||||
|         # start history reset loop which waits up to the timeout | ||||
|         # for a result before triggering a data feed reset. | ||||
|  | @ -631,7 +631,7 @@ async def get_bars( | |||
|                 unset_resetter: bool = True | ||||
| 
 | ||||
|             # spawn new data reset task | ||||
|             data_cs, reset_done = await nurse.start( | ||||
|             data_cs, reset_done = await tn.start( | ||||
|                 partial( | ||||
|                     wait_on_data_reset, | ||||
|                     proxy, | ||||
|  | @ -705,7 +705,9 @@ async def _setup_quote_stream( | |||
|         # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore | ||||
|         def teardown(): | ||||
|             ticker.updateEvent.disconnect(push) | ||||
|             log.error(f"Disconnected stream for `{symbol}`") | ||||
|             log.error( | ||||
|                 f'Disconnected stream for `{symbol}`' | ||||
|             ) | ||||
|             client.ib.cancelMktData(contract) | ||||
| 
 | ||||
|             # decouple broadcast mem chan | ||||
|  | @ -761,7 +763,10 @@ async def open_aio_quote_stream( | |||
|     symbol: str, | ||||
|     contract: Contract | None = None, | ||||
| 
 | ||||
| ) -> trio.abc.ReceiveStream: | ||||
| ) -> ( | ||||
|     trio.abc.Channel|  # iface | ||||
|     tractor.to_asyncio.LinkedTaskChannel  # actually | ||||
| ): | ||||
| 
 | ||||
|     from tractor.trionics import broadcast_receiver | ||||
|     global _quote_streams | ||||
|  | @ -778,6 +783,7 @@ async def open_aio_quote_stream( | |||
|             yield from_aio | ||||
|             return | ||||
| 
 | ||||
|     from_aio: tractor.to_asyncio.LinkedTaskChannel | ||||
|     async with tractor.to_asyncio.open_channel_from( | ||||
|         _setup_quote_stream, | ||||
|         symbol=symbol, | ||||
|  | @ -983,17 +989,18 @@ async def stream_quotes( | |||
|         ) | ||||
|         cs: trio.CancelScope | None = None | ||||
|         startup: bool = True | ||||
|         iter_quotes: trio.abc.Channel | ||||
|         while ( | ||||
|             startup | ||||
|             or cs.cancel_called | ||||
|         ): | ||||
|             with trio.CancelScope() as cs: | ||||
|                 async with ( | ||||
|                     trio.open_nursery() as nurse, | ||||
|                     trio.open_nursery() as tn, | ||||
|                     open_aio_quote_stream( | ||||
|                         symbol=sym, | ||||
|                         contract=con, | ||||
|                     ) as stream, | ||||
|                     ) as iter_quotes, | ||||
|                 ): | ||||
|                     # ugh, clear ticks since we've consumed them | ||||
|                     # (ahem, ib_insync is stateful trash) | ||||
|  | @ -1021,9 +1028,9 @@ async def stream_quotes( | |||
|                         await rt_ev.wait() | ||||
|                         cs.cancel()  # cancel called should now be set | ||||
| 
 | ||||
|                     nurse.start_soon(reset_on_feed) | ||||
|                     tn.start_soon(reset_on_feed) | ||||
| 
 | ||||
|                     async with aclosing(stream): | ||||
|                     async with aclosing(iter_quotes): | ||||
|                         # if syminfo.get('no_vlm', False): | ||||
|                         if not init_msg.shm_write_opts['has_vlm']: | ||||
| 
 | ||||
|  | @ -1038,19 +1045,21 @@ async def stream_quotes( | |||
|                             # wait for real volume on feed (trading might be | ||||
|                             # closed) | ||||
|                             while True: | ||||
|                                 ticker = await stream.receive() | ||||
|                                 ticker = await iter_quotes.receive() | ||||
| 
 | ||||
|                                 # for a real volume contract we rait for | ||||
|                                 # the first "real" trade to take place | ||||
|                                 if ( | ||||
|                                     # not calc_price | ||||
|                                     # and not ticker.rtTime | ||||
|                                     not ticker.rtTime | ||||
|                                     False | ||||
|                                     # not ticker.rtTime | ||||
|                                 ): | ||||
|                                     # spin consuming tickers until we | ||||
|                                     # get a real market datum | ||||
|                                     log.debug(f"New unsent ticker: {ticker}") | ||||
|                                     continue | ||||
| 
 | ||||
|                                 else: | ||||
|                                     log.debug("Received first volume tick") | ||||
|                                     # ugh, clear ticks since we've | ||||
|  | @ -1066,13 +1075,18 @@ async def stream_quotes( | |||
|                             log.debug(f"First ticker received {quote}") | ||||
| 
 | ||||
|                         # tell data-layer spawner-caller that live | ||||
|                         # quotes are now streaming. | ||||
|                         # quotes are now active desptie not having | ||||
|                         # necessarily received a first vlm/clearing | ||||
|                         # tick. | ||||
|                         ticker = await iter_quotes.receive() | ||||
|                         feed_is_live.set() | ||||
|                         fqme: str = quote['fqme'] | ||||
|                         await send_chan.send({fqme: quote}) | ||||
| 
 | ||||
|                         # last = time.time() | ||||
|                         async for ticker in stream: | ||||
|                         async for ticker in iter_quotes: | ||||
|                             quote = normalize(ticker) | ||||
|                             fqme = quote['fqme'] | ||||
|                             fqme: str = quote['fqme'] | ||||
|                             await send_chan.send({fqme: quote}) | ||||
| 
 | ||||
|                             # ugh, clear ticks since we've consumed them | ||||
|  |  | |||
|  | @ -544,7 +544,7 @@ async def open_trade_dialog( | |||
|             # to be reloaded. | ||||
|             balances: dict[str, float] = await client.get_balances() | ||||
| 
 | ||||
|             verify_balances( | ||||
|             await verify_balances( | ||||
|                 acnt, | ||||
|                 src_fiat, | ||||
|                 balances, | ||||
|  |  | |||
|  | @ -37,6 +37,12 @@ import tractor | |||
| from async_generator import asynccontextmanager | ||||
| import numpy as np | ||||
| import wrapt | ||||
| 
 | ||||
| # TODO, port to `httpx`/`trio-websocket` whenver i get back to | ||||
| # writing a proper ws-api streamer for this backend (since the data | ||||
| # feeds are free now) as per GH feat-req: | ||||
| # https://github.com/pikers/piker/issues/509 | ||||
| # | ||||
| import asks | ||||
| 
 | ||||
| from ..calc import humanize, percent_change | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue