user17775951

Reputation: 31

RichSinkFunction for Cassandra in Flink

I read about the advantages of using a RichSinkFunction over calling the DB methods directly, so I decided to write my own RichSinkFunction.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import com.datastax.driver.core.{Cluster, Session}

class CassandraAsSink extends RichSinkFunction[String] {
  override def open(parameters: Configuration): Unit = {
    // local vals, so they are not visible from invoke()
    val cluster = Cluster.builder().addContactPoint("localhost").build()
    val session = cluster.connect("example")
  }

  override def invoke(value: String, context: SinkFunction.Context): Unit = {
    // name, credits, and userId are not defined anywhere yet
    session.execute(
      s"""
      INSERT INTO users (name, credits, user_id)
      VALUES ($name, $credits, $userId)
      """
    )
  }

  override def close(): Unit = {
    // something like session.close()
  }
}

However, I am not able to develop it fully. I want to call this sink from a separate class, passing the three values mentioned in the code. The record is in JSON format; I can manage parsing it and getting the attributes. But how do I pass them to the invoke method, and how can I share the session object throughout the class? Also, is this a correct way of doing it, since I am new to both Flink and Scala?

For the calling part, will something like stream/string.new CassandraAsSink().invoke(name, credits, user_id) work?

Modified:

class CassandraSink extends RichSinkFunction[String] {
  var cluster: Cluster = _
  var session: Session = _
  println("inside....")

  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
    println("Connected....")
  }

  override def invoke(value: String): Unit = {
    println("inside invoke: " + value)
    session.execute(
      s"""
      INSERT INTO jsondata1 (records_b)
      VALUES ($value)
      """
    )
  }

  override def close(): Unit = {
    session.close()
    println("Session Closed...")
  }
}

Calling part:

val datastreamFromString: DataStream[String] = env.fromElements(data) // where data is a String
datastreamFromString.addSink(new CassandraSink())

I figured out that there is some problem with the DataStream I created from the String; the class itself is working fine. I have initialized the env variable on the second line of the class.

Upvotes: 1

Views: 717

Answers (1)

David Anderson

Reputation: 43454

Flink already has a Cassandra sink; it has valuable features you haven't attempted to support, especially checkpointing.
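For comparison, using the built-in connector looks roughly like this (a sketch based on the flink-connector-cassandra API; the tuple stream, table, and host are assumptions, since the connector expects parsed fields rather than raw JSON strings):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.cassandra.CassandraSink

val env = StreamExecutionEnvironment.getExecutionEnvironment

// hypothetical stream of already-parsed (name, credits, user_id) records
val records: DataStream[(String, Int, String)] =
  env.fromElements(("alice", 10, "u1"))

CassandraSink.addSink(records)
  .setQuery("INSERT INTO example.users (name, credits, user_id) VALUES (?, ?, ?);")
  .setHost("localhost")
  .build()

env.execute()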

As for your questions:

You can make session a member variable that can be initialized in open and used in invoke.

Flink will call the invoke method for every stream record coming into the sink. The record is passed to invoke as the value parameter. You'll need to extract the fields, such as name, from that value.
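Putting those two points together, a sketch of the sink might look like this (assuming Jackson for the JSON parsing and a users table with text, int, and text columns; the field names come from your snippet, everything else is a guess):

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import com.datastax.driver.core.{Cluster, PreparedStatement, Session}
import com.fasterxml.jackson.databind.ObjectMapper

class CassandraAsSink extends RichSinkFunction[String] {
  // created once per parallel sink instance in open(), reused for every invoke()
  @transient private var cluster: Cluster = _
  @transient private var session: Session = _
  @transient private var insert: PreparedStatement = _
  @transient private var mapper: ObjectMapper = _

  override def open(parameters: Configuration): Unit = {
    cluster = Cluster.builder().addContactPoint("localhost").build()
    session = cluster.connect("example")
    // a prepared statement avoids interpolating values into the query string
    insert = session.prepare("INSERT INTO users (name, credits, user_id) VALUES (?, ?, ?)")
    mapper = new ObjectMapper()
  }

  override def invoke(value: String): Unit = {
    // each incoming record arrives here as value; extract the fields from the JSON
    val node = mapper.readTree(value)
    session.execute(insert.bind(
      node.get("name").asText(),
      Int.box(node.get("credits").asInt()),
      node.get("user_id").asText()))
  }

  override def close(): Unit = {
    if (session != null) session.close()
    if (cluster != null) cluster.close()
  }
}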

You'll need to attach the sink to your job graph; overall it will end up being something like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment

env
  .addSource(source)
  ... // some processing
  .addSink(new CassandraAsSink())

env.execute()
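Filled in with the string source from your question, that skeleton becomes something like this (a sketch; the sample record is hypothetical):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// hypothetical JSON record matching the fields above
val data = """{"name": "alice", "credits": 10, "user_id": "u1"}"""

env
  .fromElements(data)
  .addSink(new CassandraSink())

env.execute("cassandra-sink-job")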

By the way, there are training lessons with examples and exercises included in the Flink documentation to help you get started.

Upvotes: 1
