Talend - Kafka Producer / Consumer


Producer GitHub project: example-talend-kafka-producer

Consumer GitHub project: example-talend-kafka-consumer

Preamble


Configuration: Context


To create the jobs shown in this article, first create a context in the repository and set its values.



Producer 


Example: Produce a message to the Kafka cluster (with all versions of Data Fabric)

  • Create a new job
  • Add the component "tKafkaConnection": opens a connection to the Kafka cluster that the other components can reuse
  • Add the component "tKafkaCreateTopic": creates the Kafka topic with the given settings
  • Add the source component; in this example, a "tRowGenerator" generates 100 rows
  • Add the component "tKafkaOutput": sends the messages to the Kafka cluster


Double-click on "tKafkaConnection" and set its properties:

  • Choose the version of Kafka
  • Enter the addresses of the ZooKeeper nodes (example: localhost:2181)
  • Enter the addresses of the brokers (example: localhost:9092)
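
For readers who know the plain Kafka Java client better than Talend, the broker list entered above plays the same role as the `bootstrap.servers` client property in recent Kafka client versions. The sketch below only checks the "host:port" format and builds a `java.util.Properties` object; it does not connect to anything. The class name `ConnectionSettingsSketch` and the helper `brokerProperties` are hypothetical, and how Talend passes these values internally is not covered by this article.

```java
import java.util.Properties;

class ConnectionSettingsSketch {

    // Hypothetical helper: validates a comma-separated "host:port" list and
    // stores it under the standard Kafka client key "bootstrap.servers".
    static Properties brokerProperties(String brokerList) {
        for (String entry : brokerList.split(",")) {
            String[] hostAndPort = entry.trim().split(":");
            if (hostAndPort.length != 2 || hostAndPort[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port, got: " + entry);
            }
            // Throws NumberFormatException (an IllegalArgumentException) if the port is not numeric.
            Integer.parseInt(hostAndPort[1]);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    public static void main(String[] args) {
        Properties props = brokerProperties("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The simple split on ":" is enough for the localhost example used here; it would not handle IPv6 literals.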


Double-click on "tKafkaCreateTopic" and set its properties:

  • Choose to use the existing connection
  • Choose the topic action "Create topic if not exists"
  • Enter topic name
  • Enter replication factor
  • Enter number of partitions
  • Set topic retention time
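
To make the topic settings above more concrete, here is a minimal sketch that checks them and converts a retention period to milliseconds, the unit used by Kafka's topic-level `retention.ms` setting. Everything else is an assumption made for illustration: the class `TopicSettingsSketch`, the helper `topicSettings`, the map keys other than `retention.ms`, and the sample values (3 partitions, replication factor 1, 24 hours). The rule that the replication factor cannot exceed the number of brokers comes from Kafka itself.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

class TopicSettingsSketch {

    // Hypothetical helper: validates the values and returns them as a simple map.
    // Only "retention.ms" is a real Kafka topic-level configuration key here;
    // the other keys are labels chosen for this sketch.
    static Map<String, String> topicSettings(String name, int partitions,
                                             int replicationFactor, int brokerCount,
                                             Duration retention) {
        if (partitions < 1) {
            throw new IllegalArgumentException("At least one partition is required");
        }
        if (replicationFactor < 1 || replicationFactor > brokerCount) {
            throw new IllegalArgumentException(
                "Replication factor must be between 1 and the number of brokers");
        }
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("topic", name);
        settings.put("partitions", Integer.toString(partitions));
        settings.put("replication.factor", Integer.toString(replicationFactor));
        settings.put("retention.ms", Long.toString(retention.toMillis()));
        return settings;
    }

    public static void main(String[] args) {
        // Sample values only: single local broker, 24 hours of retention.
        System.out.println(topicSettings("TopicTestTalend", 3, 1, 1, Duration.ofHours(24)));
    }
}
```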




Double-click on "tRowGenerator" and set its properties:

  • Add a column
    • choose the type "byte[]"
    • enter the function "TalendDataGenerator.getUsCity().getBytes()"
  • Enter the number of rows to generate
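
Outside Talend Studio, what this generator configuration produces can be approximated as follows. This is only a sketch: `TalendDataGenerator` is not available outside Talend, so a short hard-coded array of city names (sample values chosen here) stands in for `getUsCity()`, and the class `RowGeneratorSketch` and method `generateRows` are hypothetical. The sketch also encodes explicitly as UTF-8, whereas a bare `getBytes()` uses the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class RowGeneratorSketch {

    // Stand-in for TalendDataGenerator.getUsCity(); sample values for this sketch only.
    private static final String[] CITIES = {"Austin", "Boston", "Denver", "Phoenix"};

    // Generates `count` rows, each holding one city name encoded as byte[].
    static List<byte[]> generateRows(int count, Random random) {
        List<byte[]> rows = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            String city = CITIES[random.nextInt(CITIES.length)];
            rows.add(city.getBytes(StandardCharsets.UTF_8));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<byte[]> rows = generateRows(100, new Random());
        System.out.println(rows.size() + " rows generated, first: "
            + new String(rows.get(0), StandardCharsets.UTF_8));
    }
}
```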




Double-click on "tKafkaOutput" and set its properties:

  • Choose to use the existing connection
  • Enter the topic name



  • Run the job

Consumer


Example: Consume a message from the Kafka cluster (with all versions of Data Fabric)

  • Create a new job
  • Add the component "tKafkaConnection": opens a connection to the Kafka cluster that the other components can reuse
  • Add the component "tKafkaInput": consumes messages from the Kafka topic
  • Add the component "tLogRow": displays the messages of the topic in the console



Double-click on "tKafkaConnection" and set its properties:

  • Choose the version of Kafka
  • Enter the addresses of the ZooKeeper nodes (example: localhost:2181)
  • Enter the addresses of the brokers (example: localhost:9092)


Double-click on "tKafkaInput" and set its properties:

  • Choose the output type "byte[]"
  • Choose to use the existing connection
  • Enter the topic name "TopicTestTalend"
  • Enter the consumer group id "mygroup"
  • Enter the interval at which offsets are auto-committed
  • Enter the maximum time to wait between two messages before the job stops
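
As a rough point of comparison with the plain Kafka Java client, the consumer settings above resemble the standard client properties `group.id`, `enable.auto.commit` and `auto.commit.interval.ms`. The sketch below builds those properties, shows one possible way to express the "stop after a period without messages" rule, and decodes a `byte[]` payload into text for display. The class `ConsumerSettingsSketch`, its helpers, and the sample values (5000 ms commit interval, 10000 ms idle limit) are assumptions for illustration; the code does not open a Kafka connection and is not how Talend implements the component.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

class ConsumerSettingsSketch {

    // Builds consumer properties using standard Kafka client configuration keys.
    static Properties consumerProperties(String brokers, String groupId, long autoCommitIntervalMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", Long.toString(autoCommitIntervalMs));
        return props;
    }

    // Hypothetical stop rule: true once more than maxIdleMs has passed since the last message.
    static boolean idleTooLong(long lastMessageAtMs, long nowMs, long maxIdleMs) {
        return nowMs - lastMessageAtMs > maxIdleMs;
    }

    // Decodes a byte[] payload into text, assuming the producer encoded it as UTF-8.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sample values only.
        Properties props = consumerProperties("localhost:9092", "mygroup", 5000L);
        System.out.println(props.getProperty("group.id"));
        System.out.println(decode("Austin".getBytes(StandardCharsets.UTF_8)));
    }
}
```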



  • Run the job