
1. Create Role for Lambda and Sagemaker Processing Job

# SageMaker processing role

resource "aws_iam_role" "sagemaker_processing_role" {
  name = "sagemaker-processing-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Effect = "Allow",
        Principal = {
          Service = "sagemaker.amazonaws.com"
        Action = "sts:AssumeRole"

resource "aws_iam_policy" "sagemaker_processing_policy" {
  name        = "sagemaker-processing-policy"
  description = "Policy for SageMaker Processing to access S3, ECR, and logs"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Effect = "Allow",
        Action = [
        Resource = [
          # TODO: Replace with your bucket name
        Effect = "Allow",
        Action = [
        Resource = [
        Effect = "Allow",
        Action = [
        Resource = "*"

resource "aws_iam_role_policy_attachment" "sagemaker_processing_policy_attachment" {
  policy_arn = aws_iam_policy.sagemaker_processing_policy.arn
  role       = aws_iam_role.sagemaker_processing_role.name

output "sagemaker_processing_role_arn" {
  value = aws_iam_role.sagemaker_processing_role.arn

# Role for lambda function
resource "aws_iam_role" "lambda_role" {
  name = "lambda-sagemaker-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda-sagemaker-policy"
  description = "Policy for Lambda to invoke SageMaker and access S3"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Effect = "Allow",
        Action = [
        Resource = "*"
        Effect = "Allow",
        Action = [
        Resource = [

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  policy_arn = aws_iam_policy.lambda_policy.arn
  role     = aws_iam_role.lambda_role.name

# add basic execution policy to the role
resource "aws_iam_role_policy" "lambda_execution_policy" {
  name = "lambda-sagemaker-execution-policy-${var.bucket_id}"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Effect   = "Allow",
        Action   = [
        Resource = "*"

# IAM Policy for Lambda to Pass SageMaker Role

resource "aws_iam_policy" "pass_role_policy" {
  name = "lambda-pass-sagemaker-role-policy"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
        Effect = "Allow",
        Action = "iam:PassRole",
        Resource = aws_iam_role.sagemaker_processing_role.arn

resource "aws_iam_role_policy_attachment" "lambda_pass_role_policy_attachment" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.pass_role_policy.arn

2. Create Lambda Function

# AWS Lambda Function
resource "aws_lambda_function" "processing_data" {
  function_name = "embedding_data"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/embedding/lambda/lambda_function.zip"
  timeout       = 60
  memory_size   = 128

  environment {
    variables = {
      SAGEMAKER_ROLE_ARN = aws_iam_role.sagemaker_processing_role.arn
      S3_BUCKET_NAME     = var.bucket_id
      S3_OUTPUT_PREFIX   = "embeddings/"


  depends_on = [

resource "aws_lambda_permission" "allow_ecs" {
  statement_id  = "AllowECSTaskInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.processing_data.arn
  principal     = "ecs.amazonaws.com"

3. Create S3 code folder for SageMaker Processing Job

resource "aws_s3_bucket_object" "script" {
  bucket = var.bucket_id
  key    = "code/script.py"
  source = "${path.root}/src/embedding/sagemaker-processing/script.py"

resource "aws_s3_bucket_object" "requirements" {
  bucket = var.bucket_id
  key    = "code/requirements.txt"
  source = "${path.root}/src/embedding/sagemaker-processing/requirements.txt"

4. Code for Lambda Function

  • Create a folder containing the lambda function code and requirements.txt file.
  1. The script.py file contains the code for creating a SageMaker Processing Job.
import json
import boto3
import os
import time

sagemaker = boto3.client('sagemaker')
role_arn = os.environ.get('SAGEMAKER_ROLE_ARN')
output_prefix = os.environ.get('S3_OUTPUT_PREFIX')

def lambda_handler(event, context):
    bucket_name = event['bucket_name']
    input_prefix = "keyframes/" +  event['video_name'] + "/"
    s3_outdir = input_prefix.split('/')[1]

    print(f"Bucket Name: {bucket_name}")
    print(f"Input Prefix: {input_prefix}")

    processing_job_name = f"processing-job-{int(time.time())}"

    print(f"Processing Job Name: {processing_job_name}")
    response = sagemaker.create_processing_job(
            'ImageUri': "763104351884.dkr.ecr.ap-southeast-1.amazonaws.com/pytorch-training:1.9-cpu-py38",
            'ContainerEntrypoint': ['python3', '/opt/ml/processing/code/script.py']
                'InputName': 'input',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/{input_prefix}',
                    'LocalPath': '/opt/ml/processing/input',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                'InputName': 'code',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/code/',
                    'LocalPath': '/opt/ml/processing/code',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
            'Outputs': [
                    'OutputName': 'output',
                    'S3Output': {
                        'S3Uri': f's3://{bucket_name}/{output_prefix}',
                        'LocalPath': '/opt/ml/processing/output',
                        'S3UploadMode': 'EndOfJob'
            'ClusterConfig': {
                'InstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'VolumeSizeInGB': 30,
            'S3_OUTDIR': s3_outdir


    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'SageMaker Processing Job created successfully',
            'jobName': processing_job_name,
            'response': response

if __name__ == '__main__':
    lambda_handler({}, {})

5. Code for SageMaker Processing Job

import os
import sys
import subprocess

import torch
from sentence_transformers import SentenceTransformer
from PIL import Image
import numpy

def processor(input_dir, output_dir , folder):
    print("Loading model.....")
    model = SentenceTransformer("clip-ViT-B-32")
    print("Model loaded")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    print(f"Using device: {device}")

    image_paths = os.listdir(os.path.join(input_dir))
    images = []
    for image_path in image_paths:
        image = Image.open(os.path.join(input_dir, image_path))

    embeddings = model.encode(images)

    numpy.save(os.path.join(output_dir, f"{folder}.npy"), embeddings)
    print(f"Embeddings for {folder} saved")

if __name__ == "__main__":
    input_dir = "/opt/ml/processing/input"
    output_dir = "/opt/ml/processing/output"

    # Local testing
    # input_dir = "./dataset/keyframes/L01_V001"
    # output_dir = "./dataset/output"
    s3_out_dir = os.environ.get('S3_OUTDIR')
    if s3_out_dir is None:
        raise ValueError("S3_OUTDIR environment variable is not set")

    print("Starting processing.....")

    processor(input_dir, output_dir , s3_out_dir)

    print("Processing complete")
  1. The requirements.txt file contains the required libraries for the processing job.