温馨提示:本文翻译自stackoverflow.com,查看原文请点击:其他 - Convert org.apache.avro.generic.GenericRecord to org.apache.spark.sql.Row

其他 - 将org.apache.avro.generic.GenericRecord转换为org.apache.spark.sql.Row

发布于 2020-03-27 15:49:52

我有一个列表org.apache.avro.generic.GenericRecordavro schema使用这个我们需要创建dataframe的帮助下SQLContextAPI,创建dataframe它需要RDDorg.apache.spark.sql.Rowavro schema创建DF的先决条件是我们应该具有org.apache.spark.sql.Row的RDD,可以使用以下代码来实现它,但是有些代码无法正常工作并给出错误和示例代码。

 1. Convert GenericRecord to Row
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
    {
      val fields = avroSchema.getFields
      var rows = new Seq[Row]
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]();
        for (i <- 0 to fields.size - 1) {
          avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
        }
        val avroFieldArr = avroFieldsSeq.toArray
        val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
        rows = rows :+ genericRow
      }
      return rows;
    }

2. Convert `Avro schema` to `Structtype`
   Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType

3. Create `Dataframe` using `SQLContext`
   val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
   val rowRdd = sc.parallelize(rowSeq, 1)
   val finalDF =sqlContext.createDataFrame(rowRDD,structType)

但这会在创建时引发错误DataFrame有人可以帮我上面代码中的错误吗。除此之外,如果有人对的转换和创建具有不同的逻辑dataframe

每当我在Dataframe上调用任何操作时,它都会执行DAG并尝试创建DF对象,但是在此操作中,出现以下异常而失败

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

在此之后,我想在spark提交的jar参数中提供正确的版本jar,并使用--conf spark.driver.userClassPathFirst = true的其他参数,但是现在MapR失败了

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

我们正在使用MapR分配,并且在spark-submit中更改了类路径之后,由于上述异常而失败。

有人可以在这里提供帮助吗,或者我的基本需要将Avro GenericRecord转换为Spark Row,以便我可以用它创建Dataframe,请帮助
谢谢。

查看更多

查看更多

提问者
Sagar balai
被浏览
109
Sagar balai 2018-03-28 13:25

从RDD [GenericRecord]创建数据框时,只需执行几个步骤

  1. 首先需要将org.apache.avro.generic.GenericRecord转换为org.apache.spark.sql.Row

使用com.databricks.spark.avro.SchemaConverters.createConverterToSQL(sourceAvroSchema:Schema,targetSqlType:DataType)

这是spark-avro 3.2版本中的私有方法。如果我们等于或小于3.2,则将此方法复制到您自己的util类中并使用它,否则直接使用它。

  1. 从行(rowSeq)的集合创建数据框。

val rdd = ssc.sparkContext.parallelize(rowSeq,numParition)val dataframe = sparkSession.createDataFrame(rowRDD,schemaType)

这解决了我的问题。