Reputation: 536
So I have written an Apache Beam pipeline that reads a directory containing 99 files, calculates each file's checksum, and creates a key-value pair of the file and its checksum. What I need to do is write these key-value pairs to a manifest.json file. I am currently running into some serialization problems, and any advice or help would be amazing.
Here is my code:
/**
 * Pipeline that computes a SHA-256 checksum for each file in the --input directory,
 * logs each (file, checksum) pair, and tries to record the pairs in ./manifest.json.
 */
public class BeamPipeline {
private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
// Command-line options; --input names the directory whose files are checksummed.
public static interface MyOptions extends PipelineOptions {
@Description("Input Path(with gs:// prefix)")
String getInput();
void setInput(String value);
}
/**
 * Builds one Match -> Read -> checksum -> Print chain per file found in the input
 * directory, then runs the pipeline.
 *
 * @param args pipeline arguments, including --input
 */
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
// NOTE: obj is created here in main(), but it is used inside the anonymous SimpleFunction
// below, so Beam has to serialize it together with that function. Gson's JsonObject is not
// Serializable, which causes the NotSerializableException reported for this code.
JsonObject obj = new JsonObject();
// NOTE(review): java.io.File only sees the local filesystem, so a gs:// input (as the option
// description suggests) presumably cannot be listed this way -- confirm. listFiles() returns
// null when the path is not a readable directory.
File dir = new File(options.getInput());
// Adds a separate transform chain for every file, reusing the same step names each time.
for (File file : dir.listFiles()) {
String inputString = file.toString();
p
.apply("Match Files", FileIO.match().filepattern(inputString))
.apply("Read Files", FileIO.readMatches())
.apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
public KV<String, String> apply(FileIO.ReadableFile file) {
String temp = null;
try {
// Decodes the whole file as UTF-8 text; the checksum below is taken over this string.
temp = file.readFullyAsUTF8String();
} catch (IOException e) {
// The IOException is swallowed: temp stays null and is still hashed below.
}
String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);
// Keys the JSON entry by the file's contents (temp), not by its name.
obj.addProperty(temp, sha256hex);
String json = obj.toString();
// Runs once per element: FileWriter(String) truncates the file, so every call rewrites
// ./manifest.json from scratch, on whichever machine executes this function.
try (FileWriter fileWriter = new FileWriter("./manifest.json")) {
fileWriter.write(json);
} catch (IOException e) {
}
return KV.of(file.getMetadata().resourceId().toString(), sha256hex);
}
}))
// Logs each (file, checksum) pair.
.apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));
}
}));
}
p.run();
}
}
Here are the errors I am currently getting:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=org.apache.beam.sdk.transforms.MapElements$1@50756c76, mainOutputTag=Tag<output>}
Caused by: java.io.NotSerializableException: com.google.gson.JsonObject
Upvotes: 1
Views: 1417
Reputation: 779
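A DoFn is serialized together with every object it references.
Gson's `JsonObject` is not serializable. In your code it is created outside the DoFn (in `main`) but referenced inside it, which makes the DoFn itself non-serializable. The function you pass to `MapElements.via` is wrapped in a DoFn, which is why the error names `MapElements$1`.
You can create the `JsonObject` within the DoFn to avoid this serialization issue.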
public class BeamPipeline {
private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
public static interface MyOptions extends PipelineOptions {
@Description("Input Path(with gs:// prefix)")
String getInput();
void setInput(String value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
File dir = new File(options.getInput());
for (File file : dir.listFiles()) {
String inputString = file.toString();
p
.apply("Match Files", FileIO.match().filepattern(inputString))
.apply("Read Files", FileIO.readMatches())
.apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
public KV<String, String> apply(FileIO.ReadableFile file) {
String temp = null;
try {
temp = file.readFullyAsUTF8String();
} catch (IOException e) {
}
String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);
JsonObject obj = new JsonObject();
obj.addProperty(temp, sha256hex);
String json = obj.toString();
try (FileWriter fileWriter = new FileWriter("./manifest.json")) {
fileWriter.write(json);
} catch (IOException e) {
}
return KV.of(file.getMetadata().resourceId().toString(), sha256hex);
}
}))
.apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));
}
}));
}
p.run();
}
}
Upvotes: 3