Add values into existing nested json in a Spark DataFrame column

Published on 2020-11-27 16:05:11

Using Spark 2.3.2.

I am trying to use the values of some columns of a DataFrame and put them into an existing JSON structure. Assuming I have this DataFrame:

val testDF = Seq(("""{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}""", "10", "1337")).toDF("key", "p", "o")

// used as key for nested json structure
val app = "appX"

Basically, I would like to get from this column

{
  "foo": "bar",
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    }
  }
}

to this:

{
  "meta": {
    "app1": {
      "p": "2",
      "o": "100"
    },
    "app2": {
      "p": "5",
      "o": "200"
    },
    "appX": {
      "p": "10",
      "o": "1337"
    }
  }
}

based on the columns p and o of the DataFrame.

I have tried:

def process(inputDF: DataFrame, appName: String): DataFrame = {
  val res = inputDF
    .withColumn(appName, to_json(expr("(p, o)")))
    .withColumn("meta", struct(get_json_object('key, "$.meta")))
    .selectExpr(s"""struct(meta.*, ${appName} as ${appName}) as myStruct""")
    .select(to_json('myStruct).as("newMeta"))

  res.show(false)
  res
}

val resultDF = process(testDF, app)

val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)

StringContext.treatEscapes(resultString) must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")

But this assertion does not match, because I

  • can't get the content of appX onto the same level as the other two apps,
  • don't know how to properly handle the quotation marks, and
  • don't know how to rename "col1" to "meta".

The test fails with:

Expected :"{"[meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}]}"
Actual   :"{"[col1":"{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"}}","appX":"{"p":"10","o":"1337"}"]}"
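As far as I can tell, part of the problem is that get_json_object returns the extracted meta fragment as a plain string: wrapping it in struct() gives that single field the default name col1, and the outer to_json then re-serializes the string, which is where the escaped quotation marks come from. A minimal sketch of the field-naming part only (not a fix for the nesting):

import org.apache.spark.sql.functions._

// get_json_object returns the "$.meta" fragment as a STRING, so struct()
// wraps it as one field with the default name "col1"; aliasing the expression
// inside struct() fixes the name, but the value is still a string, not nested json.
testDF
  .select(struct(get_json_object(col("key"), "$.meta").as("meta")).as("myStruct"))
  .printSchema()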
Answer by Srinivas (2020-12-03 00:37:24):
  1. Extract the meta content.
  2. Convert the p and o columns into a map column: map(lit(appX), struct($"p",$"o")).
  3. Then use the map_concat function to merge the two maps.

Check below code. Note that the built-in map_concat function used here requires Spark 2.4.0 or later; a UDF-based variant for older versions follows further down.

scala> testDF.show(false)
+---------------------------------------------------------------------------------+---+----+
|key                                                                              |p  |o   |
+---------------------------------------------------------------------------------+---+----+
|{"foo": "bar", "meta":{"app1":{"p":"2", "o":"100"}, "app2":{"p":"5", "o":"200"}}}|10 |1337|
+---------------------------------------------------------------------------------+---+----+

Create a schema to parse the json string.

scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._

scala> val schema = new StructType().add("foo",StringType).add("meta",MapType(StringType,new StructType().add("p",StringType).add("o",StringType)))

Print Schema

scala> schema.printTreeString
root
 |-- foo: string (nullable = true)
 |-- meta: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- p: string (nullable = true)
 |    |    |-- o: string (nullable = true)
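As a quick sanity check (not part of the original answer), parsing the key column with this schema should already expose meta as a map of structs:

testDF
  .select(from_json($"key", schema).as("parsed")) // parse using the schema defined above.
  .select($"parsed.meta")                         // meta is now a map<string, struct<p, o>>.
  .show(false)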
val appX = "appX"

testDF
.withColumn("key",from_json($"key",schema)) // convert json string to json using predefined schema.
.withColumn(
    "key",
    struct(
        $"key.foo", // foo value from key column.
        map_concat(
            $"key.meta", // extracting meta from key column.
            map(
                lit(appX), // Constant appX value
                struct($"p",$"o") // wrapping p, o values into struct.
            ) // converting appX,p,o into map(appX -> (p,o))
        )
        .as("meta") // giving alias to match existing meta in key.
    ) // using struct to combine foo, meta columns.
)
.select(to_json(struct($"key")).as("json_data")) // converting key value into json format.
.show(false)

Final Output

+-----------------------------------------------------------------------------------------------------------------+
|json_data                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------+
|{"key":{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}}|
+-----------------------------------------------------------------------------------------------------------------+
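The target JSON in the question has only meta at the top level, without the key wrapper and without foo, so a variant of the same approach that keeps just the merged map could look roughly like this (still using the built-in map_concat, i.e. Spark 2.4.0 or later; the column names parsed and newMeta are only illustrative):

testDF
  .withColumn("parsed", from_json($"key", schema))   // parse the json string.
  .select(
    to_json(
      struct(
        map_concat(
          $"parsed.meta",                             // existing app1/app2 entries.
          map(lit(appX), struct($"p", $"o"))          // new appX -> (p, o) entry.
        ).as("meta")                                  // name the field "meta".
      )
    ).as("newMeta")
  )
  .show(false)

// Expected: {"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}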

Spark Version < 2.4.0

With the help of a UDF and a case class. The built-in map_concat used above is only available from Spark 2.4.0, so on 2.3.2 the merge has to be done with a user-defined function.

Define a case class to hold the p and o column values.

scala> case class PO(p:String,o:String)

Define a UDF that concatenates two maps.

scala> val map_concat = udf((mp:Map[String,PO],mpa:Map[String,PO]) => mp ++ mpa)

scala> testDF
.withColumn("key",from_json($"key",schema)) // parse the json string using the schema defined above.
.withColumn(
    "newMap",
    to_json(
        struct(
            $"key.foo",
            map_concat(
                $"key.meta",
                map(
                    lit(app),
                    struct($"p",$"o")
                )
            ).as("meta")
        )
    )
)
.show(false)

Final Output

+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|key                                        |p  |o   |newMap                                                                                                   |
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
|[bar,Map(app1 -> [2,100], app2 -> [5,200])]|10 |1337|{"foo":"bar","meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}|
+-------------------------------------------+---+----+---------------------------------------------------------------------------------------------------------+
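To close the loop with the assertion from the question: to_json produces plain JSON, so no StringContext.treatEscapes should be needed. A sketch, assuming resultDF holds the newMeta column from the variant shown after the first solution:

// Assumes the field and key order produced above (p before o; app1, app2, appX).
val resultString = resultDF.select("newMeta").collectAsList().get(0).getString(0)

resultString must be ("""{"meta":{"app1":{"p":"2","o":"100"},"app2":{"p":"5","o":"200"},"appX":{"p":"10","o":"1337"}}}""")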