Compare commits

..

No commits in common. "640214611622e69123c73142e532b14da14dbb48" and "75b73ee007e280082f1156ce65ded837d29cb61a" have entirely different histories.

18 changed files with 47 additions and 93 deletions

View File

@ -3,8 +3,7 @@
"allow": [ "allow": [
"Bash(chmod:*)", "Bash(chmod:*)",
"Bash(/tmp/piker_commits.txt)", "Bash(/tmp/piker_commits.txt)",
"Bash(python:*)", "Bash(python:*)"
"Bash(ls:*)"
], ],
"deny": [], "deny": [],
"ask": [] "ask": []

View File

@ -31,7 +31,6 @@ from piker.log import (
from ._util import ( from ._util import (
BrokerError, BrokerError,
SymbolNotFound, SymbolNotFound,
MarketNotFound as MarketNotFound,
NoData, NoData,
DataUnavailable, DataUnavailable,
DataThrottle, DataThrottle,

View File

@ -66,10 +66,6 @@ class SymbolNotFound(BrokerError):
"Symbol not found by broker search" "Symbol not found by broker search"
class MarketNotFound(SymbolNotFound):
"Mkt-pair not found by broker search"
# TODO: these should probably be moved to `.tsp/.data`? # TODO: these should probably be moved to `.tsp/.data`?
class NoData(BrokerError): class NoData(BrokerError):
''' '''

View File

@ -48,7 +48,7 @@ import tractor
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
MarketNotFound, SymbolNotFound,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -325,21 +325,9 @@ async def get_mkt_info(
venue_lower: str = venue.lower() venue_lower: str = venue.lower()
if not venue: if not venue:
if expiry: raise SymbolNotFound(
expiry = f'.{expiry}'
expected: str = (
f'{mkt_ep}'
f'.<venue>'
f'{expiry}'
f'.{broker}'
)
raise MarketNotFound(
f'Invalid or missing .<venue> part in fqme?\n' f'Invalid or missing .<venue> part in fqme?\n'
f'\n'
f'fqme: {fqme!r}\n' f'fqme: {fqme!r}\n'
f'expected-form>> {expected}\n'
f'\n'
f'Maybe you are missing a ".spot." ?\n'
) )
# XXX TODO: we should change the usdtm_futes name to just # XXX TODO: we should change the usdtm_futes name to just

View File

@ -425,7 +425,7 @@ class DataFeed:
async def stream_to_file( async def stream_to_file(
watchlist_name: str, watchlist_name: str,
filename: str, filename: str,
portal: tractor.Portal, portal: tractor._portal.Portal,
tickers: List[str], tickers: List[str],
brokermod: ModuleType, brokermod: ModuleType,
rate: int, rate: int,

View File

@ -23,6 +23,7 @@ from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from datetime import datetime from datetime import datetime
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -523,12 +524,13 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _trade(data: dict, receipt_timestamp): async def _trade(data: dict, receipt_timestamp):
chan.send_nowait(('trade', { to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'last': data, 'last': data,
@ -538,7 +540,7 @@ async def aio_price_feed_relay(
})) }))
async def _l1(data: dict, receipt_timestamp): async def _l1(data: dict, receipt_timestamp):
chan.send_nowait(('l1', { to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
@ -568,7 +570,7 @@ async def aio_price_feed_relay(
install_signal_handlers=False) install_signal_handlers=False)
# sync with trio # sync with trio
chan.started_nowait(None) to_trio.send_nowait(None)
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -579,9 +581,11 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay, aio_price_feed_relay,
fh=fh, fh,
instrument=instrument, instrument
)
) as (chan, first): ) as (chan, first):
yield chan yield chan
@ -607,9 +611,10 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay( async def aio_order_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _fill(data: dict, receipt_timestamp): async def _fill(data: dict, receipt_timestamp):
breakpoint() breakpoint()
@ -632,7 +637,7 @@ async def aio_order_feed_relay(
install_signal_handlers=False) install_signal_handlers=False)
# sync with trio # sync with trio
chan.started_nowait(None) to_trio.send_nowait(None)
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@ -643,9 +648,11 @@ async def open_order_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay, aio_order_feed_relay,
fh=fh, fh,
instrument=instrument, instrument
)
) as (chan, first): ) as (chan, first):
yield chan yield chan

View File

