Reputation: 31
I read about the advantages of using RichSinkFunction over directly calling the DB methods, so I decided to write my own RichSinkFunction.
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import com.datastax.driver.core.{Session, Cluster}

class CassandraAsSink extends RichSinkFunction {
  override def open(parameters: Configuration): Unit = {
    val cluster = Cluster.builder().addContactPoint("localhost").build()
    val session = cluster.connect("example")
  }

  override def invoke(value: Nothing, context: SinkFunction.Context): Unit = {
    session.execute(
      s"""
      INSERT INTO users (name, credits, user_id)
      VALUES ($name, $credits, $userId)
      """
    )
  }

  override def close(): Unit = {
    // something like session.close()
  }
}
However, I am not able to develop it fully. I want to call this sink from a separate class, passing the three values mentioned in the code. The record is in JSON format; I can manage parsing it and getting the attributes. But how do I pass them to the invoke method, and how can I make the session object available throughout the class? Also, is this a correct way of doing it, since I am new to both Flink and Scala? Will something like stream/string.new CassandraAsSink().invoke(name, credits, user_id) work when it comes to the calling part?
Modified:
class CassandraSink extends RichSinkFunction[String] {
  var cluster: Cluster = _
  var session: Session = _
  println("inside....")

  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
    println("Connected....")
  }

  override def invoke(value: String): Unit = {
    println("inside invoke: " + value)
    session.execute(
      s"""
      INSERT INTO jsondata1 (records_b)
      VALUES ($value)
      """
    )
  }

  override def close(): Unit = {
    session.close()
    println("Session Closed...")
  }
}
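One caveat I noticed with the interpolated INSERT above: a raw JSON value contains quotes, so splicing it directly into the CQL string can produce invalid statements. Here is a minimal sketch of the same sink using a prepared statement instead, assuming the same DataStax 3.x driver as above:

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import com.datastax.driver.core.{Cluster, PreparedStatement, Session}

class CassandraSink extends RichSinkFunction[String] {
  var cluster: Cluster = _
  var session: Session = _
  var insertStmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
    // Prepared once per task; the driver handles quoting and escaping of the bound value.
    insertStmt = session.prepare("INSERT INTO jsondata1 (records_b) VALUES (?)")
  }

  override def invoke(value: String): Unit = {
    session.execute(insertStmt.bind(value))
  }

  override def close(): Unit = {
    session.close()
  }
}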
Calling part:
val datastreamFromString: DataStream[String] = env.fromElements(data) // where data is a String
datastreamFromString.addSink(new CassandraSink())
I figured out that there is some problem with my DataStream created from a String; the class itself is working fine. I have initialized the env variable as the second line in the class.
Upvotes: 1
Views: 717
Reputation: 43454
Flink already has a Cassandra sink; it has valuable features you haven't attempted to support, especially checkpointing.
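For comparison, wiring up the built-in connector looks roughly like this; a minimal sketch assuming the flink-connector-cassandra dependency is on the classpath, with placeholder host and query, where `messages` stands in for a DataStream of tuples matching the bind parameters:

import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// Flink's connector builds the sink and attaches it to the stream for you.
CassandraSink.addSink(messages)
  .setHost("127.0.0.1")
  .setQuery("INSERT INTO example.users (name, credits, user_id) VALUES (?, ?, ?);")
  .build()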
As for your questions:

1. You can make session a member variable that is initialized in open and used in invoke.

2. Flink will call the invoke method for every stream record coming into the sink; the record is passed to invoke as the value parameter. You'll need to extract the fields like name, etc. from that value (see the sketch after this list).
3. You'll need to attach the sink to your job graph; overall it will end up being something like this:

   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env
     .addSource(source)
     ... // some processing
     .addSink(new CassandraAsSink())
   env.execute()
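For the field extraction, here is a minimal sketch of what invoke could look like, assuming Jackson is on the classpath and the incoming JSON carries name, credits, and user_id fields (both of those are assumptions about your setup, and CassandraJsonSink is a hypothetical name):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class CassandraJsonSink extends RichSinkFunction[String] {
  // Created lazily on the task manager rather than serialized with the sink.
  @transient private lazy val mapper = new ObjectMapper()

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    val node    = mapper.readTree(value)
    val name    = node.get("name").asText()
    val credits = node.get("credits").asInt()
    val userId  = node.get("user_id").asText()
    // ...bind these into your INSERT, ideally via a prepared statement
  }

  // open/close as in your modified class
}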
By the way, there are training lessons with examples and exercises included in the Flink documentation to help you get started.
Upvotes: 1