我正在尝试使用Spark Scala代码流Twitter数据。我能够获取数据并创建一个数据框并查看它。但是当尝试提取status.getPlace.getCountry()时,我得到了一个java.lang.NullPointerException。
Spark版本:2.0.0,Scala版本:2.11.8
尝试是否有条件,检查值等,但徒劳无功。
代码:
val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext,Seconds(5))
val filters:Seq[String] = Seq("hadoop")
val cb = new ConfigurationBuilder()
.setOAuthConsumerKey("******")
.setOAuthConsumerSecret("******")
.setOAuthAccessToken("********")
.setOAuthAccessTokenSecret("******").build()
val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter:Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map {status =>
val places = status.getPlace
val id = status.getUser.getId
val date = status.getUser.getCreatedAt.toString()
val user = status.getUser.getName()
val place = places.getCountry()
(id,date,user,place)
}
data.foreachRDD{rdd =>
import spark.implicits._
rdd.toDF("id","date","user","place").show()
}
ssc.start()
ssc.awaitTermination()
从Twitter访问位置信息有任何限制吗?任何的意见都将会有帮助。
谢谢
你可以Option
用来处理null
s:
val data = tweetsdstream.map {
status =>
val place = Option(status.getPlace).map(_.getCountry).orNull
val id = status.getUser.getId
val user = status.getUser.getName
val date = status.getUser.getCreatedAt.toString
(id, date, user, place)
}
这样,你将能够可视化所有推文,无论它们是否具有国家(如果未定义国家,它将为null)。
Option
对于处理可能丢失的数据非常有用,请随时将其用于其他可能为空的字段。