NiFi: ExecuteSQL Processor

In this tutorial I will guide you through adding a processor to NiFi that queries a SQL table.

For this tutorial you will need an Avro schema called “dttest”. Its contents are as follows.

{
     "type": "record",
     "namespace": "com.example",
     "name": "FullName",
     "fields": [
       { "name": "id", "type": "int" },
       { "name": "name", "type": "string" }
     ]
}
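To make the schema concrete, here is a minimal sketch of a plain Java type with the same two fields, plus a hand-rolled JSON rendering that approximates the kind of output a JSON record writer could produce for rows of this shape. The class and method names (FullNameExample, toJson, toJsonArray) and the sample rows are hypothetical, and this is not NiFi's own code; the real output depends on how the writer service is configured.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of NiFi.
final class FullNameExample {

    // Mirrors the two fields of the Avro record "FullName".
    static final class FullName {
        final int id;       // Avro "int" is a 32-bit signed integer
        final String name;  // Avro "string"

        FullName(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Minimal escaping: only backslashes and double quotes are handled.
    // Control characters are not escaped in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders one record as a JSON object with the schema's field names.
    static String toJson(FullName r) {
        return "{\"id\":" + r.id + ",\"name\":\"" + escape(r.name) + "\"}";
    }

    // Renders a list of records as a JSON array.
    static String toJsonArray(List<FullName> rows) {
        return rows.stream()
                .map(FullNameExample::toJson)
                .collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        // Sample rows are made up for illustration.
        List<FullName> rows = List.of(new FullName(1, "Ada"), new FullName(2, "Grace"));
        // prints [{"id":1,"name":"Ada"},{"id":2,"name":"Grace"}]
        System.out.println(toJsonArray(rows));
    }
}
```

A table queried with this schema needs an integer “id” column and a string “name” column for the records to line up with the fields above.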

First we need to drag the processor onto the grid.

Next we need to select the processor ExecuteSQLRecord.

Next we must configure the processor.

Now we must create the JsonRecordWriter service.

Now we name the JsonRecordWriter.

Configure the JsonRecordWriter.

Next we create the DB Connection Service.

Next we name the DB Connection Service.

Configure the DB Connection Service.

Now validate that all the settings are correct.

Now you are all done. The processor will now query your table.

NiFi: Custom Processor

The following tutorial shows you how to create a custom NiFi processor.

Create Project:

  1. Install Maven
  2. Create a folder called “nifi”
  3. Navigate into the “nifi” folder and run
    mvn archetype:generate -DarchetypeGroupId=org.apache.nifi -DarchetypeArtifactId=nifi-processor-bundle-archetype -DarchetypeVersion=1.0.0 -DnifiVersion=1.0.0
  4. Enter your “groupId” when it asks.
    1. I used “com.test”
  5. Enter your “artifactId” when it asks.
    1. I used “processor”
  6. You can accept the default “version”.
  7. Enter your “artifactBaseName” when it asks.
    1. I used “MyProcessor”
  8. Once it completes you can import the Maven project into Eclipse.
  9. You will get two projects
    1. nar
    2. processor
  10. You should then have two files created, like the ones below.

MyProcessor.java:

package com.test.processors;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder().name("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship MY_RELATIONSHIP = new Relationship.Builder()
            .name("my_relationship")
            .description("Example relationship")
            .build();

    // Property descriptors and relationships exposed by this processor.
    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(MY_PROPERTY);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(MY_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        // TODO implement
        session.transfer(flowFile, MY_RELATIONSHIP);
    }
}

MyProcessorTest.java:

This is the unit test for the processor.

package com.test.processors;

import static org.junit.Assert.*;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;

public class MyProcessorTest {
    private TestRunner testRunner;

    @Before
    public void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    public void testProcessor() {
        // Enqueue one empty flow file and run the processor once.
        final InputStream content = new ByteArrayInputStream(new byte[0]);
        testRunner.setProperty("My Property", "test");
        testRunner.enqueue(content);
        testRunner.run(1);
        testRunner.assertQueueEmpty();

        // Exactly one flow file should have been routed to the relationship.
        final List<MockFlowFile> results = testRunner.getFlowFilesForRelationship(MyProcessor.MY_RELATIONSHIP);
        assertEquals("1 match", 1, results.size());
    }
}

Optional:

NAR Directory:

You can create a custom NAR directory to deploy your custom NiFi processors to. You can either use the nifi/lib directory or specify your own. To specify your own, edit the “nifi.properties” file.

cd /nifi/conf/
nano nifi.properties

Look for the entries that start with “nifi.nar.library.directory”.
Add the following line, where “anyname” is any unique suffix: nifi.nar.library.directory.anyname=/your/directory/
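
To double-check which NAR directories a properties file declares, a small sketch like the following can list every key that starts with the prefix. It uses only java.util.Properties. The class name NarDirectories, the method find, and the sample file contents are hypothetical, and this is not how NiFi itself loads the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper for inspecting properties text; not part of NiFi.
final class NarDirectories {

    static final String PREFIX = "nifi.nar.library.directory";

    // Returns every entry whose key is the prefix itself or the prefix
    // followed by a dot and a suffix, sorted by key.
    static Map<String, String> find(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.equals(PREFIX) || key.startsWith(PREFIX + ".")) {
                result.put(key, props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample contents; a real nifi.properties has many more entries.
        String sample = "nifi.nar.library.directory=./lib\n"
                + "nifi.nar.library.directory.anyname=/your/directory/\n";
        find(sample).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

To run it against a real file, read the file's text (for example with Files.readString) and pass it to find.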