Skip to content

Commit 041ae06

Browse files
Add integration test for Magnolify TableRowType compatibility (#5818)
Co-authored-by: Kellen Dye <[email protected]>
1 parent 1925d8b commit 041ae06

File tree

3 files changed

+213
-1
lines changed

3 files changed

+213
-1
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1924,6 +1924,7 @@ lazy val integration = project
19241924
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % Test,
19251925
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion % Test,
19261926
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion % Test,
1927+
"com.spotify" %% "magnolify-bigquery" % magnolifyVersion % Test,
19271928
"com.spotify" %% "magnolify-datastore" % magnolifyVersion % Test,
19281929
"org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Test
19291930
)
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2025 Spotify AB.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing,
11+
* software distributed under the License is distributed on an
12+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
* KIND, either express or implied. See the License for the
14+
* specific language governing permissions and limitations
15+
* under the License.
16+
*/
17+
18+
package com.spotify.scio.bigquery
19+
20+
import com.spotify.scio.bigquery.BigQueryTypedTable.Format
21+
import com.spotify.scio.bigquery.client.BigQuery
22+
import com.spotify.scio.bigquery.validation.SetProperty
23+
import com.spotify.scio.coders.Coder
24+
import com.spotify.scio.testing._
25+
import magnolify.bigquery._
26+
import magnolify.bigquery.unsafe._
27+
import magnolify.scalacheck.auto._
28+
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{Method => WriteMethod}
29+
import org.apache.beam.sdk.options.PipelineOptionsFactory
30+
import org.scalacheck._
31+
import org.scalatest.BeforeAndAfterAll
32+
33+
import java.time.format.DateTimeFormatter
34+
import java.time._
35+
36+
object MagnolifyBigQueryIT {
37+
case class Nested(int: Int)
38+
39+
case class Record(
40+
bool: Boolean,
41+
long: Long,
42+
double: Double,
43+
numeric: BigDecimal,
44+
string: String,
45+
timestamp: Instant,
46+
date: LocalDate,
47+
time: LocalTime,
48+
datetime: LocalDateTime,
49+
nestedRequired: Nested,
50+
nestedOptional: Option[Nested]
51+
)
52+
53+
def arbBigDecimal(precision: Int, scale: Int): Arbitrary[BigDecimal] = Arbitrary {
54+
val max = BigInt(10).pow(precision) - 1
55+
Gen.choose(-max, max).map(BigDecimal(_, scale))
56+
}
57+
58+
implicit val arbNumeric: Arbitrary[BigDecimal] =
59+
arbBigDecimal(Numeric.MaxNumericPrecision, Numeric.MaxNumericScale)
60+
implicit val arbString: Arbitrary[String] = Arbitrary(Gen.alphaStr)
61+
62+
// Workaround for millis rounding error
63+
val genInstant: Gen[Instant] =
64+
Gen.chooseNum[Long](0L, 1000000000000L).map(x => x / 1000 * 1000).map(Instant.ofEpochMilli)
65+
implicit val arbInstant: Arbitrary[Instant] = Arbitrary(genInstant)
66+
implicit val arbDate: Arbitrary[LocalDate] = Arbitrary(
67+
genInstant.map(_.atZone(ZoneOffset.UTC).toLocalDate)
68+
)
69+
implicit val arbTime: Arbitrary[LocalTime] = Arbitrary(
70+
genInstant.map(_.atZone(ZoneOffset.UTC).toLocalTime)
71+
)
72+
implicit val arbDatetime: Arbitrary[LocalDateTime] = Arbitrary(
73+
genInstant.map(_.atZone(ZoneOffset.UTC).toLocalDateTime)
74+
)
75+
76+
private def table(name: String) = {
77+
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")
78+
val now = Instant.now().atZone(ZoneOffset.UTC).toLocalDateTime.format(formatter)
79+
val spec =
80+
s"data-integration-test:bigquery_avro_it.magnolify_$name${now}"
81+
Table.Spec(spec)
82+
}
83+
84+
def sample[T](gen: Gen[T]): Seq[T] = Gen.listOfN(5, gen).sample.get
85+
86+
val records = sample(implicitly[Arbitrary[Record]].arbitrary)
87+
88+
private val options = PipelineOptionsFactory
89+
.fromArgs(
90+
"--project=data-integration-test",
91+
"--tempLocation=gs://data-integration-test-eu/temp"
92+
)
93+
.create()
94+
}
95+
96+
class MagnolifyBigQueryIT extends PipelineSpec with BeforeAndAfterAll {
97+
import MagnolifyBigQueryIT._
98+
99+
private val bq = BigQuery.defaultInstance()
100+
101+
override def beforeAll(): Unit = {
102+
// We need this at runtime as well and tests are run in a fork
103+
SetProperty.setSystemProperty()
104+
()
105+
}
106+
107+
override protected def afterAll(): Unit = {
108+
Option(
109+
bq.client
110+
.execute(_.tables().list("data-integration-test", "bigquery_avro_it"))
111+
.getTables
112+
).foreach(_.forEach(t => bq.tables.delete(t.getTableReference)))
113+
}
114+
115+
def testRoundtrip[T: Coder: TableRowType](
116+
writeMethod: WriteMethod,
117+
readStorageApi: Boolean
118+
)(rows: Seq[T]): Unit = {
119+
val trt = implicitly[TableRowType[T]]
120+
val tableRef = table(s"${writeMethod}_storageAPI_${readStorageApi}".toLowerCase)
121+
122+
runWithRealContext(options) { sc =>
123+
sc
124+
.parallelize(rows)
125+
.map(trt.to)
126+
.saveAsBigQueryTable(
127+
tableRef,
128+
schema = trt.schema,
129+
createDisposition = CREATE_IF_NEEDED,
130+
method = writeMethod
131+
)
132+
}.waitUntilFinish()
133+
134+
runWithRealContext(options) { sc =>
135+
val data = if (readStorageApi) {
136+
sc
137+
.bigQueryStorage(tableRef)
138+
.map(trt.from)
139+
} else {
140+
sc
141+
.bigQueryTable(tableRef, Format.TableRow)
142+
.map(trt.from)
143+
}
144+
data should containInAnyOrder(rows)
145+
}
146+
}
147+
148+
"MagnolifyBigQuery" should "write case classes using FileLoads API and read using Extract API" in {
149+
testRoundtrip(WriteMethod.FILE_LOADS, readStorageApi = false)(records)
150+
}
151+
152+
it should "write case classes using FileLoads API and read using Storage API" in {
153+
testRoundtrip(WriteMethod.FILE_LOADS, readStorageApi = true)(records)
154+
}
155+
156+
it should "read case classes written using legacy BigQueryType" in {
157+
val tableRef = table("bqt_compat")
158+
val records: Seq[TypedBigQueryIT.Record] = TypedBigQueryIT.records
159+
160+
runWithRealContext(options) { sc =>
161+
sc
162+
.parallelize(records)
163+
.saveAsTypedBigQueryTable(
164+
tableRef,
165+
createDisposition = CREATE_IF_NEEDED
166+
)
167+
}.waitUntilFinish()
168+
169+
runWithRealContext(options) { sc =>
170+
import org.joda.time.{DateTimeFieldType, DateTimeZone}
171+
172+
val trt = TableRowType[Record]
173+
val data = sc
174+
.bigQueryStorage(tableRef)
175+
.map(trt.from)
176+
177+
// We're comparing two different Record classes; convert fields to comparable types
178+
data.map(r =>
179+
(
180+
r.bool,
181+
r.long,
182+
r.double,
183+
r.numeric,
184+
r.string,
185+
r.timestamp.toEpochMilli,
186+
r.date.atStartOfDay().toInstant(ZoneOffset.UTC).toEpochMilli,
187+
r.time.toSecondOfDay,
188+
r.datetime.toInstant(ZoneOffset.UTC).toEpochMilli,
189+
r.nestedRequired.int,
190+
r.nestedOptional.map(_.int)
191+
)
192+
) should containInAnyOrder(
193+
records.map(r =>
194+
(
195+
r.bool,
196+
r.long,
197+
r.double,
198+
r.numeric,
199+
r.string,
200+
r.timestamp.toInstant.getMillis,
201+
r.date.toDateTimeAtStartOfDay(DateTimeZone.UTC).getMillis,
202+
r.time.get(DateTimeFieldType.secondOfDay()),
203+
r.datetime.toDateTime(DateTimeZone.UTC).getMillis,
204+
r.nestedRequired.int,
205+
r.nestedOptional.map(_.int)
206+
)
207+
)
208+
)
209+
}
210+
}
211+
}

scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/validation/SetProperty.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.reflect.macros.blackbox
2222

2323
// This shouldn't be necessary in most production use cases. However passing System properties from
2424
// Intellij can cause issues. The ideal place to set this system property is in your build.sbt file.
25-
private[validation] object SetProperty {
25+
private[scio] object SetProperty {
2626
@compileTimeOnly(
2727
"enable macro paradise (2.12) or -Ymacro-annotations (2.13) to expand macro annotations"
2828
)

0 commit comments

Comments
 (0)