Chelsar

Reputation: 383

AWS Athena export array of structs to JSON

I've got an Athena table where some fields have a fairly complex nested format. The backing records in S3 are JSON. Along these lines (but we have several more levels of nesting):

CREATE EXTERNAL TABLE IF NOT EXISTS test (
  timestamp double,
  stats array<struct<time:double, mean:double, var:double>>,
  dets array<struct<coords: array<double>, header:struct<frame:int, 
    seq:int, name:string>>>,
  pos struct<x:double, y:double, theta:double>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json'='true')
LOCATION 's3://test-bucket/test-folder/'

Now we need to be able to query the data and import the results into Python for analysis. Because of security restrictions I can't connect to Athena directly; I have to give someone the query, and they give me back the CSV results.

If we just do a straight select * we get back the struct/array columns in a format that isn't quite JSON. Here's a sample input file entry:

{"timestamp":1520640777.666096,"stats":[{"time":15,"mean":45.23,"var":0.31},{"time":19,"mean":17.315,"var":2.612}],"dets":[{"coords":[2.4,1.7,0.3], "header":{"frame":1,"seq":1,"name":"hello"}}],"pos": {"x":5,"y":1.4,"theta":0.04}}

And example output:

select * from test

"timestamp","stats","dets","pos"
"1.520640777666096E9","[{time=15.0, mean=45.23, var=0.31}, {time=19.0, mean=17.315, var=2.612}]","[{coords=[2.4, 1.7, 0.3], header={frame=1, seq=1, name=hello}}]","{x=5.0, y=1.4, theta=0.04}"

I was hoping to get those nested fields exported in a more convenient format - getting them in JSON would be great.

Unfortunately it seems that casting to JSON only preserves keys for maps, not structs; struct fields just get flattened into arrays of their values:

SELECT timestamp, cast(stats as JSON) as stats, cast(dets as JSON) as dets, cast(pos as JSON) as pos FROM "sampledb"."test"

"timestamp","stats","dets","pos"
"1.520640777666096E9","[[15.0,45.23,0.31],[19.0,17.315,2.612]]","[[[2.4,1.7,0.3],[1,1,""hello""]]]","[5.0,1.4,0.04]"

Is there a good way to convert to JSON (or another easy-to-import format), or should I just go ahead and write a custom parsing function?

Upvotes: 23

Views: 12350

Answers (6)

Miller Horvath

Reputation: 23

I also had to adjust @tarun's code, because my data was more complex and had nested structures. Here is the solution I ended up with; I hope it helps:

import re
import json
import numpy as np
import pandas as pd

# quote bare keys: '{key=' or ', key=' -> '"key":'
pattern1 = re.compile(r'(?<=[{,\[])\s*([^{}\[\],"=]+)=')
# quote bare scalar values (note: numbers become strings)
pattern2 = re.compile(r':([^{}\[\],"]+|()(?![{\[]))')
# turn the quoted "null" back into a JSON null
pattern3 = re.compile(r'"null"')

def convert_metadata_to_json(value):
    if type(value) is str:
        value = pattern1.sub('"\\1":', value)
        value = pattern2.sub(': "\\1"', value)
        value = pattern3.sub('null', value)
    elif np.isnan(value):  # empty CSV cells are read as NaN
        return None

    return json.loads(value)

df = pd.read_csv('test.csv')

# 'metadata' is the struct-typed column in this author's data
df['metadata_json'] = df.metadata.apply(convert_metadata_to_json)

Upvotes: 0

wyattwalter

Reputation: 1

I worked around this by creating a second table over the same S3 location, with the nested fields' data types changed to string. The resulting CSV then contained the raw JSON string that Athena read from the object in the file, and I was able to parse that result.
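
A minimal sketch of such a table for the schema in the question (the table name test_raw is a placeholder, and this assumes the OpenX SerDe surfaces the nested objects as raw strings, as described above):

CREATE EXTERNAL TABLE IF NOT EXISTS test_raw (
  timestamp double,
  stats string,
  dets string,
  pos string
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES ('ignore.malformed.json'='true')
LOCATION 's3://test-bucket/test-folder/'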

Upvotes: 0

zipate

Reputation: 166

I used a simple approach to get around the struct-to-JSON Athena limitation: I created a second table where the JSON columns were stored as raw strings. Using Presto JSON and array functions I was able to query the data and return a valid JSON string to my program:

-- transform() pulls a per-field array out of the nested stats structs
select 
  json_extract_scalar(dd, '$.timestamp') as timestamp,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.time')) as arr_stats_time,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.mean')) as arr_stats_mean,
  transform(cast(json_extract(json_parse(dd), '$.stats') as ARRAY<JSON>), x -> json_extract_scalar(x, '$.var')) as arr_stats_var
from 
(select '{"timestamp":1520640777.666096,"stats":[{"time":15,"mean":45.23,"var":0.31},{"time":19,"mean":17.315,"var":2.612}],"dets":[{"coords":[2.4,1.7,0.3], "header":{"frame":1,"seq":1,"name":"hello"}}],"pos": {"x":5,"y":1.4,"theta":0.04}}' as dd);

I know the query takes longer to execute this way, but there are ways to optimize it.

Upvotes: 0

Ranjithkumar MV

Reputation: 824

This method does not modify the query; it post-processes the result instead. For JavaScript/Node.js you can use the npm package athena-struct-parser.

Detailed answer with an example: https://stackoverflow.com/a/67899845/6662952

Reference: https://www.npmjs.com/package/athena-struct-parser

Upvotes: 0

XAnguera

Reputation: 1267

The Python code from @tarun almost got me there, but I had to modify it in several ways due to my data. In particular, I have:

  • JSON structures saved in Athena as strings
  • Strings that contain multiple words and therefore need to be wrapped in double quotes; some of them contain "[]" and "{}" symbols

Here is the code that worked for me; hopefully it will be useful for others:

#!/usr/bin/env python
import io
import re
import sys

# quote bare keys: '{key=' or ', key=' -> '"key":'
pattern1 = re.compile(r'(?<=[{,\s])([a-z]+)=', re.I)
# quote bare string values that follow a colon
pattern2 = re.compile(r':([a-z][^,{}. [\]]+)', re.I)

with io.open(sys.argv[1]) as f:
    # header fields keep their double quotes, which become the
    # quoted top-level JSON keys
    headers = list(map(lambda h: h.strip(), f.readline().split(",")))
    print(headers)
    for line in f.readlines():
        # stash escaped double quotes (strings with quotes inside) so they
        # survive the rewriting below; assumes '#' never occurs in the data
        line = re.sub('""', "#", line)
        data = []
        for i, l in enumerate(line.split('","')):
            item = re.sub('^"|"$', "", l.rstrip())
            if (item[0] == "{" and item[-1] == "}") or (item[0] == "[" and item[-1] == "]"):
                # already struct/array shaped: leave for the patterns below
                data.append(headers[i] + ":" + item)
            else:  # a plain string: quote it
                data.append(headers[i] + ": \"" + item + "\"")

        line = "{" + ','.join(data) + "}"
        line = pattern1.sub(r'"\1":', line)
        line = pattern2.sub(r':"\1"', line)

        # restore the stashed double quotes, now inside valid JSON
        line = re.sub("#", '"', line)

        print(line)
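
Run it against the CSV exported from Athena, e.g. python parse_athena_csv.py test.csv > test.jsonl (the script and file names here are placeholders).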

Upvotes: 1

Tarun Lalwani

Reputation: 146510

I have skimmed through all the documentation, and unfortunately there seems to be no way to do this as of now. The only possible workaround is flattening the structs into individual fields when querying Athena:

SELECT
  my_field,
  my_field.a,
  my_field.b,
  my_field.c.d,
  my_field.c.e
FROM 
  my_table

Alternatively, convert the data to JSON with post-processing. The script below shows how:

#!/usr/bin/env python
import io
import re

# quote bare struct keys: '{key=' or ', key=' -> '"key":'
pattern1 = re.compile(r'(?<=[{,\s])([a-z]+)=', re.I)
# quote bare string values that follow a colon: ':hello' -> ':"hello"'
pattern2 = re.compile(r':([a-z][^,{}. [\]]+)', re.I)

with io.open("test.csv") as f:
    # the header fields keep their surrounding double quotes, which
    # conveniently become the quoted top-level JSON keys
    headers = list(map(lambda h: h.strip(), f.readline().split(",")))
    for line in f.readlines():
        data = []
        for i, l in enumerate(line.split('","')):
            # strip the outer quotes and pair each value with its header
            data.append(headers[i] + ":" + re.sub('^"|"$', "", l.rstrip()))

        line = "{" + ','.join(data) + "}"
        line = pattern1.sub(r'"\1":', line)
        line = pattern2.sub(r':"\1"', line)
        print(line)

The output on your input data is

{"timestamp":1.520640777666096E9,"stats":[{"time":15.0, "mean":45.23, "var":0.31}, {"time":19.0, "mean":17.315, "var":2.612}],"dets":[{"coords":[2.4, 1.7, 0.3], "header":{"frame":1, "seq":1, "name":"hello"}}],"pos":{"x":5.0, "y":1.4, "theta":0.04}
}

Which is valid JSON.

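For completeness, a small sketch of the import side, assuming the script's output was redirected to a file called converted.jsonl (a placeholder name); each line can then be loaded straight into pandas:

import json
import pandas as pd

with open("converted.jsonl") as f:
    records = [json.loads(line) for line in f if line.strip()]

# json_normalize flattens pos.x, pos.y, ... into their own columns
df = pd.json_normalize(records)
print(df.columns.tolist())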

Upvotes: 9
