I have a Cassandra database whose data has to reach my Flink program through a socket, as a stream, for stream processing. So I wrote a simple client program that reads data from Cassandra and sends it to the socket, and I wrote the Flink program as the server. The client program is simple and does not use any Flink APIs; it just sends each Cassandra row as a string to the socket, and the server must receive the row. First I run the Flink program so it listens for the client, and then I run the client program. The client receives the following from the server (because the server sends back the DataStream itself, the client cannot receive the data correctly):
Hi Client org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235
After that, both programs keep running without sending or receiving any data, and there is no error.
The Flink program is as follows:
    import java.io.ObjectOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class WordCount_in_cassandra {
        private static int myport = 9999;
        private static String hostname = "localhost";
        // static ServerSocket variable
        private static ServerSocket server;
        private static int count_row = 0;

        public static void main(String[] args) throws Exception {
            // Checking input parameters
            final ParameterTool params = ParameterTool.fromArgs(args);
            // set up the execution environment
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // create the socket server object
            server = new ServerSocket(myport);
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);
            while (true) {
                System.out.println("Waiting for client request");
                // creating socket and waiting for client connection
                Socket socket = server.accept();
                // stream for writing objects back to the client
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                DataStream<String> stream = env.socketTextStream(hostname, myport);
                stream.print();
                // write object to Socket
                oos.writeObject("Hi Client " + stream.toString());
                oos.close();
                socket.close();
                // parse the data, group it, window it, and aggregate the counts
                DataStream<Tuple2<String, Long>> counts = stream
                        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                            @Override
                            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                                // normalize and split the line
                                String[] words = value.toLowerCase().split("\\W+");
                                // emit the pairs
                                for (String word : words) {
                                    if (!word.isEmpty()) {
                                        out.collect(new Tuple2<String, Long>(word, 1L));
                                    }
                                }
                            }
                        })
                        .keyBy(0)
                        .timeWindow(Time.seconds(5))
                        .sum(1);
                // emit result
                if (params.has("output")) {
                    counts.writeAsText(params.get("output"));
                } else {
                    System.out.println("Printing result to stdout. Use --output to specify output path.");
                    counts.print();
                }
                // terminate the server if client sends exit request
                if (stream.equals("exit")) {
                    System.out.println("row_count : " + count_row);
                    break;
                }
                // execute program
                env.execute("Streaming WordCount");
            } // while true
            System.out.println("Shutting down Socket server!!");
            server.close();
        } // main
    }
The client program looks like this:
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    import java.net.UnknownHostException;

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class client_code {
        private static Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1")
                .withPort(9042).build();
        private static Session session = cluster.connect("mar1");

        public static void main(String[] args) throws UnknownHostException,
                IOException, ClassNotFoundException, InterruptedException {
            String serverIP = "localhost";
            int port = 9999;
            Socket socket = null;
            ObjectOutputStream oos = null;
            ObjectInputStream ois = null;
            ResultSet result = session.execute("select * from tlbtest15");
            for (Row row : result) {
                // establish socket connection to server
                socket = new Socket(serverIP, port);
                // write to socket using ObjectOutputStream
                oos = new ObjectOutputStream(socket.getOutputStream());
                System.out.println("Sending request to Socket Server");
                if (row == result) oos.writeObject("exit");
                else oos.writeObject("" + row + "");
                // read the server response message
                ois = new ObjectInputStream(socket.getInputStream());
                String message = (String) ois.readObject();
                System.out.println("Message: " + message);
                // close resources
                ois.close();
                oos.close();
                Thread.sleep(100);
            }
            cluster.close();
        }
    }
Would you please tell me how I can solve my problem?
Any help would be appreciated.
There are several problems with the way you've tried to construct the Flink application. A few comments:
- Building and executing the job inside a while(true) loop doesn't work the way you expect. The DataStream API only describes a dataflow graph; nothing runs until env.execute() is called, and that call blocks until the job finishes.
- socketTextStream sets up a client connection. In your program it connects back to your own ServerSocket on port 9999, which never writes anything to it, so your server doesn't do anything useful.
- stream.equals("exit"): stream is a DataStream, not a String. If you want to do something special when a stream element has a specific value, that needs to be done differently, by using one of the stream operations that does event-at-a-time processing.
As for shutting down the Flink job, streaming jobs are normally designed either to run indefinitely or to run until a finite input source reaches its end, at which point they shut down on their own.
You can simplify things considerably. I would start over, and begin by replacing your client with a command line like this:
cqlsh -e "SELECT * FROM mar1.tlbtest15;" | nc -lk 9999
nc (netcat) will act as the server in this case, allowing Flink to be the client. This makes things easier, as that's how env.socketTextStream is meant to be used.
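If you would rather keep a Java program on the sending side instead of netcat, the important changes are that it must be the server and that it must write plain newline-terminated text rather than ObjectOutputStream objects. The following is a minimal sketch under those assumptions; the class name LineServer, the serve method, and the sample rows are hypothetical, and unlike nc -lk it serves a single connection and then exits.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * Hypothetical stand-in for `nc -lk 9999`: listens on a port and writes each
 * row as one UTF-8 line terminated by '\n', the record format that
 * socketTextStream's default delimiter expects.
 */
public class LineServer {

    /** Accepts a single client and sends every row as a newline-terminated line. */
    static void serve(ServerSocket server, List<String> rows) throws IOException {
        try (Socket client = server.accept();
             Writer out = new BufferedWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String row : rows) {
                out.write(row);
                out.write('\n'); // one record per line; no Java serialization
            }
            out.flush();
        } // closing the socket lets the reading side see end of input
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample rows; in practice these would come from the Cassandra query.
        List<String> rows = List.of("Row[1, alice]", "Row[2, bob]");
        try (ServerSocket server = new ServerSocket(9999)) {
            System.out.println("Waiting for Flink to connect on port 9999");
            serve(server, rows);
        }
    }
}
```

Start it before the Flink job and point env.socketTextStream("localhost", 9999) at it; to send real data, build the rows list from the string form of each Cassandra row instead of the sample values.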
Then you'll be able to process the results with a normal Flink application. The socketTextStream will produce a stream containing the query's results as lines of text, one for each row.
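To see why the original client's output is not a stream of text lines, the sketch below compares the bytes ObjectOutputStream writes for the string "exit" with the bytes of a plain line. The class FramingDemo and its firstRecord method are hypothetical; firstRecord is only a simplified model of a newline-delimited reader (decode as UTF-8, cut at the first '\n'), not Flink's actual source code. Java serialization prefixes the data with the stream header 0xACED plus type and length bytes, so the decoded record is not "exit", while the plain line is.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

/**
 * Compares what a text-line reader sees when the sender uses ObjectOutputStream
 * (as the original client did) versus writing plain newline-terminated text.
 */
public class FramingDemo {

    /** Bytes ObjectOutputStream produces for a single String object. */
    static byte[] serializedBytes(String s) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(s);
        }
        return buffer.toByteArray();
    }

    /** Bytes a line-oriented sender (netcat, or a plain Writer) would produce. */
    static byte[] lineBytes(String s) {
        return (s + "\n").getBytes(StandardCharsets.UTF_8);
    }

    /** Simplified model of a line reader: UTF-8 text up to the first '\n', or all of it. */
    static String firstRecord(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        int end = text.indexOf('\n');
        return end >= 0 ? text.substring(0, end) : text;
    }

    public static void main(String[] args) throws IOException {
        byte[] serialized = serializedBytes("exit");
        // Java serialization starts with the binary stream header, not with text.
        System.out.printf("first bytes: %02X %02X%n", serialized[0], serialized[1]);
        System.out.println("object stream record equals \"exit\": "
                + firstRecord(serialized).equals("exit"));
        System.out.println("plain line record equals \"exit\": "
                + firstRecord(lineBytes("exit")).equals("exit"));
    }
}
```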