piker/piker/brokers/ib/feed.py

1081 lines
34 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``.
"""
from __future__ import annotations
import asyncio
from contextlib import (
asynccontextmanager as acm,
)
from dataclasses import asdict
from datetime import datetime
from functools import partial
from pprint import pformat
from math import isnan
import time
from typing import (
Any,
Callable,
TYPE_CHECKING,
)
from async_generator import aclosing
import ib_insync as ibis
import numpy as np
from pendulum import (
now,
from_timestamp,
# DateTime,
Duration,
duration as mk_duration,
)
import tractor
import trio
from trio_typing import TaskStatus
from piker.accounting import (
MktPair,
)
from piker.data.validate import FeedInit
from piker.brokers._util import (
NoData,
DataUnavailable,
)
from .api import (
# _adhoc_futes_set,
Client,
con2fqme,
log,
load_aio_clients,
MethodProxy,
open_client_proxies,
get_preferred_data_client,
Ticker,
Contract,
RequestError,
)
from ._util import data_reset_hack
from .symbols import get_mkt_info
if TYPE_CHECKING:
from trio._core._run import Task
# XXX NOTE: See available types table docs:
# https://interactivebrokers.github.io/tws-api/tick_types.html
tick_types = {
77: 'trade',
# a "utrade" aka an off exchange "unreportable" (dark) vlm:
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade',
# standard L1 ticks
0: 'bsize',
1: 'bid',
2: 'ask',
3: 'asize',
4: 'last',
5: 'size',
8: 'volume',
# ``ib_insync`` already packs these into
# quotes under the following fields.
55: 'trades_per_min', # `'tradeRate'`
56: 'vlm_per_min', # `'volumeRate'`
89: 'shortable_units', # `'shortableShares'`
}
@acm
async def open_data_client() -> MethodProxy:
'''
Open the first found preferred "data client" as defined in the
user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
and deliver that client wrapped in a ``MethodProxy``.
'''
async with (
open_client_proxies() as (proxies, clients),
):
account_name, client = get_preferred_data_client(clients)
proxy = proxies.get(f'ib.{account_name}')
if not proxy:
raise ValueError(
f'No preferred data client could be found for {account_name}!'
)
yield proxy
@acm
async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
'''
History retreival endpoint - delivers a historical frame callble
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
'''
# TODO: mostly meta-data processing to drive shm and tsdb storage..
# - add logic to handle tradable hours and only grab
# valid bars in the range?
# - we want to avoid overrunning the underlying shm array buffer and
# we should probably calc the number of calls to make depending on
# that until we have the `marketstore` daemon in place in which case
# the shm size will be driven by user config and available sys
# memory.
# IB's internal symbology does not expect the "source asset" in
# the "symbol name", what we call the "market pair name". This is
# common in most legacy market brokers since it's presumed that
# given a certain stock exchange, listed assets are traded
# "from" a particular source fiat, normally something like USD
# on the given venue-provider, eg. nasdaq, nyse, etc.
fqme_kwargs: dict[str, Any] = {}
if mkt.dst.atype != 'fiat':
fqme_kwargs = {
'without_src': True, # default is True
'delim_char': '', # bc they would normally use a frickin `.` smh
}
fqme: str = mkt.get_bs_fqme(**(fqme_kwargs))
async with open_data_client() as proxy:
max_timeout: float = 2.
mean: float = 0
count: int = 0
head_dt: None | datetime = None
if (
# fx cons seem to not provide this endpoint?
# TODO: guard against all contract types which don't
# support it?
'idealpro' not in fqme
):
head_dt: datetime | None = await proxy.maybe_get_head_time(
fqme=fqme
)
async def get_hist(
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[np.ndarray, str]:
nonlocal max_timeout, mean, count
if (
start_dt
and start_dt.timestamp() == 0
):
await tractor.pause()
query_start = time.time()
out, timedout = await get_bars(
proxy,
fqme,
timeframe,
end_dt=end_dt,
start_dt=start_dt,
)
latency = time.time() - query_start
if (
not timedout
# and latency <= max_timeout
):
count += 1
mean += latency / count
print(
f'HISTORY FRAME QUERY LATENCY: {latency}\n'
f'mean: {mean}'
)
# could be trying to retreive bars over weekend
if out is None:
log.error(f"Can't grab bars starting at {end_dt}!?!?")
if (
end_dt
and head_dt
and end_dt <= head_dt
):
raise DataUnavailable(
f'First timestamp is {head_dt}\n'
f'But {end_dt} was requested..'
)
else:
raise NoData(
info={
'fqme': fqme,
'head_dt': head_dt,
'start_dt': start_dt,
'end_dt': end_dt,
'timedout': timedout,
},
)
# also see return type for `get_bars()`
bars: ibis.objects.BarDataList
bars_array: np.ndarray
first_dt: datetime
last_dt: datetime
(
bars,
bars_array,
first_dt,
last_dt,
) = out
# TODO: audit the sampling period here as well?
# timestep should always be at least as large as the
# period step.
# tdiff: np.ndarray = np.diff(bars_array['time'])
# if (tdiff < timeframe).any():
# await tractor.pause()
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..
vlm = bars_array['volume']
vlm[vlm < 0] = 0
return bars_array, first_dt, last_dt
# TODO: it seems like we can do async queries for ohlc
# but getting the order right still isn't working and I'm not
# quite sure why.. needs some tinkering and probably
# a lookthrough of the ``ib_insync`` machinery, for eg. maybe
# we have to do the batch queries on the `asyncio` side?
yield (
get_hist,
{
'erlangs': 1, # max conc reqs
'rate': 3, # max req rate
'frame_types': { # expected frame sizes
1: mk_duration(seconds=2e3),
60: mk_duration(days=2),
}
},
)
_pacing: str = (
'Historical Market Data Service error '
'message:Historical data request pacing violation'
)
async def wait_on_data_reset(
proxy: MethodProxy,
reset_type: str = 'data',
timeout: float = 16, # float('inf'),
task_status: TaskStatus[
tuple[
trio.CancelScope,
trio.Event,
]
] = trio.TASK_STATUS_IGNORED,
) -> bool:
# TODO: we might have to put a task lock around this
# method..
hist_ev = proxy.status_event(
'HMDS data farm connection is OK:ushmds'
)
# TODO: other event messages we might want to try and
# wait for but i wasn't able to get any of this
# reliable..
# reconnect_start = proxy.status_event(
# 'Market data farm is connecting:usfuture'
# )
# live_ev = proxy.status_event(
# 'Market data farm connection is OK:usfuture'
# )
# try to wait on the reset event(s) to arrive, a timeout
# will trigger a retry up to 6 times (for now).
client: Client = proxy._aio_ns
done = trio.Event()
with trio.move_on_after(timeout) as cs:
task_status.started((cs, done))
log.warning(
'Sending DATA RESET request:\n'
f'{client.ib.client}'
)
res = await data_reset_hack(
client=client,
reset_type=reset_type,
)
if not res:
log.warning(
'NO VNC DETECTED!\n'
'Manually press ctrl-alt-f on your IB java app'
)
done.set()
return False
# TODO: not sure if waiting on other events
# is all that useful here or not.
# - in theory you could wait on one of the ones above first
# to verify the reset request was sent?
# - we need the same for real-time quote feeds which can
# sometimes flake out and stop delivering..
for name, ev in [
('history', hist_ev),
]:
await ev.wait()
log.info(f"{name} DATA RESET")
done.set()
return True
if cs.cancel_called:
log.warning(
'Data reset task canceled?'
)
done.set()
return False
_data_resetter_task: Task | None = None
_failed_resets: int = 0
async def get_bars(
proxy: MethodProxy,
fqme: str,
timeframe: int,
# blank to start which tells ib to look up the latest datum
end_dt: str = '',
start_dt: str | None = '',
# TODO: make this more dynamic based on measured frame rx latency?
# how long before we trigger a feed reset (seconds)
feed_reset_timeout: float = 3,
# how many days to subtract before giving up on further
# history queries for instrument, presuming that most don't
# not trade for a week XD
max_nodatas: int = 6,
max_failed_resets: int = 6,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> tuple[
tuple[ # result tuple
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
] | None,
bool, # timed out hint
]:
'''
Retrieve historical data from a ``trio``-side task using
a ``MethoProxy``.
'''
global _data_resetter_task, _failed_resets
nodatas_count: int = 0
data_cs: trio.CancelScope | None = None
result: tuple[
ibis.objects.BarDataList,
np.ndarray,
datetime,
datetime,
] | None = None
result_ready = trio.Event()
async def query():
global _failed_resets
nonlocal result, data_cs, end_dt, nodatas_count
while _failed_resets < max_failed_resets:
try:
(
bars,
bars_array,
dt_duration,
) = await proxy.bars(
fqme=fqme,
end_dt=end_dt,
sample_period_s=timeframe,
# ideally we cancel the request just before we
# cancel on the ``trio``-side and trigger a data
# reset hack.. the problem is there's no way (with
# current impl) to detect a cancel case.
# timeout=timeout,
)
# usually either a request during a venue closure
# or into a large (weekend) closure gap.
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
# NOTE: REQUIRED to pass back value..
result = None
return None
# not enough bars signal, likely due to venue
# operational gaps.
if end_dt:
dur_s: float = len(bars) * timeframe
bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter then expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most send to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
first_dt = from_timestamp(
bars[0].date.timestamp())
last_dt = from_timestamp(
bars[-1].date.timestamp())
time = bars_array['time']
assert time[-1] == last_dt.timestamp()
assert time[0] == first_dt.timestamp()
log.info(
f'{len(bars)} bars retreived {first_dt} -> {last_dt}'
)
if data_cs:
data_cs.cancel()
# NOTE: setting this is critical!
result = (
bars, # ib native
bars_array, # numpy
first_dt,
last_dt,
)
# signal data reset loop parent task
result_ready.set()
# NOTE: this isn't getting collected anywhere!
return result
except RequestError as err:
msg: str = err.message
if 'No market data permissions for' in msg:
# TODO: signalling for no permissions searches
raise NoData(
f'Symbol: {fqme}',
)
elif (
'HMDS query returned no data' in msg
):
# XXX: this is now done in the storage mgmt
# layer and we shouldn't implicitly decrement
# the frame dt index since the upper layer may
# be doing so concurrently and we don't want to
# be delivering frames that weren't asked for.
# try to decrement start point and look further back
# end_dt = end_dt.subtract(seconds=2000)
logmsg = "SUBTRACTING DAY from DT index"
if end_dt is not None:
end_dt = end_dt.subtract(days=1)
elif end_dt is None:
end_dt = now().subtract(days=1)
log.warning(
f'NO DATA found ending @ {end_dt}\n'
+ logmsg
)
if nodatas_count >= max_nodatas:
raise DataUnavailable(
f'Presuming {fqme} has no further history '
f'after {max_nodatas} tries..'
)
nodatas_count += 1
continue
elif (
'API historical data query cancelled'
in
err.message
):
log.warning(
'Query cancelled by IB (:eyeroll:):\n'
f'{err.message}'
)
continue
elif (
'Trading TWS session is connected from a different IP'
in
err.message
):
log.warning("ignoring ip address warning")
continue
# XXX: more or less same as above timeout case
elif (
_pacing in msg
):
log.warning(
'History throttle rate reached!\n'
'Resetting farms with `ctrl-alt-f` hack\n'
)
client = proxy._aio_ns.ib.client
# cancel any existing reset task
if data_cs:
log.cancel(f'Cancelling existing reset for {client}')
data_cs.cancel()
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
reset_type='connection'
)
)
if reset_done:
_failed_resets = 0
else:
_failed_resets += 1
continue
else:
raise
# TODO: make this global across all history task/requests
# such that simultaneous symbol queries don't try data resettingn
# too fast..
unset_resetter: bool = False
async with trio.open_nursery() as nurse:
# start history request that we allow
# to run indefinitely until a result is acquired
nurse.start_soon(query)
# start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset.
while not result_ready.is_set():
with trio.move_on_after(feed_reset_timeout):
await result_ready.wait()
break
if _data_resetter_task:
# don't double invoke the reset hack if another
# requester task already has it covered.
continue
else:
_data_resetter_task = trio.lowlevel.current_task()
unset_resetter: bool = True
# spawn new data reset task
data_cs, reset_done = await nurse.start(
partial(
wait_on_data_reset,
proxy,
reset_type='data',
)
)
# sync wait on reset to complete
await reset_done.wait()
_data_resetter_task = (
None
if unset_resetter
else _data_resetter_task
)
assert result
return (
result,
data_cs is not None,
)
_quote_streams: dict[str, trio.abc.ReceiveStream] = {}
async def _setup_quote_stream(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
symbol: str,
opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
'233', # RT trade volume (includes utrades)
'236', # Shortable shares
# these all appear to only be updated every 25s thus
# making them mostly useless and explains why the scanner
# is always slow XD
# '293', # Trade count for day
# '294', # Trade rate / minute
# '295', # Vlm rate / minute
),
contract: Contract | None = None,
) -> trio.abc.ReceiveChannel:
'''
Stream a ticker using the std L1 api.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
'''
global _quote_streams
async with load_aio_clients(
disconnect_on_exit=False,
) as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
to_trio.send_nowait(contract) # cuz why not
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown():
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
def push(t: Ticker) -> None:
"""
Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except (
trio.BrokenResourceError,
# XXX: HACK, not sure why this gets left stale (probably
# due to our terrible ``tractor.to_asyncio``
# implementation for streams.. but if the mem chan
# gets left here and starts blocking just kill the feed?
# trio.WouldBlock,
):
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
teardown()
except trio.WouldBlock:
# log.warning(
# f'channel is blocking symbol feed for {symbol}?'
# f'\n{to_trio.statistics}'
# )
pass
# except trio.WouldBlock:
# # for slow debugging purposes to avoid clobbering prompt
# # with log msgs
# pass
ticker.updateEvent.connect(push)
try:
await asyncio.sleep(float('inf'))
finally:
teardown()
# return from_aio
@acm
async def open_aio_quote_stream(
symbol: str,
contract: Contract | None = None,
) -> trio.abc.ReceiveStream:
from tractor.trionics import broadcast_receiver
global _quote_streams
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver(
from_aio,
2**6,
) as from_aio:
yield from_aio
return
async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream,
symbol=symbol,
contract=contract,
) as (contract, from_aio):
assert contract
# cache feed for later consumers
_quote_streams[symbol] = from_aio
yield from_aio
# TODO: cython/mypyc/numba this!
# or we can at least cache a majority of the values
# except for the ones we expect to change?..
def normalize(
ticker: Ticker,
calc_price: bool = False
) -> dict:
# check for special contract types
con = ticker.contract
fqme, calc_price = con2fqme(con)
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
if tick and not isinstance(tick, dict):
td = tick._asdict()
td['type'] = tick_types.get(
td['tickType'],
'n/a',
)
new_ticks.append(td)
tbt = ticker.tickByTicks
if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks
# some contracts don't have volume so we may want to calculate
# a midpoint price based on data we can acquire (such as bid / ask)
if calc_price:
ticker.ticks.append(
{'type': 'trade', 'price': ticker.marketPrice()}
)
# serialize for transport
data = asdict(ticker)
# generate fqme with possible specialized suffix
# for derivatives, note the lowercase.
data['symbol'] = data['fqme'] = fqme
# convert named tuples to dicts for transport
tbts = data.get('tickByTicks')
if tbts:
data['tickByTicks'] = [tbt._asdict() for tbt in tbts]
# add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time()
# stupid stupid shit...don't even care any more..
# leave it until we do a proper latency study
# if ticker.rtTime is not None:
# data['broker_ts'] = data['rtTime_s'] = float(
# ticker.rtTime.timestamp) / 1000.
data.pop('rtTime')
return data
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked
once the brokerd is up.
'''
# TODO: support multiple subscriptions
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
init_msgs: list[FeedInit] = []
proxy: MethodProxy
mkt: MktPair
details: ibis.ContractDetails
async with (
open_data_client() as proxy,
# trio.open_nursery() as tn,
):
mkt, details = await get_mkt_info(
sym,
proxy=proxy, # passed to avoid implicit client load
)
init_msg = FeedInit(mkt_info=mkt)
if mkt.dst.atype in {
'fiat',
'index',
'commodity',
}:
# tell sampler config that it shouldn't do vlm summing.
init_msg.shm_write_opts['sum_tick_vlm'] = False
init_msg.shm_write_opts['has_vlm'] = False
init_msgs.append(init_msg)
con: Contract = details.contract
first_ticker: Ticker | None = None
with trio.move_on_after(1):
first_ticker: Ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=False,
)
if first_ticker:
first_quote: dict = normalize(first_ticker)
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n\n'
f'{pformat(first_quote)}\n'
)
# NOTE: it might be outside regular trading hours for
# assets with "standard venue operating hours" so we
# only "pretend the feed is live" when the dst asset
# type is NOT within the NON-NORMAL-venue set: aka not
# commodities, forex or crypto currencies which CAN
# always return a NaN on a snap quote request during
# normal venue hours. In the case of a closed venue
# (equitiies, futes, bonds etc.) we at least try to
# grab the OHLC history.
if (
first_ticker
and
isnan(first_ticker.last)
# SO, if the last quote price value is NaN we ONLY
# "pretend to do" `feed_is_live.set()` if it's a known
# dst asset venue with a lot of closed operating hours.
and mkt.dst.atype not in {
'commodity',
'fiat',
'crypto',
}
):
task_status.started((
init_msgs,
first_quote,
))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
# TODO: we should instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
# hard coded stuff.
# async def wait_for_first_quote():
# with trio.CancelScope() as cs:
# XXX: MUST acquire a ticker + first quote before starting
# the live quotes loop!
# with trio.move_on_after(1):
first_ticker = await proxy.get_quote(
contract=con,
raise_on_timeout=True,
)
first_quote: dict = normalize(first_ticker)
# TODO: we need a stack-oriented log levels filters for
# this!
# log.info(message, filter={'stack': 'live_feed'}) ?
log.runtime(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None
startup: bool = True
while (
startup
or cs.cancel_called
):
with trio.CancelScope() as cs:
async with (
trio.open_nursery() as nurse,
open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream,
):
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
# only on first entry at feed boot up
if startup:
startup: bool = False
task_status.started((
init_msgs,
first_quote,
))
# start a stream restarter task which monitors the
# data feed event.
async def reset_on_feed():
# TODO: this seems to be surpressed from the
# traceback in ``tractor``?
# assert 0
rt_ev = proxy.status_event(
'Market data farm connection is OK:usfarm'
)
await rt_ev.wait()
cs.cancel() # cancel called should now be set
nurse.start_soon(reset_on_feed)
async with aclosing(stream):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
# generally speaking these feeds don't
# include vlm data.
atype: str = mkt.dst.atype
log.info(
f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
)
else:
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await stream.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now streaming.
feed_is_live.set()
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
fqme = quote['fqme']
await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()