@ -231,21 +231,20 @@ async def handle_order_requests(
async def recv_trade_updates( async def recv_trade_updates(
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client, client: Client,
to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
''' '''
Receive and relay order control and positioning Receive and relay order control and positioning related events
related events from `ib_async`, pack as tuples and from `ib_async`, pack as tuples and push over mem-chan to our
push over mem-chan to our trio relay task for trio relay task for processing and relay to EMS.
processing and relay to EMS.
''' '''
client.inline_errors(chan) client.inline_errors(to_trio)
# sync with trio task # sync with trio task
chan.started_nowait(client.ib) to_trio.send_nowait(client.ib)
def push_tradesies( def push_tradesies(
eventkit_obj, eventkit_obj,
@ -283,7 +282,7 @@ async def recv_trade_updates(
try: try:
# emit event name + relevant ibis internal objects # emit event name + relevant ibis internal objects
chan.send_nowait((event_name, emit)) to_trio.send_nowait((event_name, emit))
except trio.BrokenResourceError: except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates') log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies) eventkit_obj.disconnect(push_tradesies)

View File

@ -27,7 +27,7 @@ from types import ModuleType
import click import click
import trio import trio
import tractor import tractor
from tractor.discovery._multiaddr import parse_maddr from tractor._multiaddr import parse_maddr
from ..log import ( from ..log import (
get_console_log, get_console_log,
@ -345,7 +345,7 @@ def services(
if not ports: if not ports:
ports: list[int] = [_default_registry_port] ports: list[int] = [_default_registry_port]
addr = tractor.discovery._addr.wrap_address( addr = tractor._addr.wrap_address(
addr=(host, ports[0]) addr=(host, ports[0])
) )

View File

@ -77,7 +77,7 @@ from ._sampling import (
if TYPE_CHECKING: if TYPE_CHECKING:
from .flows import Flume from .flows import Flume
from tractor.discovery._addr import Address from tractor._addr import Address
from tractor.msg.types import Aid from tractor.msg.types import Aid

View File

@ -91,7 +91,7 @@ async def open_piker_runtime(
try: try:
actor = tractor.current_actor() actor = tractor.current_actor()
except tractor._exceptions.NoRuntime: except tractor._exceptions.NoRuntime:
tractor.runtime._state._runtime_vars[ tractor._state._runtime_vars[
'piker_vars' 'piker_vars'
] = tractor_runtime_overrides ] = tractor_runtime_overrides
@ -264,7 +264,7 @@ async def maybe_open_pikerd(
**kwargs, **kwargs,
) -> ( ) -> (
tractor.Portal tractor._portal.Portal
|ClassVar[Services] |ClassVar[Services]
): ):
''' '''

View File

@ -79,17 +79,10 @@ async def maybe_spawn_daemon(
lock = Services.locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
if not pikerd_kwargs:
# XXX NOTE, pin to apprope `tractor` branch!
rtvs: dict = tractor.get_runtime_vars()
registry_addrs: list[tuple] = list(
map(tuple, rtvs['_registry_addrs'])
)
try: try:
async with find_service( async with find_service(
service_name, service_name,
registry_addrs=registry_addrs, registry_addrs=[('127.0.0.1', 6116)],
) as portal: ) as portal:
if portal is not None: if portal is not None:
lock.release() lock.release()
@ -106,7 +99,6 @@ async def maybe_spawn_daemon(
# process tree # process tree
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=loglevel, loglevel=loglevel,
registry_addrs=registry_addrs,
**pikerd_kwargs, **pikerd_kwargs,
) as pikerd_portal: ) as pikerd_portal:

View File

@ -48,7 +48,7 @@ log = get_logger(name=__name__)
# new actors and supervises them to completion? # new actors and supervises them to completion?
class Services: class Services:
actor_n: tractor.ActorNursery actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[

View File

@ -306,7 +306,7 @@ def ldshm(
# TODO: call null-seg fixer somehow? # TODO: call null-seg fixer somehow?
if null_segs: if null_segs:
if tractor.runtime._state.is_debug_mode(): if tractor._state.is_debug_mode():
await tractor.pause() await tractor.pause()
# async with ( # async with (
# trio.open_nursery() as tn, # trio.open_nursery() as tn,

View File

@ -196,7 +196,7 @@ async def markup_gaps(
prev_r: pl.DataFrame = prev_row_by_i prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row # debug any missing pre-row
if tractor.runtime._state.is_debug_mode(): if tractor._state.is_debug_mode():
await tractor.pause() await tractor.pause()
istart: int = prev_r['index'][0] istart: int = prev_r['index'][0]

View File

@ -167,7 +167,7 @@ async def stream_symbol_selection():
async def _async_main( async def _async_main(
name: str, name: str,
portal: tractor.Portal, portal: tractor._portal.Portal,
symbols: List[str], symbols: List[str],
brokermod: ModuleType, brokermod: ModuleType,
loglevel: str = 'info', loglevel: str = 'info',

View File

@ -436,7 +436,7 @@ class OptionChain(object):
async def new_chain_ui( async def new_chain_ui(
portal: tractor.Portal, portal: tractor._portal.Portal,
symbol: str, symbol: str,
brokermod: types.ModuleType, brokermod: types.ModuleType,
rate: int = 1, rate: int = 1,

View File

@ -137,6 +137,7 @@ async def _open_test_pikerd(
raddr = portal.chan.raddr raddr = portal.chan.raddr
uw_raddr: tuple = raddr.unwrap() uw_raddr: tuple = raddr.unwrap()
assert uw_raddr == reg_addr assert uw_raddr == reg_addr
await tractor.pause()
yield ( yield (
raddr._host, raddr._host,
raddr._port, raddr._port,

View File

@ -23,35 +23,13 @@ from piker.accounting import (
'fqmes', 'fqmes',
[ [
# binance # binance
(100, { (100, {'btcusdt.binance', 'ethusdt.binance'}, False),
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
'btcusdt.spot.binance',
'ethusdt.spot.binance',
}, False),
# kraken # kraken
(20, { (20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'ethusdt.kraken',
# 'xbtusd.kraken',
'ethusdt.spot.kraken',
'xbtusd.spot.kraken',
}, True),
# binance + kraken # binance + kraken
(100, { (100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
# !TODO, write a suite which validates raising against
# bad/legacy fqmes such as this!
# 'btcusdt.binance',
# 'xbtusd.kraken',
'btcusdt.spot.binance',
'xbtusd.spot.kraken',
}, False),
], ],
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}', ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
) )
@ -70,17 +48,12 @@ def test_multi_fqsn_feed(
if ( if (
ci_env ci_env
and and not run_in_ci
not run_in_ci
): ):
pytest.skip( pytest.skip('Skipping CI disabled test due to feed restrictions')
'CI-disabled-test due to live-feed restrictions'
)
brokers = set() brokers = set()
for fqme in fqmes: for fqme in fqmes:
# ?TODO, add this unpack + normalize check to a symbology
# helper fn?
brokername, *_ = unpack_fqme(fqme) brokername, *_ = unpack_fqme(fqme)
brokers.add(brokername) brokers.add(brokername)