WSO2: Publish & Consume Message from Kafka

Kafka enables high-volume, real-time data processing. At its core, Kafka implements a publisher-subscriber model: producer applications publish events to Kafka topics, while consumer applications subscribe to those events.

Walkthrough:

Create a new project in Integration Studio: File >> New >> Integration Project, and give the project a name.

After creating the project, right-click on api >> New >> REST API >> Create a New API Artifact, fill in the required fields (Name and Context), and click Finish.

Configure the Kafka connector

Right-click on the configs artifact >> Add or Remove Connector >> Add Connector, which redirects to the WSO2 Connector Store.

Search for the specific connector required for your integration scenario and download it to the workspace. Click Finish, and your integration project is ready. The downloaded connector is displayed in the side palette with its operations.

This walkthrough uses Kafka connector version 2.0.5.

Alternatively, right-click on the configs artifact >> Add or Remove Connector >> Add Connector >> Add from File System, browse the file system, select the downloaded connector file, and click OK.
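
With the connector imported, the REST API created earlier can publish incoming payloads to Kafka using the connector's init and publishMessages operations. The sketch below is a minimal, assumed configuration: the API name KafkaPublishAPI and context /publish are illustrative choices, while the broker address and topic match the values used later in this walkthrough.

<api context="/publish" name="KafkaPublishAPI" xmlns="http://ws.apache.org/ns/synapse">
    <resource methods="POST">
        <inSequence>
            <!-- Connection details for the local Kafka broker -->
            <kafkaTransport.init>
                <bootstrapServers>localhost:9092</bootstrapServers>
                <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
                <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            </kafkaTransport.init>
            <!-- Publish the incoming request payload to the topic -->
            <kafkaTransport.publishMessages>
                <topic>weatherdatatopic</topic>
            </kafkaTransport.publishMessages>
            <respond/>
        </inSequence>
    </resource>
</api>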

Right-click on KafkaConfigs and navigate to New >> Inbound Endpoint. Select Create a New Inbound Endpoint and click Next.

  • Inbound Endpoint Name: KafkaIBEP
  • Inbound Endpoint Creation Type: Custom

After entering these details, click Finish.

Configure the custom inbound endpoint properties as shown below.

<inboundEndpoint class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" name="KafkaIBEP" onError="Error_seq" sequence="process_seq" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="topic.name">weatherdatatopic</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="contentType">application/json</parameter>
        <parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
        <parameter name="class">org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
    </parameters>
</inboundEndpoint>

Creating a Sequence Artifact

  • Right-click the ESB Config project and go to New → Sequence to open the New Sequence Artifact dialog box.
  • Select Create New Sequence and click Next.
  • In the Save Sequence in field, specify the location to save the sequence.
  • Click Finish.

Double-click the process_seq and Error_seq sequences to open them and review the added logic.

  • Drag and drop the Log mediator to log information about messages or events passing through, as sketched below.
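
For reference, minimal versions of the two sequences referenced by the inbound endpoint (process_seq and Error_seq) might look like the following; the log messages themselves are illustrative.

<sequence name="process_seq" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Log each message consumed from the Kafka topic -->
    <log level="full">
        <property name="STATUS" value="Message received from weatherdatatopic"/>
    </log>
</sequence>

<sequence name="Error_seq" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Log failures raised while consuming -->
    <log level="full">
        <property name="STATUS" value="Error while consuming message"/>
    </log>
</sequence>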

Navigate to the Kafka download page at https://kafka.apache.org/downloads and download the binary distribution (for this walkthrough, kafka_2.12-1.0.0).

  • Extract the downloaded archive to a location on your computer. The extracted folder will be your <kafka home> directory.
  • Add the following jar files from the <kafka home>/libs folder to the <mi home>/runtime/microesb/lib folder of the Micro Integrator:
  • kafka_2.12-1.0.0.jar
  • kafka-clients-1.0.0.jar
  • metrics-core-2.2.0.jar
  • scala-library-2.12.3.jar
  • zkclient-0.10.jar
  • zookeeper-3.4.10.jar

Also download the Kafka inbound endpoint from the WSO2 Connector Store.

Navigate to https://store.wso2.com/store/assets/esbconnector/details/b15e9612-5144-4c97-a3f0-179ea583be88 and click Download.

Add the downloaded inbound endpoint jar to the <mi home>/runtime/microesb/lib folder.

Open a terminal, then start the servers and create the topic with the following steps:

  1. Start ZooKeeper by executing the following command from <kafka home>/bin/windows:

zookeeper-server-start.bat ..\..\config\zookeeper.properties

  2. Start the Kafka server by executing the following command from <kafka home>/bin/windows:

kafka-server-start.bat ..\..\config\server.properties

  3. Create the topic by executing the following command:

kafka-topics.bat --create --topic weatherdatatopic --zookeeper localhost:2181 --replication-factor 1 --partitions 3

The console will display a message confirming that the topic is created.
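
As an optional sanity check, the topic can be listed back from ZooKeeper (this Kafka version's kafka-topics.bat takes the --zookeeper flag):

kafka-topics.bat --list --zookeeper localhost:2181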

After creating the topic, publish data to it. From <kafka home>/bin/windows, execute the following command:

kafka-console-producer.bat --broker-list localhost:9092 --topic weatherdatatopic

The producer is now ready to accept messages for the topic.

In this case there is sample weather data that needs to be sent to the topic; each message is typed at the producer prompt and sent by pressing Enter, as illustrated below.
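
As an illustration, a hypothetical JSON payload matching the application/json contentType configured in the inbound endpoint could look like this:

{"city": "Hyderabad", "temperature": 31, "humidity": 62}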

The topic now holds a message waiting to be consumed.

Consume Message from Kafka:

There are two ways to consume the published messages: the Kafka console consumer, or the WSO2 inbound endpoint configured earlier.

To run the console consumer, execute the following command from <kafka home>/bin/windows and press Enter:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic weatherdatatopic --from-beginning

The producer and consumer can then run side by side: messages typed into the producer console appear in the consumer console.

Alternatively, once the integration project is deployed, the inbound endpoint configuration polls the topic created in Kafka and consumes its messages through the process_seq sequence.

Kafka’s publish-and-consume message model offers a powerful and scalable way to handle data processing and distribution. At its core, this approach allows producers to publish messages to Kafka topics, which are then consumed by one or more consumers. This decoupling of producers and consumers is a game-changer, as it enables businesses to build highly resilient, fault-tolerant, and scalable data pipelines. For more information, please visit www.massiltechnologies.com.
