Getting tweets with the keyword “Bolsonaro” and sending them to Azure Event Hubs in real time using Azure Databricks (Spark)

Prof. Dr. Caio Moreno
Oct 22, 2018

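In this tutorial, I show how to fetch tweets matching the keyword "Bolsonaro" and send them to Azure Event Hubs in real time using Azure Databricks (Spark). The Scala code below relies on the azure-eventhubs and twitter4j libraries, so both must be available on the cluster.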

import java.util._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._

// Event Hubs configuration!
// Replace values below with yours
val namespaceName = "iotsmarthouseeventhubkafka"
val eventHubName = "tweetshub"
val sasKeyName = "senderpolicy"
val sasKey = "YOUR_KEY"
val connStr = new ConnectionStringBuilder()
  .setNamespaceName(namespaceName)
  .setEventHubName(eventHubName)
  .setSasKeyName(sasKeyName)
  .setSasKey(sasKey)

// Executor used by the Event Hubs client
val pool = Executors.newFixedThreadPool(1)
val eventHubClient = EventHubClient.create(connStr.toString(), pool)

// Sends one message to the Event Hub as UTF-8 bytes.
// send is asynchronous, so the log line is printed without waiting for completion.
def sendEvent(message: String) = {
  val messageData = EventData.create(message.getBytes("UTF-8"))
  eventHubClient.get().send(messageData)
  System.out.println("Sent event: " + message + "\n")
}

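
In the azure-eventhubs Java client, `send` returns a `CompletableFuture` rather than blocking, so `sendEvent` prints its log line whether or not the event was delivered. A minimal sketch of waiting for the result follows. It assumes a hypothetical `fakeSend` stand-in for `eventHubClient.get().send(...)` so that it runs without an Event Hubs connection, and `sendAndConfirm` is an illustrative name, not part of the library.

```scala
import java.util.concurrent.{CompletableFuture, ExecutionException}

// Hypothetical stand-in for eventHubClient.get().send(...):
// completes normally for non-empty payloads and fails for empty ones.
def fakeSend(payload: Array[Byte]): CompletableFuture[Void] = {
  val future = new CompletableFuture[Void]()
  if (payload.nonEmpty) future.complete(null)
  else future.completeExceptionally(new IllegalArgumentException("empty payload"))
  future
}

// Sends one message, blocks until the (stand-in) send finishes,
// and reports whether it succeeded.
def sendAndConfirm(message: String): Boolean = {
  val payload = message.getBytes("UTF-8")
  try {
    fakeSend(payload).get() // wait for the asynchronous send to complete
    true
  } catch {
    case e: ExecutionException =>
      println("Send failed: " + e.getCause.getMessage)
      false
  }
}
```

With a real client, the same `try`/`catch` could wrap `eventHubClient.get().send(messageData).get()`; whether blocking on every event is acceptable depends on the throughput you need.
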
import twitter4j._
import twitter4j.conf.ConfigurationBuilder

// Twitter configuration!
// Replace values below with yours
val twitterConsumerKey = "REPLACE_HERE"
val twitterConsumerSecret = "REPLACE_HERE"
val twitterOauthAccessToken = "REPLACE_HERE"
val twitterOauthTokenSecret = "REPLACE_HERE"

val cb = new ConfigurationBuilder()
cb.setDebugEnabled(true)
  .setOAuthConsumerKey(twitterConsumerKey)
  .setOAuthConsumerSecret(twitterConsumerSecret)
  .setOAuthAccessToken(twitterOauthAccessToken)
  .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

val twitterFactory = new TwitterFactory(cb.build())
val twitter = twitterFactory.getInstance()

// Getting tweets with keyword "Bolsonaro" and sending them to the Event Hub in real time!
val query = new Query(" #Bolsonaro ")
query.setCount(100)
query.lang("en")

var finished = false
while (!finished) {
  val result = twitter.search(query)
  val statuses = result.getTweets()
  if (statuses.isEmpty()) {
    // No older tweets left: stop paging so the client can be closed
    finished = true
  } else {
    var lowestStatusId = Long.MaxValue
    for (status <- statuses.asScala) {
      // Skip retweets; only original tweets are sent
      if (!status.isRetweet()) {
        sendEvent(status.getText())
      }
      lowestStatusId = Math.min(status.getId(), lowestStatusId)
      // Pause between tweets to pace the stream
      Thread.sleep(2000)
    }
    // Next page: ask only for tweets older than the oldest one seen so far
    query.setMaxId(lowestStatusId - 1)
  }
}

// Closing connection to the Event Hub
eventHubClient.get().close()
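
The paging pattern in the loop above (track the lowest tweet id on each page, then request only ids below it) can be tried without Twitter credentials. The sketch below is a minimal illustration under stated assumptions: `Tweet`, `searchPage`, and `collectOriginals` are hypothetical names, and `searchPage` is an in-memory stand-in for `twitter.search(query)`. Here the loop stops as soon as a page comes back empty.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified tweet record used only for this sketch.
final case class Tweet(id: Long, text: String, isRetweet: Boolean)

// Stand-in for twitter.search(query): returns up to `count` tweets
// with id <= maxId, newest (highest id) first.
def searchPage(all: Seq[Tweet], maxId: Long, count: Int): Seq[Tweet] =
  all.filter(_.id <= maxId).sortBy(t => -t.id).take(count)

// Walks backwards through the tweets page by page and collects
// the text of every tweet that is not a retweet.
def collectOriginals(all: Seq[Tweet], pageSize: Int): List[String] = {
  val sent = ArrayBuffer.empty[String]
  var maxId = Long.MaxValue
  var finished = false
  while (!finished) {
    val page = searchPage(all, maxId, pageSize)
    if (page.isEmpty) {
      finished = true // nothing older left to fetch
    } else {
      page.filterNot(_.isRetweet).foreach(t => sent += t.text)
      maxId = page.map(_.id).min - 1 // next page: strictly older tweets
    }
  }
  sent.toList
}

val sample = Seq(
  Tweet(5L, "e", isRetweet = false),
  Tweet(4L, "d", isRetweet = true),
  Tweet(3L, "c", isRetweet = false),
  Tweet(2L, "b", isRetweet = false),
  Tweet(1L, "a", isRetweet = true)
)
val originals = collectOriginals(sample, pageSize = 2)
```

Subtracting 1 from the lowest id keeps the oldest tweet of one page from reappearing at the top of the next.
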


Prof. Dr. Caio Moreno

Solutions Architect and Data Scientist @databricks | Adjunct Professor at @IEuniversity | PhD @unicomplutense (Opinions are my own)