Expect new init message in feed from brokers
parent
b4a4f12aa4
commit
92efb8fd8e
|
@ -21,7 +21,7 @@ We provide tsdb integrations for retrieving
|
||||||
and storing data from your brokers as well as
|
and storing data from your brokers as well as
|
||||||
sharing your feeds with other fellow pikers.
|
sharing your feeds with other fellow pikers.
|
||||||
"""
|
"""
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
|
@ -42,7 +42,7 @@ from ._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
get_shm_token,
|
get_shm_token,
|
||||||
)
|
)
|
||||||
from ._source import base_iohlc_dtype
|
from ._source import base_iohlc_dtype, Symbol
|
||||||
from ._buffer import (
|
from ._buffer import (
|
||||||
increment_ohlc_buffer,
|
increment_ohlc_buffer,
|
||||||
subscribe_ohlc_for_increment
|
subscribe_ohlc_for_increment
|
||||||
|
@ -149,6 +149,10 @@ class Feed:
|
||||||
_index_stream: Optional[AsyncIterator[int]] = None
|
_index_stream: Optional[AsyncIterator[int]] = None
|
||||||
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
|
_trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
|
||||||
|
|
||||||
|
# cache of symbol info messages received as first message when
|
||||||
|
# a stream startsc.
|
||||||
|
symbols: Dict[str, Symbol] = field(default_factory=dict)
|
||||||
|
|
||||||
async def receive(self) -> dict:
|
async def receive(self) -> dict:
|
||||||
return await self.stream.__anext__()
|
return await self.stream.__anext__()
|
||||||
|
|
||||||
|
@ -208,23 +212,26 @@ def sym_to_shm_key(
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_feed(
|
async def open_feed(
|
||||||
name: str,
|
brokername: str,
|
||||||
symbols: Sequence[str],
|
symbols: Sequence[str],
|
||||||
loglevel: Optional[str] = None,
|
loglevel: Optional[str] = None,
|
||||||
) -> AsyncIterator[Dict[str, Any]]:
|
) -> AsyncIterator[Dict[str, Any]]:
|
||||||
"""Open a "data feed" which provides streamed real-time quotes.
|
"""Open a "data feed" which provides streamed real-time quotes.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
mod = get_brokermod(name)
|
mod = get_brokermod(brokername)
|
||||||
except ImportError:
|
except ImportError:
|
||||||
mod = get_ingestormod(name)
|
mod = get_ingestormod(brokername)
|
||||||
|
|
||||||
if loglevel is None:
|
if loglevel is None:
|
||||||
loglevel = tractor.current_actor().loglevel
|
loglevel = tractor.current_actor().loglevel
|
||||||
|
|
||||||
|
# TODO: do all!
|
||||||
|
sym = symbols[0]
|
||||||
|
|
||||||
# Attempt to allocate (or attach to) shm array for this broker/symbol
|
# Attempt to allocate (or attach to) shm array for this broker/symbol
|
||||||
shm, opened = maybe_open_shm_array(
|
shm, opened = maybe_open_shm_array(
|
||||||
key=sym_to_shm_key(name, symbols[0]),
|
key=sym_to_shm_key(brokername, sym),
|
||||||
|
|
||||||
# use any broker defined ohlc dtype:
|
# use any broker defined ohlc dtype:
|
||||||
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
|
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
|
||||||
|
@ -234,34 +241,51 @@ async def open_feed(
|
||||||
)
|
)
|
||||||
|
|
||||||
async with maybe_spawn_brokerd(
|
async with maybe_spawn_brokerd(
|
||||||
mod.name,
|
brokername,
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
) as portal:
|
) as portal:
|
||||||
stream = await portal.run(
|
stream = await portal.run(
|
||||||
mod.stream_quotes,
|
mod.stream_quotes,
|
||||||
|
|
||||||
|
# TODO: actually handy multiple symbols...
|
||||||
symbols=symbols,
|
symbols=symbols,
|
||||||
|
|
||||||
shm_token=shm.token,
|
shm_token=shm.token,
|
||||||
|
|
||||||
# compat with eventual ``tractor.msg.pub``
|
# compat with eventual ``tractor.msg.pub``
|
||||||
topics=symbols,
|
topics=symbols,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: we can't do this **and** be compate with
|
feed = Feed(
|
||||||
# ``tractor.msg.pub``, should we maybe just drop this after
|
name=brokername,
|
||||||
# tests are in?
|
|
||||||
shm_token, is_writer = await stream.receive()
|
|
||||||
|
|
||||||
if opened:
|
|
||||||
assert is_writer
|
|
||||||
log.info("Started shared mem bar writer")
|
|
||||||
|
|
||||||
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
|
|
||||||
assert shm_token == shm.token # sanity
|
|
||||||
|
|
||||||
yield Feed(
|
|
||||||
name=name,
|
|
||||||
stream=stream,
|
stream=stream,
|
||||||
shm=shm,
|
shm=shm,
|
||||||
mod=mod,
|
mod=mod,
|
||||||
_brokerd_portal=portal,
|
_brokerd_portal=portal,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# TODO: we can't do this **and** be compate with
|
||||||
|
# ``tractor.msg.pub``, should we maybe just drop this after
|
||||||
|
# tests are in?
|
||||||
|
init_msg = await stream.receive()
|
||||||
|
|
||||||
|
for sym, data in init_msg.items():
|
||||||
|
|
||||||
|
si = data['symbol_info']
|
||||||
|
symbol = Symbol(
|
||||||
|
sym,
|
||||||
|
min_tick=si.get('minTick', 0.01),
|
||||||
|
)
|
||||||
|
symbol.broker_info[brokername] = si
|
||||||
|
|
||||||
|
feed.symbols[sym] = symbol
|
||||||
|
|
||||||
|
shm_token = data['shm_token']
|
||||||
|
if opened:
|
||||||
|
assert data['is_shm_writer']
|
||||||
|
log.info("Started shared mem bar writer")
|
||||||
|
|
||||||
|
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
|
||||||
|
assert shm_token == shm.token # sanity
|
||||||
|
|
||||||
|
yield feed
|
||||||
|
|
|
@ -17,9 +17,9 @@
|
||||||
"""
|
"""
|
||||||
numpy data source coversion helpers.
|
numpy data source coversion helpers.
|
||||||
"""
|
"""
|
||||||
from typing import List
|
from typing import Dict, Any, List
|
||||||
import decimal
|
import decimal
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
@ -82,9 +82,13 @@ class Symbol:
|
||||||
|
|
||||||
"""
|
"""
|
||||||
key: str = ''
|
key: str = ''
|
||||||
brokers: List[str] = None
|
|
||||||
min_tick: float = 0.01
|
min_tick: float = 0.01
|
||||||
contract: str = ''
|
broker_info: Dict[str, Dict[str, Any]] = field(default_factory=dict)
|
||||||
|
deriv: str = ''
|
||||||
|
|
||||||
|
@property
|
||||||
|
def brokers(self) -> List[str]:
|
||||||
|
return list(self.broker_info.keys())
|
||||||
|
|
||||||
def digits(self) -> int:
|
def digits(self) -> int:
|
||||||
"""Return the trailing number of digits specified by the
|
"""Return the trailing number of digits specified by the
|
||||||
|
@ -93,6 +97,13 @@ class Symbol:
|
||||||
"""
|
"""
|
||||||
return float_digits(self.min_tick)
|
return float_digits(self.min_tick)
|
||||||
|
|
||||||
|
def nearest_tick(self, value: float) -> float:
|
||||||
|
"""Return the nearest tick value based on mininum increment.
|
||||||
|
|
||||||
|
"""
|
||||||
|
mult = 1 / self.min_tick
|
||||||
|
return round(value * mult) / mult
|
||||||
|
|
||||||
|
|
||||||
def from_df(
|
def from_df(
|
||||||
df: pd.DataFrame,
|
df: pd.DataFrame,
|
||||||
|
|
Loading…
Reference in New Issue