Warm tip: This article is reproduced from serverfault.com, please click

Spark Structured Streaming dynamic lookup with Redis

发布于 2020-10-29 14:18:29

i am new to spark. We are currently building a pipeline :

  1. Read the events from Kafka topic
  2. Enrich this data with the help of Redis-Lookup
  3. Write events to the new Kafka topic

So, my problem is when i want to use spark-redis library it performs very well, but data stays static in my streaming job.

Although data is refreshed at Redis, it does not reflect to my dataframe. Spark reads data at first then never updates it. Also i am reading from REDIS data at first,total data about 1mio key-val string.

What kind of approaches/methods i can do, i want to use Redis as in-memory dynamic lookup. And lookup table is changing almost 1 hour.

Thanks.

used libraries: spark-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar

Here is the code part:

import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events") 


val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
        .alias("C"))


import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*") 
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )



val curated_df = my_raw_events_df 
.select(

     ...
     $"C.SystemEntryDate".alias("RecordCreateDate")
    ,$"C.Profile".alias("ProfileCode")     
    ,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
    ,$"C.IdentityType"     
    ,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")     
     ...

).as("C")



import org.apache.spark.sql.streaming.Trigger

val query = curated_df
   .select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
   .writeStream
   .format("kafka")
   .option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
   .option("topic", "curated-event")
   .option("checkpointLocation","/user/spark/checkPointLocation/xyz")
   .trigger(Trigger.ProcessingTime("30 seconds"))
   .start()

   query.awaitTermination()
Questioner
mustangc
Viewed
0
fe2s 2020-10-30 00:30:38

One option is to not use spark-redis, but rather lookup in Redis directly. This can be achieved with df.mapPartitions function. You can find some examples for Spark DStreams here https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/. The idea for Structural Streaming is similar. Be careful to handle the Redis connection properly.