`.clearing`: broad rename of `fqsn` -> `fqme`

rekt_pps
Tyler Goodlet 2023-03-21 16:59:45 -04:00
parent d4a5a3057c
commit 1d08ee6d01
2 changed files with 51 additions and 50 deletions

View File

@ -27,7 +27,7 @@ import trio
import tractor import tractor
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
from ..accounting._mktinfo import unpack_fqsn from ..accounting._mktinfo import unpack_fqme
from ..log import get_logger from ..log import get_logger
from ..data.types import Struct from ..data.types import Struct
from ..service import maybe_open_emsd from ..service import maybe_open_emsd
@ -177,7 +177,7 @@ async def relay_order_cmds_from_sync_code(
@acm @acm
async def open_ems( async def open_ems(
fqsn: str, fqme: str,
mode: str = 'live', mode: str = 'live',
loglevel: str = 'error', loglevel: str = 'error',
@ -229,7 +229,7 @@ async def open_ems(
# ready for order commands # ready for order commands
book = get_orders() book = get_orders()
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqme(fqme)
async with maybe_open_emsd(broker) as portal: async with maybe_open_emsd(broker) as portal:
@ -246,7 +246,7 @@ async def open_ems(
portal.open_context( portal.open_context(
_emsd_main, _emsd_main,
fqsn=fqsn, fqme=fqme,
exec_mode=mode, exec_mode=mode,
loglevel=loglevel, loglevel=loglevel,
@ -266,7 +266,7 @@ async def open_ems(
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
relay_order_cmds_from_sync_code, relay_order_cmds_from_sync_code,
fqsn, fqme,
trades_stream trades_stream
) )

View File

@ -44,7 +44,7 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..accounting._mktinfo import ( from ..accounting._mktinfo import (
unpack_fqsn, unpack_fqme,
float_digits, float_digits,
) )
from ..data.feed import ( from ..data.feed import (
@ -156,7 +156,7 @@ async def clear_dark_triggers(
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str, broker: str,
fqsn: str, fqme: str,
book: DarkBook, book: DarkBook,
@ -231,7 +231,7 @@ async def clear_dark_triggers(
account=account, account=account,
size=size, size=size,
): ):
bfqsn: str = symbol.replace(f'.{broker}', '') bfqme: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away submit_price = price + abs_diff_away
resp = 'triggered' # hidden on client-side resp = 'triggered' # hidden on client-side
@ -244,7 +244,7 @@ async def clear_dark_triggers(
oid=oid, oid=oid,
account=account, account=account,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=bfqsn, symbol=bfqme,
price=submit_price, price=submit_price,
size=size, size=size,
) )
@ -287,14 +287,14 @@ async def clear_dark_triggers(
# send response to client-side # send response to client-side
await router.client_broadcast( await router.client_broadcast(
fqsn, fqme,
status, status,
) )
else: # condition scan loop complete else: # condition scan loop complete
log.debug(f'execs are {execs}') log.debug(f'execs are {execs}')
if execs: if execs:
book.triggers[fqsn] = execs book.triggers[fqme] = execs
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@ -335,7 +335,7 @@ class Router(Struct):
# sets of clients mapped from subscription keys # sets of clients mapped from subscription keys
subscribers: defaultdict[ subscribers: defaultdict[
str, # sub key, default fqsn str, # sub key, default fqme
set[tractor.MsgStream], # unique client streams set[tractor.MsgStream], # unique client streams
] = defaultdict(set) ] = defaultdict(set)
@ -424,7 +424,7 @@ class Router(Struct):
# actor to simulate the real IPC load it'll have when also # actor to simulate the real IPC load it'll have when also
# pulling data from feeds # pulling data from feeds
open_trades_endpoint = paper.open_paperboi( open_trades_endpoint = paper.open_paperboi(
fqsn='.'.join([symbol, broker]), fqme='.'.join([symbol, broker]),
loglevel=loglevel, loglevel=loglevel,
) )
@ -506,7 +506,7 @@ class Router(Struct):
async def open_trade_relays( async def open_trade_relays(
self, self,
fqsn: str, fqme: str,
exec_mode: str, exec_mode: str,
loglevel: str, loglevel: str,
@ -520,24 +520,24 @@ class Router(Struct):
none already exists. none already exists.
''' '''
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqme(fqme)
async with ( async with (
maybe_open_feed( maybe_open_feed(
[fqsn], [fqme],
loglevel=loglevel, loglevel=loglevel,
) as feed, ) as feed,
): ):
brokername, _, _ = unpack_fqsn(fqsn) brokername, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername] brokermod = feed.mods[brokername]
broker = brokermod.name broker = brokermod.name
portal = feed.portals[brokermod] portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
flume = feed.flumes[fqsn] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last'] book.lasts[fqme]: float = first_quote['last']
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -556,7 +556,7 @@ class Router(Struct):
relay.brokerd_stream, relay.brokerd_stream,
flume.stream, flume.stream,
broker, broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker> fqme, # form: <name>.<venue>.<suffix>.<broker>
book book
) )
@ -945,7 +945,7 @@ async def translate_and_relay_brokerd_events(
# may end up with collisions? # may end up with collisions?
status_msg = Status(**brokerd_msg) status_msg = Status(**brokerd_msg)
# NOTE: be sure to pack an fqsn for the client side! # NOTE: be sure to pack an fqme for the client side!
order = Order(**status_msg.req) order = Order(**status_msg.req)
order.symbol = f'{order.symbol}.{broker}' order.symbol = f'{order.symbol}.{broker}'
@ -1022,7 +1022,7 @@ async def process_client_order_cmds(
client_order_stream: tractor.MsgStream, client_order_stream: tractor.MsgStream,
brokerd_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream,
fqsn: str, fqme: str,
flume: Flume, flume: Flume,
dark_book: DarkBook, dark_book: DarkBook,
router: Router, router: Router,
@ -1049,11 +1049,11 @@ async def process_client_order_cmds(
# backend can be routed and relayed to subscribed clients. # backend can be routed and relayed to subscribed clients.
subs = router.dialogs[oid] subs = router.dialogs[oid]
# add all subscribed clients for this fqsn (should eventually be # add all subscribed clients for this fqme (should eventually be
# a more generalize subscription system) to received order msg # a more generalize subscription system) to received order msg
# updates (and thus show stuff in the UI). # updates (and thus show stuff in the UI).
subs.add(client_order_stream) subs.add(client_order_stream)
subs.update(router.subscribers[fqsn]) subs.update(router.subscribers[fqme])
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
@ -1111,7 +1111,7 @@ async def process_client_order_cmds(
and status.resp == 'dark_open' and status.resp == 'dark_open'
): ):
# remove from dark book clearing # remove from dark book clearing
entry = dark_book.triggers[fqsn].pop(oid, None) entry = dark_book.triggers[fqme].pop(oid, None)
if entry: if entry:
( (
pred, pred,
@ -1127,7 +1127,7 @@ async def process_client_order_cmds(
status.req = cmd status.req = cmd
await router.client_broadcast( await router.client_broadcast(
fqsn, fqme,
status, status,
) )
@ -1137,7 +1137,7 @@ async def process_client_order_cmds(
dark_book._active.pop(oid) dark_book._active.pop(oid)
else: else:
log.exception(f'No dark order for {fqsn}?') log.exception(f'No dark order for {fqme}?')
# TODO: eventually we should be receiving # TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol # this struct on the wire unpacked in a scoped protocol
@ -1146,7 +1146,7 @@ async def process_client_order_cmds(
# LIVE order REQUEST # LIVE order REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqme,
'price': trigger_price, 'price': trigger_price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
@ -1159,7 +1159,7 @@ async def process_client_order_cmds(
# remove the broker part before creating a message # remove the broker part before creating a message
# to send to the specific broker since they probably # to send to the specific broker since they probably
# aren't expectig their own name, but should they? # aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '') sym = fqme.replace(f'.{broker}', '')
if status is not None: if status is not None:
# if we already had a broker order id then # if we already had a broker order id then
@ -1216,7 +1216,7 @@ async def process_client_order_cmds(
# DARK-order / alert REQUEST # DARK-order / alert REQUEST
case { case {
'oid': oid, 'oid': oid,
'symbol': fqsn, 'symbol': fqme,
'price': trigger_price, 'price': trigger_price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
@ -1238,7 +1238,7 @@ async def process_client_order_cmds(
# price received from the feed, instead of being # price received from the feed, instead of being
# like every other shitty tina platform that makes # like every other shitty tina platform that makes
# the user choose the predicate operator. # the user choose the predicate operator.
last = dark_book.lasts[fqsn] last = dark_book.lasts[fqme]
# sometimes the real-time feed hasn't come up # sometimes the real-time feed hasn't come up
# so just pull from the latest history. # so just pull from the latest history.
@ -1280,7 +1280,7 @@ async def process_client_order_cmds(
# NOTE: this may result in an override of an existing # NOTE: this may result in an override of an existing
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.triggers.setdefault( dark_book.triggers.setdefault(
fqsn, {} fqme, {}
)[oid] = ( )[oid] = (
pred, pred,
tickfilter, tickfilter,
@ -1305,7 +1305,7 @@ async def process_client_order_cmds(
# broadcast status to all subscribed clients # broadcast status to all subscribed clients
await router.client_broadcast( await router.client_broadcast(
fqsn, fqme,
status, status,
) )
@ -1316,35 +1316,36 @@ async def process_client_order_cmds(
@acm @acm
async def maybe_open_trade_relays( async def maybe_open_trade_relays(
router: Router, router: Router,
fqsn: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str = 'info', loglevel: str = 'info',
) -> tuple: ) -> tuple:
def cache_on_fqsn_unless_paper( def cache_on_fqme_unless_paper(
router: Router, router: Router,
fqsn: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str = 'info', loglevel: str = 'info',
) -> Hashable: ) -> Hashable:
if exec_mode == 'paper': if exec_mode == 'paper':
return f'paper_{fqsn}' return f'paper_{fqme}'
else: else:
return fqsn return fqme
# XXX: closure to enable below use of # XXX: closure to enable below use of
# ``tractor.trionics.maybe_open_context()`` # ``tractor.trionics.maybe_open_context()``
@acm @acm
async def cached_mngr( async def cached_mngr(
router: Router, router: Router,
fqsn: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str = 'info', loglevel: str = 'info',
): ):
relay, feed, client_ready = await _router.nursery.start( relay, feed, client_ready = await _router.nursery.start(
_router.open_trade_relays, _router.open_trade_relays,
fqsn, fqme,
exec_mode, exec_mode,
loglevel, loglevel,
) )
@ -1354,11 +1355,11 @@ async def maybe_open_trade_relays(
acm_func=cached_mngr, acm_func=cached_mngr,
kwargs={ kwargs={
'router': _router, 'router': _router,
'fqsn': fqsn, 'fqme': fqme,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'loglevel': loglevel, 'loglevel': loglevel,
}, },
key=cache_on_fqsn_unless_paper, key=cache_on_fqme_unless_paper,
) as ( ) as (
cache_hit, cache_hit,
(relay, feed, client_ready) (relay, feed, client_ready)
@ -1369,7 +1370,7 @@ async def maybe_open_trade_relays(
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
fqsn: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str = 'info', loglevel: str = 'info',
@ -1426,7 +1427,7 @@ async def _emsd_main(
global _router global _router
assert _router assert _router
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqme(fqme)
# TODO: would be nice if in tractor we can require either a ctx arg, # TODO: would be nice if in tractor we can require either a ctx arg,
# or a named arg with ctx in it and a type annotation of # or a named arg with ctx in it and a type annotation of
@ -1443,7 +1444,7 @@ async def _emsd_main(
# few duplicate streams as necessary per ems actor. # few duplicate streams as necessary per ems actor.
async with maybe_open_trade_relays( async with maybe_open_trade_relays(
_router, _router,
fqsn, fqme,
exec_mode, exec_mode,
loglevel, loglevel,
) as (relay, feed, client_ready): ) as (relay, feed, client_ready):
@ -1466,28 +1467,28 @@ async def _emsd_main(
# register the client side before starting the # register the client side before starting the
# brokerd-side relay task to ensure the client is # brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup. # delivered all exisiting open orders on startup.
# TODO: instead of by fqsn we need a subscription # TODO: instead of by fqme we need a subscription
# system/schema here to limit what each new client is # system/schema here to limit what each new client is
# allowed to see in terms of broadcasted order flow # allowed to see in terms of broadcasted order flow
# updates per dialog. # updates per dialog.
_router.subscribers[fqsn].add(client_stream) _router.subscribers[fqme].add(client_stream)
client_ready.set() client_ready.set()
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
# main entrypoint, run here until cancelled. # main entrypoint, run here until cancelled.
try: try:
flume = feed.flumes[fqsn] flume = feed.flumes[fqme]
await process_client_order_cmds( await process_client_order_cmds(
client_stream, client_stream,
brokerd_stream, brokerd_stream,
fqsn, fqme,
flume, flume,
dark_book, dark_book,
_router, _router,
) )
finally: finally:
# try to remove client from subscription registry # try to remove client from subscription registry
_router.subscribers[fqsn].remove(client_stream) _router.subscribers[fqme].remove(client_stream)
for oid, client_streams in _router.dialogs.items(): for oid, client_streams in _router.dialogs.items():
client_streams.discard(client_stream) client_streams.discard(client_stream)