diff --git a/build.sbt b/build.sbt index cfd3355c4b..10c8420341 100644 --- a/build.sbt +++ b/build.sbt @@ -419,6 +419,12 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ProblemFilters.exclude[MissingTypesProblem]( "com.spotify.scio.bigquery.types.package$Json$" ), + ProblemFilters.exclude[IncompatibleMethTypeProblem]( + "com.spotify.scio.bigquery.types.package#Json.apply" + ), + ProblemFilters.exclude[IncompatibleResultTypeProblem]( + "com.spotify.scio.bigquery.types.package#Json.parse" + ), // tf-metadata upgrade ProblemFilters.exclude[Problem]( "org.tensorflow.metadata.v0.*" @@ -1017,6 +1023,7 @@ lazy val `scio-google-cloud-platform` = project libraryDependencies ++= Seq( // compile "com.esotericsoftware" % "kryo-shaded" % kryoVersion, + "com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion, "com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion, "com.fasterxml.jackson.datatype" % "jackson-datatype-joda" % jacksonVersion, "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion, diff --git a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala index 8c53c34b34..12fc0a2221 100644 --- a/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala +++ b/integration/src/test/scala/com/spotify/scio/bigquery/TypedBigQueryIT.scala @@ -79,9 +79,12 @@ object TypedBigQueryIT { ) implicit val arbJson: Arbitrary[Json] = Arbitrary( for { - key <- Gen.alphaStr + isArray <- Arbitrary.arbBool.arbitrary + // f is a key field from TableRow. It cannot be used as column name + // see https://github.com/apache/beam/issues/33531 + key <- Gen.alphaStr.retryUntil(_ != "f") value <- Gen.alphaStr - } yield Json(s"""{"$key":"$value"}""") + } yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""") ) implicit val arbBigNumeric: Arbitrary[BigNumeric] = Arbitrary { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala index 69283fdf0b..0d8e6bada1 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/syntax/TableRowSyntax.scala @@ -121,10 +121,11 @@ object TableRowOps { } def json(value: AnyRef): Json = value match { - case x: Json => x - case x: TableRow => Json(x) - case x: String => Json(x) - case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value) + case x: Json => x + case x: java.util.Map[_, _] => Json(x) + case x: java.util.List[_] => Json(x) + case x: String => Json(x) + case _ => throw new UnsupportedOperationException("Cannot convert to json: " + value) } def bignumeric(value: AnyRef): BigNumeric = value match { diff --git a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala index 687c2f17d5..4a3e909667 100644 --- a/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala +++ b/scio-google-cloud-platform/src/main/scala/com/spotify/scio/bigquery/types/package.scala @@ -28,6 +28,7 @@ import org.typelevel.scalaccompat.annotation.nowarn import java.math.MathContext import java.nio.ByteBuffer import scala.annotation.StaticAnnotation + package object types { /** @@ -72,8 +73,17 @@ package object types { .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) .disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); - def apply(row: TableRow): Json = Json(mapper.writeValueAsString(row)) - def parse(json: Json): TableRow = mapper.readValue(json.wkt, classOf[TableRow]) + def apply(value: AnyRef): Json = Json(mapper.writeValueAsString(value)) + def parse(json: Json): AnyRef = { + val node = mapper.readTree(json.wkt) + if (node.isObject) { + mapper.treeToValue(node, classOf[java.util.Map[_, _]]) + } else if (node.isArray) { + mapper.treeToValue(node, classOf[java.util.List[_]]) + } else { + throw new IllegalArgumentException(s"Invalid json ${json.wkt}") + } + } } /** diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala index 580fb6f092..a37326a08c 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderSpec.scala @@ -53,11 +53,12 @@ final class ConverterProviderSpec } implicit val arbJson: Arbitrary[Json] = Arbitrary( for { + isArray <- Arbitrary.arbBool.arbitrary // f is a key field from TableRow. It cannot be used as column name // see https://github.com/apache/beam/issues/33531 key <- Gen.alphaStr.retryUntil(_ != "f") value <- Gen.alphaStr - } yield Json(s"""{"$key":"$value"}""") + } yield Json(if (isArray) s"""["$key","$value"]""" else s"""{"$key":"$value"}""") ) implicit val eqByteArrays: Eq[Array[Byte]] = Eq.instance[Array[Byte]](_.toList == _.toList) implicit val eqByteString: Eq[ByteString] = Eq.instance[ByteString](_ == _) diff --git a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala index 1df7580117..614dcb2a3b 100644 --- a/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala +++ b/scio-google-cloud-platform/src/test/scala/com/spotify/scio/bigquery/types/ConverterProviderTest.scala @@ -23,6 +23,8 @@ import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime} import org.scalatest.matchers.should.Matchers import org.scalatest.flatspec.AnyFlatSpec +import scala.jdk.CollectionConverters._ + class ConverterProviderTest extends AnyFlatSpec with Matchers { import ConverterProviderTest._ @@ -49,13 +51,42 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers { } it should "handle required json type" in { - val wkt = """{"name":"Alice","age":30}""" - val parsed = new TableRow() - .set("name", "Alice") - .set("age", 30) + // JSON object + { + val wkt = """{"name":"Alice","age":30,"address":{"street":"Broadway","city":"New York"}}""" + val parsed = Map( + "name" -> "Alice", + "age" -> 30, + "address" -> Map( + "street" -> "Broadway", + "city" -> "New York" + ).asJava + ).asJava + + RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt)) + BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow( + "a" -> parsed + ) + } + + // JSON array + { + val wkt = """["Alice",30,{"street":"Broadway","city":"New York"}]""" + val parsed = List( + "Alice", + 30, + Map( + "street" -> "Broadway", + "city" -> "New York" + ).asJava + ).asJava + + RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt)) + BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow( + "a" -> parsed + ) + } - RequiredJson.fromTableRow(TableRow("a" -> parsed)) shouldBe RequiredJson(Json(wkt)) - BigQueryType.toTableRow[RequiredJson](RequiredJson(Json(wkt))) shouldBe TableRow("a" -> parsed) } it should "handle required big numeric type" in {