This article is part of a two-part series covering how to build a serverless data pipeline with Kinesis, Lambda, and AWS S3 Glacier. You can find the first article here.
In this article, we will cover how to configure your Lambda function and create an S3 bucket with a lifecycle policy that moves objects to AWS S3 Glacier after 15 days and deletes them after 30.
Configuring the Lambda
As I mentioned previously, what your Lambda actually does will vary greatly depending on the application. In this case, I’m going to set up the Kinesis stream to send data over in batches of 50 data points, and the Lambda will find the average value of each batch.
First, we’ll need to create a trigger event for our Lambda. Under resources.Resources, create another object of Type: AWS::Lambda::EventSourceMapping. I’ve called mine “Event”. It has several properties:
- BatchSize - the number of records sent in a single batch
- EventSourceArn - the ARN of the event source that triggers the function (here, the Kinesis stream)
- FunctionName - the function that will be triggered
- StartingPosition - the position in the stream from which to start reading
In all, it should look like this:
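A minimal sketch of that resource, assuming the Kinesis stream and the function from the first article have the logical IDs KinesisStream and ProcessorLambdaFunction. Both names are hypothetical, so substitute your own (the Serverless Framework derives a function's logical ID from its name plus the suffix LambdaFunction). The StartingPosition value is also an assumption: TRIM_HORIZON starts from the oldest available record, while LATEST reads only records that arrive after the mapping is created.

```yaml
resources:
  Resources:
    Event:
      Type: AWS::Lambda::EventSourceMapping
      Properties:
        # Number of records sent to the Lambda in a single batch
        BatchSize: 50
        # The trigger: ARN of the Kinesis stream (hypothetical logical ID)
        EventSourceArn:
          Fn::GetAtt: [KinesisStream, Arn]
        # The function to invoke (hypothetical logical ID)
        FunctionName:
          Fn::GetAtt: [ProcessorLambdaFunction, Arn]
        # Assumption: start from the oldest record still in the stream
        StartingPosition: TRIM_HORIZON
```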
With that, batches of 50 records will be sent over to our Lambda as the data comes in - this is the general format of the object that the Lambda will receive:
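As an illustration, a Kinesis-triggered Lambda receives an event shaped roughly like the object below. Only one record is shown, and every ID, ARN, and timestamp is a made-up placeholder; the payload format (a JSON document with a "value" field) is likewise an assumption for this example.

```javascript
// Illustrative shape of the event a Kinesis-triggered Lambda receives.
// All IDs, ARNs, and timestamps are placeholders, not real values.
const sampleEvent = {
  Records: [
    {
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: 'sensor-1',
        sequenceNumber: '49590338271490256608559692538361571095921575989136588898',
        // The record payload, base64-encoded. This one encodes {"value":42}.
        data: 'eyJ2YWx1ZSI6NDJ9',
        approximateArrivalTimestamp: 1545084650.987,
      },
      eventSource: 'aws:kinesis',
      eventVersion: '1.0',
      eventID: 'shardId-000000000000:49590338271490256608559692538361571095921575989136588898',
      eventName: 'aws:kinesis:record',
      invokeIdentityArn: 'arn:aws:iam::123456789012:role/example-lambda-role',
      awsRegion: 'us-east-1',
      eventSourceARN: 'arn:aws:kinesis:us-east-1:123456789012:stream/example-stream',
    },
    // ...up to BatchSize records per invocation
  ],
};
```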
Note that the data in each record arrives base64 encoded, so you will need to decode it before using it.
Save the record in S3
With your records coming through, once you alter them or analyze them in the way your app requires, you’ll need to save them somewhere. In this case, we’re going to send them to an S3 bucket. To do this we’ll need to add an S3 resource to our serverless.yml. Naming the bucket is optional, as CloudFormation will come up with a name for you if you don’t specify one. It should look like this:
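A minimal sketch of the bucket resource follows. The logical ID DataBucket and the bucket name are hypothetical. BucketName can be removed entirely to let CloudFormation generate a name; if you keep it, remember that bucket names must be globally unique.

```yaml
resources:
  Resources:
    DataBucket:                  # hypothetical logical ID
      Type: AWS::S3::Bucket
      Properties:
        # Optional: omit this line to let CloudFormation pick a name
        BucketName: my-pipeline-records-example
```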
We’re also going to need to update the environment variables for the Lambda so that it has access to the bucket name, and add to our iamRoleStatements to give the Lambda access to S3. Adding environment variables is as easy as adding an environment attribute (under provider, or under an individual function) and listing the variable names we need. It will look like this:
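As a sketch, assuming the bucket's logical ID is DataBucket (a hypothetical name) and the variable is set at the provider level so every function in the service can read it:

```yaml
provider:
  name: aws
  environment:
    # Ref on an AWS::S3::Bucket resolves to the bucket's name
    BUCKET_NAME:
      Ref: DataBucket            # hypothetical logical ID of the bucket
```

Inside the Lambda, the value is then available as process.env.BUCKET_NAME.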
For BUCKET_NAME, we are using a logical reference to the bucket that we just defined in resources.
Adding to the iamRoleStatements for S3 isn’t difficult, but it doesn’t follow the patterns we’ve used so far, which makes it a little trickier than most role statements. The trick is granting access to the objects inside the bucket, as opposed to just the bucket itself. To do this we need to define the resource a little differently. Take a look:
You’ll notice that the resource is just a join function. Fn::Join takes a two-element array: the first element is the delimiter, and the second is an array of strings to be joined. The tricky bit is the “/*” at the very end of the ARN; it’s what allows us to access the contents of the bucket and make changes.
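A minimal sketch of such a statement, again assuming the bucket's logical ID is DataBucket (hypothetical). The action list is an assumption too: s3:PutObject is enough for uploading, and you can add others such as s3:GetObject if your function needs them.

```yaml
provider:
  iamRoleStatements:
    - Effect: Allow
      Action:
        - s3:PutObject           # assumption: the Lambda only needs to upload
      Resource:
        Fn::Join:
          - ''                   # delimiter: join with nothing in between
          - - 'arn:aws:s3:::'
            - Ref: DataBucket    # resolves to the bucket name
            - '/*'               # every object inside the bucket
```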
Finally, with the environment variables added and the role statements configured, we can add a function to our Lambda that uploads data to the S3 bucket. The general structure is relatively straightforward if you are using the AWS-SDK. It just takes a params object and a callback function.
To break down the params object:
- Bucket: the name of the bucket we defined in our serverless.yml
- ContentType: the MIME type of the data. Here is a good reference if you need it.
- Key: A unique name for the object that will be created in S3. In this case it’s the timestamp from when the record was created.
- Body: The body of the record that will be stored in S3
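The upload step described above can be sketched as follows. This assumes version 2 of the aws-sdk package, whose S3 client exposes putObject(params, callback), and a record object with a timestamp field. The helper names buildPutParams and uploadRecord and the record shape are hypothetical. The S3 client is passed in as an argument so the function can be tried locally with a stand-in client.

```javascript
// Build the params object for the S3 upload.
// Assumption: `record` carries a `timestamp` field set when it was created.
function buildPutParams(bucketName, record) {
  return {
    Bucket: bucketName,                 // bucket defined in serverless.yml
    ContentType: 'application/json',    // MIME type of the stored data
    Key: String(record.timestamp),      // unique object name
    Body: JSON.stringify(record),       // the record itself
  };
}

// Upload one record using any client that exposes putObject(params, callback),
// such as `new AWS.S3()` from aws-sdk v2.
function uploadRecord(s3, bucketName, record, callback) {
  s3.putObject(buildPutParams(bucketName, record), callback);
}

// Wiring inside the Lambda (requires the aws-sdk package to be available):
//   const AWS = require('aws-sdk');
//   uploadRecord(new AWS.S3(), process.env.BUCKET_NAME, record, (err, data) => {
//     if (err) console.error('Upload failed', err);
//   });
```

If two records can share a timestamp, consider appending another unique value to the Key, since uploading to an existing key overwrites the object.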
Transitioning to Glacier
Regardless of how frequently you create new records in S3, sooner or later you’re going to be sitting on a lot of data, and it makes sense to add some lifecycle rules to your S3 bucket. Adding that option is easy with the Serverless Framework: just add a Transition attribute to your S3 resource. It takes a storage class, and either a date on which to transition or a duration from creation. In this case, we’re going to use the duration. It’ll look like this:
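A minimal sketch of a lifecycle configuration that moves objects to Glacier after 15 days and expires them after 30. The logical ID DataBucket and the rule Id are hypothetical. This sketch uses the Transitions list form of the CloudFormation rule; the exact layout of the original configuration is an assumption.

```yaml
resources:
  Resources:
    DataBucket:                        # hypothetical logical ID
      Type: AWS::S3::Bucket
      Properties:
        LifecycleConfiguration:
          Rules:
            - Id: GlacierThenExpire    # hypothetical rule name
              Status: Enabled
              # Permanently delete objects 30 days after creation
              ExpirationInDays: 30
              Transitions:
                # Move objects to Glacier 15 days after creation
                - StorageClass: GLACIER
                  TransitionInDays: 15
```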
With those rules set up, our data will transition to Glacier after 15 days and will be permanently deleted after 30.
Resources we made:
- S3 Bucket
How are they connected?
- The Lambda performs minor analysis on the data.
- The Lambda then uploads the new data to S3.
- After 15 days, the S3 object is transferred to Glacier.
- After an additional 15 days, the record is deleted.
In this blog post series, we walked through creating a simple data pipeline with Kinesis, Lambda, and S3 Glacier. Serverless data pipelines can become a lot more complicated depending on your needs. No matter how complicated your pipeline is, you will almost certainly use AWS Lambda to transform the data that you process. To understand what actually happened inside an AWS Lambda function, you can use Thundra’s line-by-line tracing to debug and troubleshoot. You can see Thundra’s debugger features for remotely debugging your functions, and get beta access to them, by visiting us at booth 627 at re:Invent.
If you want to chat with us about serverless, you can ping us on Twitter (@thundraio) or join our Slack. You can also sign up for Thundra or see the live demo to watch Thundra in action.