Java - Kafka Producer / Consumer
Producer GitHub project: example-talend-java-kafka-producer
Consumer GitHub project: example-talend-java-kafka-consumer
Common part
Maven Dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>
Producer
Instantiation of the KafkaProducer
KafkaProducer<String, String> producer = createKafkaProducer();

// Create Kafka producer with parameters
private static KafkaProducer<String, String> createKafkaProducer() {
    Map<String, Object> map = new HashMap<>();
    map.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
    map.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    map.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return new KafkaProducer<>(map);
}
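As an aside, the same producer configuration can also be written with the ProducerConfig constants that ship with kafka-clients, which avoids typos in the raw property names. A minimal sketch, mirroring createKafkaProducer() above (the helper name createKafkaProducerWithConfigKeys is just for illustration):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Same configuration as createKafkaProducer(), using the ProducerConfig constants
private static KafkaProducer<String, String> createKafkaProducerWithConfigKeys() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    return new KafkaProducer<>(props);
}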
Create the topic message
// Create the topic message: topic name and message value
ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());

// Get a random US city name
private static String getUsCity() {
    String[] list = { "Montgomery", "Juneau", "Phoenix", "Little Rock", "Sacramento", "Raleigh",
            "Columbia", "Denver", "Hartford", "Bismarck", "Pierre", "Dover", "Tallahassee", "Atlanta",
            "Honolulu", "Boise", "Springfield", "Indianapolis", "Des Moines", "Topeka", "Frankfort",
            "Baton Rouge", "Augusta", "Annapolis", "Boston", "Lansing", "Saint Paul", "Jackson",
            "Jefferson City", "Helena", "Lincoln", "Carson City", "Concord", "Trenton", "Santa Fe",
            "Albany", "Columbus", "Oklahoma City", "Salem", "Harrisburg", "Providence", "Nashville",
            "Austin", "Salt Lake City", "Montpelier", "Richmond", "Charleston", "Olympia", "Madison",
            "Cheyenne" };
    Integer random = ((Long) Math.round(Math.random() * (list.length - 1))).intValue();
    return list[random];
}

// Get the message value
private static String produceData() {
    return getUsCity();
}
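ProducerRecord also has a constructor that takes a message key; Kafka hashes the key to choose the partition, so records sharing a key always land on the same partition. A minimal sketch, using the city name as an illustrative key:

// Keyed record: records with the same key are always routed to the same partition
ProducerRecord<String, String> keyedRecord = new ProducerRecord<>("TopicJAVA", getUsCity(), produceData());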
Send the message to the Kafka topic
// Function that pushes the message to the Kafka topic
SendToKafka(producer, record);

// Send the message to the topic in Kafka
private static void SendToKafka(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
    try {
        producer.send(record, new ProducerCallback());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

// Callback used to check whether the message was sent: if no exception is raised, the message was sent
private static class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println("sent on : " + recordMetadata.topic() + " offset : " + recordMetadata.offset());
        }
    }
}
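If you need to block until the broker acknowledges the record (for example to log the assigned offset inline), send() returns a Future<RecordMetadata> that can simply be awaited. A minimal sketch of that synchronous variant (the helper name SendToKafkaSync is just for illustration):

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Synchronous send: block on the Future returned by send()
private static void SendToKafkaSync(KafkaProducer<String, String> producer,
                                    ProducerRecord<String, String> record) {
    try {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("sent on : " + metadata.topic() + " offset : " + metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}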
Main function
// Imports
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

// Instantiation of the KafkaProducer
KafkaProducer<String, String> producer = createKafkaProducer();

// Write loop: the enclosing main must declare or handle InterruptedException because of Thread.sleep
do {
    // Create the topic message: topic name and message value
    ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());
    // Push the message to the Kafka topic
    SendToKafka(producer, record);
    Thread.sleep(500);
} while (true);
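The loop above never terminates, so the producer is never closed and records still sitting in its buffer could be lost when the JVM stops. One possible option is to register a shutdown hook that closes the producer, since close() flushes pending records first. A minimal sketch, assuming the producer variable is reachable where the hook is registered:

// Flush pending records and release resources when the JVM shuts down
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    producer.close();
    System.out.println("Producer closed");
}));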
Consumer
Instantiation of the KafkaConsumer
// Creation of the consumer
KafkaConsumer<String, String> consumer = createKafkaConsumer();

// Create Kafka consumer with parameters
private static KafkaConsumer<String, String> createKafkaConsumer() {
    Map<String, Object> map = new HashMap<>();
    map.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
    map.put("group.id", "MyGroupId");
    map.put("enable.auto.commit", "false");
    map.put("auto.commit.interval.ms", "1000");
    map.put("auto.offset.reset", "latest");
    map.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    map.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(map);
}

// Subscription of the consumer to the topic, with a rebalance listener
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());

private static class ConsumerRebalance implements ConsumerRebalanceListener {

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(topicPartition ->
                System.out.printf("Assigned partition %d of topic %s%n",
                        topicPartition.partition(), topicPartition.topic()));
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition topicPartition : partitions) {
            System.out.printf("Revoked partition %d of topic %s%n",
                    topicPartition.partition(), topicPartition.topic());
        }
    }
}
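As with the producer, the consumer configuration can also be expressed with the ConsumerConfig constants instead of raw strings. A minimal sketch mirroring createKafkaConsumer() above (the helper name createKafkaConsumerWithConfigKeys is just for illustration):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Same configuration as createKafkaConsumer(), using the ConsumerConfig constants
private static KafkaConsumer<String, String> createKafkaConsumerWithConfigKeys() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroupId");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return new KafkaConsumer<>(props);
}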
Read the messages from the Kafka topic
// Poll the topic for new records
ConsumerRecords<String, String> consumerRecords = consumer.poll(100);

// Display the topic messages
consumerRecords.forEach(Example_Java_Kafka_Consumer::display);

private static void display(ConsumerRecord<String, String> record) {
    System.out.println(String.format("topic = %s, partition: %d, offset: %d: %s",
            record.topic(), record.partition(), record.offset(), record.value()));
}
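A single poll() can return records from several partitions at once. If you need to process them partition by partition (for example to commit per partition), ConsumerRecords exposes the batch grouped by TopicPartition; a minimal sketch:

// Iterate the polled batch partition by partition
for (TopicPartition partition : consumerRecords.partitions()) {
    for (ConsumerRecord<String, String> record : consumerRecords.records(partition)) {
        System.out.printf("partition %d, offset %d: %s%n",
                record.partition(), record.offset(), record.value());
    }
}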
Commit the offset
// Manual commit of the offsets
manualAsynchronousCommit(consumer);

private static void manualAsynchronousCommit(KafkaConsumer<String, String> consumer) {
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        } else if (offsets != null) {
            offsets.forEach((topicPartition, offsetAndMetadata) -> {
                // System.out.printf("committed offset %d for partition %d of topic %s%n",
                //         offsetAndMetadata.offset(), topicPartition.partition(), topicPartition.topic());
            });
        }
    });
}
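commitAsync() does not retry failed commits. If you would rather block until the commit succeeds or fails definitively, the consumer also provides commitSync(); a minimal sketch of that alternative (the helper name manualSynchronousCommit is just for illustration):

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Synchronous commit: blocks and throws on unrecoverable errors
private static void manualSynchronousCommit(KafkaConsumer<String, String> consumer) {
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // Thrown, for instance, when the group has rebalanced during processing
        e.printStackTrace();
    }
}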
Main function
// Imports
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Instantiation of the KafkaConsumer
KafkaConsumer<String, String> consumer = createKafkaConsumer();

// Subscription of the consumer to the topic
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());

try {
    while (true) {
        // Poll the topic for new records
        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
        // Display the topic messages
        consumerRecords.forEach(Example_Java_Kafka_Consumer::display);
        // Manual commit of the offsets
        manualAsynchronousCommit(consumer);
    }
} finally {
    // Close the Kafka consumer
    consumer.close();
}
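The while (true) loop above only ends through an exception. A common pattern to stop it cleanly is to call consumer.wakeup() from another thread (a shutdown hook, for instance), which makes the blocked poll() throw a WakeupException that can be caught before closing the consumer. A minimal sketch of that pattern, reusing the same consumer and helpers; in a real application the hook would typically also wait for the loop to finish before the JVM exits:

import org.apache.kafka.common.errors.WakeupException;

// In a shutdown hook (other thread): interrupt the blocked poll()
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

// In the poll loop: treat WakeupException as the stop signal
try {
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
        consumerRecords.forEach(Example_Java_Kafka_Consumer::display);
        manualAsynchronousCommit(consumer);
    }
} catch (WakeupException e) {
    // Expected on shutdown, nothing to do
} finally {
    consumer.close();
}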