Reputation: 477
I want to save a Parquet file directly to HDFS using Java.
This is the code I use to generate Parquet files and store them locally, but now I want to store them in HDFS:
final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";
final Path path = new Path(parquetFile);
AvroParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(path,
        avroSchema, CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);

final GenericRecord record = new GenericData.Record(avroSchema);
record.put(Constantes.CAMPO_ID, datos[0]);
record.put("movie", datos[1]);
record.put("date", datos[2]);
record.put("imdb", datos[3]);
parquetWriter.write(record);
parquetWriter.close(); // close to flush data pages and write the Parquet footer
I want to replace this line

final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";

with a Hadoop HDFS path. Any ideas?
Upvotes: 2
Views: 1227
Reputation: 1360
You can do it with something like the code below. Note that the target directory must exist in HDFS, and you should change the HDFS URL and username to match your cluster. Depending on your setup, the Avro schema file may also need to live in HDFS:
final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

final String hdfsUrl = "hdfs://hadoopnamenode:9000/";
final String username = "hduser";
final Configuration configuration = new Configuration();

// Build the Path from a fully qualified hdfs:// URI; a schemeless path
// would resolve against the local filesystem instead of HDFS.
final Path path = new Path(hdfsUrl + "user/hduser/parquet-files/data" +
        postfijoFilename + ".parquet");

// Run the write as the HDFS user so the file is owned by that account.
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
ugi.doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
        // Make sure the target directory exists before writing.
        FileSystem fs = FileSystem.get(new URI(hdfsUrl), configuration);
        if (!fs.exists(path.getParent())) {
            fs.mkdirs(path.getParent());
        }

        AvroParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(
                path,
                avroSchema,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE);

        final GenericRecord record = new GenericData.Record(avroSchema);
        record.put(Constantes.CAMPO_ID, datos[0]);
        record.put("movie", datos[1]);
        record.put("date", datos[2]);
        record.put("imdb", datos[3]);
        parquetWriter.write(record);
        parquetWriter.close(); // flush data pages and write the Parquet footer
        return null;
    }
});
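The essential change is really just the path: a Path built from a fully qualified hdfs:// URI carries the scheme and the namenode authority, so Hadoop routes the write to HDFS rather than the local filesystem. A minimal plain-Java sketch of how such a URI decomposes, using the example host and port from above (the filename is a placeholder):

```java
import java.net.URI;

public class HdfsUriDemo {
    public static void main(String[] args) {
        // Example values from the answer above; replace with your own namenode.
        String hdfsUrl = "hdfs://hadoopnamenode:9000/";
        String file = "user/hduser/parquet-files/data1.parquet";

        URI uri = URI.create(hdfsUrl + file);
        System.out.println(uri.getScheme());    // -> hdfs (selects the filesystem implementation)
        System.out.println(uri.getAuthority()); // -> hadoopnamenode:9000 (the namenode host:port)
        System.out.println(uri.getPath());      // -> /user/hduser/parquet-files/data1.parquet
    }
}
```

As a side note, the five-argument AvroParquetWriter constructor used above is deprecated in newer parquet-avro releases; there the writer is created through AvroParquetWriter.builder(path).withSchema(avroSchema)... instead, but the HDFS path handling is the same.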
Upvotes: 1