Reputation: 2736
I have seen the following polars documentation:
https://pola-rs.github.io/polars-book/user-guide/multiple_files/intro.html#reading-and-processing-in-parallel
Is there a way to create a query plan to read many small json files from an S3 bucket?
This would be similar to the way Spark can read many small json files or csv files from a single S3 prefix (path) with
spark.read.format("json").load("s3a://my-bucket/path/to/smallfiles/*.json")
Upvotes: 7
Views: 1456
Reputation: 2961
This can be done with polars, asyncio, and aiobotocore.
Here is an example reading from a public AWS S3 data bucket from aws.amazon.com/opendata¹:
import io
import asyncio

import polars as pl
from aiobotocore.session import get_session
from botocore import UNSIGNED  # NOTE: See Credentials* below.
from botocore.config import Config

S3_REGION = "eu-west-2"
S3_BUCKET = "pansurg-curation-raw-open-data"
S3_PREFIX = "cord19/AWSCORD19/upload_date=1591730933/"


async def read_json_from_s3(bucket, prefix):
    # Initialize S3 client
    session = get_session()
    async with session.create_client(
        "s3", region_name=S3_REGION, config=Config(signature_version=UNSIGNED)
    ) as client:
        file_limit = int(1e2)  # NOTE: For demo purposes only.² (See endnotes)
        file_count = 0
        paginator = client.get_paginator("list_objects_v2")
        # Walk the listing page by page and yield one DataFrame per JSON object
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            contents = result.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith(".json"):
                    file_count += 1
                    if file_count > file_limit:
                        return  # Stop here so no further pages are listed
                    response = await client.get_object(Bucket=bucket, Key=key)
                    data = await response["Body"].read()
                    df = pl.read_json(io.StringIO(data.decode("utf-8")))
                    yield df


async def retrieve_json():
    bucket = S3_BUCKET
    prefix = S3_PREFIX
    # Collect the DataFrames from the async generator (objects are fetched one at a time)
    dfs = [df async for df in read_json_from_s3(bucket, prefix)]
    combined_df = pl.concat(dfs)
    return combined_df


df = asyncio.run(retrieve_json())
Running
print(df)
df.glimpse(max_items_per_column=1)
then gives:
shape: (100, 2)
┌───────────────────────────────────┬───────────────────────────────────┐
│ Attributes ┆ Title │
│ --- ┆ --- │
│ struct[4] ┆ str │
╞═══════════════════════════════════╪═══════════════════════════════════╡
│ {"cord19","txt","2020-06-08/docu… ┆ Absence of surface expression of… │
│ {"cord19","txt","2020-06-08/docu… ┆ Rethinking high-risk groups in C… │
│ {"cord19","txt","2020-06-08/docu… ┆ Plasma inflammatory cytokines an… │
│ {"cord19","txt","2020-06-08/docu… ┆ Seasonal influenza risk in hospi… │
│ {"cord19","txt","2020-06-08/docu… ┆ Disaster Perceptions │
│ … ┆ … │
│ {"cord19","txt","2020-06-08/docu… ┆ A Global Survey on the Impact of… │
│ {"cord19","txt","2020-06-08/docu… ┆ Liver Chemistries in Patients wi… │
│ {"cord19","txt","2020-06-08/docu… ┆ Simulations for epidemiology and… │
│ {"cord19","txt","2020-06-08/docu… ┆ Expression and purification of c… │
│ {"cord19","txt","2020-06-08/docu… ┆ Transmissible gastroenteritis vi… │
└───────────────────────────────────┴───────────────────────────────────┘
Rows: 100
Columns: 2
$ Attributes <struct[4]> {'_category': 'cord19', '_file_type': 'txt', 'source_file': '2020-06-08/document_parses/pmc_json/PMC7127496.xml.json', 'publisher_url': 'https://api.elsevier.com/content/article/pii/S0378113506004792;https://www.ncbi.nlm.nih.gov/pubmed/17188823/;https://www.sciencedirect.com/science/article/pii/S0378113506004792'}
$ Title <str> 'Absence of surface expression of feline infectious peritonitis virus (FIPV) antigens on infected cells isolated from cats with FIP'
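Note that the generator above awaits each get_object call before requesting the next, so the downloads run one after another. If more throughput is needed, the requests could be issued concurrently with a cap on how many are in flight. The following is only a minimal sketch of that pattern using asyncio.gather and asyncio.Semaphore. The names fetch_one, fetch_all, and max_concurrency are hypothetical, and fetch_one merely simulates the S3 call so that the sketch runs without network access.

```python
import asyncio


async def fetch_one(key: str) -> bytes:
    """Hypothetical stand-in for client.get_object(...) followed by Body.read()."""
    await asyncio.sleep(0.01)  # simulate network latency
    return f'{{"key": "{key}"}}'.encode("utf-8")


async def fetch_all(keys, max_concurrency: int = 10):
    """Fetch all keys concurrently, with at most max_concurrency requests in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded_fetch(key):
        # The semaphore blocks here once max_concurrency fetches are running
        async with semaphore:
            return await fetch_one(key)

    # gather returns results in the same order as the awaitables passed in
    return await asyncio.gather(*(bounded_fetch(k) for k in keys))


keys = [f"prefix/file_{i}.json" for i in range(25)]
payloads = asyncio.run(fetch_all(keys))
```

In a real version, fetch_one would wrap the aiobotocore calls from the code above, and each returned payload would be parsed with pl.read_json before the final pl.concat.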
* Credentials: the example bucket is public, so requests are sent unsigned (signature_version=UNSIGNED); the boto3 equivalent of the AWS CLI "aws s3 [command] --no-sign-request" is thus used.
¹ Data from The REaltime DAta Synthesis and Analysis (REDASA) COVID-19 snapshot was chosen as an example dataset because it met the criterion of containing many small JSON files.
² In the example above, a file limit of 100 was used during development and debugging of the code.
Upvotes: 3