AWS: Java Post to Kinesis Queue

This entry is part 4 of 5 in the series AWS & Java

Posting to an AWS Kinesis Queue is simple and straightforward. As always, you should refer to the AWS documentation.

Put Multiple Records On Queue

Import the following

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

Put Records

//Build the client for your region using static credentials
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard().withRegion(myRegion).withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(myAccessKeyId, mySecretKey)));
AmazonKinesis kinesisClient = clientBuilder.build();
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myQueue);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();

//Repeat this block for every entry you want to send in the same request
PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
putRecordsRequestEntry.setData(ByteBuffer.wrap(myData));
putRecordsRequestEntry.setPartitionKey(myKey);
putRecordsRequestEntryList.add(putRecordsRequestEntry);

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putResult = kinesisClient.putRecords(putRecordsRequest);
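
It is worth inspecting the result of a multi-record put, because the call can succeed as a whole while individual entries are rejected. The sketch below shows the idea in Python with boto3 (the library used in the Python part of this series) rather than Java. The function name put_in_batches and the batch size of 500, which matches the per-request record limit documented at the time of writing, are my assumptions, so check the current limits before relying on them.

```python
def put_in_batches(client, stream_name, entries, batch_size=500):
    """Send entries with put_records and return the ones Kinesis rejected.

    client is assumed to be a boto3 Kinesis client. entries is a list of
    dicts, each with "Data" (bytes) and "PartitionKey" (str).
    """
    failed = []
    for start in range(0, len(entries), batch_size):
        batch = entries[start:start + batch_size]
        response = client.put_records(StreamName=stream_name, Records=batch)
        if response.get("FailedRecordCount", 0) == 0:
            continue
        # Results come back in the same order as the request entries.
        # A rejected entry carries an ErrorCode instead of a SequenceNumber.
        for entry, result in zip(batch, response["Records"]):
            if "ErrorCode" in result:
                failed.append(entry)
    return failed
```

The returned list can be passed back into the same function after a short pause to retry only the rejected entries.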

Put Single Record On Queue

Import the following

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

Put Record

//Build the client for your region using static credentials
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard().withRegion(myRegion).withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(myAccessKeyId, mySecretKey)));
AmazonKinesis kinesisClient = clientBuilder.build();
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myQueue);

//StandardCharsets.UTF_8 avoids the checked exception thrown by getBytes("UTF-8")
putRecordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)));
putRecordRequest.setPartitionKey(myKey);

PutRecordResult putResult = kinesisClient.putRecord(putRecordRequest);

Congratulations, you have now put one or more records onto the queue!

AWS: Java Kinesis Lambda Handler

This entry is part 2 of 5 in the series AWS & Java

If you want to write a Lambda for AWS in Java that connects to a Kinesis Stream, you need to have the handler.

Maven:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.11.109</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.7.1</version>
</dependency>

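This is the method that AWS Lambda will call. It will look similar to the one below. Note that the Context and KinesisEvent classes come from the aws-lambda-java-core and aws-lambda-java-events artifacts, so add those as well if they are not already on your classpath.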

import java.io.IOException;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
import com.amazonaws.services.kinesis.model.Record;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;

//readValue throws IOException, so the handler declares it
public void kinesisRecordHandler(KinesisEvent kinesisEvent, Context context) throws IOException {
	final String awsRequestId = context.getAwsRequestId();
	final int memoryLimitMb = context.getMemoryLimitInMB();
	final int remainingTimeInMillis = context.getRemainingTimeInMillis();

	//Create the mapper once instead of once per record
	final ObjectMapper mapper = new ObjectMapper();

	for (final KinesisEventRecord kinesisRec : kinesisEvent.getRecords()) {
		final Record record = kinesisRec.getKinesis();

		//We get the kinesis data information
		final JsonNode recData = mapper.readValue(record.getData().array(), JsonNode.class);

		final String bucketName = recData.get("bucket").asText();
		final String key = recData.get("key").asText();
	}
}

One thing to note when you set up your Lambda is the “Handler” field in the “Configuration” section on AWS. It takes the format “##PACKAGE##.##CLASS##::##METHOD##”.

AWS: Node Kinesis Stream

This entry is part 2 of 2 in the series AWS & Node

If you haven’t already done so, please refer to the AWS Node Setup tutorial as part of this series. In this tutorial we will just put something on the Kinesis stream.

We will utilise the AWS variable we created during the setup as part of this series.

First we need to create the variable that connects to our Kinesis in our region.

var kinesis = new AWS.Kinesis({region : ##REGION##});

Next we need to set up a record to send on the Kinesis stream. This will contain our data, key and the stream name.

var recordParams = {
	Data: ##DATA##,
	PartitionKey: ##FILENAME_OR_ID##,
	StreamName: ##STREAM_NAME##
};

Next we need to put the record onto the stream. This is a very basic implementation. Feel free to expand as you need to.

kinesis.putRecord(recordParams, function(err, data) {
	if (err) {
		console.error(err);
	}
	else {
		console.log("done");
	}
});

AWS: Python Kinesis Streams

This entry is part 2 of 3 in the series AWS & Python

If you haven’t already done so, please refer to the AWS setup section which is part of this series. As time goes on I will continually update this section.

To put something on the Kinesis Stream you need to utilise the “connection_kinesis” you already set up in the previous tutorial on setting up the connection. You will need to set the partition key, data and stream.

response = connection_kinesis.put_record(StreamName=##KINESIS_STREAM##, Data=##DATA##, PartitionKey=##FILE_NAME##)

Depending on your data you may need to UTF-8 encode it first, for example as below.

bytearray(##MY_DATA##, 'utf8')
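
If your data is not always a string, a small helper can normalise it to bytes before the put. This is only a sketch: the name to_kinesis_data is hypothetical, and serialising other values as JSON is my assumption about what you want on the stream.

```python
import json


def to_kinesis_data(payload):
    """Return bytes suitable for the Data argument of put_record."""
    if isinstance(payload, (bytes, bytearray)):
        # Already binary, pass it through unchanged
        return bytes(payload)
    if isinstance(payload, str):
        return payload.encode("utf-8")
    # Anything else (dict, list, number) is serialised as JSON text first
    return json.dumps(payload).encode("utf-8")
```

For example, to_kinesis_data({"bucket": "my-bucket", "key": "my-file"}) gives you JSON bytes that a consumer can parse again on the other side.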

To read from the Kinesis stream you need to set up the shard iterator and then retrieve the data from the stream. Don’t forget to grab the new shard iterator from the returned response and use it for the next read. Remember not to query the stream too fast.

#Shard IDs have the format shardId-000000000000
shard_it = connection_kinesis.get_shard_iterator(StreamName=##KINESIS_STREAM##, ShardId='shardId-000000000000', ShardIteratorType='LATEST')["ShardIterator"]

recs = connection_kinesis.get_records(ShardIterator=shard_it, Limit=1)

#This is the new shard iterator returned after querying the data from the stream.
shard_it = recs["NextShardIterator"]
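
Putting those read steps into a loop might look like the sketch below. The function name poll_shard and the parameters max_polls and pause_seconds are hypothetical, and the one second default pause is just a conservative guess at "not too fast", so tune it for your stream.

```python
import time


def poll_shard(client, stream_name, shard_id, max_polls=5, pause_seconds=1.0):
    """Collect records from one shard, following NextShardIterator.

    client is assumed to be a boto3 Kinesis client such as connection_kinesis.
    """
    shard_it = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="LATEST",
    )["ShardIterator"]

    collected = []
    for _ in range(max_polls):
        response = client.get_records(ShardIterator=shard_it, Limit=100)
        collected.extend(response["Records"])
        # A missing iterator means there is nothing further to read from this shard
        shard_it = response.get("NextShardIterator")
        if shard_it is None:
            break
        # Pause between reads so we do not hammer the stream
        time.sleep(pause_seconds)
    return collected
```

You would call it as poll_shard(connection_kinesis, ##KINESIS_STREAM##, 'shardId-000000000000') and then decode the "Data" field of each returned record.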

AWS: Python Setup

This entry is part 1 of 3 in the series AWS & Python

When you want to work with S3 or a Kinesis Stream you first need to set up the connection. At the time of this writing I am using boto3 version 1.3.1.

First we need to import the package.

import boto3

Next we set up the session and specify what profile we will be using.

profile = boto3.session.Session(profile_name='prod')

The profile name comes from the “credentials” file and must match one of its section names. You can set the environment variable “AWS_SHARED_CREDENTIALS_FILE” to specify which credentials file to use. You can set up the credentials file like below. You can change “local” to anything you want, as long as you pass the same name as profile_name. I normally use “stage”, “dev” or “prod”.

[local]
aws_access_key_id=##KEY_ID##
aws_secret_access_key=##SECRET_ACCESS_KEY##
region=##REGION##
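
Since the credentials file is a plain INI file, a quick way to check which profile names it defines is to read it with the standard library. This is a minimal sketch: the function name list_profiles is hypothetical, and it only lists section names without validating the keys.

```python
import configparser


def list_profiles(credentials_path):
    """Return the profile names (section names) in a credentials file."""
    parser = configparser.ConfigParser()
    # read() skips files it cannot open and returns the list it did parse
    if not parser.read(credentials_path):
        raise FileNotFoundError(credentials_path)
    return parser.sections()
```

If the name you pass as profile_name is not in the returned list, the session will not find your profile.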

Next we need to set up the connection to S3. To do this we will need to use the profile we created above.

connection_s3 = profile.resource('s3')

If we also want to use a Kinesis stream then we need to set up the connection. To do this we will need the profile we created above.

connection_kinesis = profile.client('kinesis')