Warm tip: This article is reproduced from serverfault.com, please click

其他-Spark Streaming到达数据框列,并添加新列以查找Redis

(其他 - Spark Streaming reach dataframe columns and add new column looking up to Redis)

发布于 2020-12-10 18:46:41

在我之前的问题(使用Redis进行Spark结构化流动态查找)中,由于https://stackoverflow.com/users/689676/fe2s,我成功地通过mapparttions实现了redis。

我尝试使用mappartitions,但是我无法解决一个问题,即如何在迭代时到达以下代码部分中的每一行列。因为我想丰富我在Redis中保留的查找字段的行数。我发现了类似的内容,但是我如何到达数据框列并添加新列以查找Redis。对于任何帮助,我都非常感谢,谢谢。

import org.apache.spark.sql.types._

def transformRow(row: Row): Row =  {
    Row.fromSeq(row.toSeq ++ Array[Any]("val1", "val2"))
}

def transformRows(iter: Iterator[Row]): Iterator[Row] =
{ 
    val redisConn =new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))    
    println(redisConn.get("ModelValidityPeriodName").getOrElse("")) 
    //want to  reach  DataFrame column here   
    redisConn.close()
    iter.map(transformRow)     
}

val newSchema = StructType(raw_customer_df.schema.fields ++ 
    Array(
            StructField("ModelValidityPeriod", StringType, false), 
            StructField("ModelValidityPeriod2", StringType, false)
        )
  )

spark.sqlContext.createDataFrame(raw_customer_df.rdd.mapPartitions(transformRows), newSchema).show
Questioner
mustangc
Viewed
11
fe2s 2020-12-11 19:10:24

迭代器iter表示数据帧行上的迭代器。因此,如果我正确地提出了你的问题,则可以通过迭代iter并调用来访问列值

row.getAs[Column_Type](column_name)

像这样

def transformRows(iter: Iterator[Row]): Iterator[Row] = {
    val redisConn = new RedisClient("xxx.xxx.xx.xxx",6379,1,Option("Secret123"))
    println(redisConn.get("ModelValidityPeriodName").getOrElse(""))
    //want to  reach  DataFrame column here
    val res = iter.map { row =>
      val columnValue = row.getAs[String]("column_name")
      // lookup in redis
      val valueFromRedis = redisConn.get(...)
      Row.fromSeq(row.toSeq ++ Array[Any](valueFromRedis))
    }.toList

    redisConn.close()
    res.iterator
  }