Skiel
Skiel

Reputation: 337

update cassandra from spark

I'm a table in cassandra tfm.foehis that have data.

When i did the first charge of data from spark to cassandra, I used this set of commands:

import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

val wkdir="/home/adminbigdata/tablas/"
val fileIn= "originales/22_FOEHIS2.csv"
val fileOut= "22_FOEHIS_PRE2"
val fileCQL= "22_FOEHISCQL"

val data = sc.textFile(wkdir + fileIn).filter(!_.contains("----")).map(_.trim.replaceAll(" +", "")).map(_.dropRight(1)).map(_.drop(1)).map(_.replaceAll(",", "")).filter(array => array(6) != "MOBIDI").filter(array => array(17) != "").saveAsTextFile(wkdir + fileOut)
val firstDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").option("delimiter", "|").load(wkdir + fileOut)
val columns: Array[String] = firstDF.columns
val reorderedColumnNames: Array[String] = Array("hoclic","hodtac","hohrac","hotpac","honrac","hocdan","hocdrs","hocdsl","hocol","hocpny","hodesf","hodtcl","hodtcm","hodtea","hodtra","hodtrc","hodtto","hodtua","hohrcl","hohrcm","hohrea","hohrra","hohrrc","hohrua","holinh","holinr","honumr","hoobs","hooe","hotdsc","hotour","housca","houscl","houscm","housea","houser","housra","housrc")
val secondDF= firstDF.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
secondDF.write.cassandraFormat("foehis", "tfm").save()

But when I load new data using the same script, I get errors. I don't know what's wrong? This is the message:

java.lang.UnsupportedOperationException: 'SaveMode is set to ErrorIfExists and Table
tfm.foehis already exists and contains data.
Perhaps you meant to set the DataFrame write mode to Append?
Example: df.write.format.options.mode(SaveMode.Append).save()" '

Upvotes: 1

Views: 1178

Answers (1)

Alex Ott
Alex Ott

Reputation: 87164

The error message clearly says you that you need to use Append mode & shows what you can do with it. In your case it happens because destination table already exists, and writing mode is set to "error if exists". If you still want to write data, the code should be following:

import org.apache.spark.sql.SaveMode
secondDF.write.cassandraFormat("foehis", "tfm").mode(SaveMode.Append).save()

Upvotes: 1

Related Questions