NiFi: ExecuteSQL Processor

In this tutorial I will guide you through adding a processor to NiFi that queries a SQL table.

For this tutorial you will need an Avro schema called “dttest” with the following contents:

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "id", "type": "int" },
       { "name": "name", "type": "string" }
     ]
}

First we need to drag the processor onto the grid.

Next we need to select the ExecuteSQLRecord processor.

Next we must configure the processor.

Now we must create the JsonRecordSetWriter service.

Now we name the JsonRecordSetWriter.

Configure the JsonRecordSetWriter.

Next we create the DB Connection Service.

Next we name the DB Connection Service.

Configure the DB Connection Service.

Now verify that all the settings are correct.

You are now done. Once the processor is started, it will query your table.
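To make the result concrete, here is a minimal Java sketch (standard library only) of the JSON a record writer would produce for rows matching the “dttest” schema. It assumes the writer's output grouping is left as a single JSON array per FlowFile; the DtTestRow and DtTestJson classes are hypothetical illustrations, not NiFi APIs.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical row type mirroring the "dttest" Avro schema: { id: int, name: string }.
final class DtTestRow {
    final int id;
    final String name;

    DtTestRow(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Hypothetical serializer that only illustrates the output shape; not part of NiFi.
final class DtTestJson {
    // Escapes the characters JSON does not allow unescaped inside a string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':
                    sb.append("\\\"");
                    break;
                case '\\':
                    sb.append("\\\\");
                    break;
                default:
                    if (c < 0x20) {
                        // Control characters are written as four-digit hex escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Writes all rows as one JSON array, one object per row.
    static String toJsonArray(List<DtTestRow> rows) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (DtTestRow row : rows) {
            array.add("{\"id\":" + row.id + ",\"name\":\"" + escape(row.name) + "\"}");
        }
        return array.toString();
    }
}
```

For example, rows (1, "Alice") and (2, "Bob") serialize to [{"id":1,"name":"Alice"},{"id":2,"name":"Bob"}].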

NiFi: Kerberized Kafka Consumer Processor

In this tutorial I will guide you through adding a Kerberized Kafka consumer to NiFi.

For this tutorial you will need an Avro schema called “person” with the following contents:

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "first_name", "type": "string" },
       { "name": "last_name", "type": "string" }
     ]
}

When you are ready, you can publish the following record to Kafka using the Kafka producer.

{ "first_name": "John", "last_name": "Smith" }

First we need to drag the processor onto the grid.

Next we need to select the Kafka Consumer processor.

Next we configure the processor.

We will need to create five controller services.
First is the Kerberos Service.

Next is the SSL Service.

Next is the Json Record Reader.

Next is the Avro Registry.

Next is the Json Record Writer.
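In NiFi these settings live in the processor and its controller services, but it can help to see the plain Kafka client properties that a Kerberized consumer over SSL generally needs. This is a sketch under assumptions: the broker address, consumer group, keytab path, principal, truststore path and password are placeholders, the broker's Kerberos service name is assumed to be "kafka", and the KerberizedKafkaConfig class is hypothetical. The property keys themselves are standard Kafka client settings.

```java
import java.util.Properties;

// Hypothetical helper: the Kafka client settings a Kerberized (SASL_SSL) consumer usually needs.
final class KerberizedKafkaConfig {
    static Properties consumerProperties(String bootstrapServers, String groupId,
                                         String keytab, String principal,
                                         String truststore, String truststorePassword) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", groupId);
        // Kerberos authentication (SASL/GSSAPI) on top of an SSL connection.
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "GSSAPI");
        // Must match the primary of the broker's Kerberos principal (commonly "kafka").
        p.setProperty("sasl.kerberos.service.name", "kafka");
        // Log in from a keytab rather than an interactive ticket cache.
        p.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required"
                        + " useKeyTab=true storeKey=true"
                        + " keyTab=\"" + keytab + "\""
                        + " principal=\"" + principal + "\";");
        // Trust the broker's certificate (the job of the SSL service in NiFi).
        p.setProperty("ssl.truststore.location", truststore);
        p.setProperty("ssl.truststore.password", truststorePassword);
        return p;
    }
}
```

Roughly speaking, the Kerberos service supplies the keytab and principal, the SSL service supplies the truststore, and the processor's security protocol and Kerberos service name cover the rest.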

Now you have finished configuring the services. Double-check your final Kafka Consumer configuration, and you are ready.

Next we need to enable all the controller services.

Then start the processor so that it begins receiving data.

Now publish the record I gave you earlier to Kafka. You should see the data start flowing into the queue.

You can now view the queue to see the data.

We are done, and you can start using the consumer.

NiFi: Kerberize/SSL

In this tutorial I will show you how to use Kerberos/SSL with NiFi. I will use self-signed certificates for this example. Before you begin, ensure you have installed a Kerberos server and NiFi.

This assumes your hostname is “hadoop”.

Create Kerberos Principals

cd /etc/security/keytabs/

sudo kadmin.local

#You can list principals
listprincs

#Create the following principals
addprinc -randkey nifi/hadoop@REALM.CA
addprinc -randkey nifi-spnego/hadoop@REALM.CA
#Notice this user does not have -randkey because it is a login user (you will be prompted for a password)
#Also notice that no keytab is created for this user
addprinc admin/hadoop@REALM.CA

#Create the keytab files.
#NiFi needs these to be able to log in
xst -k nifi.service.keytab nifi/hadoop@REALM.CA
xst -k nifi-spnego.service.keytab nifi-spnego/hadoop@REALM.CA

Set Keytab Permissions/Ownership

sudo chown root:hadoopuser /etc/security/keytabs/*
sudo chmod 750 /etc/security/keytabs/*
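The mode 750 grants the owner read, write and execute, the group read and execute, and everyone else nothing, so only root and members of the hadoopuser group can read the keytabs. As a small sketch, the hypothetical OctalMode helper below expands a three-digit mode into Java's PosixFilePermission set, which could be passed to Files.setPosixFilePermissions on a POSIX file system.

```java
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper: converts a chmod-style octal mode such as "750" into permissions.
final class OctalMode {
    static Set<PosixFilePermission> fromOctal(String mode) {
        if (!mode.matches("[0-7]{3}")) {
            throw new IllegalArgumentException("Expected three octal digits: " + mode);
        }
        // Enum order is OWNER_READ, OWNER_WRITE, OWNER_EXECUTE, then GROUP_*, then OTHERS_*.
        PosixFilePermission[] all = PosixFilePermission.values();
        Set<PosixFilePermission> perms = EnumSet.noneOf(PosixFilePermission.class);
        for (int digit = 0; digit < 3; digit++) {
            int bits = mode.charAt(digit) - '0';
            if ((bits & 4) != 0) perms.add(all[digit * 3]);     // read
            if ((bits & 2) != 0) perms.add(all[digit * 3 + 1]); // write
            if ((bits & 1) != 0) perms.add(all[digit * 3 + 2]); // execute
        }
        return perms;
    }
}
```

PosixFilePermissions.toString(OctalMode.fromOctal("750")) gives the familiar rwxr-x--- form shown by ls -l.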

Stop NiFi

sudo service nifi stop

Hosts Update

sudo nano /etc/hosts

#Remove 127.0.1.1 line

#Change 127.0.0.1 to the following
127.0.0.1 gaudreault_kdc.ca hadoop localhost

Ubuntu Firewall

sudo ufw disable

sysctl.conf

Disable IPv6, as it can cause issues when getting your server up and running.

sudo nano /etc/sysctl.conf

Add the following to the end and save

net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
#Change eth0 to your network interface name as shown by ifconfig
net.ipv6.conf.eth0.disable_ipv6 = 1

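Save and close sysctl.conf, then apply the changes. The cat command should print 1 if IPv6 is disabled. Then reboot.

sudo sysctl -p
cat /proc/sys/net/ipv6/conf/all/disable_ipv6
sudo reboot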

TrustStore / KeyStore

#Creating your Certificate Authority
sudo mkdir -p /etc/security/serverKeys
sudo chown -R root:hduser /etc/security/serverKeys/
sudo chmod 750 /etc/security/serverKeys/
 
cd /etc/security/serverKeys

sudo openssl genrsa -aes128 -out nifi.key 4096
sudo openssl req -x509 -new -key nifi.key -days 1095 -out nifi.pem
sudo openssl rsa -check -in nifi.key #check it
sudo openssl x509 -outform der -in nifi.pem -out nifi.der
sudo keytool -import -keystore truststore.jks -file nifi.der -alias nifi
#***You must type 'yes' to trust this certificate.
sudo keytool -v -list -keystore truststore.jks

#Creating your Server Keystore
sudo keytool -genkey -alias nifi -keyalg RSA -keystore keystore.jks -keysize 2048
sudo keytool -certreq -alias nifi -keystore keystore.jks -file nifi.csr
sudo openssl x509 -sha256 -req -in nifi.csr -CA nifi.pem -CAkey nifi.key -CAcreateserial -out nifi.crt -days 730
sudo keytool -import -keystore keystore.jks -file nifi.pem
sudo keytool -import -trustcacerts -alias nifi -file nifi.crt -keystore keystore.jks

sudo chown -R root:hduser /etc/security/serverKeys/*
sudo chmod 750 /etc/security/serverKeys/*
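Before pointing NiFi at these files, you may want to confirm that each store opens with the password you chose, which is essentially what NiFi has to do at startup. This sketch uses the standard java.security.KeyStore API; the KeyStoreInspector class is hypothetical and assumes JKS files like the ones created above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: opens a JKS store and lists its aliases, similar to `keytool -list`.
final class KeyStoreInspector {
    static List<String> aliases(InputStream in, char[] password)
            throws IOException, GeneralSecurityException {
        KeyStore ks = KeyStore.getInstance("JKS");
        // A wrong password makes load() fail with an IOException.
        ks.load(in, password);
        return Collections.list(ks.aliases());
    }

    static List<String> aliases(Path file, char[] password)
            throws IOException, GeneralSecurityException {
        try (InputStream in = Files.newInputStream(file)) {
            return aliases(in, password);
        }
    }
}
```

Run against /etc/security/serverKeys/truststore.jks with your truststore password, the result should include the nifi alias imported earlier.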

nifi.properties

cd /usr/local/nifi/conf/
nano nifi.properties

#Find "# Site to Site properties" and change the following properties to what is below

nifi.remote.input.host=
nifi.remote.input.secure=true
nifi.remote.input.socket.port=9096
nifi.remote.input.http.enabled=false

#Find "# web properties #" and change the following properties to what is below

nifi.web.http.host=
nifi.web.http.port=
nifi.web.https.host=0.0.0.0
nifi.web.https.port=9095

#Find "# security properties #" and change the following properties to what is below

nifi.security.keystore=/etc/security/serverKeys/keystore.jks
nifi.security.keystoreType=JKS
nifi.security.keystorePasswd=PASSWORD
nifi.security.keyPasswd=PASSWORD
nifi.security.truststore=/etc/security/serverKeys/truststore.jks
nifi.security.truststoreType=JKS
nifi.security.truststorePasswd=PASSWORD
nifi.security.needClientAuth=true
nifi.security.user.authorizer=managed-authorizer
nifi.security.user.login.identity.provider=kerberos-provider

#Find "# Core Properties #" and change the following properties to what is below

nifi.authorizer.configuration.file=./conf/authorizers.xml
nifi.login.identity.provider.configuration.file=./conf/login-identity-providers.xml

#Find "# kerberos #" and change the following properties to what is below

nifi.kerberos.krb5.file=/etc/krb5.conf

#Find "# kerberos service principal #" and change the following properties to what is below

nifi.kerberos.service.principal=nifi/hadoop@REALM.CA
nifi.kerberos.service.keytab.location=/etc/security/keytabs/nifi.service.keytab

#Find "# kerberos spnego principal #" and change the following properties to what is below

nifi.kerberos.spnego.principal=nifi-spnego/hadoop@REALM.CA
nifi.kerberos.spnego.keytab.location=/etc/security/keytabs/nifi-spnego.service.keytab
nifi.kerberos.spnego.authentication.expiration=12 hours

#Find "# cluster common properties (all nodes must have same values) #" and change the following properties to what is below

nifi.cluster.protocol.is.secure=true
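Because a single missing value in nifi.properties can keep NiFi from starting securely, a quick automated check can help. The sketch below loads the file with java.util.Properties and flags a few of the settings from this section; the NifiSecurityCheck class and its choice of keys are illustrative assumptions, not a complete validation.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical checker for a handful of the HTTPS/Kerberos settings configured above.
final class NifiSecurityCheck {
    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    static List<String> problems(Reader nifiProperties) throws IOException {
        Properties p = new Properties();
        p.load(nifiProperties);
        List<String> problems = new ArrayList<>();
        // The steps above blank the HTTP port so that only HTTPS is served.
        if (!isBlank(p.getProperty("nifi.web.http.port"))) {
            problems.add("nifi.web.http.port should be blank when serving HTTPS only");
        }
        String[] required = {
            "nifi.web.https.port",
            "nifi.security.keystore",
            "nifi.security.truststore",
            "nifi.security.user.login.identity.provider",
            "nifi.kerberos.spnego.keytab.location"
        };
        for (String key : required) {
            if (isBlank(p.getProperty(key))) {
                problems.add(key + " is not set");
            }
        }
        return problems;
    }
}
```

Passing it a reader over /usr/local/nifi/conf/nifi.properties should return an empty list once the settings above are in place.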

login-identity-providers.xml

nano login-identity-providers.xml

#Find "kerberos-provider"
<provider>
	<identifier>kerberos-provider</identifier>
	<class>org.apache.nifi.kerberos.KerberosProvider</class>
	<property name="Default Realm">REALM.CA</property>
	<property name="Kerberos Config File">/etc/krb5.conf</property>
	<property name="Authentication Expiration">12 hours</property>
</provider>

authorizers.xml

nano authorizers.xml

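#Find "file-provider" (uncomment the block if it is commented out)
#Note: this authorizer is only used if nifi.security.user.authorizer in nifi.properties is set to file-provider.
#If you keep managed-authorizer, set Initial Admin Identity on the file-access-policy-provider (and the same identity as Initial User Identity 1 on the file-user-group-provider) instead.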
<authorizer>
	<identifier>file-provider</identifier>
	<class>org.apache.nifi.authorization.FileAuthorizer</class>
	<property name="Authorizations File">./conf/authorizations.xml</property>
	<property name="Users File">./conf/users.xml</property>
	<property name="Initial Admin Identity">admin/hadoop@REALM.CA</property>
	<property name="Legacy Authorized Users File"></property>

	<property name="Node Identity 1"></property>
</authorizer>
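A common cause of login or permission problems is a mismatch between the identity in “Initial Admin Identity” and the principal you actually log in with. The hypothetical NifiXmlConfig helper below, a sketch using the JDK's DOM parser, reads a named property from the authorizer or provider block with a given identifier so you can compare the two. Blocks that are commented out in the file are ignored by the parser.

```java
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical helper: finds <property name="..."> inside the element whose <identifier> matches.
final class NifiXmlConfig {
    static String property(InputStream xml, String identifier, String propertyName)
            throws IOException, ParserConfigurationException, SAXException {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
        NodeList ids = doc.getElementsByTagName("identifier");
        for (int i = 0; i < ids.getLength(); i++) {
            if (!identifier.equals(ids.item(i).getTextContent().trim())) {
                continue;
            }
            // The <identifier> sits directly inside the <authorizer> or <provider> element.
            Element owner = (Element) ids.item(i).getParentNode();
            NodeList props = owner.getElementsByTagName("property");
            for (int j = 0; j < props.getLength(); j++) {
                Element prop = (Element) props.item(j);
                if (propertyName.equals(prop.getAttribute("name"))) {
                    return prop.getTextContent().trim();
                }
            }
        }
        return null; // identifier or property not found
    }
}
```

Reading "Initial Admin Identity" for file-provider from authorizers.xml should return admin/hadoop@REALM.CA with the configuration above.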

Start NiFi

sudo service nifi start

NiFi Web Login

Issues:

  • If you get the error “No applicable policies could be found” after logging in and no GUI is shown, stop the NiFi service and start it again. That should resolve it.
  • If you can then log in but still don’t have any policies, you will need to update “authorizations.xml” and add the lines below, making sure to replace the process group ID in each resource with your root process group ID and the user identifier with your user’s ID (found in users.xml).
nano /usr/local/nifi/conf/authorizations.xml

<policy identifier="1c897e9d-3dd5-34ca-ae3d-75fb5ee3e1a5" resource="/data/process-groups/##CHANGE TO ROOT ID##" action="R">
	<user identifier="##CHANGE TO USER ID##"/>
</policy>
<policy identifier="91c64c2d-7848-371d-9d5f-db71138b152f" resource="/data/process-groups/##CHANGE TO ROOT ID##" action="W">
	<user identifier="##CHANGE TO USER ID##"/>
</policy>
<policy identifier="7aeb4d67-e2e1-3a3e-a8fa-94576f35539e" resource="/process-groups/##CHANGE TO ROOT ID##" action="R">
	<user identifier="##CHANGE TO USER ID##"/>
</policy>
<policy identifier="f5b620e0-b094-3f70-9542-dd6920ad5bd9" resource="/process-groups/##CHANGE TO ROOT ID##" action="W">
	<user identifier="##CHANGE TO USER ID##"/>
</policy>

References

https://community.hortonworks.com/articles/34147/nifi-security-user-authentication-with-kerberos.html

https://community.hortonworks.com/content/supportkb/151106/nifi-how-to-create-your-own-certs-for-securing-nif.html

NiFi: Rest API

NiFi has a comprehensive REST API that you can use. It is documented in the official NiFi REST API documentation.

The only thing I would say is missing is how to get the root process group of NiFi: the documentation does not spell out which API call to use. Note also that on a secured instance every API call must be authenticated.

The API call to get the root process group, called “NiFi Flow”, which is the main process group, is:

https://localhost/nifi-api/process-groups/root
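As a sketch of an authenticated call (Java 11+ java.net.http), the hypothetical class below builds two requests: a POST to /nifi-api/access/token with a form-encoded username and password, which returns an access token as plain text when a login identity provider such as the kerberos-provider is configured, and the GET above with that token as a Bearer header. The base URL https://localhost:9095 matches the HTTPS port from the Kerberize/SSL tutorial and is an assumption; adjust it to your instance.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds (but does not send) the two requests.
final class NifiRootGroupRequests {
    // Step 1: exchange a username and password for an access token.
    static HttpRequest tokenRequest(String baseUrl, String user, String password) {
        String form = "username=" + URLEncoder.encode(user, StandardCharsets.UTF_8)
                + "&password=" + URLEncoder.encode(password, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(baseUrl + "/nifi-api/access/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(form))
                .build();
    }

    // Step 2: read the root process group using the token from step 1.
    static HttpRequest rootGroupRequest(String baseUrl, String token) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/nifi-api/process-groups/root"))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();
    }
}
```

Send each request with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); the JVM must trust NiFi's self-signed certificate, for example via the javax.net.ssl.trustStore system property. The "id" field of the JSON response is the root process group ID.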

NiFi: Custom Processor

The following tutorial shows you how to create a custom NiFi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. Navigate into the “nifi” folder and run:
    mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Put in your “groupId” when it asks.
    1. I used “com.test”
  5. Put in your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Put in your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes, you can import the Maven project into Eclipse.
  9. You will get two projects:
    1. nar
    2. processor
  10. You should then have two files like the ones below created.

MyProcessor.java:

package com.test.processors;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("my_relationship")
            .description("Example relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}

MyProcessorTest.java:

This is the unit test for the processor.

package com.test.processors;

import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class MyProcessorTest {
    private TestRunner testRunner;

    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    public void testProcessor() {
        // Enqueue one empty FlowFile and run the processor once.
        final InputStream content = new ByteArrayInputStream(new byte[0]);
        testRunner.setProperty(MyProcessor.MY_PROPERTY, "test");
        testRunner.enqueue(content);
        testRunner.run(1);
        testRunner.assertQueueEmpty();

        // The FlowFile should have been routed to my_relationship.
        final List<MockFlowFile> results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
        assertEquals(1, results.size());
    }
}

Optional:

Nar Directory:

You can create a custom NAR directory to deploy your custom NiFi processors to. You can either use the nifi/lib directory or specify your own. To specify your own, edit the “nifi.properties” file.

cd /nifi/conf/
nano nifi.properties

Look for “nifi.nar.library.directory”.
Add a new property with a suffix of your choice, for example: nifi.nar.library.directory.anyname=/your/directory/
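Going by the naming convention above, NiFi treats the plain nifi.nar.library.directory entry and any key that extends it with a dotted suffix as NAR directories, which is why the suffix can be any name. The hypothetical helper below sketches that lookup with java.util.Properties, which is handy for checking which directories a given nifi.properties declares.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: collects nifi.nar.library.directory and every suffixed variant.
final class NarDirectories {
    static final String PREFIX = "nifi.nar.library.directory";

    static Map<String, String> find(Properties props) {
        Map<String, String> dirs = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            // Match the base key itself or "nifi.nar.library.directory.<anything>".
            if (key.equals(PREFIX) || key.startsWith(PREFIX + ".")) {
                dirs.put(key, props.getProperty(key));
            }
        }
        return dirs;
    }
}
```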

 

NiFi Installation (Basic)

In this tutorial I will guide you through installing NiFi on Ubuntu 16.04 and setting it up to run as a service. We will assume you have a user called “hduser”.

Install Java 8

sudo apt-get install openjdk-8-jdk

Install NiFi

wget http://mirror.dsrg.utoronto.ca/apache/nifi/1.8.0/nifi-1.8.0-bin.tar.gz
tar -xzf nifi-1.8.0-bin.tar.gz
sudo mv nifi-1.8.0/ /usr/local/nifi

Set Ownership:

sudo chown -R hduser:hduser /usr/local/nifi

Setup .bashrc:

nano ~/.bashrc

Add the following to the end of the file.

#NIFI VARIABLES START
export NIFI_HOME=/usr/local/nifi
export NIFI_CONF_DIR=/usr/local/nifi/conf
export PATH=$PATH:$NIFI_HOME/bin
#NIFI VARIABLES STOP

source ~/.bashrc

Install NiFi As Service

cd /usr/local/nifi/bin
sudo ./nifi.sh install
sudo reboot

Start/Stop/Status Service

sudo service nifi start
sudo service nifi stop
sudo service nifi status

Your site is now available at http://localhost:8080/nifi
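The address depends on the web properties in nifi.properties: with the defaults it is plain HTTP on port 8080, but after following the Kerberize/SSL tutorial it becomes HTTPS on port 9095. The hypothetical NifiUiUrl helper below sketches that derivation; treating a blank host or 0.0.0.0 as localhost is an assumption for browsing from the same machine.

```java
import java.util.Properties;

// Hypothetical helper: derives the UI address from the nifi.web.* properties.
final class NifiUiUrl {
    private static String value(Properties p, String key) {
        String v = p.getProperty(key);
        return v == null ? "" : v.trim();
    }

    static String from(Properties p) {
        // An HTTPS port means the instance is secured; otherwise fall back to HTTP.
        boolean https = !value(p, "nifi.web.https.port").isEmpty();
        String scheme = https ? "https" : "http";
        String host = value(p, https ? "nifi.web.https.host" : "nifi.web.http.host");
        String port = value(p, https ? "nifi.web.https.port" : "nifi.web.http.port");
        if (port.isEmpty()) {
            throw new IllegalArgumentException("Neither nifi.web.http.port nor nifi.web.https.port is set");
        }
        // A blank host or 0.0.0.0 means "all interfaces", so browse to localhost instead.
        if (host.isEmpty() || host.equals("0.0.0.0")) {
            host = "localhost";
        }
        return scheme + "://" + host + ":" + port + "/nifi";
    }
}
```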

Uninstall

sudo rm /etc/rc2.d/S65nifi
sudo rm /etc/init.d/nifi
sudo rm /etc/rc2.d/K65nifi

sudo rm -R /usr/local/nifi/