首页

基于apache的log4j的AbstractAppender自定义KafkaAppender监听LogEvent事件进行发送kafka消息处理

标签:KafkaAppender,AbstractAppender,apache.,log4j,logging,日志事件,LogEvent     发布时间:2018-03-30   

一、前言

基于apachelog4j的org.apache.logging.log4j.core.appender.AbstractAppender,进行自定义log4j的<Kafka>将日志通过发送消息队列后,并可以将消息便更的状态信息等

二、源码说明

1.KafkaAppender类

package com.xwood.kafka.log4j.appender;@b@@b@import com.xwood.kafka.exception.InitializationError;@b@import java.util.Properties;@b@import java.util.concurrent.locks.Lock;@b@import java.util.concurrent.locks.ReadWriteLock;@b@import java.util.concurrent.locks.ReentrantReadWriteLock;@b@import org.apache.logging.log4j.core.Filter;@b@import org.apache.logging.log4j.core.LogEvent;@b@import org.apache.logging.log4j.core.appender.AbstractAppender;@b@import org.apache.logging.log4j.core.config.plugins.Plugin;@b@import org.apache.logging.log4j.core.config.plugins.PluginAttribute;@b@import org.apache.logging.log4j.core.config.plugins.PluginElement;@b@import org.apache.logging.log4j.core.config.plugins.PluginFactory;@b@import org.apache.logging.log4j.core.util.Booleans;@b@import org.springframework.util.Assert;@b@import org.springframework.util.StringUtils;@b@@b@@Plugin(name="Kafka", category="Core", elementType="appender", printObject=true)@b@public class KafkaAppender extends AbstractAppender@b@{@b@  private static final long serialVersionUID = 4701714603421188915L;@b@  private static final String TOPIC_PREFIX = "xwood_log";@b@  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();@b@  private final Lock writeLock = this.rwLock.writeLock();@b@  private static volatile KafkaLog4jManager manager;@b@@b@  public KafkaAppender(String name, Filter filter, boolean ignoreExceptions)@b@  {@b@    super(name, filter, null, ignoreExceptions);@b@  }@b@@b@  public synchronized void append(LogEvent event) {@b@    this.writeLock.lock();@b@    try {@b@      getManager().writeEvent(event);@b@    } catch (Throwable t) {@b@    }@b@    finally {@b@      this.writeLock.unlock();@b@    }@b@  }@b@@b@  @PluginFactory@b@  public static KafkaAppender createAppender(@PluginAttribute("name") String name, @PluginElement("PropertyConfigs") PropertyConfig[] properties, @PluginAttribute("ignoreExceptions") String ignore, @PluginElement("Filter") Filter filter)@b@  {@b@    String projectId ="xwood_test";@b@    String domainId ="xwood.net.id";@b@    String domain = (!(StringUtils.hasText(domainId))) ? projectId : domainId;@b@    String pappName = "xwood_test"; @b@    boolean ignoreExceptions = Booleans.parseBoolean(ignore, true);@b@    KafkaLog4jManagerData factoryData = new KafkaLog4jManagerData(domain + "_" + pappName, "xwood_log", domain + "_" + pappName, checkProperties(properties));@b@    KafkaAppender appender = new KafkaAppender(name, filter, ignoreExceptions);@b@@b@    if (manager != null) {@b@      manager.stop();@b@    }@b@@b@    manager = new KafkaLog4jManager(factoryData);@b@    return appender;@b@  }@b@@b@  private static Properties checkProperties(PropertyConfig[] properties) {@b@    Assert.notEmpty(properties, "kafka.bootstrap.servers is needed! " + exampleMsg());@b@    Properties kafkaProperties = new Properties();@b@    PropertyConfig[] arrayOfPropertyConfig = properties; int i = arrayOfPropertyConfig.length; for (int j = 0; j < i; ++j) { PropertyConfig config = arrayOfPropertyConfig[j];@b@      kafkaProperties.put(config.getKey(), config.getValue());@b@    }@b@    if ((!(kafkaProperties.containsKey("kafka.bootstrap.servers"))) || (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.bootstrap.servers"))))) {@b@      throw new InitializationError("kafka.bootstrap.servers is needed! " + exampleMsg(), null);@b@    }@b@@b@    if (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.producer.key.serializer"))))@b@      kafkaProperties.put("kafka.producer.key.serializer", "com.xwood.kafka.serialization.JsonSerializer");@b@@b@    if (!(StringUtils.hasText(kafkaProperties.getProperty("kafka.producer.value.serializer"))))@b@      kafkaProperties.put("kafka.producer.value.serializer", "com.xwood.kafka.serialization.JsonSerializer");@b@@b@    return kafkaProperties;@b@  }@b@@b@  private static String exampleMsg() {@b@    StringBuffer msg = new StringBuffer();@b@    msg.append("<Appenders>");@b@    msg.append("<Kafka>");@b@    msg.append("<property key=\"kafka.bootstrap.servers\" value=\"ip:port,ip:port\" />");@b@    msg.append("<property key=\"kafka.producer.client.id\" value=\"log4j-client\" />");@b@    msg.append("......");@b@    msg.append("</Kafka>");@b@    msg.append("</Appenders>");@b@    return msg.toString();@b@  }@b@@b@  public KafkaLog4jManager getManager() {@b@    return manager;@b@  }@b@}

2.KafkaLog4jManager类

