Saturday 13 August 2016

Adding debug statements in a PL/SQL block/procedure/function


The most common strategy for adding debug statements is to create a temporary table for debugging and log data into it through an autonomous transaction.

Using an autonomous transaction is important because the transaction carrying your business logic may get rolled back by an unexpected error; since the autonomous transaction commits independently, the debug rows survive such a rollback.

The script below creates the temp table and the autonomous logging procedure.

enable_debug.sql:


CREATE TABLE dbugtab(
  error_code  NUMBER,
  module_name VARCHAR2(4000),
  error_msg   VARCHAR2(4000)
);

CREATE OR REPLACE PACKAGE dbug AS
  PROCEDURE debug(p_module_name IN VARCHAR2, p_error_msg IN VARCHAR2);
END dbug;
/

CREATE OR REPLACE PACKAGE BODY dbug AS
  PROCEDURE debug(p_module_name IN VARCHAR2, p_error_msg IN VARCHAR2) IS
    -- Runs in its own transaction, so the COMMIT below neither commits
    -- nor depends on the caller's transaction
    PRAGMA AUTONOMOUS_TRANSACTION;
    l_errornum NUMBER;
  BEGIN
    -- Next log sequence number (max()+1 keeps the script self-contained;
    -- a sequence would be the more robust choice under concurrency)
    SELECT nvl(max(error_code), 0) INTO l_errornum FROM dbugtab;
    l_errornum := l_errornum + 1;

    INSERT INTO dbugtab(error_code, error_msg, module_name)
    VALUES (l_errornum, p_error_msg, p_module_name);

    COMMIT;
  END;
END dbug;
/


To add a debug statement, call the procedure with a module name and a message, for example:
dbug.debug('my_module', 'reached step 1');

To query the debug statements in the order of their insertion:
SELECT * FROM dbugtab ORDER BY error_code;
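
To verify that the logs survive a rollback, the below minimal sketch can be tried (demo_block is just a made-up module name). The block logs, hits a simulated error, and rolls back; both debug rows remain in dbugtab because dbug.debug commits in its own autonomous transaction.

BEGIN
  dbug.debug('demo_block', 'before the failing step');
  RAISE_APPLICATION_ERROR(-20001, 'simulated failure');
EXCEPTION
  WHEN OTHERS THEN
    dbug.debug('demo_block', 'caught: ' || SQLERRM);
    ROLLBACK;  -- undoes the business-logic changes, not the log rows
END;
/

SELECT * FROM dbugtab ORDER BY error_code;  -- shows both messages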


Tuesday 9 August 2016

Using the Java API to publish and consume Kafka messages


To use the Kafka Java APIs, include the jar files under kafka<version>/libs in your classpath.

A sample Kafka message producer in Java is as follows:


package com.prasune.test.kafka;

import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 *
 * @author prajohn
 */
public class TestKafkaProducer {
   
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        String topicName = "test";
        String message = "Test message from Java Producer" + new Date();
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topicName, message);
        producer.send(data);
       
        producer.close();
    }
   
}
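

KeyedMessage also accepts a key, which Kafka hashes to pick a partition, so messages with the same key stay in order on one partition. A small sketch continuing from the producer above (the order-id key is made up for illustration; in the 0.8 producer the key serializer defaults to serializer.class, so the StringEncoder configured above should cover the String key):

        // Hypothetical: route all messages for one order to the same partition
        String orderId = "order-42";
        KeyedMessage<String, String> keyed =
                new KeyedMessage<String, String>("test", orderId, "status=SHIPPED");
        producer.send(keyed);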



A sample Kafka message consumer in Java is as follows:


package com.prasune.test.kafka;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 *
 * @author prajohn
 */
public class TestKafkaConsumer {
   
    public static void main(String[] args) {
       
        // specify some consumer properties
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("zookeeper.connectiontimeout.ms", "1000000");
        props.put("group.id", "mygroupid2");
       
        // Create the connection to the cluster
        ConsumerConfig cf = new ConsumerConfig(props);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(cf);
       
        // request 4 streams for topic "test", so 4 threads can consume in parallel
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                        consumerConnector.createMessageStreams(Collections.singletonMap("test", 4));
        List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test");

        // create a pool of 4 threads, one per stream
        ExecutorService executor = Executors.newFixedThreadPool(4);

        // consume the messages in the threads
        for(final KafkaStream<byte[], byte[]> stream: streams) {
          executor.submit(new Runnable() {
            public void run() {
              for(MessageAndMetadata<byte[], byte[]> msgAndMetadata: stream) {
                System.out.println(new String(msgAndMetadata.message()));
              }
              }              
            }
          });
        }
    }
}
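

The sample above never exits, since the consumer threads block on their streams. A minimal sketch of a clean shutdown, to be appended at the end of main() above (the 10-second consume window is arbitrary):

        // let the threads consume for a while, then shut everything down
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        consumerConnector.shutdown();  // ends the stream iterators in the threads
        executor.shutdown();           // lets the worker threads exit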