Reputation: 6512
I have a large JSON document that I do not want to load into memory. I would like to validate it against a JSON schema in a streaming fashion. All libraries I have found so far only validate completely loaded JSON objects (like Pydantic or https://github.com/python-jsonschema/jsonschema). What I need instead is a way to validate it by feeding the original JSON chunk by chunk, i.e., controlling the size of the buffer.
It could look like this:
import pydantic  # I use V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

jsonpath = pathlib.Path("some.json")
validator = MyValidator(schema=A.model_json_schema())
with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.event((prefix, event, value))
print(validator.errors)
Imagine the some.json file is a ~50 MB A instance with a very long array. I do not want to load the whole object into memory (which is what Pydantic would do), but I want to make sure that some.json complies with the schema of A. validator.errors would then give me a list of errors, which would be empty in case none were discovered.
[EDIT 2025-01-31] The term "streaming fashion" means that each event is seen exactly once and there is no way to see it again. However, I am willing to accept an answer that does the validation with multiple scans, i.e., in my example above multiple passes over the file are fine with me.
Upvotes: 5
Views: 361
Reputation: 505
import pydantic
import ijson
import pathlib
from typing import Any, Type, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.errors = []

    def validate_event(self, prefix: str, event: str, value: Any):
        """
        Validate each JSON event as it arrives.
        """
        if event in ("start_map", "end_map", "start_array", "end_array", "map_key"):
            return  # structural events carry no field value
        field_name = prefix.split(".")[0]  # Extract top-level field name
        if field_name in self.model.model_fields:
            field_type = self.model.model_fields[field_name].annotation
            try:
                if get_origin(field_type) is list:
                    # Validate list items individually against the item type
                    pydantic.TypeAdapter(get_args(field_type)[0]).validate_python(value)
                else:
                    # Validate single field values
                    pydantic.TypeAdapter(field_type).validate_python(value)
            except pydantic.ValidationError as e:
                self.errors.append(f"Error in '{field_name}': {e.errors()}")

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.validate_event(prefix, event, value)

print(validator.get_errors() if validator.get_errors() else "Validation passed")
Upvotes: 1
Reputation: 505
This works for any Pydantic model and does not require handling fields manually.
import pydantic
import ijson
import pathlib
from typing import Any, Type, get_args, get_origin

class StreamingValidator:
    def __init__(self, model: Type[pydantic.BaseModel]):
        self.model = model
        self.partial_data = {
            field: [] if get_origin(field_info.annotation) is list else None
            for field, field_info in model.model_fields.items()
        }
        self.errors = []

    def process_event(self, prefix: str, event: str, value: Any):
        """
        Process each streaming event and store values incrementally.
        """
        if event in ("start_map", "end_map", "start_array", "end_array", "map_key"):
            return  # structural events carry no field value
        field_name = prefix.split(".")[0]  # Extract root field name
        if field_name in self.partial_data:
            field_type = self.model.model_fields[field_name].annotation
            # Handle lists incrementally: validate each item against the item type
            if isinstance(self.partial_data[field_name], list):
                try:
                    item_type = get_args(field_type)[0]
                    validated_value = pydantic.TypeAdapter(item_type).validate_python(value)
                    self.partial_data[field_name].append(validated_value)
                except pydantic.ValidationError as e:
                    self.errors.append(f"Error in '{field_name}': {e.errors()}")
            else:
                self.partial_data[field_name] = value

    def validate(self):
        """
        Validate the final accumulated data against the full Pydantic model.
        """
        try:
            self.model.model_validate(self.partial_data, strict=True)
        except pydantic.ValidationError as e:
            self.errors.append(e.errors())

    def get_errors(self):
        return self.errors

# Example Pydantic Model
class A(pydantic.BaseModel):
    i: int
    a: list[int]
    s: str

# Generic streaming validation
jsonpath = pathlib.Path("some.json")
validator = StreamingValidator(A)

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        validator.process_event(prefix, event, value)

validator.validate()
print(validator.get_errors() if validator.get_errors() else "Validation passed")
Why this works for any model: the field names and their annotations are read from model_fields at runtime, so nothing is hard-coded to a specific model.
Upvotes: 1
Reputation: 4952
I think we're locked into a semi-manual approach to validation.
Handling lists is something of a nightmare, but for some basic test data the code below works (except the dict handler, which I didn't test due to time constraints). Event handling looks very simple if you don't account for lists, and you'll see below that 4/5 of the code is there to account for how ijson emits events for lists.
# validator.py
import ijson
from typing import Type, Any, Set, get_type_hints
from pydantic import BaseModel

class StreamingJsonValidator:
    def __init__(self, model_class: Type[BaseModel]):
        """
        Initialize with a Pydantic model class (not an instance)
        """
        self.model_class = model_class
        self.field_types = get_type_hints(model_class)
        self.required_fields = {
            field_name for field_name, field in model_class.model_fields.items()
            if field.is_required()
        }

    def _validate_type(self, value: Any, expected_type: Type) -> bool:
        """
        Validate a value against the expected type
        """
        # Basic types
        if expected_type in (str, int, float, bool):
            return isinstance(value, expected_type)
        # Lists
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is list:
            if not isinstance(value, list):
                return False
            item_type = expected_type.__args__[0]
            return all(self._validate_type(item, item_type) for item in value)
        # Dictionaries
        if hasattr(expected_type, "__origin__") and expected_type.__origin__ is dict:
            if not isinstance(value, dict):
                return False
            key_type, value_type = expected_type.__args__
            return all(
                self._validate_type(k, key_type) and self._validate_type(v, value_type)
                for k, v in value.items()
            )
        return False

    def validate_file(self, file_path: str) -> tuple[bool, list[str]]:
        """
        Validate a JSON file
        """
        seen_fields: Set[str] = set()
        errors: list[str] = []
        current_field = None
        current_array = []
        in_array = False
        try:
            with open(file_path, 'rb') as file:
                parser = ijson.parse(file)
                for prefix, event, value in parser:
                    # New field
                    if event == 'map_key':
                        # Track list progress
                        if in_array and current_field:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                            current_array = []
                            in_array = False
                        current_field = value
                        continue
                    # Detect start of lists
                    if current_field and event == 'start_array':
                        in_array = True
                        current_array = []
                        continue
                    if current_field and in_array and event not in ('start_array', 'end_array'):
                        current_array.append(value)
                        continue
                    # Close list
                    if current_field and event == 'end_array':
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(current_array, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got array with invalid items")
                            seen_fields.add(current_field)
                        current_array = []
                        in_array = False
                        current_field = None
                        continue
                    # Detect if we're looking at a complete key-value pair - necessary for list (and possibly dict) handling
                    if current_field and not in_array and event in ('number', 'string', 'boolean', 'null'):
                        if current_field not in self.field_types:
                            errors.append(f"Unknown field: {current_field}")
                        elif current_field in seen_fields:
                            errors.append(f"Duplicate field: {current_field}")
                        else:
                            expected_type = self.field_types[current_field]
                            if not self._validate_type(value, expected_type):
                                errors.append(f"Invalid type for {current_field}: expected {expected_type}, got {type(value)}")
                            seen_fields.add(current_field)
                        current_field = None
            missing_fields = self.required_fields - seen_fields
            if missing_fields:
                errors.append(f"Missing required fields: {missing_fields}")
        except Exception as e:
            errors.append(f"Error parsing JSON: {type(e).__name__} - {str(e)}")
        is_valid = not errors
        return is_valid, errors
# test.py
from pydantic import BaseModel
from validator import StreamingJsonValidator

class A(BaseModel):
    i: int
    a: list[int] = []
    s: str

validator = StreamingJsonValidator(A)

# valid case
is_valid, errors = validator.validate_file('valid.json')
print("Expecting a valid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")

# invalid case
is_valid, errors = validator.validate_file('invalid.json')
print("Expecting an invalid result")
if is_valid:
    print("JSON is valid!")
else:
    print("Validation errors:")
    for error in errors:
        print(f"- {error}")
# valid.json
{
    "i": 42,
    "a": [1, 2, 3],
    "s": "hello"
}

# invalid.json
{
    "i": 42,
    "a": [1, "2", 3],
    "s": "hello"
}
Upvotes: 1
Reputation: 505
You're on the right track using ijson for streaming, but the issue is that pydantic expects the entire object at once, while ijson parses in a streaming manner. To validate incrementally without loading the entire JSON into memory, you can validate the a list items one by one instead of collecting them all in memory: instead of passing the entire JSON object to Pydantic at once, parse the JSON step by step and validate it in parts.
import pydantic  # Pydantic V2
import ijson
import pathlib

class A(pydantic.BaseModel):
    i: int
    a: list[int] = []
    s: str

jsonpath = pathlib.Path("some.json")
errors = []
partial_data = {"i": None, "a": [], "s": None}

with jsonpath.open("rb") as file:
    for prefix, event, value in ijson.parse(file, use_float=True):
        if prefix == "i" and event == "number":
            partial_data["i"] = value
        elif prefix == "s" and event == "string":
            partial_data["s"] = value
        elif prefix.startswith("a.item") and event in {"number", "integer"}:
            try:
                # Validate individual array elements as they arrive
                int_value = pydantic.TypeAdapter(int).validate_python(value)
                partial_data["a"].append(int_value)
            except pydantic.ValidationError as e:
                errors.append(f"Error in 'a': {e.errors()}")

try:
    A.model_validate(partial_data, strict=True)
except pydantic.ValidationError as e:
    errors.append(e.errors())

print(errors if errors else "Validation passed")
For reference, this is the JSON Schema describing some.json (matching the fields of A):
{
    "type": "object",
    "properties": {
        "i": {"type": "integer"},
        "a": {
            "type": "array",
            "items": {"type": "integer"}
        },
        "s": {"type": "string"}
    },
    "required": ["i", "a", "s"]
}
Upvotes: 3
Reputation: 679
Use pandas to read the JSON file with the chunksize parameter of pd.read_json(). This way you are only loading a few records (one chunk) into memory at a time. Note that chunksize requires line-delimited JSON (lines=True).
import pandas as pd

# Read the (line-delimited) JSON file in chunks
for chunk in pd.read_json(input_file, chunksize=chunk_size, lines=True):
    chunk_results = process_chunk(chunk)  # handle each DataFrame chunk here
Link to the doc: https://pandas.pydata.org/docs/reference/api/pandas.read_json.html
Upvotes: 0
Reputation: 97
Using ijson for incremental parsing: you can process the file in a memory-efficient way by using the ijson package, which enables iterative parsing of JSON data. Although schema validation is not integrated into ijson, you can add your own validation logic while parsing. Each parsed element is explicitly compared to the expected schema with this method.
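For illustration, a minimal hand-rolled sketch for the A model from the question could look like the following (the event names checked here are assumptions about how ijson reports numbers, which can differ between backends):
import ijson

# Hand-rolled, streaming check of the shape {"i": int, "a": [int, ...], "s": str}.
# Integer values may arrive as "number" or "integer" events depending on the backend.
INT_EVENTS = {"number", "integer"}
errors = []

with open("some.json", "rb") as f:
    for prefix, event, value in ijson.parse(f, use_float=True):
        if prefix == "i" and event not in INT_EVENTS:
            errors.append(f"i: expected an integer, got {event} ({value!r})")
        elif prefix == "a.item" and event not in INT_EVENTS:
            errors.append(f"a: expected integer items, got {event} ({value!r})")
        elif prefix == "s" and event != "string":
            errors.append(f"s: expected a string, got {event} ({value!r})")

print(errors if errors else "Validation passed")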
Upvotes: 0
Reputation: 52387
Pydantic comes with an experimental feature called "partial validation" that is designed for streaming inputs.
See https://docs.pydantic.dev/latest/concepts/experimental/#partial-validation
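A rough sketch of how that could be used, assuming Pydantic >= 2.10 (where TypeAdapter.validate_json accepts the experimental_allow_partial flag) and a TypedDict item type, which is the kind of type the docs demonstrate the feature with; the names and the truncated JSON string are purely illustrative:
from typing import TypedDict
import pydantic

class Item(TypedDict):
    i: int
    s: str

adapter = pydantic.TypeAdapter(list[Item])

# JSON that was cut off mid-stream: the incomplete trailing element is
# tolerated instead of raising a ValidationError.
truncated = '[{"i": 1, "s": "a"}, {"i": 2, "s": "b"}, {"i": 3, "s"'
print(adapter.validate_json(truncated, experimental_allow_partial=True))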
You can create a Pydantic model from an existing JSON schema using datamodel-code-generator: https://koxudaxi.github.io/datamodel-code-generator/
Open issues I see right now with this method:
Upvotes: 1