Producer Github Project : example-talend-java-kafka-producer
Consumer Github Project : example-talend-java-kafka-consumer
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency> </dependencies> |
KafkaProducer<String, String> producer = createKafkaProducer(); // Create Kafka producer with parameters private static KafkaProducer<String, String> createKafkaProducer() { Map<String,Object> map=new HashMap<>(); map.put("bootstrap.servers","127.0.0.1:9092,127.0.0.1:9093"); map.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); map.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<>(map); } |
// create the topic message, enter topic name and message ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData()); // Get random US City Name private static String getUsCity() { String[] list = { "Montgomery", "Juneau", "Phoenix", "Little Rock", "Sacramento", "Raleigh", "Columbia", "Denver", "Hartford", "Bismarck", "Pierre", "Dover", "Tallahassee", "Atlanta", "Honolulu", "Boise", "Springfield", "Indianapolis", "Des Moines", "Topeka", "Frankfort", "Baton Rouge", "Augusta", "Annapolis", "Boston", "Lansing", "Saint Paul", "Jackson", "Jefferson City", "Helena", "Lincoln", "Carson City", "Concord", "Trenton", "Santa Fe", "Albany", "Columbus", "Oklahoma City", "Salem", "Harrisburg", "Providence", "Nashville", "Austin", "Salt Lake City", "Montpelier", "Richmond", "Charleston", "Olympia", "Madison", "Cheyenne" }; Integer random = ((Long) Math.round(Math.random() * (list.length - 1))).intValue(); return list[random]; } // Get message private static String produceData() { return getUsCity(); } |
// function to push the message to kafka topic SendToKafka(producer, record); // Send message to Topic in Kafka private static void SendToKafka(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) { try { producer.send(record, new ProducerCallback()); } catch (Exception e) { e.printStackTrace(); } } // Method to see if the message is send, if no exception the message is send private static class ProducerCallback implements Callback { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e!=null) { e.printStackTrace(); } else { System.out.println("sent on : " + recordMetadata.topic() + " offset : " + recordMetadata.offset()); } } } |
// Imports import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; // Instanciation du KafkaProducer KafkaProducer<String, String> producer = createKafkaProducer(); // Write do { // create the topic message, enter topic name and message ProducerRecord<String, String> record = new ProducerRecord<>("TopicJAVA", produceData()); // function to push the message to kafka topic SendToKafka(producer, record); Thread.sleep(500); } while (true); |
//Creation of the consumer KafkaConsumer<String, String> consumer = createKafkaConsumer(); private static KafkaConsumer<String, String> createKafkaConsumer() { Map<String,Object> map=new HashMap<>(); map.put("bootstrap.servers","127.0.0.1:9092,127.0.0.1:9093"); map.put("group.id","MyGroupId"); map.put("enable.auto.commit","false"); map.put("auto.commit.interval.ms","1000"); map.put("auto.offset.reset","latest"); map.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); map.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); return new KafkaConsumer<>(map); } //Configuring of the consumer consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance()); private static class ConsumerRebalance implements ConsumerRebalanceListener { public void onPartitionsAssigned(Collection<TopicPartition> partitions) { partitions.forEach( topicPartition -> System.out.printf("Assigned partition %d of topic %s%n", topicPartition.partition(), topicPartition.topic()) ); } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { for (TopicPartition topicPartition : partitions) { System.out.printf("Revoked partition %d of topic %s%n", topicPartition.partition(), topicPartition.topic()); } } } |
// Creation of the message ConsumerRecords<String,String> consumerRecords=consumer.poll(100); // Display topic message consumerRecords.forEach(Example_Java_Kafka_Consumer::display); private static void display(ConsumerRecord<String, String> record) { System.out.println(String.format("topic = %s, partition: %d, offset: %d: %s", record.topic(), record.partition(), record.offset(), record.value())); } |
// Manual commit of offset manualAsynchronousCommit(consumer); private static void manualAsynchronousCommit(KafkaConsumer<String, String> consumer) { consumer.commitAsync((offsets, exception) -> { if (exception != null) { exception.printStackTrace(); } else if (offsets != null) { offsets.forEach((topicPartition, offsetAndMetadata) -> { // System.out.printf("commited offset %d for partition %d of topic %s%n", // offsetAndMetadata.offset(), topicPartition.partition(), topicPartition.topic()); }); } }); } |
// Imports import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; // Instanciation du KafkaProducer KafkaConsumer<String, String> consumer = createKafkaConsumer(); //Configuring of the consumer consumer.subscribe(Collections.singletonList("TopicJAVA"), new ConsumerRebalance()); try { while (true) { // Creation of the message ConsumerRecords<String,String> consumerRecords=consumer.poll(100); // Display topic message consumerRecords.forEach(Example_Java_Kafka_Consumer::display); // Manual commit of offset manualAsynchronousCommit(consumer); } } finally { // Close the kafka consumer consumer.close(); } |