Anton Daneyko

Reputation: 6512

Using Python, how do I validate JSON against a JSON schema in a streaming fashion, i.e., without loading the whole object into memory?

I have a large JSON file that I do not want to load into memory. I would like to validate it against a JSON schema in a streaming fashion. All libraries I could find so far only validate completely loaded JSON objects (like Pydantic or https://github.com/python-jsonschema/jsonschema). What I need instead is a way to validate it by feeding the original JSON chunk by chunk, i.e., to control the size of the buffer.

This could look like this:

import pydantic  # I use V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

jsonpath = pathlib.Path("some.json")
validator = MyValidator(schema=A.model_json_schema())
with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.event((prefix, event, value))

print(validator.errors)

Imagine the some.json file is a ~50 MB A instance with a very long array. I do not want to load the whole object into memory (this is what Pydantic would do), but I want to make sure that some.json complies with the schema of A. validator.errors could give me a list of errors, which would be empty if none were discovered.

[EDIT 2025-01-31] The term "streaming fashion" means that each event is seen exactly once and there is no way to see it again. However, I am willing to accept an answer if there is a way to do the validation with multiple scans, i.e., in my example above multiple file scans are fine with me.

Upvotes: 5

Views: 361

Answers (7)

Genesis Solutions

Reputation: 505

import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.errors = []

    def validate_event(self, prefix: str, event: str, value: Any):
        """
        Validate each JSON value event as it arrives.
        """
        # Structural events carry no field value to validate
        if event in ("start_map", "end_map", "start_array", "end_array", "map_key"):
            return

        field_name = prefix.split(".")[0]  # Extract top-level field name

        if field_name in self.model.model_fields:
            field_info = self.model.model_fields[field_name]
            field_type = field_info.annotation

            try:
                # list[int] is a generic alias, not a class, so inspect it with get_origin/get_args
                if get_origin(field_type) is list and prefix == f"{field_name}.item":
                    # Validate list items individually against the item type
                    pydantic.TypeAdapter(get_args(field_type)[0]).validate_python(value)
                else:
                    # Validate single field values
                    pydantic.TypeAdapter(field_type).validate_python(value)
            except pydantic.ValidationError as e:
                self.errors.append(f"Error in '{field_name}': {e.errors()}")

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.validate_event(prefix, event, value)

print(validator.get_errors() if validator.get_errors() else "Validation passed")

Upvotes: 1

Genesis Solutions

Reputation: 505

  1. Dynamically parses fields without hardcoding each attribute.
  2. Validates incrementally for large lists and nested structures.
  3. Supports any Pydantic model.

This works for any Pydantic model and does not require manually handling fields.

import pydantic
import ijson
import pathlib
from typing import Type, Any, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        # list[int] is a generic alias, so compare its origin rather than the annotation itself
        self.partial_data = {field: [] if get_origin(field_info.annotation) is list else None
                             for field, field_info in model.model_fields.items()}
        self.errors = []

    def process_event(self, prefix: str, event: str, value: Any):
        """
        Process each streaming event and store values incrementally.
        """
        # Structural events carry no field value
        if event in ("start_map", "end_map", "start_array", "end_array", "map_key"):
            return

        field_name = prefix.split(".")[0]  # Extract root field name
        if field_name in self.partial_data:
            field_type = self.model.model_fields[field_name].annotation

            # Handle lists incrementally
            if isinstance(self.partial_data[field_name], list):
                try:
                    # Validate each element against the list's item type.
                    # Note: validated items are still accumulated, so memory grows with the list.
                    item_type = get_args(field_type)[0]
                    validated_value = pydantic.TypeAdapter(item_type).validate_python(value)
                    self.partial_data[field_name].append(validated_value)
                except pydantic.ValidationError as e:
                    self.errors.append(f"Error in '{field_name}': {e.errors()}")
            else:
                self.partial_data[field_name] = value

    def validate(self):
        """
        Validate the final accumulated data against the full Pydantic model.
        """
        try:
            self.model.model_validate(self.partial_data, strict=True)
        except pydantic.ValidationError as e:
            self.errors.append(e.errors())

    def get_errors(self):
        return self.errors


# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.process_event(prefix, event, value)

validator.validate()
print(validator.get_errors() if validator.get_errors() else "Validation passed")

Why This Works for Any Model

  • Automatic Parsing: Extracts field names dynamically from the Pydantic model.
  • Handles Large Lists Efficiently: Validates list elements incrementally.
  • Generic for Any Model: No need to manually parse fields for each model.
  • Strict Validation: Uses model_validate() for full schema enforcement.

Upvotes: 1

Vegard

Reputation: 4952

I think we're locked in to a semi-manual approach to validation.

Handling lists is something of a nightmare, but for some basic test data the code below works (except the dict handler, which I didn't test due to time constraints). Event handling looks very simple if you don't account for lists, and you'll see below that about 4/5 of the code is there to account for how ijson emits events for lists.

# validator.py

import ijson
from typing import Type, Any, Set, get_type_hints
from pydantic import BaseModel

class StreamingJsonValidator:
    def __init__(self, model_class: Type[BaseModel]):
        """
        Initialize with a Pydantic model (not instance)
        """
        self.model_class = model_class
        self.field_types = get_type_hints(model_class)
        self.required_fields = {
            field_name for field_name, field in model_class.model_fields.items()
            if field.is_required
        }

    def _validate_type(self, value: Any, expected_type: Type) -> bool:
        """
        Validate a value against the expected type
        """
        # Basic types
        if expected_type in (str, int, float, bool):
            return isinstance(value, expected_type)

        # Lists
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is list:
            if not isinstance(value, list):
                return False
            item_type = expected_type.__args__[0]
            return all(self._validate_type(item, item_type) for item in value)

        # Dictionaries
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is dict:
            if not isinstance(value, dict):
                return False
            key_type, value_type = expected_type.__args__
            return all(
                self._validate_type(k, key_type) and self._validate_type(v, value_type)
                for k, v in value.items()
            )

        return False

    def validate_file(self, file_path: str) -> tuple[bool, list[str]]:
        """
        Validate a JSON file
        """
        seen_fields: Set[str] = set()
        errors: list[str] = []
        current_field = None
        current_array = []
        in_array = False

        try:
            with open(file_path, 'rb') as file:
                parser = ijson.parse(file)

                for prefix, event, value in parser:
                    # New field
                    if event == 'map_key':

                        # Track list progress
                        if in_array and current_field:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                            current_array = []
                            in_array = False

                        current_field = value
                        continue

                    # Detect start of lists
                    if current_field and event == 'start_array':
                        in_array = True
                        current_array = []
                        continue

                    if current_field and in_array and event not in ('start_array', 'end_array'):
                        current_array.append(value)
                        continue

                    # Close list
                    if current_field and event == 'end_array':
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                        current_array = []
                        in_array = False
                        current_field = None
                        continue

                    # Detect if we're looking at a complete key-value pair - necessary for list (and possibly dict) handling
                    if current_field and not in_array and event in ('number', 'string', 'boolean', 'null'):
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(value, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got {type(value)}")
                            seen_fields.add(current_field)
                        current_field = None

                missing_fields = self.required_fields - seen_fields
                if missing_fields:
                    errors.append(f"Missing required fields: {missing_fields}")

        except Exception as e:
            errors.append(f"Error parsing JSON: {type(e).__name__} - {str(e)}")

        # The file is valid only if no errors were collected
        is_valid = not errors

        return is_valid, errors

# test.py

from pydantic import BaseModel
from validator import StreamingJsonValidator  # validator.py must be in the same directory

class A(BaseModel):
    i: int
    a: list[int] = []
    s: str

validator = StreamingJsonValidator(A)

# valid case
is_valid, errors = validator.validate_file('valid.json')

print("Expecting a valid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")

# invalid case
is_valid, errors = validator.validate_file('invalid.json')

print("Expecting an invalid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")

# valid.json

{
    "i": 42,
    "a": [1, 2, 3],
    "s": "hello"
}

# invalid.json

{
    "i": 42,
    "a": [1, "2", 3],
    "s": "hello"
}

Upvotes: 1

Genesis Solutions

Reputation: 505

You're on the right track using ijson for streaming, but the issue is that pydantic expects the entire object at once, while ijson parses in a streaming manner. To validate incrementally without loading the entire JSON into memory, you can:

  1. Use a custom validator that incrementally checks each field as it arrives instead of waiting for the full object.
  2. Validate the items of the list a one by one instead of collecting them all in memory.

Instead of passing the entire JSON object to Pydantic at once, parse the JSON step-by-step and validate in parts.

import pydantic  # Pydantic V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int] = []
    s: str

jsonpath = pathlib.Path("some.json")

errors = []
partial_data = {"i": None, "a": [], "s": None}
int_adapter = pydantic.TypeAdapter(int)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        if prefix == "i" and event == "number":
            partial_data["i"] = value
        elif prefix == "s" and event == "string":
            partial_data["s"] = value
        elif prefix == "a.item" and event not in {"end_array", "end_map"}:
            try:
                # Validate each array element on its own with a TypeAdapter;
                # it is not stored, so memory use does not grow with the array
                int_adapter.validate_python(value, strict=True)
            except pydantic.ValidationError as e:
                errors.append(f"Error in 'a': {e.errors()}")

try:
    A.model_validate(partial_data, strict=True)
except pydantic.ValidationError as e:
    errors.append(e.errors())

print(errors if errors else "Validation passed")

This is the JSON Schema of some.json.

{
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
        "data": {
            "type": "array",
            "items": {"type": "integer"}
        }
    },
    "required": ["id", "name", "data"]
}

Upvotes: 3

mohammed_ayaz

Reputation: 679

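Use pandas to read the JSON file; you can pass the chunksize parameter to pd.read_json(). This way you only load a few records (one chunk) into memory at a time. Note that chunksize only works together with lines=True, so the file has to be in JSON Lines format (one record per line).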

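import pandas as pd

# input_file, chunk_size and process_chunk are placeholders to be supplied by the caller
# Read the JSON Lines file in chunks; each chunk is a DataFrame with at most chunk_size rows
for chunk in pd.read_json(input_file, chunksize=chunk_size, lines=True):
    chunk_results = process_chunk(chunk)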


Link to the doc: read_json doc

Upvotes: 0

Suhani Bhatia

Reputation: 97

Using ijson for Incremental Parsing: You can process the file in a memory-efficient way by using the ijson package, which enables iterative parsing of JSON data. Although schema validation is not integrated into ijson, you can add your own validation logic while parsing. With this method, each parsed element must be explicitly compared to the expected schema.
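As a rough illustration of what such hand-written validation logic could look like, here is a minimal sketch. It is not a general JSON Schema validator: the class StreamingSchemaChecker and its methods are hypothetical names, and it only understands a flat object schema with scalar properties, arrays of scalars, and "required". It consumes (prefix, event, value) tuples in the shape that ijson.parse yields (assuming use_float=True), so the demo below feeds it a hand-written event list instead of a real file.

```python
from typing import Any, List, Tuple

Event = Tuple[str, str, Any]  # (prefix, event, value), the shape ijson.parse yields

# Python types accepted for each supported JSON Schema scalar type
SCALARS = {"integer": (int,), "number": (int, float), "string": (str,), "boolean": (bool,)}


def matches(value: Any, json_type: str) -> bool:
    if isinstance(value, bool) and json_type != "boolean":
        return False  # bool subclasses int in Python but is not a JSON number
    if json_type == "integer" and isinstance(value, float):
        return value.is_integer()  # JSON Schema treats 1.0 as an integer
    return isinstance(value, SCALARS[json_type])


class StreamingSchemaChecker:
    """Hypothetical checker: validates event by event and stores no values."""

    def __init__(self, schema: dict):
        self.props = schema.get("properties", {})
        self.missing = set(schema.get("required", []))
        self.errors: List[str] = []

    def _scalar(self, path: str, kind: str, value: Any, json_type: Any) -> None:
        if json_type not in SCALARS or kind in ("end_map", "end_array", "map_key"):
            return  # unsupported type, or a container already reported at its start event
        if kind in ("start_map", "start_array") or not matches(value, json_type):
            self.errors.append(f"{path}: expected {json_type}, got {kind} {value!r}")

    def event(self, ev: Event) -> None:
        prefix, kind, value = ev
        if prefix == "":
            if kind == "map_key":
                self.missing.discard(value)  # a top-level key was seen
            elif kind not in ("start_map", "end_map", "end_array"):
                self.errors.append(f"root: expected object, got {kind}")
            return
        name, _, rest = prefix.partition(".")
        spec = self.props.get(name)
        if spec is None:
            return  # extra properties are allowed by default in JSON Schema
        if spec.get("type") == "array":
            if rest == "" and kind not in ("start_array", "end_array", "end_map", "map_key"):
                self.errors.append(f"{name}: expected array, got {kind}")
            elif rest == "item":
                self._scalar(prefix, kind, value, spec.get("items", {}).get("type"))
        elif rest == "":
            self._scalar(name, kind, value, spec.get("type"))

    def finish(self) -> List[str]:
        # Required properties that never appeared are reported at the end of the stream
        return self.errors + [f"missing required property: {m}" for m in sorted(self.missing)]


schema = {
    "type": "object",
    "properties": {
        "i": {"type": "integer"},
        "a": {"type": "array", "items": {"type": "integer"}},
        "s": {"type": "string"},
    },
    "required": ["i", "a", "s"],
}

# Events as ijson.parse would emit them for {"i": 42, "a": [1, "2", 3]} ("s" is absent)
events = [
    ("", "start_map", None),
    ("", "map_key", "i"), ("i", "number", 42),
    ("", "map_key", "a"), ("a", "start_array", None),
    ("a.item", "number", 1), ("a.item", "string", "2"), ("a.item", "number", 3),
    ("a", "end_array", None),
    ("", "end_map", None),
]

checker = StreamingSchemaChecker(schema)
for ev in events:
    checker.event(ev)
report = checker.finish()
print(report)
```

With a real file, the loop would instead iterate over ijson.parse(file, use_float=True), as in the question. Because the checker keeps only the set of still-missing required keys and the error list, its memory use does not depend on the length of the array. Nested objects, arrays of arrays, and the rest of JSON Schema are deliberately out of scope here.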

Upvotes: 0

ypnos

Reputation: 52387

Pydantic comes with an experimental feature called "partial validation" that is designed for stream inputs.

See https://docs.pydantic.dev/latest/concepts/experimental/#partial-validation

You can create a Pydantic model from an existing JSON schema using datamodel-code-generator: https://koxudaxi.github.io/datamodel-code-generator/

Open issues I see right now with this method:

  1. Support is limited to specific types and the root must be a TypeAdapter instead of a BaseModel
  2. Unclear how to proceed after the initial validation step, with consecutive incoming data

Upvotes: 1
