Karzyfox

Reputation: 349

Add row to Spark Dataframe with timestamps and id

I have a dataframe named timeDF which has the schema below:

root
 |-- Id: long (nullable = true)
 |-- Model: timestamp (nullable = true)
 |-- Prevision: timestamp (nullable = true)

I want to add a new row at the end of timeDF by converting two Calendar objects, c1 and c2, to Timestamp. I know I can first convert them to Timestamp like so:

val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)

However, I can't figure out how to then write those variables to timeDF as a new row, nor how to let Spark increase the Id column value.

Should I create a List object with t1 and t2 and build a temporary dataframe from it to then union the two dataframes (something like the sketch below)? If so, how do I manage the Id column? Isn't that too messy for such a simple operation?
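Here is roughly what I have in mind (an untested sketch; the max-based Id handling is the part I am unsure about):

import java.sql.Timestamp
import org.apache.spark.sql.functions.max
import spark.implicits._

val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)

// Compute the next Id from the current maximum (0 when timeDF is empty)
val nextId: Long =
  if (timeDF.isEmpty) 0L
  else timeDF.agg(max("Id")).head().getLong(0) + 1

// One-row dataframe with the same columns, then union it with timeDF
val newRowDF = Seq((nextId, t1, t2)).toDF("Id", "Model", "Prevision")
val updatedTimeDF = timeDF.union(newRowDF)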

Can someone explain this to me, please?

Thanks.

Upvotes: 0

Views: 1176

Answers (3)

jgp

Reputation: 2091

Here is a solution you can try, in a nutshell:

  1. Ingest your file.
  2. Create a new dataframe with your data and unionByName().
  3. Correct the id.
  4. Clean up.

Create the extra record

First, create the extra record from scratch. As you mix several types, I used a POJO; here is the code:

// Build the extra record with a placeholder id of -1
List<ModelPrevisionRecord> data = new ArrayList<>();
ModelPrevisionRecord b = new ModelPrevisionRecord(
    -1L,
    new Timestamp(System.currentTimeMillis()),
    new Timestamp(System.currentTimeMillis()));
data.add(b);

// Turn it into a dataset and append it to the existing dataframe
Dataset<ModelPrevisionRecord> ds = spark.createDataset(data,
    Encoders.bean(ModelPrevisionRecord.class));
timeDf = timeDf.unionByName(ds.toDF());

ModelPrevisionRecord is a very basic POJO:

package net.jgp.labs.spark.l999_scrapbook.l000;

import java.sql.Timestamp;

public class ModelPrevisionRecord {

  public long getId() {
    return id;
  }

  public void setId(long id) {
    this.id = id;
  }

  public Timestamp getModel() {
    return model;
  }

  public void setModel(Timestamp model) {
    this.model = model;
  }

  public Timestamp getPrevision() {
    return prevision;
  }

  public void setPrevision(Timestamp prevision) {
    this.prevision = prevision;
  }

  private long id;
  private Timestamp model;
  private Timestamp prevision;

  public ModelPrevisionRecord(long id, Timestamp model, Timestamp prevision) {
    this.id = id;
    this.model = model;
    this.prevision = prevision;
  }
}

Correct the Id

The new record's id is -1, so the idea is to create a new column, id2, containing the right id:

timeDf = timeDf.withColumn("id2",
    when(
        col("id").equalTo(-1), timeDf.agg(max("id")).head().getLong(0) + 1)
            .otherwise(col("id")));

Clean up the dataframe

Finally, clean up your dataframe:

timeDf = timeDf.drop("id").withColumnRenamed("id2", "id");

Upvotes: 1

Karzyfox

Reputation: 349

I tried this, but I don't know why: when printing the saved table, it only keeps the last 2 rows, all the others being deleted.

This is how I initialize the Delta table:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType, TimestampType}

val schema = StructType(
               StructField("Id", LongType, false) ::
               StructField("Model", TimestampType, false) ::
               StructField("Prevision", TimestampType, false) :: Nil
             )

var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"

spark.sql("DROP TABLE IF EXISTS " + table_name)
dbutils.fs.rm(save_path, true)

timestampDF.write.partitionBy(partition_by)
                 .format(write_format)
                 .save(save_path)

spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")

And this is how I add a new item to it:

import java.sql.Timestamp
import java.util.{Arrays, Calendar}
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.functions.max

def addTimeToData(model: Calendar, target: Calendar): Unit = {
  var timeDF = spark.read
                    .format("delta")
                    .load("/mnt/path/to/folder")
  
  val modelTS = new Timestamp(model.getTimeInMillis)
  val targetTS = new Timestamp(target.getTimeInMillis)
  var id: Long = 0
  
  if (!timeDF.head(1).isEmpty) {
    id = timeDF.agg(max("Id")).head().getLong(0) + 1
  }
  
  val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
  val schema = StructType(
                 StructField("Id", LongType, false) ::
                 StructField("Model", TimestampType, false) ::
                 StructField("Prevision", TimestampType, false) :: Nil
               )  
  var newTimeDF = spark.createDataFrame(newTime, schema)
  val unionTimeDF = timeDF.union(newTimeDF)
  timeDF = unionTimeDF
  unionTimeDF.show
  val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
  val table_name = "myTable"

  spark.sql("DROP TABLE IF EXISTS " + table_name)
  dbutils.fs.rm(save_path, true)
  timeDF.write.partitionBy("Model")
              .format("delta")
              .save(save_path)

  spark.sql("CREATE TABLE " + table_name + " USING DELTA LOCATION '" + save_path + "'")
}

I'm not very familiar with Delta tables, so I don't know if I can just use SQL on it to add values like so:

spark.sql("INSERT INTO 'myTable' VALUES (" + id + ", " + modelTS + ", " + previsionTS + ")");

And I don't know if just putting the timestamp variables in like that will work.
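If it does work, I suppose the statement would have to look something like this (just a guess on my part, assuming a Delta table accepts a plain INSERT INTO ... VALUES and that the timestamps need to be written as SQL timestamp literals):

// Guessed form: no quotes around the table name, and the timestamps
// formatted as Spark SQL TIMESTAMP literals
val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
spark.sql(
  s"INSERT INTO myTable VALUES " +
  s"($id, TIMESTAMP '${fmt.format(modelTS)}', TIMESTAMP '${fmt.format(previsionTS)}')"
)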

Upvotes: 0

Majid Hajibaba

Reputation: 3260

If your first dataframe can be sorted by Id and you need to add rows one by one, you can find the maximum Id in your dataframe:

long max = timeDF.agg(functions.max("Id")).head().getLong(0);

and then increment it and add it to your dataframe with a union. To do this, follow the example below, in which age acts like the id; people.json is a file from the Spark examples.

// Read the sample file and find the current maximum "age"
Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");
df.show();

long max = df.agg(functions.max("age")).head().getLong(0);

// Build a one-row dataframe with max+1 and union it with the original
List<Row> rows = Arrays.asList(RowFactory.create(max+1, "test"));

StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
                DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();
Dataset<Row> df3 = df.union(df2);
df3.show();

Upvotes: 0
