javier_orta

Reputation: 477

How to save a Parquet file to HDFS without Spark or any other framework?

I want to save a Parquet file directly to HDFS using Java.

This is the code I use to generate Parquet files and store them locally, but now I want to store them in HDFS.

 final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
  final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
  final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
  final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
  final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";
  final Path path = new Path(parquetFile);
  AvroParquetWriter parquetWriter = new AvroParquetWriter(path,
          avroSchema, CompressionCodecName.SNAPPY, ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE);
  final GenericRecord record = new GenericData.Record(avroSchema);
  record.put(Constantes.CAMPO_ID, datos[0]);
  record.put("movie", datos[1]);
  record.put("date", datos[2]);
  record.put("imdb", datos[3]);
  parquetWriter.write(record);

I want to replace this line:

 final String parquetFile = "/home/javier/parquet-files/data" + postfijoFilename + ".parquet";

with a Hadoop HDFS path. Any ideas?

Upvotes: 2

Views: 1227

Answers (1)

Mobin Ranjbar

Reputation: 1360

You can do it with something like the code below. Note that the target location must exist, and you need to change the HDFS URL and username in the code to match your cluster. You might also need to put the schema in HDFS.

final String schemaLocation = "/home/javier/FlinkProjects/kafka-flink/src/main/java/com/grallandco/demos/avro.json";
final Schema avroSchema = new Schema.Parser().parse(new File(schemaLocation));
final MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
final WriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);
Configuration configuration = new Configuration();
String hdfsUrl = "hdfs://hadoopnamenode:9000/";
String username = "hduser";
FileSystem fs = FileSystem.get(new URI(hdfsUrl), configuration);
// Qualify the path with the HDFS scheme and authority so the writer does not
// fall back to the local file system when fs.defaultFS is not set.
final Path path = fs.makeQualified(new Path("/user/hduser/parquet-files/data" +
        postfijoFilename + ".parquet"));
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(username);
// Run the write as the given HDFS user.
ugi.doAs(new PrivilegedExceptionAction<Void>() {
    public Void run() throws Exception {
        AvroParquetWriter<GenericRecord> parquetWriter = new AvroParquetWriter<GenericRecord>(path,
                avroSchema,
                CompressionCodecName.SNAPPY,
                ParquetWriter.DEFAULT_BLOCK_SIZE,
                ParquetWriter.DEFAULT_PAGE_SIZE);
        try {
            final GenericRecord record = new GenericData.Record(avroSchema);
            record.put(Constantes.CAMPO_ID, datos[0]);
            record.put("movie", datos[1]);
            record.put("date", datos[2]);
            record.put("imdb", datos[3]);
            parquetWriter.write(record);
        } finally {
            // Closing the writer flushes the data and writes the Parquet footer.
            parquetWriter.close();
        }
        return null;
    }
});
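
The key change from the local version is that the file name carries the hdfs:// scheme plus the namenode host and port. As a rough sketch using only the JDK, this is one way to build that string. The class name HdfsPathSketch, the method qualify and the file name data_001.parquet are made up for illustration; the host, port and directory are the placeholder values from the code above, so adjust them to your cluster.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper (not part of Hadoop or Parquet): builds a fully
// qualified HDFS location string from its parts.
final class HdfsPathSketch {

    static String qualify(String nameNodeHost, int port, String directory, String fileName)
            throws URISyntaxException {
        // Normalise the directory so it starts and ends with a slash.
        String dir = directory.endsWith("/") ? directory : directory + "/";
        if (!dir.startsWith("/")) {
            dir = "/" + dir;
        }
        // Arguments: scheme, userInfo, host, port, path, query, fragment.
        URI uri = new URI("hdfs", null, nameNodeHost, port, dir + fileName, null, null);
        return uri.toString();
    }

    public static void main(String[] args) throws URISyntaxException {
        // Placeholder host and port taken from the answer; the file name is an assumed example.
        System.out.println(qualify("hadoopnamenode", 9000,
                "/user/hduser/parquet-files", "data_001.parquet"));
        // prints hdfs://hadoopnamenode:9000/user/hduser/parquet-files/data_001.parquet
    }
}
```

The resulting string can be passed to new Path(...) in place of the local file name, provided the Hadoop HDFS client libraries are on the classpath.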

Upvotes: 1
