import asyncio
import contextlib


class Beacon:
    """In-process broadcast point for asyncio code.

    Every value passed to :meth:`flash` is put on the queue of each
    subscriber that is registered at that moment. Subscribers register
    through :meth:`msg_q` (directly, or indirectly via :meth:`watch`,
    :meth:`next_flash` and :meth:`watch_for`).
    """

    def __init__(self):
        # One asyncio.Queue per active subscriber; flash() fans out to all.
        self.watchers = set()

    @contextlib.contextmanager
    def msg_q(self):
        """Register a fresh queue for the duration of a ``with`` block.

        Yields:
            An ``asyncio.Queue`` that receives every value flashed while
            the block is active. The queue is unregistered on exit, even
            if the block raises.
        """
        inbox = asyncio.Queue()
        self.watchers.add(inbox)
        try:
            yield inbox
        finally:
            self.watchers.remove(inbox)

    def flash(self, data):
        """Deliver *data* to every currently registered subscriber queue."""
        # Queues are created unbounded in msg_q(), so put_nowait() has no
        # size limit to hit.
        for inbox in self.watchers:
            inbox.put_nowait(data)

    async def watch(self, count=float('inf')):
        """Asynchronously yield flashed values, at most *count* of them.

        The subscription is created when iteration starts (the first
        ``__anext__``), not when ``watch()`` is called, so values flashed
        before the first iteration step are not seen.

        The generator remains subscribed while it is suspended at a
        ``yield``. A consumer that stops iterating early should call
        ``aclose()`` on the generator to unsubscribe promptly.

        Args:
            count: Maximum number of values to yield (default: unlimited).
        """
        with self.msg_q() as inbox:
            delivered = 0
            while delivered < count:
                yield await inbox.get()
                delivered += 1

    async def next_flash(self):
        """Wait for and return the next flashed value."""
        # Subscribe through msg_q() directly instead of returning out of
        # `async for ... in self.watch(1)`: that left the async generator
        # suspended at its yield, so the queue stayed in self.watchers
        # until the generator happened to be finalized.
        with self.msg_q() as inbox:
            return await inbox.get()

    async def watch_for(self, predicate, timeout=None):
        """Wait for the first flashed value for which *predicate* is true.

        Args:
            predicate: Callable invoked with each flashed value; the first
                value for which it returns a truthy result is returned.
            timeout: Seconds to wait, or ``None`` to wait indefinitely.

        Returns:
            The first matching value.

        Raises:
            asyncio.TimeoutError: If *timeout* elapses before a match.
        """
        async def waiter():
            # The `with` block guarantees the queue is unregistered on a
            # match, on a predicate exception, and on cancellation caused
            # by the timeout.
            with self.msg_q() as inbox:
                while True:
                    event = await inbox.get()
                    if predicate(event):
                        return event

        return await asyncio.wait_for(waiter(), timeout=timeout)