Reputation: 5564
I am running a Symfony CLI command to migrate 150M+ records from MongoDB to PostgreSQL in batches. The problem is that Doctrine keeps increasing memory usage until I hit OOM (Out of Memory) errors after ~200 records, even though I flush and clear in batches.
Migration flow: `$em->clear()` after each flush. Here is a code example:
public function migrate() {
    $limit = 10000; // the more the better
    $count = 0;
    $batch = [];
    // getFromMongo() fetches items one by one via a cursor and a generator
    foreach ($this->getFromMongo($limit) as $itemData) {
        $batch[] = $this->buildEntity($itemData);
        $count++;
        if ($count % $limit === 0) {
            $this->bulkInsert($batch);
            $batch = [];
        }
    }
    // Insert the remaining batch
    if (!empty($batch)) {
        $this->bulkInsert($batch);
    }
}
private function bulkInsert(array $batch)
{
    try {
        foreach ($batch as $item) {
            $this->em->persist($item);
        }
        $this->em->flush();
        $this->em->clear(); // Free memory
        foreach ($this->em->getUnitOfWork()->getIdentityMap() as $class => $entities) {
            foreach ($entities as $entity) {
                $this->em->detach($entity); // Fully remove entity references, just in case
            }
        }
        gc_collect_cycles(); // Ensure garbage collection
    } catch (\Exception $e) {
        throw $e;
    }
}
So, as I mentioned above, I can save at most ~200 items before hitting OOM with `memory_limit = 256M`. Of course I can raise the limit, but that makes no sense; it still would not be enough. Migrating in batches of 100-200 is also not an option, it would take forever for 150M items.
Yes, I could write raw insert/update queries, but my entities contain business logic, validation, etc., and I want that to be triggered.
I know I could use the power of a queue here, so processing would scale with the number of workers, but that also adds extra costs: supporting it, catching issues, races, locks, etc. So maybe there is some other magic that can be done at the Doctrine level; maybe some guru knows how to free up the memory?
Upvotes: 0
Views: 35
Reputation: 92
Suggestions to improve: use `yield` to yield a batch from a generator method; this will improve the memory problem a bit. Note that the batch also has to be flushed, otherwise nothing is written to the database:
public function migrate(): void // Note: previously the 'bulkInsert' method
{
    try {
        foreach ($this->migrateIterator() as $items) {
            $emClone = EntityManagerCloner::cloneEntityManager(
                $this->em
            );
            foreach ($items as $item) {
                $emClone->persist($item);
            }
            $emClone->flush(); // Actually write the batch to the database
            $emClone->clear();
            unset($item, $emClone); // Note: unsetting the cloned $emClone should automatically release the related memory
            gc_collect_cycles(); // Ensure garbage collection
        }
    } catch (\Exception $e) {
        throw $e;
    }
}
private function migrateIterator(): Generator // Note: previously the 'migrate' method
{
    $batchSize = 1000;
    $doneSomeYields = false;
    $currentBatch = [];
    $currentBatchSize = 0; // Note: it is useful to define all of the iteration's variables up front, with values and types
    foreach ($this->getFromMongo($batchSize) as $itemData) {
        $currentBatch[] = $this->buildEntity($itemData); // Note: but I'd rather recommend a separate Factory class
        $currentBatchSize++;
        if ($currentBatchSize === $batchSize) {
            yield $currentBatch;
            $doneSomeYields = true;
            $currentBatchSize = 0;
            $currentBatch = [];
        }
    }
    if (
        $currentBatch !== []
        || !$doneSomeYields
    ) {
        yield $currentBatch; // Note: this is required to cover the case where the 'getFromMongo' method returned no records
    }
}
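The batching pattern above can be shown in isolation with plain PHP values instead of entities. This is a hypothetical standalone helper (`chunkIterable` is not part of the original code), sketching the same "yield fixed-size arrays from any iterable" idea:

```php
<?php
// Hypothetical helper illustrating the batching pattern: wrap any
// iterable in a generator that yields arrays of at most $size items.
function chunkIterable(iterable $source, int $size): Generator
{
    $batch = [];
    foreach ($source as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch; // hand a full batch to the caller
            $batch = [];  // drop our reference so it can be freed
        }
    }
    if ($batch !== []) {
        yield $batch; // trailing partial batch
    }
}
```

For example, `iterator_to_array(chunkIterable([1, 2, 3, 4, 5], 2), false)` produces `[[1, 2], [3, 4], [5]]`. Unlike `migrateIterator` above, this sketch yields nothing at all for empty input.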
And a code snippet showing how to clone the EntityManager (reusing the same DB connection):
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\EntityManagerInterface;

class EntityManagerCloner
{
    public static function cloneEntityManager(EntityManagerInterface $originalEm): EntityManagerInterface
    {
        // Reuse the original configuration and connection
        $config = $originalEm->getConfiguration();
        $connection = $originalEm->getConnection();
        // Create a fresh EntityManager (and therefore a fresh UnitOfWork)
        // on the same connection and config. Note: on ORM 2.x the
        // constructor is not public, so use
        // EntityManager::create($connection, $config) there instead.
        return new EntityManager($connection, $config);
    }
}
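Beyond cloning the EntityManager, check what else holds references in a long-running command. One commonly cited culprit (an assumption here, since the question does not show the configuration) is SQL query logging: in Symfony debug mode, with a DBAL 2.x `SQLLogger` configured, every executed query is kept in memory for the profiler. A minimal configuration sketch for turning it off on DBAL 2.x (on DBAL 3+, logging moved to middlewares and is off unless explicitly configured):

```php
<?php
// Sketch, assuming $em is the EntityManagerInterface used by the command:
// drop the SQL logger so executed queries are no longer accumulated.
$em->getConnection()
    ->getConfiguration()
    ->setSQLLogger(null);
```

Also run the command with `--no-debug` (e.g. `bin/console app:migrate --no-debug`) so the Symfony profiler does not collect data either.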
Upvotes: 1