diff --git a/tools/cloud_run/event_driven_bq_ingest/README.md b/tools/cloud_run/event_driven_bq_ingest/README.md
new file mode 100644
index 000000000..c6fa6ff43
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/README.md
@@ -0,0 +1,95 @@
+# Cloud Run Event Driven Pipeline Tutorial
+
+This sample shows how to create an event driven pipeline on Cloud Run via Pub/Sub
+
+[](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/bigquery-utils.git&cloudshell_tutorial=tools/cloud_run/event_driven_bq_ingest/README.md)
+
+
+
+## Dependencies
+
+* **Spring Boot**: Web server framework.
+* **Jib**: Container build tool.
+
+## Setup
+Create GCS bucket and a folder path with the following format:
+gs://bucket/project/dataset/table_name/*.avro
+
+Enable following APIs:
+* container registry
+* cloud run handler
+
+```sh
+gcloud config set project MY_PROJECT
+```
+
+Configure environment variables:
+
+```sh
+export MY_RUN_SERVICE=run-service
+export MY_RUN_CONTAINER=run-container
+export PROJECT=$(gcloud config get-value project)
+export MY_GCS_BUCKET="$(gcloud config get-value project)-gcs-bucket"
+export REGION=us-central1
+export SERVICE_ACCOUNT=cloud-run-pubsub-invoker
+```
+
+## Quickstart
+
+Use the [Jib Maven Plugin](https://github.com/GoogleContainerTools/jib/tree/master/jib-maven-plugin) to build and push container image:
+
+Switch to directory with .pom
+
+```sh
+mvn compile jib:build -Dimage=gcr.io/$PROJECT/$MY_RUN_CONTAINER
+```
+
+Deploy Cloud Run service:
+```sh
+gcloud config set run/region $REGION
+gcloud run deploy $MY_RUN_SERVICE \
+--image gcr.io/$PROJECT/$MY_RUN_CONTAINER \
+--no-allow-unauthenticated
+```
+
+
+Create PubSub Topic and GCS notification
+```sh
+gcloud pubsub topics create pipelineNotification
+
+gsutil notification create -f json -t pipelineNotification -e OBJECT_FINALIZE gs://"$MY_GCS_BUCKET"
+```
+
+
+Create Service Account for Cloud Run and Pub/Sub permissions
+```sh
+gcloud iam service-accounts create $SERVICE_ACCOUNT \
+--display-name "Cloud Run Pub/Sub Invoker"
+gcloud run services add-iam-policy-binding $MY_RUN_SERVICE \
+--member=serviceAccount:$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \
+--role=roles/run.invoker
+```
+
+Create Pub/Sub Subscription
+```sh
+export RUN_SERVICE_URL=$(gcloud run services describe $MY_RUN_SERVICE --format='value(status.url)')
+gcloud pubsub subscriptions create pipelineTrigger --topic pipelineNotification \
+ --push-endpoint=$RUN_SERVICE_URL \
+ --push-auth-service-account=$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \
+ --ack-deadline=600
+```
+
+
+Create log sink
+```shell
+export PROJECT_NO=$(gcloud projects list --filter="$PROJECT" --format="value(PROJECT_NUMBER)")
+gcloud logging sinks create bq-job-completed \
+pubsub.googleapis.com/projects/$PROJECT/topics/pipelineNotification \
+ --log-filter='resource.type="bigquery_project"
+severity=INFO
+protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE"
+protoPayload.authenticationInfo.principalEmail="'$PROJECT_NO'-compute@developer.gserviceaccount.com"'
+export LOG_SERVICE_ACCOUNT=$(gcloud logging sinks describe bq-job-completed --format='value(writerIdentity)')
+gcloud pubsub topics add-iam-policy-binding pipelineNotification \
+ --member=serviceAccount:LOG_SERVICE_ACCOUNT --role=roles/pubsub.publisher
+```
diff --git a/tools/cloud_run/event_driven_bq_ingest/pom.xml b/tools/cloud_run/event_driven_bq_ingest/pom.xml
new file mode 100644
index 000000000..a65cf1419
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/pom.xml
@@ -0,0 +1,135 @@
+
+
+
+ 4.0.0
+ com.example.cloudrun
+ pubsub
+ 0.0.1-SNAPSHOT
+
+
+ com.google.cloud.samples
+ shared-configuration
+ 1.0.23
+
+
+
+ 11
+ 11
+ 1.18.12
+ 2.13.0
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-dependencies
+ 2.5.5
+ pom
+ import
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ 2020.0.4
+ pom
+ import
+
+
+ com.google.cloud
+ libraries-bom
+ 24.0.0
+ pom
+ import
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ com.google.cloud
+ google-cloud-bigquery
+
+
+ com.google.cloud
+ google-cloud-storage
+ 2.1.9
+
+
+ junit
+ junit
+ 4.13.2
+ test
+
+
+ org.projectlombok
+ lombok
+ ${lombok-version}
+ true
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.11.0
+
+
+
+ com.google.apis
+ google-api-services-pubsub
+ v1-rev452-1.25.0
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+ 2.5.4
+
+
+ com.google.cloud.tools
+ jib-maven-plugin
+ 3.1.4
+
+
+ gcr.io/PROJECT_ID/event_driven_pipeline
+
+
+
+
+
+
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java
new file mode 100644
index 000000000..aaa21972f
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BQAccessor.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+public class BQAccessor {
+
+ private static final String EXTERNAL_TABLE_NAME = "externalTable";
+
+ public static void insertIntoBQ(
+ GCSNotificationMetadata.GCSObjectProperties GCSObjectProperties, String fileFormat) {
+
+ // create sourceUri with format --> gs://bucket/project/dataset/table/table*
+ String sourceUri =
+ String.format(
+ "gs://%s/%s/%s/%s/%s*",
+ GCSObjectProperties.getBucketId(),
+ GCSObjectProperties.getProject(),
+ GCSObjectProperties.getDataset(),
+ GCSObjectProperties.getTable(),
+ GCSObjectProperties.getTable());
+ log.info("source URI is: {}", sourceUri);
+
+ try {
+
+ // Initialize client that will be used to send requests. This client only needs to be created
+ // once, and can be reused for multiple requests.
+ BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+
+ FormatOptions format = FormatOptions.of(fileFormat);
+
+ ExternalTableDefinition externalTable =
+ ExternalTableDefinition.newBuilder(sourceUri, format).build();
+
+ log.info("external table config: {}", externalTable);
+
+ String query =
+ String.format(
+ "INSERT INTO `%s.%s.%s` SELECT * FROM %s",
+ GCSObjectProperties.getProject(),
+ GCSObjectProperties.getDataset(),
+ GCSObjectProperties.getTable(),
+ EXTERNAL_TABLE_NAME);
+ QueryJobConfiguration queryConfig =
+ QueryJobConfiguration.newBuilder(query)
+ .addTableDefinition(EXTERNAL_TABLE_NAME, externalTable)
+ .build();
+ log.info("query fired: {}", query);
+ JobInfo jobInfo = JobInfo.of(queryConfig);
+ Job job = bigquery.create(jobInfo);
+ JobId jobId = job.getJobId();
+ log.info("job id: {}", jobId);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Exception occured during insertion to BQ", e);
+ }
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java
new file mode 100644
index 000000000..61a19afa6
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/BigQueryLogMetadata.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+public class BigQueryLogMetadata extends GenericMessage {
+
+ private String insertId;
+ private ProtoPayload protoPayload;
+
+ public BigQueryLogMetadata(
+ @JsonProperty("insertId") String insertId,
+ @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) {
+ this.insertId = insertId;
+ this.protoPayload = protoPayload;
+ }
+
+ public String getInsertId() {
+ return insertId;
+ }
+
+ public void setInsertId(String insertId) {
+ this.insertId = insertId;
+ }
+
+ public ProtoPayload getProtoPayload() {
+ return protoPayload;
+ }
+
+ public void setProtoPayload(ProtoPayload protoPayload) {
+ this.protoPayload = protoPayload;
+ }
+
+ public static class ProtoPayload {
+ private String resourceName;
+
+ public ProtoPayload(
+ @JsonProperty(value = "resourceName", required = true) String resourceName) {
+ this.resourceName = resourceName;
+ }
+
+ public String getResourceName() {
+ return resourceName;
+ }
+
+ public void setResourceName(String resourceName) {
+ this.resourceName = resourceName;
+ }
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java
new file mode 100644
index 000000000..b556d01d3
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSAccessor.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.CopyWriter;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+
+import java.util.Arrays;
+
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+public class GCSAccessor {
+
+ private static final Storage storage = StorageOptions.getDefaultInstance().getService();
+
+ public static void archiveFiles(String sourceUri) {
+
+ String sourceBucketName = getBucketName(sourceUri);
+ String targetBucketName = sourceBucketName + "_archival";
+ String sourceObjectName = getObjectName(sourceUri);
+ log.info("source uri: {}", sourceUri);
+ log.info("sourceBucketName: {}", sourceBucketName);
+ log.info("targetBucketName: {}", targetBucketName);
+ log.info("sourceObjectName: {}", sourceObjectName);
+
+ try {
+
+ Page blobs =
+ storage.list(sourceBucketName, Storage.BlobListOption.prefix(sourceObjectName));
+ log.info("Iterating the blobs");
+ for (Blob blob : blobs.iterateAll()) {
+ log.info("Blob is: " + blob.getName());
+ if (!blob.getName().equalsIgnoreCase(sourceObjectName)) {
+ CopyWriter copyWriter = blob.copyTo(targetBucketName, blob.getName());
+ Blob copiedBlob = copyWriter.getResult();
+ }
+ blob.delete();
+ }
+ } catch (RuntimeException e) {
+ log.error("failed to process the message", e);
+ throw new RuntimeException("failed to archive files", e);
+ }
+ }
+
+ public static String getBucketName(String filePath) {
+ String[] path = filePath.replace("gs://", "").split("/");
+
+ String bucket = path[0];
+ return bucket;
+ }
+
+ public static String getObjectName(String filePath) {
+ String[] path = filePath.replace("gs://", "").split("/");
+ String objectName = String.join("/", Arrays.copyOfRange(path, 1, path.length));
+
+ return objectName;
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java
new file mode 100644
index 000000000..d670c2c51
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GCSNotificationMetadata.java
@@ -0,0 +1,73 @@
+package com.example.cloudrun;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Value;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+public class GCSNotificationMetadata extends GenericMessage{
+ private String name;
+ private String bucket;
+
+ public GCSNotificationMetadata() {}
+
+ public GCSNotificationMetadata(@JsonProperty(value = "name", required = true) String name, @JsonProperty(value = "bucket", required = true) String bucket) {
+ this.name = name;
+ this.bucket = bucket;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getBucket() {
+ return bucket;
+ }
+
+ public void setBucket(String bucket) {
+ this.bucket = bucket;
+ }
+
+ public GCSObjectProperties getGCSObjectProperties() {
+ String bucketId = this.bucket;
+ String objectId = this.name;
+
+ // return null if there is no objectId
+ if (objectId == null || bucketId == null) {
+ return null;
+ }
+
+ String[] parsedObjectId = objectId.split("/");
+
+ if (parsedObjectId.length < 4) {
+ throw new RuntimeException("The object id is formatted incorrectly");
+ }
+ String project = parsedObjectId[0];
+ String dataset = parsedObjectId[1];
+ String table = parsedObjectId[2];
+ String triggerFileName = parsedObjectId[3];
+
+ return GCSObjectProperties.builder()
+ .bucketId(bucketId)
+ .project(project)
+ .dataset(dataset)
+ .table(table)
+ .triggerFile(triggerFileName)
+ .build();
+ }
+
+ @Value
+ @Builder
+ public static class GCSObjectProperties {
+ String bucketId;
+ String project;
+ String dataset;
+ String table;
+ String triggerFile;
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java
new file mode 100644
index 000000000..ac57a7d32
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/GenericMessage.java
@@ -0,0 +1,3 @@
+package com.example.cloudrun;
+
+public abstract class GenericMessage {}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java
new file mode 100644
index 000000000..ff5ca3019
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/JobAccessor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.bigquery.ExternalTableDefinition;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.JobConfiguration;
+import lombok.extern.log4j.Log4j2;
+
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
+
+@Log4j2
+public class JobAccessor {
+
+ public static List checkJobCompeletion(BigQueryLogMetadata bigQueryLogMetadata) {
+ log.info("Resource Name:{}", bigQueryLogMetadata.getProtoPayload().getResourceName());
+
+ try {
+ String resourceName = bigQueryLogMetadata.getProtoPayload().getResourceName();
+ String[] parsedName = resourceName.split("/");
+ String jobName = parsedName[parsedName.length - 1];
+
+ BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
+ JobId jobId = JobId.of(jobName);
+ Job job = bigquery.getJob(jobId);
+
+ List allSourceUris = new ArrayList();
+ // check for error
+ if (job.isDone() && job.getStatus().getError() == null) {
+ log.info("Job successfully loaded BQ table");
+ JobConfiguration jobConfig = job.getConfiguration();
+ if (jobConfig instanceof QueryJobConfiguration) {
+ Map tableConfigs =
+ ((QueryJobConfiguration) jobConfig).getTableDefinitions();
+ tableConfigs.forEach(
+ (tableKey, tableValue) ->
+ tableValue
+ .getSourceUris()
+ .forEach((item) -> allSourceUris.add(removeWildcard(item))));
+ } else {
+ log.info("job configuration is not an instance");
+ }
+ } else {
+ log.info(
+ "BigQuery was unable to load into the table due to an error:"
+ + job.getStatus().getError());
+ throw new RuntimeException(
+ "BigQuery was unable to load into the table due to an error: "
+ + job.getStatus().getError().getMessage());
+ }
+ return allSourceUris;
+ } catch (NullPointerException | BigQueryException e) {
+ log.info("Job not retrieved. \n" + e);
+ return null;
+ }
+ }
+
+ public static String removeWildcard(String str) {
+ String[] arrOfStr = str.split("\\*", 0);
+ return arrOfStr[0];
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java
new file mode 100644
index 000000000..fa064b9d2
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineApplication.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class PipelineApplication {
+ public static void main(String[] args) {
+ SpringApplication.run(PipelineApplication.class, args);
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java
new file mode 100644
index 000000000..7cbdb9a45
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineController.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import lombok.extern.log4j.Log4j2;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.List;
+import com.google.api.services.pubsub.model.PubsubMessage;
+
+@RestController
+@Log4j2
+public class PipelineController {
+
+ private static final String TRIGGER_FILE_NAME = "trigger.txt";
+ private static final String FILE_FORMAT = "AVRO";
+
+ @RequestMapping(value = "/", method = RequestMethod.POST)
+ public ResponseEntity receiveMessage(@RequestBody PipelineRequestBody request) {
+ // Get PubSub pubSubMessage from request body.
+ PubsubMessage pubSubMessage = request.getMessage();
+ if (pubSubMessage == null) {
+ log.info("Bad Request: invalid Pub/Sub pubSubMessage format");
+ return new ResponseEntity("invalid Pub/Sub pubSubMessage", HttpStatus.BAD_REQUEST);
+ }
+ GenericMessage metadata = PubSubMessageParser.parseMessage(pubSubMessage);
+ if (metadata == null) {
+ log.info("Bad Request: message does not follow format for data extraction");
+ return new ResponseEntity("invalid message data", HttpStatus.BAD_REQUEST);
+ } else if (metadata instanceof GCSNotificationMetadata) {
+ // handle gcs notification
+ GCSNotificationMetadata gcsNotificationMetadata = (GCSNotificationMetadata) metadata;
+ GCSNotificationMetadata.GCSObjectProperties gcsObjectProperties =
+ gcsNotificationMetadata.getGCSObjectProperties();
+ if (TRIGGER_FILE_NAME.equals(gcsObjectProperties.getTriggerFile())) {
+ log.info("Found Trigger file, started BQ insert");
+ BQAccessor.insertIntoBQ(gcsObjectProperties, FILE_FORMAT);
+ return new ResponseEntity("triggered successfully", HttpStatus.OK);
+ } else {
+ log.info("Not trigger file");
+ return new ResponseEntity("Not trigger file", HttpStatus.OK);
+ }
+ } else if (metadata instanceof BigQueryLogMetadata) {
+ // handle bq job complete notification
+ BigQueryLogMetadata bigQueryLogMetadata = (BigQueryLogMetadata) metadata;
+ List sourceUris = JobAccessor.checkJobCompeletion(bigQueryLogMetadata);
+ sourceUris.forEach((sourceUri -> GCSAccessor.archiveFiles(sourceUri)));
+ return new ResponseEntity("job completed", HttpStatus.OK);
+ } else {
+ log.error("failed to process the message");
+ return new ResponseEntity("failed to process the message", HttpStatus.OK);
+ }
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java
new file mode 100644
index 000000000..d33343bd3
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PipelineRequestBody.java
@@ -0,0 +1,17 @@
+package com.example.cloudrun;
+
+import com.google.api.services.pubsub.model.PubsubMessage;
+
+public class PipelineRequestBody {
+ private PubsubMessage message;
+
+ public PipelineRequestBody() {}
+
+ public PubsubMessage getMessage() {
+ return message;
+ }
+
+ public void setMessage(PubsubMessage message) {
+ this.message = message;
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java
new file mode 100644
index 000000000..bf5fbd0f1
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/PubSubMessageParser.java
@@ -0,0 +1,38 @@
+package com.example.cloudrun;
+
+import java.util.Map;
+import lombok.extern.log4j.Log4j2;
+import java.util.Base64;
+import org.apache.commons.lang3.StringUtils;
+import com.fasterxml.jackson.databind.*;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.api.services.pubsub.model.PubsubMessage;
+
+@Log4j2
+public class PubSubMessageParser {
+ public static GenericMessage parseMessage(PubsubMessage message) {
+ String data = message.getData();
+ if (data == null) {
+ return null;
+ } else {
+ String dataStr =
+ !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "";
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ try {
+ GenericMessage dataObj = mapper.readValue(dataStr, GCSNotificationMetadata.class);
+
+ return dataObj;
+ } catch (JsonProcessingException error1) {
+ try {
+ GenericMessage dataObj = mapper.readValue(dataStr, BigQueryLogMetadata.class);
+ return dataObj;
+ } catch (JsonProcessingException error2) {
+ log.error("failed to parse GCS Notification metadata", error1);
+ log.error("failed to parse BQ Log metadata", error2);
+ return null;
+ }
+ }
+ }
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json
new file mode 100644
index 000000000..12ca354e9
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/java/com/example/cloudrun/testdata.json
@@ -0,0 +1,11 @@
+{
+ "message": {
+ "data": "dGVzdA==",
+ "attributes": {
+ "bucketId": "event-driven-pipeline-bucket",
+ "objectId": "yrvine-rotation-demo/san_francisco_bikeshare/bikeshare_regions/trigger.txt"
+ },
+ "publishTime": "2017-09-25T23:16:42.302Z",
+ "messageId": "91010751788941"
+ }
+}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties
new file mode 100644
index 000000000..81d437a30
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/main/resources/application.properties
@@ -0,0 +1,14 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server.port=${PORT:8080}
diff --git a/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java
new file mode 100644
index 000000000..fa388b9de
--- /dev/null
+++ b/tools/cloud_run/event_driven_bq_ingest/src/test/java/com/example/cloudrun/PubSubControllerTests.java
@@ -0,0 +1,81 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloudrun;
+
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.web.servlet.MockMvc;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@AutoConfigureMockMvc
+public class PubSubControllerTests {
+
+ @Autowired private MockMvc mockMvc;
+
+ @Test
+ public void addEmptyBody() throws Exception {
+ mockMvc.perform(post("/")).andExpect(status().isBadRequest());
+ }
+
+ @Test
+ public void addNoMessage() throws Exception {
+ String mockBody = "{}";
+
+ mockMvc
+ .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody))
+ .andExpect(status().isBadRequest());
+ }
+
+ @Test
+ public void addInvalidMimetype() throws Exception {
+ String mockBody = "{\"message\":{\"data\":\"dGVzdA==\","
+ + "\"attributes\":{},\"messageId\":\"91010751788941\""
+ + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}";
+
+ mockMvc
+ .perform(post("/").contentType(MediaType.TEXT_HTML).content(mockBody))
+ .andExpect(status().isUnsupportedMediaType());
+ }
+
+ @Test
+ public void addMinimalBody() throws Exception {
+ String mockBody = "{\"message\":{}}";
+
+ mockMvc
+ .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody))
+ .andExpect(status().isOk());
+ }
+
+ @Test
+ public void addFullBody() throws Exception {
+ String mockBody = "{\"message\":{\"data\":\"dGVzdA==\","
+ + "\"attributes\":{},\"messageId\":\"91010751788941\""
+ + ",\"publishTime\":\"2017-09-25T23:16:42.302Z\"}}";
+ mockMvc
+ .perform(post("/").contentType(MediaType.APPLICATION_JSON).content(mockBody))
+ .andExpect(status().isOk());
+ }
+}