Reputation: 1868
I am trying to read and parse a JSON file in Apache Beam.
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
PCollection<String> lines = p.apply("ReadMyFile", TextIO.read().from("/Users/xyz/eclipse-workspace/beam-project/myfirst.json"));
System.out.println("lines: " + lines);
Below is the sample JSON; I need to parse testdata from it:
myfirst.json
{
  "testdata": {
    "siteOwner": "xxx",
    "siteInfo": {
      "siteID": "id_member",
      "siteplatform": "web",
      "siteType": "soap",
      "siteURL": "www"
    }
  }
}
Could someone explain how to parse testdata and get its content from the above JSON file? After that, I need to stream the data using Beam.
Upvotes: 8
Views: 4456
Reputation: 5591
Yes, modern JSON libraries will let you parse a completely arbitrary stream of JSON and pseudo-JSON into a stream of objects, without having to load the whole file into memory.
There is no particular need to stuff your objects on a single line. Indeed, when designing software to process big data, it is good design practice to avoid reserving massive amounts of memory for batch processing when on-demand streaming with mere kilobytes of memory is possible.
Take a look at the short Baeldung tutorial: http://www.baeldung.com/jackson-streaming-api
I'll include the central parts of the Baeldung article's code here, as that's good practice in case the site goes down:
while (jParser.nextToken() != JsonToken.END_OBJECT) {
    String fieldname = jParser.getCurrentName();
    if ("name".equals(fieldname)) {
        jParser.nextToken();
        parsedName = jParser.getText();
    }
    if ("age".equals(fieldname)) {
        jParser.nextToken();
        parsedAge = jParser.getIntValue();
    }
    if ("address".equals(fieldname)) {
        jParser.nextToken();
        while (jParser.nextToken() != JsonToken.END_ARRAY) {
            addresses.add(jParser.getText());
        }
    }
}
In this case, the parser begins at an object start token and proceeds to parse that object. In your case, you would want to keep looping until the file is done: after bailing out of this while loop, keep moving forward until you find a JsonToken.START_OBJECT, then create a new object, go through this parsing routine, and finally hand the object off to Apache Beam.
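To make the "keep looping until the file is done" idea concrete, here is a minimal sketch that uses only the JDK. Note that this is not the Jackson API: it swaps the token loop for a plain brace-depth scan, and the class and method names (TopLevelJsonObjects, forEachObject, split) are made up for illustration. It assumes the input is a sequence of top-level {...} objects and does not validate the JSON.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of Jackson or Beam.
final class TopLevelJsonObjects {

    /**
     * Reads one character at a time and calls sink once per complete
     * top-level {...} object. Only the object currently being read is
     * buffered, so memory use does not grow with the size of the file.
     * Pass a BufferedReader when reading from disk.
     */
    static void forEachObject(Reader in, Consumer<String> sink) throws IOException {
        StringBuilder current = new StringBuilder();
        int depth = 0;            // number of currently open '{'
        boolean inString = false; // inside a "..." literal?
        boolean escaped = false;  // previous char was a backslash inside a string?
        int ch;
        while ((ch = in.read()) != -1) {
            char c = (char) ch;
            if (depth == 0 && c != '{') {
                continue; // skip whitespace and separators between objects
            }
            current.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                depth++;
            } else if (c == '}') {
                depth--;
                if (depth == 0) {
                    sink.accept(current.toString()); // hand off one whole object
                    current.setLength(0);
                }
            }
        }
    }

    /** Convenience wrapper for small in-memory inputs. */
    static List<String> split(String text) {
        List<String> objects = new ArrayList<>();
        try {
            forEachObject(new StringReader(text), objects::add);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return objects;
    }
}
```

Each string passed to the sink is one complete object, pretty-printed or not, so it can be given to whatever JSON parser you use and then passed on to Beam.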
Upvotes: 0
Reputation: 6669
First of all, I don't think it is possible (or at least common) to process "pretty-printed" JSON this way, because TextIO hands you the file one line at a time. Instead, JSON data is usually ingested as newline-delimited JSON, so your input file should look like the following:
{"testdata":{"siteOwner":"xxx","siteInfo":{"siteID":"id_member","siteplatform":"web","siteType":"soap","siteURL":"www"}}}
{"testdata":{"siteOwner":"yyy","siteInfo":{"siteID":"id_member2","siteplatform":"web","siteType":"soap","siteURL":"www"}}}
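If your file is currently pretty-printed, one way to get it into the one-object-per-line shape above is to drop all whitespace that sits outside string literals. The sketch below does that with the JDK only; the class name JsonLineCompactor and the method compact are made up for illustration, and the code assumes the input is already valid JSON (it does not check).

```java
// Hypothetical helper for illustration; not part of Beam.
final class JsonLineCompactor {

    /**
     * Removes whitespace outside string literals so that one JSON value
     * fits on a single line. Whitespace inside "..." is left untouched.
     */
    static String compact(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false; // inside a "..." literal?
        boolean escaped = false;  // previous char was a backslash inside a string?
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
                out.append(c);
            } else if (!Character.isWhitespace(c)) {
                out.append(c); // structural characters, numbers, true/false/null
            }
        }
        return out.toString();
    }
}
```

Writing compact(...) of each object followed by a newline gives a file in which every line is a complete JSON document.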
After that, the lines collection from your code is a "stream of lines". Next, you can map this "stream of lines" into a "stream of JSONs" by applying a parse function in a ParDo:
// Json and parseJson(...) are placeholders for your own type and parsing code
static class ParseJsonFn extends DoFn<String, Json> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // the element here is one line; you can do whatever you want with it: parse, print, etc.
        // this function is simply applied to every element in your stream
        c.output(parseJson(c.element()));
    }
}
PCollection<Json> jsons = lines.apply(ParDo.of(new ParseJsonFn())); // now you have a "stream of JSONs"
Upvotes: 1