Reputation: 45
I have the following CSV, and I need to append a new row to it.
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Expected output:
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
01 | 30/07/2021 | 999 |
Java code:
public static void main(String[] args) throws IOException {
    final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");
    File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");
    Schema schema = new Schema.Parser().parse(schemaFile);
    Pipeline pipeline = Pipeline.create();
    // Convert the Avro schema to a Beam schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
    final PCollectionTuple tuples = pipeline
            // Match the CSV input file
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
            // Read the matched files
            .apply("2", FileIO.readMatches())
            // Validate each record against the schema, convert it to a Row,
            // and emit it to either the valid or the invalid output
            .apply("3", ParDo.of(new FileReader(beamSchema))
                    .withOutputTags(FileReader.validTag(), TupleTagList.of(invalidTag())));
    // Keep only the valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
    RowAddition rowAddition = new RowAddition();
    final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));
How do I combine these two PCollection objects (rows and newlyAddedRows)?
    PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
    pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));
    pipeline.run().waitUntilFinish();
    System.out.println("The end");
}
}
Logic for adding rows:
class RowAddition extends DoFn<Row, Row> {
    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        // Parse the Avro schema and convert it to a Beam schema
        // (doing this per element is expensive; it could be moved to a @Setup method)
        org.apache.beam.sdk.schemas.Schema beamSchema = null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser()
                    .parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Build and emit the row to append (the incoming element itself is not used)
        Row newRow = Row.withSchema(beamSchema).addValues("01", "30/07/2021", 999.0).build();
        context.output(newRow);
    }
}
I have been referring to this link.
Upvotes: 0
Views: 1066
Reputation: 5104
You're looking for the Flatten transform. This takes any number of existing PCollections and produces a new PCollection with the union of their elements. For completely new elements, you could use Create, or another PTransform that computes the new elements based on the old ones.
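For example, here is a minimal sketch using the rows and newlyAddedRows collections from the question (both already set to RowCoder.of(beamSchema)); the statements would sit inside the existing main method, the imports at the top of the file, and the transform labels "MergeRows" and "CreateExtraRow" are just illustrative names:

import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

// Union of the original rows and the rows produced by RowAddition
PCollection<Row> allRows = PCollectionList.of(rows).and(newlyAddedRows)
        .apply("MergeRows", Flatten.pCollections());

// Alternative for a completely new element: build it directly with Create,
// attach the schema's RowCoder, and then Flatten it with the existing rows
PCollection<Row> extraRow = pipeline.apply("CreateExtraRow",
        Create.of(Row.withSchema(beamSchema).addValues("01", "30/07/2021", 999.0).build())
                .withCoder(RowCoder.of(beamSchema)));

Downstream steps such as RowToString and TextIO.write can then be applied to the flattened collection instead of newlyAddedRows.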
Upvotes: 2