|
| 1 | +""" |
| 2 | +This program simulates a restaurant order system using asynchronous caching and topic-based notifications. |
| 3 | +It includes a custom `CacheWithPublishClientAsync` class that wraps the standard cache client to automatically |
| 4 | +publish updates to a topic whenever an order status is set. |
| 5 | +
|
| 6 | +Actors: |
| 7 | +- Kitchen: The kitchen updates the status of orders (e.g., "Preparing", "Ready to Serve") and stores the current order status in a cache. |
| 8 | + After updating the cache, it publishes the order status to a topic to notify subscribers (e.g., waiters). |
| 9 | +- Waiter: The waiter subscribes to order updates via a topic. When the kitchen updates the order status, the waiter is notified in real-time through the published message. |
| 10 | + The waiter then processes the update and notifies the customer accordingly. |
| 11 | +
|
| 12 | +Flow: |
| 13 | +1. The kitchen updates the order status and stores it in the cache using the `set_and_publish` method of `CacheWithPublishClientAsync`. |
| 14 | +2. After storing the order status, `CacheWithPublishClientAsync` automatically publishes the update to a topic, notifying all subscribers. |
| 15 | +3. The waiter subscribes to this topic and listens for updates. When a new status is published, the waiter receives the notification and informs the customer. |
| 16 | +4. The kitchen can update the status multiple times, and the waiter will receive each update in real-time. |
| 17 | +
|
| 18 | +Key Components: |
| 19 | +- CacheWithPublishClientAsync: A wrapper around the cache client that automatically publishes to a topic when `set_and_publish` is called. |
| 20 | +- CacheSetAndPublishResponse: A response class with two subtypes (`Success` and `Error`) using Python's `@dataclass` decorator to handle cache set and publish operations. |
| 21 | +- Cache: Stores the latest state of each order (e.g., order number and status) for quick retrieval. |
| 22 | +- Topic: Publishes notifications to inform subscribers about updates to the order's status. |
| 23 | +""" |
| 24 | + |
| 25 | +import asyncio |
| 26 | +import logging |
| 27 | +from abc import ABC |
| 28 | +from datetime import timedelta |
| 29 | +from typing import Optional |
| 30 | + |
| 31 | +from momento import ( |
| 32 | + CacheClientAsync, |
| 33 | + Configurations, |
| 34 | + CredentialProvider, |
| 35 | + TopicClientAsync, |
| 36 | + TopicConfigurations, |
| 37 | +) |
| 38 | +from momento.config import Configuration |
| 39 | +from momento.errors import UnknownException |
| 40 | +from momento.internal.services import Service |
| 41 | +from momento.responses import CacheResponse, CacheSet, CreateCache, TopicPublish, TopicSubscribe, TopicSubscriptionItem |
| 42 | +from momento.responses.mixins import ErrorResponseMixin |
| 43 | + |
| 44 | +from example_utils.example_logging import initialize_logging |
| 45 | + |
| 46 | +_AUTH_PROVIDER = CredentialProvider.from_environment_variable("MOMENTO_API_KEY") |
| 47 | +_logger = logging.getLogger("order-system-example") |
| 48 | + |
| 49 | +# Constants |
| 50 | +# The cache where we will store the order status |
| 51 | +_CACHE_NAME = "cache" |
| 52 | +# The topic where we will publish order updates |
| 53 | +_ORDER_TOPIC = "order_updates" |
| 54 | + |
| 55 | + |
| 56 | +################################## |
| 57 | +# Custom Cache Client with Publish |
| 58 | +################################## |
| 59 | +class CacheSetAndPublishResponse(CacheResponse): |
| 60 | + """Parent response type for a `set_and_publish` request. |
| 61 | +
|
| 62 | + Its subtypes are: |
| 63 | + - `CacheSetAndPublish.Success` |
| 64 | + - `CacheSetAndPublish.Error` |
| 65 | +
|
| 66 | + See `CacheClient` for how to work with responses. |
| 67 | + """ |
| 68 | + |
| 69 | + |
| 70 | +class CacheSetAndPublish(ABC): |
| 71 | + """Groups all `CacheSetAndPublish` derived types under a common namespace.""" |
| 72 | + |
| 73 | + class Success(CacheSetAndPublishResponse): |
| 74 | + """Indicates the set succeeded and the publish succeeded.""" |
| 75 | + |
| 76 | + class Error(CacheSetAndPublishResponse, ErrorResponseMixin): |
| 77 | + """Contains information about an error returned from a request. |
| 78 | +
|
| 79 | + This includes: |
| 80 | + - `error_code`: `MomentoErrorCode` value for the error. |
| 81 | + - `messsage`: a detailed error message. |
| 82 | + """ |
| 83 | + |
| 84 | + |
| 85 | +class CacheWithPublishClientAsync(CacheClientAsync): |
| 86 | + """Wrapper around `CacheClientAsync` that adds a `set_and_publish` method.""" |
| 87 | + |
| 88 | + def __init__( |
| 89 | + self, |
| 90 | + configuration: Configuration, |
| 91 | + credential_provider: CredentialProvider, |
| 92 | + default_ttl: timedelta, |
| 93 | + topic_client: TopicClientAsync, |
| 94 | + ): |
| 95 | + super().__init__(configuration, credential_provider, default_ttl) |
| 96 | + self.topic_client = topic_client |
| 97 | + |
| 98 | + async def set_and_publish( |
| 99 | + self, |
| 100 | + cache_name: str, |
| 101 | + topic_name: str, |
| 102 | + key: str, |
| 103 | + value: str, |
| 104 | + ttl: Optional[timedelta] = None, |
| 105 | + ) -> CacheSetAndPublishResponse: |
| 106 | + set_response = await self.set(cache_name, key, value, ttl) |
| 107 | + match set_response: |
| 108 | + case CacheSet.Success(): |
| 109 | + pass |
| 110 | + case CacheSet.Error() as cache_error: |
| 111 | + return CacheSetAndPublish.Error(cache_error._error) |
| 112 | + case _: |
| 113 | + return CacheSetAndPublish.Error( |
| 114 | + UnknownException(f"Unknown response type: {set_response}", service=Service.CACHE) |
| 115 | + ) |
| 116 | + |
| 117 | + publish_response = await self.topic_client.publish(cache_name, topic_name, value) |
| 118 | + match publish_response: |
| 119 | + case TopicPublish.Success(): |
| 120 | + return CacheSetAndPublish.Success() |
| 121 | + case TopicPublish.Error() as topic_error: |
| 122 | + return CacheSetAndPublish.Error(topic_error._error) |
| 123 | + case _: |
| 124 | + return CacheSetAndPublish.Error( |
| 125 | + UnknownException(f"Unknown response type: {publish_response}", service=Service.TOPICS) |
| 126 | + ) |
| 127 | + |
| 128 | + |
| 129 | +async def setup_cache(client: CacheWithPublishClientAsync, cache_name: str) -> None: |
| 130 | + """Ensures that the example cache exists. |
| 131 | +
|
| 132 | + Args: |
| 133 | + client (CacheClientAsync): The cache client to use. |
| 134 | +
|
| 135 | + Raises: |
| 136 | + response.inner_exception: If the cache creation fails. |
| 137 | + """ |
| 138 | + response = await client.create_cache(cache_name) |
| 139 | + match response: |
| 140 | + case CreateCache.Success(): |
| 141 | + _logger.info("Cache created successfully.") |
| 142 | + case CreateCache.CacheAlreadyExists(): |
| 143 | + _logger.info("Using existing cache.") |
| 144 | + case CreateCache.Error(): |
| 145 | + _logger.error(f"Failed to create cache: {response.message}") |
| 146 | + raise response.inner_exception |
| 147 | + |
| 148 | + |
| 149 | +################################## |
| 150 | +# Actors in the Order System |
| 151 | +################################## |
| 152 | + |
| 153 | + |
| 154 | +class Kitchen: |
| 155 | + """Class for the kitchen to update the order status.""" |
| 156 | + |
| 157 | + def __init__(self, cache_with_publish_client: CacheWithPublishClientAsync, cache_name: str, topic_name: str): |
| 158 | + self.cache_with_publish_client = cache_with_publish_client |
| 159 | + self.cache_name = cache_name |
| 160 | + self.topic_name = topic_name |
| 161 | + |
| 162 | + async def update_order_status(self, order_number: int, status: str): |
| 163 | + """Method for the kitchen to update the order status.""" |
| 164 | + order_message = f"Order {order_number}: {status}" |
| 165 | + _logger.info(f"Kitchen updating order {order_number} with status: {status}") |
| 166 | + |
| 167 | + set_and_publish_response = await self.cache_with_publish_client.set_and_publish( |
| 168 | + self.cache_name, self.topic_name, f"order_{order_number}", order_message |
| 169 | + ) |
| 170 | + |
| 171 | + match set_and_publish_response: |
| 172 | + case CacheSetAndPublish.Success(): |
| 173 | + _logger.info(f"Updated and published order status: {order_message}") |
| 174 | + case CacheSetAndPublish.Error() as error: |
| 175 | + _logger.error(f"Failed to update or publish order status: {error.message}") |
| 176 | + return |
| 177 | + case _: |
| 178 | + _logger.error(f"Unexpected response: {set_and_publish_response}") |
| 179 | + return |
| 180 | + |
| 181 | + |
| 182 | +class Waiter: |
| 183 | + """Class for the waiter to poll order status updates and notify the customer.""" |
| 184 | + |
| 185 | + def __init__(self, client: TopicClientAsync, cache_name: str, order_topic: str): |
| 186 | + self.client = client |
| 187 | + self.cache_name = cache_name |
| 188 | + self.order_topic = order_topic |
| 189 | + |
| 190 | + async def poll_order_updates(self): |
| 191 | + """Method for the waiter to poll the order status updates.""" |
| 192 | + subscription = await self.client.subscribe(self.cache_name, self.order_topic) |
| 193 | + match subscription: |
| 194 | + case TopicSubscribe.SubscriptionAsync(): |
| 195 | + _logger.info("Waiter subscribed to order updates.") |
| 196 | + await self.poll_subscription(subscription) |
| 197 | + case TopicSubscribe.Error(): |
| 198 | + _logger.error(f"Subscription error: {subscription.message}") |
| 199 | + |
| 200 | + async def poll_subscription(self, subscription: TopicSubscribe.SubscriptionAsync): |
| 201 | + """Poll and process subscription items.""" |
| 202 | + async for item in subscription: |
| 203 | + match item: |
| 204 | + case TopicSubscriptionItem.Text(): |
| 205 | + _logger.info(f"Waiter received order update: {item.value}") |
| 206 | + self.notify_customer(item.value) |
| 207 | + case TopicSubscriptionItem.Error(): |
| 208 | + _logger.error(f"Stream closed: {item.inner_exception.message}") |
| 209 | + return |
| 210 | + |
| 211 | + def notify_customer(self, update: str): |
| 212 | + """Notify the customer about the order update.""" |
| 213 | + _logger.info(f"Waiter notifies customer: {update}") |
| 214 | + |
| 215 | + |
| 216 | +# Main function to initialize and run the system |
| 217 | +async def main() -> None: |
| 218 | + initialize_logging() |
| 219 | + |
| 220 | + async with TopicClientAsync( |
| 221 | + TopicConfigurations.Default.latest(), _AUTH_PROVIDER |
| 222 | + ) as topic_client, CacheWithPublishClientAsync( |
| 223 | + Configurations.Laptop.latest(), _AUTH_PROVIDER, timedelta(seconds=60), topic_client |
| 224 | + ) as cache_with_publish_client: |
| 225 | + await setup_cache(cache_with_publish_client, _CACHE_NAME) |
| 226 | + kitchen = Kitchen(cache_with_publish_client, _CACHE_NAME, _ORDER_TOPIC) |
| 227 | + waiter = Waiter(topic_client, _CACHE_NAME, _ORDER_TOPIC) |
| 228 | + |
| 229 | + waiter_task = asyncio.create_task(waiter.poll_order_updates()) |
| 230 | + await asyncio.sleep(1) |
| 231 | + _logger.info("The waiter is ready to update customers.") |
| 232 | + |
| 233 | + # Kitchen updates the order status |
| 234 | + await kitchen.update_order_status(order_number=1, status="Preparing") |
| 235 | + |
| 236 | + # Simulate kitchen preparing the order |
| 237 | + await asyncio.sleep(2) |
| 238 | + |
| 239 | + # Kitchen updates the order status |
| 240 | + await kitchen.update_order_status(order_number=1, status="Ready to Serve") |
| 241 | + |
| 242 | + # Simulate waiter serving the order |
| 243 | + await asyncio.sleep(5) |
| 244 | + _logger.info("The waiter has served the order.") |
| 245 | + |
| 246 | + # Now cancel the waiter task |
| 247 | + waiter_task.cancel() |
| 248 | + |
| 249 | + try: |
| 250 | + await waiter_task |
| 251 | + except asyncio.CancelledError: |
| 252 | + _logger.info("Waiter task cancelled successfully.") |
| 253 | + |
| 254 | + |
| 255 | +if __name__ == "__main__": |
| 256 | + asyncio.run(main()) |
0 commit comments