Add service tree tests for data-feeds and the EMS
parent
7cfd431a2b
commit
fe4fb37b58
|
@ -3,10 +3,31 @@ Actor tree daemon sub-service verifications
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from typing import AsyncContextManager
|
from typing import AsyncContextManager
|
||||||
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
|
from piker._daemon import (
|
||||||
|
find_service,
|
||||||
|
check_for_service,
|
||||||
|
Services,
|
||||||
|
)
|
||||||
|
from piker.data import (
|
||||||
|
open_feed,
|
||||||
|
)
|
||||||
|
from piker.clearing import (
|
||||||
|
open_ems,
|
||||||
|
)
|
||||||
|
from piker.clearing._messages import (
|
||||||
|
BrokerdPosition,
|
||||||
|
Status,
|
||||||
|
)
|
||||||
|
from piker.clearing._client import (
|
||||||
|
OrderBook,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_runtime_boot(
|
def test_runtime_boot(
|
||||||
open_test_pikerd: AsyncContextManager
|
open_test_pikerd: AsyncContextManager
|
||||||
|
@ -20,11 +41,12 @@ def test_runtime_boot(
|
||||||
async def main():
|
async def main():
|
||||||
port = 6666
|
port = 6666
|
||||||
daemon_addr = ('127.0.0.1', port)
|
daemon_addr = ('127.0.0.1', port)
|
||||||
|
services: Services
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_test_pikerd(
|
open_test_pikerd(
|
||||||
reg_addr=daemon_addr,
|
reg_addr=daemon_addr,
|
||||||
) as (_, _, pikerd_portal),
|
) as (_, _, pikerd_portal, services),
|
||||||
|
|
||||||
tractor.wait_for_actor(
|
tractor.wait_for_actor(
|
||||||
'pikerd',
|
'pikerd',
|
||||||
|
@ -35,3 +57,120 @@ def test_runtime_boot(
|
||||||
assert pikerd_portal.channel.raddr == portal.channel.raddr
|
assert pikerd_portal.channel.raddr == portal.channel.raddr
|
||||||
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def ensure_service(
|
||||||
|
name: str,
|
||||||
|
sockaddr: tuple[str, int] | None = None,
|
||||||
|
) -> None:
|
||||||
|
async with find_service(name) as portal:
|
||||||
|
remote_sockaddr = portal.channel.raddr
|
||||||
|
print(f'FOUND `{name}` @ {remote_sockaddr}')
|
||||||
|
|
||||||
|
if sockaddr:
|
||||||
|
assert remote_sockaddr == sockaddr
|
||||||
|
|
||||||
|
yield portal
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_datafeed_actors(
|
||||||
|
open_test_pikerd: AsyncContextManager
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Verify that booting a data feed starts a `brokerd`
|
||||||
|
actor and a singleton global `samplerd` and opening
|
||||||
|
an order mode in paper opens the `paperboi` service.
|
||||||
|
|
||||||
|
'''
|
||||||
|
actor_name: str = 'brokerd'
|
||||||
|
backend: str = 'kraken'
|
||||||
|
brokerd_name: str = f'{actor_name}.{backend}'
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with (
|
||||||
|
open_test_pikerd(),
|
||||||
|
open_feed(
|
||||||
|
['xbtusdt.kraken'],
|
||||||
|
loglevel='info',
|
||||||
|
) as feed
|
||||||
|
):
|
||||||
|
# halt rt quote streams since we aren't testing them
|
||||||
|
await feed.pause()
|
||||||
|
|
||||||
|
async with (
|
||||||
|
ensure_service(brokerd_name),
|
||||||
|
ensure_service('samplerd'),
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
def test_ensure_ems_in_paper_actors(
|
||||||
|
open_test_pikerd: AsyncContextManager
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
|
||||||
|
actor_name: str = 'brokerd'
|
||||||
|
backend: str = 'kraken'
|
||||||
|
brokerd_name: str = f'{actor_name}.{backend}'
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
|
||||||
|
# type declares
|
||||||
|
book: OrderBook
|
||||||
|
trades_stream: tractor.MsgStream
|
||||||
|
pps: dict[str, list[BrokerdPosition]]
|
||||||
|
accounts: list[str]
|
||||||
|
dialogs: dict[str, Status]
|
||||||
|
|
||||||
|
# ensure we timeout after is startup is too slow.
|
||||||
|
# TODO: something like this should be our start point for
|
||||||
|
# benchmarking end-to-end startup B)
|
||||||
|
with trio.fail_after(7):
|
||||||
|
async with (
|
||||||
|
open_test_pikerd() as (_, _, _, services),
|
||||||
|
|
||||||
|
open_ems(
|
||||||
|
'xbtusdt.kraken',
|
||||||
|
mode='paper',
|
||||||
|
) as (
|
||||||
|
book,
|
||||||
|
trades_stream,
|
||||||
|
pps,
|
||||||
|
accounts,
|
||||||
|
dialogs,
|
||||||
|
),
|
||||||
|
):
|
||||||
|
# there should be no on-going positions,
|
||||||
|
# TODO: though eventually we'll want to validate against
|
||||||
|
# local ledger and `pps.toml` state ;)
|
||||||
|
assert not pps
|
||||||
|
assert not dialogs
|
||||||
|
|
||||||
|
pikerd_subservices = ['emsd', 'samplerd']
|
||||||
|
|
||||||
|
async with (
|
||||||
|
ensure_service('emsd'),
|
||||||
|
ensure_service(brokerd_name),
|
||||||
|
ensure_service(f'paperboi.{backend}'),
|
||||||
|
):
|
||||||
|
for name in pikerd_subservices:
|
||||||
|
assert name in services.service_tasks
|
||||||
|
|
||||||
|
# brokerd.kraken actor should have been started
|
||||||
|
# implicitly by the ems.
|
||||||
|
assert brokerd_name in services.service_tasks
|
||||||
|
|
||||||
|
print('ALL SERVICES STARTED, terminating..')
|
||||||
|
await services.cancel_service('emsd')
|
||||||
|
|
||||||
|
with pytest.raises(
|
||||||
|
tractor._exceptions.ContextCancelled,
|
||||||
|
) as exc_info:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
cancel_msg: str = '`_emsd_main()` was remotely cancelled by its caller'
|
||||||
|
assert cancel_msg in exc_info.value.args[0]
|
||||||
|
|
Loading…
Reference in New Issue