Tuesday, 9 August 2016

Using the Java API to publish/consume Kafka messages


To use the Kafka Java APIs, you need to include the jar files under kafka<version>/libs in your classpath.

A sample Java message producer program for Kafka is as follows:


package com.prasune.test.kafka;

import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 *
 * @author prajohn
 */
public class TestKafkaProducer {
   
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        String topicName = "test";
        String message = "Test message from Java Producer" + new Date();
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
        producer.send(data);
       
        producer.close();
    }
   
}



A sample Java program that consumes messages from Kafka is as follows:


package com.prasune.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import jersey.repackaged.com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;

/**
 *
 * @author prajohn
 */
public class TestKafkaConsumer {
   
    public static void main(String[] args) {
       
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("zookeeper.connectiontimeout.ms", "1000000");
        props.put("group.id", "mygroupid2");
       
        // Create the connection to the cluster
        ConsumerConfig cf = new ConsumerConfig(props) ;
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cf) ;
       
        // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume       
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                        consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");

        // create list of 4 threads to consume from each of the partitions
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for(final KafkaStream<byte[], byte[]> stream: streams) {
          executor.submit(new Runnable() {
            public void run() {
              for(MessageAndMetadata msgAndMetadata: stream) {
                System.out.println(new String((byte[]) msgAndMetadata.message()));
              }              
            }
          });
        }
    }
}



No comments:

Post a Comment