Group bcast errors as `Sampler.bcast_errors`
A new class var `tuple[Exception]` such that the err set can be reffed externally as needed for catching other similar pub-sub/IPC failures in other (related) real-time sub-systems. Also added some now-masked logging for debugging live-feed stream reading issues that should ONLY be used for debugging since they'll greatly degrade HFT perf. Used the new `log.mk_repr()` stuff (that one day we should prolly pull from `modden` as a dep) for pretty console emissions.fix_deribit_hist_queries
							parent
							
								
									767d25a9b9
								
							
						
					
					
						commit
						3e5e704c2f
					
				|  | @ -30,7 +30,6 @@ import time | |||
| from typing import ( | ||||
|     Any, | ||||
|     AsyncIterator, | ||||
|     Callable, | ||||
|     TYPE_CHECKING, | ||||
| ) | ||||
| 
 | ||||
|  | @ -101,6 +100,12 @@ class Sampler: | |||
|     # history loading. | ||||
|     incr_task_cs: trio.CancelScope | None = None | ||||
| 
 | ||||
|     bcast_errors: tuple[Exception] = ( | ||||
|         trio.BrokenResourceError, | ||||
|         trio.ClosedResourceError, | ||||
|         trio.EndOfChannel, | ||||
|     ) | ||||
| 
 | ||||
|     # holds all the ``tractor.Context`` remote subscriptions for | ||||
|     # a particular sample period increment event: all subscribers are | ||||
|     # notified on a step. | ||||
|  | @ -264,14 +269,15 @@ class Sampler: | |||
|         subs: set | ||||
|         last_ts, subs = pair | ||||
| 
 | ||||
|         task = trio.lowlevel.current_task() | ||||
|         log.debug( | ||||
|             f'SUBS {self.subscribers}\n' | ||||
|             f'PAIR {pair}\n' | ||||
|             f'TASK: {task}: {id(task)}\n' | ||||
|             f'broadcasting {period_s} -> {last_ts}\n' | ||||
|             # f'consumers: {subs}' | ||||
|         ) | ||||
|         # NOTE, for debugging pub-sub issues | ||||
|         # task = trio.lowlevel.current_task() | ||||
|         # log.debug( | ||||
|         #     f'AlL-SUBS@{period_s!r}: {self.subscribers}\n' | ||||
|         #     f'PAIR: {pair}\n' | ||||
|         #     f'TASK: {task}: {id(task)}\n' | ||||
|         #     f'broadcasting {period_s} -> {last_ts}\n' | ||||
|         #     f'consumers: {subs}' | ||||
|         # ) | ||||
|         borked: set[MsgStream] = set() | ||||
|         sent: set[MsgStream] = set() | ||||
|         while True: | ||||
|  | @ -288,12 +294,11 @@ class Sampler: | |||
|                         await stream.send(msg) | ||||
|                         sent.add(stream) | ||||
| 
 | ||||
