Reputation: 465
I have source data in json (simple strings). Example:
{"_t":1480647647,"_p":"[email protected]","_n":"aloaded","device_type":"desktop"}
{"_t":1480647676,"_p":"[email protected]","_n":"aloaded","device_type":"desktop"}
where _t is a timestamp: 1480647647 represents Friday, 2 December 2016, 03:00:47 (UTC).
I need to load this data into a Parquet file. Here is the relevant part of the code
(loaded_prq_fpath is the full path, including the file name):
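For reference, here is a minimal sketch using only java.time that confirms how _t decodes when it is read as whole seconds since the Unix epoch, in UTC. The class name EpochCheck and the helper decodeSeconds are hypothetical and only used for illustration.

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.ZoneOffset;

public class EpochCheck {
    // Interpret a raw "_t" value as whole seconds since 1970-01-01T00:00:00Z
    static Instant decodeSeconds(long t) {
        return Instant.ofEpochSecond(t);
    }

    public static void main(String[] args) {
        Instant instant = decodeSeconds(1480647647L); // "_t" from the first sample record
        DayOfWeek day = instant.atZone(ZoneOffset.UTC).getDayOfWeek();
        System.out.println(instant + " " + day); // 2016-12-02T03:00:47Z FRIDAY
    }
}
```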
MessageType APPLOADED_FILE_SCHEMA = Types.buildMessage()
        .required(INT64).as(TIMESTAMP_MILLIS).named("time")
        .required(BINARY).as(UTF8).named("email")
        .required(BINARY).as(UTF8).named("device_type")
        .named("AppLoaded");
SimpleGroupFactory GROUP_FACTORY_APP_LOADED =
        new SimpleGroupFactory(APPLOADED_FILE_SCHEMA);
File fp = new File(loaded_prq_fpath);
Path file1;
file1 = new Path(fp.toString());
logger.info(file1.getName());
ParquetWriter<Group> writer1 =
        ExampleParquetWriter.builder(file1)
                .withType(APPLOADED_FILE_SCHEMA)
                .build();
...
while (jp.nextToken() == JsonToken.START_OBJECT) {
    // read everything from this START_OBJECT to the matching END_OBJECT
    // and return it as a tree model TreeNode
    JsonNode node = mapper.readTree(jp);
    TotalEventsCnt++;
    if (node.get("_n").toString().equals("\"aloaded\"")) {
        LoadCounter++;
        ((ObjectNode) node).remove("_n");
        Group group1 = GROUP_FACTORY_APP_LOADED.newGroup();
        group1.add("time", node.get("_t").asLong());
        // toString() returns the JSON form, quotes included; asText() would return the bare string
        group1.add("email", node.get("_p").toString());
        group1.add("device_type", node.get("device_type").toString());
        writer1.write(group1);
    }
...
writer1.close();
SparkSession spark = SparkSession
        .builder()
        .appName("Java Spark SQL basic example")
        .config("spark.master", "local[*]")
        .getOrCreate();
The data is written without any errors. Next, I read the Parquet data back and display it:
Dataset<Row> df_appl = spark.read().load(loaded_prq_fpath);
df_appl.show();
df_appl.createOrReplaceTempView("v_appl");
df_appl.printSchema();
Dataset<Row> df_v_appl = spark.sql("SELECT CAST(time AS DATE) AS the_datetime from v_appl");
df_v_appl.show();
The output is:
+--------------------+------------+-----------+
| time| email|device_type|
+--------------------+------------+-----------+
|1970-01-17 22:17:...|"[email protected]"| "desktop"|
|1970-01-17 22:17:...|"[email protected]"| "desktop"|
|1970-01-17 22:59:...|"[email protected]"| "desktop"|
|1970-01-17 22:59:...|"[email protected]"| "desktop"|
+--------------------+------------+-----------+
root
|-- time: timestamp (nullable = true)
|-- email: string (nullable = true)
|-- device_type: string (nullable = true)
+------------+
|the_datetime|
+------------+
| 1970-01-17|
| 1970-01-17|
| 1970-01-17|
| 1970-01-17|
+------------+
I think the problem is in this line:
.required(INT64).as(TIMESTAMP_MILLIS).named("time")
or here:
group1.add("time", node.get("_t").asLong());
Any help would be appreciated. (I'm new to Java; my last experience with it was 8 years ago.)
Upvotes: 0
Views: 317
Reputation: 465
I solved my problem with this:
group1.add("time", node.get("_t").asLong()*1000);
because _t is in seconds, but TIMESTAMP_MILLIS expects milliseconds since the epoch, so the value has to be multiplied by 1000.
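A small stdlib-only sketch of both cases may make the fix clearer. Read as milliseconds, the raw seconds value 1480647647 lands about 17 days after 1970-01-01 (1970-01-18T03:17:27.647Z in UTC). The 1970-01-17 22:17 in the question's table is presumably that same instant rendered in a session time zone five hours behind UTC. The class TimestampMillisCheck and method secondsToMillis are hypothetical names; TimeUnit.SECONDS.toMillis is used here as an equivalent of * 1000 that saturates instead of silently overflowing, although plain * 1000 is fine for realistic timestamps.

```java
import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class TimestampMillisCheck {
    // Epoch seconds -> epoch milliseconds, the unit TIMESTAMP_MILLIS stores.
    // TimeUnit.toMillis saturates at Long.MAX_VALUE / Long.MIN_VALUE instead of overflowing.
    static long secondsToMillis(long epochSeconds) {
        return TimeUnit.SECONDS.toMillis(epochSeconds);
    }

    public static void main(String[] args) {
        long t = 1480647647L; // "_t", in seconds

        // Original code: the seconds value is stored and later read as milliseconds
        System.out.println(Instant.ofEpochMilli(t));                  // 1970-01-18T03:17:27.647Z
        // Fixed code: asLong() * 1000 before writing
        System.out.println(Instant.ofEpochMilli(secondsToMillis(t))); // 2016-12-02T03:00:47Z
    }
}
```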
Upvotes: 1