Kafka provides a high volume of real-time data processing. Basically, Kafka implements a publisher-subscriber model where producer applications publish events to Kafka while consumer applications subscribe to these events.
Walkthrough:
Create new project in Integration studio >> File >> New >> Integration project and give the Project name.
After creating project Right click on api >> new >> RestAPI >> Create new artifact given required fields as Name and context >> Finish
Configure the Kafka connector
Right click on configs Artifact >> Click on Add or Remove Connector >> Add Connector will redirect to the WSO2 Connector Store.
Search for the specific connector required for your integration scenario and download to the workspace. Click Finish, and your Integration project is ready. The downloaded connector is displayed on the side palette with its operations
Kafka connector version 2.0.5
Right click on configs Artifact >> Click on Add or Remove Connector >> Add Connector will redirect to the WSO2 Connector Store >> Click Add from File System >> Browse from file system >> select downloaded file connector >> Click OK
Right-click on KafkaConfigs and navigate to New à Inbound Endpoint. Select Create a New Inbound Endpoint and click Next.
- Inbound Endpoint Name: KafkaIBEP
- Inbound Endpoint Creation Type: Custom
Enter the following details and click Finish.
Configure the custom inbound endpoint properties as shown below.
<inboundEndpoint class=”org.wso2.carbon.inbound.kafka.KafkaMessageConsumer” name=”kafkaIBEP” onError=”Error_seq” sequence=”process_seq” suspend=”false” xmlns=”http://ws.apache.org/ns/synapse”>
<parameters>
<parameter name=”sequential”>true</parameter>
<parameter name=”interval”>10</parameter>
<parameter name=”coordination”>true</parameter>
<parameter name=”inbound.behavior”>polling</parameter>
<parameter name=”value.deserializer”>org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name=”topic.name”>weatherdatatopic</parameter>
<parameter name=”poll.timeout”>100</parameter>
<parameter name=”bootstrap.servers”>localhost:9092</parameter>
<parameter name=”group.id”>hello</parameter>
<parameter name=”contentType”>application/json</parameter>
<parameter name=”key.deserializer”>org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter >
name=”class”>org.wso2.carbon.inbound.kafka.KafkaMessageConsumer</parameter>
</parameters>
</inboundEndpoint>
Creating a Sequence Artifact
- Right-click the ESB Config project and go to New → Sequence to open the New Sequence Artifact dialog box
- Select Create New Sequence and click Next.
- In the Save Sequence in field, specify the location to save the sequence.
- Click Finish
Double-click the process_Seq, Error_Seq mediators to access and review the added logic.
- Drag and drop the Log mediator to log information about messages or events passing through.
Navigate product page,or click on “https://kafka.apache.org/downloads Download, and then click Zip Archive to download the product distribution as a ZIP file.
- Extract the downloaded ZIP file to a location on your computer. The kafka-version-src.tgz folder inside the extracted ZIP file will be your Kafka_home directory.
- Add the following jar files from <kafka home>/libs folder to the <mi home>/runtime/microesb/lib folder in the micro integrator as shown below jars.
- kafka_2.12-1.0.0.jar
- kafka-clients-1.0.0.jar
- metrics-core-2.2.0.jar
- scala-library-2.12.3.jar
- zkclient-0.10.jar and 6. zookeeper-3.4.10.jar and also download the kafka inbound endpoint from the wso2 connector store.
Navigate page, or click onhttps://store.wso2.com/store/assets/esbconnector/details/b15e9612-5144-4c97-a3f0-179ea583be88 Download
Add the downloaded inbound endpoint jar in <mi home>/runtime/microesb/lib folder.
Open the terminal as shown in below image
To create the topic from kafka and start the server as shown with below steps:
- start the zookeeper by executing the following command from
<kafka home>/bin/windows
zookeeper-server-start.bat ..\..\config\zookeeper.properties
- start the kafka server by executing the following command form
<kafka home>/bin/windows
kafka-server-start.bat ..\..\config\server.properties
- create topic by using command: kafka-topics.bat –create –topic weatherdatatopic –zookeeper localhost:2181 –replication-factor 1 –partitions 3.
The console will display a message confirming that the topic is created.
After creating topic, publish the data to topic
Using this path <kafka home>/bin/windows execute the following command.
kafka-console-producer.bat –broker-list localhost:9092 –topic weatherdatatopic
The above console shows that the producer is now ready to take the message to the topic.
In this case there is sample data that need to be send to topic, the below image shows the how message is sent to topic.
- The image below indicates that there is a message in the topic that needs to be consumed.
Consume Message from Kafka:
To consume the messages from the producer we have two ways:
To run consumer, from this path <kafka home>/bin/windows execute the following command kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic weatherdatatopic -from-beginning and press enter.
The below image shows the simultaneous message producers and message consumers.
From the integration studio, messages will be consumed from the topic created in kafka by the configurations made in inbound endpoint.
Kafka’s publish-and-consume message model offers a powerful and scalable way to handle data processing and distribution. At its core, this approach allows producers to publish messages to Kafka topics, which are then consumed by one or more consumers. This decoupling of producers and consumers is a game-changer, as it enables businesses to build highly resilient, fault-tolerant, and scalable data pipelines. For more information, please visit www.massiltechnologies.com.