I'm trying to stream Twitter data using Spark with Scala. I can fetch the data, create a DataFrame, and view it. But when I try to extract status.getPlace.getCountry(), I get a java.lang.NullPointerException.
Spark version: 2.0.0, Scala version: 2.11.8
I tried guarding the call with if conditions and checking the value, but in vain.
Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.TwitterFactory
import twitter4j.auth.OAuthAuthorization
import twitter4j.conf.ConfigurationBuilder

val spark = SparkSession.builder().appName("Twitter Spark Example").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val filters: Seq[String] = Seq("hadoop")
val cb = new ConfigurationBuilder()
  .setOAuthConsumerKey("******")
  .setOAuthConsumerSecret("******")
  .setOAuthAccessToken("********")
  .setOAuthAccessTokenSecret("******").build()
val twitter_auth = new TwitterFactory(cb)
val a = new OAuthAuthorization(cb)
val atwitter: Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization())
val tweetsdstream = TwitterUtils.createStream(ssc, atwitter, filters, StorageLevel.MEMORY_AND_DISK_SER_2)
val data = tweetsdstream.map { status =>
  val places = status.getPlace
  val id = status.getUser.getId
  val date = status.getUser.getCreatedAt.toString()
  val user = status.getUser.getName()
  // This is the call that throws the NullPointerException
  val place = places.getCountry()
  (id, date, user, place)
}
data.foreachRDD { rdd =>
  import spark.implicits._
  rdd.toDF("id", "date", "user", "place").show()
}
ssc.start()
ssc.awaitTermination()
Is there any restriction on accessing location information from Twitter? Any suggestions would be helpful.
Thanks
You can use Option to deal with nulls:
val data = tweetsdstream.map { status =>
  val place = Option(status.getPlace).map(_.getCountry).orNull
  val id = status.getUser.getId
  val user = status.getUser.getName
  val date = status.getUser.getCreatedAt.toString
  (id, date, user, place)
}
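This way, you'll be able to see all tweets, regardless of whether they have a country (place will be null where the country is not defined). The exception most likely comes from tweets that have no place attached: status.getPlace is null for those, so calling getCountry on it fails.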
Option is very useful for handling possibly missing data, so feel free to use it for other possibly empty fields.
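To try the pattern without Twitter credentials, here is a minimal, self-contained sketch. Place and Status below are hypothetical stand-ins for the twitter4j types, not the real classes, and countryOrDefault is just one possible variant for when you would rather show a label than a null.

```scala
// Hypothetical stand-ins for twitter4j's Place and Status (assumptions for illustration only).
final case class Place(country: String) { def getCountry: String = country }
final case class Status(place: Place) { def getPlace: Place = place }

object CountryExtraction {
  // Option(x) is None when x is null, so map is skipped for statuses without a place.
  def countryOrNull(status: Status): String =
    Option(status.getPlace).map(_.getCountry).orNull

  // Variant with a default label; flatMap also guards against a null country
  // inside a non-null place.
  def countryOrDefault(status: Status, default: String = "unknown"): String =
    Option(status.getPlace).flatMap(p => Option(p.getCountry)).getOrElse(default)
}
```

For example, CountryExtraction.countryOrDefault(Status(null)) falls back to "unknown" instead of throwing, while a status with a place yields that place's country.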