Compare commits

No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.

c53071e43a ... 96f5a8abb8
@@ -195,8 +195,9 @@ async def open_piker_runtime(

 ) -> Optional[tractor._portal.Portal]:
     '''
-    Start a piker actor who's runtime will automatically sync with
-    existing piker actors on the local link based on configuration.
+    Start a piker actor who's runtime will automatically
+    sync with existing piker actors in local network
+    based on configuration.

     '''
     global _services
@@ -412,7 +412,7 @@ class Client:
             # ``end_dt`` which exceeds the ``duration``,
             # - a timeout occurred in which case insync internals return
             # an empty list thing with bars.clear()...
-            return [], np.empty(0), dt_duration
+            return [], np.empty(0)
             # TODO: we could maybe raise ``NoData`` instead if we
             # rewrite the method in the first case? right now there's no
             # way to detect a timeout.
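Note on the ``NoData`` TODO above: a minimal sketch of raising instead of handing back an empty frame, so callers can react explicitly. ``NoData`` is the error type the backfill hunks further down already catch; its real constructor signature isn't shown here, so the stand-in below is an assumption, and per the comment a timeout still can't be told apart from a genuinely empty frame.

class NoData(Exception):
    '''Stand-in for piker's ``NoData`` error (assumed shape).'''

def check_frame(bars: list) -> list:
    # raise on an empty frame instead of returning ([], np.empty(0))
    if not bars:
        raise NoData('empty history frame')
    return bars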
@@ -483,7 +483,7 @@ class Client:
         self,
         pattern: str,
         # how many contracts to search "up to"
-        upto: int = 16,
+        upto: int = 6,
         asdicts: bool = True,

     ) -> dict[str, ContractDetails]:
@@ -518,16 +518,6 @@ class Client:

                 exch = tract.exchange
                 if exch not in _exch_skip_list:
-
-                    # try to lookup any contracts from our adhoc set
-                    # since often the exchange/venue is named slightly
-                    # different (eg. BRR.CMECRYPTO` instead of just
-                    # `.CME`).
-                    info = _adhoc_symbol_map.get(sym)
-                    if info:
-                        con_kwargs, bars_kwargs = info
-                        exch = con_kwargs['exchange']
-
                     # try get all possible contracts for symbol as per,
                     # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
                     con = ibis.Future(
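For context on the lookup removed above: a sketch of the ``_adhoc_symbol_map`` shape it implies, i.e. symbol -> (contract kwargs, bars kwargs) tuples used to override oddly-named venues. The concrete entries are illustrative assumptions (the comment itself cites ``BRR.CMECRYPTO``).

# illustrative shape only; the real table lives in the ib backend
_adhoc_symbol_map: dict[str, tuple[dict, dict]] = {
    'brr': (
        {'symbol': 'brr', 'exchange': 'CMECRYPTO'},  # contract kwargs
        {},                                          # bars_kwargs
    ),
}

sym = 'brr'
info = _adhoc_symbol_map.get(sym)
if info:
    con_kwargs, bars_kwargs = info
    exch = con_kwargs['exchange']  # overrides the ib-reported venue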
@@ -1096,7 +1086,6 @@ async def load_aio_clients(
     # retry a few times to get the client going..
     connect_retries: int = 3,
     connect_timeout: float = 0.5,
-    disconnect_on_exit: bool = True,

 ) -> dict[str, Client]:
     '''
@@ -1238,11 +1227,10 @@ async def load_aio_clients(
     finally:
         # TODO: for re-scans we'll want to not teardown clients which
         # are up and stable right?
-        if disconnect_on_exit:
-            for acct, client in _accounts2clients.items():
-                log.info(f'Disconnecting {acct}@{client}')
-                client.ib.disconnect()
-                _client_cache.pop((host, port), None)
+        for acct, client in _accounts2clients.items():
+            log.info(f'Disconnecting {acct}@{client}')
+            client.ib.disconnect()
+            _client_cache.pop((host, port), None)


 async def load_clients_for_trio(
@@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
         entry['listingExchange'] = pexch

     conf = get_config()
-    entries = api_trades_to_ledger_entries(
+    entries = trades_to_ledger_entries(
         conf['accounts'].inverse,
         trade_entries,
     )
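A quick sketch of the ``bidict`` idiom used here: ``conf['accounts']`` maps user-chosen account names to broker account ids, and ``.inverse`` flips the mapping so ledger entries keyed by id can be grouped under the friendly name. The names and ids below are made up.

from bidict import bidict

accounts = bidict({'margin': 'U1234567', 'ira': 'U7654321'})

assert accounts['margin'] == 'U1234567'
assert accounts.inverse['U1234567'] == 'margin'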
@@ -371,8 +371,8 @@ async def update_and_audit_msgs(
                     else:
                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

-                    raise ValueError(
-                    # log.error(
+                    # raise ValueError(
+                    log.error(
                         f'POSITION MISMATCH ib <-> piker ledger:\n'
                         f'ib: {ibppmsg}\n'
                         f'piker: {msg}\n'
@@ -1123,16 +1123,18 @@ def norm_trade_records(
             continue

         # timestamping is way different in API records
-        dtstr = record.get('datetime')
         date = record.get('date')
-        flex_dtstr = record.get('dateTime')
-
-        if dtstr or date:
-            dt = pendulum.parse(dtstr or date)
-
-        elif flex_dtstr:
+        if not date:
             # probably a flex record with a wonky non-std timestamp..
-            dt = parse_flex_dt(record['dateTime'])
+            date, ts = record['dateTime'].split(';')
+            dt = pendulum.parse(date)
+            ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
+            tsdt = pendulum.parse(ts)
+            dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
+
+        else:
+            # epoch_dt = pendulum.from_timestamp(record.get('time'))
+            dt = pendulum.parse(date)

         # special handling of symbol extraction from
         # flex records using some ad-hoc schema parsing.
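One caveat worth flagging in the added branch above: pendulum datetimes are immutable, so ``dt.set(...)`` returns a new instance and the bare call on its own line discards the parsed time-of-day. The old side's ``parse_flex_dt()`` helper (removed in the next hunk) gets this right by returning the result; a runnable sketch, assuming the IB flex ``YYYYMMDD;HHMMSS`` layout:

import pendulum

def parse_flex_dt(record: str) -> pendulum.DateTime:
    # eg. record = '20220614;131415'
    date, ts = record.split(';')
    dt = pendulum.parse(date)              # midnight of the trade date
    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'    # 'HHMMSS' -> 'HH:MM:SS'
    tsdt = pendulum.parse(ts)
    # .set() returns a *new* datetime; it must be returned/assigned
    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)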
@@ -1181,58 +1183,69 @@ def norm_trade_records(
     return {r.tid: r for r in records}


-def parse_flex_dt(
-    record: str,
-) -> pendulum.datetime:
-    date, ts = record.split(';')
-    dt = pendulum.parse(date)
-    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
-    tsdt = pendulum.parse(ts)
-    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
-
-
-def api_trades_to_ledger_entries(
+def trades_to_ledger_entries(
     accounts: bidict,
     trade_entries: list[object],
+    source_type: str = 'api',

 ) -> dict:
     '''
-    Convert API execution objects entry objects into ``dict`` form,
-    pretty much straight up without modification except add
-    a `pydatetime` field from the parsed timestamp.
+    Convert either of API execution objects or flex report
+    entry objects into ``dict`` form, pretty much straight up
+    without modification.

     '''
     trades_by_account = {}
+
     for t in trade_entries:
-        # NOTE: example of schema we pull from the API client.
-        # {
-        #     'commissionReport': CommissionReport(...
-        #     'contract': {...
-        #     'execution': Execution(...
-        #     'time': 1654801166.0
-        # }
+        if source_type == 'flex':
+            entry = t.__dict__

-        # flatten all sub-dicts and values into one top level entry.
-        entry = {}
-        for section, val in t.items():
-            match section:
-                case 'contract' | 'execution' | 'commissionReport':
-                    # sub-dict cases
-                    entry.update(val)
+            # XXX: LOL apparently ``toml`` has a bug
+            # where a section key error will show up in the write
+            # if you leave a table key as an `int`? So i guess
+            # cast to strs for all keys..

-                case 'time':
-                    # ib has wack ns timestamps, or is that us?
-                    continue
+            # oddly for some so-called "BookTrade" entries
+            # this field seems to be blank, no cuckin clue.
+            # trade['ibExecID']
+            tid = str(entry.get('ibExecID') or entry['tradeID'])
+            # date = str(entry['tradeDate'])

-                case _:
-                    entry[section] = val
+            # XXX: is it going to cause problems if a account name
+            # get's lost? The user should be able to find it based
+            # on the actual exec history right?
+            acctid = accounts[str(entry['accountId'])]

-        tid = str(entry['execId'])
-        dt = pendulum.from_timestamp(entry['time'])
-        # TODO: why isn't this showing seconds in the str?
-        entry['pydatetime'] = dt
-        entry['datetime'] = str(dt)
-        acctid = accounts[entry['acctNumber']]
+        elif source_type == 'api':
+            # NOTE: example of schema we pull from the API client.
+            # {
+            #     'commissionReport': CommissionReport(...
+            #     'contract': {...
+            #     'execution': Execution(...
+            #     'time': 1654801166.0
+            # }
+
+            # flatten all sub-dicts and values into one top level entry.
+            entry = {}
+            for section, val in t.items():
+                match section:
+                    case 'contract' | 'execution' | 'commissionReport':
+                        # sub-dict cases
+                        entry.update(val)
+
+                    case 'time':
+                        # ib has wack ns timestamps, or is that us?
+                        continue
+
+                    case _:
+                        entry[section] = val
+
+            tid = str(entry['execId'])
+            dt = pendulum.from_timestamp(entry['time'])
+            # TODO: why isn't this showing seconds in the str?
+            entry['date'] = str(dt)
+            acctid = accounts[entry['acctNumber']]

         if not tid:
             # this is likely some kind of internal adjustment
@@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
             acctid, {}
         )[tid] = entry

-    # sort entries in output by python based datetime
-    for acctid in trades_by_account:
-        trades_by_account[acctid] = dict(sorted(
-            trades_by_account[acctid].items(),
-            key=lambda entry: entry[1].pop('pydatetime'),
-        ))
-
-    return trades_by_account
-
-
-def flex_records_to_ledger_entries(
-    accounts: bidict,
-    trade_entries: list[object],
-
-) -> dict:
-    '''
-    Convert flex report entry objects into ``dict`` form, pretty much
-    straight up without modification except add a `pydatetime` field
-    from the parsed timestamp.
-
-    '''
-    trades_by_account = {}
-    for t in trade_entries:
-        entry = t.__dict__
-
-        # XXX: LOL apparently ``toml`` has a bug
-        # where a section key error will show up in the write
-        # if you leave a table key as an `int`? So i guess
-        # cast to strs for all keys..
-
-        # oddly for some so-called "BookTrade" entries
-        # this field seems to be blank, no cuckin clue.
-        # trade['ibExecID']
-        tid = str(entry.get('ibExecID') or entry['tradeID'])
-        # date = str(entry['tradeDate'])
-
-        # XXX: is it going to cause problems if a account name
-        # get's lost? The user should be able to find it based
-        # on the actual exec history right?
-        acctid = accounts[str(entry['accountId'])]
-
-        # probably a flex record with a wonky non-std timestamp..
-        dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
-        entry['datetime'] = str(dt)
-
-        if not tid:
-            # this is likely some kind of internal adjustment
-            # transaction, likely one of the following:
-            # - an expiry event that will show a "book trade" indicating
-            #   some adjustment to cash balances: zeroing or itm settle.
-            # - a manual cash balance position adjustment likely done by
-            #   the user from the accounts window in TWS where they can
-            #   manually set the avg price and size:
-            #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
-            log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
-            continue
-
-        trades_by_account.setdefault(
-            acctid, {}
-        )[tid] = entry
-
-    for acctid in trades_by_account:
-        trades_by_account[acctid] = dict(sorted(
-            trades_by_account[acctid].items(),
-            key=lambda entry: entry[1]['pydatetime'],
-        ))
-
     return trades_by_account
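For readers unfamiliar with the ``match`` flattening in the api branch above, here is a runnable miniature (Python 3.10+ structural pattern matching) built on the fill schema quoted in the comments; the concrete values are made up:

entry: dict = {}
fill = {
    'contract': {'symbol': 'mnq', 'exchange': 'GLOBEX'},
    'execution': {'execId': '0000e1a7.01.01', 'shares': 1},
    'commissionReport': {'commission': 0.25},
    'time': 1654801166.0,
    'orderRef': 'itsame',
}
for section, val in fill.items():
    match section:
        case 'contract' | 'execution' | 'commissionReport':
            entry.update(val)   # flatten sub-dicts into the top level
        case 'time':
            continue            # dropped; the date is re-derived later
        case _:
            entry[section] = val

assert entry['symbol'] == 'mnq' and entry['orderRef'] == 'itsame'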
@@ -1363,16 +1309,15 @@ def load_flex_trades(
     ln = len(trade_entries)
     log.info(f'Loaded {ln} trades from flex query')

-    trades_by_account = flex_records_to_ledger_entries(
-        conf['accounts'].inverse,  # reverse map to user account names
+    trades_by_account = trades_to_ledger_entries(
+        # get reverse map to user account names
+        conf['accounts'].inverse,
         trade_entries,
+        source_type='flex',
     )

-    ledger_dict: Optional[dict] = None
-
     for acctid in trades_by_account:
         trades_by_id = trades_by_account[acctid]
-
         with open_trade_ledger('ib', acctid) as ledger_dict:
             tid_delta = set(trades_by_id) - set(ledger_dict)
             log.info(
@@ -1380,11 +1325,9 @@ def load_flex_trades(
                 f'{pformat(tid_delta)}'
             )
             if tid_delta:
-                sorted_delta = dict(sorted(
-                    {tid: trades_by_id[tid] for tid in tid_delta}.items(),
-                    key=lambda entry: entry[1].pop('pydatetime'),
-                ))
-                ledger_dict.update(sorted_delta)
+                ledger_dict.update(
+                    {tid: trades_by_id[tid] for tid in tid_delta}
+                )

     return ledger_dict
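Worth noting: the old side sorted merged entries by their parsed timestamp before writing to the ledger, which the simplified ``update()`` above drops. A sketch of that ordering step, assuming entries carry the ISO ``'datetime'``/``'date'`` string written during normalization:

import pendulum

def sort_entries_by_dt(entries: dict[str, dict]) -> dict[str, dict]:
    # order tid -> entry mappings by their parsed timestamp
    return dict(sorted(
        entries.items(),
        key=lambda item: pendulum.parse(item[1]['datetime']),
    ))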
@@ -254,9 +254,6 @@ async def wait_on_data_reset(
     return False


-_data_resetter_task: trio.Task | None = None
-
-
 async def get_bars(

     proxy: MethodProxy,
@@ -267,7 +264,7 @@ async def get_bars(
     end_dt: str = '',

     # TODO: make this more dynamic based on measured frame rx latency..
-    timeout: float = 3,  # how long before we trigger a feed reset
+    timeout: float = 1.5,  # how long before we trigger a feed reset

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -277,15 +274,13 @@ async def get_bars(
     a ``MethoProxy``.

     '''
-    global _data_resetter_task
-
-    data_cs: trio.CancelScope | None = None
-    result: tuple[
+    data_cs: Optional[trio.CancelScope] = None
+    result: Optional[tuple[
         ibis.objects.BarDataList,
         np.ndarray,
         datetime,
         datetime,
-    ] | None = None
+    ]] = None
     result_ready = trio.Event()

     async def query():
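The surrounding ``query()``/reset machinery relies on a standard trio idiom: a mutable slot plus a ``trio.Event`` to hand one task's result to another. A self-contained sketch of just that signaling pattern:

import trio

async def main():
    result: list[int] | None = None
    result_ready = trio.Event()

    async def query():
        nonlocal result
        await trio.sleep(0.1)      # stand-in for the history request
        result = [1, 2, 3]
        result_ready.set()         # wake the waiter below

    async with trio.open_nursery() as nurse:
        nurse.start_soon(query)
        await result_ready.wait()  # blocks until the query lands
        assert result == [1, 2, 3]

trio.run(main)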
@@ -312,7 +307,7 @@ async def get_bars(
                     log.warning(
                         f'History is blank for {dt_duration} from {end_dt}'
                     )
-                    end_dt -= dt_duration
+                    end_dt = end_dt.subtract(dt_duration)
                     continue

                 if bars_array is None:
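A hedged aside on the replacement line above: pendulum's ``DateTime.subtract()`` takes keyword components (``days=``, ``hours=``, ...) rather than a duration object, while the ``-`` operator (the old-side spelling) accepts a ``Duration``/``timedelta`` directly. A sketch of both forms, assuming ``dt_duration`` is a ``pendulum.Duration``:

import pendulum

end_dt = pendulum.datetime(2022, 6, 14)
dt_duration = pendulum.duration(days=6)

end_dt = end_dt - dt_duration     # works: Duration is a timedelta subclass
end_dt = end_dt.subtract(days=6)  # subtract() wants keyword components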
@@ -405,10 +400,6 @@ async def get_bars(
                 else:
                     raise

-    # TODO: make this global across all history task/requests
-    # such that simultaneous symbol queries don't try data resettingn
-    # too fast..
-    unset_resetter: bool = False
     async with trio.open_nursery() as nurse:

         # start history request that we allow
@@ -423,14 +414,6 @@ async def get_bars(
                 await result_ready.wait()
                 break

-            if _data_resetter_task:
-                # don't double invoke the reset hack if another
-                # requester task already has it covered.
-                continue
-            else:
-                _data_resetter_task = trio.lowlevel.current_task()
-                unset_resetter = True
-
             # spawn new data reset task
             data_cs, reset_done = await nurse.start(
                 partial(
@@ -442,7 +425,6 @@ async def get_bars(
             # sync wait on reset to complete
             await reset_done.wait()

-    _data_resetter_task = None if unset_resetter else _data_resetter_task
     return result, data_cs is not None

@@ -501,9 +483,7 @@ async def _setup_quote_stream(

     to_trio.send_nowait(None)

-    async with load_aio_clients(
-        disconnect_on_exit=False,
-    ) as accts2clients:
+    async with load_aio_clients() as accts2clients:
         caccount_name, client = get_preferred_data_client(accts2clients)
         contract = contract or (await client.find_contract(symbol))
         ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@@ -583,8 +563,7 @@ async def open_aio_quote_stream(
     from_aio = _quote_streams.get(symbol)
     if from_aio:

-        # if we already have a cached feed deliver a rx side clone
-        # to consumer
+        # if we already have a cached feed deliver a rx side clone to consumer
         async with broadcast_receiver(
             from_aio,
             2**6,
@@ -775,97 +754,67 @@ async def stream_quotes(
             await trio.sleep_forever()
             return  # we never expect feed to come up?

-        cs: Optional[trio.CancelScope] = None
-        startup: bool = True
-        while (
-            startup
-            or cs.cancel_called
-        ):
-            with trio.CancelScope() as cs:
-                async with (
-                    trio.open_nursery() as nurse,
-                    open_aio_quote_stream(
-                        symbol=sym,
-                        contract=con,
-                    ) as stream,
-                ):
-                    # ugh, clear ticks since we've consumed them
-                    # (ahem, ib_insync is stateful trash)
-                    first_ticker.ticks = []
+        async with open_aio_quote_stream(
+            symbol=sym,
+            contract=con,
+        ) as stream:

-                    # only on first entry at feed boot up
-                    if startup:
-                        startup = False
-                        task_status.started((init_msgs, first_quote))
+            # ugh, clear ticks since we've consumed them
+            # (ahem, ib_insync is stateful trash)
+            first_ticker.ticks = []

-                    # start a stream restarter task which monitors the
-                    # data feed event.
-                    async def reset_on_feed():
+            task_status.started((init_msgs, first_quote))

-                        # TODO: this seems to be surpressed from the
-                        # traceback in ``tractor``?
-                        # assert 0
+            async with aclosing(stream):
+                if syminfo.get('no_vlm', False):

-                        rt_ev = proxy.status_event(
-                            'Market data farm connection is OK:usfarm'
-                        )
-                        await rt_ev.wait()
-                        cs.cancel()  # cancel called should now be set
+                    # generally speaking these feeds don't
+                    # include vlm data.
+                    atype = syminfo['asset_type']
+                    log.info(
+                        f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
+                    )

-                    nurse.start_soon(reset_on_feed)
-
-                    async with aclosing(stream):
-                        if syminfo.get('no_vlm', False):
-
-                            # generally speaking these feeds don't
-                            # include vlm data.
-                            atype = syminfo['asset_type']
-                            log.info(
-                                f'No-vlm {sym}@{atype}, skipping quote poll'
-                            )
+                else:
+                    # wait for real volume on feed (trading might be closed)
+                    while True:
+                        ticker = await stream.receive()

+                        # for a real volume contract we rait for the first
+                        # "real" trade to take place
+                        if (
+                            # not calc_price
+                            # and not ticker.rtTime
+                            not ticker.rtTime
+                        ):
+                            # spin consuming tickers until we get a real
+                            # market datum
+                            log.debug(f"New unsent ticker: {ticker}")
+                            continue
                         else:
-                            # wait for real volume on feed (trading might be
-                            # closed)
-                            while True:
-                                ticker = await stream.receive()
-
-                                # for a real volume contract we rait for
-                                # the first "real" trade to take place
-                                if (
-                                    # not calc_price
-                                    # and not ticker.rtTime
-                                    not ticker.rtTime
-                                ):
-                                    # spin consuming tickers until we
-                                    # get a real market datum
-                                    log.debug(f"New unsent ticker: {ticker}")
-                                    continue
-                                else:
-                                    log.debug("Received first volume tick")
-                                    # ugh, clear ticks since we've
-                                    # consumed them (ahem, ib_insync is
-                                    # truly stateful trash)
-                                    ticker.ticks = []
-
-                                    # XXX: this works because we don't use
-                                    # ``aclosing()`` above?
-                                    break
-
-                            quote = normalize(ticker)
-                            log.debug(f"First ticker received {quote}")
-
-                        # tell caller quotes are now coming in live
-                        feed_is_live.set()
-
-                        # last = time.time()
-                        async for ticker in stream:
-                            quote = normalize(ticker)
-                            await send_chan.send({quote['fqsn']: quote})
-
+                            log.debug("Received first real volume tick")
                             # ugh, clear ticks since we've consumed them
+                            # (ahem, ib_insync is truly stateful trash)
                             ticker.ticks = []
-                            # last = time.time()
+
+                            # XXX: this works because we don't use
+                            # ``aclosing()`` above?
+                            break
+
+                    quote = normalize(ticker)
+                    log.debug(f"First ticker received {quote}")
+
+                # tell caller quotes are now coming in live
+                feed_is_live.set()
+
+                # last = time.time()
+                async for ticker in stream:
+                    quote = normalize(ticker)
+                    await send_chan.send({quote['fqsn']: quote})
+
+                    # ugh, clear ticks since we've consumed them
+                    ticker.ticks = []
+                    # last = time.time()


 async def data_reset_hack(
@@ -973,14 +922,7 @@ async def open_symbol_search(
                     except trio.WouldBlock:
                         pass

-                if (
-                    not pattern
-                    or pattern.isspace()
-
-                    # XXX: not sure if this is a bad assumption but it
-                    # seems to make search snappier?
-                    or len(pattern) < 1
-                ):
+                if not pattern or pattern.isspace():
                     log.warning('empty pattern received, skipping..')

                     # TODO: *BUG* if nothing is returned here the client
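Small correctness note on the simplification above: ``len(pattern) < 1`` only matches the empty string, which ``not pattern`` already covers, so the two predicates are equivalent:

for p in ('', ' ', '\t', 'msft'):
    old = not p or p.isspace() or len(p) < 1
    new = not p or p.isspace()
    assert old == new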
@@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
 @click.pass_obj
 def services(config, tl, names):

-    from .._daemon import open_piker_runtime
-
     async def list_services():
-        async with (
-            open_piker_runtime(
-                name='service_query',
-                loglevel=config['loglevel'] if tl else None,
-            ),
-            tractor.get_arbiter(
-                *_tractor_kwargs['arbiter_addr']
-            ) as portal
-        ):
+
+        async with tractor.get_arbiter(
+            *_tractor_kwargs['arbiter_addr']
+        ) as portal:
             registry = await portal.run_from_ns('self', 'get_registry')
             json_d = {}
             for key, socket in registry.items():
+                # name, uuid = uid
                 host, port = socket
                 json_d[key] = f'{host}:{port}'
             click.echo(f"{colorize_json(json_d)}")

-    trio.run(list_services)
+    tractor.run(
+        list_services,
+        name='service_query',
+        loglevel=config['loglevel'] if tl else None,
+        arbiter_addr=_tractor_kwargs['arbiter_addr'],
+    )


 def _load_clis() -> None:
@@ -333,7 +333,7 @@ async def start_backfill(
                 # do a decently sized backfill and load it into storage.
                 periods = {
                     1: {'days': 6},
-                    60: {'years': 6},
+                    60: {'years': 10},
                 }

             kwargs = periods[step_size_s]
@@ -348,45 +348,36 @@ async def start_backfill(
         # last retrieved start dt to the next request as
         # it's end dt.
         starts: set[datetime] = set()
+
         while start_dt > last_tsdb_dt:
+
+            print(f"QUERY end_dt={start_dt}")
             try:
                 log.info(
                     f'Requesting {step_size_s}s frame ending in {start_dt}'
                 )
-                array, next_start_dt, end_dt = await hist(
+                array, start_dt, end_dt = await hist(
                     timeframe,
                     end_dt=start_dt,
                 )
-
-                if next_start_dt in starts:
-                    start_dt = min(starts)
-                    print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
-                    continue
-
-                # only update new start point if new
-                start_dt = next_start_dt
-                starts.add(start_dt)
-
                 assert array['time'][0] == start_dt.timestamp()

             except NoData:
-                # XXX: unhandled history gap (shouldn't happen?)
                 log.warning(
                     f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
                 )
-                await tractor.breakpoint()
+                return None  # discard signal

-            except DataUnavailable:  # as duerr:
-                # broker is being a bish and we can't pull any more..
-                log.warning(
-                    f'NO-MORE-DATA: backend {mod.name} halted history!?'
-                )
+            except DataUnavailable as duerr:
+                # broker is being a bish and we can't pull
+                # any more..
+                log.warning('backend halted on data deliver !?!?')

                 # ugh, what's a better way?
                 # TODO: fwiw, we probably want a way to signal a throttle
                 # condition (eg. with ib) so that we can halt the
                 # request loop until the condition is resolved?
-                return
+                return duerr

             diff = end_dt - start_dt
             frame_time_diff_s = diff.seconds
@@ -403,6 +394,22 @@ async def start_backfill(
                     f'{diff} ~= {frame_time_diff_s} seconds'
                 )

+            array, _start_dt, end_dt = await hist(
+                timeframe,
+                end_dt=start_dt,
+            )
+
+            if (
+                _start_dt in starts
+            ):
+                print("SKIPPING DUPLICATE FRAME @ {_start_dt}")
+                start_dt = min(starts)
+                continue
+
+            # only update new start point if new
+            start_dt = _start_dt
+            starts.add(start_dt)
+
             to_push = diff_history(
                 array,
                 start_dt,
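One nit in the added block above (present in the old side's variant too): the SKIPPING ``print`` is missing its ``f`` prefix, so the placeholder is emitted literally rather than interpolated:

_start_dt = '2022-06-14'
print("SKIPPING DUPLICATE FRAME @ {_start_dt}")   # prints literal braces
print(f"SKIPPING DUPLICATE FRAME @ {_start_dt}")  # prints the value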
@@ -491,10 +498,10 @@ async def manage_history(
         readonly=False,
     )
     # TODO: history validation
-    # if not opened:
-    #     raise RuntimeError(
-    #         "Persistent shm for sym was already open?!"
-    #     )
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )

     rt_shm, opened = maybe_open_shm_array(
         key=f'{fqsn}_rt',
@@ -506,10 +513,10 @@ async def manage_history(
         readonly=False,
         size=3*_secs_in_day,
     )
-    # if not opened:
-    #     raise RuntimeError(
-    #         "Persistent shm for sym was already open?!"
-    #     )
+    if not opened:
+        raise RuntimeError(
+            "Persistent shm for sym was already open?!"
+        )

     log.info('Scanning for existing `marketstored`')
piker/pp.py (71 changes)
@@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
 (looking at you `ib` and dirt-bird friends)

 '''
-from __future__ import annotations
 from contextlib import contextmanager as cm
 from pprint import pformat
 import os
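Context for the removal above and the ``Union`` change in the next hunk: without ``from __future__ import annotations``, class-body annotations are evaluated eagerly, and the PEP 604 ``str | int`` spelling only exists at runtime on Python 3.10+, so ``typing.Union`` is the compatible spelling:

from typing import Any, Union

# evaluated eagerly at class-body time: ``str | int`` needs 3.10+,
# ``Union[str, int]`` does not.
clears: dict[Union[str, int], dict[str, Any]] = {}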
@@ -139,31 +138,13 @@ class Position(Struct):

     # ordered record of known constituent trade messages
     clears: dict[
-        str | int,  # trade id
+        Union[str, int, Status],  # trade id
         dict[str, Any],  # transaction history summaries
     ] = {}
     first_clear_dt: Optional[datetime] = None

     expiry: Optional[datetime] = None

-    # @property
-    # def clears(self) -> dict[
-    #     Union[str, int, Status],  # trade id
-    #     dict[str, Any],  # transaction history summaries
-    # ]:
-    #     '''
-    #     Datetime sorted reference to internal clears table.
-
-    #     '''
-    #     # self._clears = {}
-    #     self._clears = dict(sorted(
-    #         self._clears.items(),
-    #         key=lambda entry: entry[1]['dt'],
-    #     ))
-    #         # self._clears[k] = v
-
-    #     return self._clears
-
     def to_dict(self) -> dict:
         return {
             f: getattr(self, f)
@@ -238,10 +219,6 @@ class Position(Struct):

         '''
         clears = list(self.clears.values())
-        if not clears:
-            log.warning(f'No clears table for {self.symbol}!?')
-            return
-
         self.first_clear_dt = min(list(entry['dt'] for entry in clears))
         last_clear = clears[-1]

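Caveat on the guard removed above: with an empty clears table ``min()`` now raises, where the old side warned and bailed early:

clears: list[dict] = []
try:
    min(list(entry['dt'] for entry in clears))
except ValueError as err:
    print(err)  # min() arg is an empty sequence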
@@ -646,7 +623,6 @@ class PpTable(Struct):

     def to_toml(
         self,
-        min_clears: bool = True,
     ) -> dict[str, Any]:

         active, closed = self.dump_active()
@@ -659,9 +635,7 @@ class PpTable(Struct):

             # keep the minimal amount of clears that make up this
             # position since the last net-zero state.
-            if min_clears:
-                pos.minimize_clears()
-
+            pos.minimize_clears()
             pos.ensure_state()

             # serialize to pre-toml form
@@ -708,8 +682,6 @@ def load_pps_from_ledger(
     brokername: str,
     acctname: str,

-    table: Optional[PpTable] = None,
-
     # post normalization filter on ledger entries to be processed
     filter_by: Optional[list[dict]] = None,

@@ -726,6 +698,7 @@ def load_pps_from_ledger(
     '''
     with (
         open_trade_ledger(brokername, acctname) as ledger,
+        open_pps(brokername, acctname) as table,
     ):
         if not ledger:
             # null case, no ledger file with content
@@ -743,11 +716,7 @@ def load_pps_from_ledger(
         else:
             records = src_records

-        if table is None:
-            with open_pps(brokername, acctname) as table:
-                updated = table.update_from_trans(records)
-        else:
-            updated = table.update_from_trans(records)
+        updated = table.update_from_trans(records)

     return records, updated

@@ -917,27 +886,15 @@ def open_pps(
         conf=conf,
     )

-    # first pass populate all missing clears record tables
-    # for fqsn, entry in pps.items():
-    #     # convert clears sub-tables (only in this form
-    #     # for toml re-presentation) back into a master table.
-    #     clears_list = entry.get('clears', [])
-
-    #     # attempt to reload from ledger
-    #     if not clears_list:
-    #         trans, pos = load_pps_from_ledger(
-    #             brokername,
-    #             acctid,
-    #             filter_by=[entry['bsuid']],
-    #             table=table,
-    #         )
-    #         # breakpoint()
-
     # unmarshal/load ``pps.toml`` config entries into object form
     # and update `PpTable` obj entries.
     for fqsn, entry in pps.items():
         bsuid = entry['bsuid']

+        # convert clears sub-tables (only in this form
+        # for toml re-presentation) back into a master table.
+        clears_list = entry['clears']
+
         # index clears entries in "object" form by tid in a top
         # level dict instead of a list (as is presented in our
         # ``pps.toml``).
@@ -949,18 +906,6 @@ def open_pps(
         # processing of new clear events.
         trans: list[Transaction] = []

-        # convert clears sub-tables (only in this form
-        # for toml re-presentation) back into a master table.
-        clears_list = entry['clears']
-
-        # # attempt to reload from ledger
-        # if not clears_list:
-        #     trans, pos = load_pps_from_ledger(
-        #         brokername,
-        #         acctid,
-        #         table=table,
-        #     )
-
         for clears_table in clears_list:
             tid = clears_table.pop('tid')
             dtstr = clears_table['dt']