Comment out "unknown msg" case for now
							parent
							
								
									72889b4d1f
								
							
						
					
					
						commit
						e28c1748fc
					
				|  | @ -192,10 +192,11 @@ async def clear_dark_triggers( | ||||||
|                         ttype not in tf or |                         ttype not in tf or | ||||||
|                         not pred(price) |                         not pred(price) | ||||||
|                     ): |                     ): | ||||||
|                         log.debug( |                         # log.runtime( | ||||||
|                             f'skipping quote for {sym} ' |                         #     f'skipping quote for {sym} ' | ||||||
|                             f'{pred}, {ttype} not in {tf}?, {pred(price)}' |                         #     f'{pred} -> {pred(price)}\n' | ||||||
|                         ) |                         #     f'{ttype} not in {tf}?' | ||||||
|  |                         # ) | ||||||
|                         # majority of iterations will be non-matches |                         # majority of iterations will be non-matches | ||||||
|                         continue |                         continue | ||||||
| 
 | 
 | ||||||
|  | @ -731,46 +732,47 @@ async def translate_and_relay_brokerd_events( | ||||||
|                 log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') |                 log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') | ||||||
| 
 | 
 | ||||||
|             # unknown valid message case? |             # unknown valid message case? | ||||||
|             case { |             # case { | ||||||
|                 'name': name, |             #     'name': name, | ||||||
|                 'symbol': sym, |             #     'symbol': sym, | ||||||
|                 'reqid': reqid,  # brokerd generated order-request id |             #     'reqid': reqid,  # brokerd generated order-request id | ||||||
|                 # 'oid': oid,  # ems order-dialog id |             #     # 'oid': oid,  # ems order-dialog id | ||||||
|                 'broker_details': details, |             #     'broker_details': details, | ||||||
| 
 | 
 | ||||||
|             } if ( |             # } if ( | ||||||
|                 book._ems2brokerd_ids.inverse.get(reqid) is None |             #     book._ems2brokerd_ids.inverse.get(reqid) is None | ||||||
|             ): |             # ): | ||||||
|                 # TODO: pretty sure we can drop this now? |             #     # TODO: pretty sure we can drop this now? | ||||||
|                 # XXX: paper clearing special cases |  | ||||||
|                 # paper engine race case: ``Client.submit_limit()`` hasn't |  | ||||||
|                 # returned yet and provided an output reqid to register |  | ||||||
|                 # locally, so we need to retreive the oid that was already |  | ||||||
|                 # packed at submission since we already know it ahead of |  | ||||||
|                 # time |  | ||||||
|                 paper = details.get('paper_info') |  | ||||||
|                 ext = details.get('external') |  | ||||||
| 
 | 
 | ||||||
|                 if paper: |             #     # XXX: paper clearing special cases | ||||||
|                     # paperboi keeps the ems id up front |             #     # paper engine race case: ``Client.submit_limit()`` hasn't | ||||||
|                     oid = paper['oid'] |             #     # returned yet and provided an output reqid to register | ||||||
|  |             #     # locally, so we need to retreive the oid that was already | ||||||
|  |             #     # packed at submission since we already know it ahead of | ||||||
|  |             #     # time | ||||||
|  |             #     paper = details.get('paper_info') | ||||||
|  |             #     ext = details.get('external') | ||||||
| 
 | 
 | ||||||
|                 elif ext: |             #     if paper: | ||||||
|                     # may be an order msg specified as "external" to the |             #         # paperboi keeps the ems id up front | ||||||
|                     # piker ems flow (i.e. generated by some other |             #         oid = paper['oid'] | ||||||
|                     # external broker backend client (like tws for ib) |  | ||||||
|                     log.error(f"External trade event {name}@{ext}") |  | ||||||
| 
 | 
 | ||||||
|                 else: |             #     elif ext: | ||||||
|                     # something is out of order, we don't have an oid for |             #         # may be an order msg specified as "external" to the | ||||||
|                     # this broker-side message. |             #         # piker ems flow (i.e. generated by some other | ||||||
|                     log.error( |             #         # external broker backend client (like tws for ib) | ||||||
|                         f'Unknown oid: {oid} for msg {name}:\n' |             #         log.error(f"External trade event {name}@{ext}") | ||||||
|                         f'{pformat(brokerd_msg)}\n' |  | ||||||
|                         'Unable to relay message to client side!?' |  | ||||||
|                     ) |  | ||||||
| 
 | 
 | ||||||
|                 continue |             #     else: | ||||||
|  |             #         # something is out of order, we don't have an oid for | ||||||
|  |             #         # this broker-side message. | ||||||
|  |             #         log.error( | ||||||
|  |             #             f'Unknown oid: {oid} for msg {name}:\n' | ||||||
|  |             #             f'{pformat(brokerd_msg)}\n' | ||||||
|  |             #             'Unable to relay message to client side!?' | ||||||
|  |             #         ) | ||||||
|  | 
 | ||||||
|  |             #     continue | ||||||
| 
 | 
 | ||||||
|             case _: |             case _: | ||||||
|                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') |                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') | ||||||
|  | @ -811,11 +813,11 @@ async def translate_and_relay_brokerd_events( | ||||||
| 
 | 
 | ||||||
| async def process_client_order_cmds( | async def process_client_order_cmds( | ||||||
| 
 | 
 | ||||||
|     client_order_stream: tractor.MsgStream,  # noqa |     client_order_stream: tractor.MsgStream, | ||||||
|     brokerd_order_stream: tractor.MsgStream, |     brokerd_order_stream: tractor.MsgStream, | ||||||
| 
 | 
 | ||||||
|     symbol: str, |     symbol: str, | ||||||
|     feed: Feed,  # noqa |     feed: Feed, | ||||||
|     dark_book: _DarkBook, |     dark_book: _DarkBook, | ||||||
|     router: Router, |     router: Router, | ||||||
| 
 | 
 | ||||||
|  | @ -837,7 +839,6 @@ async def process_client_order_cmds( | ||||||
|         live_entry = dark_book._ems_entries.get(oid) |         live_entry = dark_book._ems_entries.get(oid) | ||||||
| 
 | 
 | ||||||
|         match cmd: |         match cmd: | ||||||
| 
 |  | ||||||
|             # existing live-broker order cancel |             # existing live-broker order cancel | ||||||
|             case { |             case { | ||||||
|                 'action': 'cancel', |                 'action': 'cancel', | ||||||
|  | @ -1035,17 +1036,16 @@ async def process_client_order_cmds( | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def _emsd_main( | async def _emsd_main( | ||||||
| 
 |  | ||||||
|     ctx: tractor.Context, |     ctx: tractor.Context, | ||||||
|     fqsn: str, |     fqsn: str, | ||||||
|     exec_mode: str,  # ('paper', 'live') |     exec_mode: str,  # ('paper', 'live') | ||||||
| 
 |  | ||||||
|     loglevel: str = 'info', |     loglevel: str = 'info', | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
|     '''EMS (sub)actor entrypoint providing the |     ''' | ||||||
|     execution management (micro)service which conducts broker |     EMS (sub)actor entrypoint providing the execution management | ||||||
|     order clearing control on behalf of clients. |     (micro)service which conducts broker order clearing control on | ||||||
|  |     behalf of clients. | ||||||
| 
 | 
 | ||||||
|     This is the daemon (child) side routine which starts an EMS runtime |     This is the daemon (child) side routine which starts an EMS runtime | ||||||
|     task (one per broker-feed) and and begins streaming back alerts from |     task (one per broker-feed) and and begins streaming back alerts from | ||||||
|  | @ -1089,9 +1089,8 @@ async def _emsd_main( | ||||||
|     # tractor.Context instead of strictly requiring a ctx arg. |     # tractor.Context instead of strictly requiring a ctx arg. | ||||||
|     ems_ctx = ctx |     ems_ctx = ctx | ||||||
| 
 | 
 | ||||||
|     feed: Feed |  | ||||||
| 
 |  | ||||||
|     # spawn one task per broker feed |     # spawn one task per broker feed | ||||||
|  |     feed: Feed | ||||||
|     async with ( |     async with ( | ||||||
|         maybe_open_feed( |         maybe_open_feed( | ||||||
|             [fqsn], |             [fqsn], | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue