Get ib data feed hackzorz workin
ib has a throttle limit for "hft" bars but contained in here is some hackery using ``xdotool`` to reset data farms auto-magically B) This copies the working script into the ib backend mod as a routine and now uses `trio.run_process()` and calls into it from the `get_bars()` history retriever and then waits for "data re-established" events to be received from the client before making more history queries. TL;DR summary of changes: - relay ib's "system status" events (like for data farm statuses) as a new "event" msg that can be processed by registers of `Client.inline_errors()` (though we should probably make a new method for this). - add `MethodProxy.status_event()` which allows a proxy user to register for a particular "system event" (as mentioned above), which puts a `trio.Event` entry in a small table can be set by an relay task if there are any detected waiters. - start a "msg relay task" when opening the method proxy which does the event setting mentioned above in the background. - drop the request error handling around the proxy creation, doesn't seem necessary any more now that we have better error propagation from `asyncio`. - add event waiting logic around the data feed reset hackzorin. - change the order relay task to only log system events for now (though we need to do some better parsing/logic to get tws-external order updates to work again..mkts_backup
							parent
							
								
									65d4c317c6
								
							
						
					
					
						commit
						6f06f646cf
					
				| 
						 | 
					@ -294,7 +294,8 @@ class Client:
 | 
				
			||||||
        bars_kwargs = {'whatToShow': 'TRADES'}
 | 
					        bars_kwargs = {'whatToShow': 'TRADES'}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        global _enters
 | 
					        global _enters
 | 
				
			||||||
        print(f'ENTER BARS {_enters} @ end={end_dt}')
 | 
					        # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
 | 
				
			||||||
 | 
					        print(f'REQUESTING BARS {_enters} @ end={end_dt}')
 | 
				
			||||||
        _enters += 1
 | 
					        _enters += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        contract = await self.find_contract(fqsn)
 | 
					        contract = await self.find_contract(fqsn)
 | 
				
			||||||
| 
						 | 
					@ -304,6 +305,7 @@ class Client:
 | 
				
			||||||
        bars = await self.ib.reqHistoricalDataAsync(
 | 
					        bars = await self.ib.reqHistoricalDataAsync(
 | 
				
			||||||
            contract,
 | 
					            contract,
 | 
				
			||||||
            endDateTime=end_dt,
 | 
					            endDateTime=end_dt,
 | 
				
			||||||
 | 
					            formatDate=2,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # time history length values format:
 | 
					            # time history length values format:
 | 
				
			||||||
            # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
 | 
					            # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
 | 
				
			||||||
| 
						 | 
					@ -738,25 +740,38 @@ class Client:
 | 
				
			||||||
    def inline_errors(
 | 
					    def inline_errors(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        to_trio: trio.abc.SendChannel,
 | 
					        to_trio: trio.abc.SendChannel,
 | 
				
			||||||
    ) -> None:
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # connect error msgs
 | 
					    ) -> None:
 | 
				
			||||||
 | 
					        '''
 | 
				
			||||||
 | 
					        Setup error relay to the provided ``trio`` mem chan such that
 | 
				
			||||||
 | 
					        trio tasks can retreive and parse ``asyncio``-side API request
 | 
				
			||||||
 | 
					        errors.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        '''
 | 
				
			||||||
        def push_err(
 | 
					        def push_err(
 | 
				
			||||||
            reqId: int,
 | 
					            reqId: int,
 | 
				
			||||||
            errorCode: int,
 | 
					            errorCode: int,
 | 
				
			||||||
            errorString: str,
 | 
					            errorString: str,
 | 
				
			||||||
            contract: Contract,
 | 
					            contract: Contract,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        ) -> None:
 | 
					        ) -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.error(errorString)
 | 
					            log.error(errorString)
 | 
				
			||||||
 | 
					            reason = errorString
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if reqId == -1:
 | 
				
			||||||
 | 
					                # it's a general event?
 | 
				
			||||||
 | 
					                key = 'event'
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                key = 'error'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                to_trio.send_nowait((
 | 
					                to_trio.send_nowait((
 | 
				
			||||||
                    'error',
 | 
					                    key,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    # error "object"
 | 
					                    # error "object"
 | 
				
			||||||
                    {'reqid': reqId,
 | 
					                    {'reqid': reqId,
 | 
				
			||||||
                     'reason': errorString,
 | 
					                     'reason': reason,
 | 
				
			||||||
                     'contract': contract}
 | 
					                     'contract': contract}
 | 
				
			||||||
                ))
 | 
					                ))
 | 
				
			||||||
            except trio.BrokenResourceError:
 | 
					            except trio.BrokenResourceError:
 | 
				
			||||||
| 
						 | 
					@ -1123,9 +1138,11 @@ class MethodProxy:
 | 
				
			||||||
    def __init__(
 | 
					    def __init__(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
        chan: to_asyncio.LinkedTaskChannel,
 | 
					        chan: to_asyncio.LinkedTaskChannel,
 | 
				
			||||||
 | 
					        event_table: dict[str, trio.Event],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ) -> None:
 | 
					    ) -> None:
 | 
				
			||||||
        self.chan = chan
 | 
					        self.chan = chan
 | 
				
			||||||
 | 
					        self.event_table = event_table
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def _run_method(
 | 
					    async def _run_method(
 | 
				
			||||||
        self,
 | 
					        self,
 | 
				
			||||||
| 
						 | 
					@ -1140,18 +1157,43 @@ class MethodProxy:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
        chan = self.chan
 | 
					        chan = self.chan
 | 
				
			||||||
        # send through method + ``kwargs: dict`` as pair
 | 
					 | 
				
			||||||
        await chan.send((meth, kwargs))
 | 
					        await chan.send((meth, kwargs))
 | 
				
			||||||
        msg = await chan.receive()
 | 
					 | 
				
			||||||
        res = msg.get('result')
 | 
					 | 
				
			||||||
        if res:
 | 
					 | 
				
			||||||
            return res
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        err = msg.get('error')
 | 
					        while not chan.closed():
 | 
				
			||||||
        if not err:
 | 
					            # send through method + ``kwargs: dict`` as pair
 | 
				
			||||||
            raise ValueError(f'Received unexpected asyncio msg {msg}')
 | 
					            msg = await chan.receive()
 | 
				
			||||||
 | 
					            # print(f'NEXT MSG: {msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        raise err
 | 
					            # TODO: py3.10 ``match:`` syntax B)
 | 
				
			||||||
 | 
					            if 'result' in msg:
 | 
				
			||||||
 | 
					                res = msg.get('result')
 | 
				
			||||||
 | 
					                return res
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            elif 'exception' in msg:
 | 
				
			||||||
 | 
					                err = msg.get('exception')
 | 
				
			||||||
 | 
					                raise err
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            elif 'error' in msg:
 | 
				
			||||||
 | 
					                etype, emsg = msg
 | 
				
			||||||
 | 
					                log.warning(f'IB error relay: {emsg}')
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                log.warning(f'UNKNOWN IB MSG: {msg}')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def status_event(
 | 
				
			||||||
 | 
					        self,
 | 
				
			||||||
 | 
					        pattern: str,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    ) -> Union[dict[str, Any], trio.Event]:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        ev = self.event_table.get(pattern)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        if not ev or ev.is_set():
 | 
				
			||||||
 | 
					            # print(f'inserting new data reset event item')
 | 
				
			||||||
 | 
					            ev = self.event_table[pattern] = trio.Event()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        return ev
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async def wait_for_data_reset(self) -> None:
 | 
					    async def wait_for_data_reset(self) -> None:
 | 
				
			||||||
        '''
 | 
					        '''
 | 
				
			||||||
| 
						 | 
					@ -1166,6 +1208,7 @@ class MethodProxy:
 | 
				
			||||||
async def open_aio_client_method_relay(
 | 
					async def open_aio_client_method_relay(
 | 
				
			||||||
    from_trio: asyncio.Queue,
 | 
					    from_trio: asyncio.Queue,
 | 
				
			||||||
    to_trio: trio.abc.SendChannel,
 | 
					    to_trio: trio.abc.SendChannel,
 | 
				
			||||||
 | 
					    event_consumers: dict[str, trio.Event],
 | 
				
			||||||
 | 
					
 | 
				
			||||||
) -> None:
 | 
					) -> None:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1177,7 +1220,7 @@ async def open_aio_client_method_relay(
 | 
				
			||||||
        to_trio.send_nowait(client)
 | 
					        to_trio.send_nowait(client)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # TODO: separate channel for error handling?
 | 
					        # TODO: separate channel for error handling?
 | 
				
			||||||
        # client.inline_errors(to_trio)
 | 
					        client.inline_errors(to_trio)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # relay all method requests to ``asyncio``-side client and
 | 
					        # relay all method requests to ``asyncio``-side client and
 | 
				
			||||||
        # deliver back results
 | 
					        # deliver back results
 | 
				
			||||||
| 
						 | 
					@ -1188,8 +1231,8 @@ async def open_aio_client_method_relay(
 | 
				
			||||||
                break
 | 
					                break
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            meth_name, kwargs = msg
 | 
					            meth_name, kwargs = msg
 | 
				
			||||||
 | 
					 | 
				
			||||||
            meth = getattr(client, meth_name)
 | 
					            meth = getattr(client, meth_name)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            try:
 | 
					            try:
 | 
				
			||||||
                resp = await meth(**kwargs)
 | 
					                resp = await meth(**kwargs)
 | 
				
			||||||
                # echo the msg back
 | 
					                # echo the msg back
 | 
				
			||||||
| 
						 | 
					@ -1201,54 +1244,61 @@ async def open_aio_client_method_relay(
 | 
				
			||||||
                # TODO: relay all errors to trio?
 | 
					                # TODO: relay all errors to trio?
 | 
				
			||||||
                # BaseException,
 | 
					                # BaseException,
 | 
				
			||||||
            ) as err:
 | 
					            ) as err:
 | 
				
			||||||
                to_trio.send_nowait({'error': err})
 | 
					                to_trio.send_nowait({'exception': err})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@acm
 | 
					@acm
 | 
				
			||||||
async def open_client_proxy() -> MethodProxy:
 | 
					async def open_client_proxy() -> MethodProxy:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					    # try:
 | 
				
			||||||
        async with to_asyncio.open_channel_from(
 | 
					    event_table = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    async with (
 | 
				
			||||||
 | 
					        to_asyncio.open_channel_from(
 | 
				
			||||||
            open_aio_client_method_relay,
 | 
					            open_aio_client_method_relay,
 | 
				
			||||||
        ) as (first, chan):
 | 
					            event_consumers=event_table,
 | 
				
			||||||
 | 
					        ) as (first, chan),
 | 
				
			||||||
 | 
					        trio.open_nursery() as relay_n,
 | 
				
			||||||
 | 
					    ):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            assert isinstance(first, Client)
 | 
					        assert isinstance(first, Client)
 | 
				
			||||||
            proxy = MethodProxy(chan)
 | 
					        proxy = MethodProxy(chan, event_table)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # mock all remote methods on ib ``Client``.
 | 
				
			||||||
 | 
					        for name, method in inspect.getmembers(
 | 
				
			||||||
 | 
					            Client, predicate=inspect.isfunction
 | 
				
			||||||
 | 
					        ):
 | 
				
			||||||
 | 
					            if '_' == name[0]:
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					            setattr(proxy, name, partial(proxy._run_method, meth=name))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        async def relay_events():
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            async with chan.subscribe() as msg_stream:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                async for msg in msg_stream:
 | 
				
			||||||
 | 
					                    if 'event' not in msg:
 | 
				
			||||||
 | 
					                        continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # if 'event' in msg:
 | 
				
			||||||
 | 
					                    # wake up any system event waiters.
 | 
				
			||||||
 | 
					                    etype, status_msg = msg
 | 
				
			||||||
 | 
					                    reason = status_msg['reason']
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    ev = proxy.event_table.pop(reason, None)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    if ev and ev.statistics().tasks_waiting:
 | 
				
			||||||
 | 
					                        log.info(f'Relaying ib status message: {msg}')
 | 
				
			||||||
 | 
					                        ev.set()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # mock all remote methods on ib ``Client``.
 | 
					 | 
				
			||||||
            for name, method in inspect.getmembers(
 | 
					 | 
				
			||||||
                Client, predicate=inspect.isfunction
 | 
					 | 
				
			||||||
            ):
 | 
					 | 
				
			||||||
                if '_' == name[0]:
 | 
					 | 
				
			||||||
                    continue
 | 
					                    continue
 | 
				
			||||||
                setattr(proxy, name, partial(proxy._run_method, meth=name))
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
            yield proxy
 | 
					        relay_n.start_soon(relay_events)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # terminate asyncio side task
 | 
					        yield proxy
 | 
				
			||||||
            await chan.send(None)
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    except (
 | 
					        # terminate asyncio side task
 | 
				
			||||||
        RequestError,
 | 
					        await chan.send(None)
 | 
				
			||||||
        # BaseException,
 | 
					 | 
				
			||||||
    )as err:
 | 
					 | 
				
			||||||
        code = getattr(err, 'code', None)
 | 
					 | 
				
			||||||
        if code:
 | 
					 | 
				
			||||||
            msg = err.message
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            # TODO: retreive underlying ``ib_insync`` error?
 | 
					 | 
				
			||||||
            if (
 | 
					 | 
				
			||||||
                code == 162 and (
 | 
					 | 
				
			||||||
                    'HMDS query returned no data' in msg
 | 
					 | 
				
			||||||
                    or 'No market data permissions for' in msg
 | 
					 | 
				
			||||||
                )
 | 
					 | 
				
			||||||
                or code == 200
 | 
					 | 
				
			||||||
            ):
 | 
					 | 
				
			||||||
                # these cases should not cause a task crash
 | 
					 | 
				
			||||||
                log.warning(msg)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            raise
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@acm
 | 
					@acm
 | 
				
			||||||
| 
						 | 
					@ -1378,6 +1428,12 @@ def normalize(
 | 
				
			||||||
    return data
 | 
					    return data
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_pacing: str = (
 | 
				
			||||||
 | 
					    'Historical Market Data Service error '
 | 
				
			||||||
 | 
					    'message:Historical data request pacing violation'
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def get_bars(
 | 
					async def get_bars(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    proxy: MethodProxy,
 | 
					    proxy: MethodProxy,
 | 
				
			||||||
| 
						 | 
					@ -1396,14 +1452,13 @@ async def get_bars(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    fails = 0
 | 
					    fails = 0
 | 
				
			||||||
    bars: Optional[list] = None
 | 
					    bars: Optional[list] = None
 | 
				
			||||||
    in_throttle: bool = False
 | 
					 | 
				
			||||||
    first_dt: datetime = None
 | 
					    first_dt: datetime = None
 | 
				
			||||||
    last_dt: datetime = None
 | 
					    last_dt: datetime = None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if end_dt:
 | 
					    if end_dt:
 | 
				
			||||||
        last_dt = pendulum.from_timestamp(end_dt.timestamp())
 | 
					        last_dt = pendulum.from_timestamp(end_dt.timestamp())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    for _ in range(10):
 | 
					    for _ in range(2):
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            bars, bars_array = await proxy.bars(
 | 
					            bars, bars_array = await proxy.bars(
 | 
				
			||||||
                fqsn=fqsn,
 | 
					                fqsn=fqsn,
 | 
				
			||||||
| 
						 | 
					@ -1449,26 +1504,43 @@ async def get_bars(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            else:
 | 
					            elif _pacing in msg:
 | 
				
			||||||
                log.exception(
 | 
					
 | 
				
			||||||
                    "Data query rate reached: Press `ctrl-alt-f`"
 | 
					                log.warning(
 | 
				
			||||||
                    "in TWS"
 | 
					                    'History throttle rate reached!\n'
 | 
				
			||||||
 | 
					                    'Resetting farms with `ctrl-alt-f` hack\n'
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
 | 
					                # TODO: we might have to put a task lock around this
 | 
				
			||||||
 | 
					                # method..
 | 
				
			||||||
 | 
					                hist_ev = proxy.status_event(
 | 
				
			||||||
 | 
					                    'HMDS data farm connection is OK:ushmds'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # live_ev = proxy.status_event(
 | 
				
			||||||
 | 
					                #     # 'Market data farm connection is OK:usfuture'
 | 
				
			||||||
 | 
					                #     'Market data farm connection is OK:usfarm'
 | 
				
			||||||
 | 
					                # )
 | 
				
			||||||
 | 
					                # TODO: some kinda resp here that indicates success
 | 
				
			||||||
 | 
					                # otherwise retry?
 | 
				
			||||||
 | 
					                await data_reset_hack()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # TODO: should probably create some alert on screen
 | 
					                # TODO: a while loop here if we timeout?
 | 
				
			||||||
                # and then somehow get that to trigger an event here
 | 
					                for name, ev in [
 | 
				
			||||||
                # that restarts/resumes this task?
 | 
					                    ('history', hist_ev),
 | 
				
			||||||
                if not in_throttle:
 | 
					                    # ('live', live_ev),
 | 
				
			||||||
                    await tractor.breakpoint()
 | 
					                ]:
 | 
				
			||||||
 | 
					                    with trio.move_on_after(22) as cs:
 | 
				
			||||||
 | 
					                        await ev.wait()
 | 
				
			||||||
 | 
					                        log.info(f"{name} DATA RESET")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                # TODO: wait on data con reset event
 | 
					                    if cs.cancelled_caught:
 | 
				
			||||||
                # then begin backfilling again.
 | 
					                        log.warning("reset hack failed on first try?")
 | 
				
			||||||
                # await proxy.wait_for_data()
 | 
					                        # await tractor.breakpoint()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                in_throttle = True
 | 
					 | 
				
			||||||
                fails += 1
 | 
					                fails += 1
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                raise
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return None, None
 | 
					    return None, None
 | 
				
			||||||
    # else:  # throttle wasn't fixed so error out immediately
 | 
					    # else:  # throttle wasn't fixed so error out immediately
 | 
				
			||||||
| 
						 | 
					@ -1520,8 +1592,7 @@ async def backfill_bars(
 | 
				
			||||||
    # on that until we have the `marketstore` daemon in place in which
 | 
					    # on that until we have the `marketstore` daemon in place in which
 | 
				
			||||||
    # case the shm size will be driven by user config and available sys
 | 
					    # case the shm size will be driven by user config and available sys
 | 
				
			||||||
    # memory.
 | 
					    # memory.
 | 
				
			||||||
    # count: int = 120,
 | 
					    count: int = 65,
 | 
				
			||||||
    count: int = 36,
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
					    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -1566,9 +1637,6 @@ async def backfill_bars(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
 | 
					                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                if fails is None or fails > 1:
 | 
					 | 
				
			||||||
                    break
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                if out == (None, None):
 | 
					                if out == (None, None):
 | 
				
			||||||
                    # could be trying to retreive bars over weekend
 | 
					                    # could be trying to retreive bars over weekend
 | 
				
			||||||
                    # TODO: add logic here to handle tradable hours and
 | 
					                    # TODO: add logic here to handle tradable hours and
 | 
				
			||||||
| 
						 | 
					@ -2222,18 +2290,26 @@ async def deliver_trade_events(
 | 
				
			||||||
            msg = pack_position(item)
 | 
					            msg = pack_position(item)
 | 
				
			||||||
            msg.account = accounts_def.inverse[msg.account]
 | 
					            msg.account = accounts_def.inverse[msg.account]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if getattr(msg, 'reqid', 0) < -1:
 | 
					        elif event_name == 'event':
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # it's a trade event generated by TWS usage.
 | 
					            # it's either a general system status event or an external
 | 
				
			||||||
            log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
 | 
					            # trade event?
 | 
				
			||||||
 | 
					            log.info(f"TWS system status: \n{pformat(item)}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            msg.reqid = 'tws-' + str(-1 * msg.reqid)
 | 
					            # TODO: support this again but needs parsing at the callback
 | 
				
			||||||
 | 
					            # level...
 | 
				
			||||||
 | 
					            # reqid = item.get('reqid', 0)
 | 
				
			||||||
 | 
					            # if getattr(msg, 'reqid', 0) < -1:
 | 
				
			||||||
 | 
					            # log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # msg.reqid = 'tws-' + str(-1 * reqid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # mark msg as from "external system"
 | 
					            # mark msg as from "external system"
 | 
				
			||||||
            # TODO: probably something better then this.. and start
 | 
					            # TODO: probably something better then this.. and start
 | 
				
			||||||
            # considering multiplayer/group trades tracking
 | 
					            # considering multiplayer/group trades tracking
 | 
				
			||||||
            msg.broker_details['external_src'] = 'tws'
 | 
					            # msg.broker_details['external_src'] = 'tws'
 | 
				
			||||||
            continue
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # XXX: we always serialize to a dict for msgpack
 | 
					        # XXX: we always serialize to a dict for msgpack
 | 
				
			||||||
        # translations, ideally we can move to an msgspec (or other)
 | 
					        # translations, ideally we can move to an msgspec (or other)
 | 
				
			||||||
| 
						 | 
					@ -2336,3 +2412,104 @@ async def open_symbol_search(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            log.debug(f"sending matches: {matches.keys()}")
 | 
					            log.debug(f"sending matches: {matches.keys()}")
 | 
				
			||||||
            await stream.send(matches)
 | 
					            await stream.send(matches)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def data_reset_hack(
 | 
				
			||||||
 | 
					    reset_type: str = 'data',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    Run key combos for resetting data feeds and yield back to caller
 | 
				
			||||||
 | 
					    when complete.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    This is a linux-only hack around:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    TODOs:
 | 
				
			||||||
 | 
					        - a return type that hopefully determines if the hack was
 | 
				
			||||||
 | 
					          successful.
 | 
				
			||||||
 | 
					        - other OS support?
 | 
				
			||||||
 | 
					        - integration with ``ib-gw`` run in docker + Xorg?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    '''
 | 
				
			||||||
 | 
					    # TODO: try out this lib instead, seems to be the most modern
 | 
				
			||||||
 | 
					    # and usess the underlying lib:
 | 
				
			||||||
 | 
					    # https://github.com/rshk/python-libxdo
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # TODO: seems to be a few libs for python but not sure
 | 
				
			||||||
 | 
					    # if they support all the sub commands we need, order of
 | 
				
			||||||
 | 
					    # most recent commit history:
 | 
				
			||||||
 | 
					    # https://github.com/rr-/pyxdotool
 | 
				
			||||||
 | 
					    # https://github.com/ShaneHutter/pyxdotool
 | 
				
			||||||
 | 
					    # https://github.com/cphyc/pyxdotool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    try:
 | 
				
			||||||
 | 
					        import i3ipc
 | 
				
			||||||
 | 
					    except ImportError:
 | 
				
			||||||
 | 
					        return False
 | 
				
			||||||
 | 
					        log.warning('IB data hack no-supported on ur platformz')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    i3 = i3ipc.Connection()
 | 
				
			||||||
 | 
					    t = i3.get_tree()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    orig_win_id = t.find_focused().window
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # for tws
 | 
				
			||||||
 | 
					    win_names: list[str] = [
 | 
				
			||||||
 | 
					        'Interactive Brokers',  # tws running in i3
 | 
				
			||||||
 | 
					        'IB Gateway',  # gw running in i3
 | 
				
			||||||
 | 
					        # 'IB',  # gw running in i3 (newer version?)
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    combos: dict[str, str] = {
 | 
				
			||||||
 | 
					        # only required if we need a connection reset.
 | 
				
			||||||
 | 
					        'connection': ('ctrl+alt+r', 12),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        # data feed reset.
 | 
				
			||||||
 | 
					        'data': ('ctrl+alt+f', 6)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for name in win_names:
 | 
				
			||||||
 | 
					        results = t.find_titled(name)
 | 
				
			||||||
 | 
					        print(f'results for {name}: {results}')
 | 
				
			||||||
 | 
					        if results:
 | 
				
			||||||
 | 
					            con = results[0]
 | 
				
			||||||
 | 
					            print(f'Resetting data feed for {name}')
 | 
				
			||||||
 | 
					            win_id = str(con.window)
 | 
				
			||||||
 | 
					            w, h = con.rect.width, con.rect.height
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            # TODO: only run the reconnect (2nd) kc on a detected
 | 
				
			||||||
 | 
					            # disconnect?
 | 
				
			||||||
 | 
					            key_combo, timeout = combos[reset_type]
 | 
				
			||||||
 | 
					            # for key_combo, timeout in [
 | 
				
			||||||
 | 
					            #     # only required if we need a connection reset.
 | 
				
			||||||
 | 
					            #     # ('ctrl+alt+r', 12),
 | 
				
			||||||
 | 
					            #     # data feed reset.
 | 
				
			||||||
 | 
					            #     ('ctrl+alt+f', 6)
 | 
				
			||||||
 | 
					            # ]:
 | 
				
			||||||
 | 
					            await trio.run_process([
 | 
				
			||||||
 | 
					                'xdotool',
 | 
				
			||||||
 | 
					                'windowactivate', '--sync', win_id,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # move mouse to bottom left of window (where there should
 | 
				
			||||||
 | 
					                # be nothing to click).
 | 
				
			||||||
 | 
					                'mousemove_relative', '--sync', str(w-4), str(h-4),
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # NOTE: we may need to stick a `--retry 3` in here..
 | 
				
			||||||
 | 
					                'click', '--window', win_id,
 | 
				
			||||||
 | 
					                '--repeat', '3', '1',
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # hackzorzes
 | 
				
			||||||
 | 
					                'key', key_combo,
 | 
				
			||||||
 | 
					                # ],
 | 
				
			||||||
 | 
					                # timeout=timeout,
 | 
				
			||||||
 | 
					            ])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # re-activate and focus original window
 | 
				
			||||||
 | 
					    await trio.run_process([
 | 
				
			||||||
 | 
					        'xdotool',
 | 
				
			||||||
 | 
					        'windowactivate', '--sync', str(orig_win_id),
 | 
				
			||||||
 | 
					        'click', '--window', str(orig_win_id), '1',
 | 
				
			||||||
 | 
					    ])
 | 
				
			||||||
 | 
					    return True
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue