mritz_p

Reputation: 3118

How to write a large number of records to an Xodus database without memory leaks?

I need to insert around 150M records into an Xodus database using xodus-dnq in a batch. Based on examples, I have implemented the following method:

        XdModel.registerNodes(
                XDTrip
        )
        val store = StaticStoreContainer.init(
                dbFolder = File(target),
                environmentName = "trips"
        )
        initMetaData(XdModel.hierarchy, store)
        store.use { store ->
            store.persistentStore.use {
                // TripLoader(File(src)) returns a stream with roughly 150M elements
                TripLoader(File(src)).forEach { databaseTrip ->
                    store.transactional {
                        XDTrip.new {
                            id = databaseTrip.id
                            start = databaseTrip.start.epochSecond
                            end = databaseTrip.end.epochSecond
                        }
                    }
                }
            }
        }

This works fine but leaks memory. Presumably I need to manually commit/flush/persist the transaction?

Based on another suggestion I have refactored this to run smaller batches within a transaction:

    XdModel.registerNodes(
            XDTrip,
            XDPosition,
            XDPositionSource
    )
    val store = StaticStoreContainer.init(
            dbFolder = File(target),
            environmentName = "trips"
    )
    initMetaData(XdModel.hierarchy, store)
    store.use {
        it.use { store ->
            Sequence { TripLoader(File(src)).iterator() }.chunked(100).forEachIndexed { index, csvChunk ->
                store.transactional {
                    println("Chunk $index")
                    csvChunk.forEach { csvTrip ->
                        XDTrip.new {
                            id = csvTrip.id
                            start = csvTrip.start.epochSecond
                            end = csvTrip.end.epochSecond
                            // ...
                        }
                    }
                }
            }
        }
    }

This however still leaks memory.

I analysed the memory and found an instance of jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache taking up most of the heap:

One instance of "jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache" loaded by "sun.misc.Launcher$AppClassLoader @ 0x4c0481d58" occupies 45,075,952 (80.24%) bytes. The memory is accumulated in one instance of "jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache$CacheEntry[]" loaded by "sun.misc.Launcher$AppClassLoader @ 0x4c0481d58".


I can see how the database size on disk increases while the program runs, so I'm unsure why the cache fills up here.

Upvotes: 0

Views: 484

Answers (1)

lehvolk

Reputation: 234

Processing heavy operations in batches is the recommended way to go. If you use Kotlin sequences, there is the windowed extension function. So your code will look like this:

    // step = size and partialWindows = true make windowed behave like chunked:
    // non-overlapping batches of 1000, with a smaller final batch
    TripLoader(File(src)).windowed(1000, 1000, partialWindows = true) { trips ->
        store.transactional {
            trips.forEach { databaseTrip ->
                XDTrip.new {
                    id = databaseTrip.id
                    start = databaseTrip.start.epochSecond
                    end = databaseTrip.end.epochSecond
                }
            }
        }
    }
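One caveat with windowed: by default it uses step = 1, which produces overlapping windows, so each record would be visited roughly a thousand times. For non-overlapping batches you want either chunked(size) or windowed(size, size, partialWindows = true), which are equivalent. A quick illustration in plain Kotlin:

```kotlin
fun main() {
    val data = listOf(1, 2, 3, 4, 5)

    // Default step = 1: overlapping windows, elements repeated across batches
    println(data.windowed(2))
    // [[1, 2], [2, 3], [3, 4], [4, 5]]

    // chunked: disjoint batches, last one may be smaller
    println(data.chunked(2))
    // [[1, 2], [3, 4], [5]]

    // windowed configured to behave exactly like chunked
    println(data.windowed(2, 2, partialWindows = true))
    // [[1, 2], [3, 4], [5]]
}
```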

Practically, if you run this regularly or the whole operation consumes a lot of time, it is better to go a bit deeper and use the low-level API. The code becomes more complex but runs many times faster.

As an example, if you believe the data is consistent and you do not need to check all constraints, relations, and so on, you can use something like this:

    val type = XDTrip.entityType
    TripLoader(File(src)).windowed(1000, 1000, partialWindows = true) { trips ->
        // Bypass xodus-dnq constraint checking by writing raw entities
        // through the underlying persistent store
        store.persistentStore.executeInExclusiveTransaction { txn ->
            trips.forEach { databaseTrip ->
                txn.newEntity(type).also {
                    it.setProperty("id", databaseTrip.id)
                    // ...
                }
            }
        }
    }

The window size should be tuned to the imported data; in the case of primitive data structures it can be increased.

Upvotes: 2
