Java - Producer / Consumer Kafka


Producer Github Project : example-talend-java-kafka-producer

Consumer Github Project : example-talend-java-kafka-consumer

Common part

Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.1</version>
    </dependency>
</dependencies>


Producer

Instantiation of the KafkaProducer

KafkaProducer<String, String> producer = createKafkaProducer();


// Create Kafka producer with parameters
private static KafkaProducer<String, String> createKafkaProducer() {
    Map<String,Object> map=new HashMap<>();
    map.put("bootstrap.servers","127.0.0.1:9092,127.0.0.1:9093");
    map.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    map.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

    return new KafkaProducer<>(map);
}


Create the topic message

// create the topic message, enter topic name and message
ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());


// Get random US City Name
private static String getUsCity() {
    String[] list = { "Montgomery", "Juneau", "Phoenix", "Little Rock", "Sacramento", "Raleigh", "Columbia", "Denver",
            "Hartford", "Bismarck", "Pierre", "Dover", "Tallahassee", "Atlanta", "Honolulu", "Boise", "Springfield",
            "Indianapolis", "Des Moines", "Topeka", "Frankfort", "Baton Rouge", "Augusta", "Annapolis", "Boston", "Lansing",
            "Saint Paul", "Jackson", "Jefferson City", "Helena", "Lincoln", "Carson City", "Concord", "Trenton", "Santa Fe",
            "Albany", "Columbus", "Oklahoma City", "Salem", "Harrisburg", "Providence", "Nashville", "Austin",
            "Salt Lake City", "Montpelier", "Richmond", "Charleston", "Olympia", "Madison", "Cheyenne" };
    Integer random = ((Long) Math.round(Math.random() * (list.length - 1))).intValue();
    return list[random];
}

// Get message
private static String produceData() {
    return getUsCity();
}


Send the message to the kafka topic

// function to push the message to kafka topic
SendToKafka(producer, record);


// Send message to Topic in Kafka
private static void SendToKafka(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
    try {
        producer.send(record, new ProducerCallback());
    } catch (Exception e) { e.printStackTrace(); }
}

// Method to see if the message is send, if no exception the message is send
private static class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e!=null)
        {
            e.printStackTrace();
        } else {
            System.out.println("sent on : " + recordMetadata.topic() + " offset : " + recordMetadata.offset());
        }

    }
}


Main function

// Imports
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;


// Instanciation du KafkaProducer
KafkaProducer<String, String> producer = createKafkaProducer();

// Write
do {
    // create the topic message, enter topic name and message
    ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData());
	// function to push the message to kafka topic
    SendToKafka(producer, record);
    Thread.sleep(500);
} while (true);


Consumer

Instantiation of the KafkaConsumer

//Creation of the consumer
KafkaConsumer<String, String> consumer = createKafkaConsumer();


private static KafkaConsumer<String, String> createKafkaConsumer() {
    Map<String,Object> map=new HashMap<>();
    map.put("bootstrap.servers","127.0.0.1:9092,127.0.0.1:9093");
    map.put("group.id","MyGroupId");
    map.put("enable.auto.commit","false");
    map.put("auto.commit.interval.ms","1000");
    map.put("auto.offset.reset","latest");
    map.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    map.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
    
	return new KafkaConsumer<>(map);
}


//Configuring of the consumer
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());


private static class ConsumerRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(
                topicPartition ->
                        System.out.printf("Assigned partition %d of topic %s%n",
                                topicPartition.partition(), topicPartition.topic())
        );

    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition topicPartition : partitions) {
            System.out.printf("Revoked partition %d of topic %s%n",
                    topicPartition.partition(), topicPartition.topic());
        }

    }
}


Read the message to the kafka topic

// Creation of the message
ConsumerRecords<String,String> consumerRecords=consumer.poll(100);
// Display topic message 
consumerRecords.forEach(Example_Java_Kafka_Consumer::display);


private static void display(ConsumerRecord<String, String> record) {
    System.out.println(String.format("topic = %s, partition: %d, offset: %d: %s", record.topic(), record.partition(), record.offset(), record.value()));
}



Commit the offset

// Manual commit of offset
manualAsynchronousCommit(consumer);


private static void manualAsynchronousCommit(KafkaConsumer<String, String> consumer) {
    consumer.commitAsync((offsets, exception) ->
    {
        if (exception != null) {
            exception.printStackTrace();
        } else if (offsets != null) {
            offsets.forEach((topicPartition, offsetAndMetadata) -> {
               // System.out.printf("commited offset %d for partition %d of topic %s%n",
               //        offsetAndMetadata.offset(), topicPartition.partition(), topicPartition.topic());
            });
        }
    });
}


Main function

// Imports
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;


// Instanciation du KafkaProducer
KafkaConsumer<String, String> consumer = createKafkaConsumer();

//Configuring of the consumer
consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance());

try {
    while (true) {
        // Creation of the message
		ConsumerRecords<String,String> consumerRecords=consumer.poll(100);
		// Display topic message 
        consumerRecords.forEach(Example_Java_Kafka_Consumer::display);
		// Manual commit of offset
        manualAsynchronousCommit(consumer);

    }
} finally {
	// Close the kafka consumer
    consumer.close();
}