Reputation: 895
*Hi all,
I have an easy question. I have an RDD created from Kafka streaming with the createStream method. I want to add a timestamp as a value to this RDD before converting it to a DataFrame. I tried adding the value to the DataFrame with withColumn(), but it returns the error shown after the code:*
val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
messages.foreachRDD(rdd =>
{
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val dataframe = sqlContext.read.json(rdd.map(_._2))
val d = dataframe.withColumn("timeStamp_column", dataframe.col("now"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid); at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15
I understand that DataFrames cannot be altered because they are immutable, but RDDs are immutable as well. What is the best way to do this? How can I add a value (a timestamp) to the RDD dynamically?
Upvotes: 11
Views: 37780
Reputation: 1706
I see in the comments that some folks are having trouble converting the timestamp to a string. Here is a way to do that using the Spark 3 datetime format:
import org.apache.spark.sql.functions._
val d = dataframe
  .withColumn("timeStamp_column", date_format(current_timestamp(), "y-M-d'T'H:m:sX"))
Upvotes: 0
Reputation: 523
Try the current_timestamp function.
import org.apache.spark.sql.functions.current_timestamp
df.withColumn("time_stamp", current_timestamp())
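As a rough illustration of the call above, here is a minimal self-contained sketch. The local SparkSession, the sample rows, and the names `spark`, `df`, and `stamped` are assumptions made for the example; in the question's streaming job the DataFrame would come from `sqlContext.read.json(...)` inside `foreachRDD` instead.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp

// Local session for illustration only (assumption); a real job would reuse its own session.
val spark = SparkSession.builder().master("local[*]").appName("timestamp-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows standing in for the parsed Kafka JSON.
val df = Seq(("click", "android"), ("view", "ios")).toDF("action", "device_type")

// withColumn returns a new DataFrame; the original df is left unchanged.
val stamped = df.withColumn("time_stamp", current_timestamp())
stamped.printSchema()
```

Because withColumn returns a new DataFrame rather than modifying the existing one, immutability is not an obstacle here. The timestamp is evaluated when the query runs, so in a streaming job each batch should get its own value.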
Upvotes: 18
Reputation: 21
In Scala/Databricks:
import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("Timestamp",current_timestamp())
Upvotes: 1
Reputation: 29
This works for me. I usually perform a write after this.
import org.apache.spark.sql.functions.current_timestamp
val d = dataframe.withColumn("SparkLoadedAt", current_timestamp())
Upvotes: 2
Reputation: 5686
To add a new column with a constant such as a timestamp, you can use the lit function:
import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
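A short sketch of how this relates to the question's `now` variable. The call `dataframe.col("now")` looks for an existing column named "now", which is why the AnalysisException lists the available columns; a Scala value has to be wrapped with lit instead. The local session, the sample data, and the names `withMillis` and `withTs` are assumptions made for the example.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

// Local session for illustration only (assumption).
val spark = SparkSession.builder().master("local[*]").appName("lit-sketch").getOrCreate()
import spark.implicits._

// Hypothetical sample rows.
val oldDF = Seq("click", "view").toDF("action")

// Captured once on the driver, as in the question.
val now = java.util.Calendar.getInstance().getTime()

// lit on a Long yields a bigint column holding epoch milliseconds.
val withMillis = oldDF.withColumn("timeStamp_column", lit(now.getTime))

// Converting to java.sql.Timestamp first yields a timestamp-typed column.
val withTs = oldDF.withColumn("timeStamp_column", lit(new Timestamp(now.getTime)))
```

With lit, the value is fixed at the moment it is read on the driver, so every row gets the same constant. In the question's code `now` is defined outside `foreachRDD`, so it would be the same for every batch unless it is recomputed inside the closure.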
Upvotes: 4