jack AKA karthik

Reputation: 895

How can I add a timestamp as an extra column to my DataFrame?

*Hi all,

I have an easy question for you all. I have an RDD created from Kafka streaming using the createStream method. I want to add a timestamp as a value to this RDD before converting it into a DataFrame. I tried adding the value to the DataFrame with withColumn(), but it fails with the error shown below:*

val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()

val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

messages.foreachRDD { rdd =>
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  import sqlContext.implicits._

  val dataframe = sqlContext.read.json(rdd.map(_._2))

  val d = dataframe.withColumn("timeStamp_column", dataframe.col("now"))
}

org.apache.spark.sql.AnalysisException: Cannot resolve column name "now" among (action, device_os_ver, device_type, event_name, item_name, lat, lon, memberid, productUpccd, tenantid);
    at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:15

I understand that DataFrames cannot be altered because they are immutable, but RDDs are immutable as well. So what is the best way to do this? How can I add a value to the RDD (that is, add a timestamp to an RDD dynamically)?

Upvotes: 11

Views: 37780

Answers (5)

Michael West

Reputation: 1706

I see in the comments that some folks are having trouble converting the timestamp to a string. Here is a way to do that using the Spark 3 datetime format:

import org.apache.spark.sql.functions._
val d = dataframe
  .withColumn("timeStamp_column", date_format(current_timestamp(), "y-M-d'T'H:m:sX"))
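
A note on the pattern: in Spark 3 a single pattern letter prints a number without padding, so `M`, `d`, `H`, `m` and `s` give values like `3` or `7` rather than `03` or `07`. If a fixed-width, ISO-8601-style string is wanted, a zero-padded pattern could be used instead. The following is only a sketch under that assumption; the object name, the helper `withIsoString`, the sample rows, and the local `SparkSession` are hypothetical and not from the answer above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{current_timestamp, date_format}

object TimestampStringSketch {
  // Zero-padded fields; XXX prints the zone offset as "Z" or as +HH:mm / -HH:mm.
  val IsoPattern = "yyyy-MM-dd'T'HH:mm:ssXXX"

  // Hypothetical helper: appends the current timestamp rendered as a string column.
  def withIsoString(df: DataFrame): DataFrame =
    df.withColumn("timeStamp_column", date_format(current_timestamp(), IsoPattern))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("timestamp-string-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for the real dataframe.
    val dataframe = Seq(("view", "ios"), ("click", "android")).toDF("action", "device_type")

    val d = withIsoString(dataframe)
    d.printSchema()              // timeStamp_column is a string column
    d.show(truncate = false)
    spark.stop()
  }
}
```

Because `date_format` returns a string, the resulting column no longer has the timestamp type; keep a separate `current_timestamp()` column if later date arithmetic is needed.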

Upvotes: 0

venkat

Reputation: 523

Try the current_timestamp function.

import org.apache.spark.sql.functions.current_timestamp    
df.withColumn("time_stamp", current_timestamp())
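
For anyone who wants to try this outside a streaming job, here is a minimal self-contained sketch. The object name, the helper `withLoadTime`, the sample rows, and the local `SparkSession` are assumptions for illustration; only `withColumn` and `current_timestamp` come from the snippet above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.current_timestamp

object CurrentTimestampSketch {
  // Hypothetical helper: appends a TimestampType column named "time_stamp".
  def withLoadTime(df: DataFrame): DataFrame =
    df.withColumn("time_stamp", current_timestamp())

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("current-timestamp-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows using two of the column names from the question.
    val df = Seq(("view", "ios"), ("click", "android")).toDF("action", "device_type")

    val stamped = withLoadTime(df)
    stamped.printSchema()        // time_stamp is a timestamp column
    stamped.show(truncate = false)
    spark.stop()
  }
}
```

As far as I know, `current_timestamp()` is resolved once per query evaluation, so every row produced by the same action carries the same value, and the value is taken when the action runs rather than when `withColumn` is called.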

Upvotes: 18

Zahid Maqbool

Reputation: 21

In Scala/Databricks:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("Timestamp",current_timestamp())


Upvotes: 1

Squidy666

Reputation: 29

This works for me. I usually perform a write after this.

import org.apache.spark.sql.functions.current_timestamp
val d = dataframe.withColumn("SparkLoadedAt", current_timestamp())

Upvotes: 2

Javier Montón

Reputation: 5686

To add a new column with a constant value such as a timestamp, you can use the lit function:

import org.apache.spark.sql.functions._
val newDF = oldDF.withColumn("timeStamp_column", lit(System.currentTimeMillis))
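
Note that `System.currentTimeMillis` returns a `Long`, so the column above holds epoch milliseconds as a number. If a timestamp-typed column is preferred, one option is to wrap the value in `java.sql.Timestamp` before passing it to `lit`. The sketch below compares the two; the object name, the helper names, the sample rows, and the local `SparkSession` are hypothetical.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit

object LitTimestampSketch {
  // Epoch milliseconds as a plain number: the column type is LongType.
  def withEpochMillis(df: DataFrame, millis: Long): DataFrame =
    df.withColumn("timeStamp_column", lit(millis))

  // The same instant wrapped in java.sql.Timestamp: the column type is TimestampType.
  def withTimestamp(df: DataFrame, millis: Long): DataFrame =
    df.withColumn("timeStamp_column", lit(new Timestamp(millis)))

  def main(args: Array[String]): Unit = {
    // Local session for illustration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("lit-timestamp-sketch")
      .getOrCreate()
    import spark.implicits._

    // Made-up sample rows standing in for oldDF.
    val oldDF = Seq(("view", "ios"), ("click", "android")).toDF("action", "device_type")

    // Captured once on the driver, so every row gets exactly this value.
    val nowMillis = System.currentTimeMillis()

    withEpochMillis(oldDF, nowMillis).printSchema()
    withTimestamp(oldDF, nowMillis).printSchema()
    spark.stop()
  }
}
```

Either way the value is fixed on the driver at the moment the literal is created, which differs from `current_timestamp()`, where Spark supplies the value when the query runs.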

Upvotes: 4