package com.xwood.kafka.log4j.appender;@b@@b@import com.xwood.common.utils.PNetUtils;@b@import com.xwood.kafka.core.ProducerTemplate;@b@import com.xwood.kafka.producer.MessageRecord;@b@import com.xwood.log4j.appender.DefaultNoSqlObject;@b@import com.xwood.log4j.appender.NoSqlObject;@b@import java.util.Map;@b@import org.apache.logging.log4j.Level;@b@import org.apache.logging.log4j.Marker;@b@import org.apache.logging.log4j.ThreadContext.ContextStack;@b@import org.apache.logging.log4j.core.LogEvent;@b@import org.apache.logging.log4j.message.Message;@b@@b@public final class KafkaLog4jManager@b@{@b@  private ProducerTemplate<String, Object> producer;@b@  private KafkaLog4jManagerData factoryData;@b@@b@  public KafkaLog4jManager(KafkaLog4jManagerData factoryData)@b@  {@b@    this.factoryData = factoryData;@b@    this.producer = new ProducerTemplate(factoryData.getTopicPrefix(), factoryData.getProperties());@b@    this.producer.start();@b@  }@b@@b@  public void writeEvent(LogEvent event) {@b@    NoSqlObject value = assembleProducerRecord(event);@b@    this.producer.send(new MessageRecord(this.factoryData.getTopic(), value.unwrap()));@b@  }@b@@b@  private NoSqlObject<Map<String, Object>> assembleProducerRecord(LogEvent event) {@b@    NoSqlObject entity = createNoSqlObject();@b@@b@    entity.set("level", event.getLevel().name());@b@    entity.set("loggerName", event.getLoggerName());@b@    entity.set("instanceIp", PNetUtils.getLocalHost());@b@    entity.set("message", (event.getMessage() == null) ? null : event.getMessage().getFormattedMessage());@b@@b@    StackTraceElement source = event.getSource();@b@    if (source == null)@b@      entity.set("source", null);@b@    else {@b@      entity.set("source", convertStackTraceElement(source));@b@    }@b@@b@    Marker marker = event.getMarker();@b@    if (marker == null)@b@      entity.set("marker", null);@b@    else {@b@      entity.set("marker", buildMarkerEntity(marker));@b@    }@b@@b@    entity.set("threadName", event.getThreadName());@b@    entity.set("millis", Long.valueOf(event.getTimeMillis()));@b@@b@    Throwable thrown = event.getThrown();@b@    if (thrown == null) {@b@      entity.set("thrown", null);@b@    } else {@b@      NoSqlObject originalExceptionEntity = createNoSqlObject();@b@      NoSqlObject exceptionEntity = originalExceptionEntity;@b@      exceptionEntity.set("type", thrown.getClass().getName());@b@      exceptionEntity.set("message", thrown.getMessage());@b@      exceptionEntity.set("stackTrace", convertStackTrace(thrown.getStackTrace()));@b@      while (thrown.getCause() != null) {@b@        thrown = thrown.getCause();@b@        NoSqlObject causingExceptionEntity = createNoSqlObject();@b@        causingExceptionEntity.set("type", thrown.getClass().getName());@b@        causingExceptionEntity.set("message", thrown.getMessage());@b@        causingExceptionEntity.set("stackTrace", convertStackTrace(thrown.getStackTrace()));@b@        exceptionEntity.set("cause", causingExceptionEntity);@b@        exceptionEntity = causingExceptionEntity;@b@      }@b@@b@      entity.set("thrown", originalExceptionEntity);@b@    }@b@@b@    ThreadContext.ContextStack contextStack = event.getContextStack();@b@    if (contextStack == null)@b@      entity.set("contextStack", null);@b@    else@b@      entity.set("contextStack", contextStack.asList());@b@@b@    return entity;@b@  }@b@@b@  private NoSqlObject<Map<String, Object>> buildMarkerEntity(Marker marker) {@b@    NoSqlObject entity = createNoSqlObject();@b@    entity.set("name", marker.getName());@b@@b@    Marker[] parents = marker.getParents();@b@    if (parents != null)@b@    {@b@      NoSqlObject[] parentEntities = new NoSqlObject[parents.length];@b@      for (int i = 0; i < parents.length; ++i)@b@        parentEntities[i] = buildMarkerEntity(parents[i]);@b@@b@      entity.set("parents", parentEntities);@b@    }@b@    return entity;@b@  }@b@@b@  private NoSqlObject<Map<String, Object>>[] convertStackTrace(StackTraceElement[] stackTrace) {@b@    NoSqlObject[] stackTraceEntities = createNoSqlObjectList(stackTrace.length);@b@    for (int i = 0; i < stackTrace.length; ++i)@b@      stackTraceEntities[i] = convertStackTraceElement(stackTrace[i]);@b@@b@    return stackTraceEntities;@b@  }@b@@b@  private NoSqlObject<Map<String, Object>> convertStackTraceElement(StackTraceElement element) {@b@    NoSqlObject elementEntity = createNoSqlObject();@b@    elementEntity.set("className", element.getClassName());@b@    elementEntity.set("methodName", element.getMethodName());@b@    elementEntity.set("fileName", element.getFileName());@b@    elementEntity.set("lineNumber", Integer.valueOf(element.getLineNumber()));@b@    return elementEntity;@b@  }@b@@b@  public void stop() {@b@    this.producer.close();@b@  }@b@@b@  private NoSqlObject<Map<String, Object>> createNoSqlObject() {@b@    DefaultNoSqlObject obj = new DefaultNoSqlObject();@b@    obj.set("appName", this.factoryData.getAppName());@b@    return obj;@b@  }@b@@b@  private NoSqlObject<Map<String, Object>>[] createNoSqlObjectList(int length) {@b@    return new DefaultNoSqlObject[length];@b@  }@b@}

三、log4j配置

<Appenders>@b@	<Kafka>@b@		<property key="kafka.bootstrap.servers" value="ip:port,ip:port" />@b@		<property key="kafka.producer.client.id" value="log4j-client" />@b@	</Kafka>@b@</Appenders>