Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions tools/cloud_run/event_driven_bq_ingest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Cloud Run Event Driven Pipeline Tutorial

This sample shows how to create an event-driven pipeline on Cloud Run using Pub/Sub.

[![Open in Cloud Shell](https://gstatic.com/cloudssh/images/open-btn.png)](https://ssh.cloud.google.com/cloudshell/open?cloudshell_git_repo=https://github.com/yrvine-g/bigquery-utils.git&cloudshell_tutorial=tools/cloud_run/event_driven_bq_ingest/README.md)



## Dependencies

* **Spring Boot**: Web server framework.
* **Jib**: Container build tool.

## Setup
Create a GCS bucket and a folder path with the following format:
gs://bucket/project/dataset/table_name/*.avro

Enable the following APIs:
* container registry
* cloud run handler

```sh
gcloud config set project MY_PROJECT
```

Configure environment variables:

```sh
export MY_RUN_SERVICE=run-service
export MY_RUN_CONTAINER=run-container
export PROJECT=$(gcloud config get-value project)
export MY_GCS_BUCKET="$(gcloud config get-value project)-gcs-bucket"
export REGION=us-central1
export SERVICE_ACCOUNT=cloud-run-pubsub-invoker
```

## Quickstart

Use the [Jib Maven Plugin](https://github.com/GoogleContainerTools/jib/tree/master/jib-maven-plugin) to build and push container image:

Switch to the directory that contains `pom.xml`:

```sh
mvn compile jib:build -Dimage=gcr.io/$PROJECT/$MY_RUN_CONTAINER
```

Deploy Cloud Run service:
```sh
gcloud config set run/region $REGION
gcloud run deploy $MY_RUN_SERVICE \
--image gcr.io/$PROJECT/$MY_RUN_CONTAINER \
--no-allow-unauthenticated
```


Create PubSub Topic and GCS notification
```sh
gcloud pubsub topics create pipelineNotification

gsutil notification create -f json -t pipelineNotification -e OBJECT_FINALIZE gs://"$MY_GCS_BUCKET"
```


Create Service Account for Cloud Run and Pub/Sub permissions
```sh
gcloud iam service-accounts create $SERVICE_ACCOUNT \
--display-name "Cloud Run Pub/Sub Invoker"
gcloud run services add-iam-policy-binding $MY_RUN_SERVICE \
--member=serviceAccount:$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \
--role=roles/run.invoker
```

Create Pub/Sub Subscription
```sh
export RUN_SERVICE_URL=$(gcloud run services describe $MY_RUN_SERVICE --format='value(status.url)')
gcloud pubsub subscriptions create pipelineTrigger --topic pipelineNotification \
--push-endpoint=$RUN_SERVICE_URL \
--push-auth-service-account=$SERVICE_ACCOUNT@$PROJECT.iam.gserviceaccount.com \
--ack-deadline=600
```


Create log sink
```shell
# Resolve the numeric project number of the current project. `describe` targets
# exactly one project, whereas a bare `list --filter` term can match several.
export PROJECT_NO=$(gcloud projects describe "$PROJECT" --format='value(projectNumber)')

# Route "BigQuery job finished" audit log entries to the pipelineNotification topic.
# NOTE(review): the principal e-mail was obfuscated in the published text; it is
# reconstructed here as the default compute service account
# (<project-number>-compute@developer.gserviceaccount.com). Confirm it matches the
# identity your Cloud Run service runs as.
gcloud logging sinks create bq-job-completed \
  "pubsub.googleapis.com/projects/$PROJECT/topics/pipelineNotification" \
  --log-filter='resource.type="bigquery_project"
severity=INFO
protoPayload.metadata.jobChange.job.jobStatus.jobState="DONE"
protoPayload.authenticationInfo.principalEmail="'"$PROJECT_NO"'-compute@developer.gserviceaccount.com"'

# The sink's writerIdentity is already of the form "serviceAccount:<email>", so it is
# passed to --member as-is (no extra "serviceAccount:" prefix).
export LOG_SERVICE_ACCOUNT=$(gcloud logging sinks describe bq-job-completed --format='value(writerIdentity)')
gcloud pubsub topics add-iam-policy-binding pipelineNotification \
  --member="$LOG_SERVICE_ACCOUNT" --role=roles/pubsub.publisher
```
135 changes: 135 additions & 0 deletions tools/cloud_run/event_driven_bq_ingest/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
/*
* Copyright 2022 Google LLC

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.cloudrun</groupId>
<artifactId>pubsub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- The parent pom defines common style checks and testing strategies for our samples.
Removing or replacing it should not affect the execution of the samples in anyway. -->
<parent>
<groupId>com.google.cloud.samples</groupId>
<artifactId>shared-configuration</artifactId>
<version>1.0.23</version>
</parent>

<properties>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<lombok-version>1.18.12</lombok-version>
<jackson.version.core>2.13.0</jackson.version.core>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<!-- Import dependency management from Spring Boot -->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.5.5</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2020.0.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>24.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>2.1.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok-version}</version>
<optional>true</optional>
</dependency>
<dependency>
  <!-- Version intentionally omitted: the imported spring-boot-dependencies BOM manages
       jackson-databind, which keeps it on the same Jackson release as the jackson-core,
       jackson-annotations and datatype modules pulled in by spring-boot-starter-web.
       Pinning a different version here mixes Jackson minor versions on the classpath. -->
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.apis/google-api-services-pubsub -->
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>v1-rev452-1.25.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.5.4</version>
</plugin>
<plugin>
<groupId>com.google.cloud.tools</groupId>
<artifactId>jib-maven-plugin</artifactId>
<version>3.1.4</version>
<configuration>
<to>
<image>gcr.io/PROJECT_ID/event_driven_pipeline</image>
</to>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.cloudrun;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.QueryJobConfiguration;
import lombok.extern.log4j.Log4j2;

/**
 * Submits BigQuery query jobs that copy rows from files in Google Cloud Storage into a table,
 * reading the files through an external table definition attached to the query.
 */
@Log4j2
public class BQAccessor {

  /** Name under which the GCS-backed external table is registered on, and referenced by, the query. */
  private static final String EXTERNAL_TABLE_NAME = "externalTable";

  /**
   * Creates a BigQuery job running {@code INSERT INTO `project.dataset.table` SELECT * FROM
   * externalTable}, where {@code externalTable} is defined over
   * {@code gs://bucket/project/dataset/table/table*}.
   *
   * <p>The method returns once the job has been created and its id logged; it does not wait for
   * the job to finish. A BigQuery client is obtained from the default options on every call.
   *
   * @param GCSObjectProperties supplies the bucket id, project, dataset and table used to build
   *     both the source URI and the destination table reference
   * @param fileFormat format name handed to {@link FormatOptions#of(String)}
   * @throws RuntimeException wrapping any exception raised while configuring or creating the job
   */
  public static void insertIntoBQ(
      GCSNotificationMetadata.GCSObjectProperties GCSObjectProperties, String fileFormat) {

    // The parameter shares its name with its type; a local alias keeps the calls below from
    // reading like static method invocations.
    final GCSNotificationMetadata.GCSObjectProperties target = GCSObjectProperties;

    final String sourceUri = buildSourceUri(target);
    log.info("source URI is: {}", sourceUri);

    try {
      final BigQuery client = BigQueryOptions.getDefaultInstance().getService();

      final ExternalTableDefinition externalTable =
          ExternalTableDefinition.newBuilder(sourceUri, FormatOptions.of(fileFormat)).build();
      log.info("external table config: {}", externalTable);

      final String insertSql = buildInsertQuery(target);
      final QueryJobConfiguration queryConfig =
          QueryJobConfiguration.newBuilder(insertSql)
              .addTableDefinition(EXTERNAL_TABLE_NAME, externalTable)
              .build();
      log.info("query fired: {}", insertSql);

      final Job submitted = client.create(JobInfo.of(queryConfig));
      final JobId jobId = submitted.getJobId();
      log.info("job id: {}", jobId);

    } catch (Exception e) {
      throw new RuntimeException("Exception occured during insertion to BQ", e);
    }
  }

  /** Builds the wildcard source URI {@code gs://bucket/project/dataset/table/table*}. */
  private static String buildSourceUri(GCSNotificationMetadata.GCSObjectProperties target) {
    return String.format(
        "gs://%s/%s/%s/%s/%s*",
        target.getBucketId(),
        target.getProject(),
        target.getDataset(),
        target.getTable(),
        target.getTable());
  }

  /** Builds the INSERT statement that selects every row from the external table. */
  private static String buildInsertQuery(GCSNotificationMetadata.GCSObjectProperties target) {
    return String.format(
        "INSERT INTO `%s.%s.%s` SELECT * FROM %s",
        target.getProject(),
        target.getDataset(),
        target.getTable(),
        EXTERNAL_TABLE_NAME);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.example.cloudrun;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.extern.log4j.Log4j2;

/**
 * Message type bound from JSON that exposes two fields: the top-level {@code insertId} and the
 * nested {@code protoPayload.resourceName}.
 */
@Log4j2
public class BigQueryLogMetadata extends GenericMessage {

  /** Payload section of the message; only {@code resourceName} is bound. */
  public static class ProtoPayload {
    private String resourceName;

    /**
     * @param resourceName value of the JSON property {@code resourceName}, declared required
     */
    public ProtoPayload(
        @JsonProperty(value = "resourceName", required = true) String resourceName) {
      this.resourceName = resourceName;
    }

    /** Returns the bound {@code resourceName}. */
    public String getResourceName() {
      return this.resourceName;
    }

    /** Replaces the stored {@code resourceName}. */
    public void setResourceName(String resourceName) {
      this.resourceName = resourceName;
    }
  }

  private ProtoPayload protoPayload;
  private String insertId;

  /**
   * @param insertId value of the JSON property {@code insertId} (not declared required)
   * @param protoPayload value of the JSON property {@code protoPayload}, declared required
   */
  public BigQueryLogMetadata(
      @JsonProperty("insertId") String insertId,
      @JsonProperty(value = "protoPayload", required = true) ProtoPayload protoPayload) {
    this.protoPayload = protoPayload;
    this.insertId = insertId;
  }

  /** Returns the bound {@code protoPayload}. */
  public ProtoPayload getProtoPayload() {
    return this.protoPayload;
  }

  /** Replaces the stored {@code protoPayload}. */
  public void setProtoPayload(ProtoPayload protoPayload) {
    this.protoPayload = protoPayload;
  }

  /** Returns the bound {@code insertId}. */
  public String getInsertId() {
    return this.insertId;
  }

  /** Replaces the stored {@code insertId}. */
  public void setInsertId(String insertId) {
    this.insertId = insertId;
  }
}
Loading