EventBridge Pipes is one of the most interesting releases of Reinvent 2022. It allows a point-to-point integration between AWS services with possibility of filtering, enrichment and transforming data. This article will show a transition from how you would integrate a SQS with a Step Function before and after the EventBridge Pipes release.
The BEFORE approach
A message sent from a SQS queue will be received by a Step Function via a Lambda function, acting as a glue. Let’s see the steps to achieve before the EventBridge pipes.
1. Create the SQS queue
This step involves creating a Standard queue, as input of the flow:
2. Create a Step Function
For demo purpose we will use the default HelloWorld blueprint, so we can verify how the payload looks when received from its invoker:
3. Create the Glue Lambda
To integrate SQS with a Step Function, we will create a Lambda function. Code is below:
- Receives the SQS record containing the JSON payload
- It passes the payload to start the execution of the Step Function via SDK
- Requires an IAM policy associated to the function role in order to perform the above
4. Associate a SQS Trigger to the Lambda
To associate the function with an SQS message I will create a trigger:
The trigger requires the following IAM permissions to be associated with the Lambda role: ReceiveMessage, DeleteMessage and GetQueueAttributes.
The flow is now completed and upon sending a JSON as SQS message, the Step function is correctly executed via the Lambda function.
Received payload looks like below:
5. Cleanup Trigger
For demo purposes, now remove the SQS trigger so that we see how above is simplified with EventBridge Pipes.
The AFTER approach
1. Create an EventBridge Pipe
A pipe consists of different steps:
- Source, where the event originate from.
- Filtering, allow/block events based on pattern matching rules.
- Enrichment, updates the event payload with additional information.
- Target, where the event end up to.
For Simplicity let’s first setup:
- Source = Created SQS queue
- Target = Created Step function
Once saved, wait until the Pipe state changes from Updating to Running. Then, send a payload from SQS and it will land the Step function without using the Lambda.
Easy, quick, clean. As you noticed thought, the payload received by the Step function is not exactly the same as before:
Let’s change that with a target transformation.
2. Add a Target Transformation
Edit the Pipe and select the Target. Scroll down to the transformation section and add below code:
As seen below the UI allows you to preview how a payload will be transformed depending on the rule applied:
Once updated the pipe, the Step function, upon a SQS message, will receive a clean input:
Below you can see the transformed payload:
3. Apply filters
Filters allow/block received input events to be forwarded to their destination.
To allow only event where payload type reflects seaters with a prefix, you can apply below EventBridge filter:
Below you can see how UI allows to test the event pattern to understand if the event will be fitters or not.
Update the Pipe and wait.
Acceptable payloads prefixed with “turtle” type, will result in a successful execution of the Step function. On the other end, events with not-matching types such as “smallTurtleMarine” will be blocked and will live in the event bus until EventBridge removes them:
4. Enrich the data
Enrichment, the last EventBridge Pipe feature, allows to enrich the event payload with additional data coming from other sources, such as databases, Rest Apis. I will use a lambda function to enrich the event on-the-fly.
- Create a lambda with below code, which will add a new field, origin, to the payload.
- Associate the above Lambda function by adding an Enrichment step.
- As shown above, this will popup a message as the execution role pipe needs to be allowed to invoke the lambda function.
You can either toggle the “I have updated the execution role of the pipe” box or repeat the previous step again once the role is update. Either way, let’s update the role.
- To add the appropriate permission to the pipe role execution you have to add an InvokeLambda permission:
Once pipe is updated and a message is sent to the queue, you can see that received input from the Step function will look like this:
Full flow can be seen here:
EventBridge Pipes is an AWS effort to simplify and streamline service integration and adopt event driven architecture patterns, as the world is asynchronous.
With before approach you needed to:
- Know IAM policies setup
- Learn SDK and business logic
- Maintain the function via tests
- Setting trigger
With after approach:
- IAM policies are taken care of
- Get a clean powerful UI to organize events in just few steps.
Hope this was helpful and that will inspire you for more integrations. Stay well!