Compare commits

..

No commits in common. "640214611622e69123c73142e532b14da14dbb48" and "75b73ee007e280082f1156ce65ded837d29cb61a" have entirely different histories.

18 changed files with 47 additions and 93 deletions

View File

@@ -3,8 +3,7 @@
"allow": [
"Bash(chmod:*)",
"Bash(/tmp/piker_commits.txt)",
"Bash(python:*)",
"Bash(ls:*)"
"Bash(python:*)"
],
"deny": [],
"ask": []

View File

@@ -31,7 +31,6 @@ from piker.log import (
from ._util import (
BrokerError,
SymbolNotFound,
MarketNotFound as MarketNotFound,
NoData,
DataUnavailable,
DataThrottle,

View File

@@ -66,10 +66,6 @@ class SymbolNotFound(BrokerError):
"Symbol not found by broker search"
class MarketNotFound(SymbolNotFound):
"Mkt-pair not found by broker search"
# TODO: these should probably be moved to `.tsp/.data`?
class NoData(BrokerError):
'''

View File

@@ -48,7 +48,7 @@ import tractor
from piker.brokers import (
open_cached_client,
NoData,
MarketNotFound,
SymbolNotFound,
)
from piker._cacheables import (
async_lifo_cache,
@@ -325,21 +325,9 @@ async def get_mkt_info(
venue_lower: str = venue.lower()
if not venue:
if expiry:
expiry = f'.{expiry}'
expected: str = (
f'{mkt_ep}'
f'.<venue>'
f'{expiry}'
f'.{broker}'
)
raise MarketNotFound(
raise SymbolNotFound(
f'Invalid or missing .<venue> part in fqme?\n'
f'\n'
f'fqme: {fqme!r}\n'
f'expected-form>> {expected}\n'
f'\n'
f'Maybe you are missing a ".spot." ?\n'
)
# XXX TODO: we should change the usdtm_futes name to just

View File

@@ -425,7 +425,7 @@ class DataFeed:
async def stream_to_file(
watchlist_name: str,
filename: str,
portal: tractor.Portal,
portal: tractor._portal.Portal,
tickers: List[str],
brokermod: ModuleType,
rate: int,

View File

@@ -23,6 +23,7 @@ from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from functools import partial
import time
from typing import (
Any,
@@ -523,12 +524,13 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
chan.send_nowait(('trade', {
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
@@ -538,7 +540,7 @@ async def aio_price_feed_relay(
}))
async def _l1(data: dict, receipt_timestamp):
chan.send_nowait(('l1', {
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
@@ -568,7 +570,7 @@ async def aio_price_feed_relay(
install_signal_handlers=False)
# sync with trio
chan.started_nowait(None)
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@@ -579,9 +581,11 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh=fh,
instrument=instrument,
fh,
instrument
)
) as (chan, first):
yield chan
@@ -607,9 +611,10 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
@@ -632,7 +637,7 @@ async def aio_order_feed_relay(
install_signal_handlers=False)
# sync with trio
chan.started_nowait(None)
to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
@@ -643,9 +648,11 @@ async def open_order_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh=fh,
instrument=instrument,
fh,
instrument
)
) as (chan, first):
yield chan

View File

@@ -231,21 +231,20 @@ async def handle_order_requests(
async def recv_trade_updates(
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Receive and relay order control and positioning
related events from `ib_async`, pack as tuples and
push over mem-chan to our trio relay task for
processing and relay to EMS.
Receive and relay order control and positioning related events
from `ib_async`, pack as tuples and push over mem-chan to our
trio relay task for processing and relay to EMS.
'''
client.inline_errors(chan)
client.inline_errors(to_trio)
# sync with trio task
chan.started_nowait(client.ib)
to_trio.send_nowait(client.ib)
def push_tradesies(
eventkit_obj,
@@ -283,7 +282,7 @@ async def recv_trade_updates(
try:
# emit event name + relevant ibis internal objects
chan.send_nowait((event_name, emit))
to_trio.send_nowait((event_name, emit))
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)

View File

@@ -27,7 +27,7 @@ from types import ModuleType
import click
import trio
import tractor
from tractor.discovery._multiaddr import parse_maddr
from tractor._multiaddr import parse_maddr
from ..log import (
get_console_log,
@@ -345,7 +345,7 @@ def services(
if not ports:
ports: list[int] = [_default_registry_port]
addr = tractor.discovery._addr.wrap_address(
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
)

View File

@@ -77,7 +77,7 @@ from ._sampling import (
if TYPE_CHECKING:
from .flows import Flume
from tractor.discovery._addr import Address
from tractor._addr import Address
from tractor.msg.types import Aid

View File

@@ -91,7 +91,7 @@ async def open_piker_runtime(
try:
actor = tractor.current_actor()
except tractor._exceptions.NoRuntime:
tractor.runtime._state._runtime_vars[
tractor._state._runtime_vars[
'piker_vars'
] = tractor_runtime_overrides
@@ -264,7 +264,7 @@ async def maybe_open_pikerd(
**kwargs,
) -> (
tractor.Portal
tractor._portal.Portal
|ClassVar[Services]
):
'''

View File

@@ -79,17 +79,10 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name]
await lock.acquire()
if not pikerd_kwargs:
# XXX NOTE, pin to apprope `tractor` branch!
rtvs: dict = tractor.get_runtime_vars()
registry_addrs: list[tuple] = list(
map(tuple, rtvs['_registry_addrs'])
)
try:
async with find_service(
service_name,
registry_addrs=registry_addrs,
registry_addrs=[('127.0.0.1', 6116)],
) as portal:
if portal is not None:
lock.release()
@@ -106,7 +99,6 @@ async def maybe_spawn_daemon(
# process tree
async with maybe_open_pikerd(
loglevel=loglevel,
registry_addrs=registry_addrs,
**pikerd_kwargs,
) as pikerd_portal:

View File

@@ -48,7 +48,7 @@ log = get_logger(name=__name__)
# new actors and supervises them to completion?
class Services:
actor_n: tractor.ActorNursery
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[

View File

@@ -306,7 +306,7 @@ def ldshm(
# TODO: call null-seg fixer somehow?
if null_segs:
if tractor.runtime._state.is_debug_mode():
if tractor._state.is_debug_mode():
await tractor.pause()
# async with (
# trio.open_nursery() as tn,

View File

@@ -196,7 +196,7 @@ async def markup_gaps(
prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row
if tractor.runtime._state.is_debug_mode():
if tractor._state.is_debug_mode():
await tractor.pause()
istart: int = prev_r['index'][0]

View File

@@ -167,7 +167,7 @@ async def stream_symbol_selection():
async def _async_main(
name: str,
portal: tractor.Portal,
portal: tractor._portal.Portal,
symbols: List[str],
brokermod: ModuleType,
loglevel: str = 'info',

View File

@@ -436,7 +436,7 @@ class OptionChain(object):
async def new_chain_ui(
portal: tractor.Portal,
portal: tractor._portal.Portal,
symbol: str,
brokermod: types.ModuleType,
rate: int = 1,

View File

@@ -137,6 +137,7 @@ async def _open_test_pikerd(
raddr = portal.chan.raddr
uw_raddr: tuple = raddr.unwrap()
assert uw_raddr == reg_addr
await tractor.pause()
yield (
raddr._host,
raddr._port,

View File

@@ -23,35 +23,13 @@ from piker.accounting import (
'fqmes',
[
# binance
(100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
'btcusdt.spot.binance',
'ethusdt.spot.binance',
}, False),
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
# kraken
(20, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'ethusdt.kraken',
# 'xbtusd.kraken',
'ethusdt.spot.kraken',
'xbtusd.spot.kraken',
}, True),
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
# binance + kraken
(100, {
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
# 'xbtusd.kraken',
'btcusdt.spot.binance',
'xbtusd.spot.kraken',
}, False),
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
)
@@ -70,17 +48,12 @@ def test_multi_fqsn_feed(
if (
ci_env
and
not run_in_ci
and not run_in_ci
):
pytest.skip(
'CI-disabled-test due to live-feed restrictions'
)
pytest.skip('Skipping CI disabled test due to feed restrictions')
brokers = set()
for fqme in fqmes:
# ?TODO, add this unpack + normalize check to a symbology
# helper fn?
brokername, *_ = unpack_fqme(fqme)
brokers.add(brokername)