Compare commits
	
		
			2 Commits 
		
	
	
		
			e87af090e8
			...
			134889b199
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 134889b199 | |
|  | 393723375c | 
|  | @ -273,7 +273,7 @@ async def _reconnect_forever( | |||
|                 nobsws._connected.set() | ||||
|                 await trio.sleep_forever() | ||||
|         except HandshakeError: | ||||
|             log.exception('Retrying connection') | ||||
|             log.exception(f'Retrying connection') | ||||
| 
 | ||||
|         # ws & nursery block ends | ||||
| 
 | ||||
|  | @ -359,8 +359,8 @@ async def open_autorecon_ws( | |||
| 
 | ||||
| 
 | ||||
| ''' | ||||
| JSONRPC response-request style machinery for transparent multiplexing | ||||
| of msgs over a `NoBsWs`. | ||||
| JSONRPC response-request style machinery for transparent multiplexing of msgs | ||||
| over a NoBsWs. | ||||
| 
 | ||||
| ''' | ||||
| 
 | ||||
|  | @ -377,82 +377,43 @@ async def open_jsonrpc_session( | |||
|     url: str, | ||||
|     start_id: int = 0, | ||||
|     response_type: type = JSONRPCResult, | ||||
|     msg_recv_timeout: float = float('inf'), | ||||
|     # ^NOTE, since only `deribit` is using this jsonrpc stuff atm | ||||
|     # and options mkts are generally "slow moving".. | ||||
|     # | ||||
|     # FURTHER if we break the underlying ws connection then since we | ||||
|     # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. | ||||
|     # `_reconnect_forever()`, the jsonrpc "transport pipe" get's | ||||
|     # broken and never restored with wtv init sequence is required to | ||||
|     # re-establish a working req-resp session. | ||||
| 
 | ||||
|     request_type: Optional[type] = None, | ||||
|     request_hook: Optional[Callable] = None, | ||||
|     error_hook: Optional[Callable] = None, | ||||
| ) -> Callable[[str, dict], dict]: | ||||
|     ''' | ||||
|     Init a json-RPC-over-websocket connection to the provided `url`. | ||||
| 
 | ||||
|     A `json_rpc: Callable[[str, dict], dict` is delivered to the | ||||
|     caller for sending requests and a bg-`trio.Task` handles | ||||
|     processing of response msgs including error reporting/raising in | ||||
|     the parent/caller task. | ||||
| 
 | ||||
|     ''' | ||||
|     # NOTE, store all request msgs so we can raise errors on the | ||||
|     # caller side! | ||||
|     req_msgs: dict[int, dict] = {} | ||||
| 
 | ||||
|     async with ( | ||||
|         trio.open_nursery() as tn, | ||||
|         open_autorecon_ws( | ||||
|             url=url, | ||||
|             msg_recv_timeout=msg_recv_timeout, | ||||
|         ) as ws | ||||
|         trio.open_nursery() as n, | ||||
|         open_autorecon_ws(url) as ws | ||||
|     ): | ||||
|         rpc_id: Iterable[int] = count(start_id) | ||||
|         rpc_id: Iterable = count(start_id) | ||||
|         rpc_results: dict[int, dict] = {} | ||||
| 
 | ||||
|         async def json_rpc( | ||||
|             method: str, | ||||
|             params: dict, | ||||
|         ) -> dict: | ||||
|         async def json_rpc(method: str, params: dict) -> dict: | ||||
|             ''' | ||||
|             perform a json rpc call and wait for the result, raise exception in | ||||
|             case of error field present on response | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
| 
 | ||||
|             req_id: int = next(rpc_id) | ||||
|             msg = { | ||||
|                 'jsonrpc': '2.0', | ||||
|                 'id': req_id, | ||||
|                 'id': next(rpc_id), | ||||
|                 'method': method, | ||||
|                 'params': params | ||||
|             } | ||||
|             _id = msg['id'] | ||||
| 
 | ||||
|             result = rpc_results[_id] = { | ||||
|             rpc_results[_id] = { | ||||
|                 'result': None, | ||||
|                 'error': None, | ||||
|                 'event': trio.Event(),  # signal caller resp arrived | ||||
|                 'event': trio.Event() | ||||
|             } | ||||
|             req_msgs[_id] = msg | ||||
| 
 | ||||
|             await ws.send_msg(msg) | ||||
| 
 | ||||
|             # wait for reponse before unblocking requester code | ||||
|             await rpc_results[_id]['event'].wait() | ||||
| 
 | ||||
|             if (maybe_result := result['result']): | ||||
|                 ret = maybe_result | ||||
|                 del rpc_results[_id] | ||||
|             ret = rpc_results[_id]['result'] | ||||
| 
 | ||||
|             else: | ||||
|                 err = result['error'] | ||||
|                 raise Exception( | ||||
|                     f'JSONRPC request failed\n' | ||||
|                     f'req: {msg}\n' | ||||
|                     f'resp: {err}\n' | ||||
|                 ) | ||||
|             del rpc_results[_id] | ||||
| 
 | ||||
|             if ret.error is not None: | ||||
|                 raise Exception(json.dumps(ret.error, indent=4)) | ||||
|  | @ -467,7 +428,6 @@ async def open_jsonrpc_session( | |||
|             the server side. | ||||
| 
 | ||||
|             ''' | ||||
|             nonlocal req_msgs | ||||
|             async for msg in ws: | ||||
|                 match msg: | ||||
|                     case { | ||||
|  | @ -491,28 +451,19 @@ async def open_jsonrpc_session( | |||
|                         'params': _, | ||||
|                     }: | ||||
|                         log.debug(f'Recieved\n{msg}') | ||||
|                         if request_hook: | ||||
|                             await request_hook(request_type(**msg)) | ||||
| 
 | ||||
|                     case { | ||||
|                         'error': error | ||||
|                     }: | ||||
|                         # retreive orig request msg, set error | ||||
|                         # response in original "result" msg, | ||||
|                         # THEN FINALLY set the event to signal caller | ||||
|                         # to raise the error in the parent task. | ||||
|                         req_id: int = error['id'] | ||||
|                         req_msg: dict = req_msgs[req_id] | ||||
|                         result: dict = rpc_results[req_id] | ||||
|                         result['error'] = error | ||||
|                         result['event'].set() | ||||
|                         log.error( | ||||
|                             f'JSONRPC request failed\n' | ||||
|                             f'req: {req_msg}\n' | ||||
|                             f'resp: {error}\n' | ||||
|                         ) | ||||
|                         log.warning(f'Recieved\n{error}') | ||||
|                         if error_hook: | ||||
|                             await error_hook(response_type(**msg)) | ||||
| 
 | ||||
|                     case _: | ||||
|                         log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') | ||||
| 
 | ||||
|         tn.start_soon(recv_task) | ||||
|         n.start_soon(recv_task) | ||||
|         yield json_rpc | ||||
|         tn.cancel_scope.cancel() | ||||
|         n.cancel_scope.cancel() | ||||
|  |  | |||
|  | @ -386,8 +386,6 @@ def ldshm( | |||
|             open_annot_ctl() as actl, | ||||
|         ): | ||||
|             shm_df: pl.DataFrame | None = None | ||||
|             tf2aids: dict[float, dict] = {} | ||||
| 
 | ||||
|             for ( | ||||
|                 shmfile, | ||||
|                 shm, | ||||
|  | @ -528,17 +526,16 @@ def ldshm( | |||
|                             new_df, | ||||
|                             step_gaps, | ||||
|                         ) | ||||
| 
 | ||||
|                         # last chance manual overwrites in REPL | ||||
|                         # await tractor.pause() | ||||
|                         await tractor.pause() | ||||
|                         assert aids | ||||
|                         tf2aids[period_s] = aids | ||||
| 
 | ||||
|                 else: | ||||
|                     # allow interaction even when no ts problems. | ||||
|                     assert not diff | ||||
|                     await tractor.pause() | ||||
|                     # assert not diff | ||||
| 
 | ||||
|             await tractor.pause() | ||||
|             log.info('Exiting TSP shm anal-izer!') | ||||
| 
 | ||||
|             if shm_df is None: | ||||
|                 log.error( | ||||
|  |  | |||
|  | @ -161,13 +161,7 @@ class NativeStorageClient: | |||
| 
 | ||||
|     def index_files(self): | ||||
|         for path in self._datadir.iterdir(): | ||||
|             if ( | ||||
|                 path.is_dir() | ||||
|                 or | ||||
|                 '.parquet' not in str(path) | ||||
|                 # or | ||||
|                 # path.name in {'borked', 'expired',} | ||||
|             ): | ||||
|             if path.name in {'borked', 'expired',}: | ||||
|                 continue | ||||
| 
 | ||||
|             key: str = path.name.rstrip('.parquet') | ||||
|  |  | |||
|  | @ -44,10 +44,8 @@ import trio | |||
| from trio_typing import TaskStatus | ||||
| import tractor | ||||
| from pendulum import ( | ||||
|     Interval, | ||||
|     DateTime, | ||||
|     Duration, | ||||
|     duration as mk_duration, | ||||
|     from_timestamp, | ||||
| ) | ||||
| import numpy as np | ||||
|  | @ -216,8 +214,7 @@ async def maybe_fill_null_segments( | |||
|         # pair, immediately stop backfilling? | ||||
|         if ( | ||||
|             start_dt | ||||
|             and | ||||
|             end_dt < start_dt | ||||
|             and end_dt < start_dt | ||||
|         ): | ||||
|             await tractor.pause() | ||||
|             break | ||||
|  | @ -265,7 +262,6 @@ async def maybe_fill_null_segments( | |||
|         except tractor.ContextCancelled: | ||||
|             # log.exception | ||||
|             await tractor.pause() | ||||
|             raise | ||||
| 
 | ||||
|     null_segs_detected.set() | ||||
|     # RECHECK for more null-gaps | ||||
|  | @ -353,7 +349,7 @@ async def maybe_fill_null_segments( | |||
| 
 | ||||
| async def start_backfill( | ||||
|     get_hist, | ||||
|     def_frame_duration: Duration, | ||||
|     frame_types: dict[str, Duration] | None, | ||||
|     mod: ModuleType, | ||||
|     mkt: MktPair, | ||||
|     shm: ShmArray, | ||||
|  | @ -383,23 +379,22 @@ async def start_backfill( | |||
|         update_start_on_prepend: bool = False | ||||
|         if backfill_until_dt is None: | ||||
| 
 | ||||
|             # TODO: per-provider default history-durations? | ||||
|             # -[ ] inside the `open_history_client()` config allow | ||||
|             #    declaring the history duration limits instead of | ||||
|             #    guessing and/or applying the same limits to all? | ||||
|             # | ||||
|             # -[ ] allow declaring (default) per-provider backfill | ||||
|             #     limits inside a [storage] sub-section in conf.toml? | ||||
|             # | ||||
|             # NOTE, when no tsdb "last datum" is provided, we just | ||||
|             # load some near-term history by presuming a "decently | ||||
|             # large" 60s duration limit and a much shorter 1s range. | ||||
|             # TODO: drop this right and just expose the backfill | ||||
|             # limits inside a [storage] section in conf.toml? | ||||
|             # when no tsdb "last datum" is provided, we just load | ||||
|             # some near-term history. | ||||
|             # periods = { | ||||
|             #     1: {'days': 1}, | ||||
|             #     60: {'days': 14}, | ||||
|             # } | ||||
| 
 | ||||
|             # do a decently sized backfill and load it into storage. | ||||
|             periods = { | ||||
|                 1: {'days': 2}, | ||||
|                 60: {'years': 6}, | ||||
|             } | ||||
|             period_duration: int = periods[timeframe] | ||||
|             update_start_on_prepend: bool = True | ||||
|             update_start_on_prepend = True | ||||
| 
 | ||||
|             # NOTE: manually set the "latest" datetime which we intend to | ||||
|             # backfill history "until" so as to adhere to the history | ||||
|  | @ -421,6 +416,7 @@ async def start_backfill( | |||
|                 f'backfill_until_dt: {backfill_until_dt}\n' | ||||
|                 f'last_start_dt: {last_start_dt}\n' | ||||
|             ) | ||||
| 
 | ||||
|             try: | ||||
|                 ( | ||||
|                     array, | ||||
|  | @ -430,114 +426,71 @@ async def start_backfill( | |||
|                     timeframe, | ||||
|                     end_dt=last_start_dt, | ||||
|                 ) | ||||
| 
 | ||||
|             except NoData as _daterr: | ||||
|                 orig_last_start_dt: datetime = last_start_dt | ||||
|                 gap_report: str = ( | ||||
|                     f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' | ||||
|                     f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     f'last_start_dt: {orig_last_start_dt}\n\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                 ) | ||||
|                 # EMPTY FRAME signal with 3 (likely) causes: | ||||
|                 # | ||||
|                 # 1. range contains legit gap in venue history | ||||
|                 # 2. history actually (edge case) **began** at the | ||||
|                 #    value `last_start_dt` | ||||
|                 # 3. some other unknown error (ib blocking the | ||||
|                 #    history-query bc they don't want you seeing how | ||||
|                 #    they cucked all the tinas.. like with options | ||||
|                 #    hist) | ||||
|                 # | ||||
|                 if def_frame_duration: | ||||
|                     # decrement by a duration's (frame) worth of time | ||||
|                     # as maybe indicated by the backend to see if we | ||||
|                     # can get older data before this possible | ||||
|                     # "history gap". | ||||
|                     last_start_dt: datetime = last_start_dt.subtract( | ||||
|                         seconds=def_frame_duration.total_seconds() | ||||
|                 # 3 cases: | ||||
|                 # - frame in the middle of a legit venue gap | ||||
|                 # - history actually began at the `last_start_dt` | ||||
|                 # - some other unknown error (ib blocking the | ||||
|                 #   history bc they don't want you seeing how they | ||||
|                 #   cucked all the tinas..) | ||||
|                 if dur := frame_types.get(timeframe): | ||||
|                     # decrement by a frame's worth of duration and | ||||
|                     # retry a few times. | ||||
|                     last_start_dt.subtract( | ||||
|                         seconds=dur.total_seconds() | ||||
|                     ) | ||||
|                     gap_report += ( | ||||
|                         f'Decrementing `end_dt` and retrying with,\n' | ||||
|                         f'def_frame_duration: {def_frame_duration}\n' | ||||
|                         f'(new) last_start_dt: {last_start_dt}\n' | ||||
|                     log.warning( | ||||
|                         f'{mod.name} -> EMPTY FRAME for end_dt?\n' | ||||
|                         f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                         'bf_until <- last_start_dt:\n' | ||||
|                         f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n' | ||||
|                     ) | ||||
|                     log.warning(gap_report) | ||||
|                     # skip writing to shm/tsdb and try the next | ||||
|                     # duration's worth of prior history. | ||||
|                     continue | ||||
| 
 | ||||
|                 else: | ||||
|                     # await tractor.pause() | ||||
|                     raise DataUnavailable(gap_report) | ||||
| 
 | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable as due: | ||||
|                 message: str = due.args[0] | ||||
|             except DataUnavailable: | ||||
|                 log.warning( | ||||
|                     f'Provider {mod.name!r} halted backfill due to,\n\n' | ||||
| 
 | ||||
|                     f'{message}\n' | ||||
| 
 | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'timeframe: {timeframe}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                     f'NO-MORE-DATA in range?\n' | ||||
|                     f'`{mod.name}` halted history:\n' | ||||
|                     f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     'bf_until <- last_start_dt:\n' | ||||
|                     f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                 ) | ||||
|                 # UGH: what's a better way? | ||||
|                 # TODO: backends are responsible for being correct on | ||||
|                 # this right!? | ||||
|                 # -[ ] in the `ib` case we could maybe offer some way | ||||
|                 #     to halt the request loop until the condition is | ||||
|                 #     resolved or should the backend be entirely in | ||||
|                 #     charge of solving such faults? yes, right? | ||||
| 
 | ||||
|                 # ugh, what's a better way? | ||||
|                 # TODO: fwiw, we probably want a way to signal a throttle | ||||
|                 # condition (eg. with ib) so that we can halt the | ||||
|                 # request loop until the condition is resolved? | ||||
|                 if timeframe > 1: | ||||
|                     await tractor.pause() | ||||
|                 return | ||||
| 
 | ||||
|             time: np.ndarray = array['time'] | ||||
|             assert ( | ||||
|                 time[0] | ||||
|                 array['time'][0] | ||||
|                 == | ||||
|                 next_start_dt.timestamp() | ||||
|             ) | ||||
| 
 | ||||
|             assert time[-1] == next_end_dt.timestamp() | ||||
| 
 | ||||
|             expected_dur: Interval = last_start_dt - next_start_dt | ||||
|             diff = last_start_dt - next_start_dt | ||||
|             frame_time_diff_s = diff.seconds | ||||
| 
 | ||||
|             # frame's worth of sample-period-steps, in seconds | ||||
|             frame_size_s: float = len(array) * timeframe | ||||
|             recv_frame_dur: Duration = ( | ||||
|                 from_timestamp(array[-1]['time']) | ||||
|                 - | ||||
|                 from_timestamp(array[0]['time']) | ||||
|             ) | ||||
|             if ( | ||||
|                 (lt_frame := (recv_frame_dur < expected_dur)) | ||||
|                 or | ||||
|                 (null_frame := (frame_size_s == 0)) | ||||
|                 # ^XXX, should NEVER hit now! | ||||
|             ): | ||||
|             expected_frame_size_s: float = frame_size_s + timeframe | ||||
|             if frame_time_diff_s > expected_frame_size_s: | ||||
| 
 | ||||
|                 # XXX: query result includes a start point prior to our | ||||
|                 # expected "frame size" and thus is likely some kind of | ||||
|                 # history gap (eg. market closed period, outage, etc.) | ||||
|                 # so just report it to console for now. | ||||
|                 if lt_frame: | ||||
|                     reason = 'Possible GAP (or first-datum)' | ||||
|                 else: | ||||
|                     assert null_frame | ||||
|                     reason = 'NULL-FRAME' | ||||
| 
 | ||||
|                 missing_dur: Interval = expected_dur.end - recv_frame_dur.end | ||||
|                 log.warning( | ||||
|                     f'{timeframe}s-series {reason} detected!\n' | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n\n' | ||||
|                     f'recv interval: {recv_frame_dur}\n' | ||||
|                     f'expected interval: {expected_dur}\n\n' | ||||
| 
 | ||||
|                     f'Missing duration of history of {missing_dur.in_words()!r}\n' | ||||
|                     f'{missing_dur}\n' | ||||
|                     'GAP DETECTED:\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'diff: {diff}\n' | ||||
|                     f'frame_time_diff_s: {frame_time_diff_s}\n' | ||||
|                 ) | ||||
|                 # await tractor.pause() | ||||
| 
 | ||||
|             to_push = diff_history( | ||||
|                 array, | ||||
|  | @ -612,27 +565,22 @@ async def start_backfill( | |||
|             # long-term storage. | ||||
|             if ( | ||||
|                 storage is not None | ||||
|                 and | ||||
|                 write_tsdb | ||||
|                 and write_tsdb | ||||
|             ): | ||||
|                 log.info( | ||||
|                     f'Writing {ln} frame to storage:\n' | ||||
|                     f'{next_start_dt} -> {last_start_dt}' | ||||
|                 ) | ||||
| 
 | ||||
|                 # NOTE, always drop the src asset token for | ||||
|                 # always drop the src asset token for | ||||
|                 # non-currency-pair like market types (for now) | ||||
|                 # | ||||
|                 # THAT IS, for now our table key schema is NOT | ||||
|                 # including the dst[/src] source asset token. SO, | ||||
|                 # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for | ||||
|                 # historical reasons ONLY. | ||||
|                 if mkt.dst.atype not in { | ||||
|                     'crypto', | ||||
|                     'crypto_currency', | ||||
|                     'fiat',  # a "forex pair" | ||||
|                     'perpetual_future',  # stupid "perps" from cex land | ||||
|                 }: | ||||
|                     # for now, our table key schema is not including | ||||
|                     # the dst[/src] source asset token. | ||||
|                     col_sym_key: str = mkt.get_fqme( | ||||
|                         delim_char='', | ||||
|                         without_src=True, | ||||
|  | @ -737,7 +685,7 @@ async def back_load_from_tsdb( | |||
|         last_tsdb_dt | ||||
|         and latest_start_dt | ||||
|     ): | ||||
|         backfilled_size_s: Duration = ( | ||||
|         backfilled_size_s = ( | ||||
|             latest_start_dt - last_tsdb_dt | ||||
|         ).seconds | ||||
|         # if the shm buffer len is not large enough to contain | ||||
|  | @ -960,8 +908,6 @@ async def tsdb_backfill( | |||
|             f'{pformat(config)}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # concurrently load the provider's most-recent-frame AND any | ||||
|         # pre-existing tsdb history already saved in `piker` storage. | ||||
|         dt_eps: list[DateTime, DateTime] = [] | ||||
|         async with trio.open_nursery() as tn: | ||||
|             tn.start_soon( | ||||
|  | @ -972,6 +918,7 @@ async def tsdb_backfill( | |||
|                 timeframe, | ||||
|                 config, | ||||
|             ) | ||||
| 
 | ||||
|             tsdb_entry: tuple = await load_tsdb_hist( | ||||
|                 storage, | ||||
|                 mkt, | ||||
|  | @ -1000,25 +947,6 @@ async def tsdb_backfill( | |||
|                 mr_end_dt, | ||||
|             ) = dt_eps | ||||
| 
 | ||||
|             first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds | ||||
|             calced_frame_size: Duration = mk_duration( | ||||
|                 seconds=first_frame_dur_s, | ||||
|             ) | ||||
|             # NOTE, attempt to use the backend declared default frame | ||||
|             # sizing (as allowed by their time-series query APIs) and | ||||
|             # if not provided try to construct a default from the | ||||
|             # first frame received above. | ||||
|             def_frame_durs: dict[ | ||||
|                 int, | ||||
|                 Duration, | ||||
|             ]|None = config.get('frame_types', None) | ||||
|             if def_frame_durs: | ||||
|                 def_frame_size: Duration = def_frame_durs[timeframe] | ||||
|                 assert def_frame_size == calced_frame_size | ||||
|             else: | ||||
|                 # use what we calced from first frame above. | ||||
|                 def_frame_size = calced_frame_size | ||||
| 
 | ||||
|             # NOTE: when there's no offline data, there's 2 cases: | ||||
|             # - data backend doesn't support timeframe/sample | ||||
|             #   period (in which case `dt_eps` should be `None` and | ||||
|  | @ -1049,7 +977,7 @@ async def tsdb_backfill( | |||
|                     partial( | ||||
|                         start_backfill, | ||||
|                         get_hist=get_hist, | ||||
|                         def_frame_duration=def_frame_size, | ||||
|                         frame_types=config.get('frame_types', None), | ||||
|                         mod=mod, | ||||
|                         mkt=mkt, | ||||
|                         shm=shm, | ||||
|  |  | |||
|  | @ -616,18 +616,6 @@ def detect_price_gaps( | |||
|     # ]) | ||||
|     ... | ||||
| 
 | ||||
| # TODO: probably just use the null_segs impl above? | ||||
| def detect_vlm_gaps( | ||||
|     df: pl.DataFrame, | ||||
|     col: str = 'volume', | ||||
| 
 | ||||
| ) -> pl.DataFrame: | ||||
| 
 | ||||
|     vnull: pl.DataFrame = w_dts.filter( | ||||
|         pl.col(col) == 0 | ||||
|     ) | ||||
|     return vnull | ||||
| 
 | ||||
| 
 | ||||
| def dedupe( | ||||
|     src_df: pl.DataFrame, | ||||
|  | @ -638,6 +626,7 @@ def dedupe( | |||
| 
 | ||||
| ) -> tuple[ | ||||
|     pl.DataFrame,  # with dts | ||||
|     pl.DataFrame,  # gaps | ||||
|     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal) | ||||
|     int,  # len diff between input and deduped | ||||
| ]: | ||||
|  | @ -650,22 +639,19 @@ def dedupe( | |||
|     ''' | ||||
|     wdts: pl.DataFrame = with_dts(src_df) | ||||
| 
 | ||||
|     deduped = wdts | ||||
| 
 | ||||
|     # remove duplicated datetime samples/sections | ||||
|     deduped: pl.DataFrame = wdts.unique( | ||||
|         # subset=['dt'], | ||||
|         subset=['time'], | ||||
|         maintain_order=True, | ||||
|     ) | ||||
| 
 | ||||
|     # maybe sort on any time field | ||||
|     if sort: | ||||
|         deduped = deduped.sort(by='time') | ||||
|         wdts = wdts.sort(by='time') | ||||
|         # TODO: detect out-of-order segments which were corrected! | ||||
|         # -[ ] report in log msg | ||||
|         # -[ ] possibly return segment sections which were moved? | ||||
| 
 | ||||
|     # remove duplicated datetime samples/sections | ||||
|     deduped: pl.DataFrame = wdts.unique( | ||||
|         subset=['dt'], | ||||
|         maintain_order=True, | ||||
|     ) | ||||
| 
 | ||||
|     diff: int = ( | ||||
|         wdts.height | ||||
|         - | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue