In this tutorial series, we will discuss how to stream log4j application logs to Apache Kafka using the Maven artifact kafka-log4j-appender. To keep the logging configuration simple, we will use a Spring Boot application and stream its log4j logs to Apache Kafka.
We already installed and configured Apache Kafka on our local system in my last article - Apache Kafka With java and Maven. Hence, we will skip that part and directly create a Spring Boot application. This application will have a log4j configuration with a simple Kafka appender that streams the logs generated in the application to Kafka running on port 9092. To keep this tutorial simple, we will also write a Java consumer class that consumes the messages from the Kafka topic and prints them to the console.
In the next article, we will configure Logstash to consume these messages from the Kafka topic and push them to Elasticsearch.
Maven Dependencies
First of all, we will create a sample Spring Boot application from http://start.spring.io, add kafka-log4j-appender as a dependency, and exclude Spring Boot's default logging starter (spring-boot-starter-logging) in favour of spring-boot-starter-log4j2. Following are the dependencies from the pom file.
pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-log4j-appender</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Apache Kafka Configuration
We already configured Kafka and ZooKeeper in our last article. Hence, we will just list the default settings that we will be using in this article.
ZooKeeper - localhost:2181
Kafka - localhost:9092
Create the topic devglan-test with the following command:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic devglan-test
Spring Boot Application
Following is our main application class. It calls the log() method once, and that method then writes a log entry every 3 seconds in an endless loop. In other words, this application is the producer for the Kafka topic.
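SpringBootKafkaLogApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootKafkaLogApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(SpringBootKafkaLogApplication.class, args);
        // Start the endless logging loop once the application context is up.
        Timer timer = new Timer();
        timer.log();
    }
}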
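Timer.java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Timer {

    private static final Logger logger = LoggerFactory.getLogger(Timer.class);

    // Logs one message every 3 seconds; this method never returns.
    public void log() throws InterruptedException {
        while (true) {
            logger.info("Inside scheduleTask - Sending logs to Kafka at "
                    + DateTimeFormatter.ofPattern("HH:mm:ss").format(LocalDateTime.now()));
            Thread.sleep(3000);
        }
    }
}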
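Timer.log() never returns, so the thread that calls it stays inside the loop. If that is a concern, one possible alternative is to schedule the same message on a background thread with a ScheduledExecutorService. The following is only a sketch under assumptions: the class name ScheduledLogTimer and the sink parameter are illustrative and not part of the original project, and System.out::println stands in for logger::info so that the example runs without any logging dependency.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical alternative to Timer: same message, scheduled on a background thread.
public class ScheduledLogTimer {

    private static final DateTimeFormatter TIME = DateTimeFormatter.ofPattern("HH:mm:ss");

    // Builds the same text that Timer.log() writes.
    static String buildMessage(LocalDateTime now) {
        return "Inside scheduleTask - Sending logs to Kafka at " + TIME.format(now);
    }

    // Passes a fresh message to the sink every periodMillis without blocking the caller.
    static ScheduledExecutorService start(Consumer<String> sink, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
                () -> sink.accept(buildMessage(LocalDateTime.now())),
                0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) throws InterruptedException {
        // In the real application the sink would be logger::info (assumption).
        ScheduledExecutorService scheduler = start(System.out::println, 3000);
        Thread.sleep(3500);   // let the task fire a couple of times
        scheduler.shutdown(); // stop scheduling so the JVM can exit
    }
}
```

Keeping the message construction in buildMessage() separates the formatting from the scheduling, which makes the text easy to check in isolation.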
Log4j Configuration for Kafka Appender
There are a couple of things to take care of while configuring the Kafka appender in the log4j configuration. First, to avoid recursive logging, we have set the logging level of org.apache.kafka to WARN. This logger must never be set to DEBUG, because the Kafka client's own debug output would then be sent through the Kafka appender and trigger further logging. Second, the appender is synchronous by default and blocks until the record has been acknowledged by the Kafka server, hence we have wrapped it in an Async appender to log asynchronously.
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="info" name="spring-boot-kafka-log" packages="com.devglan">
    <Appenders>
        <Kafka name="Kafka" topic="devglan-test">
            <PatternLayout pattern="%date %message"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
        <!-- Wraps the blocking Kafka appender so that log calls return without waiting for the broker. -->
        <Async name="Async">
            <AppenderRef ref="Kafka"/>
        </Async>
        <Console name="stdout" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5p [%-7t] %F:%L - %m%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <!-- Reference the Async wrapper, not Kafka directly; otherwise logging stays synchronous. -->
            <AppenderRef ref="Async"/>
            <AppenderRef ref="stdout"/>
        </Root>
        <!-- Keep the Kafka client's own logging at WARN to avoid recursive logging. -->
        <Logger name="org.apache.kafka" level="WARN" />
    </Loggers>
</Configuration>
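To see why the Async wrapper helps, it can be useful to picture it as a bounded queue in front of a slow, blocking sink: the caller only enqueues the event, and a background thread performs the blocking send. The following is a conceptual sketch only and is not how Log4j2 implements its Async appender. The class AsyncWrapperSketch, its methods, and the artificial sink delay are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Conceptual sketch: a queue plus a worker thread in front of a blocking sink.
public class AsyncWrapperSketch {

    // Unique marker object that tells the worker to stop (compared by reference).
    private static final String STOP = new String("STOP");

    private final BlockingQueue<String> queue;
    private final Thread worker;

    AsyncWrapperSketch(Consumer<String> blockingSink, int capacity) {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(capacity);
        this.queue = q;
        this.worker = new Thread(() -> {
            try {
                while (true) {
                    String event = q.take();
                    if (event == STOP) {
                        return;
                    }
                    blockingSink.accept(event); // the slow, synchronous call happens here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "async-wrapper-sketch");
        this.worker.setDaemon(true);
        this.worker.start();
    }

    // Returns as soon as the event is queued; waits only if the queue is full.
    void append(String event) throws InterruptedException {
        queue.put(event);
    }

    // Delivers everything that is already queued, then stops the worker.
    void shutdown() throws InterruptedException {
        queue.put(STOP);
        worker.join();
    }

    // Sends the events through the wrapper to a slow sink and returns what the sink received.
    static List<String> demo(List<String> events, long sinkDelayMillis) throws InterruptedException {
        List<String> delivered = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> slowSink = event -> {
            try {
                Thread.sleep(sinkDelayMillis); // stands in for waiting on a broker acknowledgement
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            delivered.add(event);
        };
        AsyncWrapperSketch async = new AsyncWrapperSketch(slowSink, 128);
        for (String event : events) {
            async.append(event);
        }
        async.shutdown();
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo(Arrays.asList("first", "second", "third"), 50));
        // prints [first, second, third]
    }
}
```

Because a single worker drains a FIFO queue, events reach the sink in the order they were appended. The trade-off is that events still sitting in the queue are lost if the process dies before they are sent.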
Kafka Consumer
Following is a simple Java implementation of an Apache Kafka consumer that reads the log messages from the Kafka broker. This is just for demo purposes. In the next article, we will discuss consuming these log messages in Logstash.
Consumer.java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", "test-group");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        List<String> topics = new ArrayList<>();
        topics.add("devglan-test");
        kafkaConsumer.subscribe(topics);
        try {
            while (true) {
                // Wait up to 10 ms for new records, then print each one.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(10);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("Topic - %s, Partition - %d, Value: %s",
                            record.topic(), record.partition(), record.value()));
                }
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            kafkaConsumer.close();
        }
    }
}
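The value of each consumed record is the line produced by the appender's %date %message pattern. If the consumer needs the timestamp and the message separately, the value can be split along the lines of the sketch below. It assumes that %date without options renders as yyyy-MM-dd HH:mm:ss,SSS (23 characters); adjust the formatter if your layout differs. The class LogLineParser and its ParsedLine type are hypothetical helpers, not part of the original project.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that splits a "%date %message" line into its two parts.
public class LogLineParser {

    // Assumption: the date is rendered as yyyy-MM-dd HH:mm:ss,SSS.
    private static final DateTimeFormatter LOG_DATE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final int DATE_LENGTH = 23;

    static final class ParsedLine {
        final LocalDateTime timestamp;
        final String message;

        ParsedLine(LocalDateTime timestamp, String message) {
            this.timestamp = timestamp;
            this.message = message;
        }
    }

    // Parses the leading timestamp and returns it together with the rest of the line.
    static ParsedLine parse(String value) {
        if (value.length() <= DATE_LENGTH || value.charAt(DATE_LENGTH) != ' ') {
            throw new IllegalArgumentException("Unexpected log line format: " + value);
        }
        LocalDateTime timestamp = LocalDateTime.parse(value.substring(0, DATE_LENGTH), LOG_DATE);
        return new ParsedLine(timestamp, value.substring(DATE_LENGTH + 1));
    }

    public static void main(String[] args) {
        // Sample value made up for illustration, not captured from a real run.
        ParsedLine line = parse("2018-01-20 14:05:09,123 Inside scheduleTask - Sending logs to Kafka at 14:05:09");
        System.out.println(line.timestamp + " | " + line.message);
    }
}
```

In Consumer.java, such a helper would be called with record.value() inside the for loop.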
Streaming Log4j Logs to Kafka
Start ZooKeeper and Kafka first. Refer to the previous article for the complete configuration settings.
Once that is done, run Consumer.java and SpringBootKafkaLogApplication.java as Java applications. The console of Consumer.java then prints each consumed record with its topic, partition, and value, and a new record arrives roughly every 3 seconds.
Conclusion
In this article, we discussed streaming log4j logs to an Apache Kafka broker and consuming them in a Java client. In the next article, we will discuss consuming these log messages in Logstash.