Anji

Reputation: 305

Inserting Data Into Cassandra table Using Spark DataFrame

I'm using Scala 2.10.5, Cassandra 3.0, and Spark 1.6. I want to insert data into Cassandra, so I tried out the basic example:

scala> import com.datastax.spark.connector._
scala> val collection = sc.parallelize(Seq(("cat", 30), ("fox", 40)))
scala> collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

This works and inserts data into Cassandra. Now I have a CSV file that I want to insert into a Cassandra table with a matching schema:

val person = sc.textFile("hdfs://localhost:9000/user/hduser/person")
import org.apache.spark.sql._
import org.apache.spark.sql.types._
val schema = StructType(Array(
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("age", IntegerType, true)))
val rowRDD = person.map(_.split(",")).map(p => Row(p(0), p(1), p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.saveToCassandra

When I call saveToCassandra, I get an error saying that saveToCassandra is not a member of personSchemaRDD. So I thought of trying a different way:

df.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save()

But now I get a "cannot connect to cassandra on ip:port" error. Can anyone tell me the best way to do this? I need to periodically save data to Cassandra from these files.

Upvotes: 6

Views: 15677

Answers (2)

Prashant Sahoo

Reputation: 1095

Here is my code for saving a Spark Dataset into a Cassandra table using the Spark Java API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

private static void readBigEmptable(SparkSession sparkSession) {
    String cassandraEmpColumns = "id,name,salary,age,city";
    Dataset<Row> bigDataset = sparkSession.sql("select * from big_emp");
    // Generate the schema for the output rows
    List<StructField> fields = new ArrayList<>();
    for (String fieldName : cassandraEmpColumns.split(",")) {
        fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    StructType schemaStructure = DataTypes.createStructType(fields);
    // Convert the Dataset to an RDD to operate on each Row
    JavaRDD<Row> bigRDD = bigDataset.toJavaRDD();
    JavaRDD<Row> resultRDD = bigRDD.map(row -> RowFactory.create(
            row.getAs("id"), row.getAs("name"), row.getAs("salary"),
            row.getAs("age"), row.getAs("city")));
    Dataset<Row> empDs = sparkSession.createDataFrame(resultRDD, schemaStructure);
    empDs.show();
    writeToCassandraTable(empDs);
}

private static void writeToCassandraTable(Dataset<Row> dataset) {
    Map<String, String> tableProperties = new HashMap<>();
    tableProperties.put("keyspace", "test_keyspace");
    tableProperties.put("table", "emp_test");
    // Required when overwriting: confirms that the table may be truncated
    tableProperties.put("confirm.truncate", "true");
    dataset.write().format("org.apache.spark.sql.cassandra")
            .options(tableProperties)
            .mode(SaveMode.Overwrite)
            .save();
}

Note: if we use mode(SaveMode.Overwrite), we must also set tableProperties.put("confirm.truncate", "true"); otherwise we get an error message.

SaveMode.Append

  • Append mode means that when saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.

SaveMode.ErrorIfExists

  • ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.

SaveMode.Ignore

  • Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data.

SaveMode.Overwrite

  • Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
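
If the goal is to add rows to an existing table rather than replace it, SaveMode.Append also avoids the confirm.truncate requirement entirely. A minimal Scala sketch, where empDf is a placeholder DataFrame and the keyspace and table names reuse the ones from the Java code above:

import org.apache.spark.sql.SaveMode

// Append rows to the existing Cassandra table; no truncation happens,
// so "confirm.truncate" is not needed.
empDf.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "test_keyspace", "table" -> "emp_test"))
  .mode(SaveMode.Append)
  .save()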

Upvotes: 0

maasg

Reputation: 37435

sqlContext.applySchema(...) returns a DataFrame and a DataFrame does not have the saveToCassandra method.

You could use the .write method with it:

val personDF = sqlContext.applySchema(rowRDD, schema)
personDF.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "words_copy", "keyspace" -> "test")).save()

If we want to use the saveToCassandra method, the best way is to have a schema-aware RDD, using a case class:

import com.datastax.spark.connector._

case class Person(firstName: String, lastName: String, age: Int)
val rowRDD = person.map(_.split(",")).map(p => Person(p(0), p(1), p(2).toInt))
rowRDD.saveToCassandra(keyspace, table)

The DataFrame write method should work as well. Check that you have configured your context correctly.
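
A minimal sketch of that configuration for Spark 1.6, assuming a local Cassandra node (the host, port, and app name are placeholders to adjust for your cluster):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Point the Cassandra connector at your cluster before creating the context
val conf = new SparkConf()
  .setAppName("csv-to-cassandra")                       // placeholder app name
  .set("spark.cassandra.connection.host", "127.0.0.1")  // your Cassandra host
  .set("spark.cassandra.connection.port", "9042")       // default CQL port
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

In spark-shell, the same setting can be passed on the command line with --conf spark.cassandra.connection.host=<host>.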

Upvotes: 10
