How do I use from_json with a schema given as a string (i.e. a JSON-encoded schema)?

Time: 2022-10-24 12:41:02

I'm reading a stream from Kafka and converting the value from Kafka (which is JSON) into a struct.

from_json has a variant that takes the schema as a String, but I could not find a sample. Please advise what is wrong with the code below.

Error

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '(' expecting {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT',

== SQL ==
STRUCT ( `firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY ( STRUCT ( `city`: STRING, `state`: STRING, `zip`: STRING )  )  )
-------^^^

at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)

Program

public static void main(String[] args) throws AnalysisException {
    String master = "local[*]";
    String brokers = "quickstart:9092";
    String topics = "simple_topic_6";

    SparkSession sparkSession = SparkSession
            .builder().appName(EmployeeSchemaLoader.class.getName())
            .master(master).getOrCreate();

    String employeeSchema = "STRUCT ( firstName: STRING, lastName: STRING, email: STRING, " +
            "addresses: ARRAY ( STRUCT ( city: STRING, state: STRING, zip: STRING )  )  ) ";

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();

    Dataset<Row> employeeDataset = sparkSession.readStream().
            format("kafka").
            option("kafka.bootstrap.servers", brokers)
            .option("subscribe", topics).load();
    employeeDataset.printSchema();

    employeeDataset = employeeDataset.withColumn("strValue", employeeDataset.col("value").cast("string"));
    employeeDataset = employeeDataset.withColumn("employeeRecord",
            functions.from_json(employeeDataset.col("strValue"), employeeSchema, new HashMap<>()));
    employeeDataset.printSchema();

    employeeDataset.createOrReplaceTempView("employeeView");
    sparkSession.catalog().listTables().show();
    sqlCtx.sql("select * from employeeView").show();
}

1 Answer

#1



Your question helped me discover that the variant of from_json with a String-based schema was available only in the Java API and has only recently been added to the Spark API for Scala, in the upcoming 2.3.0. I had long believed that the Spark API for Scala was always the most feature-rich, and your question taught me that this was not the case before the change in 2.3.0 (!)

Back to your question: you can actually define the string-based schema in either JSON or DDL format.
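
For the DDL route, here is a minimal sketch, assuming Spark 2.3 or later (where, as far as I can tell, the String-based from_json falls back to DDL parsing when the string is not valid JSON). The point it illustrates is that DDL nests types with angle brackets, not parentheses, and that the top level is a plain comma-separated column list. The names ddlSchema, parsed, sample and result are mine, not from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

// In spark-shell this simply returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("ddl-schema-sketch").getOrCreate()
import spark.implicits._

// DDL form of the question's schema (assumption: Spark 2.3+ accepts it in from_json).
// Top level: "name TYPE" pairs; nested types: ARRAY<...> and STRUCT<name: TYPE, ...>.
val ddlSchema =
  "firstName STRING, lastName STRING, email STRING, " +
  "addresses ARRAY<STRUCT<city: STRING, state: STRING, zip: STRING>>"

// Parsing the string up front surfaces syntax errors before any query is built.
val parsed: StructType = StructType.fromDDL(ddlSchema)
parsed.printTreeString()

// A tiny in-memory sample instead of the Kafka stream from the question.
val sample = Seq(
  """{"firstName":"Jacek","lastName":"Laskowski","email":"jacek@japila.pl","addresses":[{"city":"Warsaw","state":"N/A","zip":"02-791"}]}"""
).toDF("rawjson")

val result = sample
  .select(from_json($"rawjson", ddlSchema, Map.empty[String, String]) as "json")
  .select("json.*")
result.show(false)
```

The same string should work unchanged as the employeeSchema value in the Java program, since the Java variant takes the schema as a String too.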

Writing JSON by hand may be a bit cumbersome, so I'd take a different approach (which, given that I'm a Scala developer, is fairly easy).

Let's first define the schema using the Spark API for Scala.

import org.apache.spark.sql.types._

val addressesSchema = new StructType()
  .add($"city".string)
  .add($"state".string)
  .add($"zip".string)
val schema = new StructType()
  .add($"firstName".string)
  .add($"lastName".string)
  .add($"email".string)
  .add($"addresses".array(addressesSchema))

scala> schema.printTreeString
root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- email: string (nullable = true)
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- zip: string (nullable = true)
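
The $"city".string shorthand relies on the implicits that spark-shell imports for you. Outside the shell, the same schema can be sketched with the plain add(name, dataType) overloads, which need no SparkSession at all. The names explicitAddresses and explicitSchema are hypothetical, chosen only to avoid clashing with the definitions above.

```scala
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

// Element type of the "addresses" array; add(name, dataType) defaults to nullable = true.
val explicitAddresses = new StructType()
  .add("city", StringType)
  .add("state", StringType)
  .add("zip", StringType)

// ArrayType(elementType) defaults to containsNull = true,
// matching the tree printed by printTreeString.
val explicitSchema = new StructType()
  .add("firstName", StringType)
  .add("lastName", StringType)
  .add("email", StringType)
  .add("addresses", ArrayType(explicitAddresses))

explicitSchema.printTreeString()
```

Either definition can then be turned into a string with the json method.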

That seems to match your schema, doesn't it?

With that, converting the schema to a JSON-encoded string is a breeze with the json method.

val schemaAsJson = schema.json

schemaAsJson is exactly your JSON string, which looks pretty...hmmm...complex. For display purposes I'd rather use the prettyJson method.

scala> println(schema.prettyJson)
{
  "type" : "struct",
  "fields" : [ {
    "name" : "firstName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "lastName",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "email",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "addresses",
    "type" : {
      "type" : "array",
      "elementType" : {
        "type" : "struct",
        "fields" : [ {
          "name" : "city",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "state",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        }, {
          "name" : "zip",
          "type" : "string",
          "nullable" : true,
          "metadata" : { }
        } ]
      },
      "containsNull" : true
    },
    "nullable" : true,
    "metadata" : { }
  } ]
}

That's your schema in JSON.

You can use DataType to "validate" the JSON string (using DataType.fromJson, which Spark uses under the covers for from_json).

import org.apache.spark.sql.types.DataType
val dt = DataType.fromJson(schemaAsJson)

scala> println(dt.sql)
STRUCT<`firstName`: STRING, `lastName`: STRING, `email`: STRING, `addresses`: ARRAY<STRUCT<`city`: STRING, `state`: STRING, `zip`: STRING>>>
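
The same idea can be stretched into a small pre-flight check for any schema string before it reaches from_json. This is only a sketch: parseSchema is a hypothetical helper of mine, and the "JSON first, then DDL" order is an assumption about how Spark 2.3+ treats the string, not something quoted from its source. The parenthesized schema from the question should come back as a failure, in line with the ParseException quoted in the question.

```scala
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.types.{DataType, StructType}

// Hypothetical helper: try the JSON encoding first, then fall back to DDL.
def parseSchema(s: String): Try[DataType] =
  Try(DataType.fromJson(s)).orElse(Try(StructType.fromDDL(s)))

// DDL with angle brackets for nested types.
val angleBrackets = "firstName STRING, addresses ARRAY<STRUCT<city: STRING>>"
// Shortened version of the question's schema, with parentheses.
val parentheses = "STRUCT ( firstName: STRING, lastName: STRING )"

Seq(angleBrackets, parentheses).foreach { s =>
  parseSchema(s) match {
    case Success(dt) => println(s"accepted: ${dt.sql}")
    case Failure(e)  => println(s"rejected (${e.getClass.getSimpleName}): $s")
  }
}
```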

All seems fine. Mind if I check this with a sample dataset?

val rawJsons = Seq("""
  {
    "firstName" : "Jacek",
    "lastName" : "Laskowski",
    "email" : "jacek@japila.pl",
    "addresses" : [
      {
        "city" : "Warsaw",
        "state" : "N/A",
        "zip" : "02-791"
      }
    ]
  }
""").toDF("rawjson")
val people = rawJsons
  .select(from_json($"rawjson", schemaAsJson, Map.empty[String, String]) as "json")
  .select("json.*") // <-- flatten the struct field
  .withColumn("address", explode($"addresses")) // <-- explode the array field
  .drop("addresses")  // <-- no longer needed
  .select("firstName", "lastName", "email", "address.*") // <-- flatten the struct field

scala> people.show
+---------+---------+---------------+------+-----+------+
|firstName| lastName|          email|  city|state|   zip|
+---------+---------+---------------+------+-----+------+
|    Jacek|Laskowski|jacek@japila.pl|Warsaw|  N/A|02-791|
+---------+---------+---------------+------+-----+------+
