Warm tip: This article is reproduced from serverfault.com, please click

scala-使用Spark的Twitter流式传输

(scala - Twitter streaming using Spark)

发布于 2017-10-12 04:16:42

我正在尝试使用Spark Scala代码流Twitter数据。我能够获取数据并创建一个数据框并查看它。但是当尝试提取status.getPlace.getCountry()时,我得到了一个java.lang.NullPointerException。

Spark版本:2.0.0,Scala版本:2.11.8

尝试是否有条件,检查值等,但徒劳无功。

代码:

val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))

val filters:Seq[String] =  Seq("hadoop")
val cb = new ConfigurationBuilder()
      .setOAuthConsumerKey("******")
      .setOAuthConsumerSecret("******")
      .setOAuthAccessToken("********")
      .setOAuthAccessTokenSecret("******").build()

val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] =  Some(twitter_auth.getInstance(a).getAuthorization())

val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status => 
      val places = status.getPlace
      val id = status.getUser.getId
      val date = status.getUser.getCreatedAt.toString()
      val user = status.getUser.getName()
      val place = places.getCountry()

      (id,date,user,place)
      }
data.foreachRDD{rdd =>
      import spark.implicits._
      rdd.toDF("id","date","user","place").show()
    }

ssc.start()
ssc.awaitTermination()

从Twitter访问位置信息有任何限制吗?任何的意见都将会有帮助。

谢谢

Questioner
Pooja Nayak
Viewed
0
stefanobaghino 2017-10-13 21:58:05

你可以Option用来处理nulls:

val data = tweetsdstream.map {
  status =>
    val place = Option(status.getPlace).map(_.getCountry).orNull
    val id = status.getUser.getId
    val user = status.getUser.getName
    val date = status.getUser.getCreatedAt.toString
    (id, date, user, place)
}

这样,你将能够可视化所有推文,无论它们是否具有国家(如果未定义国家,它将为null)。

Option 对于处理可能丢失的数据非常有用,请随时将其用于其他可能为空的字段。