Drop the `Router.clients: set`, `.subscribers` is enough
parent
525f805cdb
commit
2fbfe583dd
|
@ -248,8 +248,6 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
await brokerd_orders_stream.send(brokerd_msg)
|
await brokerd_orders_stream.send(brokerd_msg)
|
||||||
|
|
||||||
# book._msgflows[oid].maps.insert(0, live_req)
|
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
raise ValueError(f'Invalid dark book entry: {cmd}')
|
raise ValueError(f'Invalid dark book entry: {cmd}')
|
||||||
|
|
||||||
|
@ -333,9 +331,6 @@ class Router(Struct):
|
||||||
# broker to book map
|
# broker to book map
|
||||||
books: dict[str, _DarkBook] = {}
|
books: dict[str, _DarkBook] = {}
|
||||||
|
|
||||||
# order id to client stream map
|
|
||||||
clients: set[tractor.MsgStream] = set()
|
|
||||||
|
|
||||||
# sets of clients mapped from subscription keys
|
# sets of clients mapped from subscription keys
|
||||||
subscribers: defaultdict[
|
subscribers: defaultdict[
|
||||||
str, # sub key, default fqsn
|
str, # sub key, default fqsn
|
||||||
|
@ -577,7 +572,6 @@ class Router(Struct):
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
):
|
):
|
||||||
to_remove.add(client_stream)
|
to_remove.add(client_stream)
|
||||||
self.clients.remove(client_stream)
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'client for {client_stream} was already closed?')
|
f'client for {client_stream} was already closed?')
|
||||||
|
|
||||||
|
@ -1324,8 +1318,6 @@ async def _emsd_main(
|
||||||
# register the client side before starting the
|
# register the client side before starting the
|
||||||
# brokerd-side relay task to ensure the client is
|
# brokerd-side relay task to ensure the client is
|
||||||
# delivered all exisiting open orders on startup.
|
# delivered all exisiting open orders on startup.
|
||||||
_router.clients.add(client_stream)
|
|
||||||
|
|
||||||
# TODO: instead of by fqsn we need a subscription
|
# TODO: instead of by fqsn we need a subscription
|
||||||
# system/schema here to limit what each new client is
|
# system/schema here to limit what each new client is
|
||||||
# allowed to see in terms of broadcasted order flow
|
# allowed to see in terms of broadcasted order flow
|
||||||
|
@ -1345,20 +1337,11 @@ async def _emsd_main(
|
||||||
)
|
)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# try to remove client from "registry"
|
# try to remove client from subscription registry
|
||||||
try:
|
|
||||||
_router.clients.remove(client_stream)
|
|
||||||
except KeyError:
|
|
||||||
log.warning(
|
|
||||||
f'Stream {client_stream._ctx.chan.uid}'
|
|
||||||
' was already dropped?'
|
|
||||||
)
|
|
||||||
|
|
||||||
_router.subscribers[fqsn].remove(client_stream)
|
_router.subscribers[fqsn].remove(client_stream)
|
||||||
dialogs = _router.dialogs
|
|
||||||
for oid, client_streams in dialogs.items():
|
for oid, client_streams in _router.dialogs.items():
|
||||||
if client_stream in client_streams:
|
client_streams.discard(client_stream)
|
||||||
client_streams.remove(client_stream)
|
|
||||||
|
|
||||||
# TODO: for order dialogs left "alive" in
|
# TODO: for order dialogs left "alive" in
|
||||||
# the ems this is where we should allow some
|
# the ems this is where we should allow some
|
||||||
|
@ -1368,6 +1351,6 @@ async def _emsd_main(
|
||||||
# from client, run some algo, etc.)
|
# from client, run some algo, etc.)
|
||||||
if not client_streams:
|
if not client_streams:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Order dialog is being unmonitored:\n'
|
f'Order dialog is not being monitored:\n'
|
||||||
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
f'{oid} ->\n{client_stream._ctx.chan.uid}'
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue