Parametrize ems service test to cancel with API and kbi
parent
f6549fcb62
commit
abd3cefd84
|
@ -20,7 +20,6 @@ from typing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
# import pytest_trio
|
|
||||||
from exceptiongroup import BaseExceptionGroup
|
from exceptiongroup import BaseExceptionGroup
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -50,6 +49,7 @@ from piker.accounting import (
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
async def order_and_and_wait_for_ppmsg(
|
async def order_and_and_wait_for_ppmsg(
|
||||||
client: OrderClient,
|
client: OrderClient,
|
||||||
trades_stream: tractor.MsgStream,
|
trades_stream: tractor.MsgStream,
|
||||||
|
@ -370,7 +370,10 @@ def test_multi_fill_positions(
|
||||||
|
|
||||||
run_and_tollerate_cancels(atest)
|
run_and_tollerate_cancels(atest)
|
||||||
|
|
||||||
if check_cross_session or accum_size != 0:
|
if (
|
||||||
|
check_cross_session
|
||||||
|
or accum_size != 0
|
||||||
|
):
|
||||||
# rerun just to check that position info is persistent for the paper
|
# rerun just to check that position info is persistent for the paper
|
||||||
# account (i.e. a user can expect to see paper pps persist across
|
# account (i.e. a user can expect to see paper pps persist across
|
||||||
# runtime sessions.
|
# runtime sessions.
|
||||||
|
|
|
@ -2,9 +2,13 @@
|
||||||
Actor tree daemon sub-service verifications
|
Actor tree daemon sub-service verifications
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from typing import AsyncContextManager
|
from typing import (
|
||||||
|
AsyncContextManager,
|
||||||
|
Callable,
|
||||||
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
|
from exceptiongroup import BaseExceptionGroup
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -61,24 +65,10 @@ def test_runtime_boot(
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
|
||||||
async def ensure_service(
|
|
||||||
name: str,
|
|
||||||
sockaddr: tuple[str, int] | None = None,
|
|
||||||
) -> None:
|
|
||||||
async with find_service(name) as portal:
|
|
||||||
remote_sockaddr = portal.channel.raddr
|
|
||||||
print(f'FOUND `{name}` @ {remote_sockaddr}')
|
|
||||||
|
|
||||||
if sockaddr:
|
|
||||||
assert remote_sockaddr == sockaddr
|
|
||||||
|
|
||||||
yield portal
|
|
||||||
|
|
||||||
|
|
||||||
def test_ensure_datafeed_actors(
|
def test_ensure_datafeed_actors(
|
||||||
open_test_pikerd: AsyncContextManager,
|
open_test_pikerd: AsyncContextManager,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
# cancel_method: str,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -94,6 +84,7 @@ def test_ensure_datafeed_actors(
|
||||||
async def main():
|
async def main():
|
||||||
async with (
|
async with (
|
||||||
open_test_pikerd(),
|
open_test_pikerd(),
|
||||||
|
|
||||||
open_feed(
|
open_feed(
|
||||||
['xbtusdt.kraken'],
|
['xbtusdt.kraken'],
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
|
@ -106,15 +97,89 @@ def test_ensure_datafeed_actors(
|
||||||
ensure_service(brokerd_name),
|
ensure_service(brokerd_name),
|
||||||
ensure_service('samplerd'),
|
ensure_service('samplerd'),
|
||||||
):
|
):
|
||||||
pass
|
await trio.sleep(0.1)
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def ensure_service(
|
||||||
|
name: str,
|
||||||
|
sockaddr: tuple[str, int] | None = None,
|
||||||
|
) -> None:
|
||||||
|
async with find_service(name) as portal:
|
||||||
|
remote_sockaddr = portal.channel.raddr
|
||||||
|
print(f'FOUND `{name}` @ {remote_sockaddr}')
|
||||||
|
|
||||||
|
if sockaddr:
|
||||||
|
assert remote_sockaddr == sockaddr
|
||||||
|
|
||||||
|
yield portal
|
||||||
|
|
||||||
|
|
||||||
|
def run_test_w_cancel_method(
|
||||||
|
cancel_method: str,
|
||||||
|
main: Callable,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Run our runtime under trio and expect a certain type of cancel condition
|
||||||
|
depending on input.
|
||||||
|
|
||||||
|
'''
|
||||||
|
cancelled_msg: str = (
|
||||||
|
"was remotely cancelled by remote actor (\'pikerd\'")
|
||||||
|
|
||||||
|
if cancel_method == 'sigint':
|
||||||
|
with pytest.raises(
|
||||||
|
BaseExceptionGroup,
|
||||||
|
) as exc_info:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
multi = exc_info.value
|
||||||
|
|
||||||
|
for suberr in multi.exceptions:
|
||||||
|
match suberr:
|
||||||
|
# ensure we receive a remote cancellation error caused
|
||||||
|
# by the pikerd root actor since we used the
|
||||||
|
# `.cancel_service()` API above B)
|
||||||
|
case tractor.ContextCancelled():
|
||||||
|
assert cancelled_msg in suberr.args[0]
|
||||||
|
|
||||||
|
case KeyboardInterrupt():
|
||||||
|
pass
|
||||||
|
|
||||||
|
case _:
|
||||||
|
pytest.fail(f'Unexpected error {suberr}')
|
||||||
|
|
||||||
|
elif cancel_method == 'services':
|
||||||
|
|
||||||
|
# XXX NOTE: oddly, when you pass --pdb to pytest, i think since
|
||||||
|
# we also use that to enable the underlying tractor debug mode,
|
||||||
|
# it causes this to not raise for some reason? So if you see
|
||||||
|
# that while changing this test.. it's prolly that.
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
tractor.ContextCancelled
|
||||||
|
) as exc_info:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
assert cancelled_msg in exc_info.value.args[0]
|
||||||
|
|
||||||
|
else:
|
||||||
|
pytest.fail(f'Test is broken due to {cancel_method}')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'cancel_method',
|
||||||
|
['services', 'sigint'],
|
||||||
|
)
|
||||||
def test_ensure_ems_in_paper_actors(
|
def test_ensure_ems_in_paper_actors(
|
||||||
open_test_pikerd: AsyncContextManager,
|
open_test_pikerd: AsyncContextManager,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
|
cancel_method: str,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
actor_name: str = 'brokerd'
|
actor_name: str = 'brokerd'
|
||||||
|
@ -153,7 +218,7 @@ def test_ensure_ems_in_paper_actors(
|
||||||
# local ledger and `pps.toml` state ;)
|
# local ledger and `pps.toml` state ;)
|
||||||
assert not pps
|
assert not pps
|
||||||
assert not dialogs
|
assert not dialogs
|
||||||
|
# XXX: should be new client with no state from other tests
|
||||||
assert not client._sent_orders
|
assert not client._sent_orders
|
||||||
assert accounts
|
assert accounts
|
||||||
|
|
||||||
|
@ -171,16 +236,13 @@ def test_ensure_ems_in_paper_actors(
|
||||||
# implicitly by the ems.
|
# implicitly by the ems.
|
||||||
assert brokerd_name in services.service_tasks
|
assert brokerd_name in services.service_tasks
|
||||||
|
|
||||||
print('ALL SERVICES STARTED, terminating..')
|
print('ALL SERVICES STARTED, cancelling runtime with:\n'
|
||||||
|
f'-> {cancel_method}')
|
||||||
|
|
||||||
|
if cancel_method == 'services':
|
||||||
await services.cancel_service('emsd')
|
await services.cancel_service('emsd')
|
||||||
|
|
||||||
# ensure we receive a remote cancellation error caused by the
|
elif cancel_method == 'sigint':
|
||||||
# pikerd root actor since we used the `.cancel_service()` API
|
raise KeyboardInterrupt
|
||||||
# above B)
|
|
||||||
with pytest.raises(
|
|
||||||
tractor._exceptions.ContextCancelled,
|
|
||||||
) as exc_info:
|
|
||||||
trio.run(main)
|
|
||||||
|
|
||||||
cancelled_msg: str = "was remotely cancelled by remote actor (\'pikerd\'"
|
run_test_w_cancel_method(cancel_method, main)
|
||||||
assert cancelled_msg in exc_info.value.args[0]
|
|
||||||
|
|
Loading…
Reference in New Issue