x

[eBook Presented by AWS & Thundra] Mastering Observability on the Cloud 📖 Download:

Coordinate Complex Workflows with Step Functions

Jun 18, 2020

 

2-6

What are Step Functions?

Steps Functions are primarily used to orchestrate various AWS services into serverless workflows, coordinated to achieve a high level task. The flow of execution is based on the results or information passed on from previous tasks. Simply put, the output of one step acts as an input to the next step.

Step Functions offer a graphical console to visualize the components of your application as a series of steps. They automatically trigger and track each step and retrie when there are errors so your application executes in order and as expected, every time.

The workflows are composed of tasks and states. Workflows are defined in the JSON-based Amazon States Language, allowing them to maintain state machines via configuration as code, and also visualize state machine diagrams so that they are easy to view, understand, and change as required.

When to use them?

In a decoupled micro service architecture, each service needs to be coordinated in a specific execution flow with fault tolerant strategies in place so as to achieve a defined end result. This is precisely where Step Functions can make things a lot easier to accomplish.

Some use cases (but not limited to):

  • Data processing 
    • Long running ETL jobs

  • Database access  
    • Ideal for coordinating session-based applications. 
      • Coordinate all of the steps of a checkout process on an e-commerce site 
    • Step Functions can read and write from Amazon DynamoDB as needed to manage inventory records

  • Machine learning workflows  
    • Automate the pre-processing of your data with AWS Glue 
    • create an Amazon SageMaker job to train your ML model on the data then trigger another SageMaker job to deploy your model into production for online prediction 

Let’s take a look at how they work by diving into a tutorial that transcodes image files.

Process image files from S3 using Lambda and Rekognition

image5

Media transcoding with Step Functions

The flow of the above design is like this:

  • User uploads image file to S3 bucket.
  • The upload to S3 triggers a Cloudwatch event which then begins the workflow from Step Functions.
  • Step Functions invokes a Lambda function which extracts metadata from the uploaded S3 object and stores it inside a DynamoDB table.
  • Step Functions invokes another Lambda function after a 10 second wait, which then calls the Rekognition API and obtains image data. After the API call, the existing entry for the image object is updated at the DynamoDB table and Step Functions finishes the state machine.

Let’s get a quick overview of how Step Functions work by setting up the above use case.

Overview of steps

  1. Create the following
    1. A DynamoDB table
    2. Two lambda functions
    3. An S3 bucket
  2. Create a state machine via Step Functions console.
  3. Setup a trail in Cloudtrail.
  4. Create Cloudwatch event rule.
  5. Testing

Creating and setting up the AWS resources

Go to DynamoDB via the console to create a table. Enter a table name and Primary key as filename.

image8

Navigate to the Lambda console and hit Create function. Choose Author from Scratch and setup the function with parameters shown below. Be sure to create a new role with the policies indicated here. This Lambda function will extract metadata of the image file.

image6

Paste the following code snippet and save the function.

import json
import boto3
import uuid

print('Loading function')

s3 = boto3.client('s3')
ddb = boto3.resource('dynamodb')

def lambda_handler(event,context):
    table = ddb.Table("<REPLACE WITH YOUR TABLE NAME>")
    
    # Read from state machine input
    states_input = json.dumps(event)
    get_bucket_values = json.loads(states_input)
    try:
        bucket_name = get_bucket_values["detail"]["requestParameters"]["bucketName"]
        key = get_bucket_values["detail"]["requestParameters"]["key"]
        
        # Call S3 bucket
        bucket_obj = s3.get_object(Bucket=bucket_name,Key=key)
        new_item = {
            'id': str(uuid.uuid4().hex),
            's3_bucket':  bucket_name,
            'filename': key,
            'filesize': int(bucket_obj['ContentLength']),
            'contentType': bucket_obj['ContentType'],
            'labelData':{},
            'faceData':{},
            }
        # PUT metadata to DynamoDB table
        table.put_item(Item=new_item)
        
        # Return PUT values
        return new_item
        
    except Exception as e:
        raise e


Create another Lambda function that will call the Rekognition API. Choose the existing role created with the previous Lambda function. Paste the following code snippet into this newly created Lambda function.

import boto3
import json
from decimal import Decimal

print('Loading function')

rekognition = boto3.client('rekognition')
s3 = boto3.client('s3')
ddb = boto3.resource('dynamodb')


# --------------- Helper Functions to call Rekognition APIs ------------------


def detect_faces(bucket, key):
    response = rekognition.detect_faces(Image={"S3Object": {"Bucket": bucket, "Name": key}})
    return response


def detect_labels(bucket, key):
    response = rekognition.detect_labels(Image={"S3Object": {"Bucket": bucket, "Name": key}})
    return response

# --------------- Main handler ------------------


def lambda_handler(event, context):
    '''
    Rekognition APIs to detect faces, labels and index faces in S3 Object.
    '''

    # DynamoDB table
    table = ddb.Table("<REPLACE WITH YOUR TABLE NAME>")
    
    # Get the object from S3
    states_input = json.dumps(event)
    get_input = json.loads(states_input)
    bucket_name = get_input["detail"]["requestParameters"]["bucketName"]
    key = get_input["detail"]["requestParameters"]["key"]
    try:
        # Calls rekognition DetectFaces API to detect faces in S3 object
        face_detect = detect_faces(bucket_name, key)
        faces_map = json.loads(json.dumps(face_detect), parse_float=Decimal)

        # Calls rekognition DetectLabels API to detect labels in S3 object
        label_detect = detect_labels(bucket_name, key)
        labels_map = json.loads(json.dumps(label_detect), parse_float=Decimal)
        
        # update table entry with image data
        table.update_item(
                    Key={
                            'filename': key,
                        },
                        UpdateExpression='set labelData = :label, faceData = :face',
                        ExpressionAttributeValues={
                            ':label': labels_map,
                            ':face':faces_map
                        },
                        ReturnValues="UPDATED_NEW"
                    )

        return faces_map
    except Exception as e:
        print(e)
        raise e


After creating both the functions at the IAM console, under Roles, select the role linked to these functions and attach the AmazonRekognitionFullAccess policy. Create an S3 bucket with default settings.

Setup a state machine via Step Functions console

Hit the Create state machine button on the console. With defaults as is, paste the below Amazon States Language (ASL) code snippet into code definition.

image2

Remember to replace the appropriate Lambda ARN values in the ASL code.

  
{
  "Comment": "Transcode images using AWS Step functions.",
  "StartAt": "Parallel",
  "States": {
    "Parallel": {
      "Type": "Parallel",
      "End": true,
      "Branches": [
        {
          "StartAt": "Lambda-Image metadata",
          "States": {
            "Lambda-Image metadata": {
              "Type": "Task",
              "Resource":

 "<LAMBDA ARN THAT EXTRACTS METADATA>",
              "End": true
            }
          }
        },
        {
          "StartAt": "Wait 10s",
          "States": {
            "Wait 10s": {
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Lambda-Rekognition"
            },
            "Lambda-Rekognition": {
              "Type": "Task",
              "Resource": 

"<LAMBDA ARN THAT CALLS REKOGNITION API>",
              "End": true
            }
          }
        }
      ]
    }
  }
}

On refreshing the graph, the workflow is rendered reflecting the code that was placed inside the defintion. 

States can perform a variety of functions in your state machine. 

  • Do some work in your state machine (a Task state)
  • Make a choice between branches of execution (a Choice state)
  • Stop an execution with a failure or success (a Fail or Succeed state)
  • Simply pass its input to its output or inject some fixed data (a Pass state)
  • Provide a delay for a certain amount of time or until a specified time/date (a Wait state)
  • Begin parallel branches of execution (a Parallel state)
  • Dynamically iterate steps (a Map state)

We employ the use of Parallel state in our workflow. The Lambda function responsible for extracting the metadata gets executed first, and then we utilize a wait state to ensure there is enough time for storing the metadata into the DynamoDB table before calling the Rekognition API in the other Lambda function to update the same table entry with image data.

Hit Next and enter a state machine name to finish creation.

Setup CloudTrail

From the Cloudtrail console,

  • Select Trails from the left pane, click Create trail
  • Enter trail name. 
  • Scroll down to Data events. Add S3 bucket, select the S3 bucket which was created previously. This is the bucket where you will upload the image files.
  • Under Storage location, enter a bucket name. (This bucket is created to store the Cloudtrail logs)
  • Click Create.

Create CloudWatch event rule

At the Cloudwatch console, from the left pane click Rules and hit create rule. The event is your created S3 bucket, and target is the created state machine. Fill up the selections as indicated in the image below.

image4

Hit configure details and enter a rule name.

Testing the setup

Upload any image file of .png/.jpeg extension into the S3 bucket. Navigate to the Step Functions console and select your state machine. From the list of executions, select the top most entry and you will be able to view the running of the state machine (if you navigate quick enough).

image3

You can verify the execution by checking the DynamoDB table which will have the image data populated.

image1

Conclusion

We have seen how Step Functions can greatly enhance the control and coordination of your workflows. They manage the operations and underlying infrastructure for you to ensure your application is available at any scale.

The ability to view what’s happening in real time and also receive instant feedback lets you know exactly where things have gone wrong so you know where to look and fix the problem.

You can run your tasks in the AWS Cloud, on your servers, or on any system that has access to AWS. Access and use Step Functions by using the Step Functions console, the AWS SDKs, or an HTTP API. 

If you ever need to monitor your state flows with distributed tracing (I’m sure you will), check out Thundra with our highly generous free package. Thundra can provide an end-to-end visibility for step functions that will help you pinpoint the issues a lot faster than normal and save time for an additional feature or beer!