Java - Producer / Consumer Kafka
Producer Github Project : example-talend-java-kafka-producer
Consumer Github Project : example-talend-java-kafka-consumer
Common part
Maven Dependencies
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
</dependencies>
Producer
Instantiation of the KafkaProducer
KafkaProducer<String, String> producer = createKafkaProducer();

/**
 * Builds a {@link KafkaProducer} whose keys and values are both serialized as strings.
 *
 * <p>The producer bootstraps from the two local broker addresses {@code 127.0.0.1:9092}
 * and {@code 127.0.0.1:9093}.
 *
 * @return a newly created producer
 */
private static KafkaProducer<String, String> createKafkaProducer() {
    // Keys and values use the same serializer class.
    final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

    Map<String, Object> settings = new HashMap<>();
    settings.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
    settings.put("key.serializer", stringSerializer);
    settings.put("value.serializer", stringSerializer);
    return new KafkaProducer<>(settings);
}
Create the topic message
// Create the record to publish: the first argument is the topic name, the second is the message value
ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());
/**
 * Picks a US city name at random from a fixed list of 50 entries.
 *
 * <p>Every entry of the list is equally likely to be returned.
 *
 * @return the randomly chosen city name, never {@code null}
 */
private static String getUsCity() {
    String[] cities = { "Montgomery", "Juneau", "Phoenix", "Little Rock", "Sacramento", "Raleigh", "Columbia", "Denver",
            "Hartford", "Bismarck", "Pierre", "Dover", "Tallahassee", "Atlanta", "Honolulu", "Boise", "Springfield",
            "Indianapolis", "Des Moines", "Topeka", "Frankfort", "Baton Rouge", "Augusta", "Annapolis", "Boston", "Lansing",
            "Saint Paul", "Jackson", "Jefferson City", "Helena", "Lincoln", "Carson City", "Concord", "Trenton", "Santa Fe",
            "Albany", "Columbus", "Oklahoma City", "Salem", "Harrisburg", "Providence", "Nashville", "Austin",
            "Salt Lake City", "Montpelier", "Richmond", "Charleston", "Olympia", "Madison", "Cheyenne" };
    // Math.random() lies in [0, 1), so truncating the value scaled by the array length gives
    // every index 0..length-1 the same probability. Rounding a value scaled by (length - 1)
    // would make the first and the last entry only half as likely as the others.
    int index = (int) (Math.random() * cities.length);
    return cities[index];
}
/**
 * Supplies the value of the next message to publish.
 *
 * @return the message value, obtained from {@code getUsCity()}
 */
private static String produceData() {
    final String payload = getUsCity();
    return payload;
}
Send the message to the Kafka topic
// Push the message to the Kafka topic
SendToKafka(producer, record);

/**
 * Hands a record to the producer and registers a {@code ProducerCallback} that reports
 * the outcome of the send.
 *
 * <p>An exception thrown by the {@code send} call itself is caught and its stack trace is
 * printed, so it is not propagated to the caller.
 *
 * @param producer the producer used to publish the record
 * @param record   the record to publish
 */
private static void SendToKafka(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
    Callback deliveryReport = new ProducerCallback();
    try {
        producer.send(record, deliveryReport);
    } catch (Exception sendFailure) {
        sendFailure.printStackTrace();
    }
}
/**
 * Send callback that reports the outcome of each send: the topic and offset when no
 * exception is passed, the stack trace otherwise.
 */
private static class ProducerCallback implements Callback {

    /**
     * Invoked when a send completes.
     *
     * @param recordMetadata metadata of the record; read only when {@code e} is {@code null}
     * @param e              the failure, or {@code null} when no exception was reported
     */
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("sent on : " + recordMetadata.topic() + " offset : " + recordMetadata.offset());
            return;
        }
        e.printStackTrace();
    }
}
Main function
// Imports
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
// Instantiate the KafkaProducer
KafkaProducer<String, String> producer = createKafkaProducer();
try {
    // Publish one message every 500 ms; the loop only ends when an exception is thrown
    // (for example InterruptedException from Thread.sleep)
    while (true) {
        // Create the record: topic name and message value
        ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());
        // Push the record to the Kafka topic
        SendToKafka(producer, record);
        Thread.sleep(500);
    }
} finally {
    // Close the Kafka producer on the way out, mirroring how the consumer is closed
    producer.close();
}
Consumer
Instantiation of the KafkaConsumer
// Creation of the consumer
KafkaConsumer<String, String> consumer = createKafkaConsumer();

