Reputation: 3118
I need to insert around 150M records into an Xodus database using xodus-dnq in a batch. Based on examples, I have implemented the following method:
XdModel.registerNodes(
    XDTrip
)
val store = StaticStoreContainer.init(
    dbFolder = File(target),
    environmentName = "trips"
)
initMetaData(XdModel.hierarchy, store)
store.use { store ->
    store.persistentStore.use {
        // TripLoader(File(src)) returns a stream with roughly 150M elements
        TripLoader(File(src)).forEach { databaseTrip ->
            store.transactional {
                XDTrip.new {
                    id = databaseTrip.id
                    start = databaseTrip.start.epochSecond
                    end = databaseTrip.end.epochSecond
                }
            }
        }
    }
}
This works fine but leaks memory. Presumably I need to manually commit/flush/persist the transaction?
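Something like the following, using the low-level Exodus StoreTransaction API, is what I have in mind (an untested sketch; the persistent entity type name "XDTrip" and the flush interval are guesses):

// Sketch: one long-lived low-level transaction, flushed periodically.
// flush() persists the changes accumulated so far and keeps the
// transaction open; commit() persists them and closes it.
val txn = store.persistentStore.beginTransaction()
try {
    TripLoader(File(src)).forEachIndexed { index, databaseTrip ->
        val entity = txn.newEntity("XDTrip") // assumed persistent type name
        entity.setProperty("id", databaseTrip.id)
        entity.setProperty("start", databaseTrip.start.epochSecond)
        entity.setProperty("end", databaseTrip.end.epochSecond)
        if (index % 1000 == 999) txn.flush()
    }
    txn.commit()
} catch (e: Exception) {
    txn.abort()
    throw e
}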
Based on another suggestion, I have refactored this to run smaller batches, each in its own transaction:
XdModel.registerNodes(
    XDTrip,
    XDPosition,
    XDPositionSource
)
val store = StaticStoreContainer.init(
    dbFolder = File(target),
    environmentName = "trips"
)
initMetaData(XdModel.hierarchy, store)
store.use {
    it.use { store ->
        Sequence { TripLoader(File(src)).iterator() }.chunked(100).forEachIndexed { index, csvChunk ->
            store.transactional {
                println("Chunk $index")
                csvChunk.forEach { csvTrip ->
                    XDTrip.new {
                        id = csvTrip.id
                        start = csvTrip.start.epochSecond
                        end = csvTrip.end.epochSecond
                        // ...
                    }
                }
            }
        }
    }
}
This, however, still leaks memory.
I analysed the memory and found an instance of jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache taking up most of the heap:
One instance of "jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache" loaded by "sun.misc.Launcher$AppClassLoader @ 0x4c0481d58" occupies 45,075,952 (80.24%) bytes. The memory is accumulated in one instance of "jetbrains.exodus.core.dataStructures.ConcurrentLongObjectCache$CacheEntry[]" loaded by "sun.misc.Launcher$AppClassLoader @ 0x4c0481d58".
I can see the database size on disk increasing while the program runs, so I'm unsure why the cache fills up here.
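If it matters, Exodus seems to size its log cache from the configured memory usage, which can be capped via EnvironmentConfig. This is a sketch of what I could try with the plain Exodus API (I am only assuming the cache in the heap dump is this log cache, and this bypasses StaticStoreContainer):

// Sketch: cap the Exodus log cache at 10% of max heap instead of the default.
val config = EnvironmentConfig().setMemoryUsagePercentage(10)
val environment = Environments.newInstance(File(target), config)
val persistentStore = PersistentEntityStores.newInstance(environment, "trips")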
Upvotes: 0
Views: 484
Reputation: 234
Processing heavy operations in batches is the recommended way to go. If you use Kotlin sequences, there is the windowed extension function (pass a step equal to the window size, otherwise the windows overlap). So your code will look like this:
TripLoader(File(src))
    .windowed(size = 1000, step = 1000, partialWindows = true)
    .forEach { trips ->
        store.transactional {
            trips.forEach { databaseTrip ->
                XDTrip.new {
                    id = databaseTrip.id
                    start = databaseTrip.start.epochSecond
                    end = databaseTrip.end.epochSecond
                }
            }
        }
    }
In practice, if you do this at runtime, or the whole operation takes a lot of time, it's better to go a bit deeper and use the low-level API. The code becomes more complex but runs several times faster.
As an example, if you trust that the data is consistent and you do not need to check all the constraints, relations and so on, you can use something like this:
val type = XDTrip.entityType
TripLoader(File(src))
    .windowed(size = 1000, step = 1000, partialWindows = true)
    .forEach { trips ->
        store.persistentStore.executeInExclusiveTransaction { txn ->
            trips.forEach { databaseTrip ->
                txn.newEntity(type).also {
                    it.setProperty("id", databaseTrip.id)
                    // ...
                }
            }
        }
    }
The window size should be tuned according to the imported data; in the case of primitive data structures it can be increased.
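To pick a size empirically, you can time each batch and grow the window until throughput stops improving. A rough sketch (the starting size of 1000 is arbitrary; type and store come from the snippets above):

import kotlin.system.measureTimeMillis

// Sketch: measure the cost of each batch to tune the window size.
TripLoader(File(src))
    .windowed(size = 1000, step = 1000, partialWindows = true)
    .forEachIndexed { index, trips ->
        val millis = measureTimeMillis {
            store.persistentStore.executeInExclusiveTransaction { txn ->
                trips.forEach { databaseTrip ->
                    txn.newEntity(type).also {
                        it.setProperty("id", databaseTrip.id)
                        // ...
                    }
                }
            }
        }
        println("Batch $index: ${trips.size} entities in $millis ms")
    }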
Upvotes: 2