Compare commits

No commits in common. "c53071e43a291251924f42cba84d8e01672b729f" and "96f5a8abb878c391c875e6c0b3853e397c8d50d2" have entirely different histories.

c53071e43a ... 96f5a8abb8
@@ -195,8 +195,9 @@ async def open_piker_runtime(

) -> Optional[tractor._portal.Portal]:
    '''
    Start a piker actor whose runtime will automatically sync with
    existing piker actors on the local link based on configuration.
    Start a piker actor whose runtime will automatically
    sync with existing piker actors in the local network
    based on configuration.

    '''
    global _services

==== next file ====

@@ -412,7 +412,7 @@ class Client:
            # ``end_dt`` which exceeds the ``duration``,
            # - a timeout occurred in which case insync internals return
            # an empty list thing with bars.clear()...
            return [], np.empty(0), dt_duration
            return [], np.empty(0)
            # TODO: we could maybe raise ``NoData`` instead if we
            # rewrite the method in the first case? right now there's no
            # way to detect a timeout.
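As the TODO notes, the empty return is ambiguous: callers can't tell a too-deep ``end_dt`` from an internal ib_insync timeout. A minimal call-site guard, sketched against the two-tuple return shape of the newer revision (``client``, ``fqsn`` and the wrapper name are stand-ins, not code from this diff):

```python
import numpy as np

async def get_frame(client, fqsn: str, end_dt) -> np.ndarray | None:
    # hypothetical call site for the patched `Client.bars()` above
    bars, bars_array = await client.bars(fqsn, end_dt=end_dt)
    if not bars:
        # either the query walked past available history or ib_insync
        # timed out internally; both cases surface as an empty frame
        return None
    return bars_array
```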
@@ -483,7 +483,7 @@ class Client:
        self,
        pattern: str,
        # how many contracts to search "up to"
        upto: int = 16,
        upto: int = 6,
        asdicts: bool = True,

    ) -> dict[str, ContractDetails]:
@@ -518,16 +518,6 @@ class Client:

                exch = tract.exchange
                if exch not in _exch_skip_list:

                    # try to lookup any contracts from our adhoc set
                    # since often the exchange/venue is named slightly
                    # differently (eg. `BRR.CMECRYPTO` instead of just
                    # `.CME`).
                    info = _adhoc_symbol_map.get(sym)
                    if info:
                        con_kwargs, bars_kwargs = info
                        exch = con_kwargs['exchange']

                    # try to get all possible contracts for symbol as per,
                    # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
                    con = ibis.Future(
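The removed block consulted a per-symbol override table pairing contract kwargs with bar-query kwargs. A plausible shape for it, inferred only from the ``(con_kwargs, bars_kwargs)`` unpacking above; the keys and values here are illustrative, not the real table:

```python
# hypothetical entries matching the tuple layout unpacked above;
# the real `_adhoc_symbol_map` lives elsewhere in the ib backend.
_adhoc_symbol_map: dict[str, tuple[dict, dict]] = {
    'brr': (
        {'symbol': 'BRR', 'exchange': 'CMECRYPTO'},  # -> ibis.Future(**con_kwargs)
        {},                                          # extra bar-request kwargs
    ),
}

info = _adhoc_symbol_map.get('brr')
if info:
    con_kwargs, bars_kwargs = info
    exch = con_kwargs['exchange']  # 'CMECRYPTO'
```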
@@ -1096,7 +1086,6 @@ async def load_aio_clients(
    # retry a few times to get the client going..
    connect_retries: int = 3,
    connect_timeout: float = 0.5,
    disconnect_on_exit: bool = True,

) -> dict[str, Client]:
    '''
@@ -1238,11 +1227,10 @@ async def load_aio_clients(
    finally:
        # TODO: for re-scans we'll want to not tear down clients which
        # are up and stable right?
        if disconnect_on_exit:
            for acct, client in _accounts2clients.items():
                log.info(f'Disconnecting {acct}@{client}')
                client.ib.disconnect()
                _client_cache.pop((host, port), None)
        for acct, client in _accounts2clients.items():
            log.info(f'Disconnecting {acct}@{client}')
            client.ib.disconnect()
            _client_cache.pop((host, port), None)
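The dropped ``disconnect_on_exit`` flag let a long-lived consumer keep its ib sockets up past the context exit; its one call-site in this same diff (the quote-stream setup further down) used it roughly like this sketch:

```python
# sketch of the (removed) teardown opt-out; mirrors the
# `_setup_quote_stream()` usage shown later in this diff
async def keep_alive_example():
    async with load_aio_clients(
        disconnect_on_exit=False,  # leave the ib connections alive
    ) as accts2clients:
        account, client = get_preferred_data_client(accts2clients)
        ...  # stream quotes; teardown is handled by the stream owner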


async def load_clients_for_trio(

==== next file ====

@@ -305,7 +305,7 @@ async def update_ledger_from_api_trades(
        entry['listingExchange'] = pexch

    conf = get_config()
    entries = api_trades_to_ledger_entries(
    entries = trades_to_ledger_entries(
        conf['accounts'].inverse,
        trade_entries,
    )
@@ -371,8 +371,8 @@ async def update_and_audit_msgs(
                    else:
                        entry = f'split_ratio = 1/{int(reverse_split_ratio)}'

                    raise ValueError(
                    # log.error(
                    # raise ValueError(
                    log.error(
                        f'POSITION MISMATCH ib <-> piker ledger:\n'
                        f'ib: {ibppmsg}\n'
                        f'piker: {msg}\n'
@@ -1123,16 +1123,18 @@ def norm_trade_records(
            continue

        # timestamping is way different in API records
        dtstr = record.get('datetime')
        date = record.get('date')
        flex_dtstr = record.get('dateTime')

        if dtstr or date:
            dt = pendulum.parse(dtstr or date)

        elif flex_dtstr:
        if not date:
            # probably a flex record with a wonky non-std timestamp..
            dt = parse_flex_dt(record['dateTime'])
            date, ts = record['dateTime'].split(';')
            dt = pendulum.parse(date)
            ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
            tsdt = pendulum.parse(ts)
            dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)

        else:
            # epoch_dt = pendulum.from_timestamp(record.get('time'))
            dt = pendulum.parse(date)

        # special handling of symbol extraction from
        # flex records using some ad-hoc schema parsing.
@@ -1181,58 +1183,69 @@ def norm_trade_records(
    return {r.tid: r for r in records}


def parse_flex_dt(
    record: str,
) -> pendulum.datetime:
    date, ts = record.split(';')
    dt = pendulum.parse(date)
    ts = f'{ts[:2]}:{ts[2:4]}:{ts[4:]}'
    tsdt = pendulum.parse(ts)
    return dt.set(hour=tsdt.hour, minute=tsdt.minute, second=tsdt.second)
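One subtlety the helper gets right: pendulum datetimes are immutable, so ``.set(...)`` returns a new instance. The inline branch in one revision above calls ``dt.set(...)`` without binding the result, which silently drops the time-of-day. A worked example with a hypothetical flex value:

```python
import pendulum

raw = '20220607;204001'  # hypothetical '<YYYYMMDD>;<HHMMSS>' flex field
dt = parse_flex_dt(raw)
assert str(dt) == '2022-06-07T20:40:01+00:00'

# the no-op variant: `.set()` returns a *new* datetime which the
# inline branch never assigned, so its `dt` kept midnight
pendulum.parse('20220607').set(hour=20)  # result discarded!
```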


def api_trades_to_ledger_entries(
def trades_to_ledger_entries(
    accounts: bidict,
    trade_entries: list[object],
    source_type: str = 'api',

) -> dict:
    '''
    Convert API execution entry objects into ``dict`` form,
    pretty much straight up without modification except adding
    a `pydatetime` field from the parsed timestamp.
    Convert either API execution objects or flex report
    entry objects into ``dict`` form, pretty much straight up
    without modification.

    '''
    trades_by_account = {}

    for t in trade_entries:
        # NOTE: example of schema we pull from the API client.
        # {
        #     'commissionReport': CommissionReport(...
        #     'contract': {...
        #     'execution': Execution(...
        #     'time': 1654801166.0
        # }
        if source_type == 'flex':
            entry = t.__dict__

        # flatten all sub-dicts and values into one top level entry.
        entry = {}
        for section, val in t.items():
            match section:
                case 'contract' | 'execution' | 'commissionReport':
                    # sub-dict cases
                    entry.update(val)
            # XXX: LOL apparently ``toml`` has a bug
            # where a section key error will show up in the write
            # if you leave a table key as an `int`? So i guess
            # cast to strs for all keys..

                case 'time':
                    # ib has wack ns timestamps, or is that us?
                    continue
            # oddly for some so-called "BookTrade" entries
            # this field seems to be blank, no cuckin clue.
            # trade['ibExecID']
            tid = str(entry.get('ibExecID') or entry['tradeID'])
            # date = str(entry['tradeDate'])

                case _:
                    entry[section] = val
            # XXX: is it going to cause problems if an account name
            # gets lost? The user should be able to find it based
            # on the actual exec history right?
            acctid = accounts[str(entry['accountId'])]

        tid = str(entry['execId'])
        dt = pendulum.from_timestamp(entry['time'])
        # TODO: why isn't this showing seconds in the str?
        entry['pydatetime'] = dt
        entry['datetime'] = str(dt)
        acctid = accounts[entry['acctNumber']]
        elif source_type == 'api':
            # NOTE: example of schema we pull from the API client.
            # {
            #     'commissionReport': CommissionReport(...
            #     'contract': {...
            #     'execution': Execution(...
            #     'time': 1654801166.0
            # }

            # flatten all sub-dicts and values into one top level entry.
            entry = {}
            for section, val in t.items():
                match section:
                    case 'contract' | 'execution' | 'commissionReport':
                        # sub-dict cases
                        entry.update(val)

                    case 'time':
                        # ib has wack ns timestamps, or is that us?
                        continue

                    case _:
                        entry[section] = val

            tid = str(entry['execId'])
            dt = pendulum.from_timestamp(entry['time'])
            # TODO: why isn't this showing seconds in the str?
            entry['date'] = str(dt)
            acctid = accounts[entry['acctNumber']]

        if not tid:
            # this is likely some kind of internal adjustment
@@ -1250,73 +1263,6 @@ def api_trades_to_ledger_entries(
            acctid, {}
        )[tid] = entry

    # sort entries in output by python based datetime
    for acctid in trades_by_account:
        trades_by_account[acctid] = dict(sorted(
            trades_by_account[acctid].items(),
            key=lambda entry: entry[1].pop('pydatetime'),
        ))

    return trades_by_account
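The sort above keys on the ``pydatetime`` field and pops it in the same pass, presumably so the non-TOML-serializable datetime object never reaches the ledger file write. A self-contained sketch of that pattern:

```python
import pendulum

# minimal sketch of the sort-and-strip idiom used above
trades = {
    'b': {'price': 2, 'pydatetime': pendulum.parse('2022-06-08')},
    'a': {'price': 1, 'pydatetime': pendulum.parse('2022-06-07')},
}
trades = dict(sorted(
    trades.items(),
    # NOTE: the key fn runs exactly once per item so the pop is safe,
    # if a bit sneaky: it both orders entries and drops the field.
    key=lambda item: item[1].pop('pydatetime'),
))
assert list(trades) == ['a', 'b']
assert 'pydatetime' not in trades['a']
```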


def flex_records_to_ledger_entries(
    accounts: bidict,
    trade_entries: list[object],

) -> dict:
    '''
    Convert flex report entry objects into ``dict`` form, pretty much
    straight up without modification except adding a `pydatetime` field
    from the parsed timestamp.

    '''
    trades_by_account = {}
    for t in trade_entries:
        entry = t.__dict__

        # XXX: LOL apparently ``toml`` has a bug
        # where a section key error will show up in the write
        # if you leave a table key as an `int`? So i guess
        # cast to strs for all keys..

        # oddly for some so-called "BookTrade" entries
        # this field seems to be blank, no cuckin clue.
        # trade['ibExecID']
        tid = str(entry.get('ibExecID') or entry['tradeID'])
        # date = str(entry['tradeDate'])

        # XXX: is it going to cause problems if an account name
        # gets lost? The user should be able to find it based
        # on the actual exec history right?
        acctid = accounts[str(entry['accountId'])]

        # probably a flex record with a wonky non-std timestamp..
        dt = entry['pydatetime'] = parse_flex_dt(entry['dateTime'])
        entry['datetime'] = str(dt)

        if not tid:
            # this is likely some kind of internal adjustment
            # transaction, likely one of the following:
            # - an expiry event that will show a "book trade" indicating
            #   some adjustment to cash balances: zeroing or itm settle.
            # - a manual cash balance position adjustment likely done by
            #   the user from the accounts window in TWS where they can
            #   manually set the avg price and size:
            #   https://api.ibkr.com/lib/cstools/faq/web1/index.html#/tag/DTWS_ADJ_AVG_COST
            log.warning(f'Skipping ID-less ledger entry:\n{pformat(entry)}')
            continue

        trades_by_account.setdefault(
            acctid, {}
        )[tid] = entry

    for acctid in trades_by_account:
        trades_by_account[acctid] = dict(sorted(
            trades_by_account[acctid].items(),
            key=lambda entry: entry[1]['pydatetime'],
        ))

    return trades_by_account


@@ -1363,16 +1309,15 @@ def load_flex_trades(
    ln = len(trade_entries)
    log.info(f'Loaded {ln} trades from flex query')

    trades_by_account = flex_records_to_ledger_entries(
        conf['accounts'].inverse,  # reverse map to user account names
    trades_by_account = trades_to_ledger_entries(
        # get reverse map to user account names
        conf['accounts'].inverse,
        trade_entries,
        source_type='flex',
    )

    ledger_dict: Optional[dict] = None

    for acctid in trades_by_account:
        trades_by_id = trades_by_account[acctid]

        with open_trade_ledger('ib', acctid) as ledger_dict:
            tid_delta = set(trades_by_id) - set(ledger_dict)
            log.info(
@@ -1380,11 +1325,9 @@ def load_flex_trades(
                f'{pformat(tid_delta)}'
            )
            if tid_delta:
                sorted_delta = dict(sorted(
                    {tid: trades_by_id[tid] for tid in tid_delta}.items(),
                    key=lambda entry: entry[1].pop('pydatetime'),
                ))
                ledger_dict.update(sorted_delta)
                ledger_dict.update(
                    {tid: trades_by_id[tid] for tid in tid_delta}
                )

    return ledger_dict
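The delta update above only writes trade ids not already present in the on-disk ledger, so re-running a flex import is idempotent. A minimal sketch of the pattern, with hypothetical ids:

```python
# hypothetical ledger state vs. a freshly pulled flex batch
ledger_dict = {'tid1': {'price': 1.0}}   # already on disk
trades_by_id = {
    'tid1': {'price': 1.0},
    'tid2': {'price': 2.0},
}

# only ids we haven't persisted yet
tid_delta = set(trades_by_id) - set(ledger_dict)
assert tid_delta == {'tid2'}

ledger_dict.update({tid: trades_by_id[tid] for tid in tid_delta})
```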

==== next file ====

@@ -254,9 +254,6 @@ async def wait_on_data_reset(
    return False


_data_resetter_task: trio.Task | None = None


async def get_bars(

    proxy: MethodProxy,
@@ -267,7 +264,7 @@ async def get_bars(
    end_dt: str = '',

    # TODO: make this more dynamic based on measured frame rx latency..
    timeout: float = 3,  # how long before we trigger a feed reset
    timeout: float = 1.5,  # how long before we trigger a feed reset

    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

@@ -277,15 +274,13 @@ async def get_bars(
    a ``MethodProxy``.

    '''
    global _data_resetter_task

    data_cs: trio.CancelScope | None = None
    result: tuple[
    data_cs: Optional[trio.CancelScope] = None
    result: Optional[tuple[
        ibis.objects.BarDataList,
        np.ndarray,
        datetime,
        datetime,
    ] | None = None
    ]] = None
    result_ready = trio.Event()

    async def query():
@@ -312,7 +307,7 @@ async def get_bars(
                    log.warning(
                        f'History is blank for {dt_duration} from {end_dt}'
                    )
                    end_dt -= dt_duration
                    end_dt = end_dt.subtract(dt_duration)
                    continue

                if bars_array is None:
@@ -405,10 +400,6 @@ async def get_bars(
                else:
                    raise

    # TODO: make this global across all history tasks/requests
    # such that simultaneous symbol queries don't try data resetting
    # too fast..
    unset_resetter: bool = False
    async with trio.open_nursery() as nurse:

        # start history request that we allow
@@ -423,14 +414,6 @@ async def get_bars(
                await result_ready.wait()
                break

            if _data_resetter_task:
                # don't double invoke the reset hack if another
                # requester task already has it covered.
                continue
            else:
                _data_resetter_task = trio.lowlevel.current_task()
                unset_resetter = True

            # spawn new data reset task
            data_cs, reset_done = await nurse.start(
                partial(
@@ -442,7 +425,6 @@ async def get_bars(
            # sync wait on reset to complete
            await reset_done.wait()

    _data_resetter_task = None if unset_resetter else _data_resetter_task
    return result, data_cs is not None
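The ``_data_resetter_task`` global above acts as a cross-task mutex so concurrent ``get_bars()`` calls don't each fire the reset hack at once. A stripped-down sketch of the claim/release protocol, assuming trio (``do_reset`` is a stand-in for the actual reset coroutine):

```python
import trio

_resetter_task: trio.lowlevel.Task | None = None

async def maybe_reset(do_reset) -> None:
    '''
    Run `do_reset()` unless some other task already claimed the job.

    '''
    global _resetter_task
    if _resetter_task:
        return  # another requester task already has it covered

    _resetter_task = trio.lowlevel.current_task()
    try:
        await do_reset()
    finally:
        # release the "lock" so a future stall can trigger a new reset
        _resetter_task = None
```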


@@ -501,9 +483,7 @@ async def _setup_quote_stream(

    to_trio.send_nowait(None)

    async with load_aio_clients(
        disconnect_on_exit=False,
    ) as accts2clients:
    async with load_aio_clients() as accts2clients:
        caccount_name, client = get_preferred_data_client(accts2clients)
        contract = contract or (await client.find_contract(symbol))
        ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@@ -583,8 +563,7 @@ async def open_aio_quote_stream(
    from_aio = _quote_streams.get(symbol)
    if from_aio:

        # if we already have a cached feed deliver a rx side clone
        # to consumer
        # if we already have a cached feed deliver a rx side clone to consumer
        async with broadcast_receiver(
            from_aio,
            2**6,
@@ -775,97 +754,67 @@ async def stream_quotes(
            await trio.sleep_forever()
            return  # we never expect feed to come up?

        cs: Optional[trio.CancelScope] = None
        startup: bool = True
        while (
            startup
            or cs.cancel_called
        ):
            with trio.CancelScope() as cs:
                async with (
                    trio.open_nursery() as nurse,
                    open_aio_quote_stream(
                        symbol=sym,
                        contract=con,
                    ) as stream,
                ):
                    # ugh, clear ticks since we've consumed them
                    # (ahem, ib_insync is stateful trash)
                    first_ticker.ticks = []
        async with open_aio_quote_stream(
            symbol=sym,
            contract=con,
        ) as stream:

                    # only on first entry at feed boot up
                    if startup:
                        startup = False
                        task_status.started((init_msgs, first_quote))
            # ugh, clear ticks since we've consumed them
            # (ahem, ib_insync is stateful trash)
            first_ticker.ticks = []

                    # start a stream restarter task which monitors the
                    # data feed event.
                    async def reset_on_feed():
            task_status.started((init_msgs, first_quote))

                        # TODO: this seems to be suppressed from the
                        # traceback in ``tractor``?
                        # assert 0
            async with aclosing(stream):
                if syminfo.get('no_vlm', False):

                        rt_ev = proxy.status_event(
                            'Market data farm connection is OK:usfarm'
                        )
                        await rt_ev.wait()
                        cs.cancel()  # cancel called should now be set
                    # generally speaking these feeds don't
                    # include vlm data.
                    atype = syminfo['asset_type']
                    log.info(
                        f'Non-vlm asset {sym}@{atype}, skipping quote poll...'
                    )

                    nurse.start_soon(reset_on_feed)

                    async with aclosing(stream):
                        if syminfo.get('no_vlm', False):

                            # generally speaking these feeds don't
                            # include vlm data.
                            atype = syminfo['asset_type']
                            log.info(
                                f'No-vlm {sym}@{atype}, skipping quote poll'
                            )
                else:
                    # wait for real volume on feed (trading might be closed)
                    while True:
                        ticker = await stream.receive()

                        # for a real volume contract we wait for the first
                        # "real" trade to take place
                        if (
                            # not calc_price
                            # and not ticker.rtTime
                            not ticker.rtTime
                        ):
                            # spin consuming tickers until we get a real
                            # market datum
                            log.debug(f"New unsent ticker: {ticker}")
                            continue
                        else:
                            # wait for real volume on feed (trading might be
                            # closed)
                            while True:
                                ticker = await stream.receive()

                                # for a real volume contract we wait for
                                # the first "real" trade to take place
                                if (
                                    # not calc_price
                                    # and not ticker.rtTime
                                    not ticker.rtTime
                                ):
                                    # spin consuming tickers until we
                                    # get a real market datum
                                    log.debug(f"New unsent ticker: {ticker}")
                                    continue
                                else:
                                    log.debug("Received first volume tick")
                                    # ugh, clear ticks since we've
                                    # consumed them (ahem, ib_insync is
                                    # truly stateful trash)
                                    ticker.ticks = []

                                    # XXX: this works because we don't use
                                    # ``aclosing()`` above?
                                    break

                            quote = normalize(ticker)
                            log.debug(f"First ticker received {quote}")

                        # tell caller quotes are now coming in live
                        feed_is_live.set()

                        # last = time.time()
                        async for ticker in stream:
                            quote = normalize(ticker)
                            await send_chan.send({quote['fqsn']: quote})

                            log.debug("Received first real volume tick")
                            # ugh, clear ticks since we've consumed them
                            # (ahem, ib_insync is truly stateful trash)
                            ticker.ticks = []
                            # last = time.time()

                            # XXX: this works because we don't use
                            # ``aclosing()`` above?
                            break

                    quote = normalize(ticker)
                    log.debug(f"First ticker received {quote}")

                # tell caller quotes are now coming in live
                feed_is_live.set()

                # last = time.time()
                async for ticker in stream:
                    quote = normalize(ticker)
                    await send_chan.send({quote['fqsn']: quote})

                    # ugh, clear ticks since we've consumed them
                    ticker.ticks = []
                    # last = time.time()

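The rewritten ``stream_quotes()`` above wraps the feed in a ``trio.CancelScope`` and loops while ``cs.cancel_called``, so the ``reset_on_feed`` watcher can tear the stream down and have it immediately rebuilt. The skeleton of that restart pattern, reduced to its moving parts (``open_stream`` and ``wait_for_reset_signal`` are stand-ins):

```python
import trio

async def run_feed_with_restarts(open_stream, wait_for_reset_signal):
    '''
    Skeleton of the restart loop used above: re-enter the stream
    context whenever the watcher task cancels the scope.

    '''
    cs: trio.CancelScope | None = None
    startup: bool = True

    while startup or cs.cancel_called:
        with trio.CancelScope() as cs:
            async with trio.open_nursery() as nurse:

                async def watcher():
                    # e.g. block on an ib "data farm OK" status event
                    await wait_for_reset_signal()
                    cs.cancel()  # sets `cancel_called` -> loop re-enters

                nurse.start_soon(watcher)
                startup = False

                async with open_stream() as stream:
                    async for msg in stream:
                        ...  # forward quotes downstream
```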
async def data_reset_hack(
@@ -973,14 +922,7 @@ async def open_symbol_search(
                    except trio.WouldBlock:
                        pass

                if (
                    not pattern
                    or pattern.isspace()

                    # XXX: not sure if this is a bad assumption but it
                    # seems to make search snappier?
                    or len(pattern) < 1
                ):
                if not pattern or pattern.isspace():
                    log.warning('empty pattern received, skipping..')

                    # TODO: *BUG* if nothing is returned here the client

==== next file ====

@@ -138,26 +138,25 @@ def cli(ctx, brokers, loglevel, tl, configdir):
@click.pass_obj
def services(config, tl, names):

    from .._daemon import open_piker_runtime

    async def list_services():
        async with (
            open_piker_runtime(
                name='service_query',
                loglevel=config['loglevel'] if tl else None,
            ),
            tractor.get_arbiter(
                *_tractor_kwargs['arbiter_addr']
            ) as portal
        ):

        async with tractor.get_arbiter(
            *_tractor_kwargs['arbiter_addr']
        ) as portal:
            registry = await portal.run_from_ns('self', 'get_registry')
            json_d = {}
            for key, socket in registry.items():
                # name, uuid = uid
                host, port = socket
                json_d[key] = f'{host}:{port}'
            click.echo(f"{colorize_json(json_d)}")

    trio.run(list_services)
    tractor.run(
        list_services,
        name='service_query',
        loglevel=config['loglevel'] if tl else None,
        arbiter_addr=_tractor_kwargs['arbiter_addr'],
    )


def _load_clis() -> None:

==== next file ====

@@ -333,7 +333,7 @@ async def start_backfill(
                # do a decently sized backfill and load it into storage.
                periods = {
                    1: {'days': 6},
                    60: {'years': 6},
                    60: {'years': 10},
                }

            kwargs = periods[step_size_s]
@@ -348,45 +348,36 @@ async def start_backfill(
        # last retrieved start dt to the next request as
        # its end dt.
        starts: set[datetime] = set()

        while start_dt > last_tsdb_dt:

            print(f"QUERY end_dt={start_dt}")
            try:
                log.info(
                    f'Requesting {step_size_s}s frame ending in {start_dt}'
                )
                array, next_start_dt, end_dt = await hist(
                array, start_dt, end_dt = await hist(
                    timeframe,
                    end_dt=start_dt,
                )

                if next_start_dt in starts:
                    start_dt = min(starts)
                    print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
                    continue

                # only update new start point if new
                start_dt = next_start_dt
                starts.add(start_dt)

                assert array['time'][0] == start_dt.timestamp()

            except NoData:
                # XXX: unhandled history gap (shouldn't happen?)
                log.warning(
                    f'NO DATA for {frame_size_s}s frame @ {start_dt} ?!?'
                )
                await tractor.breakpoint()
                return None  # discard signal

            except DataUnavailable:  # as duerr:
                # broker is being a bish and we can't pull any more..
                log.warning(
                    f'NO-MORE-DATA: backend {mod.name} halted history!?'
                )
            except DataUnavailable as duerr:
                # broker is being a bish and we can't pull
                # any more..
                log.warning('backend halted on data delivery !?!?')

                # ugh, what's a better way?
                # TODO: fwiw, we probably want a way to signal a throttle
                # condition (eg. with ib) so that we can halt the
                # request loop until the condition is resolved?
                return
                return duerr

            diff = end_dt - start_dt
            frame_time_diff_s = diff.seconds
@@ -403,6 +394,22 @@ async def start_backfill(
                    f'{diff} ~= {frame_time_diff_s} seconds'
                )

            array, _start_dt, end_dt = await hist(
                timeframe,
                end_dt=start_dt,
            )

            if (
                _start_dt in starts
            ):
                print(f"SKIPPING DUPLICATE FRAME @ {_start_dt}")
                start_dt = min(starts)
                continue

            # only update new start point if new
            start_dt = _start_dt
            starts.add(start_dt)

            to_push = diff_history(
                array,
                start_dt,
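Both revisions above guard against the backend handing back a frame that starts where a previous one did, which would otherwise re-request the same span forever. A condensed sketch of the ``starts``-set dedup, with ``hist`` as a hypothetical frame fetcher:

```python
from datetime import datetime

async def backfill(hist, start_dt: datetime, last_tsdb_dt: datetime):
    # `hist` is a stand-in returning (array, frame_start_dt, frame_end_dt)
    # as in the diff above
    starts: set[datetime] = set()

    while start_dt > last_tsdb_dt:
        array, next_start_dt, end_dt = await hist(60, end_dt=start_dt)

        if next_start_dt in starts:
            # the backend repeated a frame: rewind to the earliest
            # seen start instead of looping on the same span
            start_dt = min(starts)
            continue

        # only walk the cursor back when the frame is actually new
        start_dt = next_start_dt
        starts.add(start_dt)
```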
@@ -491,10 +498,10 @@ async def manage_history(
        readonly=False,
    )
    # TODO: history validation
    # if not opened:
    #     raise RuntimeError(
    #         "Persistent shm for sym was already open?!"
    #     )
    if not opened:
        raise RuntimeError(
            "Persistent shm for sym was already open?!"
        )

    rt_shm, opened = maybe_open_shm_array(
        key=f'{fqsn}_rt',
@@ -506,10 +513,10 @@ async def manage_history(
        readonly=False,
        size=3*_secs_in_day,
    )
    # if not opened:
    #     raise RuntimeError(
    #         "Persistent shm for sym was already open?!"
    #     )
    if not opened:
        raise RuntimeError(
            "Persistent shm for sym was already open?!"
        )

    log.info('Scanning for existing `marketstored`')

==== next file: piker/pp.py (71 changed lines) ====

@@ -20,7 +20,6 @@ that doesn't try to cuk most humans who prefer to not lose their moneys..
(looking at you `ib` and dirt-bird friends)

'''
from __future__ import annotations
from contextlib import contextmanager as cm
from pprint import pformat
import os
@@ -139,31 +138,13 @@ class Position(Struct):

    # ordered record of known constituent trade messages
    clears: dict[
        str | int,  # trade id
        Union[str, int, Status],  # trade id
        dict[str, Any],  # transaction history summaries
    ] = {}
    first_clear_dt: Optional[datetime] = None

    expiry: Optional[datetime] = None

    # @property
    # def clears(self) -> dict[
    #     Union[str, int, Status],  # trade id
    #     dict[str, Any],  # transaction history summaries
    # ]:
    #     '''
    #     Datetime sorted reference to internal clears table.

    #     '''
    #     # self._clears = {}
    #     self._clears = dict(sorted(
    #         self._clears.items(),
    #         key=lambda entry: entry[1]['dt'],
    #     ))
    #         # self._clears[k] = v

    #     return self._clears

    def to_dict(self) -> dict:
        return {
            f: getattr(self, f)
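The commented-out property above sketches keeping ``clears`` datetime-sorted on access. A working rendition of that idea, assuming entries carry a parsed ``'dt'`` key as elsewhere in this diff:

```python
import pendulum

# a tiny clears table keyed by trade id, each entry carrying a
# parsed `dt` field as in the rest of this diff
clears = {
    'tid2': {'dt': pendulum.parse('2022-06-08'), 'size': 1},
    'tid1': {'dt': pendulum.parse('2022-06-07'), 'size': 2},
}
clears = dict(sorted(
    clears.items(),
    key=lambda entry: entry[1]['dt'],
))
assert list(clears) == ['tid1', 'tid2']
```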
@@ -238,10 +219,6 @@ class Position(Struct):

        '''
        clears = list(self.clears.values())
        if not clears:
            log.warning(f'No clears table for {self.symbol}!?')
            return

        self.first_clear_dt = min(list(entry['dt'] for entry in clears))
        last_clear = clears[-1]

@@ -646,7 +623,6 @@ class PpTable(Struct):

    def to_toml(
        self,
        min_clears: bool = True,
    ) -> dict[str, Any]:

        active, closed = self.dump_active()
@@ -659,9 +635,7 @@ class PpTable(Struct):

            # keep the minimal amount of clears that make up this
            # position since the last net-zero state.
            if min_clears:
                pos.minimize_clears()

            pos.minimize_clears()
            pos.ensure_state()

            # serialize to pre-toml form
@@ -708,8 +682,6 @@ def load_pps_from_ledger(
    brokername: str,
    acctname: str,

    table: Optional[PpTable] = None,

    # post normalization filter on ledger entries to be processed
    filter_by: Optional[list[dict]] = None,

@@ -726,6 +698,7 @@ def load_pps_from_ledger(
    '''
    with (
        open_trade_ledger(brokername, acctname) as ledger,
        open_pps(brokername, acctname) as table,
    ):
        if not ledger:
            # null case, no ledger file with content
@@ -743,11 +716,7 @@ def load_pps_from_ledger(
        else:
            records = src_records

        if table is None:
            with open_pps(brokername, acctname) as table:
                updated = table.update_from_trans(records)
        else:
            updated = table.update_from_trans(records)
        updated = table.update_from_trans(records)

    return records, updated

@@ -917,27 +886,15 @@ def open_pps(
        conf=conf,
    )

    # first pass populate all missing clears record tables
    # for fqsn, entry in pps.items():
    #     # convert clears sub-tables (only in this form
    #     # for toml re-presentation) back into a master table.
    #     clears_list = entry.get('clears', [])

    #     # attempt to reload from ledger
    #     if not clears_list:
    #         trans, pos = load_pps_from_ledger(
    #             brokername,
    #             acctid,
    #             filter_by=[entry['bsuid']],
    #             table=table,
    #         )
    #         # breakpoint()

    # unmarshal/load ``pps.toml`` config entries into object form
    # and update `PpTable` obj entries.
    for fqsn, entry in pps.items():
        bsuid = entry['bsuid']

        # convert clears sub-tables (only in this form
        # for toml re-presentation) back into a master table.
        clears_list = entry['clears']

        # index clears entries in "object" form by tid in a top
        # level dict instead of a list (as is presented in our
        # ``pps.toml``).
@@ -949,18 +906,6 @@ def open_pps(
        # processing of new clear events.
        trans: list[Transaction] = []

        # convert clears sub-tables (only in this form
        # for toml re-presentation) back into a master table.
        clears_list = entry['clears']

        # # attempt to reload from ledger
        # if not clears_list:
        #     trans, pos = load_pps_from_ledger(
        #         brokername,
        #         acctid,
        #         table=table,
        #     )

        for clears_table in clears_list:
            tid = clears_table.pop('tid')
            dtstr = clears_table['dt']
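The unmarshal loop above lifts each toml ``clears`` sub-table into a tid-keyed master dict. A compact sketch of that conversion, using hypothetical sub-tables with the ``tid``/``dt`` fields shown:

```python
import pendulum

# hypothetical `clears` sub-tables as read back from a ``pps.toml``
clears_list = [
    {'tid': 'tid1', 'dt': '2022-06-07T20:40:01+00:00', 'size': 2},
    {'tid': 'tid2', 'dt': '2022-06-08T10:00:00+00:00', 'size': 1},
]

clears: dict[str, dict] = {}
for clears_table in clears_list:
    tid = clears_table.pop('tid')
    # parse the toml datetime string back into object form
    clears_table['dt'] = pendulum.parse(clears_table['dt'])
    clears[tid] = clears_table

assert list(clears) == ['tid1', 'tid2']
```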