Compare commits
6 Commits
75b73ee007
...
6402146116
| Author | SHA1 | Date |
|---|---|---|
|
|
6402146116 | |
|
|
5e44ad05e8 | |
|
|
799c9f45b4 | |
|
|
3254f9233c | |
|
|
ced898f580 | |
|
|
188ce1415e |
|
|
@ -3,7 +3,8 @@
|
||||||
"allow": [
|
"allow": [
|
||||||
"Bash(chmod:*)",
|
"Bash(chmod:*)",
|
||||||
"Bash(/tmp/piker_commits.txt)",
|
"Bash(/tmp/piker_commits.txt)",
|
||||||
"Bash(python:*)"
|
"Bash(python:*)",
|
||||||
|
"Bash(ls:*)"
|
||||||
],
|
],
|
||||||
"deny": [],
|
"deny": [],
|
||||||
"ask": []
|
"ask": []
|
||||||
|
|
|
||||||
|
|
@ -31,6 +31,7 @@ from piker.log import (
|
||||||
from ._util import (
|
from ._util import (
|
||||||
BrokerError,
|
BrokerError,
|
||||||
SymbolNotFound,
|
SymbolNotFound,
|
||||||
|
MarketNotFound as MarketNotFound,
|
||||||
NoData,
|
NoData,
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
DataThrottle,
|
DataThrottle,
|
||||||
|
|
|
||||||
|
|
@ -66,6 +66,10 @@ class SymbolNotFound(BrokerError):
|
||||||
"Symbol not found by broker search"
|
"Symbol not found by broker search"
|
||||||
|
|
||||||
|
|
||||||
|
class MarketNotFound(SymbolNotFound):
|
||||||
|
"Mkt-pair not found by broker search"
|
||||||
|
|
||||||
|
|
||||||
# TODO: these should probably be moved to `.tsp/.data`?
|
# TODO: these should probably be moved to `.tsp/.data`?
|
||||||
class NoData(BrokerError):
|
class NoData(BrokerError):
|
||||||
'''
|
'''
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ import tractor
|
||||||
from piker.brokers import (
|
from piker.brokers import (
|
||||||
open_cached_client,
|
open_cached_client,
|
||||||
NoData,
|
NoData,
|
||||||
SymbolNotFound,
|
MarketNotFound,
|
||||||
)
|
)
|
||||||
from piker._cacheables import (
|
from piker._cacheables import (
|
||||||
async_lifo_cache,
|
async_lifo_cache,
|
||||||
|
|
@ -325,9 +325,21 @@ async def get_mkt_info(
|
||||||
venue_lower: str = venue.lower()
|
venue_lower: str = venue.lower()
|
||||||
|
|
||||||
if not venue:
|
if not venue:
|
||||||
raise SymbolNotFound(
|
if expiry:
|
||||||
|
expiry = f'.{expiry}'
|
||||||
|
expected: str = (
|
||||||
|
f'{mkt_ep}'
|
||||||
|
f'.<venue>'
|
||||||
|
f'{expiry}'
|
||||||
|
f'.{broker}'
|
||||||
|
)
|
||||||
|
raise MarketNotFound(
|
||||||
f'Invalid or missing .<venue> part in fqme?\n'
|
f'Invalid or missing .<venue> part in fqme?\n'
|
||||||
|
f'\n'
|
||||||
f'fqme: {fqme!r}\n'
|
f'fqme: {fqme!r}\n'
|
||||||
|
f'expected-form>> {expected}\n'
|
||||||
|
f'\n'
|
||||||
|
f'Maybe you are missing a ".spot." ?\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX TODO: we should change the usdtm_futes name to just
|
# XXX TODO: we should change the usdtm_futes name to just
|
||||||
|
|
|
||||||
|
|
@ -425,7 +425,7 @@ class DataFeed:
|
||||||
async def stream_to_file(
|
async def stream_to_file(
|
||||||
watchlist_name: str,
|
watchlist_name: str,
|
||||||
filename: str,
|
filename: str,
|
||||||
portal: tractor._portal.Portal,
|
portal: tractor.Portal,
|
||||||
tickers: List[str],
|
tickers: List[str],
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
rate: int,
|
rate: int,
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from functools import partial
|
|
||||||
import time
|
import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
|
|
@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
|
||||||
|
|
||||||
|
|
||||||
async def aio_price_feed_relay(
|
async def aio_price_feed_relay(
|
||||||
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
fh: FeedHandler,
|
fh: FeedHandler,
|
||||||
instrument: Symbol,
|
instrument: Symbol,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async def _trade(data: dict, receipt_timestamp):
|
async def _trade(data: dict, receipt_timestamp):
|
||||||
to_trio.send_nowait(('trade', {
|
chan.send_nowait(('trade', {
|
||||||
'symbol': cb_sym_to_deribit_inst(
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
str_to_cb_sym(data.symbol)).lower(),
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
'last': data,
|
'last': data,
|
||||||
|
|
@ -540,7 +538,7 @@ async def aio_price_feed_relay(
|
||||||
}))
|
}))
|
||||||
|
|
||||||
async def _l1(data: dict, receipt_timestamp):
|
async def _l1(data: dict, receipt_timestamp):
|
||||||
to_trio.send_nowait(('l1', {
|
chan.send_nowait(('l1', {
|
||||||
'symbol': cb_sym_to_deribit_inst(
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
str_to_cb_sym(data.symbol)).lower(),
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
'ticks': [
|
'ticks': [
|
||||||
|
|
@ -570,7 +568,7 @@ async def aio_price_feed_relay(
|
||||||
install_signal_handlers=False)
|
install_signal_handlers=False)
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
to_trio.send_nowait(None)
|
chan.started_nowait(None)
|
||||||
|
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
@ -581,11 +579,9 @@ async def open_price_feed(
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
async with maybe_open_feed_handler() as fh:
|
async with maybe_open_feed_handler() as fh:
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
partial(
|
aio_price_feed_relay,
|
||||||
aio_price_feed_relay,
|
fh=fh,
|
||||||
fh,
|
instrument=instrument,
|
||||||
instrument
|
|
||||||
)
|
|
||||||
) as (chan, first):
|
) as (chan, first):
|
||||||
yield chan
|
yield chan
|
||||||
|
|
||||||
|
|
@ -611,10 +607,9 @@ async def maybe_open_price_feed(
|
||||||
|
|
||||||
|
|
||||||
async def aio_order_feed_relay(
|
async def aio_order_feed_relay(
|
||||||
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
fh: FeedHandler,
|
fh: FeedHandler,
|
||||||
instrument: Symbol,
|
instrument: Symbol,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
async def _fill(data: dict, receipt_timestamp):
|
async def _fill(data: dict, receipt_timestamp):
|
||||||
breakpoint()
|
breakpoint()
|
||||||
|
|
@ -637,7 +632,7 @@ async def aio_order_feed_relay(
|
||||||
install_signal_handlers=False)
|
install_signal_handlers=False)
|
||||||
|
|
||||||
# sync with trio
|
# sync with trio
|
||||||
to_trio.send_nowait(None)
|
chan.started_nowait(None)
|
||||||
|
|
||||||
await asyncio.sleep(float('inf'))
|
await asyncio.sleep(float('inf'))
|
||||||
|
|
||||||
|
|
@ -648,11 +643,9 @@ async def open_order_feed(
|
||||||
) -> trio.abc.ReceiveStream:
|
) -> trio.abc.ReceiveStream:
|
||||||
async with maybe_open_feed_handler() as fh:
|
async with maybe_open_feed_handler() as fh:
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
partial(
|
aio_order_feed_relay,
|
||||||
aio_order_feed_relay,
|
fh=fh,
|
||||||
fh,
|
instrument=instrument,
|
||||||
instrument
|
|
||||||
)
|
|
||||||
) as (chan, first):
|
) as (chan, first):
|
||||||
yield chan
|
yield chan
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -231,20 +231,21 @@ async def handle_order_requests(
|
||||||
|
|
||||||
async def recv_trade_updates(
|
async def recv_trade_updates(
|
||||||
|
|
||||||
|
chan: tractor.to_asyncio.LinkedTaskChannel,
|
||||||
client: Client,
|
client: Client,
|
||||||
to_trio: trio.abc.SendChannel,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Receive and relay order control and positioning related events
|
Receive and relay order control and positioning
|
||||||
from `ib_async`, pack as tuples and push over mem-chan to our
|
related events from `ib_async`, pack as tuples and
|
||||||
trio relay task for processing and relay to EMS.
|
push over mem-chan to our trio relay task for
|
||||||
|
processing and relay to EMS.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
client.inline_errors(to_trio)
|
client.inline_errors(chan)
|
||||||
|
|
||||||
# sync with trio task
|
# sync with trio task
|
||||||
to_trio.send_nowait(client.ib)
|
chan.started_nowait(client.ib)
|
||||||
|
|
||||||
def push_tradesies(
|
def push_tradesies(
|
||||||
eventkit_obj,
|
eventkit_obj,
|
||||||
|
|
@ -282,7 +283,7 @@ async def recv_trade_updates(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# emit event name + relevant ibis internal objects
|
# emit event name + relevant ibis internal objects
|
||||||
to_trio.send_nowait((event_name, emit))
|
chan.send_nowait((event_name, emit))
|
||||||
except trio.BrokenResourceError:
|
except trio.BrokenResourceError:
|
||||||
log.exception(f'Disconnected from {eventkit_obj} updates')
|
log.exception(f'Disconnected from {eventkit_obj} updates')
|
||||||
eventkit_obj.disconnect(push_tradesies)
|
eventkit_obj.disconnect(push_tradesies)
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ from types import ModuleType
|
||||||
import click
|
import click
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._multiaddr import parse_maddr
|
from tractor.discovery._multiaddr import parse_maddr
|
||||||
|
|
||||||
from ..log import (
|
from ..log import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
|
|
@ -345,7 +345,7 @@ def services(
|
||||||
if not ports:
|
if not ports:
|
||||||
ports: list[int] = [_default_registry_port]
|
ports: list[int] = [_default_registry_port]
|
||||||
|
|
||||||
addr = tractor._addr.wrap_address(
|
addr = tractor.discovery._addr.wrap_address(
|
||||||
addr=(host, ports[0])
|
addr=(host, ports[0])
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -77,7 +77,7 @@ from ._sampling import (
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from .flows import Flume
|
from .flows import Flume
|
||||||
from tractor._addr import Address
|
from tractor.discovery._addr import Address
|
||||||
from tractor.msg.types import Aid
|
from tractor.msg.types import Aid
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -91,7 +91,7 @@ async def open_piker_runtime(
|
||||||
try:
|
try:
|
||||||
actor = tractor.current_actor()
|
actor = tractor.current_actor()
|
||||||
except tractor._exceptions.NoRuntime:
|
except tractor._exceptions.NoRuntime:
|
||||||
tractor._state._runtime_vars[
|
tractor.runtime._state._runtime_vars[
|
||||||
'piker_vars'
|
'piker_vars'
|
||||||
] = tractor_runtime_overrides
|
] = tractor_runtime_overrides
|
||||||
|
|
||||||
|
|
@ -264,7 +264,7 @@ async def maybe_open_pikerd(
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> (
|
) -> (
|
||||||
tractor._portal.Portal
|
tractor.Portal
|
||||||
|ClassVar[Services]
|
|ClassVar[Services]
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
|
|
||||||
|
|
@ -79,10 +79,17 @@ async def maybe_spawn_daemon(
|
||||||
lock = Services.locks[service_name]
|
lock = Services.locks[service_name]
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
|
|
||||||
|
if not pikerd_kwargs:
|
||||||
|
# XXX NOTE, pin to apprope `tractor` branch!
|
||||||
|
rtvs: dict = tractor.get_runtime_vars()
|
||||||
|
registry_addrs: list[tuple] = list(
|
||||||
|
map(tuple, rtvs['_registry_addrs'])
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with find_service(
|
async with find_service(
|
||||||
service_name,
|
service_name,
|
||||||
registry_addrs=[('127.0.0.1', 6116)],
|
registry_addrs=registry_addrs,
|
||||||
) as portal:
|
) as portal:
|
||||||
if portal is not None:
|
if portal is not None:
|
||||||
lock.release()
|
lock.release()
|
||||||
|
|
@ -99,6 +106,7 @@ async def maybe_spawn_daemon(
|
||||||
# process tree
|
# process tree
|
||||||
async with maybe_open_pikerd(
|
async with maybe_open_pikerd(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
registry_addrs=registry_addrs,
|
||||||
**pikerd_kwargs,
|
**pikerd_kwargs,
|
||||||
|
|
||||||
) as pikerd_portal:
|
) as pikerd_portal:
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ log = get_logger(name=__name__)
|
||||||
# new actors and supervises them to completion?
|
# new actors and supervises them to completion?
|
||||||
class Services:
|
class Services:
|
||||||
|
|
||||||
actor_n: tractor._supervise.ActorNursery
|
actor_n: tractor.ActorNursery
|
||||||
service_n: trio.Nursery
|
service_n: trio.Nursery
|
||||||
debug_mode: bool # tractor sub-actor debug mode flag
|
debug_mode: bool # tractor sub-actor debug mode flag
|
||||||
service_tasks: dict[
|
service_tasks: dict[
|
||||||
|
|
|
||||||
|
|
@ -306,7 +306,7 @@ def ldshm(
|
||||||
# TODO: call null-seg fixer somehow?
|
# TODO: call null-seg fixer somehow?
|
||||||
if null_segs:
|
if null_segs:
|
||||||
|
|
||||||
if tractor._state.is_debug_mode():
|
if tractor.runtime._state.is_debug_mode():
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
# async with (
|
# async with (
|
||||||
# trio.open_nursery() as tn,
|
# trio.open_nursery() as tn,
|
||||||
|
|
|
||||||
|
|
@ -196,7 +196,7 @@ async def markup_gaps(
|
||||||
prev_r: pl.DataFrame = prev_row_by_i
|
prev_r: pl.DataFrame = prev_row_by_i
|
||||||
|
|
||||||
# debug any missing pre-row
|
# debug any missing pre-row
|
||||||
if tractor._state.is_debug_mode():
|
if tractor.runtime._state.is_debug_mode():
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
istart: int = prev_r['index'][0]
|
istart: int = prev_r['index'][0]
|
||||||
|
|
|
||||||
|
|
@ -167,7 +167,7 @@ async def stream_symbol_selection():
|
||||||
|
|
||||||
async def _async_main(
|
async def _async_main(
|
||||||
name: str,
|
name: str,
|
||||||
portal: tractor._portal.Portal,
|
portal: tractor.Portal,
|
||||||
symbols: List[str],
|
symbols: List[str],
|
||||||
brokermod: ModuleType,
|
brokermod: ModuleType,
|
||||||
loglevel: str = 'info',
|
loglevel: str = 'info',
|
||||||
|
|
|
||||||
|
|
@ -436,7 +436,7 @@ class OptionChain(object):
|
||||||
|
|
||||||
|
|
||||||
async def new_chain_ui(
|
async def new_chain_ui(
|
||||||
portal: tractor._portal.Portal,
|
portal: tractor.Portal,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
brokermod: types.ModuleType,
|
brokermod: types.ModuleType,
|
||||||
rate: int = 1,
|
rate: int = 1,
|
||||||
|
|
|
||||||
|
|
@ -137,7 +137,6 @@ async def _open_test_pikerd(
|
||||||
raddr = portal.chan.raddr
|
raddr = portal.chan.raddr
|
||||||
uw_raddr: tuple = raddr.unwrap()
|
uw_raddr: tuple = raddr.unwrap()
|
||||||
assert uw_raddr == reg_addr
|
assert uw_raddr == reg_addr
|
||||||
await tractor.pause()
|
|
||||||
yield (
|
yield (
|
||||||
raddr._host,
|
raddr._host,
|
||||||
raddr._port,
|
raddr._port,
|
||||||
|
|
|
||||||
|
|
@ -23,13 +23,35 @@ from piker.accounting import (
|
||||||
'fqmes',
|
'fqmes',
|
||||||
[
|
[
|
||||||
# binance
|
# binance
|
||||||
(100, {'btcusdt.binance', 'ethusdt.binance'}, False),
|
(100, {
|
||||||
|
# !TODO, write a suite which validates raising against
|
||||||
|
# bad/legacy fqmes such as this!
|
||||||
|
# 'btcusdt.binance',
|
||||||
|
'btcusdt.spot.binance',
|
||||||
|
'ethusdt.spot.binance',
|
||||||
|
}, False),
|
||||||
|
|
||||||
# kraken
|
# kraken
|
||||||
(20, {'ethusdt.kraken', 'xbtusd.kraken'}, True),
|
(20, {
|
||||||
|
# !TODO, write a suite which validates raising against
|
||||||
|
# bad/legacy fqmes such as this!
|
||||||
|
# 'ethusdt.kraken',
|
||||||
|
# 'xbtusd.kraken',
|
||||||
|
|
||||||
|
'ethusdt.spot.kraken',
|
||||||
|
'xbtusd.spot.kraken',
|
||||||
|
}, True),
|
||||||
|
|
||||||
# binance + kraken
|
# binance + kraken
|
||||||
(100, {'btcusdt.binance', 'xbtusd.kraken'}, False),
|
(100, {
|
||||||
|
# !TODO, write a suite which validates raising against
|
||||||
|
# bad/legacy fqmes such as this!
|
||||||
|
# 'btcusdt.binance',
|
||||||
|
# 'xbtusd.kraken',
|
||||||
|
|
||||||
|
'btcusdt.spot.binance',
|
||||||
|
'xbtusd.spot.kraken',
|
||||||
|
}, False),
|
||||||
],
|
],
|
||||||
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
|
ids=lambda param: f'quotes={param[0]}@fqmes={param[1]}',
|
||||||
)
|
)
|
||||||
|
|
@ -48,12 +70,17 @@ def test_multi_fqsn_feed(
|
||||||
|
|
||||||
if (
|
if (
|
||||||
ci_env
|
ci_env
|
||||||
and not run_in_ci
|
and
|
||||||
|
not run_in_ci
|
||||||
):
|
):
|
||||||
pytest.skip('Skipping CI disabled test due to feed restrictions')
|
pytest.skip(
|
||||||
|
'CI-disabled-test due to live-feed restrictions'
|
||||||
|
)
|
||||||
|
|
||||||
brokers = set()
|
brokers = set()
|
||||||
for fqme in fqmes:
|
for fqme in fqmes:
|
||||||
|
# ?TODO, add this unpack + normalize check to a symbology
|
||||||
|
# helper fn?
|
||||||
brokername, *_ = unpack_fqme(fqme)
|
brokername, *_ = unpack_fqme(fqme)
|
||||||
brokers.add(brokername)
|
brokers.add(brokername)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue