Reputation: 85
I am using log4j2 in my web project. I was trying to put logs directly to kafka by extending abstractAppender. As per documentation my understanding is that i can specify patternlayout for a custom appender and with that being set, my logger will send log events to kafka with formatted string but that is not happening. log4j2.xml looks like
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" packages="com.abc.webservice.log.appender">
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n</pattern>
</PatternLayout>
</Console>
<Kafka name="kafka" topic="test">
<PatternLayout>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n</pattern>
</PatternLayout>
<Property name="metadata.broker.list">127.0.0.1:9092</Property>
<Property name="serializer.class">kafka.serializer.StringEncoder</Property>
</Kafka>
</Appenders>
<Loggers>
<AsyncLogger name="async">
<AppenderRef ref="kafka" />
<AppenderRef ref="console" />
</AsyncLogger>
<Root level="info">
<AppenderRef ref="console" />
<AppenderRef ref="kafka" />
</Root>
<Logger name="com.abc" level="debug">
<!-- <appender-ref ref="console" level="debug"/>-->
<!--<appender-ref ref="kafka" level="debug"/>-->
<!--<appender-ref ref="console" level="error"/>-->
<appender-ref ref="kafka" level="error"/>
</Logger>
<Logger name="org.hibernate.SQL" >
<appender-ref ref="kafka" level="info" />
<appender-ref ref="console" level="info"/>
</Logger>
<Logger name="org.hibernate.type">
<appender-ref ref="console" level="info"/>
<appender-ref ref="kafka" level="info"/>
</Logger>
<Root level="info">
<AppenderRef ref="kafka"/>
<AppenderRef ref="console"/>
</Root>
</Loggers>
</Configuration>
If i use console appender then log comes in proper format but when i use custom appender, log is received without format. How can i send logs to kafka with specified paatternlayout.
Please find my appender implementation
import java.io.Serializable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.appender.AppenderLoggingException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.logging.log4j.core.util.Booleans;
import org.apache.logging.log4j.message.Message;
@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {
private final Lock lock = new ReentrantLock();
private KafkaManager manager;
protected KafkaAppender(String name, Filter filter, Layout layout, boolean ignoreExceptions, KafkaManager manager) {
super(name, filter, layout, ignoreExceptions);
System.err.println("hello world hello");
this.manager = manager;
}
@PluginFactory
public static KafkaAppender createAppender(@PluginAttribute("name") final String name, @PluginElement("Filter") final Filter filter,
@PluginAttribute("ignoreExceptions") final String ignore, @PluginAttribute("topic") final String topic,
@PluginElement("Properties") final Property[] properties, @PluginElement("layout") final Layout layout) {
boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);
KafkaManager kafkaManager = KafkaManager.getKafkaManager(name, topic, properties);
if (kafkaManager == null) {
return null;
}
// Layout patternLayout = PatternLayout.createLayout("%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n",
// null, null, null, true, false, null, null);
// System.err.println(patternLayout.toString());
return new KafkaAppender(name, filter, layout, ignoreExceptions, kafkaManager);
}
@Override
public final void start() {
if (this.getManager() == null) {
LOGGER.error("No KafkaManager set for the appender named [{}].", this.getName());
}
super.start();
if (this.getManager() != null) {
this.getManager().startup();
}
}
@Override
public final void stop() {
super.stop();
if (this.getManager() != null) {
this.getManager().release();
}
}
public final KafkaManager getManager() {
return this.manager;
}
public void append(LogEvent event) {
this.lock.lock();
try {
String s = event.getMessage().getFormattedMessage();
Message logEvent1 = event.getMessage();
String sp = logEvent1.getFormattedMessage();
this.getManager().send(event.getMessage().getFormattedMessage());
} catch (final Exception e) {
LOGGER.error("Unable to write to kafka [{}] for appender [{}].", this.getManager().getName(), this.getName(), e);
throw new AppenderLoggingException("Unable to write to kafka in appender: " + e.getMessage(), e);
} finally {
this.lock.unlock();
}
}
@Override
public Layout<? extends Serializable> getLayout() {
Layout patternLayout = PatternLayout.createLayout("%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L- %X{sessionId}--%X{guid}- %m #]%n",
null, null, null, true, false, null, null);
return patternLayout;
}
}
Upvotes: 1
Views: 5378
Reputation: 36754
In class KafkaAppender, your append method should call getLayout().toByteArray(event)
to format the event.
I noticed that the sample code overrides getLayout
. I would not recommend this. The AbstractAppender implementation of getLayout returns the configured layout, which allows you to control the layout in configuration without code changes.
@Override
public void append(LogEvent event) {
this.lock.lock();
try {
// let the Layout format the data in the LogEvent object
final byte[] bytes = getLayout().toByteArray(event);
// then pass the byte[] array with the formatted event to the manager
// (I assume that your manager provides this method)
manager.write(bytes, 0, bytes.length);
} catch (Exception e) {
LOGGER.error("Unable to write to kafka [{}] for appender [{}].",
this.getManager().getName(), this.getName(), e);
if (!ignoreExceptions()) {
throw new AppenderLoggingException(
"Unable to write to kafka in appender: " + e.getMessage(), e);
}
} finally {
this.lock.unlock();
}
}
// I would recommend not to override getLayout.
// The AbstractAppender implementation of getLayout returns the configured
// layout, which allows you to control the layout in configuration
// without code changes.
// @Override
// public Layout<? extends Serializable> getLayout() {...
Upvotes: 0