forked from goodboy/tractor
				
			Add a @pub kwarg to allow specifying a "startup response message"
							parent
							
								
									8df10c2257
								
							
						
					
					
						commit
						230c5a87f8
					
				|  | @ -121,6 +121,7 @@ def pub( | |||
|     wrapped: typing.Callable = None, | ||||
|     *, | ||||
|     tasks: Set[str] = set(), | ||||
|     send_on_connect: Any = None, | ||||
| ): | ||||
|     """Publisher async generator decorator. | ||||
| 
 | ||||
|  | @ -206,7 +207,7 @@ def pub( | |||
| 
 | ||||
|     # handle the decorator not called with () case | ||||
|     if wrapped is None: | ||||
|         return partial(pub, tasks=tasks) | ||||
|         return partial(pub, tasks=tasks, send_on_connect=send_on_connect) | ||||
| 
 | ||||
|     task2lock: Dict[str, trio.StrictFIFOLock] = {} | ||||
| 
 | ||||
|  | @ -249,6 +250,11 @@ def pub( | |||
| 
 | ||||
|             try: | ||||
|                 modify_subs(topics2ctxs, topics, ctx) | ||||
| 
 | ||||
|                 # if specified send the startup message back to consumer | ||||
|                 if send_on_connect is not None: | ||||
|                     await ctx.send_yield(send_on_connect) | ||||
| 
 | ||||
|                 # block and let existing feed task deliver | ||||
|                 # stream data until it is cancelled in which case | ||||
|                 # the next waiting task will take over and spawn it again | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue