From 7cc395b5bf6c7b915f28c1d83f6781e829f3c7e5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Mar 2021 08:35:58 -0400 Subject: [PATCH] Open data feed in ems main entrypoint --- piker/clearing/_client.py | 1 + piker/clearing/_ems.py | 158 ++++++++++++++++++++------------------ 2 files changed, 83 insertions(+), 76 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 6accc0b8..121cd080 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -181,6 +181,7 @@ async def maybe_open_emsd( async with tractor.find_actor('pikerd') as portal: assert portal + name = await portal.run( spawn_emsd, brokername=brokername, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9e7b9045..ed9a2102 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -40,7 +40,11 @@ log = get_logger(__name__) # TODO: numba all of this -def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: +def mk_check( + trigger_price: float, + known_last: float, + action: str, +) -> Callable[[float, float], bool]: """Create a predicate for given ``exec_price`` based on last known price, ``known_last``. @@ -230,6 +234,7 @@ async def execute_triggers( async def exec_loop( ctx: tractor.Context, + feed: 'Feed', # noqa broker: str, symbol: str, _exec_mode: str, @@ -239,66 +244,61 @@ async def exec_loop( to brokers. """ - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - # TODO: get initial price quote from target broker - first_quote = await feed.receive() + # TODO: get initial price quote from target broker + first_quote = await feed.receive() - book = get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + book = get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) + # TODO: wrap this in a more re-usable general api + client_factory = getattr(feed.mod, 'get_client_proxy', None) - if client_factory is not None and _exec_mode != 'paper': + if client_factory is not None and _exec_mode != 'paper': - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) + # we have an order API for this broker + client = client_factory(feed._brokerd_portal) - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') + else: + # force paper mode + log.warning(f'Entering paper trading mode for {broker}') - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, + client = PaperBoi( + broker, + *trio.open_memory_channel(100), + _buys={}, + _sells={}, - _reqids={}, - ) + _reqids={}, + ) - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + feed._trade_stream = client.trade_stream - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) + # init the trades stream + client._to_trade_stream.send_nowait({'local_trades': 'start'}) - _exec_mode = 'paper' + _exec_mode = 'paper' - # return control to parent task - task_status.started((first_quote, feed, client)) + # return control to parent task + task_status.started((first_quote, feed, client)) - stream = feed.stream - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) + stream = feed.stream + async with trio.open_nursery() as n: + n.start_soon( + execute_triggers, + broker, + symbol, + stream, + ctx, + client, + book + ) - if _exec_mode == 'paper': - n.start_soon(simulate_fills, stream.clone(), client) + if _exec_mode == 'paper': + n.start_soon(simulate_fills, stream.clone(), client) # TODO: lots of cases still to handle @@ -556,7 +556,7 @@ async def process_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - pred = mk_check(trigger_price, last) + pred = mk_check(trigger_price, last, action) tick_slap: float = 5 min_tick = feed.symbols[sym].tick_size @@ -646,35 +646,41 @@ async def _emsd_main( async with trio.open_nursery() as n: # TODO: eventually support N-brokers - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, + async with data.open_feed( broker, - symbol, - _mode, - ) + [symbol], + loglevel='info', + ) as feed: - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) - # connect back to the calling actor (the one that is - # acting as an EMS client and will submit orders) to - # receive requests pushed over a tractor stream - # using (for now) an async generator. - order_stream = await portal.run(send_order_cmds) + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # connect back to the calling actor (the one that is + # acting as an EMS client and will submit orders) to + # receive requests pushed over a tractor stream + # using (for now) an async generator. + order_stream = await portal.run(send_order_cmds) + + # start inbound order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + )