`.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout!
							parent
							
								
									5e371f1d73
								
							
						
					
					
						commit
						7de20ebe42
					
				|  | @ -587,7 +587,7 @@ async def get_bars( | ||||||
|                         data_cs.cancel() |                         data_cs.cancel() | ||||||
| 
 | 
 | ||||||
|                     # spawn new data reset task |                     # spawn new data reset task | ||||||
|                     data_cs, reset_done = await nurse.start( |                     data_cs, reset_done = await tn.start( | ||||||
|                         partial( |                         partial( | ||||||
|                             wait_on_data_reset, |                             wait_on_data_reset, | ||||||
|                             proxy, |                             proxy, | ||||||
|  | @ -607,11 +607,11 @@ async def get_bars( | ||||||
|     # such that simultaneous symbol queries don't try data resettingn |     # such that simultaneous symbol queries don't try data resettingn | ||||||
|     # too fast.. |     # too fast.. | ||||||
|     unset_resetter: bool = False |     unset_resetter: bool = False | ||||||
|     async with trio.open_nursery() as nurse: |     async with trio.open_nursery() as tn: | ||||||
| 
 | 
 | ||||||
|         # start history request that we allow |         # start history request that we allow | ||||||
|         # to run indefinitely until a result is acquired |         # to run indefinitely until a result is acquired | ||||||
|         nurse.start_soon(query) |         tn.start_soon(query) | ||||||
| 
 | 
 | ||||||
|         # start history reset loop which waits up to the timeout |         # start history reset loop which waits up to the timeout | ||||||
|         # for a result before triggering a data feed reset. |         # for a result before triggering a data feed reset. | ||||||
|  | @ -631,7 +631,7 @@ async def get_bars( | ||||||
|                 unset_resetter: bool = True |                 unset_resetter: bool = True | ||||||
| 
 | 
 | ||||||
|             # spawn new data reset task |             # spawn new data reset task | ||||||
|             data_cs, reset_done = await nurse.start( |             data_cs, reset_done = await tn.start( | ||||||
|                 partial( |                 partial( | ||||||
|                     wait_on_data_reset, |                     wait_on_data_reset, | ||||||
|                     proxy, |                     proxy, | ||||||
|  | @ -705,7 +705,9 @@ async def _setup_quote_stream( | ||||||
|         # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore |         # to_trio, from_aio = trio.open_memory_channel(2**8)  # type: ignore | ||||||
|         def teardown(): |         def teardown(): | ||||||
|             ticker.updateEvent.disconnect(push) |             ticker.updateEvent.disconnect(push) | ||||||
|             log.error(f"Disconnected stream for `{symbol}`") |             log.error( | ||||||
|  |                 f'Disconnected stream for `{symbol}`' | ||||||
|  |             ) | ||||||
|             client.ib.cancelMktData(contract) |             client.ib.cancelMktData(contract) | ||||||
| 
 | 
 | ||||||
|             # decouple broadcast mem chan |             # decouple broadcast mem chan | ||||||
|  | @ -761,7 +763,10 @@ async def open_aio_quote_stream( | ||||||
|     symbol: str, |     symbol: str, | ||||||
|     contract: Contract | None = None, |     contract: Contract | None = None, | ||||||
| 
 | 
 | ||||||
| ) -> trio.abc.ReceiveStream: | ) -> ( | ||||||
|  |     trio.abc.Channel|  # iface | ||||||
|  |     tractor.to_asyncio.LinkedTaskChannel  # actually | ||||||
|  | ): | ||||||
| 
 | 
 | ||||||
|     from tractor.trionics import broadcast_receiver |     from tractor.trionics import broadcast_receiver | ||||||
|     global _quote_streams |     global _quote_streams | ||||||
|  | @ -778,6 +783,7 @@ async def open_aio_quote_stream( | ||||||
|             yield from_aio |             yield from_aio | ||||||
|             return |             return | ||||||
| 
 | 
 | ||||||
|  |     from_aio: tractor.to_asyncio.LinkedTaskChannel | ||||||
|     async with tractor.to_asyncio.open_channel_from( |     async with tractor.to_asyncio.open_channel_from( | ||||||
|         _setup_quote_stream, |         _setup_quote_stream, | ||||||
|         symbol=symbol, |         symbol=symbol, | ||||||
|  | @ -983,17 +989,18 @@ async def stream_quotes( | ||||||
|         ) |         ) | ||||||
|         cs: trio.CancelScope | None = None |         cs: trio.CancelScope | None = None | ||||||
|         startup: bool = True |         startup: bool = True | ||||||
|  |         iter_quotes: trio.abc.Channel | ||||||
|         while ( |         while ( | ||||||
|             startup |             startup | ||||||
|             or cs.cancel_called |             or cs.cancel_called | ||||||
|         ): |         ): | ||||||
|             with trio.CancelScope() as cs: |             with trio.CancelScope() as cs: | ||||||
|                 async with ( |                 async with ( | ||||||
|                     trio.open_nursery() as nurse, |                     trio.open_nursery() as tn, | ||||||
|                     open_aio_quote_stream( |                     open_aio_quote_stream( | ||||||
|                         symbol=sym, |                         symbol=sym, | ||||||
|                         contract=con, |                         contract=con, | ||||||
|                     ) as stream, |                     ) as iter_quotes, | ||||||
|                 ): |                 ): | ||||||
|                     # ugh, clear ticks since we've consumed them |                     # ugh, clear ticks since we've consumed them | ||||||
|                     # (ahem, ib_insync is stateful trash) |                     # (ahem, ib_insync is stateful trash) | ||||||
|  | @ -1021,9 +1028,9 @@ async def stream_quotes( | ||||||
|                         await rt_ev.wait() |                         await rt_ev.wait() | ||||||
|                         cs.cancel()  # cancel called should now be set |                         cs.cancel()  # cancel called should now be set | ||||||
| 
 | 
 | ||||||
|                     nurse.start_soon(reset_on_feed) |                     tn.start_soon(reset_on_feed) | ||||||
| 
 | 
 | ||||||
|                     async with aclosing(stream): |                     async with aclosing(iter_quotes): | ||||||
|                         # if syminfo.get('no_vlm', False): |                         # if syminfo.get('no_vlm', False): | ||||||
|                         if not init_msg.shm_write_opts['has_vlm']: |                         if not init_msg.shm_write_opts['has_vlm']: | ||||||
| 
 | 
 | ||||||
|  | @ -1038,19 +1045,21 @@ async def stream_quotes( | ||||||
|                             # wait for real volume on feed (trading might be |                             # wait for real volume on feed (trading might be | ||||||
|                             # closed) |                             # closed) | ||||||
|                             while True: |                             while True: | ||||||
|                                 ticker = await stream.receive() |                                 ticker = await iter_quotes.receive() | ||||||
| 
 | 
 | ||||||
|                                 # for a real volume contract we rait for |                                 # for a real volume contract we rait for | ||||||
|                                 # the first "real" trade to take place |                                 # the first "real" trade to take place | ||||||
|                                 if ( |                                 if ( | ||||||
|                                     # not calc_price |                                     # not calc_price | ||||||
|                                     # and not ticker.rtTime |                                     # and not ticker.rtTime | ||||||
|                                     not ticker.rtTime |                                     False | ||||||
|  |                                     # not ticker.rtTime | ||||||
|                                 ): |                                 ): | ||||||
|                                     # spin consuming tickers until we |                                     # spin consuming tickers until we | ||||||
|                                     # get a real market datum |                                     # get a real market datum | ||||||
|                                     log.debug(f"New unsent ticker: {ticker}") |                                     log.debug(f"New unsent ticker: {ticker}") | ||||||
|                                     continue |                                     continue | ||||||
|  | 
 | ||||||
|                                 else: |                                 else: | ||||||
|                                     log.debug("Received first volume tick") |                                     log.debug("Received first volume tick") | ||||||
|                                     # ugh, clear ticks since we've |                                     # ugh, clear ticks since we've | ||||||
|  | @ -1066,13 +1075,18 @@ async def stream_quotes( | ||||||
|                             log.debug(f"First ticker received {quote}") |                             log.debug(f"First ticker received {quote}") | ||||||
| 
 | 
 | ||||||
|                         # tell data-layer spawner-caller that live |                         # tell data-layer spawner-caller that live | ||||||
|                         # quotes are now streaming. |                         # quotes are now active desptie not having | ||||||
|  |                         # necessarily received a first vlm/clearing | ||||||
|  |                         # tick. | ||||||
|  |                         ticker = await iter_quotes.receive() | ||||||
|                         feed_is_live.set() |                         feed_is_live.set() | ||||||
|  |                         fqme: str = quote['fqme'] | ||||||
|  |                         await send_chan.send({fqme: quote}) | ||||||
| 
 | 
 | ||||||
|                         # last = time.time() |                         # last = time.time() | ||||||
|                         async for ticker in stream: |                         async for ticker in iter_quotes: | ||||||
|                             quote = normalize(ticker) |                             quote = normalize(ticker) | ||||||
|                             fqme = quote['fqme'] |                             fqme: str = quote['fqme'] | ||||||
|                             await send_chan.send({fqme: quote}) |                             await send_chan.send({fqme: quote}) | ||||||
| 
 | 
 | ||||||
|                             # ugh, clear ticks since we've consumed them |                             # ugh, clear ticks since we've consumed them | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue