A sample message producer java program for Kafka is as follows:
package com.prasune.test.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
*
* @author prajohn
*/
public class TestKafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
String topicName = "test";
String message = "Test message from Java Producer" + new Date();
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
producer.send(data);
producer.close();
}
}
|
A sample message consumer from Kafka via java program is as follows:
package com.prasune.test.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import jersey.repackaged.com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
/**
*
* @author prajohn
*/
public class TestKafkaConsumer {
public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("zookeeper.connectiontimeout.ms", "1000000");
props.put("group.id", "mygroupid2");
// Create the connection to the cluster
ConsumerConfig cf = new ConsumerConfig(props) ;
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cf) ;
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
consumerConnector.createMessageStreams(ImmutableMap.of("test", 4));
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);
// consume the messages in the threads
for(final KafkaStream<byte[], byte[]> stream: streams) {
executor.submit(new Runnable() {
public void run() {
for(MessageAndMetadata msgAndMetadata: stream) {
System.out.println(new String((byte[]) msgAndMetadata.message()));
}
}
});
}
}
}
|
No comments:
Post a Comment