|                     except ( | ||||
|                         trio.BrokenResourceError, | ||||
|                         trio.ClosedResourceError | ||||
|                     ): | ||||
|                     except self.bcast_errors as err: | ||||
|                         log.error( | ||||
|                             f'{stream._ctx.chan.uid} dropped connection' | ||||
|                             f'Connection dropped for IPC ctx\n' | ||||
|                             f'{stream._ctx}\n\n' | ||||
|                             f'Due to {type(err)}' | ||||
|                         ) | ||||
|                         borked.add(stream) | ||||
|                 else: | ||||
|  | @ -579,7 +584,7 @@ async def open_sample_stream( | |||
| 
 | ||||
| 
 | ||||
| async def sample_and_broadcast( | ||||
|     bus: _FeedsBus,  # noqa | ||||
|     bus: _FeedsBus, | ||||
|     rt_shm: ShmArray, | ||||
|     hist_shm: ShmArray, | ||||
|     quote_stream: trio.abc.ReceiveChannel, | ||||
|  | @ -599,9 +604,13 @@ async def sample_and_broadcast( | |||
| 
 | ||||
|     overruns = Counter() | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr() | ||||
|     # NOTE, only used for debugging live-data-feed issues, though | ||||
|     # this should be resolved more correctly in the future using the | ||||
|     # new typed-msgspec feats of `tractor`! | ||||
|     # | ||||
|     # XXX, a multiline nested `dict` formatter (since rn quote-msgs | ||||
|     # are just that). | ||||
|     # pfmt: Callable[[str], str] = mk_repr() | ||||
| 
 | ||||
|     # iterate stream delivered by broker | ||||
|     async for quotes in quote_stream: | ||||
|  | @ -614,7 +623,13 @@ async def sample_and_broadcast( | |||
|         #     f'{pfmt(quotes)}' | ||||
|         # ) | ||||
| 
 | ||||
|         # TODO: `numba` this! | ||||
|         # TODO, | ||||
|         # -[ ] `numba` or `cython`-nize this loop possibly? | ||||
|         #  |_alternatively could we do it in rust somehow by upacking | ||||
|         #    arrow msgs instead of using `msgspec`? | ||||
|         # -[ ] use `msgspec.Struct` support in new typed-msging from | ||||
|         #     `tractor` to ensure only allowed msgs are transmitted? | ||||
|         # | ||||
|         for broker_symbol, quote in quotes.items(): | ||||
|             # TODO: in theory you can send the IPC msg *before* writing | ||||
|             # to the sharedmem array to decrease latency, however, that | ||||
|  | @ -687,17 +702,20 @@ async def sample_and_broadcast( | |||
|             sub_key: str = broker_symbol.lower() | ||||
|             subs: set[Sub] = bus.get_subs(sub_key) | ||||
| 
 | ||||
|             if not subs: | ||||
|                 all_bs_fqmes: list[str] = list( | ||||
|                     bus._subscribers.keys() | ||||
|                 ) | ||||
|                 log.warning( | ||||
|                     f'No subscribers for {brokername!r} live-quote ??\n' | ||||
|                     f'broker_symbol: {broker_symbol}\n\n' | ||||
|             # TODO, figure out how to make this useful whilst | ||||
|             # incoporating feed "pausing" .. | ||||
|             # | ||||
|             # if not subs: | ||||
|             #     all_bs_fqmes: list[str] = list( | ||||
|             #         bus._subscribers.keys() | ||||
|             #     ) | ||||
|             #     log.warning( | ||||
|             #         f'No subscribers for {brokername!r} live-quote ??\n' | ||||
|             #         f'broker_symbol: {broker_symbol}\n\n' | ||||
| 
 | ||||
|                     f'Maybe the backend-sys symbol does not match one of,\n' | ||||
|                     f'{pfmt(all_bs_fqmes)}\n' | ||||
|                 ) | ||||
|             #         f'Maybe the backend-sys symbol does not match one of,\n' | ||||
|             #         f'{pfmt(all_bs_fqmes)}\n' | ||||
|             #     ) | ||||
| 
 | ||||
|             # NOTE: by default the broker backend doesn't append | ||||
|             # it's own "name" into the fqme schema (but maybe it | ||||
|  | @ -796,7 +814,6 @@ async def sample_and_broadcast( | |||
| 
 | ||||
| 
 | ||||
| async def uniform_rate_send( | ||||
| 
 | ||||
|     rate: float, | ||||
|     quote_stream: trio.abc.ReceiveChannel, | ||||
|     stream: MsgStream, | ||||
|  |  | |||
|  | @ -90,6 +90,8 @@ def colorize_json( | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| # TODO, eventually defer to the version in `modden` once | ||||
| # it becomes a dep! | ||||
| def mk_repr( | ||||
|     **repr_kws, | ||||
| ) -> Callable[[str], str]: | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue