From 186befc704a41b75dcff7bcef4c50b994b281013 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 May 2018 19:15:43 -0400 Subject: [PATCH 1/4] Only run 'startup sequence' on reconnect When a client loses a connection it will currently need to re-subscribe for symbols and receive a symbol data summary as a first quote response. Only run the provided coroutine on reconnect and call the kwarg `on_reconnect`. The client consuming code is entirely expected at this point to know how the symbol registration protocol works. --- piker/brokers/core.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 90b9420c..5ec1071c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -116,11 +116,11 @@ class Client: """ def __init__( self, sockaddr: tuple, - startup_seq: Coroutine, + on_reconnect: Coroutine, auto_reconnect: bool = True, ): self.sockaddr = sockaddr - self._startup_seq = startup_seq + self._recon_seq = on_reconnect self._autorecon = auto_reconnect self.squeue = None @@ -128,7 +128,6 @@ class Client: sockaddr = sockaddr or self.sockaddr stream = await trio.open_tcp_stream(*sockaddr, **kwargs) self.squeue = StreamQueue(stream) - await self._startup_seq(self) return stream async def send(self, item): @@ -168,6 +167,8 @@ class Client: continue else: log.warn("Stream connection re-established!") + # run any reconnection sequence + await self._recon_seq(self) break except (OSError, ConnectionRefusedError): if not down: From 84fadf7ac4ebcd0105ab2b14fec3f3c8b764a9ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 May 2018 20:33:44 -0400 Subject: [PATCH 2/4] Explicitly subscribe for tickers at wl startup --- piker/ui/watchlist.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index b94cc9fb..07c720fd 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -387,6 +387,8 @@ async def _async_main(name, client, tickers, brokermod, rate): This is started with cli command `piker watch`. ''' + # subscribe for tickers + await client.send((brokermod.name, tickers)) # get initial symbol data (long term data including last days close price) # TODO: need something better this this toy protocol sd = await client.recv() From a05a8cc557963ac89b3b0a1f5804b6205d591aaf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 May 2018 20:39:47 -0400 Subject: [PATCH 3/4] Include process name in log messages --- piker/log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/log.py b/piker/log.py index cc7d9771..b12aeae2 100644 --- a/piker/log.py +++ b/piker/log.py @@ -17,7 +17,7 @@ LOG_FORMAT = ( # "{bold_white}{log_color}{asctime}{reset}" "{log_color}{asctime}{reset}" " {bold_white}{thin_white}({reset}" - "{thin_white}{threadName}{reset}{bold_white}{thin_white})" + "{thin_white}{processName}: {threadName}{reset}{bold_white}{thin_white})" " {reset}{log_color}[{reset}{bold_log_color}{levelname}{reset}{log_color}]" " {log_color}{name}" " {thin_white}{filename}{log_color}:{reset}{thin_white}{lineno}{log_color}" From 7e5e3c4cc6c45bffdb9619ec4c2b19e945320565 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 16 May 2018 20:44:15 -0400 Subject: [PATCH 4/4] Adjust reconnect coro to swallow symbol data resp Couple fixes here: - if no tickers for a watchlist name -> bail - swallow the symbol data response in the reconnect handler coro - don't sleep 5 seconds before connecting to subproc daemon... Resolves #43 --- piker/cli.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 0ae1f5ce..7fa8edf5 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -126,14 +126,20 @@ def watch(loglevel, broker, rate, name, dhost): watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return async def launch_client(sleep=0.5, tries=10): async def subscribe(client): - # initial request for symbols price streams + # initial subs request for symbols await client.send((brokermod.name, tickers)) + # symbol data is returned in first response which we'll + # ignore on reconnect + await client.recv() - client = Client((dhost, 1616), subscribe) + client = Client((dhost, 1616), on_reconnect=subscribe) for _ in range(tries): # try for 5 seconds try: await client.connect() @@ -163,9 +169,10 @@ def watch(loglevel, broker, rate, name, dhost): target=run, args=(partial(_daemon_main, dhost), loglevel), daemon=True, + name='pikerd', ) child.start() - trio.run(launch_client, 5) + trio.run(partial(launch_client, tries=5)) child.join()