diff --git a/piker/_daemon.py b/piker/_daemon.py index c206a4d2..77462d35 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -47,7 +47,7 @@ _root_modules = [ class Services(BaseModel): - actor_n: tractor._trionics.ActorNursery + actor_n: tractor._supervise.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {} diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 51cf4a39..248039d4 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -21,10 +21,12 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from dataclasses import asdict from datetime import datetime from functools import partial +import itertools +from math import isnan from typing import ( Any, Optional, AsyncIterator, Awaitable, @@ -32,8 +34,8 @@ from typing import ( import asyncio from pprint import pformat import inspect -import itertools import logging +import platform from random import randint import time @@ -501,7 +503,11 @@ class Client: contract, snapshot=True, ) - ticker = await ticker.updateEvent + + # ensure a last price gets filled in before we deliver quote + while isnan(ticker.last): + ticker = await ticker.updateEvent + details = (await details_fute)[0] return contract, ticker, details @@ -569,66 +575,6 @@ class Client: ) ) - async def recv_trade_updates( - self, - to_trio: trio.abc.SendChannel, - - ) -> None: - """Stream a ticker using the std L1 api. - """ - self.inline_errors(to_trio) - - def push_tradesies(eventkit_obj, obj, fill=None): - """Push events to trio task. - - """ - if fill is not None: - # execution details event - item = ('fill', (obj, fill)) - - elif eventkit_obj.name() == 'positionEvent': - item = ('position', obj) - - else: - item = ('status', obj) - - log.info(f'eventkit event ->\n{pformat(item)}') - - try: - to_trio.send_nowait(item) - except trio.BrokenResourceError: - log.exception(f'Disconnected from {eventkit_obj} updates') - eventkit_obj.disconnect(push_tradesies) - - # hook up to the weird eventkit object - event stream api - for ev_name in [ - 'orderStatusEvent', # all order updates - 'execDetailsEvent', # all "fill" updates - 'positionEvent', # avg price updates per symbol per account - - # 'commissionReportEvent', - # XXX: ugh, it is a separate event from IB and it's - # emitted as follows: - # self.ib.commissionReportEvent.emit(trade, fill, report) - - # XXX: not sure yet if we need these - # 'updatePortfolioEvent', - - # XXX: these all seem to be weird ib_insync intrernal - # events that we probably don't care that much about - # given the internal design is wonky af.. - # 'newOrderEvent', - # 'orderModifyEvent', - # 'cancelOrderEvent', - # 'openOrderEvent', - ]: - eventkit_obj = getattr(self.ib, ev_name) - handler = partial(push_tradesies, eventkit_obj) - eventkit_obj.connect(handler) - - # let the engine run and stream - await self.ib.disconnectedEvent - def inline_errors( self, to_trio: trio.abc.SendChannel, @@ -674,6 +620,71 @@ class Client: return self.ib.positions(account=account) +async def recv_trade_updates( + + client: Client, + to_trio: trio.abc.SendChannel, + +) -> None: + """Stream a ticker using the std L1 api. + """ + client.inline_errors(to_trio) + + # sync with trio task + to_trio.send_nowait(None) + + def push_tradesies(eventkit_obj, obj, fill=None): + """Push events to trio task. + + """ + if fill is not None: + # execution details event + item = ('fill', (obj, fill)) + + elif eventkit_obj.name() == 'positionEvent': + item = ('position', obj) + + else: + item = ('status', obj) + + log.info(f'eventkit event ->\n{pformat(item)}') + + try: + to_trio.send_nowait(item) + except trio.BrokenResourceError: + log.exception(f'Disconnected from {eventkit_obj} updates') + eventkit_obj.disconnect(push_tradesies) + + # hook up to the weird eventkit object - event stream api + for ev_name in [ + 'orderStatusEvent', # all order updates + 'execDetailsEvent', # all "fill" updates + 'positionEvent', # avg price updates per symbol per account + + # 'commissionReportEvent', + # XXX: ugh, it is a separate event from IB and it's + # emitted as follows: + # self.ib.commissionReportEvent.emit(trade, fill, report) + + # XXX: not sure yet if we need these + # 'updatePortfolioEvent', + + # XXX: these all seem to be weird ib_insync intrernal + # events that we probably don't care that much about + # given the internal design is wonky af.. + # 'newOrderEvent', + # 'orderModifyEvent', + # 'cancelOrderEvent', + # 'openOrderEvent', + ]: + eventkit_obj = getattr(client.ib, ev_name) + handler = partial(push_tradesies, eventkit_obj) + eventkit_obj.connect(handler) + + # let the engine run and stream + await client.ib.disconnectedEvent + + # default config ports _tws_port: int = 7497 _gw_port: int = 4002 @@ -684,7 +695,8 @@ _try_ports = [ # TODO: remove the randint stuff and use proper error checking in client # factor below.. _client_ids = itertools.count(randint(1, 100)) -_client_cache = {} +_client_cache: dict[tuple[str, int], Client] = {} +_scan_ignore: set[tuple[str, int]] = set() def get_config() -> dict[str, Any]: @@ -703,7 +715,7 @@ def get_config() -> dict[str, Any]: _accounts2clients: dict[str, Client] = {} -@asynccontextmanager +@acm async def load_aio_clients( host: str = '127.0.0.1', @@ -718,8 +730,7 @@ async def load_aio_clients( TODO: consider doing this with a ctx mngr eventually? ''' - global _accounts2clients - global _client_cache + global _accounts2clients, _client_cache, _scan_ignore conf = get_config() ib = None @@ -727,7 +738,17 @@ async def load_aio_clients( # attempt to get connection info from config; if no .toml entry # exists, we try to load from a default localhost connection. - host = conf.get('host', '127.0.0.1') + localhost = '127.0.0.1' + host, hosts = conf.get('host'), conf.get('hosts') + if not (hosts or host): + host = localhost + + if not hosts: + hosts = [host] + elif host and hosts: + raise ValueError( + 'Specify only one of `host` or `hosts` in `brokers.toml` config') + ports = conf.get( 'ports', @@ -735,93 +756,146 @@ async def load_aio_clients( { 'gw': 4002, 'tws': 7497, - 'order': ['gw', 'tws'] + # 'order': ['gw', 'tws'] } ) - order = ports['order'] + order = ports.pop('order', None) + if order: + log.warning('`ports.order` section in `brokers.toml` is deprecated') accounts_def = config.load_accounts(['ib']) - - try_ports = [ports[key] for key in order] + try_ports = list(ports.values()) ports = try_ports if port is None else [port] + # we_connected = [] + connect_timeout = 0.5 if platform.system() != 'Windows' else 1 + combos = list(itertools.product(hosts, ports)) - we_connected = [] # allocate new and/or reload disconnected but cached clients - try: - # TODO: support multiple clients allowing for execution on - # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the EMs + # try: + # TODO: support multiple clients allowing for execution on + # multiple accounts (including a paper instance running on the + # same machine) and switching between accounts in the EMs - _err = None + _err = None - # (re)load any and all clients that can be found - # from connection details in ``brokers.toml``. - for port in ports: - client = _client_cache.get((host, port)) - accounts_found: dict[str, Client] = {} - if not client or not client.ib.isConnected(): - try: - ib = NonShittyIB() + # (re)load any and all clients that can be found + # from connection details in ``brokers.toml``. + for host, port in combos: - # if this is a persistent brokerd, try to allocate - # a new id for each client - client_id = next(_client_ids) + sockaddr = (host, port) + client = _client_cache.get(sockaddr) + accounts_found: dict[str, Client] = {} - log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync(host, port, clientId=client_id) + if ( + client and client.ib.isConnected() or + sockaddr in _scan_ignore + ): + continue - # create and cache client - client = Client(ib) + try: + ib = NonShittyIB() - # Pre-collect all accounts available for this - # connection and map account names to this client - # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client + # XXX: not sure if we ever really need to increment the + # client id if teardown is sucessful. + client_id = 616 - # if there are no positions or accounts - # without positions we should still register - # them for this client - for value in ib.accountValues(): - acct = value.account - if acct not in accounts_found: - accounts_found[ - accounts_def.inverse[acct] - ] = client + await ib.connectAsync( + host, + port, + clientId=client_id, - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + # this timeout is sensative on windows and will + # fail without a good "timeout error" so be + # careful. + timeout=connect_timeout, + ) - # update all actor-global caches - log.info(f"Caching client for {(host, port)}") - _client_cache[(host, port)] = client - we_connected.append(client) - _accounts2clients.update(accounts_found) + # create and cache client + client = Client(ib) - except ConnectionRefusedError as ce: - _err = ce - log.warning(f'Failed to connect on {port}') - else: - if not _client_cache: - raise ConnectionRefusedError(_err) + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client - # retreive first loaded client - clients = list(_client_cache.values()) - if clients: - client = clients[0] + # if there are no positions or accounts + # without positions we should still register + # them for this client + for value in ib.accountValues(): + acct = value.account + if acct not in accounts_found: + accounts_found[ + accounts_def.inverse[acct] + ] = client - yield client, _client_cache, _accounts2clients + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) - except BaseException: - for client in we_connected: - client.ib.disconnect() - raise + # update all actor-global caches + log.info(f"Caching client for {(host, port)}") + _client_cache[(host, port)] = client + + # we_connected.append((host, port, client)) + + # TODO: don't do it this way, get a gud to_asyncio + # context / .start() system goin.. + def pop_and_discon(): + log.info(f'Disconnecting client {client}') + client.ib.disconnect() + _client_cache.pop((host, port), None) + + # NOTE: the above callback **CAN'T FAIL** or shm won't get + # torn down correctly ... + tractor._actor._lifetime_stack.callback(pop_and_discon) + + _accounts2clients.update(accounts_found) + + except ( + ConnectionRefusedError, + + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: + _err = ce + log.warning(f'Failed to connect on {port}') + + # cache logic to avoid rescanning if we already have all + # clients loaded. + _scan_ignore.add(sockaddr) + else: + if not _client_cache: + raise ConnectionError( + 'No ib APIs could be found scanning @:\n' + f'{pformat(combos)}\n' + 'Check your `brokers.toml` and/or network' + ) from _err + + # retreive first loaded client + clients = list(_client_cache.values()) + if clients: + client = clients[0] + + yield client, _client_cache, _accounts2clients + + # TODO: this in a way that works xD + # finally: + # pass + # # async with trio.CancelScope(shield=True): + # for host, port, client in we_connected: + # client.ib.disconnect() + # _client_cache.pop((host, port)) + # raise async def _aio_run_client_method( @@ -862,16 +936,16 @@ async def _trio_run_client_method( assert ca.is_infected_aio() # if the method is an *async gen* stream for it - meth = getattr(Client, method) + # meth = getattr(Client, method) - args = tuple(inspect.getfullargspec(meth).args) + # args = tuple(inspect.getfullargspec(meth).args) - if inspect.isasyncgenfunction(meth) or ( - # if the method is an *async func* but manually - # streams back results, make sure to also stream it - 'to_trio' in args - ): - kwargs['_treat_as_stream'] = True + # if inspect.isasyncgenfunction(meth) or ( + # # if the method is an *async func* but manually + # # streams back results, make sure to also stream it + # 'to_trio' in args + # ): + # kwargs['_treat_as_stream'] = True return await tractor.to_asyncio.run_task( _aio_run_client_method, @@ -921,7 +995,7 @@ def get_client_proxy( return proxy -@asynccontextmanager +@acm async def get_client( **kwargs, ) -> Client: @@ -990,8 +1064,10 @@ def normalize( async def get_bars( + sym: str, end_dt: str = "", + ) -> (dict, np.ndarray): _err: Optional[Exception] = None @@ -1019,10 +1095,20 @@ async def get_bars( # TODO: retreive underlying ``ib_insync`` error? if err.code == 162: + # TODO: so this error is normally raised (it seems) if + # we try to retrieve history for a time range for which + # there is none. in that case we should not only report + # the "empty range" but also do a iteration on the time + # step for ``next_dt`` to see if we can pull older + # history. if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low + # means we hit some kind of historical "empty space" + # and further requests will need to decrement the + # start time dt in order to not receive a further + # error? + # OLDER: seem to always cause throttling despite low rps + + # raise err break elif 'No market data permissions for' in err.message: @@ -1045,17 +1131,22 @@ async def get_bars( fails += 1 continue - return (None, None) - + return None, None # else: # throttle wasn't fixed so error out immediately # raise _err async def backfill_bars( + sym: str, shm: ShmArray, # type: ignore # noqa - # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 6, # NOTE: any more and we'll overrun the underlying buffer + + # TODO: we want to avoid overrunning the underlying shm array buffer + # and we should probably calc the number of calls to make depending + # on that until we have the `marketstore` daemon in place in which + # case the shm size will be driven by user config and available sys + # memory. + count: int = 16, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1066,7 +1157,13 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 """ - (first_bars, bars_array, next_dt), fails = await get_bars(sym) + out, fails = await get_bars(sym) + if out is None: + raise RuntimeError("Could not pull currrent history?!") + + (first_bars, bars_array, next_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 # write historical data to buffer shm.push(bars_array) @@ -1091,6 +1188,17 @@ async def backfill_bars( continue bars, bars_array, next_dt = out + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + # TODO we should probably dig into forums to see what peeps + # think this data "means" and then use it as an indicator of + # sorts? dinkus has mentioned that $vlms for the day dont' + # match other platforms nor the summary stat tws shows in + # the monitor - it's probably worth investigating. + shm.push(bars_array, prepend=True) i += 1 @@ -1118,14 +1226,21 @@ _quote_streams: dict[str, trio.abc.ReceiveStream] = {} async def _setup_quote_stream( + + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + symbol: str, opts: tuple[int] = ('375', '233', '236'), contract: Optional[Contract] = None, -) -> None: + +) -> trio.abc.ReceiveChannel: """Stream a ticker using the std L1 api. """ global _quote_streams + to_trio.send_nowait(None) + async with load_aio_clients() as ( client, clients, @@ -1134,29 +1249,40 @@ async def _setup_quote_stream( contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) - # define a simple queue push routine that streams quote packets - # to trio over the ``to_trio`` memory channel. - to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + # # define a simple queue push routine that streams quote packets + # # to trio over the ``to_trio`` memory channel. + # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + def teardown(): + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + client.ib.cancelMktData(contract) - def push(t): - """Push quotes to trio task. + # decouple broadcast mem chan + _quote_streams.pop(symbol, None) + + def push(t: Ticker) -> None: + """ + Push quotes to trio task. """ # log.debug(t) try: to_trio.send_nowait(t) - except trio.BrokenResourceError: + except ( + trio.BrokenResourceError, + + # XXX: HACK, not sure why this gets left stale (probably + # due to our terrible ``tractor.to_asyncio`` + # implementation for streams.. but if the mem chan + # gets left here and starts blocking just kill the feed? + # trio.WouldBlock, + ): # XXX: eventkit's ``Event.emit()`` for whatever redic # reason will catch and ignore regular exceptions # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. - ticker.updateEvent.disconnect(push) - log.error(f"Disconnected stream for `{symbol}`") - client.ib.cancelMktData(contract) - - # decouple broadcast mem chan - _quote_streams.pop(symbol, None) + teardown() # except trio.WouldBlock: # # for slow debugging purposes to avoid clobbering prompt @@ -1164,35 +1290,43 @@ async def _setup_quote_stream( # pass ticker.updateEvent.connect(push) + try: + await asyncio.sleep(float('inf')) + finally: + teardown() - return from_aio + # return from_aio -async def start_aio_quote_stream( +@acm +async def open_aio_quote_stream( + symbol: str, contract: Optional[Contract] = None, + ) -> trio.abc.ReceiveStream: + from tractor.trionics import broadcast_receiver global _quote_streams from_aio = _quote_streams.get(symbol) if from_aio: # if we already have a cached feed deliver a rx side clone to consumer - return from_aio.clone() + async with broadcast_receiver(from_aio) as from_aio: + yield from_aio + return - else: - - from_aio = await tractor.to_asyncio.run_task( - _setup_quote_stream, - symbol=symbol, - contract=contract, - ) + async with tractor.to_asyncio.open_channel_from( + _setup_quote_stream, + symbol=symbol, + contract=contract, + ) as (first, from_aio): # cache feed for later consumers _quote_streams[symbol] = from_aio - return from_aio + yield from_aio async def stream_quotes( @@ -1221,116 +1355,120 @@ async def stream_quotes( symbol=sym, ) - stream = await start_aio_quote_stream(symbol=sym, contract=contract) + # stream = await start_aio_quote_stream(symbol=sym, contract=contract) + async with open_aio_quote_stream( + symbol=sym, contract=contract + ) as stream: - # pass back some symbol info like min_tick, trading_hours, etc. - syminfo = asdict(details) - syminfo.update(syminfo['contract']) + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - # nested dataclass we probably don't need and that won't IPC serialize - syminfo.pop('secIdList') + # nested dataclass we probably don't need and that won't IPC serialize + syminfo.pop('secIdList') - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 + # for "traditional" assets, volume is normally discreet, not a float + syminfo['lot_tick_size'] = 0.0 - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': syminfo, + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, + } } - } - con = first_ticker.contract + con = first_ticker.contract - # should be real volume for this contract by default - calc_price = False + # should be real volume for this contract by default + calc_price = False - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], suffix)).lower() - quote['symbol'] = topic + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], suffix)).lower() + quote['symbol'] = topic - # pass first quote asap - first_quote = {topic: quote} + # pass first quote asap + first_quote = {topic: quote} - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - log.debug(f"First ticker received {quote}") + log.debug(f"First ticker received {quote}") - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, first_quote)) - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - # suffix = 'exchange' - # calc_price = False # should be real volume for contract + async with aclosing(stream): + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + # suffix = 'exchange' + # calc_price = False # should be real volume for contract - # wait for real volume on feed (trading might be closed) - while True: + # wait for real volume on feed (trading might be closed) + while True: - ticker = await stream.receive() + ticker = await stream.receive() - # for a real volume contract we rait for the first - # "real" trade to take place - if not calc_price and not ticker.rtTime: - # spin consuming tickers until we get a real market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] + # for a real volume contract we rait for the first + # "real" trade to take place + if not calc_price and not ticker.rtTime: + # spin consuming tickers until we get a real + # market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] - # XXX: this works because we don't use - # ``aclosing()`` above? - break + # XXX: this works because we don't use + # ``aclosing()`` above? + break - # tell caller quotes are now coming in live - feed_is_live.set() + # tell caller quotes are now coming in live + feed_is_live.set() - # last = time.time() - async with aclosing(stream): - async for ticker in stream: - # print(f'ticker rate: {1/(time.time() - last)}') - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - - quote['symbol'] = topic - await send_chan.send({topic: quote}) - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] # last = time.time() + async for ticker in stream: + # print(f'ticker rate: {1/(time.time() - last)}') + + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + + quote['symbol'] = topic + await send_chan.send({topic: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() def pack_position( @@ -1472,14 +1610,25 @@ async def trades_dialogue( accounts = set() clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - for account, client in _accounts2clients.items(): + async with trio.open_nursery() as nurse: + for account, client in _accounts2clients.items(): - # each client to an api endpoint will have it's own event stream - trade_event_stream = await _trio_run_client_method( - method='recv_trade_updates', - client=client, - ) - clients.append((client, trade_event_stream)) + async def open_stream( + task_status: TaskStatus[ + trio.abc.ReceiveChannel + ] = trio.TASK_STATUS_IGNORED, + ): + # each api client has a unique event stream + async with tractor.to_asyncio.open_channel_from( + recv_trade_updates, + client=client, + ) as (first, trade_event_stream): + task_status.started(trade_event_stream) + await trio.sleep_forever() + + trade_event_stream = await nurse.start(open_stream) + + clients.append((client, trade_event_stream)) for client in _client_cache.values(): for pos in client.positions(): @@ -1488,26 +1637,29 @@ async def trades_dialogue( accounts.add(msg.account) all_positions.append(msg.dict()) - await ctx.started((all_positions, accounts)) + await ctx.started(( + all_positions, + tuple(name for name in accounts_def if name in accounts), + )) - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - ): - # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream, accounts_def) + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades event loop + n.start_soon(handle_order_requests, ems_stream, accounts_def) - # allocate event relay tasks for each client connection - for client, stream in clients: - n.start_soon( - deliver_trade_events, - stream, - ems_stream, - accounts_def - ) + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def + ) - # block until cancelled - await trio.sleep_forever() + # block until cancelled + await trio.sleep_forever() async def deliver_trade_events( diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 4f766daf..84983808 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -25,7 +25,7 @@ from dataclasses import dataclass, field import trio import tractor -from tractor._broadcast import broadcast_receiver +from tractor.trionics import broadcast_receiver from ..data._source import Symbol from ..log import get_logger diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 254fee76..483db8a8 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -868,7 +868,9 @@ async def display_symbol_data( ) async with ( - maybe_open_vlm_display(linkedsplits, ohlcv), + # XXX: this slipped in during a commits refacotr, + # it's actually landing proper in #231 + # maybe_open_vlm_display(linkedsplits, ohlcv), open_order_mode( feed, diff --git a/snippets/ib_data_reset.py b/snippets/ib_data_reset.py new file mode 100644 index 00000000..f5d4ca39 --- /dev/null +++ b/snippets/ib_data_reset.py @@ -0,0 +1,61 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +IB api client data feed reset hack for i3. + +''' +import subprocess + +import i3ipc + +i3 = i3ipc.Connection() +t = i3.get_tree() + +# for tws +win_names: list[str] = [ + 'Interactive Brokers', # tws running in i3 + 'IB Gateway.', # gw running in i3 +] + +for name in win_names: + results = t.find_named(name) + if results: + con = results[0] + print(f'Resetting data feed for {name}') + win_id = str(con.window) + w, h = con.rect.width, con.rect.height + + # TODO: seems to be a few libs for python but not sure + # if they support all the sub commands we need, order of + # most recent commit history: + # https://github.com/rr-/pyxdotool + # https://github.com/ShaneHutter/pyxdotool + # https://github.com/cphyc/pyxdotool + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', win_id, + + # move mouse to bottom left of window (where there should + # be nothing to click). + 'mousemove_relative', '--sync', str(w-3), str(h-3), + + # NOTE: we may need to stick a `--retry 3` in here.. + 'click', '--window', win_id, '1', + + # hackzorzes + 'key', 'ctrl+alt+f', + ])