Reputation: 10712
I'm creating a migration tool from SQL to Mongo. As part of the SQL queries I'm using FOR JSON AUTO,
which results in a JSON response from SQL Server.
Using Dapper it looks like this...
var jsonResults = _SqlDb.Query<string>(usersWithDynamicDataQuery, buffered:false, commandTimeout: _SqlCommandTimeoutSeconds);
var jsonResult = string.Concat(jsonResults);
var userDocuments = JsonSerializer.Deserialize<List<UserData>>(jsonResult);
So SQL is returning a list of chunks of the full JSON response.
I need to find some more "memory flexible" way than just string.Concat(..),
as I'm hitting the CLR limit for a string memory allocation :\
I can always limit and page over the queries with ROW_NUMBER()..
but I really want to utilize as much memory as I can (I have 128GB on the machine) here and make the migration swift with large chunks of data...
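For reference, the paging fallback would look roughly like this (a sketch only: Users/UserId are placeholder names, and OFFSET/FETCH stands in for the ROW_NUMBER() approach):
// Hypothetical paged variant of the query - pulls one page per round trip instead of the whole table
const int pageSize = 100_000;
for (var offset = 0; ; offset += pageSize)
{
    var pagedQuery = $@"
        SELECT *
        FROM Users
        ORDER BY UserId
        OFFSET {offset} ROWS FETCH NEXT {pageSize} ROWS ONLY
        FOR JSON AUTO";
    var jsonChunks = _SqlDb.Query<string>(pagedQuery, buffered: false, commandTimeout: _SqlCommandTimeoutSeconds);
    var pageJson = string.Concat(jsonChunks);
    if (string.IsNullOrEmpty(pageJson))
        break; // no more rows
    var page = JsonSerializer.Deserialize<List<UserData>>(pageJson);
    // ...write `page` to Mongo...
}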
Upvotes: 3
Views: 3033
Reputation: 116980
It is possible to deserialize a single JSON payload from a list of strings representing chunked JSON by constructing a ReadOnlySequence<byte> from the list, then constructing a Utf8JsonReader from the sequence, and finally deserializing using the reader via JsonSerializer.Deserialize<TValue>(Utf8JsonReader, JsonSerializerOptions).
The following is a minimal implementation:
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;

public static partial class JsonExtensions
{
    public static TValue? Deserialize<TValue>(IEnumerable<string> buffers, JsonSerializerOptions? options = null) =>
        Deserialize<TValue>(buffers.Select(s => s.AsMemory()), options);

    public static TValue? Deserialize<TValue>(IEnumerable<ReadOnlyMemory<char>> buffers, JsonSerializerOptions? options = null) =>
        Deserialize<TValue>(ToByteArrayChunks(buffers), options);

    public static TValue? Deserialize<TValue>(IEnumerable<ReadOnlyMemory<byte>> buffers, JsonSerializerOptions? options = null)
    {
        var reader = new Utf8JsonReader(buffers.AsSequence(), options.GetReaderOptions());
        return JsonSerializer.Deserialize<TValue>(ref reader, options);
    }

    public static JsonReaderOptions GetReaderOptions(this JsonSerializerOptions? options) =>
        options is null ? new () : new ()
        {
            AllowTrailingCommas = options.AllowTrailingCommas,
            CommentHandling = options.ReadCommentHandling,
            MaxDepth = options.MaxDepth
        };

    static readonly Encoding encoding = new UTF8Encoding(false);

    static IEnumerable<ReadOnlyMemory<byte>> ToByteArrayChunks(IEnumerable<ReadOnlyMemory<char>> buffers)
    {
        // By using an encoder we can handle the situation in which surrogate pairs embedded
        // in JSON string literals are split between chunks.
        var encoder = encoding.GetEncoder();
        foreach (var s in buffers)
        {
            ReadOnlySpan<char> charSpan = s.Span;
            var count = encoder.GetByteCount(charSpan, false);
            var bytes = new byte[count];
            encoder.GetBytes(charSpan, bytes.AsSpan(), false);
            yield return bytes;
        }
    }
}
public static class ReadOnlySequenceFactory
{
    public static ReadOnlySequence<T> AsSequence<T>(this IEnumerable<T[]> buffers) =>
        ReadOnlyMemorySegment<T>.Create(buffers.Select(a => new ReadOnlyMemory<T>(a)));

    public static ReadOnlySequence<T> AsSequence<T>(this IEnumerable<ReadOnlyMemory<T>> buffers) =>
        ReadOnlyMemorySegment<T>.Create(buffers);

    // There is no public concrete implementation of ReadOnlySequenceSegment<T> so we must create one ourselves.
    // This is modeled on https://github.com/dotnet/runtime/blob/v5.0.18/src/libraries/System.Text.Json/tests/BufferFactory.cs
    // by https://github.com/ahsonkhan
    class ReadOnlyMemorySegment<T> : ReadOnlySequenceSegment<T>
    {
        public static ReadOnlySequence<T> Create(IEnumerable<ReadOnlyMemory<T>> buffers)
        {
            ReadOnlyMemorySegment<T>? first = null;
            ReadOnlyMemorySegment<T>? current = null;
            foreach (var buffer in buffers)
            {
                var next = new ReadOnlyMemorySegment<T> { Memory = buffer };
                if (first == null)
                    first = next;
                else
                {
                    current!.Next = next;
                    next.RunningIndex = current.RunningIndex + current.Memory.Length;
                }
                current = next;
            }
            if (first == null)
                first = current = new ();
            return new ReadOnlySequence<T>(first, 0, current!, current!.Memory.Length);
        }
    }
}
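For example, wired up to the Dapper query from the question (a quick sketch reusing the question's variable names), the chunked FOR JSON rows can be fed straight into the extension method:
// No string.Concat: the lazily-enumerated chunks are assembled into a ReadOnlySequence<byte> internally
var jsonChunks = _SqlDb.Query<string>(usersWithDynamicDataQuery, buffered: false, commandTimeout: _SqlCommandTimeoutSeconds);
var userDocuments = JsonExtensions.Deserialize<List<UserData>>(jsonChunks);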
Notes:
A ReadOnlySequence<T> is constructed from a linked list of ReadOnlySequenceSegment<T> objects -- but this type is abstract and .NET Core 3.1 doesn't seem to include a concrete public implementation. I modeled the implementation above on this one by Ahson Khan.
JsonSerializer is designed to deserialize from UTF-8 encoded byte sequences rather than from strings or character arrays, so if you can make your database access layer return a list of UTF-8 byte arrays rather than strings, you will get better performance and avoid the step of encoding each string chunk into bytes. If this isn't possible and your input is definitely a long list of smallish strings (2033 characters), it might be worthwhile to investigate using memory or array pooling to allocate the necessary UTF-8 byte sequences.
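A minimal sketch of that pooling idea, as a method that could be dropped into the JsonExtensions class above (not benchmarked; it rents the UTF-8 buffers from ArrayPool<byte> and reuses the AsSequence() and GetReaderOptions() helpers):
public static TValue? DeserializePooled<TValue>(IEnumerable<string> buffers, JsonSerializerOptions? options = null)
{
    // Rent the UTF-8 buffers from the shared pool instead of allocating a new byte[] per chunk.
    var rented = new List<byte[]>();
    try
    {
        var encoder = new UTF8Encoding(false).GetEncoder();
        var chunks = new List<ReadOnlyMemory<byte>>();
        foreach (var s in buffers)
        {
            ReadOnlySpan<char> chars = s.AsSpan();
            var count = encoder.GetByteCount(chars, false);
            var bytes = ArrayPool<byte>.Shared.Rent(count);
            var written = encoder.GetBytes(chars, bytes, false);
            rented.Add(bytes);
            chunks.Add(bytes.AsMemory(0, written));
        }
        var reader = new Utf8JsonReader(chunks.AsSequence(), options.GetReaderOptions());
        return JsonSerializer.Deserialize<TValue>(ref reader, options);
    }
    finally
    {
        // Return the pooled arrays only after the reader has consumed the whole sequence.
        foreach (var array in rented)
            ArrayPool<byte>.Shared.Return(array);
    }
}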
While this approach avoids allocating a single, huge string or byte [], the entire JSON payload is nevertheless loaded into memory all at once as a sequence of chunks. Thus this is not a true streaming solution.
If you are interested in a true streaming solution and can access your JSON data directly as a Stream, you might look at this answer to Parsing a JSON file with .NET core 3.0/System.text.Json by mtosh.
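For illustration only, the streaming shape with System.Text.Json looks roughly like this, assuming the JSON can already be obtained as a Stream (getting that stream out of SQL Server/Dapper is the hard part and is what the linked answer covers):
// GetJsonStream() is a hypothetical placeholder for however the JSON is exposed as a Stream.
// The JSON text is read incrementally in small buffers, although the resulting
// List<UserData> is of course still fully materialized in memory.
await using Stream jsonStream = GetJsonStream();
var userDocuments = await JsonSerializer.DeserializeAsync<List<UserData>>(jsonStream);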
Demo fiddle here.
Upvotes: 3
Reputation: 96
This is not the answer, but it may lead to it, apologies. I had a similar issue recently, with HTTP, and the trick was not to build the intermediary string, as you have identified. I found that if I used a stream instead I could cut out the middle man entirely. Kev Dockx did some work in this area and has a useful NuGet package called Marvin.StreamExtensions for processing JSON. You need to produce a stream from your Query to make it work, however.....
var userDocuments = stream.ReadAndDeserializeFromJson<List<UserData>>();
Check out these links for foreach-based solutions. Is this Dapper? I've never used it, but the following might be helpful: Explanation of dapper buffer/cache
Is it possible to stream a large SQL Server database result set using Dapper?
GitHub stuff (produces a stream from Query), but as you are using "buffered: false", I'm pretty sure you can just foreach it (?): https://github.com/JocaPC/Dapper.Stream
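For what it's worth, a rough sketch of the foreach idea using the question's variable names (with buffered: false Dapper yields the rows lazily, so the FOR JSON chunks can be consumed one at a time):
// Each chunk arrives as it is read from the server; push it into whatever Stream/pipe
// the JSON deserializer reads from instead of concatenating into one giant string.
foreach (var chunk in _SqlDb.Query<string>(usersWithDynamicDataQuery, buffered: false, commandTimeout: _SqlCommandTimeoutSeconds))
{
    // write `chunk` to the target stream here
}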
Upvotes: 0