Aleksey N Yakushev

Java: load data into Parquet

I have source data in JSON (simple strings). Example:

{"_t":1480647647,"_p":"[email protected]","_n":"aloaded","device_type":"desktop"}
{"_t":1480647676,"_p":"[email protected]","_n":"aloaded","device_type":"desktop"}

where _t is a Unix timestamp in seconds. For example, 1480647647 represents Friday, 2 December 2016, 03:00:47 (UTC).
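One way to double-check that _t really is in seconds is to decode it with java.time. The sketch below treats the value as seconds since the Unix epoch and evaluates it in UTC; the class and method names are illustrative and not part of the original code.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneOffset;

// Decodes the raw "_t" value as whole seconds since 1970-01-01T00:00:00Z.
class EpochSecondsCheck {

    // Interpret t as seconds since the Unix epoch.
    static Instant fromEpochSeconds(long t) {
        return Instant.ofEpochSecond(t);
    }

    // Day of the week of that instant, evaluated in UTC.
    static DayOfWeek dayOfWeekUtc(long t) {
        return fromEpochSeconds(t).atOffset(ZoneOffset.UTC).getDayOfWeek();
    }

    public static void main(String[] args) {
        long t = 1480647647L; // "_t" from the first sample record
        System.out.println(fromEpochSeconds(t)); // 2016-12-02T03:00:47Z
        System.out.println(dayOfWeekUtc(t));     // FRIDAY
    }
}
```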

I need to load this data into a Parquet file. Here is the relevant part of the code:

    // loaded_prq_fpath - full path with file name

    MessageType APPLOADED_FILE_SCHEMA = Types.buildMessage()
        .required(INT64).as(TIMESTAMP_MILLIS).named("time")
        .required(BINARY).as(UTF8).named("email")
        .required(BINARY).as(UTF8).named("device_type")
        .named("AppLoaded");

    SimpleGroupFactory GROUP_FACTORY_APP_LOADED =
        new SimpleGroupFactory(APPLOADED_FILE_SCHEMA);

    File fp = new File(loaded_prq_fpath);
    Path file1 = new Path(fp.toString());
    logger.info(file1.getName());

    ParquetWriter<Group> writer1 =
        ExampleParquetWriter.builder(file1)
            .withType(APPLOADED_FILE_SCHEMA)
            .build();
    ...
    while (jp.nextToken() == JsonToken.START_OBJECT) {
        // read everything from this START_OBJECT to the matching END_OBJECT
        // and return it as a tree model (JsonNode)
        JsonNode node = mapper.readTree(jp);
        TotalEventsCnt++;
        if (node.get("_n").toString().equals("\"aloaded\"")) {
            LoadCounter++;
            ((ObjectNode) node).remove("_n");
            Group group1 = GROUP_FACTORY_APP_LOADED.newGroup();
            group1.add("time",        node.get("_t").asLong());
            // toString() returns the JSON text including the quotes;
            // asText() would return the bare string value
            group1.add("email",       node.get("_p").toString());
            group1.add("device_type", node.get("device_type").toString());
            writer1.write(group1);
        }
        ...
    writer1.close();
    SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL basic example")
        .config("spark.master", "local[*]")
        .getOrCreate();

The data loads without any errors. Next, I read the Parquet data back and display it:

            Dataset<Row> df_appl = spark.read().load(loaded_prq_fpath);
            df_appl.show(); 

            df_appl.createOrReplaceTempView("v_appl");
            df_appl.printSchema();
            Dataset<Row> df_v_appl = spark.sql("SELECT CAST(time AS DATE) AS the_datetime from v_appl");
            df_v_appl.show();

The output is:

+--------------------+------------+-----------+
|                time|       email|device_type|
+--------------------+------------+-----------+
|1970-01-17 22:17:...|"[email protected]"|  "desktop"|
|1970-01-17 22:17:...|"[email protected]"|  "desktop"|
|1970-01-17 22:59:...|"[email protected]"|  "desktop"|
|1970-01-17 22:59:...|"[email protected]"|  "desktop"|
+--------------------+------------+-----------+

 root
 |-- time: timestamp (nullable = true)
 |-- email: string (nullable = true)
 |-- device_type: string (nullable = true)

+------------+
|the_datetime|
+------------+
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
|  1970-01-17|
+------------+   
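These 1970 dates are what you get when a seconds value is decoded as milliseconds: 1480647647 ms is only about 17 days after the epoch. The sketch below (illustrative class and method names) reproduces this with Instant.ofEpochMilli. In UTC the result is 1970-01-18T03:17:27.647Z; the table shows 1970-01-17 22:17, presumably because Spark renders timestamps in the session's local time zone.

```java
import java.time.Duration;
import java.time.Instant;

// Shows how a seconds value is decoded by a reader that expects milliseconds
// (which is what the TIMESTAMP_MILLIS annotation declares).
class MillisMisreadDemo {

    // TIMESTAMP_MILLIS values are read as milliseconds since the Unix epoch.
    static Instant decodeAsMillis(long raw) {
        return Instant.ofEpochMilli(raw);
    }

    // Whole days between the epoch and the decoded instant.
    static long daysSinceEpoch(long raw) {
        return Duration.ofMillis(raw).toDays();
    }

    public static void main(String[] args) {
        long raw = 1480647647L; // seconds, but written unchanged into the millisecond column
        System.out.println(decodeAsMillis(raw)); // 1970-01-18T03:17:27.647Z
        System.out.println(daysSinceEpoch(raw)); // 17
    }
}
```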

I think the problem is in this line:

.required(INT64).as(TIMESTAMP_MILLIS).named("time")

or here:

 group1.add("time",       node.get("_t").asLong());

Any help would be appreciated. (I'm new to Java; my last experience with it was 8 years ago.)

Answers (1)

Aleksey N Yakushev

I solved my problem with this:

    group1.add("time",       node.get("_t").asLong()*1000);

because _t is in seconds, and it has to be multiplied by 1000, since the TIMESTAMP_MILLIS column expects milliseconds since the epoch.

Look here
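A minimal sketch of the same fix, assuming _t always holds whole seconds. The helper name toEpochMillis is hypothetical; Math.multiplyExact is used instead of a bare * 1000 so that an out-of-range value throws rather than silently overflowing.

```java
import java.time.Instant;

// Converts "_t" (seconds) to milliseconds before it goes into the TIMESTAMP_MILLIS column.
class SecondsToMillis {

    // Hypothetical helper; multiplyExact throws ArithmeticException on overflow
    // instead of wrapping around silently.
    static long toEpochMillis(long epochSeconds) {
        return Math.multiplyExact(epochSeconds, 1000L);
    }

    public static void main(String[] args) {
        long millis = toEpochMillis(1480647647L);
        System.out.println(millis);                       // 1480647647000
        System.out.println(Instant.ofEpochMilli(millis)); // 2016-12-02T03:00:47Z
    }
}
```

In the writer loop the call would then be group1.add("time", toEpochMillis(node.get("_t").asLong()));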
