Embedding

1. Create IAM Roles for the Lambda Function and the SageMaker Processing Job

#################################
# SageMaker processing role
#################################

# Execution role assumed by the SageMaker service while it runs the
# embedding Processing Job.
resource "aws_iam_role" "sagemaker_processing_role" {
  name = "sagemaker-processing-role"

  # Trust policy: only the SageMaker service principal may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "sagemaker.amazonaws.com" }
    }]
  })
}

# Permissions the SageMaker Processing Job needs at run time: read keyframes
# and code from / write embeddings to the project bucket, pull the container
# image from ECR, and emit CloudWatch logs and metrics.
resource "aws_iam_policy" "sagemaker_processing_policy" {
  name        = "sagemaker-processing-policy"
  description = "Policy for SageMaker Processing to access S3, ECR, and logs"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        # ListBucket is a bucket-level action, so it is granted on the bucket ARN.
        Effect   = "Allow",
        Action   = ["s3:ListBucket"],
        Resource = ["arn:aws:s3:::${var.bucket_id}"]
      },
      {
        # Object-level actions apply to the objects inside the bucket.
        Effect = "Allow",
        Action = [
          "s3:GetObject",
          "s3:PutObject"
        ],
        Resource = ["arn:aws:s3:::${var.bucket_id}/*"]
      },
      {
        Effect = "Allow",
        Action = [
          "ecr:GetAuthorizationToken",
          "ecr:BatchCheckLayerAvailability",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchGetImage",
          "ecr:ListImages"
        ],
        # ecr:GetAuthorizationToken cannot be scoped to a repository ARN.
        Resource = ["*"]
      },
      {
        # Log and metric permissions listed by SageMaker for processing-job
        # execution roles.
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:DescribeLogStreams",
          "logs:PutLogEvents",
          "cloudwatch:PutMetricData"
        ],
        Resource = "*"
      }
    ]
  })
}

# Bind the processing policy to the SageMaker execution role.
resource "aws_iam_role_policy_attachment" "sagemaker_processing_policy_attachment" {
  role       = aws_iam_role.sagemaker_processing_role.name
  policy_arn = aws_iam_policy.sagemaker_processing_policy.arn
}

# ARN of the SageMaker execution role created above.
output "sagemaker_processing_role_arn" {
  value = aws_iam_role.sagemaker_processing_role.arn
}

#################################
# Role for lambda function
#################################

# Execution role for the Lambda that starts the Processing Job.
resource "aws_iam_role" "lambda_role" {
  name = "lambda-sagemaker-role"

  # Trust policy: only the Lambda service principal may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

# Lets the Lambda create, describe and list SageMaker Processing Jobs and
# read/write the project bucket.
resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda-sagemaker-policy"
  description = "Policy for Lambda to invoke SageMaker and access S3"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Resource = "*"
        Action = [
          "sagemaker:CreateProcessingJob",
          "sagemaker:DescribeProcessingJob",
          "sagemaker:ListProcessingJobs",
        ]
      },
      {
        Effect = "Allow"
        Resource = [
          "arn:aws:s3:::${var.bucket_id}",
          "arn:aws:s3:::${var.bucket_id}/*",
        ]
        Action = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket",
        ]
      },
    ]
  })
}

# Bind the SageMaker/S3 policy to the Lambda role.
resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Inline policy giving the Lambda role the CloudWatch Logs permissions it
# needs to write its own logs (basic execution permissions).
resource "aws_iam_role_policy" "lambda_execution_policy" {
  role = aws_iam_role.lambda_role.id
  name = "lambda-sagemaker-execution-policy-${var.bucket_id}"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Resource = "*"
      Action = [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
      ]
    }]
  })
}

#################################
# IAM Policy for Lambda to Pass SageMaker Role
#################################

# Allows the Lambda to hand the SageMaker execution role to
# CreateProcessingJob. The iam:PassedToService condition limits the pass to
# the SageMaker service, so this grant cannot be used to give the role to any
# other AWS service.
resource "aws_iam_policy" "pass_role_policy" {
  name = "lambda-pass-sagemaker-role-policy"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = "iam:PassRole",
        Resource = aws_iam_role.sagemaker_processing_role.arn,
        Condition = {
          StringEquals = {
            "iam:PassedToService" = "sagemaker.amazonaws.com"
          }
        }
      }
    ]
  })
}

# Bind the PassRole policy to the Lambda role.
resource "aws_iam_role_policy_attachment" "lambda_pass_role_policy_attachment" {
  policy_arn = aws_iam_policy.pass_role_policy.arn
  role       = aws_iam_role.lambda_role.name
}

2. Create Lambda Function

#################################
# AWS Lambda Function
#################################
# Starts one SageMaker Processing Job per invocation. The SageMaker role ARN,
# bucket name and output prefix reach the handler via environment variables.
resource "aws_lambda_function" "processing_data" {
  function_name = "embedding_data"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  # python3.8 has reached end of support on AWS Lambda; the handler only
  # depends on boto3, which the managed runtime provides.
  runtime     = "python3.12"
  filename    = "${path.root}/src/embedding/lambda/lambda_function.zip"
  timeout     = 60
  memory_size = 128

  # Without a hash Terraform cannot see changes inside the zip, so code
  # updates would never be redeployed.
  source_code_hash = filebase64sha256("${path.root}/src/embedding/lambda/lambda_function.zip")

  environment {
    variables = {
      SAGEMAKER_ROLE_ARN = aws_iam_role.sagemaker_processing_role.arn
      S3_BUCKET_NAME     = var.bucket_id
      S3_OUTPUT_PREFIX   = "embeddings/"
    }
  }

  # Depend on the permission attachments too, not just the role, so the
  # function is never invocable before its policies are in place.
  depends_on = [
    aws_iam_role.lambda_role,
    aws_iam_role_policy_attachment.lambda_policy_attachment,
    aws_iam_role_policy_attachment.lambda_pass_role_policy_attachment,
    aws_iam_role_policy.lambda_execution_policy,
  ]
}

# Resource-based permission letting the ECS service principal invoke the
# embedding Lambda.
# NOTE(review): code running inside an ECS task normally calls lambda:Invoke
# with its task role's credentials, which is authorized by an identity policy
# on that task role rather than by this grant — confirm this is required.
resource "aws_lambda_permission" "allow_ecs" {
  function_name = aws_lambda_function.processing_data.arn
  principal     = "ecs.amazonaws.com"
  action        = "lambda:InvokeFunction"
  statement_id  = "AllowECSTaskInvoke"
}

3. Upload the SageMaker Processing Job code to the S3 `code/` folder

# Upload the processing script to s3://<bucket>/code/; the Lambda passes that
# prefix to the job as its "code" input (/opt/ml/processing/code).
# NOTE(review): aws_s3_bucket_object is deprecated in AWS provider v4+ in
# favour of aws_s3_object; migrating changes the resource address.
resource "aws_s3_bucket_object" "script" {
  bucket = var.bucket_id
  key    = "code/script.py"
  source = "${path.root}/src/embedding/sagemaker-processing/script.py"
  # Without an etag, Terraform only tracks the key, so edits to the local file
  # are never re-uploaded. (etag is not compatible with SSE-KMS objects.)
  etag = filemd5("${path.root}/src/embedding/sagemaker-processing/script.py")
}

# Upload the processing job's requirements.txt next to the script; the
# script pip-installs it from /opt/ml/processing/code at start-up.
resource "aws_s3_bucket_object" "requirements" {
  bucket = var.bucket_id
  key    = "code/requirements.txt"
  source = "${path.root}/src/embedding/sagemaker-processing/requirements.txt"
  # Re-upload whenever the local file changes (not compatible with SSE-KMS).
  etag = filemd5("${path.root}/src/embedding/sagemaker-processing/requirements.txt")
}

4. Code for Lambda Function

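  • Create the two source folders referenced by the Terraform above:
src/embedding/lambda/lambda_function.py (zipped as lambda_function.zip)
src/embedding/sagemaker-processing/ containing script.py and requirements.txt
  1. The lambda_function.py file contains the code for creating a SageMaker Processing Job. Its name must match the handler `lambda_function.lambda_handler`.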
import json
import boto3
import os
import time
import uuid

sagemaker = boto3.client('sagemaker')
role_arn = os.environ.get('SAGEMAKER_ROLE_ARN')
# Default matches the S3_OUTPUT_PREFIX value configured in Terraform; without
# it a missing variable would yield the output URI "s3://<bucket>/None".
output_prefix = os.environ.get('S3_OUTPUT_PREFIX', 'embeddings/')
# Used when the event does not name a bucket (Terraform sets S3_BUCKET_NAME).
default_bucket_name = os.environ.get('S3_BUCKET_NAME')
# Processing container; override SAGEMAKER_IMAGE_URI when deploying outside
# ap-southeast-1 or when a different image is needed.
image_uri = os.environ.get(
    'SAGEMAKER_IMAGE_URI',
    "763104351884.dkr.ecr.ap-southeast-1.amazonaws.com/pytorch-training:1.9-cpu-py38",
)


def lambda_handler(event, context):
    """Start a SageMaker Processing Job that embeds one video's keyframes.

    The job reads ``s3://<bucket>/keyframes/<video_name>/`` and the script in
    ``s3://<bucket>/code/``, and uploads its output directory to
    ``s3://<bucket>/<output_prefix>`` when it finishes.

    Args:
        event: dict with ``video_name`` (required) and optionally
            ``bucket_name``; when ``bucket_name`` is absent the
            ``S3_BUCKET_NAME`` environment variable is used.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON ``body`` containing a message,
        the job name, and the raw CreateProcessingJob response.

    Raises:
        ValueError: if no bucket name or no video name is available.
    """
    bucket_name = event.get('bucket_name') or default_bucket_name
    video_name = event.get('video_name')
    if not bucket_name:
        raise ValueError("bucket_name is missing from the event and S3_BUCKET_NAME is not set")
    if not video_name:
        raise ValueError("video_name is missing from the event")

    input_prefix = "keyframes/" + video_name + "/"
    # Passed to the job as S3_OUTDIR; the processing script uses it as the
    # base name of the .npy file it writes.
    s3_outdir = input_prefix.split('/')[1]

    print(f"Bucket Name: {bucket_name}")
    print(f"Input Prefix: {input_prefix}")

    # Job names must be unique per account/region; the random suffix prevents
    # collisions when several invocations start within the same second.
    processing_job_name = f"processing-job-{int(time.time())}-{uuid.uuid4().hex[:8]}"

    print(f"Processing Job Name: {processing_job_name}")

    response = sagemaker.create_processing_job(
        ProcessingJobName=processing_job_name,
        RoleArn=role_arn,
        AppSpecification={
            'ImageUri': image_uri,
            'ContainerEntrypoint': ['python3', '/opt/ml/processing/code/script.py']
        },
        ProcessingInputs=[
            {
                'InputName': 'input',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/{input_prefix}',
                    'LocalPath': '/opt/ml/processing/input',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                },
            },
            {
                'InputName': 'code',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/code/',
                    'LocalPath': '/opt/ml/processing/code',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                },
            }
        ],
        ProcessingOutputConfig={
            'Outputs': [
                {
                    'OutputName': 'output',
                    'S3Output': {
                        'S3Uri': f's3://{bucket_name}/{output_prefix}',
                        'LocalPath': '/opt/ml/processing/output',
                        'S3UploadMode': 'EndOfJob'
                    },
                },
            ],
        },
        ProcessingResources={
            'ClusterConfig': {
                'InstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'VolumeSizeInGB': 30,
            },
        },
        Environment={
            'S3_OUTDIR': s3_outdir
        }
    )

    print(response)

    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'SageMaker Processing Job created successfully',
            'jobName': processing_job_name,
            'response': response
        })
    }


if __name__ == '__main__':
    # Local smoke test: needs AWS credentials plus SAGEMAKER_ROLE_ARN and
    # S3_BUCKET_NAME in the environment.
    # Usage: python lambda_function.py <video_name>
    import sys
    lambda_handler({'video_name': sys.argv[1] if len(sys.argv) > 1 else None}, None)

5. Code for SageMaker Processing Job (script.py)

import os
import sys
import subprocess

# Install the job's third-party dependencies from requirements.txt when the
# container starts. This must run before the torch / sentence_transformers /
# PIL imports below. check_call raises CalledProcessError if pip fails, which
# aborts the job.
# NOTE(review): requirements are unpinned, so a newer sentence_transformers
# may need a newer torch than the base image ships. Consider pinning versions.
subprocess.check_call(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "-r",
        "/opt/ml/processing/code/requirements.txt",
    ]
)
import torch
from sentence_transformers import SentenceTransformer
from PIL import Image
import numpy


def processor(input_dir, output_dir, folder):
    """Embed every image file in *input_dir* with CLIP and save one .npy file.

    Args:
        input_dir: directory holding the keyframe images (the job's S3 input).
        output_dir: directory the array is written to; created if missing so
            local runs work too.
        folder: base name of the output file, written as ``{folder}.npy``.

    Row ``i`` of the saved array is the embedding of the ``i``-th image file
    name in sorted order, so rows map back to frames deterministically.

    Raises:
        ValueError: if *input_dir* contains no files.
    """
    print("Loading model.....")
    model = SentenceTransformer("clip-ViT-B-32")
    print("Model loaded")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    print(f"Using device: {device}")

    # os.listdir order is arbitrary; sort so embedding rows line up with frame
    # names. Skip sub-directories, which Image.open cannot read.
    image_names = sorted(
        name for name in os.listdir(input_dir)
        if os.path.isfile(os.path.join(input_dir, name))
    )
    if not image_names:
        raise ValueError(f"No images found in {input_dir}")

    images = []
    for name in image_names:
        # convert() loads the pixels into a new image, so the file handle can
        # be closed immediately; RGB keeps all inputs in one consistent mode.
        with Image.open(os.path.join(input_dir, name)) as image:
            images.append(image.convert("RGB"))

    embeddings = model.encode(images)

    os.makedirs(output_dir, exist_ok=True)
    numpy.save(os.path.join(output_dir, f"{folder}.npy"), embeddings)
    print(f"Embeddings for {folder} saved")


if __name__ == "__main__":
    # SageMaker mounts the job's input/output at these fixed paths. INPUT_DIR
    # and OUTPUT_DIR override them for local testing, e.g.
    #   INPUT_DIR=./dataset/keyframes/L01_V001 OUTPUT_DIR=./dataset/output \
    #   S3_OUTDIR=L01_V001 python script.py
    input_dir = os.environ.get("INPUT_DIR", "/opt/ml/processing/input")
    output_dir = os.environ.get("OUTPUT_DIR", "/opt/ml/processing/output")

    # Base name of the output .npy file, supplied through the job environment.
    s3_out_dir = os.environ.get('S3_OUTDIR')
    if s3_out_dir is None:
        raise ValueError("S3_OUTDIR environment variable is not set")

    print("Starting processing.....")

    processor(input_dir, output_dir, s3_out_dir)

    print("Processing complete")
  1. The requirements.txt file contains the required libraries for the processing job.
sentence_transformers
pillow