Use `aclosing()` around ws async gen
							parent
							
								
									2e3cac1407
								
							
						
					
					
						commit
						8984c1b60b
					
				| 
						 | 
					@ -32,6 +32,7 @@ from typing import (
 | 
				
			||||||
    Union,
 | 
					    Union,
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from async_generator import aclosing
 | 
				
			||||||
from bidict import bidict
 | 
					from bidict import bidict
 | 
				
			||||||
import pendulum
 | 
					import pendulum
 | 
				
			||||||
import trio
 | 
					import trio
 | 
				
			||||||
| 
						 | 
					@ -316,6 +317,7 @@ async def trades_dialogue(
 | 
				
			||||||
                ),
 | 
					                ),
 | 
				
			||||||
            ) as ws,
 | 
					            ) as ws,
 | 
				
			||||||
            trio.open_nursery() as n,
 | 
					            trio.open_nursery() as n,
 | 
				
			||||||
 | 
					            aclosing(stream_messages(ws)) as stream,
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
            # task local msg dialog tracking
 | 
					            # task local msg dialog tracking
 | 
				
			||||||
            emsflow: dict[
 | 
					            emsflow: dict[
 | 
				
			||||||
| 
						 | 
					@ -339,7 +341,7 @@ async def trades_dialogue(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # enter relay loop
 | 
					            # enter relay loop
 | 
				
			||||||
            await handle_order_updates(
 | 
					            await handle_order_updates(
 | 
				
			||||||
                ws,
 | 
					                stream,
 | 
				
			||||||
                ems_stream,
 | 
					                ems_stream,
 | 
				
			||||||
                emsflow,
 | 
					                emsflow,
 | 
				
			||||||
                ids,
 | 
					                ids,
 | 
				
			||||||
| 
						 | 
					@ -351,7 +353,7 @@ async def trades_dialogue(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def handle_order_updates(
 | 
					async def handle_order_updates(
 | 
				
			||||||
    ws: NoBsWs,
 | 
					    ws_stream: NoBsWs,
 | 
				
			||||||
    ems_stream: tractor.MsgStream,
 | 
					    ems_stream: tractor.MsgStream,
 | 
				
			||||||
    emsflow: dict[str, list[MsgUnion]],
 | 
					    emsflow: dict[str, list[MsgUnion]],
 | 
				
			||||||
    ids: bidict[str, int],
 | 
					    ids: bidict[str, int],
 | 
				
			||||||
| 
						 | 
					@ -372,7 +374,7 @@ async def handle_order_updates(
 | 
				
			||||||
    # on new trade clearing events (aka order "fills")
 | 
					    # on new trade clearing events (aka order "fills")
 | 
				
			||||||
    trans: list[pp.Transaction]
 | 
					    trans: list[pp.Transaction]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async for msg in stream_messages(ws):
 | 
					    async for msg in ws_stream:
 | 
				
			||||||
        match msg:
 | 
					        match msg:
 | 
				
			||||||
            # process and relay clearing trade events to ems
 | 
					            # process and relay clearing trade events to ems
 | 
				
			||||||
            # https://docs.kraken.com/websockets/#message-ownTrades
 | 
					            # https://docs.kraken.com/websockets/#message-ownTrades
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue