From 95b31cbc0fb89572bcf69569fc967709aa925977 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 29 Jan 2022 12:44:45 -0500 Subject: [PATCH] Drop references to deprecated `tractor.msg.pub` --- piker/brokers/data.py | 4 ++-- piker/brokers/kraken.py | 2 -- piker/data/_sampling.py | 11 +++++------ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 48b20d80..035d6f4c 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -98,7 +98,7 @@ class BrokerFeed: ) -@tractor.msg.pub(tasks=['stock', 'option']) +@tractor.trionics.msgpub(tasks=['stock', 'option']) async def stream_poll_requests( get_topics: Callable, get_quotes: Coroutine, @@ -293,7 +293,7 @@ async def start_quote_stream( await stream_poll_requests( - # ``msg.pub`` required kwargs + # ``trionics.msgpub`` required kwargs task_name=feed_type, ctx=ctx, topics=symbols, diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3278e40b..24d2dab3 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -553,8 +553,6 @@ async def stream_quotes( quote = ohlc topic = quote['symbol'].lower() - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` await send_chan.send({topic: quote}) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 5e702e08..b29b0f7d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -176,12 +176,11 @@ async def sample_and_broadcast( # TODO: ``numba`` this! for sym, quote in quotes.items(): - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # TODO: in theory you can send the IPC msg *before* writing + # to the sharedmem array to decrease latency, however, that + # will require at least some way to prevent task switching + # at the yield such that the array write isn't delayed while + # another consumer is serviced.. # start writing the shm buffer with appropriate # trade data