Compare commits
6 Commits
75b73ee007
...
6402146116
| Author | SHA1 | Date |
|---|---|---|
|
|
6402146116 | |
|
|
5e44ad05e8 | |
|
|
799c9f45b4 | |
|
|
3254f9233c | |
|
|
ced898f580 | |
|
|
188ce1415e |
|
|
@ -3,7 +3,8 @@
|
|||
"allow": [
|
||||
"Bash(chmod:*)",
|
||||
"Bash(/tmp/piker_commits.txt)",
|
||||
"Bash(python:*)"
|
||||
"Bash(python:*)",
|
||||
"Bash(ls:*)"
|
||||
],
|
||||
"deny": [],
|
||||
"ask": []
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ from piker.log import (
|
|||
from ._util import (
|
||||
BrokerError,
|
||||
SymbolNotFound,
|
||||
MarketNotFound as MarketNotFound,
|
||||
NoData,
|
||||
DataUnavailable,
|
||||
DataThrottle,
|
||||
|
|
|
|||
|
|
@ -66,6 +66,10 @@ class SymbolNotFound(BrokerError):
|
|||
"Symbol not found by broker search"
|
||||
|
||||
|
||||
class MarketNotFound(SymbolNotFound):
|
||||
"Mkt-pair not found by broker search"
|
||||
|
||||
|
||||
# TODO: these should probably be moved to `.tsp/.data`?
|
||||
class NoData(BrokerError):
|
||||
'''
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ import tractor
|
|||
from piker.brokers import (
|
||||
open_cached_client,
|
||||
NoData,
|
||||
SymbolNotFound,
|
||||
MarketNotFound,
|
||||
)
|
||||
from piker._cacheables import (
|
||||
async_lifo_cache,
|
||||
|
|
@ -325,9 +325,21 @@ async def get_mkt_info(
|
|||
venue_lower: str = venue.lower()
|
||||
|
||||
if not venue:
|
||||
raise SymbolNotFound(
|
||||
if expiry:
|
||||
expiry = f'.{expiry}'
|
||||
expected: str = (
|
||||
f'{mkt_ep}'
|
||||
f'.<venue>'
|
||||
f'{expiry}'
|
||||
f'.{broker}'
|
||||
)
|
||||
raise MarketNotFound(
|
||||
f'Invalid or missing .<venue> part in fqme?\n'
|
||||
f'\n'
|
||||
f'fqme: {fqme!r}\n'
|
||||
f'expected-form>> {expected}\n'
|
||||
f'\n'
|
||||
f'Maybe you are missing a ".spot." ?\n'
|
||||
)
|
||||
|
||||
# XXX TODO: we should change the usdtm_futes name to just
|
||||
|
|
|
|||
|
|
@ -425,7 +425,7 @@ class DataFeed:
|
|||
async def stream_to_file(
|
||||
watchlist_name: str,
|
||||
filename: str,
|
||||
portal: tractor._portal.Portal,
|
||||
portal: tractor.Portal,
|
||||
tickers: List[str],
|
||||
brokermod: ModuleType,
|
||||
rate: int,
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ from contextlib import (
|
|||
asynccontextmanager as acm,
|
||||
)
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
import time
|
||||
from typing import (
|
||||
Any,
|
||||
|
|
@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
|||
|
||||
|
||||
async def aio_price_feed_relay(
|
||||
chan: to_asyncio.LinkedTaskChannel,
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
chan.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
|
|
@ -540,7 +538,7 @@ async def aio_price_feed_relay(
|
|||
}))
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('l1', {
|
||||
chan.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
|
|
@ -570,7 +568,7 @@ async def aio_price_feed_relay(
|
|||
install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
chan.started_nowait(None)
|
||||
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
|
@ -581,11 +579,9 @@ async def open_price_feed(
|
|||
) -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
async with to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_price_feed_relay,
|
||||
fh,
|
||||
instrument
|
||||
)
|
||||
fh=fh,
|
||||
instrument=instrument,
|
||||
) as (chan, first):
|
||||
yield chan
|
||||
|
||||
|
|
@ -611,10 +607,9 @@ async def maybe_open_price_feed(
|
|||
|
||||
|
||||
async def aio_order_feed_relay(
|
||||
chan: to_asyncio.LinkedTaskChannel,
|
||||
fh: FeedHandler,
|
||||
instrument: Symbol,
|
||||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _fill(data: dict, receipt_timestamp):
|
||||
breakpoint()
|
||||
|
|
@ -637,7 +632,7 @@ async def aio_order_feed_relay(
|
|||
install_signal_handlers=False)
|
||||
|
||||
# sync with trio
|
||||
to_trio.send_nowait(None)
|
||||
chan.started_nowait(None)
|
||||
|
||||
await asyncio.sleep(float('inf'))
|
||||
|
||||
|
|
@ -648,11 +643,9 @@ async def open_order_feed(
|
|||
) -> trio.abc.ReceiveStream:
|
||||
async with maybe_open_feed_handler() as fh:
|
||||
async with to_asyncio.open_channel_from(
|
||||
partial(
|
||||
aio_order_feed_relay,
|
||||
fh,
|
||||
instrument
|
||||
)
|
||||
fh=fh,
|
||||
instrument=instrument,
|
||||
) as (chan, first):
|
||||
yield chan
|
||||
|
||||
|
|
|
|||
|
|
@ -231,20 +231,21 @@ async def handle_order_requests(
|
|||
|
||||
async def recv_trade_updates(
|
||||
|
||||
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||
client: Client,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Receive and relay order control and positioning related events
|
||||
from `ib_async`, pack as tuples and push over mem-chan to our
|
||||
trio relay task for processing and relay to EMS.
|
||||
Receive and relay order control and positioning
|
||||
related events from `ib_async`, pack as tuples and
|
||||
push over mem-chan to our trio relay task for
|
||||
processing and relay to EMS.
|
||||
|
||||
'''
|
||||
client.inline_errors(to_trio)
|
||||
client.inline_errors(chan)
|
||||
|
||||
# sync with trio task
|
||||
to_trio.send_nowait(client.ib)
|
||||
chan.started_nowait(client.ib)
|
||||
|
||||
def push_tradesies(
|
||||
eventkit_obj,
|
||||
|
|
@ -282,7 +283,7 @@ async def recv_trade_updates(
|
|||
|
||||
try:
|
||||
# emit event name + relevant ibis internal objects
|
||||
to_trio.send_nowait((event_name, emit))
|
||||
chan.send_nowait((event_name, emit))
|
||||
except trio.BrokenResourceError:
|
||||
log.exception(f'Disconnected from {eventkit_obj} updates')
|
||||
eventkit_obj.disconnect(push_tradesies)
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from types import ModuleType
|
|||
import click
|
||||
import trio
|
||||
import tractor
|
||||
from tractor._multiaddr import parse_maddr
|
||||
from tractor.discovery._multiaddr import parse_maddr
|
||||
|
||||
from ..log import (
|
||||
get_console_log,
|
||||
|
|
@ -345,7 +345,7 @@ def services(
|
|||
if not ports:
|
||||
ports: list[int] = [_default_registry_port]
|
||||
|
||||
addr = tractor._addr.wrap_address(
|
||||
addr = tractor.discovery._addr.wrap_address(
|
||||
addr=(host, ports[0])
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ from ._sampling import (
|
|||
|
||||
if TYPE_CHECKING:
|
||||
from .flows import Flume
|
||||
from tractor._addr import Address
|
||||
from tractor.discovery._addr import Address
|
||||
from tractor.msg.types import Aid
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ async def open_piker_runtime(
|
|||
try:
|
||||
actor = tractor.current_actor()
|
||||
except tractor._exceptions.NoRuntime:
|
||||
tractor._state._runtime_vars[
|
||||
tractor.runtime._state._runtime_vars[
|
||||
'piker_vars'
|
||||
] = tractor_runtime_overrides
|
||||
|
||||
|
|
@ -264,7 +264,7 @@ async def maybe_open_pikerd(
|
|||
**kwargs,
|
||||
|
||||
) -> (
|
||||
tractor._portal.Portal
|
||||
tractor.Portal
|
||||
|ClassVar[Services]
|
||||
):
|
||||
'''
|
||||
|
|
|
|||
|
|
@ -79,10 +79,17 @@ async def maybe_spawn_daemon(
|
|||
lock = Services.locks[service_name]
|
||||
await lock.acquire()
|
||||
|
||||
if not pikerd_kwargs:
|
||||
# XXX NOTE, pin to apprope `tractor` branch!
|
||||
rtvs: dict = tractor.get_runtime_vars()
|
||||
registry_addrs: list[tuple] = list(
|
||||
map(tuple, rtvs['_registry_addrs'])
|
||||
)
|
||||
|
||||
try:
|
||||
async with find_service(
|
||||
service_name,
|
||||
registry_addrs=[('127.0.0.1', 6116)],
|
||||
registry_addrs=registry_addrs,
|
||||
) as portal:
|
||||
if portal is not None:
|
||||
lock.release()
|
||||
|
|
@ -99,6 +106,7 @@ async def maybe_spawn_daemon(
|
|||
# process tree
|
||||
async with maybe_open_pikerd(
|
||||
loglevel=loglevel,
|
||||
registry_addrs=registry_addrs,
|
||||
**pikerd_kwargs,
|
||||
|
||||
) as pikerd_portal:
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ log = get_logger(name=__name__)
|
|||
# new actors and supervises them to completion?
|
||||
class Services:
|
||||
|
||||
actor_n: tractor._supervise.ActorNursery
|
||||
actor_n: tractor.ActorNursery
|
||||
service_n: trio.Nursery
|
||||
debug_mode: bool # tractor sub-actor debug mode flag
|
||||
service_tasks: dict[
|
||||
|
|
|
|||
|
|
@ -306,7 +306,7 @@ def ldshm(
|
|||
# TODO: call null-seg fixer somehow?
|
||||
if null_segs:
|
||||
|
||||
if tractor._state.is_debug_mode():
|
||||
if tractor.runtime._state.is_debug_mode():
|
||||
await tractor.pause()
|
||||
# async with (
|
||||
# trio.open_nursery() as tn,
|
||||
|
|
|
|||
|
|
@ -196,7 +196,7 @@ async def markup_gaps(
|
|||
prev_r: pl.DataFrame = prev_row_by_i
|
||||
|
||||
# debug any missing pre-row
|
||||
if tractor._state.is_debug_mode():
|
||||
if tractor.runtime._state.is_debug_mode():
|
||||
await tractor.pause()
|
||||
|
||||
istart: int = prev_r['index'][0]
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ async def stream_symbol_selection():
|
|||
|
||||
async def _async_main(
|
||||
name: str,
|
||||
portal: tractor._portal.Portal,
|
||||
portal: tractor.Portal,
|
||||
symbols: List[str],
|
||||
brokermod: ModuleType,
|
||||
loglevel: str = 'info',
|
||||
|
|
|
|||
|
|
@ -436,7 +436,7 @@ class OptionChain(object):
|
|||
|
||||
|
||||
async def new_chain_ui(
|
||||
portal: tractor._portal.Portal,
|
||||
portal: tractor.Portal,
|
||||
symbol: str,
|
||||
brokermod: types.ModuleType,
|
||||
rate: int = 1,
|
||||
|
|
|
|||
|
|
@ -137,7 +137,6 @@ async def _open_test_pikerd(
|
|||
raddr = portal.chan.raddr
|
||||
uw_raddr: tuple = raddr.unwrap()
|
||||
assert uw_raddr == reg_addr
|
||||
await tractor.pause()
|
||||
yield (
|
||||
raddr._host,
|
||||
raddr._port,
|
||||
|
|
|
|||
|
|
@ -23,13 +23,35 @@ from piker.accounting import (
|
|||
'fqmes',
|
||||
[
|
||||
# binance
|
||||
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
|
||||
(100, {
|
||||
# !TODO, write a suite which validates raising against
|
||||
# bad/legacy fqmes such as this!
|
||||
# 'btcusdt.binance',
|
||||
'btcusdt.spot.binance',
|
||||
'ethusdt.spot.binance',
|
||||
}, False),
|
||||
|
||||
# kraken
|
||||
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
|
||||
(20, {
|
||||
# !TODO, write a suite which validates raising against
|
||||
# bad/legacy fqmes such as this!
|
||||
# 'ethusdt.kraken',
|
||||
# 'xbtusd.kraken',
|
||||
|
||||
'ethusdt.spot.kraken',
|
||||
'xbtusd.spot.kraken',
|
||||
}, True),
|
||||
|
||||
# binance + kraken
|
||||
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
|
||||
(100, {
|
||||
# !TODO, write a suite which validates raising against
|
||||
# bad/legacy fqmes such as this!
|
||||
# 'btcusdt.binance',
|
||||
# 'xbtusd.kraken',
|
||||
|
||||
'btcusdt.spot.binance',
|
||||
'xbtusd.spot.kraken',
|
||||
}, False),
|
||||
],
|
||||
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
|
||||
)
|
||||
|
|
@ -48,12 +70,17 @@ def test_multi_fqsn_feed(
|
|||
|
||||
if (
|
||||
ci_env
|
||||
and not run_in_ci
|
||||
and
|
||||
not run_in_ci
|
||||
):
|
||||
pytest.skip('Skipping CI disabled test due to feed restrictions')
|
||||
pytest.skip(
|
||||
'CI-disabled-test due to live-feed restrictions'
|
||||
)
|
||||
|
||||
brokers = set()
|
||||
for fqme in fqmes:
|
||||
# ?TODO, add this unpack + normalize check to a symbology
|
||||
# helper fn?
|
||||
brokername, *_ = unpack_fqme(fqme)
|
||||
brokers.add(brokername)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue