Reputation: 49
I need help writing Spark Scala code for the issue below. I have a file containing records like this:
aaa|2019-07-11 02:15:50
bbb|2019-07-03 22:21:50
vvv|2019-07-03 19:30:40
bzx|2019-07-11 02:15:30
rrr|2019-06-24 01:29:10
mmm|2019-06-23 20:35:05
qqq|2019-07-12 08:10:15
eee|2019-07-11 01:49:30
iii|2019-06-23 22:31:45
I have read the file, splitting it on the | delimiter:
val file = spark.read.format("csv").option("delimiter", """|""").load(pathOfDumpfile).toDF()
Now I need to add 5 seconds to every value in the second column (the timestamp column) and save the result as a file like below:
aaa|2019-07-11 02:15:55
bbb|2019-07-03 22:21:55
vvv|2019-07-03 19:30:45
bzx|2019-07-11 02:15:35
rrr|2019-06-24 01:29:15
mmm|2019-06-23 20:35:10
qqq|2019-07-12 08:10:20
eee|2019-07-11 01:49:35
iii|2019-06-23 22:31:50
Can anyone suggest how I can add 5 seconds to all the records in the file/column? Any help is appreciated. One constraint: after the addition, only the minute or second should change; the date must not. For example, with 2019-07-11 23:59:59, adding even 1 second rolls it over to 2019-07-12 00:00:00. I want to do the addition without ever changing the date, so only the minute or second may change.
Upvotes: 0
Views: 573
Reputation: 1892
You can do it using unix_timestamp. The when clause below keeps any timestamp at or after 23:59:55 unchanged, since adding 5 seconds to it would roll the date over to the next day:
scala> var dfv = Seq(("aaa","2019-07-11 23:59:59"),("bbb","2019-07-03 22:21:50"),("vvv","2019-07-03 19:30:40"),("bzx","2019-07-11 02:15:30"),("rrr","2019-06-24 01:29:10"),("mmm","2019-06-23 20:35:05"),("qqq","2019-07-12 08:10:15"),("eee","2019-07-11 01:49:30"),("iii","2019-06-23 22:31:45")).toDF("value","_date")
scala> dfv.show
+-----+-------------------+
|value| _date|
+-----+-------------------+
| aaa|2019-07-11 23:59:59|
| bbb|2019-07-03 22:21:50|
| vvv|2019-07-03 19:30:40|
| bzx|2019-07-11 02:15:30|
| rrr|2019-06-24 01:29:10|
| mmm|2019-06-23 20:35:05|
| qqq|2019-07-12 08:10:15|
| eee|2019-07-11 01:49:30|
| iii|2019-06-23 22:31:45|
+-----+-------------------+
scala> dfv.withColumn("_date_v1", when(date_format(from_unixtime(unix_timestamp($"_date")), "HH:mm:ss") >= "23:59:55", $"_date").otherwise(from_unixtime(unix_timestamp($"_date") + 5, "yyyy-MM-dd HH:mm:ss"))).show
+-----+-------------------+-------------------+
|value| _date| _date_v1|
+-----+-------------------+-------------------+
| aaa|2019-07-11 23:59:59|2019-07-11 23:59:59|
| bbb|2019-07-03 22:21:50|2019-07-03 22:21:55|
| vvv|2019-07-03 19:30:40|2019-07-03 19:30:45|
| bzx|2019-07-11 02:15:30|2019-07-11 02:15:35|
| rrr|2019-06-24 01:29:10|2019-06-24 01:29:15|
| mmm|2019-06-23 20:35:05|2019-06-23 20:35:10|
| qqq|2019-07-12 08:10:15|2019-07-12 08:10:20|
| eee|2019-07-11 01:49:30|2019-07-11 01:49:35|
| iii|2019-06-23 22:31:45|2019-06-23 22:31:50|
+-----+-------------------+-------------------+
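To write the result back out in the same pipe-delimited layout as the input, a minimal sketch (the output path here is a placeholder):
scala> dfv.select($"value", $"_date_v1").write.option("delimiter", "|").csv("/tmp/output")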
Let me know if you have any questions.
Upvotes: 1
Reputation: 8758
You can use Spark's INTERVAL syntax.
Using the DataFrame API:
val foo = spark.sql(""" select current_timestamp as ts """)
foo.select($"ts", $"ts" + expr("INTERVAL 5 SECONDS") as "ts_plus").show(false)
+-----------------------+-----------------------+
|ts |ts_plus |
+-----------------------+-----------------------+
|2019-09-16 10:33:17.626|2019-09-16 10:33:22.626|
+-----------------------+-----------------------+
Using Spark SQL:
foo.createOrReplaceTempView("fooView")
spark.sql(""" select ts, ts + INTERVAL 5 seconds from fooView""").show(false)
+-----------------------+------------------------------------------+
|ts |CAST(ts + interval 5 seconds AS TIMESTAMP)|
+-----------------------+------------------------------------------+
|2019-09-16 10:35:12.847|2019-09-16 10:35:17.847 |
+-----------------------+------------------------------------------+
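Applied to the pipe-delimited file from the question, a sketch (assuming the default _c0/_c1 column names that load assigns, and that _c1 casts cleanly to timestamp):
import org.apache.spark.sql.functions.{col, expr}
val file = spark.read.format("csv").option("delimiter", "|").load(pathOfDumpfile)
// cast the string column to timestamp, then use timestamp + interval arithmetic
val shifted = file.withColumn("_c1", col("_c1").cast("timestamp") + expr("INTERVAL 5 SECONDS"))
Note that this adds 5 seconds unconditionally, so it does not guard against the end-of-day rollover the question asks about.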
Upvotes: 1
Reputation: 591
You can do this with the help of a custom UDF, like:
import org.apache.spark.sql.functions.{col, udf}
val file = spark.read.format("csv").option("delimiter", """|""").load(pathOfDumpfile).toDF("a", "b")
val timeUDF = udf((x: java.sql.Timestamp) => new java.sql.Timestamp(x.getTime + 5000)) // getTime returns epoch milliseconds, so +5000 adds 5 seconds
file.select(col("a"), timeUDF(col("b")).as("b"))
    .write.option("delimiter", "|").csv(...)
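This UDF adds 5 seconds unconditionally, so a time like 23:59:59 would roll into the next day. A hypothetical variant that keeps the original value whenever the addition would change the date:
import java.sql.Timestamp
val safeTimeUDF = udf((x: Timestamp) =>
  if (x == null) null
  else {
    val shifted = new Timestamp(x.getTime + 5000)
    // keep the original timestamp if +5s would land on the next day
    if (shifted.toLocalDateTime.toLocalDate == x.toLocalDateTime.toLocalDate) shifted else x
  })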
Upvotes: 1