Konstantin

Flink: class loader leak in the JobManager

Flink 1.15.4

After every job executed from an uploaded jar, Metaspace usage on the JobManager keeps increasing, because the job's class loader cannot be garbage-collected.
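To quantify the growth between job runs, Metaspace usage and the JVM's class-unloading counter can be read from the standard platform MXBeans. This is a minimal sketch, assuming a HotSpot JVM where the memory pool is named "Metaspace" (other JVMs may name it differently); the class name `MetaspaceProbe` is hypothetical. It has to run inside the JobManager JVM, or the same MXBeans can be read remotely over JMX.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

// Hypothetical helper: snapshots Metaspace usage and class-unloading counters
// of the current JVM via the standard java.lang.management API.
public class MetaspaceProbe {

    /** Bytes used by the "Metaspace" pool, or -1 if no such pool exists (assumes HotSpot naming). */
    public static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                return pool.getUsage().getUsed();
            }
        }
        return -1L;
    }

    /** Total number of classes unloaded since the JVM started. */
    public static long unloadedClassCount() {
        ClassLoadingMXBean classLoading = ManagementFactory.getClassLoadingMXBean();
        return classLoading.getUnloadedClassCount();
    }

    public static void main(String[] args) {
        System.out.printf("metaspace used: %d bytes, loaded: %d, unloaded: %d%n",
                metaspaceUsedBytes(),
                ManagementFactory.getClassLoadingMXBean().getLoadedClassCount(),
                unloadedClassCount());
    }
}
```

Sampling these values after each job finishes (and after a full GC) shows whether Metaspace returns to a baseline; a leaked user-code class loader shows up as used bytes that only grow while the unloaded-class count does not rise.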

Initially, I suspected org.apache.flink.metrics.jmx.JMXReporter, since the cluster is configured with metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter.

The heap dump showed that the org.apache.flink.jobmanager.job.numRestarts metric holds a hard reference to the instance of the custom input class (the inputSplitSource, reached via org.apache.flink.metrics.jmx.JMXReporter$JmxGauge -> org.apache.flink.runtime.jobgraph.JobGraph), and that the MBeans are not correctly unregistered when the job completes.
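One way to check which finished jobs still have MBeans registered is to query the MBean server with a pattern over the job-scoped JobManager domains and group the results by job id. A minimal sketch, assuming the `job_id` key property that the listener further down also relies on; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical diagnostic: counts job-scoped Flink JobManager MBeans per job id.
public class LeftoverJobMBeans {

    /** Maps each job_id key value to the number of MBeans still registered for it. */
    public static Map<String, Integer> countByJobId(MBeanServer server) throws MalformedObjectNameException {
        // "*" in the domain matches any metric name; ":*" accepts any key-property list.
        ObjectName pattern = new ObjectName("org.apache.flink.jobmanager.job.*:*");
        Map<String, Integer> counts = new TreeMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            String jobId = name.getKeyProperty("job_id"); // null if the key is absent
            counts.merge(jobId == null ? "<none>" : jobId, 1, Integer::sum);
        }
        return counts;
    }
}
```

Called with `ManagementFactory.getPlatformMBeanServer()` on the JobManager after several jobs have finished, this should return an empty map if the reporter unregisters everything; ids of finished jobs that remain point at the retained gauges.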

However, I then found further references to org.apache.flink.runtime.jobgraph.JobGraph in the heap.
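Whether some object (for example a job's user-code class loader) is still strongly reachable can also be checked without a heap dump: keep only a WeakReference to it and request a few GCs. A minimal sketch; `System.gc()` is only a hint (and is ignored under `-XX:+DisableExplicitGC`), so a `false` result means "still reachable or not yet collected" rather than proof of a leak. The class name is hypothetical.

```java
import java.lang.ref.WeakReference;

// Hypothetical helper: checks whether an object held only through a WeakReference
// is cleared after a bounded number of explicit GC requests.
public class CollectabilityCheck {

    /** Returns true once the GC has cleared the referent, false if it survives all attempts. */
    public static boolean isCollected(WeakReference<?> ref, int attempts) throws InterruptedException {
        for (int i = 0; i < attempts && ref.get() != null; i++) {
            System.gc();      // request (not force) a collection
            Thread.sleep(50); // give the collector time to clear weak references
        }
        return ref.get() == null;
    }
}
```

Applied to a class loader, a result that stays `false` after the job has been cleaned up is consistent with the remaining strong paths (such as those through the JobGraph) seen in the heap dump.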

Below is the dummy input/output code I used to reproduce the problem:

```java
package io.debug.flink;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;

import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;

public class CustomInputJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new CustomInput())
                .output(new CustomOutput());
        env.execute("CustomInputJob(export=" + ZonedDateTime.now() + ")");
    }

    // Non-parallel source that emits five random UUIDs and then ends.
    public static class CustomInput extends GenericInputFormat<UUID> implements NonParallelInput {
        private Iterator<UUID> iterator;

        @Override
        public void open(GenericInputSplit split) {
            iterator = IntStream.range(0, 5).boxed().map(i -> UUID.randomUUID()).collect(Collectors.toList()).iterator();
        }

        @Override
        public boolean reachedEnd() {
            return !iterator.hasNext();
        }

        @Override
        public UUID nextRecord(UUID reuse) {
            return iterator.next();
        }
    }

    // Sink that only logs each record (the logger is generated by Lombok's @Slf4j).
    @Slf4j
    public static class CustomOutput extends RichOutputFormat<UUID> {
        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) {
        }

        @Override
        public void writeRecord(UUID record) {
            log.info("received: {}", record);
        }

        @Override
        public void close() {
        }
    }
}
```

There is nothing suspicious in the logs after enabling TRACE for org.apache.flink. At least, the job appears to have been cleaned up:

```
Job 52f460a9dcb068b1509137f12f28b061 has been registered for cleanup in the JobResultStore after reaching a terminal state.
Cleanup for the job '52f460a9dcb068b1509137f12f28b061' has finished. Job has been marked as clean.
```

And a few screenshots taken from the heap dump:

  1. Class loaders that were not collected [screenshot: not GC-ed class loaders]

  2. Merged paths to GC roots for a non-collected class loader [screenshot: path to the custom input instance]

  3. The reference held by the numRestarts metric [screenshot: org.apache.flink.jobmanager.job.numRestarts]

I added a JobListener that unregisters the job-related MBeans:

```java
// Method of a custom org.apache.flink.core.execution.JobListener registered on the environment.
// jobName and replaceInvalidChars(...) are members of the listener class (not shown here).
@Override
public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
    var server = ManagementFactory.getPlatformMBeanServer();

    if (jobExecutionResult == null) {
        // The result is null when the job failed (throwable is then non-null).
        log.warn("received null job execution result");
        return;
    }

    try {
        log.info("started cleaning up the jmx resources for job id '{}', job name '{}'", jobExecutionResult.getJobID(), jobName);

        // Key properties of the job-scoped JobManager MBeans.
        Hashtable<String, String> ht = new Hashtable<>(3);
        ht.put("host", "user-export-cluster-jobmanager");
        ht.put("job_id", replaceInvalidChars(jobExecutionResult.getJobID().toString()));
        ht.put("job_name", replaceInvalidChars(jobName));

        // Domain pattern: matches every org.apache.flink.jobmanager.job.<metric> MBean with these keys.
        for (var matchingName : server.queryNames(ObjectName.getInstance("org.apache.flink.jobmanager.job.*", ht), null)) {
            if (server.isRegistered(matchingName)) {
                log.debug("unregistering MBean for `{}`", matchingName);
                server.unregisterMBean(matchingName);
                log.info("unregistered MBean for `{}`", matchingName);
            } else {
                log.debug("no registered MBean for `{}`", matchingName);
            }
        }
    } catch (Exception e) {
        log.error("can't get access to jmx", e);
    }
}
```
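A note on the query in the listener: `ObjectName.getInstance(domain, table)` builds a name whose key-property list is not a pattern, so `queryNames` only matches MBeans whose keys are exactly `host`, `job_id` and `job_name` with exactly those values; a different host string or job-name sanitization would silently match nothing. A minimal sketch of a variant that matches on `job_id` alone via a key-property pattern (trailing `,*`); it assumes the `job_id` key used above, the names are hypothetical, and it only removes JMX registrations, so by itself it may not release the class loader given the other references in the heap dump.

```java
import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical variant of the listener's cleanup: unregisters every job-scoped
// JobManager MBean for one job id, whatever its host and job_name keys are.
public class JobMBeanCleanup {

    /** Unregisters matching MBeans and returns how many were removed. */
    public static int unregisterJobMBeans(MBeanServer server, String jobId)
            throws MalformedObjectNameException, MBeanRegistrationException {
        // ",*" makes the key-property list a pattern: only job_id has to match.
        ObjectName pattern = new ObjectName("org.apache.flink.jobmanager.job.*:job_id=" + jobId + ",*");
        int removed = 0;
        for (ObjectName name : server.queryNames(pattern, null)) {
            try {
                server.unregisterMBean(name);
                removed++;
            } catch (InstanceNotFoundException alreadyGone) {
                // Unregistered concurrently (e.g. by the reporter itself); nothing to do.
            }
        }
        return removed;
    }
}
```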

It didn't help: Metaspace keeps growing and the class loaders are still retained. The screenshot below shows the remaining hard references (excluding the top-level one) [screenshot: all incoming references to the custom input class]

So my questions are:

  1. Is this a bug in Flink?
  2. Or is there a Flink configuration option I can tune to force proper cleanup when a job completes?
  3. Or is there a workaround that achieves the same, similar to what I did for unregistering the MBeans after the job is executed?

Upvotes: 1

Views: 323
