我有一个列表org.apache.avro.generic.GenericRecord
,avro schema
使用这个我们需要创建dataframe
的帮助下SQLContext
API,创建dataframe
它需要RDD
的org.apache.spark.sql.Row
和avro schema
。创建DF的先决条件是我们应该具有org.apache.spark.sql.Row的RDD,可以使用以下代码来实现它,但是有些代码无法正常工作并给出错误和示例代码。
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
但这会在创建时引发错误DataFrame
。有人可以帮我上面代码中的错误吗。除此之外,如果有人对的转换和创建具有不同的逻辑dataframe
。
每当我在Dataframe上调用任何操作时,它都会执行DAG并尝试创建DF对象,但是在此操作中,出现以下异常而失败
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
在此之后,我想在spark提交的jar参数中提供正确的版本jar,并使用--conf spark.driver.userClassPathFirst = true的其他参数,但是现在MapR失败了
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
我们正在使用MapR分配,并且在spark-submit中更改了类路径之后,由于上述异常而失败。
有人可以在这里提供帮助吗,或者我的基本需要将Avro GenericRecord转换为Spark Row,以便我可以用它创建Dataframe,请帮助
谢谢。
从RDD [GenericRecord]创建数据框时,只需执行几个步骤
使用com.databricks.spark.avro.SchemaConverters.createConverterToSQL(sourceAvroSchema:Schema,targetSqlType:DataType)
这是spark-avro 3.2版本中的私有方法。如果我们等于或小于3.2,则将此方法复制到您自己的util类中并使用它,否则直接使用它。
val rdd = ssc.sparkContext.parallelize(rowSeq,numParition)val dataframe = sparkSession.createDataFrame(rowRDD,schemaType)
这解决了我的问题。