Flink 1.15.4
After every job executed from an uploaded jar, metaspace usage on the JobManager keeps increasing, because a class loader can't be garbage-collected.
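To quantify the growth between job runs, the JVM's own MXBeans expose metaspace usage and class-unloading counters. The following is a minimal sketch that uses only `java.lang.management`. It assumes a HotSpot JVM, where the memory pool is named `Metaspace`. It reports on the JVM it runs in, so it would have to run (or be logged) inside the JobManager process.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;

public class MetaspaceProbe {

    /** Used bytes of the "Metaspace" pool, or -1 if the JVM has no pool by that name. */
    public static long metaspaceUsedBytes() {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            if ("Metaspace".equals(pool.getName())) {
                return pool.getUsage().getUsed();
            }
        }
        return -1L;
    }

    /** One-line snapshot of metaspace usage and the JVM-wide class loading counters. */
    public static String snapshot() {
        ClassLoadingMXBean classes = ManagementFactory.getClassLoadingMXBean();
        return String.format("metaspaceUsed=%d bytes, loaded=%d, totalLoaded=%d, unloaded=%d",
                metaspaceUsedBytes(),
                classes.getLoadedClassCount(),
                classes.getTotalLoadedClassCount(),
                classes.getUnloadedClassCount());
    }

    public static void main(String[] args) {
        // Call this (or log it) before and after each job run and compare the numbers.
        System.out.println(snapshot());
    }
}
```

If `totalLoaded` rises with every job while `unloaded` stays flat, the per-job classes are never unloaded. That is consistent with a class loader that stays reachable.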
Initially, I suspected that it was caused by the `org.apache.flink.metrics.jmx.JMXReporter`, since the cluster is configured with `metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter`.
The heap dump showed that the `org.apache.flink.jobmanager.job.numRestarts` metric holds a hard reference to the instance of the custom input class (`inputSplitSource`, referenced through `org.apache.flink.metrics.jmx.JMXReporter$JmxGauge` -> `org.apache.flink.runtime.jobgraph.JobGraph`), and that the MBeans are not correctly unregistered on job completion.
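One way to check whether a job's MBeans really outlive the job is to list the `job_id` values that are still present in the `org.apache.flink.jobmanager.job.*` domains after the job has finished. The following is a sketch that uses only the JDK JMX API. The domain prefix is assumed from the `numRestarts` object name seen in the heap dump. `javax.management.timer.Timer` is used in `main` only as a convenient JDK-provided MBean that stands in for a Flink gauge.

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import java.util.TreeSet;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.timer.Timer;

public class LeftoverJobMBeans {

    /**
     * Collects the job_id key of every MBean whose domain starts with
     * "org.apache.flink.jobmanager.job." (assumed from the numRestarts name in the heap dump).
     */
    public static Set<String> jobIdsWithMBeans(MBeanServer server) throws MalformedObjectNameException {
        // Domain wildcard plus ":*" matches any key properties in those domains.
        ObjectName pattern = new ObjectName("org.apache.flink.jobmanager.job.*:*");
        Set<String> jobIds = new TreeSet<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            String jobId = name.getKeyProperty("job_id");
            if (jobId != null) {
                jobIds.add(jobId);
            }
        }
        return jobIds;
    }

    public static void main(String[] args) throws JMException {
        // Inside the JobManager this is the interesting call, made after a job has finished.
        System.out.println("platform server: " + jobIdsWithMBeans(ManagementFactory.getPlatformMBeanServer()));

        // Self-contained demo on a private server; Timer only stands in for a metric gauge.
        MBeanServer demo = MBeanServerFactory.newMBeanServer();
        demo.registerMBean(new Timer(), new ObjectName(
                "org.apache.flink.jobmanager.job.numRestarts:host=jm,job_id=52f460a9dcb068b1509137f12f28b061,job_name=demo"));
        System.out.println("demo server: " + jobIdsWithMBeans(demo)); // demo server: [52f460a9dcb068b1509137f12f28b061]
    }
}
```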
But then I found more references to the `org.apache.flink.runtime.jobgraph.JobGraph` in the heap.
Below is the example code with a dummy input/output that I used to reproduce the problem:
```java
package io.debug.flink;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.NonParallelInput;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;

import java.time.ZonedDateTime;
import java.util.Iterator;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;

public class CustomInputJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput(new CustomInput())
                .output(new CustomOutput());
        env.execute("CustomInputJob(export=" + ZonedDateTime.now() + ")");
    }

    public static class CustomInput extends GenericInputFormat<UUID> implements NonParallelInput {

        private Iterator<UUID> iterator;

        @Override
        public void open(GenericInputSplit split) {
            iterator = IntStream.range(0, 5).boxed().map(i -> UUID.randomUUID()).collect(Collectors.toList()).iterator();
        }

        @Override
        public boolean reachedEnd() {
            return !iterator.hasNext();
        }

        @Override
        public UUID nextRecord(UUID reuse) {
            return iterator.next();
        }
    }

    @Slf4j
    public static class CustomOutput extends RichOutputFormat<UUID> {

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public void open(int taskNumber, int numTasks) {
        }

        @Override
        public void writeRecord(UUID record) {
            log.info("received: {}", record);
        }

        @Override
        public void close() {
        }
    }
}
```
There is nothing suspicious in the logs after enabling TRACE for `org.apache.flink`. At least the job appears to have been cleaned up:
Job 52f460a9dcb068b1509137f12f28b061 has been registered for cleanup in the JobResultStore after reaching a terminal state.
Cleanup for the job '52f460a9dcb068b1509137f12f28b061' has finished. Job has been marked as clean.
A few screenshots with information taken from the heap dump:

[Screenshot: class loaders that were not garbage-collected]
[Screenshot: merged paths from a non-GC-ed class loader to the custom input instance]
[Screenshot: a pointer to the `org.apache.flink.jobmanager.job.numRestarts` metric]
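To compare heap dumps across several job runs, it can help to trigger a dump from code right after a job finishes. The following is a minimal sketch that uses the JDK's `com.sun.management.HotSpotDiagnosticMXBean`, assuming a HotSpot-based JVM. Passing `live = true` makes the JVM run a full GC first and dump only reachable objects. Any job class loader that still appears in such a dump is therefore genuinely retained, rather than merely uncollected garbage.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

public class HeapDumper {

    /** Writes a heap dump of the current JVM to target and returns target. */
    public static Path dumpHeap(Path target, boolean live) throws IOException {
        HotSpotDiagnosticMXBean diagnostics =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        // Fails if the file already exists; recent JDKs also require a ".hprof" suffix.
        diagnostics.dumpHeap(target.toString(), live);
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("heapdumps");
        // A timestamped name so consecutive dumps (one per job run) don't collide.
        Path file = dumpHeap(dir.resolve("after-job-" + System.currentTimeMillis() + ".hprof"), true);
        System.out.println("wrote " + Files.size(file) + " bytes to " + file);
    }
}
```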
I then added a `JobListener` that unregisters the job-related MBeans, like this:
```java
// jobName, replaceInvalidChars and log are members of the enclosing listener class (not shown).
@Override
public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
    var server = ManagementFactory.getPlatformMBeanServer();
    if (jobExecutionResult == null) {
        log.warn("received null job execution result");
        return;
    }
    try {
        log.info("started cleaning up the jmx resources for job id '{}', job name '{}'", jobExecutionResult.getJobID(), jobName);
        // Rebuild the key properties the JMX reporter uses for job-scoped metrics.
        Hashtable<String, String> ht = new Hashtable<>(3);
        ht.put("host", "user-export-cluster-jobmanager");
        ht.put("job_id", replaceInvalidChars(jobExecutionResult.getJobID().toString()));
        ht.put("job_name", replaceInvalidChars(jobName));
        // Match every job-scoped metric domain that carries exactly these keys.
        for (var matchingName : server.queryNames(ObjectName.getInstance("org.apache.flink.jobmanager.job.*", ht), null)) {
            if (server.isRegistered(matchingName)) {
                log.debug("unregistering MBean for `{}`", matchingName);
                server.unregisterMBean(matchingName);
                log.info("unregistered MBean for `{}`", matchingName);
            } else {
                log.debug("no registered MBean for `{}`", matchingName);
            }
        }
    } catch (Exception e) {
        log.error("can't get access to jmx", e);
    }
}
```
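The query in this listener only matches MBeans whose key set is exactly `host`, `job_id`, and `job_name`, with identical values. As a result, a different hostname or a differently sanitized job name silently matches nothing. The following sketch keys only on `job_id` by turning the key list into a property-list pattern (`,*`). It assumes the job id needs no `ObjectName` quoting (Flink job ids print as hex strings). `Timer` in `main` is just a JDK MBean that stands in for a gauge. This only makes the cleanup independent of `host` and `job_name`. It does not address the other `JobGraph` references found in the heap.

```java
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;
import javax.management.timer.Timer;

public class JobMBeanCleanup {

    /**
     * Unregisters every MBean in an "org.apache.flink.jobmanager.job.*" domain whose job_id key
     * equals jobId, whatever its other keys are. Returns how many MBeans were removed.
     */
    public static int unregisterJobMBeans(MBeanServer server, String jobId) throws JMException {
        // ",*" makes this a property-list pattern: job_id must match, other keys are ignored.
        ObjectName pattern = new ObjectName("org.apache.flink.jobmanager.job.*:job_id=" + jobId + ",*");
        int removed = 0;
        for (ObjectName name : server.queryNames(pattern, null)) {
            try {
                server.unregisterMBean(name);
                removed++;
            } catch (InstanceNotFoundException alreadyGone) {
                // Removed by someone else between the query and the unregister call.
            }
        }
        return removed;
    }

    public static void main(String[] args) throws JMException {
        // Demo on a private server; Timer only stands in for a metric gauge.
        MBeanServer server = MBeanServerFactory.newMBeanServer();
        String jobId = "52f460a9dcb068b1509137f12f28b061";
        server.registerMBean(new Timer(), new ObjectName(
                "org.apache.flink.jobmanager.job.numRestarts:host=some-other-host,job_id=" + jobId + ",job_name=any"));
        System.out.println("removed " + unregisterJobMBeans(server, jobId)); // removed 1
    }
}
```

In the listener, this would be called as `unregisterJobMBeans(server, jobExecutionResult.getJobID().toString())` in place of the `Hashtable`-based query.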
But it didn't help: the metaspace keeps growing and the class loaders are still hanging around. The next screenshot shows the remaining hard references (not only the top-level ones):

[Screenshot: all incoming references to the custom input class]
As of now I'm wondering about the following questions: