Clay

Reputation: 2736

Use polars to read many small json files from S3 in parallel

I have seen the following polars documentation:
https://pola-rs.github.io/polars-book/user-guide/multiple_files/intro.html#reading-and-processing-in-parallel

Is there a way to create a query plan to read many small json files from an S3 bucket?

This would be similar to the way Spark can read many small json files or csv files from a single S3 prefix (path) with

spark.read.format("json").load("s3a://my-bucket/path/to/smallfiles/*.json")

Upvotes: 7

Views: 1456

Answers (1)

John Collins

Reputation: 2961

Parallel reading of many files from S3 with Polars

Asynchronous JSON file retrieval and concatenation from Amazon S3 using Python polars, asyncio, and aiobotocore

Here is an example reading from a public AWS S3 data bucket from aws.amazon.com/opendata¹:

import io

import asyncio
import polars as pl

from aiobotocore.session import get_session
from botocore import UNSIGNED  # Unsigned (anonymous) requests; only works for public buckets.
from botocore.config import Config


S3_REGION = "eu-west-2"
S3_BUCKET = "pansurg-curation-raw-open-data"
S3_PREFIX = "cord19/AWSCORD19/upload_date=1591730933/"


async def read_json_from_s3(bucket, prefix):
    # Initialize S3 client
    session = get_session()
    async with session.create_client(
        "s3", region_name=S3_REGION, config=Config(signature_version=UNSIGNED)
    ) as client:
        file_limit = int(1e2) # NOTE: For demo purposes only.² (See endnotes)
        file_count = 0
        paginator = client.get_paginator("list_objects_v2")
        async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
            contents = result.get("Contents", [])
            for obj in contents:
                key = obj["Key"]
                if key.endswith(".json"):
                    file_count += 1
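                    if file_count > file_limit:
                        # End the generator; a bare `break` would only leave the
                        # inner loop and keep paging through the listing.
                        return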
                    response = await client.get_object(Bucket=bucket, Key=key)
                    data = await response["Body"].read()
                    df = pl.read_json(io.StringIO(data.decode("utf-8")))
                    yield df


async def retrieve_json():
    bucket = S3_BUCKET
    prefix = S3_PREFIX

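    # Collect one DataFrame per JSON file. Note: the generator awaits each
    # get_object call in turn, so the downloads are asynchronous but not concurrent.
    dfs = [df async for df in read_json_from_s3(bucket, prefix)]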

    combined_df = pl.concat(dfs)

    return combined_df


df = asyncio.run(retrieve_json())

Inspecting the result with

print(df)
df.glimpse(max_items_per_column=1)

gives:

shape: (100, 2)
┌───────────────────────────────────┬───────────────────────────────────┐
│ Attributes                        ┆ Title                             │
│ ---                               ┆ ---                               │
│ struct[4]                         ┆ str                               │
╞═══════════════════════════════════╪═══════════════════════════════════╡
│ {"cord19","txt","2020-06-08/docu… ┆ Absence of surface expression of… │
│ {"cord19","txt","2020-06-08/docu… ┆ Rethinking high-risk groups in C… │
│ {"cord19","txt","2020-06-08/docu… ┆ Plasma inflammatory cytokines an… │
│ {"cord19","txt","2020-06-08/docu… ┆ Seasonal influenza risk in hospi… │
│ {"cord19","txt","2020-06-08/docu… ┆ Disaster Perceptions              │
│ …                                 ┆ …                                 │
│ {"cord19","txt","2020-06-08/docu… ┆ A Global Survey on the Impact of… │
│ {"cord19","txt","2020-06-08/docu… ┆ Liver Chemistries in Patients wi… │
│ {"cord19","txt","2020-06-08/docu… ┆ Simulations for epidemiology and… │
│ {"cord19","txt","2020-06-08/docu… ┆ Expression and purification of c… │
│ {"cord19","txt","2020-06-08/docu… ┆ Transmissible gastroenteritis vi… │
└───────────────────────────────────┴───────────────────────────────────┘
Rows: 100
Columns: 2
$ Attributes <struct[4]> {'_category': 'cord19', '_file_type': 'txt', 'source_file': '2020-06-08/document_parses/pmc_json/PMC7127496.xml.json', 'publisher_url': 'https://api.elsevier.com/content/article/pii/S0378113506004792;https://www.ncbi.nlm.nih.gov/pubmed/17188823/;https://www.sciencedirect.com/science/article/pii/S0378113506004792'}
$ Title            <str> 'Absence of surface expression of feline infectious peritonitis virus (FIPV) antigens on infected cells isolated from cats with FIP'

Implementation Notes
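
The loop in the example above awaits each get_object call before starting the next one, so the files are downloaded one at a time. Below is a minimal sketch of how the downloads could be overlapped instead, using asyncio.gather with a semaphore to cap the number of requests in flight. The names gather_bounded, fetch_one, max_concurrency and fake_fetch are hypothetical and not part of the original answer, and fake_fetch only simulates a download so the sketch runs without S3 access.

```python
import asyncio


async def gather_bounded(keys, fetch_one, max_concurrency=16):
    """Run fetch_one(key) for every key, at most max_concurrency at a time.

    Results are returned in the same order as keys.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(key):
        # The semaphore caps how many fetches are in flight at once.
        async with semaphore:
            return await fetch_one(key)

    return await asyncio.gather(*(guarded(key) for key in keys))


async def fake_fetch(key):
    # Hypothetical stand-in for an S3 download (illustration only).
    await asyncio.sleep(0.01)
    return key.upper()


results = asyncio.run(
    gather_bounded(["a.json", "b.json", "c.json"], fake_fetch, max_concurrency=2)
)
print(results)
```

To apply this to the answer's code, one option would be to first collect the .json keys from the paginator, then pass a fetch_one coroutine that awaits client.get_object, reads the body, and returns pl.read_json(...) for that key. The resulting list can be handed to pl.concat as before.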


¹ Data from the REaltime DAta Synthesis and Analysis (REDASA) COVID-19 snapshot was chosen as an example dataset because it contains many small JSON files.

² In the example above, a file limit of 100 was used while developing and debugging the code.

Upvotes: 3
