From 802fc0d3be5e91be4e8c74290c50e62af46752da Mon Sep 17 00:00:00 2001 From: shopifyalan <106708662+shopifyalan@users.noreply.github.com> Date: Tue, 25 Oct 2022 10:13:15 -0400 Subject: [PATCH 1/3] Add GCP Pub/Sub event source --- plugins/event_source/gcp_pubsub.py | 68 +++++++++++++++++++++++++++++ requirements.txt | 4 +- rulebooks/gcp-pubsub-test-rules.yml | 20 +++++++++ 3 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 plugins/event_source/gcp_pubsub.py create mode 100644 rulebooks/gcp-pubsub-test-rules.yml diff --git a/plugins/event_source/gcp_pubsub.py b/plugins/event_source/gcp_pubsub.py new file mode 100644 index 00000000..e1eb0a74 --- /dev/null +++ b/plugins/event_source/gcp_pubsub.py @@ -0,0 +1,68 @@ +""" +gcp_pubsub.py + +An ansible-events event source module for receiving events from GCP Pub/Sub + +Arguments: + project_id: The GCP project name + subscription_id: The name of the topic to pull messages from + max_messages: The number of messages to retreive + Default 3 + retry_deadline: How long to keep retrying in seconds + Default 300 + +Example: + + - ansible.eda.gpc_pubsub: + project_id: "{{ project_id }}" + subscription_id: "{{ subscription_id }}" + max_messages: "{{ max_messages }}" + retry_deadline: "{{ retry_deadline }}" + +""" + +import asyncio +from typing import Any, Dict + +from google.api_core import retry +from google.cloud import pubsub_v1 + +async def main(queue: asyncio.Queue, args: Dict[str, Any]): + subscriber = pubsub_v1.SubscriberClient() + + subscription_path = subscriber.subscription_path(args.get("project_id"), args.get("subscription_id")) + + with subscriber: + while True: + response = subscriber.pull( + request={"subscription": subscription_path, "max_messages": args.get("max_messages", 3)}, + retry=retry.Retry(deadline=args.get("retry_deadline", 300)), + ) + + if len(response.received_messages) == 0: + continue + + ack_ids = [] + for received_message in response.received_messages: + data = {"message": received_message.message.data.decode(), + "attributes": dict(received_message.message.attributes)} + + await queue.put(data) + + ack_ids.append(received_message.ack_id) + + subscriber.acknowledge( + request={"subscription": subscription_path, "ack_ids": ack_ids} + ) + + await asyncio.sleep(1) + +if __name__ == "__main__": + class MockQueue: + @staticmethod + async def put(self, event): + print(event) + + asyncio.run(main(MockQueue(), {"project_id": "lab", "subscription_id": "eda", + "max_messages": 3, "retry_deadline": 300})) + diff --git a/requirements.txt b/requirements.txt index c7eb22f0..facbecb1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,6 @@ azure-servicebus aiohttp aiokafka watchdog -asyncio \ No newline at end of file +asyncio +google-cloud-pubsub +protobuf==3.20.* diff --git a/rulebooks/gcp-pubsub-test-rules.yml b/rulebooks/gcp-pubsub-test-rules.yml new file mode 100644 index 00000000..d1510bc5 --- /dev/null +++ b/rulebooks/gcp-pubsub-test-rules.yml @@ -0,0 +1,20 @@ +--- +- name: Listen for events on a GPC Pub/Sub topic + hosts: all + + ## Define our source for events + + sources: + - ansible.eda.gcp_pubsub: + project_id: "playground-s-11-73f6aaa0" + subscription_id: "test" + + ## Define the conditions we are looking for + + rules: + - name: Message attribute action equals test + condition: event.attributes.action == "test" + + ## Define the action we should take should the condition be met + action: + debug: From 13f61906e32afca31213ad34f9d41261ac55d7c4 Mon Sep 17 00:00:00 2001 From: shopifyalan <106708662+shopifyalan@users.noreply.github.com> Date: Mon, 31 Oct 2022 09:29:44 -0400 Subject: [PATCH 2/3] Update rulebook project --- rulebooks/gcp-pubsub-test-rules.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rulebooks/gcp-pubsub-test-rules.yml b/rulebooks/gcp-pubsub-test-rules.yml index d1510bc5..a24f40b0 100644 --- a/rulebooks/gcp-pubsub-test-rules.yml +++ b/rulebooks/gcp-pubsub-test-rules.yml @@ -6,7 +6,7 @@ sources: - ansible.eda.gcp_pubsub: - project_id: "playground-s-11-73f6aaa0" + project_id: "eda" subscription_id: "test" ## Define the conditions we are looking for From cb77e8e58df6e4dc56908f77b09147cf772466f4 Mon Sep 17 00:00:00 2001 From: shopifyalan <106708662+shopifyalan@users.noreply.github.com> Date: Mon, 31 Oct 2022 10:03:54 -0400 Subject: [PATCH 3/3] Resolve sanity test issues --- plugins/event_source/gcp_pubsub.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plugins/event_source/gcp_pubsub.py b/plugins/event_source/gcp_pubsub.py index e1eb0a74..fd44ee33 100644 --- a/plugins/event_source/gcp_pubsub.py +++ b/plugins/event_source/gcp_pubsub.py @@ -1,10 +1,10 @@ """ gcp_pubsub.py -An ansible-events event source module for receiving events from GCP Pub/Sub +An ansible-events event source module for receiving events from GCP Pub/Sub Arguments: - project_id: The GCP project name + project_id: The GCP project name subscription_id: The name of the topic to pull messages from max_messages: The number of messages to retreive Default 3 @@ -27,6 +27,7 @@ from google.api_core import retry from google.cloud import pubsub_v1 + async def main(queue: asyncio.Queue, args: Dict[str, Any]): subscriber = pubsub_v1.SubscriberClient() @@ -45,7 +46,7 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): ack_ids = [] for received_message in response.received_messages: data = {"message": received_message.message.data.decode(), - "attributes": dict(received_message.message.attributes)} + "attributes": dict(received_message.message.attributes)} await queue.put(data) @@ -60,9 +61,8 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]): if __name__ == "__main__": class MockQueue: @staticmethod - async def put(self, event): + async def put(event): print(event) asyncio.run(main(MockQueue(), {"project_id": "lab", "subscription_id": "eda", "max_messages": 3, "retry_deadline": 300})) -