在我之前的问题(使用Redis进行Spark结构化流动态查找)中,由于https://stackoverflow.com/users/689676/fe2s,我成功地通过mapparttions实现了redis。
我尝试使用mappartitions,但是我无法解决一个问题,即如何在迭代时到达以下代码部分中的每一行列。因为我想丰富我在Redis中保留的查找字段的行数。我发现了类似的内容,但是我如何到达数据框列并添加新列以查找Redis。对于任何帮助,我都非常感谢,谢谢。
import org.apache.spark.sql.types._
def transformRow(row: Row): Row = {
Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}
def transformRows(iter: Iterator[Row]): Iterator[Row] =
{
val redisConn =new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
redisConn.close()
iter.map(transformRow)
}
val newSchema = StructType(raw_customer_df.schema.fields ++
Array(
StructField("ModelValidityPeriod", StringType, false),
StructField("ModelValidityPeriod2", StringType, false)
)
)
spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
迭代器iter
表示数据帧行上的迭代器。因此,如果我正确地提出了你的问题,则可以通过迭代iter
并调用来访问列值
row.getAs[Column_Type](column_name)
像这样
def transformRows(iter: Iterator[Row]): Iterator[Row] = {
val redisConn = new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
//want to reach DataFrame column here
val res = iter.map { row =>
val columnValue = row.getAs[String]("column_name")
// lookup in redis
val valueFromRedis = redisConn.get(...)
Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
}.toList
redisConn.close()
res.iterator
}
嗨fe2s,谢谢你的建议。该动态添加字段的架构可以做什么?
另外,在原始数据框中有嵌套的列。val columnValue = row.getAs [String](“ C.field1”)这在到达嵌套列时给出了错误
嗨@mustangc 不了解问题重排架构。至于访问嵌套列,应该可以通过以下方式实现:row.getAs [Row](“ C”)。getAs [String](“ field1”)
嗨,@ fe2s。我修复了如下解析嵌套项目的问题:var BirthPlaceCityCode = Try(row.getAs [Row](“ ObjectLevel1”).getAs [GenericRowWithSchema](“ ObjectLevel2”).getAs [Integer](“ BirthPlaceCityCode”))。getOrElse (-1).toString这样就可以了。我的主要目的是:a)来自kafka主题的ReadStream b)丰富withRedis并“添加新列” c)WriteStream到kafkatopic“具有新列”,因此,在执行WriteStream时我应该使用foreachbatch吗?特别感谢