/**
 * Builds a {@link KafkaConsumer} for the group {@code MyGroupId} whose keys and values are
 * both deserialized as strings.
 *
 * <p>The consumer bootstraps from the two local broker addresses {@code 127.0.0.1:9092}
 * and {@code 127.0.0.1:9093}, has auto-commit disabled and resets to the latest offset.
 *
 * @return a newly created consumer
 */
private static KafkaConsumer<String, String> createKafkaConsumer() {
    // Keys and values use the same deserializer class.
    final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    Map<String, Object> settings = new HashMap<>();
    settings.put("bootstrap.servers", "127.0.0.1:9092,127.0.0.1:9093");
    settings.put("group.id", "MyGroupId");
    // Auto-commit is turned off
    settings.put("enable.auto.commit", "false");
    // NOTE(review): presumably has no effect while auto-commit is disabled - confirm against
    // the Kafka consumer configuration documentation
    settings.put("auto.commit.interval.ms", "1000");
    settings.put("auto.offset.reset", "latest");
    settings.put("key.deserializer", stringDeserializer);
    settings.put("value.deserializer", stringDeserializer);
    return new KafkaConsumer<>(settings);
}
// Subscribe the consumer to the topic and register a rebalance listener
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());

/**
 * Rebalance listener that prints every partition passed to its assignment and revocation
 * callbacks.
 */
private static class ConsumerRebalance implements ConsumerRebalanceListener {

    /**
     * Prints one line per assigned partition.
     *
     * @param partitions the partitions passed to the assignment callback
     */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition assigned : partitions) {
            System.out.printf("Assigned partition %d of topic %s%n", assigned.partition(), assigned.topic());
        }
    }

    /**
     * Prints one line per revoked partition.
     *
     * @param partitions the partitions passed to the revocation callback
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(revoked ->
                System.out.printf("Revoked partition %d of topic %s%n", revoked.partition(), revoked.topic()));
    }
}
Read messages from the Kafka topic
// Poll the topic for new records, waiting up to 100 ms
ConsumerRecords<String,String> consumerRecords=consumer.poll(100);
// Display each record that was received
consumerRecords.forEach(Example_Java_Kafka_Consumer::display);

/**
 * Prints one consumed record to standard output: its topic, partition, offset and value.
 *
 * @param record the record to print
 */
private static void display(ConsumerRecord<String, String> record) {
    String line = String.format("topic = %s, partition: %d, offset: %d: %s",
            record.topic(), record.partition(), record.offset(), record.value());
    System.out.println(line);
}
Commit the offset
// Manual commit of offset
manualAsynchronousCommit(consumer);

/**
 * Requests an asynchronous offset commit on the given consumer.
 *
 * <p>If the commit callback receives an exception, its stack trace is printed. A commit
 * that completes without an exception produces no output.
 *
 * @param consumer the consumer whose offsets are committed
 */
private static void manualAsynchronousCommit(KafkaConsumer<String, String> consumer) {
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
        // Successful commits are intentionally silent. To trace them while debugging, iterate
        // over `offsets` here and print each partition with its committed offset.
    });
}
Main function
// Imports
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
// Instantiate the KafkaConsumer
KafkaConsumer<String, String> consumer = createKafkaConsumer();
// Subscribe to the topic and register the rebalance listener
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());
try {
    // Poll, display and commit until an exception ends the loop
    for (;;) {
        // Fetch the next batch of records, waiting up to 100 ms
        ConsumerRecords<String, String> batch = consumer.poll(100);
        // Display each record of the batch
        batch.forEach(Example_Java_Kafka_Consumer::display);
        // Commit the offsets manually and asynchronously
        manualAsynchronousCommit(consumer);
    }
} finally {
    // Always close the Kafka consumer, even when the loop fails
    consumer.close();
}
, multiple selections available,