January 05, 2022

Serverless Data Wrapper with Node + SQS + Lambda

Anyone who has worked with data from external APIs has probably faced limitations such as request limits, server overload, connection problems, and many other errors. The solution might be simple: a worker that fetches data from time to time and updates a local database.

As common as this solution is, there are many things to worry about when building this kind of worker: memory consumption, server infrastructure and, most importantly, scalability. As simple as a worker that fetches data from an API can be, there are many edge cases where a bad architecture can fail.

Recently, I once again needed to create a worker to fetch data from an API. But this time, I decided to spend some time working out a good way to structure the project. After some research, and many experiments, I reached a reasonable architecture. My solution uses a serverless Node.js application, deployed as an AWS Lambda function, executed every week, with execution messages stored in an AWS SQS queue. All data is stored in a MongoDB database, created on a DigitalOcean instance (the lowest MongoDB instance price I have found).

This post defines the problem, presents my proposed solution, and walks step by step through building it. I will also explain some concepts that may help anyone who has never worked with these technologies.


Problem Definition

Suppose you need to create an application that contains data about all available video games. Users will navigate your application, searching for video games, starring the ones they own, and adding reviews. If you don't have this data in your database, the most common solution is to search for an API that provides it.

What is an API?

Many APIs limit the number of requests per user. This means that you can only make a specific number of requests in a specific amount of time (10 requests per second, for example). If you call the API directly from your application (for example, every time a user searches for a game), especially in a large-scale application, you will receive a lot of failed requests. You may also run into situations where the API is unavailable for any reason, making your own application unavailable too.

So, what if, instead of calling the API directly from our application, we create a background application that fetches the data from time to time and stores it in our application database? This would allow us to read the data directly from the database, and would reduce n user-driven requests to a constant number of background requests. In these situations, we usually use the Master-Worker pattern (or just worker), allowing us to process data in parallel.

What is the Worker pattern?

The thing about workers is that, usually, they are created inside the same application. This creates a problem: we need to worry about memory consumption and runtime errors. The solution may be as simple as creating separate infrastructure to run this single worker, but keeping a dedicated server for an application that only runs at specific times can be expensive (and really unnecessary).

Solution Proposal

Considering all requirements and limitations, my proposal for this situation is a serverless application, as a Lambda function, that is executed from time to time, fetching data from the API and storing it in our project database. This allows us to avoid worrying about errors affecting our main infrastructure, and still have our application running in parallel.

What is serverless?

This Lambda function can be executed from time to time, on a schedule specified as a cron expression. Also, most APIs use pagination to avoid large responses. To make requests based on pagination, while keeping within the request rate limit, my proposal also includes a queue, storing messages that carry the page parameter. We can configure the queue to dequeue each message after the amount of time we need to wait before the next request.
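As a sketch of the scheduled part, the Serverless Framework lets you attach a schedule event with a cron expression directly in serverless.yml (the function name and the weekly schedule below are placeholders, not taken from the final project):

```yaml
functions:
    hello:
        handler: handler.hello
        events:
            # Run once a week: every Monday at 03:00 UTC (AWS cron syntax)
            - schedule: cron(0 3 ? * MON *)
```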

What is a queue?

The following image shows a larger view of this architecture proposal:

Worker Architecture

Project Initialization

The following sections include the step-by-step implementation of the proposed architecture. For this implementation, RAWG's API was chosen, as it seems to be the best gaming API available. It has 500,000+ games and a lot of interesting data to work with. Its authentication is simple: we just need an API key, available after registering a user on their website and providing info about the application you are going to build.

The worker can be developed in any language. In this case, we are going to use Node.js. A nice thing about Node is that, as with many other ecosystems that have a package manager, you can find a lot of useful packages that speed up development. One package that will help us in this project is serverless, which creates Lambda functions automatically from YAML configuration. It makes it very convenient to deploy our code with a single command.

npm install -g serverless

Before using the serverless command, don't forget to set up the AWS CLI on your local machine. It will allow you to use AWS commands to quickly inspect your environment. Also, make sure you have configured your AWS access keys locally (this guide will help you with that). Then, you can use the serverless command to create a boilerplate project.

serverless

Fill in all required info, and check if the project is created correctly.

Serverless Settings

Before deploying the code, make sure everything is configured correctly in serverless.yml. Also, check that the region parameter is present and set to the desired region.

service: teste

frameworkVersion: '2 || 3'

provider:
    name: aws
    runtime: nodejs12.x
    lambdaHashingVersion: 20201221
    region: sa-east-1

functions:
    hello:
        handler: handler.hello

To deploy the application, use the serverless deploy command.

serverless deploy

If everything went fine, you will be able to see your function created in the AWS interface (or via aws lambda list-functions, if you have the AWS CLI installed).

Function should appear created in AWS interface

It is always good to test the function and see if it's working. The AWS Lambda interface provides a way to test the function using the orange Test button. Create a test event, and then click the Test button again. You should receive a response displaying the execution results.

Execution results

Making the API Request

Back in the worker, the first thing we need to code is the request to the RAWG API. To do that, we are going to use the request package from npm. We also need to initialize npm in our project before installing the request package.

npm init
npm install request

We are also going to need the RAWG API key. It is always good to store this kind of information in environment variables, and serverless lets us declare these variables directly in the serverless.yml file, inside the function definition.

functions:
    hello:
        handler: handler.hello
        environment:
            RAWG_API_KEY: xxxxxxx

In the handler.js file, import the request package and, inside the handler function, add the code to make a request to the API, passing the API key as a parameter. One thing to notice is that AWS Lambda can deal with promises as a return value, so we can return a promise and handle the request result inside it.

'use strict';
const request = require('request');

module.exports.hello = async (event) => {
    return new Promise((resolve, reject) => {
        request(`https://api.rawg.io/api/games?key=${process.env.RAWG_API_KEY}`, function(error, response, body) {
            if(error) {
                reject(error);
                return;
            }

            resolve(body);
        });
    });
};

Deploy and test it (for example, with serverless invoke -f hello). You should receive the API response as the result.

Queue Configuration

Now that we have our function making requests to the API, we need to decide how to navigate through the pages to fetch all available data. As defined before, we will create a queue that stores messages used to execute the function with the page parameter. Every time our function makes a request, it will check if there is more data available. If so, it will send a message to our queue, with the page incremented by one.

Our first step is to create the queue on AWS SQS. Using either the web interface or the AWS CLI, create a queue with the FIFO settings. Don't forget to include .fifo at the end of the name (that's a requirement for FIFO queues on AWS).

Queue creation

This is also a good place to set the interval between message deliveries. In the Delivery delay field, set the desired amount of time (1 minute should be enough). All other settings can be left as default.

Change the delivery delay to the desired amount of time

After the queue is created, click on it and open the Lambda triggers tab. Then, click the Configure Lambda function trigger button. Select your Lambda function in the dropdown, and click Save.

Select the desired lambda function to be executed

The next step is to modify our worker to receive the page parameter and send messages to the queue when there is more data to fetch. To do that, we first need to install the AWS SDK package from npm and import it in the handler.js file.

npm install aws-sdk

const AWS = require('aws-sdk');

First, we need to read the page parameter from the message. SQS stores message data in an array named Records on the received event. In our case, we will also be sending messages in JSON format, so it is important to parse it. Parsing strings into JSON can throw errors in JavaScript, so it is always good to add a try/catch block. If there is any error, we will return page 1, as it is the first page to request from the API.

Create a function that receives the message event and tries to parse the message (if it exists).

function tryParsePage(event) {
    if(event &&
    event.Records &&
    event.Records.length > 0 &&
    event.Records[0].body) {
        try {
            let parsedMessage = JSON.parse(event.Records[0].body);
            if(parsedMessage && parsedMessage.page) {
                return '' + parsedMessage.page;
            }

            return '1';
        } catch (e) {
            return '1';
        }
    }

    // No records (e.g. a manual or scheduled invocation): start at the first page
    return '1';
}
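We can sanity-check this parsing logic locally, outside Lambda, by feeding it hand-built events shaped like the ones SQS delivers. The parser is repeated here (with an explicit fallback return for events without Records) so the snippet runs on its own:

```javascript
// Standalone copy of the page parser, with a fallback for events without Records
function tryParsePage(event) {
    if (event && event.Records && event.Records.length > 0 && event.Records[0].body) {
        try {
            let parsedMessage = JSON.parse(event.Records[0].body);
            if (parsedMessage && parsedMessage.page) {
                return '' + parsedMessage.page;
            }
            return '1';
        } catch (e) {
            return '1';
        }
    }
    return '1';
}

// Hand-built events mimicking the shape SQS hands to Lambda
console.log(tryParsePage({ Records: [{ body: '{"page": 3}' }] })); // '3'
console.log(tryParsePage({ Records: [{ body: 'not json' }] }));    // '1'
console.log(tryParsePage(undefined));                              // '1'
```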

Now we handle the message submission to AWS SQS. First, copy the queue URL from AWS (located in the queue details) and store it in the environment variables in serverless.yml. It is also good to add another variable for the message group, with any desired string (in lowercase).

environment:
    RAWG_API_KEY: xxxxxxx
    AWS_QUEUE_URL: xxxxxxx
    AWS_MESSAGE_GROUP: xxxxxxx

Then, create a function that sends new messages to the queue, receiving the page as a parameter. To send a message, we just use AWS.SQS from the AWS SDK and call the sendMessage method, passing an object with 4 parameters: MessageBody, QueueUrl, MessageGroupId and MessageDeduplicationId. MessageDeduplicationId must be unique per message, so we use a concatenation of the message group id and the page.

function sendMessage(page) {
    const sqs = new AWS.SQS();

    return sqs.sendMessage({
        MessageBody: JSON.stringify({page: parseInt(page) + 1}),
        QueueUrl: `${process.env.AWS_QUEUE_URL}`,
        MessageGroupId: `${process.env.AWS_MESSAGE_GROUP}_${page}`,
        MessageDeduplicationId: `${process.env.AWS_MESSAGE_GROUP}_${page}`
    }).promise();
}

Finally, call these functions inside the Lambda handler. Add the page parameter to the API URL and, after the request completes, parse the response body to check if there is a next URL. If so, send a new message to the queue.

module.exports.hello = async (event) => {
    return new Promise((resolve, reject) => {
        let page = tryParsePage(event);    

        request(`https://api.rawg.io/api/games?key=${process.env.RAWG_API_KEY}&ordering=-updated&page=${page}&page_size=100`, function(error, response, body) {
            if(error) {
                reject(error);
                return;
            }

            let parsedResponse = JSON.parse(body);
            let games = parsedResponse.results; // the games we will persist to the database later

            if(parsedResponse.next) {
                sendMessage(page).then(() => {
                    resolve();
                }).catch(err => {
                    reject(err);
                });        
            } else {
                resolve();
            }      
        });
    });
};

Before deploying, it is good to add some logs for testing purposes. During execution you won't be able to see the function results directly, but you can always count on Amazon CloudWatch to check the logs. Any console.log added to the code will create an entry in the CloudWatch logs when the function is executed.

Now you can deploy and test to see if the application works as expected. A good way to test is to send a message to the queue from the AWS interface. Just open the desired queue, and click on "Send new message". Then, enter the message body and wait to see if the logs appear on CloudWatch.
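Given the JSON format that the parsing function expects, a message body to paste in the console could be as simple as (the page number here is just an example):

```json
{ "page": 2 }
```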

Database Connection

Now that we have our main flow working, it is time to start adding the data to your project database. It is not the focus of this post to teach how to configure a database in your project (since it is the same as in any other web project you may have built), but I will share how I worked with the database and some of the problems I faced.

Firstly, I decided to use MongoDB purely out of personal taste. Node has great packages for managing a MongoDB connection, but the one I like most is mongoose. It works with promises, and contains many helpful features like schemas, validators, middleware and more.

I also created my DB on DigitalOcean. It is the cheapest DB I have found that meets the minimum requirements. For US$ 15/mo, you can have a dedicated cluster with 1 GB of storage and a good security configuration. Of course, you can always create your own cloud instance and install MongoDB, but I would really recommend using a preconfigured instance (unless you are a devops master).

The thing is, I had a lot of problems connecting to this database due to all the security settings my DB instance had. The first problem is the IP whitelist, which allows only the specified IPs to connect to the database. You need to associate a static IP with your Lambda so you can whitelist it in the DB. There is an excellent guide that will help you configure this (and I strongly recommend following it).

You will also need to provide your database certificate to connect to the DB. If you are using mongoose, put the certificate file in the root of the project and use the following options in the connect method:

{
    ssl: true,
    sslValidate: true,
    sslCA: __dirname + "/cert.pem"
}

Once you are connected, you can use an upsert (an update with the upsert option enabled) to insert or update each entry in your database.
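As a minimal sketch of that idea, we can build the operations for a mongoose bulkWrite where each game is upserted by its RAWG id. The model name (Game) and the field names are assumptions based on the RAWG payload, not taken from the final project:

```javascript
// Sketch: turn the API results into mongoose bulkWrite "upsert" operations.
// The stored fields (rawgId, name, released) are illustrative assumptions.
function buildUpsertOps(games) {
    return games.map((game) => ({
        updateOne: {
            filter: { rawgId: game.id },  // match an existing document by the RAWG id
            update: { $set: { name: game.name, released: game.released } },
            upsert: true                  // insert a new document when no match is found
        }
    }));
}

// Usage (inside the request callback, once connected with mongoose):
//   await Game.bulkWrite(buildUpsertOps(parsedResponse.results));
const ops = buildUpsertOps([{ id: 1, name: 'Portal', released: '2007-10-10' }]);
console.log(ops[0].updateOne.upsert); // true
```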


There are many small improvements we could apply to this worker, such as retrieving only recently updated games (filtering by the updated parameter on the query string), or working with two different databases (one only for inserts and another only for reads). If you have any suggestions, or want to share a different approach, feel free to comment below!

References

[1] https://www.ibm.com/topics/api

[2] https://docs.gigaspaces.com/sbp/master-worker-pattern.html

[3] https://serverless-stack.com/chapters/what-is-serverless.html

[4] https://aws.amazon.com/message-queue/