In my last post I implemented an AWS IoT scenario that used an IoT rule to write to DynamoDB.
Here we change that rule to one that instead uses a Python Lambda function to write to a database on an Amazon RDS for PostgreSQL instance.
So the scenario is this:
The overall steps to be implemented are:
1. Create a PostgreSQL RDS instance
2. Create a database and table for that instance
3. Create a Python function that writes inbound sensor data to the database
4. Import the function into Lambda
5. Add asynchronous error handling to the Lambda function
6. Create an IoT rule that calls the Lambda function
We can use the AWS Console to create a PostgreSQL 11.5-R1 instance. For the purposes of this example it is set up to be publicly accessible, which allows it to be easily managed via pgAdmin from a local PC. The steps to do this are detailed here. (Note that for Step 10, the only inbound TCP port that needs to be open is PostgreSQL's port 5432, and the source IP address should be set to that of the local PC running pgAdmin. For Step 11, no outbound rules are required for this example.)
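For readers who prefer scripting to the console, the instance and the single inbound rule could be created with boto3 roughly as follows. This is a minimal sketch, not the steps from the linked guide: the instance class (db.t2.micro), the 20 GiB of storage, and all identifiers passed in are assumptions, and the security group is assumed to exist already.

```python
def create_public_postgres(instance_id, master_user, master_password,
                           security_group_id, client=None):
    """Create a small, publicly accessible PostgreSQL 11.5 RDS instance."""
    if client is None:
        import boto3
        client = boto3.client("rds")
    return client.create_db_instance(
        DBInstanceIdentifier=instance_id,
        DBInstanceClass="db.t2.micro",  # assumption: a small test-sized class
        Engine="postgres",
        EngineVersion="11.5",
        AllocatedStorage=20,            # GiB, assumed minimum for this example
        MasterUsername=master_user,
        MasterUserPassword=master_password,
        PubliclyAccessible=True,        # so pgAdmin on a local PC can reach it
        VpcSecurityGroupIds=[security_group_id],
    )


def allow_pgadmin_ip(security_group_id, pc_ip, client=None):
    """Open only TCP 5432, and only to the single PC running pgAdmin."""
    if client is None:
        import boto3
        client = boto3.client("ec2")
    return client.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=[{
            "IpProtocol": "tcp",
            "FromPort": 5432,
            "ToPort": 5432,
            "IpRanges": [{"CidrIp": f"{pc_ip}/32"}],  # one host, no wider range
        }],
    )
```

The `client` parameters exist only so the functions can be exercised without AWS credentials; in normal use they are left as `None`.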
Steps 13-16 in the link just given show how to add the new RDS instance to pgAdmin and how to create an initial database. Now that we have a database, we need a table to write our inbound IoT sensor data into. The table can be created within pgAdmin using the following SQL; the columns and data types match the sensor data generated by the Python IoT sensor emulation script in my last post:
CREATE TABLE public."SensorData"
(
    "DeviceID" integer NOT NULL,
    "DateTime" timestamp without time zone NOT NULL,
    "Temperature" integer NOT NULL,
    "Humidity" integer NOT NULL,
    "WindDirection" integer NOT NULL,
    "WindIntensity" integer NOT NULL,
    "RainHeight" integer NOT NULL,
    CONSTRAINT "SensorData_pkey" PRIMARY KEY ("DeviceID", "DateTime")
)
WITH (
    OIDS = FALSE
);
The Python 3 Lambda function used to write inbound IoT sensor data to the PostgreSQL database is in my GitHub repository as db_insert.py and is shown below:
Sensor data is passed into the handler via the event argument as a Python dictionary, whose values are then used in the SQL insert statement. The popular psycopg2 PostgreSQL adapter for Python is used to interact with the database.
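As a rough illustration of that pattern (this is not the db_insert.py file from the repository), a handler could map the event dictionary onto a parameterized INSERT. Only the "datetime" event key and the rds_host and rds_port environment variables are named in this post; the other event keys and the names rds_db_name, rds_username and rds_password are hypothetical placeholders.

```python
import os

# Column -> event key. Only "datetime" is confirmed by this post's test event;
# the other keys are hypothetical and must match the emulator's JSON.
COLUMN_KEYS = {
    "DeviceID": "deviceid",
    "DateTime": "datetime",
    "Temperature": "temperature",
    "Humidity": "humidity",
    "WindDirection": "winddirection",
    "WindIntensity": "windintensity",
    "RainHeight": "rainheight",
}


def build_insert(event):
    """Return a parameterized INSERT for public."SensorData" and its values."""
    columns = ", ".join(f'"{c}"' for c in COLUMN_KEYS)
    placeholders = ", ".join(["%s"] * len(COLUMN_KEYS))
    sql = f'INSERT INTO public."SensorData" ({columns}) VALUES ({placeholders})'
    # Values travel separately from the SQL text, so psycopg2 quotes them safely.
    return sql, tuple(event[key] for key in COLUMN_KEYS.values())


def handler(event, context, connect=None):
    if connect is None:
        import psycopg2
        connect = psycopg2.connect
    sql, params = build_insert(event)
    # rds_host and rds_port come from the post; the other names are hypothetical.
    conn = connect(
        host=os.environ["rds_host"],
        port=int(os.environ["rds_port"]),
        dbname=os.environ["rds_db_name"],
        user=os.environ["rds_username"],
        password=os.environ["rds_password"],
    )
    try:
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
    finally:
        conn.close()  # a fresh connection per invocation
    return {"inserted": 1}
```

Lambda calls `handler(event, context)`, so the optional `connect` argument keeps its default; it is there only to allow testing without a database.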
The handler opens a new database connection each time it is called. This is not suitable for high-frequency calls to the function, where connection pooling should be used instead. The recommended way to do this now is Amazon RDS Proxy, whose PostgreSQL preview was announced for selected AWS regions just days before this post was written (see here). Once this service is available in production, the Python code above remains the same; only the host endpoint used in the connect() call changes, from an RDS instance endpoint to an RDS Proxy endpoint.
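A related, lighter-weight technique (a complement to pooling, not a replacement for RDS Proxy) is to cache the connection at module level: Lambda reuses an execution environment across warm invocations, so module-level state survives between calls. A minimal sketch, assuming a psycopg2-style connection object whose `closed` attribute is 0 while open:

```python
_conn = None  # survives between warm invocations in the same execution environment


def get_connection(connect, **conn_kwargs):
    """Return a cached connection, reconnecting only if it has been closed."""
    global _conn
    # psycopg2 connections expose .closed: 0 while open, non-zero once closed.
    if _conn is None or _conn.closed:
        _conn = connect(**conn_kwargs)
    return _conn
```

Each concurrent execution environment still holds its own connection, so bursts of traffic can still exhaust the database's connection limit; that is the problem RDS Proxy addresses.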
To avoid over-complicating this post, the database password in the script above was stored in an environment variable. This obviously isn’t a safe approach for production, so another technique is required. See here for a blog post from the AWS Security team that describes how to use the AWS Secrets Manager within Python Lambda functions for securing RDS database passwords.
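A minimal sketch of the Secrets Manager approach, assuming the credentials are stored as an RDS-style JSON secret containing "username" and "password" keys; the secret name is a hypothetical placeholder. The function's execution role would also need permission to call `secretsmanager:GetSecretValue` on that secret.

```python
import json


def get_db_credentials(secret_id, client=None):
    """Fetch RDS credentials stored as a JSON secret in AWS Secrets Manager."""
    if client is None:
        import boto3
        client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    # RDS credential secrets are JSON documents with "username" and "password" keys.
    secret = json.loads(response["SecretString"])
    return secret["username"], secret["password"]
```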
I'm using the AWS Console to create an empty Lambda function called insert_sensor_data, configured to use the Python 3.6 runtime and a handler called db_insert.handler. I've also assigned it a default, AWS-generated IAM execution role, which gives the function basic Lambda permissions. (The default policy is AWSLambdaBasicExecutionRole, which gives Lambda permission to write to CloudWatch Logs.)
Because the function uses a library (psycopg2) other than the AWS SDK (boto3), a deployment package must be created that includes both the function and that library. This package can then be uploaded to Lambda.
I'm using Red Hat Enterprise Linux 8, running Python 3.6.8. From the command line, in the directory containing the Python function, we can execute:
pip3 install --target ./package psycopg2-binary
This installs the standalone psycopg2 binary package into a subdirectory called package. For the deployment package, a zip file must now be created that contains both the db_insert.py function and the contents of the package directory, with the function at the root of the zip alongside the installed libraries. From within the package directory we execute:
zip -r9 ../function.zip .
and then go up a level to execute:
zip -g function.zip db_insert.py
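The same layout can be produced from Python with the standard zipfile module, which makes the intended structure explicit: the contents of package/ and db_insert.py all sit at the root of the archive. A sketch, with all paths assumed; it uses the default deflate level rather than zip's -9, because ZipFile only gained a compresslevel argument in Python 3.7.

```python
import os
import zipfile


def build_deployment_zip(package_dir, function_file, zip_path):
    """Zip package_dir's contents plus the handler module, all at the zip root."""
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full = os.path.join(root, name)
                # Paths relative to package_dir, so psycopg2/ lands at the root.
                zf.write(full, os.path.relpath(full, package_dir))
        # The handler module sits next to the libraries, not inside a subfolder.
        zf.write(function_file, os.path.basename(function_file))
    return zip_path
```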
The deployment package can now be uploaded to Lambda:
aws lambda update-function-code --function-name insert_sensor_data --zip-file fileb://function.zip
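The equivalent boto3 call is `update_function_code`, which takes the zip file's raw bytes, just as the CLI's `fileb://` prefix reads the file as binary. A minimal sketch, assuming function.zip is in the current directory:

```python
def upload_deployment_package(zip_path, function_name="insert_sensor_data", client=None):
    """SDK counterpart of `aws lambda update-function-code --zip-file fileb://...`."""
    if client is None:
        import boto3
        client = boto3.client("lambda")
    with open(zip_path, "rb") as f:
        # ZipFile expects the archive's bytes, not a path.
        return client.update_function_code(FunctionName=function_name, ZipFile=f.read())
```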
The structure of the deployment package can be seen in Lambda via the AWS Console as follows:
After using the AWS Console to set the five environment variables used by the function (the values for rds_host and rds_port can be found on the "Connectivity & Security" tab of the instance in the RDS console), a Lambda test event can be set up, for example:
"datetime": "2020-04-06 13:25:12",
Running a test using this within the AWS Console is successful.
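Outside the console, the same kind of test can be run with a synchronous (RequestResponse) invocation through boto3. A sketch, assuming the event dictionary holds whatever fields the function expects; the response carries a FunctionError entry only when the handler failed.

```python
import json


def invoke_with_test_event(event, function_name="insert_sensor_data", client=None):
    """Synchronously invoke the function, much like the console's Test button."""
    if client is None:
        import boto3
        client = boto3.client("lambda")
    response = client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the handler's result
        Payload=json.dumps(event).encode("utf-8"),
    )
    payload = json.loads(response["Payload"].read())
    # "FunctionError" is present only if the handler raised an error.
    return response.get("FunctionError"), payload
```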
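Note that the Lambda Python 3.6 runtime was selected when creating the function above. Using the 3.7 or 3.8 runtimes resulted in an execution error: "Unable to import module 'db_insert': No module named 'psycopg2._psycopg'". Other developers have also reported this for Lambda Python runtimes later than 3.6 with functions that use psycopg2. A likely cause is that pip, running under Python 3.6.8 here, installed a psycopg2-binary wheel whose compiled _psycopg extension targets CPython 3.6 only, so later runtimes cannot import it.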
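If the import error comes from a Python version mismatch in the compiled extension, the package directory can be checked before uploading: CPython extension filenames carry an ABI tag such as `cpython-36m`. A sketch under that assumption (Linux `.so` files only); an alternative worth trying is pip's `--python-version` and `--platform` options, which require `--only-binary=:all:`, to fetch wheels built for the target runtime.

```python
import os
import re

# Linux extension modules look like _psycopg.cpython-36m-x86_64-linux-gnu.so
TAG = re.compile(r"\.cpython-(\d+)m?-")


def extension_python_versions(package_dir):
    """Return the CPython versions (such as "36") the bundled .so files target."""
    versions = set()
    for _root, _dirs, files in os.walk(package_dir):
        for name in files:
            match = TAG.search(name)
            if match and name.endswith(".so"):
                versions.add(match.group(1))
    return versions


def matches_runtime(package_dir, runtime):
    """True if every bundled extension targets the runtime, e.g. "python3.6"."""
    wanted = runtime.replace("python", "").replace(".", "")  # "python3.6" -> "36"
    found = extension_python_versions(package_dir)
    return bool(found) and found == {wanted}
```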
Our Lambda function will be called asynchronously by the IoT rules engine, meaning that Lambda will place the invocation request into a queue for the function and will immediately return a success response to the caller. If function execution is throttled or results in a function error, Lambda will attempt to retry the function with the same event data, subject to the function’s settings for “Maximum age of event” and “Retry attempts”. For this example I’ve left these settings at their defaults of 6 (hours) and 2 (retries).
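The same two settings can be set programmatically with `put_function_event_invoke_config`. The defaults in seconds are 6 × 60 × 60 = 21600, which is also the maximum event age; retries may be 0 to 2. A sketch: note that this call replaces any existing asynchronous configuration for the function, so settings it omits (such as destinations) are removed.

```python
MAX_EVENT_AGE_SECONDS = 6 * 60 * 60  # 6 hours = 21600 s, the default and the maximum


def set_async_retry_policy(function_name="insert_sensor_data", retries=2,
                           max_age_seconds=MAX_EVENT_AGE_SECONDS, client=None):
    """Set retry attempts and maximum event age for asynchronous invocations."""
    if not 0 <= retries <= 2:
        raise ValueError("Lambda allows 0-2 retry attempts")
    if not 60 <= max_age_seconds <= 21600:
        raise ValueError("Maximum event age must be 60-21600 seconds")
    if client is None:
        import boto3
        client = boto3.client("lambda")
    # Overwrites the whole async config; omitted settings are removed.
    return client.put_function_event_invoke_config(
        FunctionName=function_name,
        MaximumRetryAttempts=retries,
        MaximumEventAgeInSeconds=max_age_seconds,
    )
```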
When an invocation event exceeds the maximum age or fails all retry attempts, Lambda discards it. To troubleshoot failed executions, an on-failure event destination should be configured. Destinations are now recommended over dead-letter queues (see here), since they include more execution context information and support more destination services.
With destinations, asynchronous function results can be routed as an execution record to a destination resource without writing additional code. An execution record contains details about the request and response in JSON format including version, timestamp, request context, request payload, response context, and response payload.
Before creating a destination, the IAM execution role used by the Lambda function must be updated to include write permissions for the destination service being used. I'm using an SQS queue, so the policy I add to my role is:
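One possible shape for such a policy (not necessarily the exact policy used here) grants `sqs:SendMessage` on the on-failure queue, which is the permission an SQS destination needs. The sketch attaches it as an inline role policy with boto3; the role name, queue ARN and policy name are hypothetical.

```python
import json


def sqs_destination_policy(queue_arn):
    """IAM policy document letting the function's role send records to one queue."""
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "sqs:SendMessage",
            "Resource": queue_arn,
        }],
    }


def attach_destination_policy(role_name, queue_arn,
                              policy_name="lambda-sqs-on-failure", client=None):
    """Attach the policy inline to the Lambda execution role (names hypothetical)."""
    if client is None:
        import boto3
        client = boto3.client("iam")
    return client.put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(sqs_destination_policy(queue_arn)),
    )
```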
An SQS on-failure destination can now be created by following steps given here.
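The on-failure destination can also be set with boto3. This sketch uses `put_function_event_invoke_config`, which overwrites the function's whole asynchronous configuration, so it restates the default retry settings (2 retries, 21600 seconds) kept in this post; the queue ARN is a hypothetical placeholder.

```python
def set_on_failure_destination(queue_arn, function_name="insert_sensor_data", client=None):
    """Route failed asynchronous invocations to an SQS queue."""
    if client is None:
        import boto3
        client = boto3.client("lambda")
    return client.put_function_event_invoke_config(
        FunctionName=function_name,
        MaximumRetryAttempts=2,             # restated defaults, since put overwrites
        MaximumEventAgeInSeconds=21600,
        DestinationConfig={"OnFailure": {"Destination": queue_arn}},
    )
```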
To test this destination I changed the PostgreSQL password to be incorrect. Test data submitted to the function then resulted in execution records in the SQS on-failure queue such as this:
The message payload can be seen there, together with the error that occurred.
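For more than a handful of failures, the execution records can be pulled from the queue and summarized. A sketch assuming the record layout described in the AWS destinations documentation (requestContext with condition and approximateInvokeCount, requestPayload, and a responsePayload holding errorMessage); the queue URL is hypothetical, and messages are not deleted, so they reappear after the visibility timeout.

```python
import json


def summarize_failure(body):
    """Pull the useful fields out of one on-failure execution record."""
    record = json.loads(body)
    ctx = record.get("requestContext", {})
    response = record.get("responsePayload") or {}
    return {
        "condition": ctx.get("condition"),         # e.g. "RetriesExhausted"
        "attempts": ctx.get("approximateInvokeCount"),
        "error": response.get("errorMessage"),
        "payload": record.get("requestPayload"),   # the original sensor data
    }


def read_failures(queue_url, client=None):
    """Receive up to 10 records from the on-failure queue (without deleting them)."""
    if client is None:
        import boto3
        client = boto3.client("sqs")
    response = client.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=5)
    return [summarize_failure(m["Body"]) for m in response.get("Messages", [])]
```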
We can now disable the IoT Core rule created in my last post and create a new rule instead:
The select statement above restricts the rule's input to messages published on the MQTT topic sensor/data, and the rule's Lambda action is set to the function that we created in the previous step. During rule creation, a resource-based policy is added to the Lambda function that allows the iot.amazonaws.com service to invoke it.
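Created through the SDK instead of the console, the rule and its invoke permission become two explicit steps. A sketch assuming a `SELECT * FROM 'sensor/data'` statement (the rule's exact select statement is not reproduced here) and a hypothetical rule name; IoT rule names may contain only letters, digits and underscores.

```python
def create_sensor_rule(function_arn, rule_name="sensor_data_to_rds", iot=None, lam=None):
    """Create an IoT rule that sends sensor/data messages to the Lambda function."""
    if iot is None or lam is None:
        import boto3
        iot = iot or boto3.client("iot")
        lam = lam or boto3.client("lambda")
    iot.create_topic_rule(
        ruleName=rule_name,
        topicRulePayload={
            "sql": "SELECT * FROM 'sensor/data'",   # assumed select statement
            "awsIotSqlVersion": "2016-03-23",
            "ruleDisabled": False,
            "actions": [{"lambda": {"functionArn": function_arn}}],
        },
    )
    # The console adds this resource-based policy automatically; the SDK does not.
    partition, _service, region, account = function_arn.split(":")[1:5]
    lam.add_permission(
        FunctionName=function_arn,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="iot.amazonaws.com",
        SourceArn=f"arn:{partition}:iot:{region}:{account}:rule/{rule_name}",
    )
```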
When the IoT sensor emulation script from my last post is then run, sensor data can be seen (via pgAdmin) in the SensorData table that we created: