Merge pull request #167 from pikers/feed_fixes

Feed fixes
binance_backend
goodboy 2021-04-29 09:00:07 -04:00 committed by GitHub
commit a89da98141
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 570 additions and 458 deletions

View File

@ -43,6 +43,7 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
class Config:
arbitrary_types_allowed = True
@ -53,10 +54,16 @@ _services: Optional[Services] = None
@asynccontextmanager
async def open_pikerd(
start_method: str = 'trio',
loglevel: Optional[str] = None,
**kwargs,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = False,
) -> Optional[tractor._portal.Portal]:
"""Start a root piker daemon who's lifetime extends indefinitely
"""
Start a root piker daemon who's lifetime extends indefinitely
until cancelled.
A root actor nursery is created which can be used to create and keep
@ -71,18 +78,23 @@ async def open_pikerd(
# passed through to ``open_root_actor``
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
start_method=start_method,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
# enable_modules=[__name__],
enable_modules=_root_modules,
) as _, tractor.open_nursery() as actor_nursery:
async with trio.open_nursery() as service_nursery:
# assign globally for future daemon/task creation
_services = Services(
actor_n=actor_nursery,
service_n=service_nursery
service_n=service_nursery,
debug_mode=debug_mode,
)
yield _services
@ -93,6 +105,10 @@ async def maybe_open_runtime(
loglevel: Optional[str] = None,
**kwargs,
) -> None:
"""
Start the ``tractor`` runtime (a root actor) if none exists.
"""
if not tractor.current_actor(err_on_no_runtime=False):
async with tractor.open_root_actor(loglevel=loglevel, **kwargs):
yield
@ -123,8 +139,7 @@ async def maybe_open_pikerd(
# presume pikerd role
async with open_pikerd(
loglevel,
**kwargs,
loglevel=loglevel,
) as _:
# in the case where we're starting up the
# tractor-piker runtime stack in **this** process
@ -137,14 +152,17 @@ _data_mods = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
async def spawn_brokerd(
brokername,
brokername: str,
loglevel: Optional[str] = None,
**tractor_kwargs
**tractor_kwargs,
) -> tractor._portal.Portal:
from .data import _setup_persistent_brokerd
@ -164,6 +182,7 @@ async def spawn_brokerd(
dname,
enable_modules=_data_mods + [brokermod.__name__],
loglevel=loglevel,
debug_mode=_services.debug_mode,
**tractor_kwargs
)
@ -187,14 +206,14 @@ async def spawn_brokerd(
@asynccontextmanager
async def maybe_spawn_brokerd(
brokername: str,
loglevel: Optional[str] = None,
**kwargs,
# XXX: you should pretty much never want debug mode
# for data daemons when running in production.
debug_mode: bool = True,
) -> tractor._portal.Portal:
"""If no ``brokerd.{brokername}`` daemon-actor can be found,
"""
If no ``brokerd.{brokername}`` daemon-actor can be found,
spawn one in a local subactor and return a portal to it.
"""
@ -213,7 +232,8 @@ async def maybe_spawn_brokerd(
# pikerd is not live we now become the root of the
# process tree
async with maybe_open_pikerd(
loglevel=loglevel
loglevel=loglevel,
**kwargs,
) as pikerd_portal:
if pikerd_portal is None:
@ -226,7 +246,6 @@ async def maybe_spawn_brokerd(
spawn_brokerd,
brokername=brokername,
loglevel=loglevel,
debug_mode=debug_mode,
)
async with tractor.wait_for_actor(dname) as portal:
@ -234,11 +253,16 @@ async def maybe_spawn_brokerd(
async def spawn_emsd(
brokername,
brokername: str,
loglevel: Optional[str] = None,
**extra_tractor_kwargs
) -> tractor._portal.Portal:
) -> tractor._portal.Portal:
"""
Start the clearing engine under ``pikerd``.
"""
log.info('Spawning emsd')
# TODO: raise exception when _services == None?
@ -251,6 +275,7 @@ async def spawn_emsd(
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=_services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
return 'emsd'

View File

@ -30,7 +30,7 @@ import tractor
from ..cli import cli
from .. import watchlists as wl
from ..log import get_console_log, colorize_json, get_logger
from ..data import maybe_spawn_brokerd
from .._daemon import maybe_spawn_brokerd
from ..brokers import core, get_brokermod, data
log = get_logger('cli')

View File

@ -137,8 +137,9 @@ def get_orders(
# TODO: make this a ``tractor.msg.pub``
async def send_order_cmds():
"""Order streaming task: deliver orders transmitted from UI
async def send_order_cmds(symbol_key: str):
"""
Order streaming task: deliver orders transmitted from UI
to downstream consumers.
This is run in the UI actor (usually the one running Qt but could be
@ -160,10 +161,18 @@ async def send_order_cmds():
book._ready_to_receive.set()
async for cmd in orders_stream:
print(cmd)
if cmd['symbol'] == symbol_key:
# send msg over IPC / wire
log.info(f'Send order cmd:\n{pformat(cmd)}')
yield cmd
else:
# XXX BRUTAL HACKZORZES !!!
# re-insert for another consumer
# we need broadcast channelz...asap
# https://github.com/goodboy/tractor/issues/204
book._to_ems.send_nowait(cmd)
@asynccontextmanager

View File

@ -175,7 +175,7 @@ async def execute_triggers(
tuple(execs.items())
):
if (ttype not in tf) or (not pred(price)):
if not pred or (ttype not in tf) or (not pred(price)):
# majority of iterations will be non-matches
continue
@ -675,7 +675,10 @@ async def _emsd_main(
# acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream
# using (for now) an async generator.
order_stream = await portal.run(send_order_cmds)
order_stream = await portal.run(
send_order_cmds,
symbol_key=symbol,
)
# start inbound order request processing
await process_order_cmds(

View File

@ -32,15 +32,25 @@ _context_defaults = dict(
@click.command()
@click.option('--loglevel', '-l', default='warning', help='Logging level')
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.option('--pdb', is_flag=True, help='Enable tractor debug mode')
@click.option('--host', '-h', default='127.0.0.1', help='Host address to bind')
def pikerd(loglevel, host, tl):
def pikerd(loglevel, host, tl, pdb):
"""Spawn the piker broker-daemon.
"""
from .._daemon import _data_mods, open_pikerd
get_console_log(loglevel)
log = get_console_log(loglevel)
if pdb:
log.warning((
"\n"
"!!! You have enabled daemon DEBUG mode !!!\n"
"If a daemon crashes it will likely block"
" the service until resumed from console!\n"
"\n"
))
async def main():
async with open_pikerd(loglevel):
async with open_pikerd(loglevel=loglevel, debug_mode=pdb):
await trio.sleep_forever()
trio.run(main)

View File

@ -15,452 +15,35 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed apis and infra.
Data infra.
We provide tsdb integrations for retrieving
and storing data from your brokers as well as
sharing your feeds with other fellow pikers.
sharing live streams over a network.
"""
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
from functools import partial
from importlib import import_module
from types import ModuleType
from typing import (
Dict, Any, Sequence,
AsyncIterator, Optional,
List
)
import trio
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
)
from ._normalize import iterticks
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
open_shm_array,
ShmArray,
get_shm_token,
ShmArray,
)
from ._source import base_iohlc_dtype, Symbol
from ._sampling import (
_shms,
_incrementers,
increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast,
from .feed import (
open_feed,
_setup_persistent_brokerd,
)
__all__ = [
'open_feed',
'maybe_spawn_brokerd',
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'attach_shm_array',
'open_shm_array',
'get_shm_token',
# 'subscribe_ohlc_for_increment',
'_setup_persistent_brokerd',
]
log = get_logger(__name__)
__ingestors__ = [
'marketstore',
]
def get_ingestormod(name: str) -> ModuleType:
"""Return the imported ingestor module by name.
"""
module = import_module('.' + name, 'piker.data')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module
class _FeedsBus(BaseModel):
"""Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely.
"""
brokername: str
nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {}
subscribers: Dict[str, List[tractor.Context]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
class Config:
arbitrary_types_allowed = True
async def cancel_all(self) -> None:
for sym, (cs, msg, quote) in self.feeds.items():
log.debug(f'Cancelling cached feed for {self.brokername}:{sym}')
cs.cancel()
_bus: _FeedsBus = None
def get_feed_bus(
brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus:
"""
Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock.
"""
global _bus
if nursery is not None:
assert _bus is None, "Feeds manager is already setup?"
# this is initial setup by parent actor
_bus = _FeedsBus(
brokername=brokername,
nursery=nursery,
)
assert not _bus.feeds
assert _bus.brokername == brokername, "Uhhh wtf"
return _bus
async def _setup_persistent_brokerd(brokername: str) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
"""
try:
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
bus = get_feed_bus(brokername, service_nursery)
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
finally:
# TODO: this needs to be shielded?
await bus.cancel_all()
async def allocate_persistent_feed(
ctx: tractor.Context,
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# allocate shm array for this broker/symbol
# XXX: we should get an error here if one already exists
shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(brokername, symbol),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
# do history validation?
assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(10)
feed_is_live = trio.Event()
# establish broker backend quote stream
# ``stream_quotes()`` is a required backend func
init_msg, first_quote = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
feed_is_live=feed_is_live,
symbols=[symbol],
shm=shm,
loglevel=loglevel,
)
)
init_msg[symbol]['shm_token'] = shm.token
cs = bus.nursery.cancel_scope
# TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based
# resspawns when needed.
bus.feeds[symbol] = (cs, init_msg, first_quote)
if opened:
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(mod.backfill_bars, symbol, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# pass OHLC sample rate in seconds
init_msg[symbol]['sample_rate'] = delay_s
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
await feed_is_live.wait()
if opened:
_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling
if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
# start sample loop
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
@tractor.stream
async def attach_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
loglevel: str,
):
# try:
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername)
async with bus.task_lock:
task_cs = bus.feeds.get(symbol)
sub_only: bool = False
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
if task_cs is None:
init_msg, first_quote = await bus.nursery.start(
partial(
allocate_persistent_feed,
ctx=ctx,
bus=bus,
brokername=brokername,
symbol=symbol,
loglevel=loglevel,
)
)
bus.subscribers.setdefault(symbol, []).append(ctx)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber
cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed?
await ctx.send_yield(init_msg)
await ctx.send_yield(first_quote)
if sub_only:
bus.subscribers[symbol].append(ctx)
try:
await trio.sleep_forever()
finally:
bus.subscribers[symbol].remove(ctx)
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process# }}}
real-time data sources.
This is an thin abstraction on top of ``tractor``'s portals for
interacting with IPC streams and conducting automatic
memory buffer orchestration.
"""
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
mod: ModuleType
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_max_sample_rate: int = 0
# cache of symbol info messages received as first message when
# a stream startsc.
symbols: Dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict:
return await self.stream.__anext__()
async def index_stream(
self,
delay_s: Optional[int] = None
) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
self._index_stream = await self._brokerd_portal.run(
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
)
return self._index_stream
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(
f"{self.mod.name} doesn't have trade data support yet :(")
if not self._trade_stream:
raise RuntimeError(
f'Can not stream trade data from {self.mod.name}')
# NOTE: this can be faked by setting a rx chan
# using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None:
self._trade_stream = await self._brokerd_portal.run(
self.mod.stream_trades,
# do we need this? -> yes
# the broker side must declare this key
# in messages, though we could probably use
# more then one?
topics=['local_trades'],
)
return self._trade_stream
def sym_to_shm_key(
broker: str,
symbol: str,
) -> str:
return f'{broker}.{symbol}'
@asynccontextmanager
async def open_feed(
brokername: str,
symbols: Sequence[str],
loglevel: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# TODO: do all!
sym = symbols[0]
async with maybe_spawn_brokerd(
brokername,
loglevel=loglevel,
) as portal:
stream = await portal.run(
attach_feed_bus,
brokername=brokername,
symbol=sym,
loglevel=loglevel,
)
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
readonly=True,
)
feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
)
symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()

436
piker/data/feed.py 100644
View File

@ -0,0 +1,436 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Data feed apis and infra.
"""
from dataclasses import dataclass, field
from contextlib import asynccontextmanager
from functools import partial
from types import ModuleType
from typing import (
Dict, Any, Sequence,
AsyncIterator, Optional,
List
)
import trio
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
from ..brokers import get_brokermod
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
)
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
ShmArray,
)
from ._source import base_iohlc_dtype, Symbol
from ._sampling import (
_shms,
_incrementers,
increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast,
)
log = get_logger(__name__)
class _FeedsBus(BaseModel):
"""Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely.
"""
brokername: str
nursery: trio.Nursery
feeds: Dict[str, trio.CancelScope] = {}
subscribers: Dict[str, List[tractor.Context]] = {}
task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
class Config:
arbitrary_types_allowed = True
async def cancel_all(self) -> None:
for sym, (cs, msg, quote) in self.feeds.items():
log.debug(f'Cancelling cached feed for {self.brokername}:{sym}')
cs.cancel()
_bus: _FeedsBus = None
def get_feed_bus(
brokername: str,
nursery: Optional[trio.Nursery] = None,
) -> _FeedsBus:
"""
Retreive broker-daemon-local data feeds bus from process global
scope. Serialize task access to lock.
"""
global _bus
if nursery is not None:
assert _bus is None, "Feeds manager is already setup?"
# this is initial setup by parent actor
_bus = _FeedsBus(
brokername=brokername,
nursery=nursery,
)
assert not _bus.feeds
assert _bus.brokername == brokername, "Uhhh wtf"
return _bus
async def _setup_persistent_brokerd(brokername: str) -> None:
"""Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
"""
try:
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
bus = get_feed_bus(brokername, service_nursery)
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
finally:
# TODO: this needs to be shielded?
await bus.cancel_all()
async def allocate_persistent_feed(
ctx: tractor.Context,
bus: _FeedsBus,
brokername: str,
symbol: str,
loglevel: str,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
# allocate shm array for this broker/symbol
# XXX: we should get an error here if one already exists
shm, opened = maybe_open_shm_array(
key=sym_to_shm_key(brokername, symbol),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=False,
)
# do history validation?
assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
send, quote_stream = trio.open_memory_channel(10)
feed_is_live = trio.Event()
# establish broker backend quote stream
# ``stream_quotes()`` is a required backend func
init_msg, first_quote = await bus.nursery.start(
partial(
mod.stream_quotes,
send_chan=send,
feed_is_live=feed_is_live,
symbols=[symbol],
shm=shm,
loglevel=loglevel,
)
)
init_msg[symbol]['shm_token'] = shm.token
cs = bus.nursery.cancel_scope
# TODO: make this into a composed type which also
# contains the backfiller cs for individual super-based
# resspawns when needed.
bus.feeds[symbol] = (cs, init_msg, first_quote)
if opened:
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
await bus.nursery.start(mod.backfill_bars, symbol, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# pass OHLC sample rate in seconds
init_msg[symbol]['sample_rate'] = delay_s
# yield back control to starting nursery
task_status.started((init_msg, first_quote))
await feed_is_live.wait()
if opened:
_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling
if _incrementers.get(delay_s) is None:
cs = await bus.nursery.start(increment_ohlc_buffer, delay_s)
sum_tick_vlm: bool = init_msg.get(
'shm_write_opts', {}
).get('sum_tick_vlm', True)
# start sample loop
await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm)
@tractor.stream
async def attach_feed_bus(
ctx: tractor.Context,
brokername: str,
symbol: str,
loglevel: str,
):
# try:
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# ensure we are who we think we are
assert 'brokerd' in tractor.current_actor().name
bus = get_feed_bus(brokername)
async with bus.task_lock:
task_cs = bus.feeds.get(symbol)
sub_only: bool = False
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# service nursery
if task_cs is None:
init_msg, first_quote = await bus.nursery.start(
partial(
allocate_persistent_feed,
ctx=ctx,
bus=bus,
brokername=brokername,
symbol=symbol,
loglevel=loglevel,
)
)
bus.subscribers.setdefault(symbol, []).append(ctx)
else:
sub_only = True
# XXX: ``first_quote`` may be outdated here if this is secondary
# subscriber
cs, init_msg, first_quote = bus.feeds[symbol]
# send this even to subscribers to existing feed?
await ctx.send_yield(init_msg)
await ctx.send_yield(first_quote)
if sub_only:
bus.subscribers[symbol].append(ctx)
try:
await trio.sleep_forever()
finally:
bus.subscribers[symbol].remove(ctx)
@dataclass
class Feed:
"""A data feed for client-side interaction with far-process# }}}
real-time data sources.
This is an thin abstraction on top of ``tractor``'s portals for
interacting with IPC streams and conducting automatic
memory buffer orchestration.
"""
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
mod: ModuleType
_brokerd_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[int]] = None
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
_max_sample_rate: int = 0
# cache of symbol info messages received as first message when
# a stream startsc.
symbols: Dict[str, Symbol] = field(default_factory=dict)
async def receive(self) -> dict:
return await self.stream.__anext__()
async def index_stream(
self,
delay_s: Optional[int] = None
) -> AsyncIterator[int]:
if not self._index_stream:
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
self._index_stream = await self._brokerd_portal.run(
iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate,
)
return self._index_stream
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False):
log.warning(
f"{self.mod.name} doesn't have trade data support yet :(")
if not self._trade_stream:
raise RuntimeError(
f'Can not stream trade data from {self.mod.name}')
# NOTE: this can be faked by setting a rx chan
# using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None:
self._trade_stream = await self._brokerd_portal.run(
self.mod.stream_trades,
# do we need this? -> yes
# the broker side must declare this key
# in messages, though we could probably use
# more then one?
topics=['local_trades'],
)
return self._trade_stream
def sym_to_shm_key(
broker: str,
symbol: str,
) -> str:
return f'{broker}.{symbol}'
@asynccontextmanager
async def open_feed(
brokername: str,
symbols: Sequence[str],
loglevel: Optional[str] = None,
) -> AsyncIterator[Dict[str, Any]]:
"""Open a "data feed" which provides streamed real-time quotes.
"""
try:
mod = get_brokermod(brokername)
except ImportError:
mod = get_ingestormod(brokername)
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# TODO: do all!
sym = symbols[0]
async with maybe_spawn_brokerd(
brokername,
loglevel=loglevel,
) as portal:
stream = await portal.run(
attach_feed_bus,
brokername=brokername,
symbol=sym,
loglevel=loglevel,
)
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
readonly=True,
)
feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
)
symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this
shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()

View File

@ -0,0 +1,41 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Ingestion, for dataz.
Api layer likely in here...
"""
from types import ModuleType
from importlib import import_module
from ..log import get_logger
log = get_logger(__name__)
__ingestors__ = [
'marketstore',
]
def get_ingestormod(name: str) -> ModuleType:
"""Return the imported ingestor module by name.
"""
module = import_module('.' + name, 'piker.data')
# we only allow monkeying because it's for internal keying
module.name = module.__name__.split('.')[-1]
return module

View File

@ -200,7 +200,6 @@ def run_qtractor(
async def main():
async with maybe_open_pikerd(
start_method='trio',
**tractor_kwargs,
):
await func(*((widgets,) + args))

View File

@ -24,7 +24,7 @@ import tractor
from ..cli import cli
from .. import watchlists as wl
from ..data import maybe_spawn_brokerd
from .._daemon import maybe_spawn_brokerd
_config_dir = click.get_app_dir('piker')
@ -125,9 +125,14 @@ def optschain(config, symbol, date, rate, test):
is_flag=True,
help='Enable pyqtgraph profiling'
)
@click.option(
'--pdb',
is_flag=True,
help='Enable tractor debug mode'
)
@click.argument('symbol', required=True)
@click.pass_obj
def chart(config, symbol, profile):
def chart(config, symbol, profile, pdb):
"""Start a real-time chartng UI
"""
from .. import _profile
@ -146,7 +151,7 @@ def chart(config, symbol, profile):
brokername=brokername,
piker_loglevel=pikerloglevel,
tractor_kwargs={
'debug_mode': True,
'debug_mode': pdb,
'loglevel': tractorloglevel,
'name': 'chart',
'enable_modules': [

View File

@ -317,6 +317,7 @@ async def start_order_mode(
symbol: Symbol,
brokername: str,
) -> None:
# spawn EMS actor-service
async with open_ems(
brokername,