Talend - Producer / Consumer Kafka
Producer GitHub project: example-talend-kafka-producer
Consumer GitHub project: example-talend-kafka-consumer
Preamble
Configuration: Context
To create the jobs shown in this article, you first have to create a context in the repository and fill in its values.
Producer
Example: Produce a message to the Kafka cluster (with all versions of Data Fabric)
- Create a new job
- Add the component "tKafkaConnection": opens a connection to the Kafka cluster that the other components can reuse
- Add the component "tKafkaCreateTopic": creates the Kafka topic with the given settings
- Add the source component; in this example a "tRowGenerator" is used to generate 100 rows
- Add the component "tKafkaOutput": sends the messages to the Kafka cluster
Double click on "tKafkaConnection" and set its properties:
- Choose the version of Kafka
- Enter the addresses of the ZooKeeper nodes (example: localhost:2181)
- Enter the addresses of the brokers (example: localhost:9092)
Double click on "tKafkaCreateTopic" and set its properties:
- Choose to use the existing connection
- Choose the action on the topic: "Create topic if not exists"
- Enter the topic name
- Enter the replication factor
- Enter the number of partitions
- Set the topic retention time
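For orientation, the topic settings above correspond to a handful of values in plain Kafka terms. The sketch below is not what Talend generates; it only gathers assumed example values (3 partitions, replication factor 1, 24 hours of retention) and converts the retention time to the topic-level `retention.ms` configuration key that Kafka uses. The class and method names are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: groups the values entered in "tKafkaCreateTopic".
public class TopicSettingsSketch {

    // Converts a retention time in hours to the topic-level "retention.ms" value.
    public static Map<String, String> topicConfig(long retentionHours) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("retention.ms", Long.toString(TimeUnit.HOURS.toMillis(retentionHours)));
        return config;
    }

    public static void main(String[] args) {
        String topic = "TopicTestTalend"; // topic name used in the consumer example
        int partitions = 3;               // assumed example value
        short replicationFactor = 1;      // assumed: a single local broker
        System.out.println(topic + " partitions=" + partitions
                + " replication=" + replicationFactor
                + " config=" + topicConfig(24));
    }
}
```

A replication factor of 1 only makes sense for a single-broker test setup such as `localhost:9092`; the replication factor cannot exceed the number of brokers in the cluster.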
Double click on "tRowGenerator" and set its properties:
- Add a column
- Choose the type "byte[]"
- Enter the function "TalendDataGenerator.getUsCity().getBytes()"
- Enter the number of rows to generate
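As a rough plain-Java equivalent of this "tRowGenerator" setup, the sketch below produces 100 rows holding one `byte[]` column. `TalendDataGenerator` is only available inside Talend, so a small hard-coded city list stands in for `getUsCity()`; the list, the class name and the fixed seed are assumptions made for illustration. The encoding is set explicitly to UTF-8, whereas `getBytes()` without an argument uses the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class RowGeneratorSketch {

    // Stand-in for TalendDataGenerator.getUsCity(): hypothetical sample values.
    public static final String[] CITIES = {"Austin", "Boston", "Denver", "Phoenix", "Sacramento"};

    // Generates `count` rows, each a single byte[] column holding a city name.
    public static List<byte[]> generateRows(int count, long seed) {
        Random random = new Random(seed);
        List<byte[]> rows = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String city = CITIES[random.nextInt(CITIES.length)];
            rows.add(city.getBytes(StandardCharsets.UTF_8));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<byte[]> rows = generateRows(100, 42L);
        System.out.println("Generated " + rows.size() + " rows");
        System.out.println("First row: " + new String(rows.get(0), StandardCharsets.UTF_8));
    }
}
```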
Double click on "tKafkaOutput" and set its properties:
- Choose to use the existing connection
- Enter the topic name
- Run the job
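Outside Talend, the connection and output settings above roughly map to standard Kafka producer configuration keys. The sketch below only builds the `java.util.Properties` object, so it runs without the Kafka client library; handing it to a `KafkaProducer` is left out and would require the `kafka-clients` dependency. The class name is hypothetical, and the broker address is the example value from this article.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Builds producer settings for byte[] messages, matching the byte[] column
    // generated by "tRowGenerator".
    public static Properties producerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Several brokers can be listed in `bootstrap.servers` as a comma-separated list of `host:port` pairs.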
Consumer
Example: Consume a message from the Kafka cluster (with all versions of Data Fabric)
- Create a new job
- Add the component "tKafkaConnection": opens a connection to the Kafka cluster that the other components can reuse
- Add the component "tKafkaInput": consumes the messages of the Kafka topic
- Add the component "tLogRow": displays the messages of the topic in the console
Double click on "tKafkaConnection" and set its properties:
- Choose the version of Kafka
- Enter the addresses of the ZooKeeper nodes (example: localhost:2181)
- Enter the addresses of the brokers (example: localhost:9092)
Double click on "tKafkaInput" and set its properties:
- Choose the output type "byte[]"
- Choose to use the existing connection
- Enter the topic name "TopicTestTalend"
- Enter the consumer group id "mygroup"
- Enter the interval at which offsets are auto-committed
- Enter the maximum wait time between messages after which the job stops
- Run the job
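The "tKafkaInput" settings have close counterparts among the standard Kafka consumer configuration keys. The sketch below builds such a configuration with the group id from this example and an assumed auto-commit interval of 5000 ms, then decodes a `byte[]` payload to text for display. It does not connect to a cluster; the class and method names are illustrative, and UTF-8 is an assumed encoding.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds consumer settings for byte[] messages with offset auto-commit enabled.
    public static Properties consumerProperties(String brokers, String groupId, int autoCommitMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Integer.toString(autoCommitMs));
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    // Turns a raw message payload into text, assuming it was encoded as UTF-8.
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Properties props = consumerProperties("localhost:9092", "mygroup", 5000);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
        // Simulated payload; a real one would come from the topic "TopicTestTalend".
        System.out.println(decode("Boston".getBytes(StandardCharsets.UTF_8)));
    }
}
```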