Thiago Baldim

Reputation: 7732

Apache Flink - Unable to get data from Twitter

I'm trying to get some messages from the Twitter Streaming API using Apache Flink.

However, my code does not write anything to the output file. I'm trying to count the incoming tweets that contain specific words.

Please check my example:

import java.util.Properties

import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time

import scala.collection.JavaConverters._


//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
  override def createEndpoint(): StreamingEndpoint = {
    //val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
    val endpoint = new StatusesFilterEndpoint()
    //endpoint.locations(List(chicago).asJava)
    endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
    endpoint
  }
}

object Connection {
  def main(args: Array[String]): Unit = {

    val props = new Properties()

    val params: ParameterTool = ParameterTool.fromArgs(args)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setGlobalJobParameters(params)
    env.setParallelism(params.getInt("parallelism", 1))

    props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
    props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
    props.setProperty(TwitterSource.TOKEN, params.get("token"))
    props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))

    val source = new TwitterSource(props)
    val epInit = new myFilterEndpoint()

    source.setCustomEndpointInitializer(epInit)

    val streamSource = env.addSource(source)

    streamSource.map(s => (0, 1))
      .keyBy(0)
      .timeWindow(Time.minutes(2), Time.seconds(30))
      .sum(1)
      .map(t => t._2)
      .writeAsText(params.get("output"))

    env.execute("Twitter Count")
  }
}
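The windowed part of this job (map(s => (0, 1)), keyBy(0), timeWindow(Time.minutes(2), Time.seconds(30)), sum(1)) is meant to emit, every 30 seconds, the number of tweets seen in the last two minutes. The plain-Scala toy model below does not use Flink at all; it assumes window starts aligned to multiples of the slide (zero offset) and uses made-up arrival times, and it only shows which windows a tweet lands in. With a 2-minute size and a 30-second slide, every tweet is counted in 4 overlapping windows.

```scala
// Plain-Scala toy model (no Flink) of the windowed count in the job above:
// each tweet becomes (0, 1), everything shares key 0, and a sliding window of
// `size` that starts every `slide` sums the 1s it received.
object SlidingCountSketch {

  /** Start times (ms) of every sliding window that contains timestamp t,
    * assuming window starts are aligned to multiples of `slide` (zero offset). */
  def windowStarts(t: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = t - java.lang.Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > t - size).toList
  }

  /** Number of events per window, ordered by window start. A window that
    * received no event never appears, so no events means no output at all. */
  def slidingCounts(timestamps: Seq[Long], size: Long, slide: Long): Seq[(Long, Int)] =
    timestamps
      .flatMap(t => windowStarts(t, size, slide))
      .groupBy(identity)
      .map { case (start, hits) => (start, hits.size) }
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val size  = 2 * 60 * 1000L // Time.minutes(2)
    val slide = 30 * 1000L     // Time.seconds(30)
    // Hypothetical arrival times (ms) of three tweets.
    val arrivals = Seq(600000L, 610000L, 645000L)
    slidingCounts(arrivals, size, slide).foreach { case (start, n) =>
      println(s"window [$start, ${start + size}) -> $n")
    }
  }
}
```

In Flink, too, a window is only created once an element is assigned to it, so if the source emits no tweets, the sink receives nothing and the output file stays empty.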

The point is, I get no error message, and in my Dashboard I can see that my source is sending data to my TriggerWindow. But the window does not receive any data.

So I have two questions:

First: Why is my source sending bytes to my TriggerWindow if the window does not receive anything?

Second: Is there something wrong with my code that prevents me from getting data from Twitter?

Upvotes: 0

Views: 427

Answers (1)

Till Rohrmann

Reputation: 13346

Your source did not send any actual records to the window, which you can see by looking at the Records sent column. The bytes that are sent belong to control messages which Flink sends between tasks from time to time. More specifically, they are LatencyMarker messages, which Flink uses to measure the end-to-end latency of a job.
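To make the records-versus-bytes distinction concrete, here is a toy model in plain Scala. It is not Flink's network stack or metrics code: the StreamElement, ToyRecord and ToyLatencyMarker types and the assumed 8-byte marker size are stand-ins for illustration. It only shows how a channel that carries nothing but periodic markers reports zero records sent while its byte count keeps growing.

```scala
// Toy model (NOT Flink's internal classes) of one task-to-task channel:
// data records and latency markers share the channel, but only data
// records are counted in "Records sent".
object ChannelMetricsSketch {

  sealed trait StreamElement { def sizeInBytes: Int }

  final case class ToyRecord(payload: String) extends StreamElement {
    def sizeInBytes: Int = payload.getBytes("UTF-8").length
  }

  final case class ToyLatencyMarker(markedTime: Long) extends StreamElement {
    def sizeInBytes: Int = 8 // assumed size: just the timestamp
  }

  final case class Metrics(recordsSent: Long, bytesSent: Long)

  def metrics(sent: Seq[StreamElement]): Metrics =
    Metrics(
      recordsSent = sent.count { case _: ToyRecord => true; case _ => false }.toLong,
      bytesSent = sent.map(_.sizeInBytes.toLong).sum
    )

  def main(args: Array[String]): Unit = {
    // No tweets arrive, but the source still emits a marker periodically.
    val idleSource = (0 until 5).map(i => ToyLatencyMarker(i * 2000L))
    println(metrics(idleSource)) // records stay at 0 while bytes grow
  }
}
```

In the Flink versions of that time, the marker interval could be changed through ExecutionConfig.setLatencyTrackingInterval; check the documentation of your Flink version for the exact default and how to disable it.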

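The code looks good to me. I even tried out your code and it worked for me. Thus, I conclude that something must be wrong with the Twitter connection credentials. Please re-check whether you've entered the right ones. Note in particular that the line setting TwitterSource.CONSUMER_SECRET reads params.get("consumer-key"), so the consumer key is passed as the consumer secret as well.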
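Since the credentials are the suspect, one way to make it harder to wire a property to the wrong argument is to fill all four properties from a single table and to check up front that every parameter was passed. This is a plain-Scala sketch of that idea, not part of Flink's API; the helper toProperties and the placeholder property keys are hypothetical. In the real job the keys would be TwitterSource.CONSUMER_KEY, TwitterSource.CONSUMER_SECRET, TwitterSource.TOKEN and TwitterSource.TOKEN_SECRET, and the parameter map could be built from params.toMap (a java.util.Map, converted with asScala).

```scala
import java.util.Properties

// Hypothetical helper: every property is filled from exactly one
// command-line parameter listed in a single table, and missing parameters
// are reported together instead of surfacing later as a silent connection.
object CredentialPropsSketch {

  /** `mapping` holds (command-line parameter, property key) pairs.
    * Plain strings are used as keys so the sketch runs without Flink. */
  def toProperties(params: Map[String, String],
                   mapping: Seq[(String, String)]): Properties = {
    val missing = mapping.map(_._1).filterNot(params.contains)
    require(missing.isEmpty, s"Missing parameters: ${missing.mkString(", ")}")
    val props = new Properties()
    mapping.foreach { case (param, key) => props.setProperty(key, params(param)) }
    props
  }

  def main(args: Array[String]): Unit = {
    val mapping = Seq(
      "consumer-key"    -> "consumerKey",
      "consumer-secret" -> "consumerSecret",
      "token"           -> "token",
      "token-secret"    -> "tokenSecret"
    )
    // Placeholder values; "token-secret" is deliberately absent.
    val params = Map("consumer-key" -> "ck", "consumer-secret" -> "cs", "token" -> "t")
    println(scala.util.Try(toProperties(params, mapping)))
  }
}
```

Flink's ParameterTool also offers getRequired, which throws when a key is absent, if you prefer to keep the original setProperty calls.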

Upvotes: 1
