Azure: EventHub

In this tutorial I will show you how to connect to Event Hub from Python. Ensure you have first installed an IDE (Eclipse) and Python 3.7.

Python Package Installation

pip3 install azure-eventhub

Create a Producer

This will publish events to the event hub. The important part here is the “Endpoint”. You need to log in to the Azure Portal and get the endpoint from the “Shared Access Policies” section of the event hub namespace.

from azure.eventhub import EventHubProducerClient, EventData

connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
eventhub_name = '<<THE_EVENT_HUB_NAME>>'
producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

event_data_batch = producer.create_batch()

event_data_batch.add(EventData('My Test Data'))

with producer:
    producer.send_batch(event_data_batch)

Create a Consumer

This will monitor the event hub for new messages.

from azure.eventhub import EventHubConsumerClient

connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
eventhub_name = '<<THE_EVENT_HUB_NAME>>'
consumer_group = '<<THE_EVENT_HUB_CONSUMER_GROUP>>'
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

def on_event(partition_context, event):
    print("Received event from partition {} - {}".format(partition_context.partition_id, event))
    partition_context.update_checkpoint(event)

with client:
    # To read from the beginning of each partition instead, pass a starting position:
    # client.receive(
    #     on_event=on_event,
    #     starting_position="-1",  # "-1" is from the beginning of the partition.
    # )
    client.receive(
        on_event=on_event
    )


Kafka & Java: Secured Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven’t installed Kafka with Kerberos yet please do so first.

Import SSL Cert to Java:

Follow the tutorial “Installing unlimited strength encryption Java libraries” first.

If on Windows, do the following:

#Import it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"

#Check it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

#If you want to delete it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

POM.xml

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>

Imports

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;

Consumer JAAS Conf (client_jaas.conf)

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    refreshKrb5Config=true
    debug=true
    useKeyTab=true
    storeKey=true
    keyTab="c:\\data\\kafka.service.keytab"
    principal="kafka/hadoop@REALM.CA";
};

Consumer Props File

You can review the Kafka documentation to view all the options for consumer properties.

bootstrap.servers=hadoop:9094
group.id=test

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka

#offset will be periodically committed in the background
enable.auto.commit=true

# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# heartbeat to detect worker failures
session.timeout.ms=10000

#Automatically reset offset to earliest offset
auto.offset.reset=earliest

Initiate Kerberos Authentication

System.setProperty("java.security.auth.login.config", "C:\\data\\kafkaconnect\\kafka\\src\\main\\resources\\client_jaas.conf");
System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
System.setProperty("java.security.krb5.realm", "REALM.CA");
System.setProperty("java.security.krb5.kdc", "REALM.CA");
System.setProperty("sun.security.krb5.debug", "false");
System.setProperty("javax.net.debug", "false");
System.setProperty("javax.net.ssl.keyStorePassword", "changeit");
System.setProperty("javax.net.ssl.keyStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "true");

Consumer Connection/Read

The record we will read will just be a string for both key and value.

Consumer<String, String> consumer = null;

try {
	ClassLoader classLoader = getClass().getClassLoader();

	try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
		Properties properties = new Properties();
		properties.load(props);
		consumer = new KafkaConsumer<>(properties);
	}
	
	System.out.println("Consumer Created");

	// Subscribe to the topic.
	consumer.subscribe(Arrays.asList("testTopic"));

	while (true) {
		final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
		
		if (consumerRecords.count() == 0) {
			//No more records; stop reading
			break;
		}

		consumerRecords.forEach(record -> {
			System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
		});

		//Commit offsets returned on the last poll() for all the subscribed list of topics and partition
		consumer.commitAsync();
	}
} finally {
	if (consumer != null) {
		consumer.close();
	}
}
System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.

Kafka & Java: Consumer Seek To Beginning

This is a quick tutorial on how to seek to the beginning using a Kafka consumer. If you haven’t set up the consumer yet, follow this tutorial.

This is all that is required once you have set up the consumer. It moves the Kafka offset for each assigned partition of your chosen topic back to the beginning, so once you start reading you will get all records. Note the caveat covered in the sketch after the snippet: the consumer must actually have partitions assigned for this call to do anything.

consumer.seekToBeginning(consumer.assignment());
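
One caveat: with subscribe(), consumer.assignment() is empty until the first poll() completes the group rebalance, so calling seekToBeginning() too early can silently do nothing. A minimal sketch of one way around this, assuming the “testTopic” topic from the consumer tutorial and a final (or effectively final) consumer variable, is to seek from a rebalance listener:

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Arrays.asList("testTopic"), new ConsumerRebalanceListener() {
	@Override
	public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
		// Nothing to do before partitions are reassigned
	}

	@Override
	public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
		// Seek once the group has actually assigned partitions to this consumer
		consumer.seekToBeginning(partitions);
	}
});

The listener approach also re-seeks correctly after any later rebalance; the simpler alternative is to call poll() once after subscribing and only then seek.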

Kafka & Java: Consumer List Topics

In this tutorial I will show you how to list all topics in Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. Also you should go through this tutorial to set up the consumer.

Imports

import java.util.Map;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;

Consumer List Topics

Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
System.out.println("Number of topics: " + listTopics.size());

for (String topic : listTopics.keySet()) {
	System.out.println("Topic name: " + topic);
}
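
Since listTopics() also returns the partition metadata, the PartitionInfo import above can be put to work. Here is a small sketch, using the same listTopics map, that prints each topic’s partition count and partition leaders:

for (Map.Entry<String, List<PartitionInfo>> entry : listTopics.entrySet()) {
	System.out.println("Topic: " + entry.getKey() + " (" + entry.getValue().size() + " partitions)");
	for (PartitionInfo info : entry.getValue()) {
		// Each PartitionInfo exposes the partition number and the leader broker
		System.out.println("	partition " + info.partition() + " leader: " + info.leader());
	}
}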


Kafka & Java: Unsecured Consumer Read Record

In this tutorial I will show you how to read a record from Kafka. Before you begin you will need Maven/Eclipse all set up and a project ready to go. If you haven’t installed Kafka yet please do so first.

POM.xml

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>1.1.0</version>
</dependency>

Imports

import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
import java.io.InputStream;
import java.util.Arrays;

Consumer Props File

You can review the Kafka documentation to view all the options for consumer properties.

# The Kafka bootstrap server(s)
bootstrap.servers=localhost:9092

#identify consumer group
group.id=test

#offset will be periodically committed in the background
enable.auto.commit=true

# The deserializer for the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# The deserializer for the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# heartbeat to detect worker failures
session.timeout.ms=10000

#Automatically reset offset to earliest offset
auto.offset.reset=earliest

Consumer Connection/Read

The record we will read will just be a string for both key and value.

Consumer<String, String> consumer = null;

try {
	ClassLoader classLoader = getClass().getClassLoader();

	try (InputStream props = classLoader.getResourceAsStream("consumer.props")) {
		Properties properties = new Properties();
		properties.load(props);
		consumer = new KafkaConsumer<>(properties);
	}
	
	System.out.println("Consumer Created");

	// Subscribe to the topic.
	consumer.subscribe(Arrays.asList("testTopic"));

	while (true) {
		final ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
		
		if (consumerRecords.count() == 0) {
			//No more records; stop reading
			break;
		}

		consumerRecords.forEach(record -> {
			System.out.printf("Consumer Record:(%s, %s, %d, %d)\n", record.key(), record.value(), record.partition(), record.offset());
		});

		//Commit offsets returned on the last poll() for all the subscribed list of topics and partition
		consumer.commitAsync();
	}
} finally {
	if (consumer != null) {
		consumer.close();
	}
}
System.out.println("Consumer Closed");

References

I used kafka-sample-programs as a guide for setting up props.