NiFi: Kerberized Kafka Consumer Processor

In this tutorial I will guide you through adding a Kerberized Kafka consumer to NiFi.

For this tutorial you will need an Avro schema called “person”, whose contents are as follows.

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "first_name", "type": "string" },
       { "name": "last_name", "type": "string" }
     ]
}
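If you want to sanity-check the schema before pasting it into NiFi, you can parse it with the Avro library first. This is just an optional sketch and assumes the org.apache.avro:avro dependency is on your classpath.

import org.apache.avro.Schema;

public class SchemaCheck {
	public static void main(String[] args) {
		// An invalid schema will throw SchemaParseException here
		Schema schema = new Schema.Parser().parse(
				"{ \"type\": \"record\", \"namespace\": \"com.example\", \"name\": \"FullName\","
				+ " \"fields\": [ { \"name\": \"first_name\", \"type\": \"string\" },"
				+ " { \"name\": \"last_name\", \"type\": \"string\" } ] }");
		System.out.println(schema.getFullName()); // prints com.example.FullName
	}
}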

When ready, you can publish this record to Kafka using the Kafka Producer.

{ "first_name": "John", "last_name": "Smith" }

First we need to drag the processor onto the grid.

Next we need to select the Kafka Consumer.

Next we configure the processor.

We will need to create five controller services.

First is the Kerberos Service.

Next is the SSL Service.

Next is the Json Record Reader.

Next is the Avro Registry. Add a property named “person” whose value is the schema above, so the record reader and writer can resolve the schema by name.

Next is the Json Record Writer.

Now that you have finished configuring the services, double-check your final Kafka Consumer configuration and you are ready.

Next we need to enable all the controller services.

Then we start the processor to begin receiving data.

Now you can publish the record I gave you earlier, and as you can see the data starts flowing in.

You can now view the queue to see the data.

We are now done, and you can start using the consumer.

Kafka & Java: Secured Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven’t Kerberized Kafka yet, please do so first.

Import SSL Cert to Java:

Follow this tutorial on installing the unlimited strength encryption Java libraries.

If you are on Windows, do the following:

#Import it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"

#Check it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

#If you want to delete it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

POM.xml

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>

Imports

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;

Consumer JAAS Conf (client_jaas.conf)

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    refreshKrb5Config=true
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="c:\\data\\kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
};

Consumer Props File

You can view all the available consumer properties in the official Kafka documentation.

bootstrap.servers=hadoop:9094
group.id=test

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka

#offset will be periodically committed in the background
enable.auto.commit=true

# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Timeout used to detect failed consumers via missed heartbeats
session.timeout.ms=10000

#Automatically reset offset to earliest offset
auto.offset.reset=earliest

Initiate Kerberos Authentication

System.setProperty("java.security.auth.login.config", "C:\\data\\kafkaconnect\\kafka\\src\\main\\resources\\client_jaas.conf");
System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
System.setProperty("java.security.krb5.realm", "REALM.CA");
System.setProperty("java.security.krb5.kdc", "REALM.CA");
System.setProperty("sun.security.krb5.debug", "false");
System.setProperty("javax.net.debug", "false");
System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
System.setProperty("javax.net.ssl.keyStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");

Consumer Connection/Read

The record we will read will just be a string for both key and value.

Consumer<String, String> consumer = null;

try {
	ClassLoader classLoader = getClass().getClassLoader();

	try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
		Properties properties = new Properties();
		properties.load(props);
		consumer = new KafkaConsumer<>(properties);
	}
	
	System.out.println("Consumer Created");

	// Subscribe to the topic.
	consumer.subscribe(Arrays.asList("testTopic"));

	while (true) {
		final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
		
		if (consumerRecords.count() == 0) {
			//Stop once a poll comes back empty
			break;
		}

		consumerRecords.forEach(record -> {
			System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
		});

		//Commit offsets returned on the last poll() for all the subscribed topics and partitions
		consumer.commitAsync();
	}
} finally {
	if (consumer != null) {
		consumer.close();
	}
}
System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka & Java: Unsecure Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven’t installed Kafka yet, please do so first.

POM.xml

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>

Imports

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;

Consumer Props File

You can view all the available consumer properties in the official Kafka documentation.

# The URL of the Kafka broker(s)
bootstrap.servers=localhost:9092

# Identify the consumer group
group.id=test

#offset will be periodically committed in the background
enable.auto.commit=true

# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Timeout used to detect failed consumers via missed heartbeats
session.timeout.ms=10000

#Automatically reset offset to earliest offset
auto.offset.reset=earliest
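If you prefer to skip the props file entirely, you can build the same configuration in code. This sketch is equivalent to the file above and uses only the properties already listed.

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "test");
properties.put("enable.auto.commit", "true");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("session.timeout.ms", "10000");
properties.put("auto.offset.reset", "earliest");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);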

Consumer Connection/Read

The record we will read will just be a string for both key and value.

Consumer<String, String> consumer = null;

try {
	ClassLoader classLoader = getClass().getClassLoader();

	try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
		Properties properties = new Properties();
		properties.load(props);
		consumer = new KafkaConsumer<>(properties);
	}
	
	System.out.println("Consumer Created");

	// Subscribe to the topic.
	consumer.subscribe(Arrays.asList("testTopic"));

	while (true) {
		final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
		
		if (consumerRecords.count() == 0) {
			//Stop once a poll comes back empty
			break;
		}

		consumerRecords.forEach(record -> {
			System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
		});

		//Commit offsets returned on the last poll() for all the subscribed topics and partitions
		consumer.commitAsync();
	}
} finally {
	if (consumer != null) {
		consumer.close();
	}
}
System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.