diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 90b9420c..5ec1071c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -116,11 +116,11 @@ class Client: """ def __init__( self, sockaddr: tuple, - startup_seq: Coroutine, + on_reconnect: Coroutine, auto_reconnect: bool = True, ): self.sockaddr = sockaddr - self._startup_seq = startup_seq + self._recon_seq = on_reconnect self._autorecon = auto_reconnect self.squeue = None @@ -128,7 +128,6 @@ class Client: sockaddr = sockaddr or self.sockaddr stream = await trio.open_tcp_stream(*sockaddr, **kwargs) self.squeue = StreamQueue(stream) - await self._startup_seq(self) return stream async def send(self, item): @@ -168,6 +167,8 @@ class Client: continue else: log.warn("Stream connection re-established!") + # run any reconnection sequence + await self._recon_seq(self) break except (OSError, ConnectionRefusedError): if not down: diff --git a/piker/cli.py b/piker/cli.py index 0ae1f5ce..7fa8edf5 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -126,14 +126,20 @@ def watch(loglevel, broker, rate, name, dhost): watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return async def launch_client(sleep=0.5, tries=10): async def subscribe(client): - # initial request for symbols price streams + # initial subs request for symbols await client.send((brokermod.name, tickers)) + # symbol data is returned in first response which we'll + # ignore on reconnect + await client.recv() - client = Client((dhost, 1616), subscribe) + client = Client((dhost, 1616), on_reconnect=subscribe) for _ in range(tries): # try for 5 seconds try: await client.connect() @@ -163,9 +169,10 @@ def watch(loglevel, broker, rate, name, dhost): target=run, args=(partial(_daemon_main, dhost), loglevel), daemon=True, + name='pikerd', ) child.start() - trio.run(launch_client, 5) + trio.run(partial(launch_client, tries=5)) child.join() diff --git a/piker/log.py b/piker/log.py index cc7d9771..b12aeae2 100644 --- a/piker/log.py +++ b/piker/log.py @@ -17,7 +17,7 @@ LOG_FORMAT = ( # "{bold_white}{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" - "{thin_white}{threadName}{reset}{bold_white}{thin_white})" + "{thin_white}{processName}: {threadName}{reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" " {log_color}{name}" " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index b94cc9fb..07c720fd 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -387,6 +387,8 @@ async def _async_main(name, client, tickers, brokermod, rate): This is started with cli command `piker watch`. ''' + # subscribe for tickers + await client.send((brokermod.name, tickers)) # get initial symbol data (long term data including last days close price) # TODO: need something better this this toy protocol sd = await client.recv()