R Sun

Reputation: 1671

How can I stream elements from inside a JSON array using serde_json?

I have a 5GB JSON file which is an array of objects with fixed structure:

[
  {
    "first": "John",
    "last": "Doe",
    "email": "[email protected]"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "[email protected]"
  },
  ....
]

I know that I can try to parse this file using the code shown in How can I deserialize JSON with a top-level array using Serde?:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let users: Vec<User> = serde_json::from_str(file)?;

There are multiple problems:

  1. The whole file must first be read into a string.
  2. That string is then converted into a Vec<User> holding every user in memory at once (I don't want that).

I tried How can I lazily read multiple JSON values from a file/stream in Rust?, but it reads the whole file before printing anything, and it prints the whole structure at once inside the loop. I was expecting one object at a time in the loop.


Ideally, parsing and processing of the parsed User objects should happen simultaneously, in two separate threads/tasks/routines or by using a channel.

Upvotes: 11

Views: 3631

Answers (3)

Marcono1234

Reputation: 6884

Disclaimer: The following uses a library other than serde_json, I am the author of that library and it is currently still experimental (but feedback is highly appreciated!).


You can use Struson and its serde feature to achieve this:

First you start the JSON array with begin_array and use has_next as loop condition to check if the array has more elements. In the loop you can then use deserialize_next to deserialize your User values.

use serde::{Deserialize, Serialize};
use struson::reader::{JsonReader, JsonStreamReader};

let json = r#"
[
    {
        "first": "John",
        "last": "Doe",
        "email": "[email protected]"
    },
    {
        "first": "Anne",
        "last": "Ortha",
        "email": "[email protected]"
    }
]
"#;
// `std::io::Read` providing the JSON data; in this example the str bytes
let reader = json.as_bytes();

#[derive(Serialize, Deserialize, Debug)]
struct User {
    first: String,
    last: String,
    email: String,
}

let mut json_reader = JsonStreamReader::new(reader);

json_reader.begin_array()?;

while json_reader.has_next()? {
    let user: User = json_reader.deserialize_next()?;
    // ... use deserialized value in some way
    println!("deserialized: {user:?}");
}

// Optionally consume the remainder of the JSON document
json_reader.end_array()?;
json_reader.consume_trailing_whitespace()?;

This is all performed in a streaming way by Struson, so it will only read as much data as necessary to deserialize each of the User values (and possibly buffer some more bytes internally for the next call).

Upvotes: 2

user4815162342

Reputation: 154886

Streaming elements from a JSON array is possible, but requires some legwork. You must skip the leading [ and the commas between elements yourself, as well as detect the final ]. To parse individual array elements you need to use StreamDeserializer and extract a single item from it (so you can drop it and regain control of the IO reader). For example:

use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};

fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
    loop {
        let mut byte = 0u8;
        reader.read_exact(std::slice::from_mut(&mut byte))?;
        if !byte.is_ascii_whitespace() {
            return Ok(byte);
        }
    }
}

fn invalid_data(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
    let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
    match next_obj {
        Some(result) => result.map_err(Into::into),
        None => Err(invalid_data("premature EOF")),
    }
}

fn yield_next_obj<T: DeserializeOwned, R: Read>(
    mut reader: R,
    at_start: &mut bool,
) -> io::Result<Option<T>> {
    if !*at_start {
        *at_start = true;
        if read_skipping_ws(&mut reader)? == b'[' {
            // read the next char to see if the array is empty
            let peek = read_skipping_ws(&mut reader)?;
            if peek == b']' {
                Ok(None)
            } else {
                deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
            }
        } else {
            Err(invalid_data("`[` not found"))
        }
    } else {
        match read_skipping_ws(&mut reader)? {
            b',' => deserialize_single(reader).map(Some),
            b']' => Ok(None),
            _ => Err(invalid_data("`,` or `]` not found")),
        }
    }
}

pub fn iter_json_array<T: DeserializeOwned, R: Read>(
    mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
    let mut at_start = false;
    std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}
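The least obvious step in yield_next_obj is "un-reading" the byte peeked after [: since std::io::Read has no push-back, the code prepends that byte again with io::Cursor::new([peek]).chain(reader). The same trick in isolation, as a std-only sketch; peek_and_restore is a hypothetical name:

```rust
use std::io::{self, Cursor, Read};

/// Reads one byte, then returns it together with a reader that yields
/// that byte again, followed by the rest of the original input.
fn peek_and_restore<R: Read>(mut reader: R) -> io::Result<(u8, impl Read)> {
    let mut byte = 0u8;
    reader.read_exact(std::slice::from_mut(&mut byte))?;
    // Chain a one-byte cursor in front of the remaining input.
    Ok((byte, Cursor::new([byte]).chain(reader)))
}
```

This is what lets the deserializer see the opening { of the first element even though the code already consumed it while checking for an empty array.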

Example usage:

fn main() {
    let data = r#"[
  {
    "first": "John",
    "last": "Doe",
    "email": "[email protected]"
  },
  {
    "first": "Anne",
    "last": "Ortha",
    "email": "[email protected]"
  }
]"#;
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Debug)]
    struct User {
        first: String,
        last: String,
        email: String,
    }

    for user in iter_json_array(io::Cursor::new(&data)) {
        let user: User = user.unwrap();
        println!("{:?}", user);
    }
}


When using it in production, you'd open the input as a File instead of reading it into a string. As always, don't forget to wrap the File in a BufReader.
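A minimal sketch of that setup; open_for_streaming and the users.json path are hypothetical. The buffering matters because read_skipping_ws reads one byte per call, and serde_json's documentation notes that its reader-based deserializer does not buffer the input itself, so on a bare File many tiny reads would each become a system call:

```rust
use std::fs::File;
use std::io::{self, BufReader};
use std::path::Path;

/// Opens a (possibly multi-gigabyte) JSON file for streaming.
/// BufReader turns the many tiny reads done by the parser into
/// occasional large reads from the underlying File.
fn open_for_streaming(path: &Path) -> io::Result<BufReader<File>> {
    let file = File::open(path)?;
    Ok(BufReader::new(file))
}
```

Inside a function returning io::Result, usage would look like: for user in iter_json_array(open_for_streaming(Path::new("users.json"))?) { let user: User = user?; /* process */ }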

Upvotes: 9

Shepmaster

Reputation: 430634

This is not directly possible as of serde_json 1.0.66.

One suggested workaround is to implement your own Visitor that uses a channel. As deserialization of the array progresses, each element is pushed down the channel. The receiving side of the channel can then grab each element and process it, freeing up space for the deserialization to push in another value.

Upvotes: 4
