Using Amazon SQS with AWS Lambda Functions

Andrew Smith

The IoT scenario from my last post is expanded here to use an SQS queue between the AWS IoT Rules engine and the Lambda function that writes inbound sensor data to the PostgreSQL database.

This decouples the Rules engine from the Lambda function, giving flexibility and resiliency when the Lambda function or its downstream components are unavailable. It also allows the Lambda function to receive a batch of messages per invocation (rather than just one), so that all of them can be processed against a single connection to the RDS PostgreSQL instance.

So the IoT architecture in use now is as follows:

[Diagram: sensor data flowing from IoT Core through the Rules engine and SQS queue to the Lambda function and RDS PostgreSQL]

Creating the Queue

I’ve created a standard SQS queue using the AWS Console, with these settings:

Default Visibility Timeout: 30 seconds
Message Retention Period: 4 days
Receive Message Wait Time: 20 seconds

The Default Visibility Timeout is set to 30 seconds because the Lambda function's timeout is 4 seconds, and AWS documentation suggests setting the queue's visibility timeout to at least six times the function timeout. If the function successfully processes the batch it receives, all messages in the batch are deleted from the queue; otherwise they all reappear in the queue after the visibility window has expired.

The Message Retention Period is set to 4 days. This isn't especially relevant in normal operation for this scenario, since a Dead Letter Queue picks up any messages that fail to process once their retry limit has been reached, but it does matter for periods when the Lambda trigger is disabled while the IoT Rules engine is still delivering messages to the queue.

The Receive Message Wait Time is set to 20 seconds. This changes the queue's default short polling to long polling, with a maximum wait of 20 seconds per poll. Long polling reduces the number of false empty responses, and so reduces the cost associated with polling the queue.
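The same queue can be created programmatically; here's a minimal boto3 sketch of the settings above, assuming a queue name of sensor-data-queue (a placeholder):

import boto3

sqs = boto3.client("sqs")

# Hypothetical queue name; attribute values mirror the console settings above.
response = sqs.create_queue(
    QueueName="sensor-data-queue",
    Attributes={
        "VisibilityTimeout": "30",              # 30 seconds
        "MessageRetentionPeriod": "345600",     # 4 days, in seconds
        "ReceiveMessageWaitTimeSeconds": "20",  # enables long polling
    },
)
queue_url = response["QueueUrl"]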

I’ve also created a redrive policy for the queue with:

Maximum Receives: 5

which is associated with a Dead Letter Queue (DLQ).

So if a message is received 5 times without being successfully processed, it will be routed to the DLQ. Messages there can be inspected when troubleshooting.
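Programmatically, the redrive policy is a JSON attribute on the source queue; a sketch, assuming the DLQ already exists (the queue URL and DLQ ARN are placeholders):

import json
import boto3

sqs = boto3.client("sqs")

queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789012/sensor-data-queue"  # placeholder
dlq_arn = "arn:aws:sqs:eu-west-1:123456789012:sensor-data-dlq"                    # placeholder

# A message received 5 times without being deleted is moved to the DLQ.
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dlq_arn,
            "maxReceiveCount": "5",
        })
    },
)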

Updating the IoT Rule

The IoT scenario being implemented here uses an IoT Rule to process messages received by IoT Core. In my last post the IoT Rule called a Lambda function which processed the inbound message into the PostgreSQL database. Here, the rule is changed so that it instead inserts the message into the queue created above:

[Screenshot: the updated IoT Rule, showing its SELECT statement and SQS action]

The SELECT statement filters the rule to process only messages published to the sensor/data MQTT topic, and the action then sends each message to the queue.
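As a sketch, the equivalent rule created via boto3 would look something like this; the rule name, queue URL, and role ARN are placeholders, and the role must allow sqs:SendMessage on the queue:

import boto3

iot = boto3.client("iot")

queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789012/sensor-data-queue"  # placeholder

iot.create_topic_rule(
    ruleName="sensor_data_to_sqs",  # hypothetical rule name
    topicRulePayload={
        # Only messages published to the sensor/data MQTT topic match the rule.
        "sql": "SELECT * FROM 'sensor/data'",
        "awsIotSqlVersion": "2016-03-23",
        "actions": [
            {
                "sqs": {
                    "queueUrl": queue_url,
                    "roleArn": "arn:aws:iam::123456789012:role/iot-sqs-role",  # placeholder
                    "useBase64": False,
                }
            }
        ],
    },
)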

Updating the Lambda Function

In my last post, the Lambda function that writes inbound IoT sensor data to the PostgreSQL database used the IoT Rules engine as its trigger. That trigger is now replaced with an SQS trigger based on the queue created above, configured with the following setting:

Batch Size: 10

This allows the Lambda poller to read up to 10 messages from the SQS queue and deliver them to the function in a single invocation.
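The trigger itself is an event source mapping; a boto3 sketch, with the function name and queue ARN as placeholders:

import boto3

lambda_client = boto3.client("lambda")

# Hypothetical names; the poller delivers up to 10 messages per invocation.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:sqs:eu-west-1:123456789012:sensor-data-queue",
    FunctionName="iot-sensor-data-writer",
    BatchSize=10,
)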

The execution role used by the Lambda function now needs updating to include a policy with these actions:

sqs:DeleteMessage
sqs:ReceiveMessage
sqs:GetQueueAttributes

with these actions constrained to a resource that is the ARN of the SQS queue we’ve just created.
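A sketch of attaching such a policy inline via boto3; the role name, policy name, account ID, and region are placeholders:

import json
import boto3

iam = boto3.client("iam")

# The Resource must be the queue's ARN, not its name or URL.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:GetQueueAttributes",
            ],
            "Resource": "arn:aws:sqs:eu-west-1:123456789012:sensor-data-queue",
        }
    ],
}

iam.put_role_policy(
    RoleName="iot-sensor-lambda-role",  # hypothetical execution role name
    PolicyName="sqs-trigger-access",
    PolicyDocument=json.dumps(policy),
)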

The Python Lambda function used in my last post needs updating to handle a batch of messages rather than just one. The revised version is available in my GitHub repository.

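A minimal sketch of the revised handler is below, assuming a sensor_data table whose primary key covers the sensor id and reading timestamp, psycopg2 packaged with the function, and connection details in environment variables (all placeholders; the full version is in the repository):

import json
import logging
import os

import psycopg2  # packaged with the function or supplied via a Lambda layer
from psycopg2 import errors

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # Connection details are placeholders; in production, use an RDS Proxy
    # endpoint and fetch the password from AWS Secrets Manager.
    conn = psycopg2.connect(
        host=os.environ["DB_HOST"],
        dbname=os.environ["DB_NAME"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
        connect_timeout=3,
    )
    try:
        with conn.cursor() as cur:
            # The SQS trigger delivers up to Batch Size (10) messages per invocation.
            for record in event["Records"]:
                payload = json.loads(record["body"])
                try:
                    # Hypothetical table and columns; the primary key constraint
                    # causes re-delivered duplicates to fail the insert.
                    cur.execute(
                        "INSERT INTO sensor_data (sensor_id, reading_time, temperature) "
                        "VALUES (%s, %s, %s)",
                        (payload["sensorId"], payload["time"], payload["temperature"]),
                    )
                    conn.commit()
                except errors.UniqueViolation:
                    conn.rollback()
                    logger.info("Duplicate message ignored: %s", record["messageId"])
                # Any other exception propagates, failing the invocation so the
                # whole batch returns to the queue for retry.
    finally:
        conn.close()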

The loop that processes a batch of possibly multiple messages can be seen here. (Note that if this function were to be used in production, connection pooling would be used for the RDS connection – e.g. via an RDS Proxy endpoint – and the database connection password would be stored securely, for example in AWS Secrets Manager.)

The function needs to handle the situation where it’s called more than once for the same message, since SQS provides “at least once” delivery. One situation where duplicate messages can be received is when a batch of messages is being processed and some succeed and some fail. If there is at least one failure then the whole batch gets returned to the queue, to be picked up by another poller and submitted again (at least for messages where the Maximum Receives threshold has not been reached).

This function can handle duplicate messages (i.e. it is idempotent) because, combined with the primary key constraint on the PostgreSQL table being inserted into, duplicate inserts raise exceptions, which are logged but otherwise ignored.

Note that the SQS destination set up for this function in my last post to capture messages that failed processing is no longer used, since destinations only apply when the function is invoked asynchronously. The Lambda poller invokes it synchronously, so failed messages will eventually be routed to the queue’s DLQ instead.

SQS DLQ Testing

Whilst testing the DLQ, I timed how long it took for a message sent by the simulated IoT sensor to land in the DLQ, in the case where I gave the Lambda function an incorrect password for its connection to the PostgreSQL instance.

The message was sent at 14:51:34, and from refreshing the AWS Console I saw it arrive in the DLQ at approx. 14:54:07 – a gap of 2m 33s. This is consistent with the 30 second visibility timeout set on the source queue together with the Maximum Receives value of 5 set in the queue’s redrive policy: the message was invisible for 30 seconds after each of the 5 times the Lambda poller delivered it to the function, i.e. 5 × 30s = 150s, or 2m 30s.

I set up a CloudWatch alarm for the DLQ to email me whenever a message arrived there. DLQ alarms must be based upon the ApproximateNumberOfMessagesVisible metric rather than the NumberOfMessagesSent metric, because the latter doesn’t get incremented by messages being sent to the DLQ as a result of failing in the original queue.
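A sketch of that alarm via boto3, with the DLQ name and SNS topic ARN as placeholders (the SNS topic handles the email notification):

import boto3

cloudwatch = boto3.client("cloudwatch")

# Hypothetical names; fires when any message becomes visible in the DLQ.
cloudwatch.put_metric_alarm(
    AlarmName="sensor-data-dlq-messages",
    Namespace="AWS/SQS",
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "sensor-data-dlq"}],
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=1,
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:eu-west-1:123456789012:dlq-alerts"],  # placeholder SNS topic
)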