AWS: Python Kinesis Streams

This entry is part 2 of 3 in the series AWS & Python
If you haven’t already done so, please refer to the AWS setup section, which is part of this series. I will continue to update this section over time.

To put a record on a Kinesis stream, use the “connection_kinesis” client you set up in the previous tutorial on setting up the connection. You need to supply the stream name, the data and the partition key.

response = connection_kinesis.put_record(StreamName=##KINESIS_STREAM##, Data=##DATA##, PartitionKey=##FILE_NAME##)

Depending on your data, you may need to UTF-8 encode it before sending, for example:

bytearray(##MY_DATA##, 'utf8')
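As a rough sketch of how the encoding and the put could be wrapped together, something like the following may help. The helper names encode_payload and put_to_stream are my own and not part of boto3, and the choice to send non-string data as JSON is an assumption; the client argument is expected to be the “connection_kinesis” client from the setup tutorial.

```python
import json


def encode_payload(data):
    """Return data as bytes suitable for the Data argument of put_record."""
    if isinstance(data, (bytes, bytearray)):
        return bytes(data)
    if isinstance(data, str):
        return data.encode("utf-8")
    # Assumption: anything else (dict, list, number) is sent as JSON text.
    return json.dumps(data).encode("utf-8")


def put_to_stream(client, stream_name, data, partition_key):
    """Encode data and put it on the stream; returns the put_record response."""
    return client.put_record(
        StreamName=stream_name,
        Data=encode_payload(data),
        PartitionKey=partition_key,
    )
```

Calling put_to_stream(connection_kinesis, ##KINESIS_STREAM##, ##MY_DATA##, ##FILE_NAME##) should then hand back the usual put_record response, which includes the "ShardId" and "SequenceNumber" the record was written with.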

To read from the Kinesis stream, you need to set up a shard iterator and then retrieve the data from the stream. Don’t forget to grab the new shard iterator from the returned response, because you need it for the next read. Remember not to query the stream too fast, as Kinesis throttles reads that arrive too frequently.

#shardId-000000000000 is the format of a shard ID within the stream
shard_it = connection_kinesis.get_shard_iterator(StreamName=##KINESIS_STREAM##, ShardId='shardId-000000000000', ShardIteratorType='LATEST')["ShardIterator"]

recs = connection_kinesis.get_records(ShardIterator=shard_it, Limit=1)

#This is the new shard iterator returned after querying the data from the stream.
shard_it = recs["NextShardIterator"]
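Putting the read steps together, a polling loop might look roughly like the sketch below. The function name read_stream and the max_polls and delay parameters are hypothetical and only there for illustration; the client argument is again assumed to be the “connection_kinesis” client, and the one second default pause is simply a cautious guess rather than a recommended value.

```python
import time


def read_stream(client, stream_name, shard_id, max_polls=10, delay=1.0):
    """Yield records from one shard, polling at most max_polls times."""
    shard_it = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="LATEST",
    )["ShardIterator"]

    for _ in range(max_polls):
        response = client.get_records(ShardIterator=shard_it, Limit=100)
        for record in response["Records"]:
            yield record
        # Each response carries the iterator to use for the next call.
        shard_it = response.get("NextShardIterator")
        if shard_it is None:
            break  # no further iterator, e.g. the shard has been closed
        time.sleep(delay)  # pause so the stream is not queried too fast
```

Each yielded record is a dictionary, so record["Data"] gives you the raw bytes that were put on the stream, which you can decode with .decode('utf8') if you encoded text on the way in.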

 
