Support BQ Json arrays #5544

Open · wants to merge 3 commits into main
7 changes: 7 additions & 0 deletions build.sbt
@@ -419,6 +419,12 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
   ProblemFilters.exclude[MissingTypesProblem](
     "com.spotify.scio.bigquery.types.package$Json$"
   ),
+  ProblemFilters.exclude[IncompatibleMethTypeProblem](
+    "com.spotify.scio.bigquery.types.package#Json.apply"
+  ),
+  ProblemFilters.exclude[IncompatibleResultTypeProblem](
+    "com.spotify.scio.bigquery.types.package#Json.parse"
+  ),
   // tf-metadata upgrade
   ProblemFilters.exclude[Problem](
     "org.tensorflow.metadata.v0.*"
@@ -1017,6 +1023,7 @@ lazy val `scio-google-cloud-platform` = project
     libraryDependencies ++= Seq(
       // compile
       "com.esotericsoftware" % "kryo-shaded" % kryoVersion,
+      "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
       "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
       "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % jacksonVersion,
       "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion,
@@ -79,9 +79,12 @@ object TypedBigQueryIT {
   )
   implicit val arbJson: Arbitrary[Json] = Arbitrary(
     for {
-      key <- Gen.alphaStr
+      isArray <- Arbitrary.arbBool.arbitrary
+      // f is a key field from TableRow. It cannot be used as column name
+      // see https://github.com/apache/beam/issues/33531
+      key <- Gen.alphaStr.retryUntil(_ != "f")
       value <- Gen.alphaStr
-    } yield Json(s"""{"$key":"$value"}""")
+    } yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""")
   )

   implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary {
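
The generator now flips a coin between the two JSON shapes BigQuery accepts at the top level. A self-contained sketch of the same generator (the `genJsonText` name and the ScalaCheck imports are added here for illustration):

```scala
import org.scalacheck.{Arbitrary, Gen}

// Emits either a one-entry JSON object or a two-element JSON array.
val genJsonText: Gen[String] = for {
  isArray <- Arbitrary.arbBool.arbitrary
  key     <- Gen.alphaStr.retryUntil(_ != "f") // "f" is reserved by TableRow
  value   <- Gen.alphaStr
} yield if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}"""

// genJsonText.sample  // e.g. Some({"abc":"de"}) or Some(["abc","de"])
```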
@@ -121,10 +121,11 @@ object TableRowOps {
   }

   def json(value: AnyRef): Json = value match {
-    case x: Json     => x
-    case x: TableRow => Json(x)
-    case x: String   => Json(x)
-    case _           => throw new UnsupportedOperationException("Cannot convert to json: " + value)
+    case x: Json                => x
+    case x: java.util.Map[_, _] => Json(x)
+    case x: java.util.List[_]   => Json(x)
+    case x: String              => Json(x)
+    case _                      => throw new UnsupportedOperationException("Cannot convert to json: " + value)
   }

   def bignumeric(value: AnyRef): BigNumeric = value match {
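
The widened match accepts the collection forms that the new `Json.parse` produces, alongside raw strings. A usage sketch, assuming `TableRowOps` and the updated `Json` companion are in scope:

```scala
import scala.jdk.CollectionConverters._

TableRowOps.json(Json("""{"a":"b"}"""))    // already a Json: passed through
TableRowOps.json(Map("a" -> "b").asJava)   // java.util.Map: serialized to a JSON object
TableRowOps.json(List("a", "b").asJava)    // java.util.List: serialized to a JSON array
TableRowOps.json("""["a","b"]""")          // raw JSON string: wrapped as-is

// Any other type still falls through to UnsupportedOperationException.
```

Since `TableRow` extends `java.util.AbstractMap`, the old `case x: TableRow` arm is subsumed by the new `java.util.Map` case.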
@@ -28,6 +28,7 @@ import org.typelevel.scalaccompat.annotation.nowarn
 import java.math.MathContext
 import java.nio.ByteBuffer
 import scala.annotation.StaticAnnotation
+
 package object types {

   /**
@@ -72,8 +73,17 @@ package object types {
       .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
       .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);

-    def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row))
-    def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow])
+    def apply(value: AnyRef): Json = Json(mapper.writeValueAsString(value))
+    def parse(json: Json): AnyRef = {
+      val node = mapper.readTree(json.wkt)
+      if (node.isObject) {
+        mapper.treeToValue(node, classOf[java.util.Map[_, _]])
+      } else if (node.isArray) {
+        mapper.treeToValue(node, classOf[java.util.List[_]])
+      } else {
+        throw new IllegalArgumentException(s"Invalid json ${json.wkt}")
+      }
+    }
   }

   /**
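
`parse` now dispatches on the root node: a JSON object deserializes to a `java.util.Map`, a JSON array to a `java.util.List`, and any other root (a bare scalar, say) is rejected. A round-trip sketch of the new behavior:

```scala
Json.parse(Json("""{"name":"Alice","age":30}""")) // java.util.Map: {name=Alice, age=30}
Json.parse(Json("""["Alice",30]"""))              // java.util.List: [Alice, 30]
Json.parse(Json("42"))                            // IllegalArgumentException: Invalid json 42
```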
@@ -53,11 +53,12 @@ final class ConverterProviderSpec
   }
   implicit val arbJson: Arbitrary[Json] = Arbitrary(
     for {
+      isArray <- Arbitrary.arbBool.arbitrary
       // f is a key field from TableRow. It cannot be used as column name
       // see https://github.com/apache/beam/issues/33531
       key <- Gen.alphaStr.retryUntil(_ != "f")
       value <- Gen.alphaStr
-    } yield Json(s"""{"$key":"$value"}""")
+    } yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""")
   )
   implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList)
   implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _)
@@ -23,6 +23,8 @@ import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.flatspec.AnyFlatSpec

+import scala.jdk.CollectionConverters._
+
 class ConverterProviderTest extends AnyFlatSpec with Matchers {
   import ConverterProviderTest._
@@ -49,13 +51,42 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
   }

   it should "handle required json type" in {
-    val wkt = """{"name":"Alice","age":30}"""
-    val parsed = new TableRow()
-      .set("name", "Alice")
-      .set("age", 30)
+    // JSON object
+    {
+      val wkt = """{"name":"Alice","age":30,"address":{"street":"Broadway","city":"New York"}}"""
+      val parsed = Map(
+        "name" -> "Alice",
+        "age" -> 30,
+        "address" -> Map(
+          "street" -> "Broadway",
+          "city" -> "New York"
+        ).asJava
+      ).asJava
+
+      RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
+      BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow(
+        "a" -> parsed
+      )
+    }
+
+    // JSON array
+    {
+      val wkt = """["Alice",30,{"street":"Broadway","city":"New York"}]"""
+      val parsed = List(
+        "Alice",
+        30,
+        Map(
+          "street" -> "Broadway",
+          "city" -> "New York"
+        ).asJava
+      ).asJava
+
+      RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
+      BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow(
+        "a" -> parsed
+      )
+    }

-    RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt))
-    BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed)
   }

   it should "handle required big numeric type" in {