Use `match:` syntax in data feed subs processing
							parent
							
								
									4823f87422
								
							
						
					
					
						commit
						e757e1f277
					
				| 
						 | 
					@ -117,9 +117,8 @@ async def stream_messages(
 | 
				
			||||||
                too_slow_count = 0
 | 
					                too_slow_count = 0
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if isinstance(msg, dict):
 | 
					        match msg:
 | 
				
			||||||
            if msg.get('event') == 'heartbeat':
 | 
					            case {'event': 'heartbeat'}:
 | 
				
			||||||
 | 
					 | 
				
			||||||
                now = time.time()
 | 
					                now = time.time()
 | 
				
			||||||
                delay = now - last_hb
 | 
					                delay = now - last_hb
 | 
				
			||||||
                last_hb = now
 | 
					                last_hb = now
 | 
				
			||||||
| 
						 | 
					@ -130,11 +129,20 @@ async def stream_messages(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
                continue
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            err = msg.get('errorMessage')
 | 
					            case {
 | 
				
			||||||
            if err:
 | 
					                'connectionID': _,
 | 
				
			||||||
                raise BrokerError(err)
 | 
					                'event': 'systemStatus',
 | 
				
			||||||
        else:
 | 
					                'status': 'online',
 | 
				
			||||||
            yield msg
 | 
					                'version': _,
 | 
				
			||||||
 | 
					            } as msg:
 | 
				
			||||||
 | 
					                log.info(
 | 
				
			||||||
 | 
					                    'WS connection is up:\n'
 | 
				
			||||||
 | 
					                    f'{msg}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case _:
 | 
				
			||||||
 | 
					                yield msg
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
async def process_data_feed_msgs(
 | 
					async def process_data_feed_msgs(
 | 
				
			||||||
| 
						 | 
					@ -145,37 +153,60 @@ async def process_data_feed_msgs(
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    '''
 | 
					    '''
 | 
				
			||||||
    async for msg in stream_messages(ws):
 | 
					    async for msg in stream_messages(ws):
 | 
				
			||||||
 | 
					        match msg:
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'errorMessage': errmsg
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                raise BrokerError(errmsg)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        chan_id, *payload_array, chan_name, pair = msg
 | 
					            case {
 | 
				
			||||||
 | 
					                'event': 'subscriptionStatus',
 | 
				
			||||||
 | 
					            } as sub:
 | 
				
			||||||
 | 
					                log.info(
 | 
				
			||||||
 | 
					                    'WS subscription is active:\n'
 | 
				
			||||||
 | 
					                    f'{sub}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                continue
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        if 'ohlc' in chan_name:
 | 
					            case [
 | 
				
			||||||
 | 
					                chan_id,
 | 
				
			||||||
 | 
					                *payload_array,
 | 
				
			||||||
 | 
					                chan_name,
 | 
				
			||||||
 | 
					                pair
 | 
				
			||||||
 | 
					            ]:
 | 
				
			||||||
 | 
					                if 'ohlc' in chan_name:
 | 
				
			||||||
 | 
					                    yield 'ohlc', OHLC(
 | 
				
			||||||
 | 
					                        chan_id,
 | 
				
			||||||
 | 
					                        chan_name,
 | 
				
			||||||
 | 
					                        pair,
 | 
				
			||||||
 | 
					                        *payload_array[0]
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            yield 'ohlc', OHLC(chan_id, chan_name, pair, *payload_array[0])
 | 
					                elif 'spread' in chan_name:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        elif 'spread' in chan_name:
 | 
					                    bid, ask, ts, bsize, asize = map(
 | 
				
			||||||
 | 
					                        float, payload_array[0])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            bid, ask, ts, bsize, asize = map(float, payload_array[0])
 | 
					                    # TODO: really makes you think IB has a horrible API...
 | 
				
			||||||
 | 
					                    quote = {
 | 
				
			||||||
 | 
					                        'symbol': pair.replace('/', ''),
 | 
				
			||||||
 | 
					                        'ticks': [
 | 
				
			||||||
 | 
					                            {'type': 'bid', 'price': bid, 'size': bsize},
 | 
				
			||||||
 | 
					                            {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: really makes you think IB has a horrible API...
 | 
					                            {'type': 'ask', 'price': ask, 'size': asize},
 | 
				
			||||||
            quote = {
 | 
					                            {'type': 'asize', 'price': ask, 'size': asize},
 | 
				
			||||||
                'symbol': pair.replace('/', ''),
 | 
					                        ],
 | 
				
			||||||
                'ticks': [
 | 
					                    }
 | 
				
			||||||
                    {'type': 'bid', 'price': bid, 'size': bsize},
 | 
					                    yield 'l1', quote
 | 
				
			||||||
                    {'type': 'bsize', 'price': bid, 'size': bsize},
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
                    {'type': 'ask', 'price': ask, 'size': asize},
 | 
					                # elif 'book' in msg[-2]:
 | 
				
			||||||
                    {'type': 'asize', 'price': ask, 'size': asize},
 | 
					                #     chan_id, *payload_array, chan_name, pair = msg
 | 
				
			||||||
                ],
 | 
					                #     print(msg)
 | 
				
			||||||
            }
 | 
					 | 
				
			||||||
            yield 'l1', quote
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # elif 'book' in msg[-2]:
 | 
					            case _:
 | 
				
			||||||
        #     chan_id, *payload_array, chan_name, pair = msg
 | 
					                print(f'UNHANDLED MSG: {msg}')
 | 
				
			||||||
        #     print(msg)
 | 
					                # yield msg
 | 
				
			||||||
 | 
					 | 
				
			||||||
        else:
 | 
					 | 
				
			||||||
            print(f'UNHANDLED MSG: {msg}')
 | 
					 | 
				
			||||||
            yield msg
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def normalize(
 | 
					def normalize(
 | 
				
			||||||
| 
						 | 
					@ -385,7 +416,7 @@ async def stream_quotes(
 | 
				
			||||||
            msg_gen = process_data_feed_msgs(ws)
 | 
					            msg_gen = process_data_feed_msgs(ws)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            # TODO: use ``anext()`` when it lands in 3.10!
 | 
					            # TODO: use ``anext()`` when it lands in 3.10!
 | 
				
			||||||
            typ, ohlc_last = await msg_gen.__anext__()
 | 
					            typ, ohlc_last = await anext(msg_gen)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            topic, quote = normalize(ohlc_last)
 | 
					            topic, quote = normalize(ohlc_last)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue