Merge pull request #161 from pikers/cached_feeds

Cached feeds
binance_backend
goodboy 2021-04-10 14:33:43 -04:00 committed by GitHub
commit 54d272ea29
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1041 additions and 818 deletions

View File

@ -103,6 +103,21 @@ bet you weren't expecting this from the foss bby::
piker -b kraken chart XBTUSD piker -b kraken chart XBTUSD
run in distributed mode
***********************
start the service daemon::
pikerd -l info
connect yourt chart::
piker -b kraken chart XMRXBT
enjoy persistent real-time data feeds tied to daemon lifetime.
if anyone asks you what this project is about if anyone asks you what this project is about
********************************************* *********************************************
you don't talk about it. you don't talk about it.

View File

@ -18,6 +18,7 @@
Structured, daemon tree service management. Structured, daemon tree service management.
""" """
from functools import partial
from typing import Optional, Union from typing import Optional, Union
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@ -87,6 +88,18 @@ async def open_pikerd(
yield _services yield _services
@asynccontextmanager
async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(loglevel=loglevel, **kwargs):
yield
else:
yield
@asynccontextmanager @asynccontextmanager
async def maybe_open_pikerd( async def maybe_open_pikerd(
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -100,14 +113,14 @@ async def maybe_open_pikerd(
if loglevel: if loglevel:
get_console_log(loglevel) get_console_log(loglevel)
try: # subtle, we must have the runtime up here or portal lookup will fail
async with maybe_open_runtime(loglevel, **kwargs):
async with tractor.find_actor(_root_dname) as portal: async with tractor.find_actor(_root_dname) as portal:
assert portal is not None # assert portal is not None
if portal is not None:
yield portal yield portal
return return
except (RuntimeError, AssertionError): # tractor runtime not started yet
# presume pikerd role # presume pikerd role
async with open_pikerd( async with open_pikerd(
loglevel, loglevel,
@ -124,7 +137,7 @@ _data_mods = [
'piker.brokers.core', 'piker.brokers.core',
'piker.brokers.data', 'piker.brokers.data',
'piker.data', 'piker.data',
'piker.data._buffer' 'piker.data._sampling'
] ]
@ -134,6 +147,8 @@ async def spawn_brokerd(
**tractor_kwargs **tractor_kwargs
) -> tractor._portal.Portal: ) -> tractor._portal.Portal:
from .data import _setup_persistent_brokerd
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
@ -145,13 +160,28 @@ async def spawn_brokerd(
global _services global _services
assert _services assert _services
await _services.actor_n.start_actor( portal = await _services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + [brokermod.__name__], enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel, loglevel=loglevel,
**tractor_kwargs **tractor_kwargs
) )
# TODO: so i think this is the perfect use case for supporting
# a cross-actor async context manager api instead of this
# shoort-and-forget task spawned in the root nursery, we'd have an
# async exit stack that we'd register the `portal.open_context()`
# call with and then have the ability to unwind the call whenevs.
# non-blocking setup of brokerd service nursery
_services.service_n.start_soon(
partial(
portal.run,
_setup_persistent_brokerd,
brokername=brokername,
)
)
return dname return dname

View File

@ -22,7 +22,6 @@ from typing import Dict
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
import trio import trio
import tractor
from . import get_brokermod from . import get_brokermod
from ..log import get_logger from ..log import get_logger
@ -30,10 +29,12 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
_cache: Dict[str, 'Client'] = {}
_cache: Dict[str, 'Client'] = {} # noqa
@asynccontextmanager @asynccontextmanager
async def get_cached_client( async def open_cached_client(
brokername: str, brokername: str,
*args, *args,
**kwargs, **kwargs,
@ -77,7 +78,8 @@ async def get_cached_client(
yield client yield client
finally: finally:
if client is not None:
# if no more consumers, teardown the client
client._consumers -= 1 client._consumers -= 1
if client._consumers <= 0: if client._consumers <= 0:
# teardown the client
await client._exit_stack.aclose() await client._exit_stack.aclose()

View File

@ -34,10 +34,11 @@ import logging
import time import time
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails, Option
from ib_insync.order import Order from ib_insync.order import Order
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
from ib_insync.objects import Position from ib_insync.objects import Position
@ -46,14 +47,9 @@ from ib_insync.wrapper import Wrapper
from ib_insync.client import Client as ib_Client from ib_insync.client import Client as ib_Client
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ( from ..data import maybe_spawn_brokerd
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
subscribe_ohlc_for_increment,
_buffer,
)
from ..data._source import from_df from ..data._source import from_df
from ..data._sharedmem import ShmArray
from ._util import SymbolNotFound from ._util import SymbolNotFound
@ -168,6 +164,7 @@ class Client:
# contract cache # contract cache
self._contracts: Dict[str, Contract] = {} self._contracts: Dict[str, Contract] = {}
self._feeds: Dict[str, trio.abc.SendChannel] = {}
# NOTE: the ib.client here is "throttled" to 45 rps by default # NOTE: the ib.client here is "throttled" to 45 rps by default
@ -384,42 +381,6 @@ class Client:
formatDate=2, # timezone aware UTC datetime formatDate=2, # timezone aware UTC datetime
) )
async def stream_ticker(
self,
symbol: str,
to_trio,
opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
"""Stream a ticker using the std L1 api.
"""
contract = contract or (await self.find_contract(symbol))
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
self.ib.cancelMktData(contract)
ticker.updateEvent.connect(push)
# let the engine run and stream
await self.ib.disconnectedEvent
async def get_quote( async def get_quote(
self, self,
symbol: str, symbol: str,
@ -613,6 +574,8 @@ async def _aio_get_client(
client_id: Optional[int] = None, client_id: Optional[int] = None,
) -> Client: ) -> Client:
"""Return an ``ib_insync.IB`` instance wrapped in our client API. """Return an ``ib_insync.IB`` instance wrapped in our client API.
Client instances are cached for later use.
""" """
# first check cache for existing client # first check cache for existing client
@ -652,8 +615,10 @@ async def _aio_get_client(
# create and cache # create and cache
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}") log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
@ -691,13 +656,14 @@ async def _trio_run_client_method(
# if the method is an *async gen* stream for it # if the method is an *async gen* stream for it
meth = getattr(Client, method) meth = getattr(Client, method)
if inspect.isasyncgenfunction(meth):
kwargs['_treat_as_stream'] = True
args = tuple(inspect.getfullargspec(meth).args)
if inspect.isasyncgenfunction(meth) or (
# if the method is an *async func* but manually # if the method is an *async func* but manually
# streams back results, make sure to also stream it # streams back results, make sure to also stream it
args = tuple(inspect.getfullargspec(meth).args) 'to_trio' in args
if 'to_trio' in args: ):
kwargs['_treat_as_stream'] = True kwargs['_treat_as_stream'] = True
result = await tractor.to_asyncio.run_task( result = await tractor.to_asyncio.run_task(
@ -780,7 +746,7 @@ def normalize(
# convert named tuples to dicts so we send usable keys # convert named tuples to dicts so we send usable keys
new_ticks = [] new_ticks = []
for tick in ticker.ticks: for tick in ticker.ticks:
if tick: if tick and not isinstance(tick, dict):
td = tick._asdict() td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a') td['type'] = tick_types.get(td['tickType'], 'n/a')
@ -811,36 +777,13 @@ def normalize(
return data return data
_local_buffer_writers = {} async def backfill_bars(
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
"""Mark the current actor with module var determining
whether an existing shm writer task is already active.
This avoids more then one writer resulting in data
clobbering.
"""
global _local_buffer_writers
try:
assert not _local_buffer_writers.get(key, False)
_local_buffer_writers[key] = True
async with trio.open_nursery() as n:
yield n
finally:
_local_buffer_writers.pop(key, None)
async def fill_bars(
sym: str, sym: str,
first_bars: list, shm: ShmArray, # type: ignore # noqa
shm: 'ShmArray', # type: ignore # noqa
# count: int = 20, # NOTE: any more and we'll overrun underlying buffer # count: int = 20, # NOTE: any more and we'll overrun underlying buffer
count: int = 6, # NOTE: any more and we'll overrun the underlying buffer count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Fill historical bars into shared mem / storage afap. """Fill historical bars into shared mem / storage afap.
@ -848,6 +791,18 @@ async def fill_bars(
https://github.com/pikers/piker/issues/128 https://github.com/pikers/piker/issues/128
""" """
first_bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
)
# write historical data to buffer
shm.push(bars_array)
with trio.CancelScope() as cs:
task_status.started(cs)
next_dt = first_bars[0].date next_dt = first_bars[0].date
i = 0 i = 0
@ -860,6 +815,9 @@ async def fill_bars(
end_dt=next_dt, end_dt=next_dt,
) )
if bars_array is None:
raise SymbolNotFound(sym)
shm.push(bars_array, prepend=True) shm.push(bars_array, prepend=True)
i += 1 i += 1
next_dt = bars[0].date next_dt = bars[0].date
@ -877,7 +835,9 @@ async def fill_bars(
else: else:
log.exception( log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS") "Data query rate reached: Press `ctrl-alt-f`"
"in TWS"
)
# TODO: should probably create some alert on screen # TODO: should probably create some alert on screen
# and then somehow get that to trigger an event here # and then somehow get that to trigger an event here
@ -904,27 +864,96 @@ asset_type_map = {
} }
# TODO: figure out how to share quote feeds sanely despite _quote_streams: Dict[str, trio.abc.ReceiveStream] = {}
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@tractor.stream async def _setup_quote_stream(
symbol: str,
opts: Tuple[int] = ('375', '233', '236'),
contract: Optional[Contract] = None,
) -> None:
"""Stream a ticker using the std L1 api.
"""
global _quote_streams
async with _aio_get_client() as client:
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
except trio.BrokenResourceError:
# XXX: eventkit's ``Event.emit()`` for whatever redic
# reason will catch and ignore regular exceptions
# resulting in tracebacks spammed to console..
# Manually do the dereg ourselves.
ticker.updateEvent.disconnect(push)
log.error(f"Disconnected stream for `{symbol}`")
client.ib.cancelMktData(contract)
# decouple broadcast mem chan
_quote_streams.pop(symbol, None)
ticker.updateEvent.connect(push)
return from_aio
async def start_aio_quote_stream(
symbol: str,
contract: Optional[Contract] = None,
) -> trio.abc.ReceiveStream:
global _quote_streams
from_aio = _quote_streams.get(symbol)
if from_aio:
# if we already have a cached feed deliver a rx side clone to consumer
return from_aio.clone()
else:
from_aio = await tractor.to_asyncio.run_task(
_setup_quote_stream,
symbol=symbol,
contract=contract,
)
# cache feed for later consumers
_quote_streams[symbol] = from_aio
return from_aio
async def stream_quotes( async def stream_quotes(
ctx: tractor.Context,
send_chan: trio.abc.SendChannel,
symbols: List[str], symbols: List[str],
shm_token: Tuple[str, str, List[tuple]], shm: ShmArray,
feed_is_live: trio.Event,
loglevel: str = None, loglevel: str = None,
# compat for @tractor.msg.pub # startup sync
topics: Any = None, task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
get_topics: Callable = None,
) -> AsyncIterator[Dict[str, Any]]: ) -> None:
"""Stream symbol quotes. """Stream symbol quotes.
This is a ``trio`` callable routine meant to be invoked This is a ``trio`` callable routine meant to be invoked
once the brokerd is up. once the brokerd is up.
""" """
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# TODO: support multiple subscriptions # TODO: support multiple subscriptions
sym = symbols[0] sym = symbols[0]
@ -934,63 +963,9 @@ async def stream_quotes(
symbol=sym, symbol=sym,
) )
stream = await _trio_run_client_method( stream = await start_aio_quote_stream(symbol=sym, contract=contract)
method='stream_ticker',
contract=contract, # small speedup
symbol=sym,
)
shm = None
async with trio.open_nursery() as ln:
# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
key = shm_token['shm_name']
writer_already_exists = _local_buffer_writers.get(key, False)
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
if not writer_already_exists:
_local_buffer_writers[key] = True
shm = attach_shm_array(
token=shm_token,
# we are the buffer writer
readonly=False,
)
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
)
log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym)
# write historical data to buffer
shm.push(bars_array)
shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, sym, bars, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# pass back some symbol info like min_tick, trading_hours, etc. # pass back some symbol info like min_tick, trading_hours, etc.
# con = asdict(contract)
# syminfo = contract
syminfo = asdict(details) syminfo = asdict(details)
syminfo.update(syminfo['contract']) syminfo.update(syminfo['contract'])
@ -1011,12 +986,9 @@ async def stream_quotes(
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
sym: { sym: {
'is_shm_writer': not writer_already_exists,
'shm_token': shm_token,
'symbol_info': syminfo, 'symbol_info': syminfo,
} }
} }
await ctx.send_yield(init_msgs)
# check for special contract types # check for special contract types
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
@ -1028,7 +1000,6 @@ async def stream_quotes(
# no real volume so we have to calculate the price # no real volume so we have to calculate the price
suffix = 'secType' suffix = 'secType'
calc_price = True calc_price = True
# ticker = first_ticker
# pass first quote asap # pass first quote asap
quote = normalize(first_ticker, calc_price=calc_price) quote = normalize(first_ticker, calc_price=calc_price)
@ -1038,26 +1009,21 @@ async def stream_quotes(
first_quote = {topic: quote} first_quote = {topic: quote}
# yield first quote asap
await ctx.send_yield(first_quote)
# ticker.ticks = []
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
first_ticker.ticks = [] first_ticker.ticks = []
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
task_status.started((init_msgs, first_quote))
if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
suffix = 'exchange' suffix = 'exchange'
calc_price = False # should be real volume for contract calc_price = False # should be real volume for contract
# with trio.move_on_after(10) as cs:
# wait for real volume on feed (trading might be closed) # wait for real volume on feed (trading might be closed)
async with aclosing(stream): async with aclosing(stream):
async for ticker in stream: async for ticker in stream:
# for a real volume contract we rait for the first # for a real volume contract we rait for the first
@ -1072,42 +1038,13 @@ async def stream_quotes(
# (ahem, ib_insync is truly stateful trash) # (ahem, ib_insync is truly stateful trash)
ticker.ticks = [] ticker.ticks = []
# tell incrementer task it can start
_buffer.shm_incrementing(key).set()
# XXX: this works because we don't use # XXX: this works because we don't use
# ``aclosing()`` above? # ``aclosing()`` above?
break break
# enter stream loop # tell caller quotes are now coming in live
try: feed_is_live.set()
await stream_and_write(
stream=stream,
calc_price=calc_price,
topic=topic,
writer_already_exists=writer_already_exists,
shm=shm,
suffix=suffix,
ctx=ctx,
)
finally:
if not writer_already_exists:
_local_buffer_writers[key] = False
async def stream_and_write(
stream,
calc_price: bool,
topic: str,
writer_already_exists: bool,
suffix: str,
ctx: tractor.Context,
shm: Optional['SharedArray'], # noqa
) -> None:
"""Core quote streaming and shm writing loop; optimize for speed!
"""
# real-time stream
async for ticker in stream: async for ticker in stream:
# print(ticker.vwap) # print(ticker.vwap)
@ -1115,54 +1052,12 @@ async def stream_and_write(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
) )
quote['symbol'] = topic
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_already_exists:
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick.get('size', 0)
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
v + new_v,
)
con = quote['contract'] con = quote['contract']
topic = '.'.join((con['symbol'], con[suffix])).lower() topic = '.'.join((con['symbol'], con[suffix])).lower()
quote['symbol'] = topic quote['symbol'] = topic
await ctx.send_yield({topic: quote}) await send_chan.send({topic: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
ticker.ticks = [] ticker.ticks = []
@ -1170,10 +1065,18 @@ async def stream_and_write(
def pack_position(pos: Position) -> Dict[str, Any]: def pack_position(pos: Position) -> Dict[str, Any]:
con = pos.contract con = pos.contract
if isinstance(con, Option):
# TODO: option symbol parsing and sane display:
symbol = con.localSymbol.replace(' ', '')
else:
symbol = con.symbol
return { return {
'broker': 'ib', 'broker': 'ib',
'account': pos.account, 'account': pos.account,
'symbol': con.symbol, 'symbol': symbol,
'currency': con.currency, 'currency': con.currency,
'size': float(pos.position), 'size': float(pos.position),
'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), 'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0),

View File

@ -16,15 +16,17 @@
""" """
Kraken backend. Kraken backend.
""" """
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack
from dataclasses import asdict, field from dataclasses import asdict, field
from types import ModuleType from types import ModuleType
from typing import List, Dict, Any, Tuple, Optional from typing import List, Dict, Any, Tuple
import json import json
import time import time
import trio_websocket import trio_websocket
from trio_typing import TaskStatus
from trio_websocket._impl import ( from trio_websocket._impl import (
ConnectionClosed, ConnectionClosed,
DisconnectionTimeout, DisconnectionTimeout,
@ -41,15 +43,11 @@ import tractor
from pydantic.dataclasses import dataclass from pydantic.dataclasses import dataclass
from pydantic import BaseModel from pydantic import BaseModel
from .api import open_cached_client
from ._util import resproc, SymbolNotFound, BrokerError from ._util import resproc, SymbolNotFound, BrokerError
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data import ( from ..data import ShmArray
_buffer,
# iterticks,
attach_shm_array,
get_shm_token,
subscribe_ohlc_for_increment,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -315,6 +313,7 @@ def normalize(
quote['brokerd_ts'] = time.time() quote['brokerd_ts'] = time.time()
quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '') quote['symbol'] = quote['pair'] = quote['pair'].replace('/', '')
quote['last'] = quote['close'] quote['last'] = quote['close']
quote['bar_wap'] = ohlc.vwap
# seriously eh? what's with this non-symmetry everywhere # seriously eh? what's with this non-symmetry everywhere
# in subscription systems... # in subscription systems...
@ -426,17 +425,37 @@ async def open_autorecon_ws(url):
await stack.aclose() await stack.aclose()
# @tractor.msg.pub async def backfill_bars(
sym: str,
shm: ShmArray, # type: ignore # noqa
count: int = 10, # NOTE: any more and we'll overrun the underlying buffer
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Fill historical bars into shared mem / storage afap.
"""
with trio.CancelScope() as cs:
async with open_cached_client('kraken') as client:
bars = await client.bars(symbol=sym)
shm.push(bars)
task_status.started(cs)
async def stream_quotes( async def stream_quotes(
# get_topics: Callable,
shm_token: Tuple[str, str, List[tuple]], send_chan: trio.abc.SendChannel,
symbols: List[str] = ['XBTUSD', 'XMRUSD'], symbols: List[str],
# These are the symbols not expected by the ws api shm: ShmArray,
# they are looked up inside this routine. feed_is_live: trio.Event,
sub_type: str = 'ohlc',
loglevel: str = None, loglevel: str = None,
# compat with eventual ``tractor.msg.pub``
topics: Optional[List[str]] = None, # backend specific
sub_type: str = 'ohlc',
# startup sync
task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Subscribe for ohlc stream of quotes for ``pairs``. """Subscribe for ohlc stream of quotes for ``pairs``.
@ -447,7 +466,8 @@ async def stream_quotes(
ws_pairs = {} ws_pairs = {}
sym_infos = {} sym_infos = {}
async with get_client() as client:
async with open_cached_client('kraken') as client, send_chan as send_chan:
# keep client cached for real-time section # keep client cached for real-time section
for sym in symbols: for sym in symbols:
@ -458,40 +478,16 @@ async def stream_quotes(
sym_infos[sym] = syminfo sym_infos[sym] = syminfo
ws_pairs[sym] = si.wsname ws_pairs[sym] = si.wsname
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
# feed initialization
writer_exists = get_shm_token(shm_token['shm_name'])
symbol = symbols[0] symbol = symbols[0]
if not writer_exists:
shm = attach_shm_array(
token=shm_token,
# we are writer
readonly=False,
)
bars = await client.bars(symbol=symbol)
shm.push(bars)
shm_token = shm.token
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
# yield shm_token, not writer_exists
init_msgs = { init_msgs = {
# pass back token, and bool, signalling if we're the writer # pass back token, and bool, signalling if we're the writer
# and that history has been written # and that history has been written
symbol: { symbol: {
'is_shm_writer': not writer_exists,
'shm_token': shm_token,
'symbol_info': sym_infos[sym], 'symbol_info': sym_infos[sym],
'shm_write_opts': {'sum_tick_vml': False},
},
} }
# for sym in symbols
}
yield init_msgs
async with open_autorecon_ws('wss://ws.kraken.com/') as ws: async with open_autorecon_ws('wss://ws.kraken.com/') as ws:
@ -521,15 +517,16 @@ async def stream_quotes(
# pull a first quote and deliver # pull a first quote and deliver
msg_gen = stream_messages(ws) msg_gen = stream_messages(ws)
# TODO: use ``anext()`` when it lands in 3.10!
typ, ohlc_last = await msg_gen.__anext__() typ, ohlc_last = await msg_gen.__anext__()
topic, quote = normalize(ohlc_last) topic, quote = normalize(ohlc_last)
# packetize as {topic: quote} first_quote = {topic: quote}
yield {topic: quote} task_status.started((init_msgs, first_quote))
# tell incrementer task it can start # lol, only "closes" when they're margin squeezing clients ;P
_buffer.shm_incrementing(shm_token['shm_name']).set() feed_is_live.set()
# keep start of last interval for volume tracking # keep start of last interval for volume tracking
last_interval_start = ohlc_last.etime last_interval_start = ohlc_last.etime
@ -546,15 +543,18 @@ async def stream_quotes(
# https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m
volume = ohlc.volume volume = ohlc.volume
# new interval # new OHLC sample interval
if ohlc.etime > last_interval_start: if ohlc.etime > last_interval_start:
last_interval_start = ohlc.etime last_interval_start = ohlc.etime
tick_volume = volume tick_volume = volume
else: else:
# this is the tick volume *within the interval* # this is the tick volume *within the interval*
tick_volume = volume - ohlc_last.volume tick_volume = volume - ohlc_last.volume
ohlc_last = ohlc
last = ohlc.close last = ohlc.close
if tick_volume: if tick_volume:
ohlc.ticks.append({ ohlc.ticks.append({
'type': 'trade', 'type': 'trade',
@ -564,43 +564,10 @@ async def stream_quotes(
topic, quote = normalize(ohlc) topic, quote = normalize(ohlc)
# if we are the lone tick writer start writing
# the buffer with appropriate trade data
if not writer_exists:
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick_volume
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
# write shm
shm.array[
['open',
'high',
'low',
'close',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
max(high, last),
min(low, last),
last,
ohlc.vwap,
volume,
)
ohlc_last = ohlc
elif typ == 'l1': elif typ == 'l1':
quote = ohlc quote = ohlc
topic = quote['symbol'] topic = quote['symbol']
# XXX: format required by ``tractor.msg.pub`` # XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]`` # requires a ``Dict[topic: str, quote: dict]``
yield {topic: quote} await send_chan.send({topic: quote})

View File

@ -1180,6 +1180,11 @@ def normalize(
return new return new
# TODO: currently this backend uses entirely different
# data feed machinery that was written earlier then the
# existing stuff used in other backends. This needs to
# be ported eventually and should *just work* despite
# being a multi-symbol, poll-style feed system.
@tractor.stream @tractor.stream
async def stream_quotes( async def stream_quotes(
ctx: tractor.Context, # marks this as a streaming func ctx: tractor.Context, # marks this as a streaming func
@ -1192,7 +1197,7 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel) get_console_log(loglevel)
async with api.get_cached_client('questrade') as client: async with api.open_cached_client('questrade') as client:
if feed_type == 'stock': if feed_type == 'stock':
formatter = format_stock_quote formatter = format_stock_quote
get_quotes = await stock_quoter(client, symbols) get_quotes = await stock_quoter(client, symbols)

View File

@ -181,6 +181,7 @@ async def maybe_open_emsd(
async with tractor.find_actor('pikerd') as portal: async with tractor.find_actor('pikerd') as portal:
assert portal assert portal
name = await portal.run( name = await portal.run(
spawn_emsd, spawn_emsd,
brokername=brokername, brokername=brokername,
@ -190,7 +191,6 @@ async def maybe_open_emsd(
yield portal yield portal
@asynccontextmanager @asynccontextmanager
async def open_ems( async def open_ems(
broker: str, broker: str,
@ -247,4 +247,13 @@ async def open_ems(
with trio.fail_after(10): with trio.fail_after(10):
await book._ready_to_receive.wait() await book._ready_to_receive.wait()
try:
yield book, trades_stream yield book, trades_stream
finally:
# TODO: we want to eventually keep this up (by having
# the exec loop keep running in the pikerd tree) but for
# now we have to kill the context to avoid backpressure
# build-up on the shm write loop.
with trio.CancelScope(shield=True):
await trades_stream.aclose()

View File

@ -40,7 +40,11 @@ log = get_logger(__name__)
# TODO: numba all of this # TODO: numba all of this
def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: def mk_check(
trigger_price: float,
known_last: float,
action: str,
) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known """Create a predicate for given ``exec_price`` based on last known
price, ``known_last``. price, ``known_last``.
@ -68,7 +72,7 @@ def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]:
return check_lt return check_lt
else: else:
return None, None return None
@dataclass @dataclass
@ -230,6 +234,7 @@ async def execute_triggers(
async def exec_loop( async def exec_loop(
ctx: tractor.Context, ctx: tractor.Context,
feed: 'Feed', # noqa
broker: str, broker: str,
symbol: str, symbol: str,
_exec_mode: str, _exec_mode: str,
@ -239,11 +244,6 @@ async def exec_loop(
to brokers. to brokers.
""" """
async with data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
# TODO: get initial price quote from target broker # TODO: get initial price quote from target broker
first_quote = await feed.receive() first_quote = await feed.receive()
@ -268,6 +268,7 @@ async def exec_loop(
*trio.open_memory_channel(100), *trio.open_memory_channel(100),
_buys={}, _buys={},
_sells={}, _sells={},
_reqids={}, _reqids={},
) )
@ -284,9 +285,7 @@ async def exec_loop(
# return control to parent task # return control to parent task
task_status.started((first_quote, feed, client)) task_status.started((first_quote, feed, client))
# shield this field so the remote brokerd does not get cancelled
stream = feed.stream stream = feed.stream
with stream.shield():
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
execute_triggers, execute_triggers,
@ -299,7 +298,9 @@ async def exec_loop(
) )
if _exec_mode == 'paper': if _exec_mode == 'paper':
n.start_soon(simulate_fills, stream.clone(), client) # TODO: make this an actual broadcast channels as in:
# https://github.com/python-trio/trio/issues/987
n.start_soon(simulate_fills, stream, client)
# TODO: lots of cases still to handle # TODO: lots of cases still to handle
@ -512,7 +513,6 @@ async def process_order_cmds(
exec_mode = cmd['exec_mode'] exec_mode = cmd['exec_mode']
broker = brokers[0] broker = brokers[0]
last = dark_book.lasts[(broker, sym)]
if exec_mode == 'live' and action in ('buy', 'sell',): if exec_mode == 'live' and action in ('buy', 'sell',):
@ -557,9 +557,10 @@ async def process_order_cmds(
# price received from the feed, instead of being # price received from the feed, instead of being
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
pred = mk_check(trigger_price, last) last = dark_book.lasts[(broker, sym)]
pred = mk_check(trigger_price, last, action)
tick_slap: float = 5 spread_slap: float = 5
min_tick = feed.symbols[sym].tick_size min_tick = feed.symbols[sym].tick_size
if action == 'buy': if action == 'buy':
@ -569,12 +570,12 @@ async def process_order_cmds(
# TODO: we probably need to scale this based # TODO: we probably need to scale this based
# on some near term historical spread # on some near term historical spread
# measure? # measure?
abs_diff_away = tick_slap * min_tick abs_diff_away = spread_slap * min_tick
elif action == 'sell': elif action == 'sell':
tickfilter = ('bid', 'last', 'trade') tickfilter = ('bid', 'last', 'trade')
percent_away = -0.005 percent_away = -0.005
abs_diff_away = -tick_slap * min_tick abs_diff_away = -spread_slap * min_tick
else: # alert else: # alert
tickfilter = ('trade', 'utrade', 'last') tickfilter = ('trade', 'utrade', 'last')
@ -647,11 +648,17 @@ async def _emsd_main(
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# TODO: eventually support N-brokers # TODO: eventually support N-brokers
async with data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
# start the condition scan loop # start the condition scan loop
quote, feed, client = await n.start( quote, feed, client = await n.start(
exec_loop, exec_loop,
ctx, ctx,
feed,
broker, broker,
symbol, symbol,
_mode, _mode,

View File

@ -20,20 +20,29 @@ Data feed apis and infra.
We provide tsdb integrations for retrieving We provide tsdb integrations for retrieving
and storing data from your brokers as well as and storing data from your brokers as well as
sharing your feeds with other fellow pikers. sharing your feeds with other fellow pikers.
""" """
from dataclasses import dataclass, field from dataclasses import dataclass, field
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import partial
from importlib import import_module from importlib import import_module
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Dict, Any, Sequence, AsyncIterator, Optional Dict, Any, Sequence,
AsyncIterator, Optional,
List
) )
import trio
from trio_typing import TaskStatus
import tractor import tractor
from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import spawn_brokerd, maybe_open_pikerd from .._daemon import (
maybe_spawn_brokerd,
)
from ._normalize import iterticks from ._normalize import iterticks
from ._sharedmem import ( from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
@ -43,9 +52,12 @@ from ._sharedmem import (
get_shm_token, get_shm_token,
) )
from ._source import base_iohlc_dtype, Symbol from ._source import base_iohlc_dtype, Symbol
from ._buffer import ( from ._sampling import (
_shms,
_incrementers,
increment_ohlc_buffer, increment_ohlc_buffer,
subscribe_ohlc_for_increment iter_ohlc_periods,
sample_and_broadcast,
) )
__all__ = [ __all__ = [
@ -54,7 +66,7 @@ __all__ = [
'attach_shm_array', 'attach_shm_array',
'open_shm_array', 'open_shm_array',
'get_shm_token', 'get_shm_token',
'subscribe_ohlc_for_increment', # 'subscribe_ohlc_for_increment',
] ]
@ -74,57 +86,229 @@ def get_ingestormod(name: str) -> ModuleType:
return module return module
@asynccontextmanager class _FeedsBus(BaseModel):
async def maybe_spawn_brokerd( """Data feeds broadcaster and persistence management.
brokername: str,
loglevel: Optional[str] = None,
# XXX: you should pretty much never want debug mode This is a brokerd side api used to manager persistent real-time
# for data daemons when running in production. streams that can be allocated and left alive indefinitely.
debug_mode: bool = True,
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
""" """
if loglevel: brokername: str
get_console_log(loglevel) nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {}
subscribers: Dict[str, List[tractor.Context]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
dname = f'brokerd.{brokername}' class Config:
async with tractor.find_actor(dname) as portal: arbitrary_types_allowed = True
# WTF: why doesn't this work? async def cancel_all(self) -> None:
if portal is not None: for sym, (cs, msg, quote) in self.feeds.items():
yield portal log.debug(f'Cancelling cached feed for {self.brokername}:{sym}')
cs.cancel()
else:
# ask root ``pikerd`` daemon to spawn the daemon we need if
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel
) as pikerd_portal:
if pikerd_portal is None: _bus: _FeedsBus = None
# we are root so spawn brokerd directly in our tree
# the root nursery is accessed through process global state
await spawn_brokerd(brokername, loglevel=loglevel)
else:
await pikerd_portal.run( def get_feed_bus(
spawn_brokerd, brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus:
"""
Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock.
"""
global _bus
if nursery is not None:
assert _bus is None, "Feeds manager is already setup?"
# this is initial setup by parent actor
_bus = _FeedsBus(
brokername=brokername, brokername=brokername,
loglevel=loglevel, nursery=nursery,
debug_mode=debug_mode, )
assert not _bus.feeds
assert _bus.brokername == brokername, "Uhhh wtf"
return _bus
async def _setup_persistent_brokerd(brokername: str) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
"""
try:
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
bus = get_feed_bus(brokername, service_nursery)
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
finally:
# TODO: this needs to be shielded?
await bus.cancel_all()
async def allocate_persistent_feed(
ctx: tractor.Context,
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# allocate shm array for this broker/symbol
# XXX: we should get an error here if one already exists
shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(brokername, symbol),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
) )
async with tractor.wait_for_actor(dname) as portal: # do history validation?
yield portal assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(10)
feed_is_live = trio.Event()
# establish broker backend quote stream
# ``stream_quotes()`` is a required backend func
init_msg, first_quote = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
feed_is_live=feed_is_live,
symbols=[symbol],
shm=shm,
loglevel=loglevel,
)
)
init_msg[symbol]['shm_token'] = shm.token
cs = bus.nursery.cancel_scope
# TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based
# resspawns when needed.
bus.feeds[symbol] = (cs, init_msg, first_quote)
if opened:
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(mod.backfill_bars, symbol, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# pass OHLC sample rate in seconds
init_msg[symbol]['sample_rate'] = delay_s
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
await feed_is_live.wait()
if opened:
_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling
if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
# start sample loop
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
@tractor.stream
async def attach_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
loglevel: str,
):
# try:
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername)
async with bus.task_lock:
task_cs = bus.feeds.get(symbol)
sub_only: bool = False
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
if task_cs is None:
init_msg, first_quote = await bus.nursery.start(
partial(
allocate_persistent_feed,
ctx=ctx,
bus=bus,
brokername=brokername,
symbol=symbol,
loglevel=loglevel,
)
)
bus.subscribers.setdefault(symbol, []).append(ctx)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber
cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed?
await ctx.send_yield(init_msg)
await ctx.send_yield(first_quote)
if sub_only:
bus.subscribers[symbol].append(ctx)
try:
await trio.sleep_forever()
finally:
bus.subscribers[symbol].remove(ctx)
@dataclass @dataclass
class Feed: class Feed:
"""A data feed for client-side interaction with far-process """A data feed for client-side interaction with far-process# }}}
real-time data sources. real-time data sources.
This is an thin abstraction on top of ``tractor``'s portals for This is an thin abstraction on top of ``tractor``'s portals for
@ -135,10 +319,11 @@ class Feed:
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
mod: ModuleType mod: ModuleType
# ticks: ShmArray
_brokerd_portal: tractor._portal.Portal _brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None _index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_max_sample_rate: int = 0
# cache of symbol info messages received as first message when # cache of symbol info messages received as first message when
# a stream startsc. # a stream startsc.
@ -147,15 +332,19 @@ class Feed:
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.__anext__()
async def index_stream(self) -> AsyncIterator[int]: async def index_stream(
self,
delay_s: Optional[int] = None
) -> AsyncIterator[int]:
if not self._index_stream: if not self._index_stream:
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
self._index_stream = await self._brokerd_portal.run( self._index_stream = await self._brokerd_portal.run(
increment_ohlc_buffer, iter_ohlc_periods,
shm_token=self.shm.token, delay_s=delay_s or self._max_sample_rate,
topics=['index'],
) )
return self._index_stream return self._index_stream
@ -214,40 +403,29 @@ async def open_feed(
# TODO: do all! # TODO: do all!
sym = symbols[0] sym = symbols[0]
# Attempt to allocate (or attach to) shm array for this broker/symbol
shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(brokername, sym),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=True,
)
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
brokername, brokername,
loglevel=loglevel, loglevel=loglevel,
# TODO: add a cli flag for this
# debug_mode=False,
) as portal: ) as portal:
stream = await portal.run( stream = await portal.run(
mod.stream_quotes, attach_feed_bus,
brokername=brokername,
# TODO: actually handy multiple symbols... symbol=sym,
symbols=symbols,
shm_token=shm.token,
# compat with eventual ``tractor.msg.pub``
topics=symbols,
loglevel=loglevel, loglevel=loglevel,
) )
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
readonly=True,
)
feed = Feed( feed = Feed(
name=brokername, name=brokername,
stream=stream, stream=stream,
@ -255,15 +433,12 @@ async def open_feed(
mod=mod, mod=mod,
_brokerd_portal=portal, _brokerd_portal=portal,
) )
ohlc_sample_rates = []
# TODO: we can't do this **and** be compate with
# ``tractor.msg.pub``, should we maybe just drop this after
# tests are in?
init_msg = await stream.receive()
for sym, data in init_msg.items(): for sym, data in init_msg.items():
si = data['symbol_info'] si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol( symbol = Symbol(
key=sym, key=sym,
@ -275,12 +450,17 @@ async def open_feed(
feed.symbols[sym] = symbol feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
shm_token = data['shm_token'] shm_token = data['shm_token']
if opened:
assert data['is_shm_writer']
log.info("Started shared mem bar writer")
shm_token['dtype_descr'] = list(shm_token['dtype_descr']) shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()

View File

@ -1,115 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data buffers for fast shared humpy.
"""
from typing import Tuple, Callable, Dict
# import time
import tractor
import trio
from ._sharedmem import ShmArray
_shms: Dict[int, ShmArray] = {}
_start_increment: Dict[str, trio.Event] = {}
def shm_incrementing(shm_token_name: str) -> trio.Event:
global _start_increment
return _start_increment.setdefault(shm_token_name, trio.Event())
@tractor.msg.pub
async def increment_ohlc_buffer(
shm_token: dict,
get_topics: Callable[..., Tuple[str]],
# delay_s: Optional[float] = None,
):
"""Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds.
This task fulfills 2 purposes:
- it takes the subscribed set of shm arrays and increments them
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented.
"""
# wait for brokerd to signal we should start sampling
await shm_incrementing(shm_token['shm_name']).wait()
# TODO: right now we'll spin printing bars if the last time stamp is
# before a large period of no market activity. Likely the best way
# to solve this is to make this task aware of the instrument's
# tradable hours?
# adjust delay to compensate for trio processing time
ad = min(_shms.keys()) - 0.001
total_s = 0 # total seconds counted
lowest = min(_shms.keys())
ad = lowest - 0.001
while True:
# TODO: do we want to support dynamically
# adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += lowest
# increment all subscribed shm arrays
# TODO: this in ``numba``
for delay_s, shms in _shms.items():
if total_s % delay_s != 0:
continue
# TODO: numa this!
for shm in shms:
# TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of
# using ``.push()``?
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
yield {'index': shm._last.value}
def subscribe_ohlc_for_increment(
shm: ShmArray,
delay: int,
) -> None:
"""Add an OHLC ``ShmArray`` to the increment set.
"""
_shms.setdefault(delay, []).append(shm)

View File

@ -0,0 +1,240 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data buffers for fast shared humpy.
"""
from typing import Dict, List
import tractor
import trio
from trio_typing import TaskStatus
from ._sharedmem import ShmArray
from ..log import get_logger
log = get_logger(__name__)
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
_shms: Dict[int, List[ShmArray]] = {}
_start_increment: Dict[str, trio.Event] = {}
_incrementers: Dict[int, trio.CancelScope] = {}
_subscribers: Dict[str, tractor.Context] = {}
def shm_incrementing(shm_token_name: str) -> trio.Event:
global _start_increment
return _start_increment.setdefault(shm_token_name, trio.Event())
async def increment_ohlc_buffer(
delay_s: int,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
):
"""Task which inserts new bars into the provide shared memory array
every ``delay_s`` seconds.
This task fulfills 2 purposes:
- it takes the subscribed set of shm arrays and increments them
on a common time period
- broadcast of this increment "signal" message to other actor
subscribers
Note that if **no** actor has initiated this task then **none** of
the underlying buffers will actually be incremented.
"""
# # wait for brokerd to signal we should start sampling
# await shm_incrementing(shm_token['shm_name']).wait()
# TODO: right now we'll spin printing bars if the last time stamp is
# before a large period of no market activity. Likely the best way
# to solve this is to make this task aware of the instrument's
# tradable hours?
global _incrementers
# adjust delay to compensate for trio processing time
ad = min(_shms.keys()) - 0.001
total_s = 0 # total seconds counted
lowest = min(_shms.keys())
ad = lowest - 0.001
with trio.CancelScope() as cs:
# register this time period step as active
_incrementers[delay_s] = cs
task_status.started(cs)
while True:
# TODO: do we want to support dynamically
# adding a "lower" lowest increment period?
await trio.sleep(ad)
total_s += lowest
# increment all subscribed shm arrays
# TODO: this in ``numba``
for delay_s, shms in _shms.items():
if total_s % delay_s != 0:
continue
# TODO: ``numba`` this!
for shm in shms:
# TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of
# using ``.push()``?
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
# yield {'index': shm._last.value}
for ctx in _subscribers.get(delay_s, ()):
try:
await ctx.send_yield({'index': shm._last.value})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
log.error(f'{ctx.chan.uid} dropped connection')
@tractor.stream
async def iter_ohlc_periods(
ctx: tractor.Context,
delay_s: int,
) -> None:
"""
Subscribe to OHLC sampling "step" events: when the time
aggregation period increments, this event stream emits an index
event.
"""
# add our subscription
global _subscribers
subs = _subscribers.setdefault(delay_s, [])
subs.append(ctx)
try:
# stream and block until cancelled
await trio.sleep_forever()
finally:
subs.remove(ctx)
async def sample_and_broadcast(
bus: '_FeedBus', # noqa
shm: ShmArray,
quote_stream: trio.abc.ReceiveChannel,
sum_tick_vlm: bool = True,
) -> None:
log.info("Started shared mem bar writer")
# iterate stream delivered by broker
async for quotes in quote_stream:
# TODO: ``numba`` this!
for sym, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# if tick['type'] in ('utrade',):
# print(tick)
# write trade events to shm last OHLC sample
if tick['type'] in ('trade', 'utrade'):
last = tick['price']
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
['open', 'high', 'low', 'volume']
]
new_v = tick.get('size', 0)
if v == 0 and new_v:
# no trades for this bar yet so the open
# is also the close/last trade price
o = last
if sum_tick_vlm:
volume = v + new_v
else:
# presume backend takes care of summing
# it's own vlm
volume = quote['volume']
shm.array[[
'open',
'high',
'low',
'close',
'bar_wap', # can be optionally provided
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),
last,
quote.get('bar_wap', 0),
volume,
)
# XXX: we need to be very cautious here that no
# context-channel is left lingering which doesn't have
# a far end receiver actor-task. In such a case you can
# end up triggering backpressure which which will
# eventually block this producer end of the feed and
# thus other consumers still attached.
subs = bus.subscribers[sym]
for ctx in subs:
# print(f'sub is {ctx.chan.uid}')
try:
await ctx.send_yield({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
):
subs.remove(ctx)
log.error(f'{ctx.chan.uid} dropped connection')

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) # Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -16,6 +16,7 @@
""" """
NumPy compatible shared memory buffers for real-time FSP. NumPy compatible shared memory buffers for real-time FSP.
""" """
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
@ -370,7 +371,7 @@ def attach_shm_array(
key = token.shm_name key = token.shm_name
if key in _known_tokens: if key in _known_tokens:
assert _known_tokens[key] == token, "WTF" assert _Token.from_msg(_known_tokens[key]) == token, "WTF"
# attach to array buffer and view as per dtype # attach to array buffer and view as per dtype
shm = SharedMemory(name=key) shm = SharedMemory(name=key)
@ -426,7 +427,7 @@ def maybe_open_shm_array(
**kwargs, **kwargs,
) -> Tuple[ShmArray, bool]: ) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block using a "key" lookup """Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registryt to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).
This function is meant to solve the problem of discovering whether This function is meant to solve the problem of discovering whether

View File

@ -28,7 +28,7 @@ from ..log import get_logger
from .. import data from .. import data
from ._momo import _rsi, _wma from ._momo import _rsi, _wma
from ._volume import _tina_vwap from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed from ..data import attach_shm_array
log = get_logger(__name__) log = get_logger(__name__)
@ -62,23 +62,6 @@ async def latency(
yield value yield value
async def increment_signals(
feed: Feed,
dst_shm: 'SharedArray', # noqa
) -> None:
"""Increment the underlying shared memory buffer on every "increment"
msg received from the underlying data feed.
"""
async for msg in await feed.index_stream():
array = dst_shm.array
last = array[-1:].copy()
# write new slot to the buffer
dst_shm.push(last)
len(dst_shm.array)
@tractor.stream @tractor.stream
async def cascade( async def cascade(
ctx: tractor.Context, ctx: tractor.Context,
@ -150,7 +133,6 @@ async def cascade(
) )
history[fsp_func_name] = history_output history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history) diff = len(src.array) - len(history)
if diff >= 0: if diff >= 0:
@ -182,8 +164,8 @@ async def cascade(
cs = await n.start(fsp_compute) cs = await n.start(fsp_compute)
# Increment the underlying shared memory buffer on every "increment" # Increment the underlying shared memory buffer on every
# msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async for msg in await feed.index_stream(): async for msg in await feed.index_stream():
@ -198,10 +180,11 @@ async def cascade(
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
# read out last shm row
array = dst.array array = dst.array
last = array[-1:].copy() last = array[-1:].copy()
# write new slot to the buffer # write new row to the shm buffer
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len

View File

@ -138,7 +138,7 @@ class ChartSpace(QtGui.QWidget):
""" """
# XXX: let's see if this causes mem problems # XXX: let's see if this causes mem problems
self.window.setWindowTitle( self.window.setWindowTitle(
f'piker chart {symbol.key}@{symbol.brokers} ' f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}' f'tick:{symbol.tick_size}'
) )

View File

@ -22,6 +22,7 @@ All global Qt runtime settings are mostly defined here.
""" """
from typing import Tuple, Callable, Dict, Any from typing import Tuple, Callable, Dict, Any
import os import os
import platform
import signal import signal
import time import time
import traceback import traceback
@ -87,16 +88,19 @@ def current_screen() -> QtGui.QScreen:
assert screen, "Wow Qt is dumb as shit and has no screen..." assert screen, "Wow Qt is dumb as shit and has no screen..."
return screen return screen
# XXX: pretty sure none of this shit works
# XXX: pretty sure none of this shit works on linux as per:
# https://bugreports.qt.io/browse/QTBUG-53022 # https://bugreports.qt.io/browse/QTBUG-53022
# it seems to work on windows.. no idea wtf is up.
if platform.system() == "Windows":
# Proper high DPI scaling is available in Qt >= 5.6.0. This attibute # Proper high DPI scaling is available in Qt >= 5.6.0. This attibute
# must be set before creating the application # must be set before creating the application
# if hasattr(Qt, 'AA_EnableHighDpiScaling'): if hasattr(Qt, 'AA_EnableHighDpiScaling'):
# QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True) QCoreApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
# if hasattr(Qt, 'AA_UseHighDpiPixmaps'): if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
# QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True) QCoreApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)
class MainWindow(QtGui.QMainWindow): class MainWindow(QtGui.QMainWindow):
@ -196,7 +200,6 @@ def run_qtractor(
async def main(): async def main():
async with maybe_open_pikerd( async with maybe_open_pikerd(
name='qtractor',
start_method='trio', start_method='trio',
**tractor_kwargs, **tractor_kwargs,
): ):

View File

@ -90,16 +90,11 @@ class LineDot(pg.CurvePoint):
self, self,
ev: QtCore.QEvent, ev: QtCore.QEvent,
) -> None: ) -> None:
# print((ev, type(ev)))
if not isinstance( if not isinstance(
ev, QtCore.QDynamicPropertyChangeEvent ev, QtCore.QDynamicPropertyChangeEvent
) or self.curve() is None: ) or self.curve() is None:
return False return False
# if ev.propertyName() == 'index':
# print(ev)
# # self.setProperty
(x, y) = self.curve().getData() (x, y) = self.curve().getData()
index = self.property('index') index = self.property('index')
# first = self._plot._ohlc[0]['index'] # first = self._plot._ohlc[0]['index']
@ -172,8 +167,6 @@ class ContentsLabel(pg.LabelItem):
if inspect.isfunction(margins[1]): if inspect.isfunction(margins[1]):
margins = margins[0], ydim(anchor_font_size) margins = margins[0], ydim(anchor_font_size)
print(f'margins: {margins}')
self.anchor(itemPos=index, parentPos=index, offset=margins) self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc( def update_from_ohlc(
@ -403,7 +396,6 @@ class Cursor(pg.GraphicsObject):
# update all trackers # update all trackers
for item in self._trackers: for item in self._trackers:
# print(f'setting {item} with {(ix, y)}')
item.on_tracked_source(ix, iy) item.on_tracked_source(ix, iy)
if ix != last_ix: if ix != last_ix:

View File

@ -155,6 +155,8 @@ class LevelLabel(YAxisLabel):
self._h_shift * (w + self._x_offset), self._h_shift * (w + self._x_offset),
abs_pos.y() + self._v_shift * h abs_pos.y() + self._v_shift * h
)) ))
# XXX: definitely need this!
self.update()
def set_fmt_str( def set_fmt_str(
self, self,

View File

@ -148,6 +148,7 @@ def chart(config, symbol, profile):
tractor_kwargs={ tractor_kwargs={
'debug_mode': True, 'debug_mode': True,
'loglevel': tractorloglevel, 'loglevel': tractorloglevel,
'name': 'chart',
'enable_modules': [ 'enable_modules': [
'piker.clearing._client' 'piker.clearing._client'
], ],

View File

@ -321,9 +321,7 @@ async def start_order_mode(
async with open_ems( async with open_ems(
brokername, brokername,
symbol, symbol,
) as (book, trades_stream): ) as (book, trades_stream), open_order_mode(
async with open_order_mode(
symbol, symbol,
chart, chart,
book, book,