Reputation: 7732
I'm trying to get some messages with Twitter Streaming API using Apache Flink.
But, my code is not writing anything in the output file. I'm trying to count the input data for specific words.
Plese check my example:
import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.twitter._
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import com.twitter.hbc.core.endpoint.{Location, StatusesFilterEndpoint, StreamingEndpoint}
import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
//////////////////////////////////////////////////////
// Create an Endpoint to Track our terms
class myFilterEndpoint extends TwitterSource.EndpointInitializer with Serializable {
@Override
def createEndpoint(): StreamingEndpoint = {
//val chicago = new Location(new Location.Coordinate(-86.0, 41.0), new Location.Coordinate(-87.0, 42.0))
val endpoint = new StatusesFilterEndpoint()
//endpoint.locations(List(chicago).asJava)
endpoint.trackTerms(List("odebrecht", "lava", "jato").asJava)
endpoint
}
}
object Connection {
def main(args: Array[String]): Unit = {
val props = new Properties()
val params: ParameterTool = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setGlobalJobParameters(params)
env.setParallelism(params.getInt("parallelism", 1))
props.setProperty(TwitterSource.CONSUMER_KEY, params.get("consumer-key"))
props.setProperty(TwitterSource.CONSUMER_SECRET, params.get("consumer-key"))
props.setProperty(TwitterSource.TOKEN, params.get("token"))
props.setProperty(TwitterSource.TOKEN_SECRET, params.get("token-secret"))
val source = new TwitterSource(props)
val epInit = new myFilterEndpoint()
source.setCustomEndpointInitializer(epInit)
val streamSource = env.addSource(source)
streamSource.map(s => (0, 1))
.keyBy(0)
.timeWindow(Time.minutes(2), Time.seconds(30))
.sum(1)
.map(t => t._2)
.writeAsText(params.get("output"))
env.execute("Twitter Count")
}
}
The point is, I have no error message and I can see at my Dashboard. My source is sending data to my TriggerWindow. But it is not receive any data:
I have two questions in once.
First: Why my source is sending bytes to my TriggerWindow if it is not received anything?
Seccond: Is something wrong to my code that I can't take data from twitter?
Upvotes: 0
Views: 427
Reputation: 13346
Your application source did not send actual records to the window which you can see by looking at the Records sent column. The bytes which are sent belong to control messages which Flink sends from time to time between the tasks. More specifically, it is the LatencyMarker
message which is used to measure the end to end latency of a Flink job.
The code looks good to me. I even tried out your code and worked for me. Thus, I conclude that there has to be something wrong with the Twitter connection credentials. Please re-check whether you've entered the right credentials.
Upvotes: 1