Open data feed in ems main entrypoint

cached_feeds
Tyler Goodlet 2021-03-29 08:35:58 -04:00
parent 14c5fc24ec
commit 7cc395b5bf
2 changed files with 83 additions and 76 deletions

View File

@ -181,6 +181,7 @@ async def maybe_open_emsd(
async with tractor.find_actor('pikerd') as portal: async with tractor.find_actor('pikerd') as portal:
assert portal assert portal
name = await portal.run( name = await portal.run(
spawn_emsd, spawn_emsd,
brokername=brokername, brokername=brokername,

View File

@ -40,7 +40,11 @@ log = get_logger(__name__)
# TODO: numba all of this # TODO: numba all of this
def mk_check(trigger_price, known_last) -> Callable[[float, float], bool]: def mk_check(
trigger_price: float,
known_last: float,
action: str,
) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known """Create a predicate for given ``exec_price`` based on last known
price, ``known_last``. price, ``known_last``.
@ -230,6 +234,7 @@ async def execute_triggers(
async def exec_loop( async def exec_loop(
ctx: tractor.Context, ctx: tractor.Context,
feed: 'Feed', # noqa
broker: str, broker: str,
symbol: str, symbol: str,
_exec_mode: str, _exec_mode: str,
@ -239,66 +244,61 @@ async def exec_loop(
to brokers. to brokers.
""" """
async with data.open_feed(
broker,
[symbol],
loglevel='info',
) as feed:
# TODO: get initial price quote from target broker # TODO: get initial price quote from target broker
first_quote = await feed.receive() first_quote = await feed.receive()
book = get_dark_book(broker) book = get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last'] book.lasts[(broker, symbol)] = first_quote[symbol]['last']
# TODO: wrap this in a more re-usable general api # TODO: wrap this in a more re-usable general api
client_factory = getattr(feed.mod, 'get_client_proxy', None) client_factory = getattr(feed.mod, 'get_client_proxy', None)
if client_factory is not None and _exec_mode != 'paper': if client_factory is not None and _exec_mode != 'paper':
# we have an order API for this broker # we have an order API for this broker
client = client_factory(feed._brokerd_portal) client = client_factory(feed._brokerd_portal)
else: else:
# force paper mode # force paper mode
log.warning(f'Entering paper trading mode for {broker}') log.warning(f'Entering paper trading mode for {broker}')
client = PaperBoi( client = PaperBoi(
broker, broker,
*trio.open_memory_channel(100), *trio.open_memory_channel(100),
_buys={}, _buys={},
_sells={}, _sells={},
_reqids={}, _reqids={},
) )
# for paper mode we need to mock this trades response feed # for paper mode we need to mock this trades response feed
# so we pass a duck-typed feed-looking mem chan which is fed # so we pass a duck-typed feed-looking mem chan which is fed
# fill and submission events from the exec loop # fill and submission events from the exec loop
feed._trade_stream = client.trade_stream feed._trade_stream = client.trade_stream
# init the trades stream # init the trades stream
client._to_trade_stream.send_nowait({'local_trades': 'start'}) client._to_trade_stream.send_nowait({'local_trades': 'start'})
_exec_mode = 'paper' _exec_mode = 'paper'
# return control to parent task # return control to parent task
task_status.started((first_quote, feed, client)) task_status.started((first_quote, feed, client))
stream = feed.stream stream = feed.stream
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
execute_triggers, execute_triggers,
broker, broker,
symbol, symbol,
stream, stream,
ctx, ctx,
client, client,
book book
) )
if _exec_mode == 'paper': if _exec_mode == 'paper':
n.start_soon(simulate_fills, stream.clone(), client) n.start_soon(simulate_fills, stream.clone(), client)
# TODO: lots of cases still to handle # TODO: lots of cases still to handle
@ -556,7 +556,7 @@ async def process_order_cmds(
# price received from the feed, instead of being # price received from the feed, instead of being
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
pred = mk_check(trigger_price, last) pred = mk_check(trigger_price, last, action)
tick_slap: float = 5 tick_slap: float = 5
min_tick = feed.symbols[sym].tick_size min_tick = feed.symbols[sym].tick_size
@ -646,35 +646,41 @@ async def _emsd_main(
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# TODO: eventually support N-brokers # TODO: eventually support N-brokers
async with data.open_feed(
# start the condition scan loop
quote, feed, client = await n.start(
exec_loop,
ctx,
broker, broker,
symbol, [symbol],
_mode, loglevel='info',
) ) as feed:
await n.start( # start the condition scan loop
process_broker_trades, quote, feed, client = await n.start(
ctx, exec_loop,
feed, ctx,
dark_book, feed,
) broker,
symbol,
_mode,
)
# connect back to the calling actor (the one that is await n.start(
# acting as an EMS client and will submit orders) to process_broker_trades,
# receive requests pushed over a tractor stream ctx,
# using (for now) an async generator. feed,
order_stream = await portal.run(send_order_cmds) dark_book,
)
# start inbound order request processing # connect back to the calling actor (the one that is
await process_order_cmds( # acting as an EMS client and will submit orders) to
ctx, # receive requests pushed over a tractor stream
order_stream, # using (for now) an async generator.
symbol, order_stream = await portal.run(send_order_cmds)
feed,
client, # start inbound order request processing
dark_book, await process_order_cmds(
) ctx,
order_stream,
symbol,
feed,
client,
dark_book,
)