Embedding
1. Create Role for Lambda and SageMaker Processing Job
#################################
# SageMaker processing role
#################################
resource "aws_iam_role" "sagemaker_processing_role" {
name = "sagemaker-processing-role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Principal = {
Service = "sagemaker.amazonaws.com"
},
Action = "sts:AssumeRole"
}
]
})
}
resource "aws_iam_policy" "sagemaker_processing_policy" {
name = "sagemaker-processing-policy"
description = "Policy for SageMaker Processing to access S3, ECR, and logs"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
Resource = [
# TODO: Replace with your bucket name
"arn:aws:s3:::${var.bucket_id}",
"arn:aws:s3:::${var.bucket_id}/*"
]
},
{
Effect = "Allow",
Action = [
"ecr:GetDownloadUrlForLayer",
"ecr:BatchCheckLayerAvailability",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchGetImage",
"ecr:ListImages",
"ecr:GetAuthorizationToken"
],
Resource = [
"*"
]
},
{
Effect = "Allow",
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
Resource = "*"
}
]
})
}
resource "aws_iam_role_policy_attachment" "sagemaker_processing_policy_attachment" {
policy_arn = aws_iam_policy.sagemaker_processing_policy.arn
role = aws_iam_role.sagemaker_processing_role.name
}
output "sagemaker_processing_role_arn" {
value = aws_iam_role.sagemaker_processing_role.arn
}
#################################
# Role for lambda function
#################################
resource "aws_iam_role" "lambda_role" {
name = "lambda-sagemaker-role"
assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_policy" "lambda_policy" {
name = "lambda-sagemaker-policy"
description = "Policy for Lambda to invoke SageMaker and access S3"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = [
"sagemaker:CreateProcessingJob",
"sagemaker:DescribeProcessingJob",
"sagemaker:ListProcessingJobs"
],
Resource = "*"
},
{
Effect = "Allow",
Action = [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
Resource = [
"arn:aws:s3:::${var.bucket_id}",
"arn:aws:s3:::${var.bucket_id}/*"
]
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
policy_arn = aws_iam_policy.lambda_policy.arn
role = aws_iam_role.lambda_role.name
}
# Add CloudWatch Logs permissions (basic Lambda execution) to the role
resource "aws_iam_role_policy" "lambda_execution_policy" {
  name = "lambda-sagemaker-execution-policy-${var.bucket_id}"
  role = aws_iam_role.lambda_role.id
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        Resource = "*"
      }
    ]
  })
}
#################################
# IAM Policy for Lambda to Pass SageMaker Role
#################################
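# The Lambda function passes the processing role's ARN to CreateProcessingJob,
# and handing a role to another AWS service requires an explicit iam:PassRole grant.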
resource "aws_iam_policy" "pass_role_policy" {
name = "lambda-pass-sagemaker-role-policy"
policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Effect = "Allow",
Action = "iam:PassRole",
Resource = aws_iam_role.sagemaker_processing_role.arn
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_pass_role_policy_attachment" {
role = aws_iam_role.lambda_role.name
policy_arn = aws_iam_policy.pass_role_policy.arn
}
2. Create Lambda Function
#################################
# AWS Lambda Function
#################################
resource "aws_lambda_function" "processing_data" {
function_name = "embedding_data"
role = aws_iam_role.lambda_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.8"
filename = "${path.root}/src/embedding/lambda/lambda_function.zip"
timeout = 60
memory_size = 128
environment {
variables = {
SAGEMAKER_ROLE_ARN = aws_iam_role.sagemaker_processing_role.arn
S3_BUCKET_NAME = var.bucket_id
S3_OUTPUT_PREFIX = "embeddings/"
}
}
depends_on = [
aws_iam_role.lambda_role,
]
}
resource "aws_lambda_permission" "allow_ecs" {
statement_id = "AllowECSTaskInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.processing_data.arn
principal = "ecs.amazonaws.com"
}
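With this in place, a service or script can invoke the function by name. A minimal boto3 sketch (the payload keys match what the handler in step 4 expects; the bucket and video names are placeholders):

import json
import boto3

lambda_client = boto3.client('lambda')

# Event keys consumed by the handler in step 4.
payload = {
    "bucket_name": "my-bucket",   # placeholder bucket
    "video_name": "L01_V001",     # example video folder under keyframes/
}

response = lambda_client.invoke(
    FunctionName="embedding_data",
    InvocationType="Event",  # asynchronous; use "RequestResponse" to wait for the result
    Payload=json.dumps(payload),
)
print(response["StatusCode"])

Note that the invoking ECS task's own IAM role must also allow lambda:InvokeFunction.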
3. Create S3 Code Folder for SageMaker Processing Job
resource "aws_s3_bucket_object" "script" {
bucket = var.bucket_id
key = "code/script.py"
source = "${path.root}/src/embedding/sagemaker-processing/script.py"
}
resource "aws_s3_bucket_object" "requirements" {
bucket = var.bucket_id
key = "code/requirements.txt"
source = "${path.root}/src/embedding/sagemaker-processing/requirements.txt"
}
4. Code for Lambda Function
- Create a folder containing the Lambda function code and a requirements.txt file:
lambda_function.py
requirements.txt
- The lambda_function.py file contains the code for creating a SageMaker Processing Job.
import json
import boto3
import os
import time

sagemaker = boto3.client('sagemaker')
role_arn = os.environ.get('SAGEMAKER_ROLE_ARN')
output_prefix = os.environ.get('S3_OUTPUT_PREFIX')

def lambda_handler(event, context):
    bucket_name = event['bucket_name']
    input_prefix = "keyframes/" + event['video_name'] + "/"
    # The second path segment is the video name; it names the output file.
    s3_outdir = input_prefix.split('/')[1]
    print(f"Bucket Name: {bucket_name}")
    print(f"Input Prefix: {input_prefix}")

    processing_job_name = f"processing-job-{int(time.time())}"
    print(f"Processing Job Name: {processing_job_name}")

    response = sagemaker.create_processing_job(
        ProcessingJobName=processing_job_name,
        RoleArn=role_arn,
        AppSpecification={
            'ImageUri': "763104351884.dkr.ecr.ap-southeast-1.amazonaws.com/pytorch-training:1.9-cpu-py38",
            'ContainerEntrypoint': ['python3', '/opt/ml/processing/code/script.py']
        },
        ProcessingInputs=[
            {
                'InputName': 'input',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/{input_prefix}',
                    'LocalPath': '/opt/ml/processing/input',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                },
            },
            {
                'InputName': 'code',
                'S3Input': {
                    'S3Uri': f's3://{bucket_name}/code/',
                    'LocalPath': '/opt/ml/processing/code',
                    'S3DataType': 'S3Prefix',
                    'S3InputMode': 'File',
                },
            }
        ],
        ProcessingOutputConfig={
            'Outputs': [
                {
                    'OutputName': 'output',
                    'S3Output': {
                        'S3Uri': f's3://{bucket_name}/{output_prefix}',
                        'LocalPath': '/opt/ml/processing/output',
                        'S3UploadMode': 'EndOfJob'
                    },
                },
            ],
        },
        ProcessingResources={
            'ClusterConfig': {
                'InstanceCount': 1,
                'InstanceType': 'ml.m5.xlarge',
                'VolumeSizeInGB': 30,
            },
        },
        Environment={
            'S3_OUTDIR': s3_outdir
        }
    )
    print(response)
    return {
        'statusCode': 200,
        'body': json.dumps({
            'message': 'SageMaker Processing Job created successfully',
            'jobName': processing_job_name,
            'response': response
        })
    }

if __name__ == '__main__':
    # Local smoke test; replace with a real bucket and video name.
    lambda_handler({'bucket_name': 'my-bucket', 'video_name': 'L01_V001'}, None)
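The handler only starts the job, so it returns before any embeddings exist. The sagemaker:DescribeProcessingJob permission granted in step 1 lets you track the job afterwards; a minimal polling sketch (the job name is a placeholder taken from the Lambda's response):

import time
import boto3

sagemaker = boto3.client('sagemaker')

def wait_for_job(job_name, poll_seconds=30):
    """Poll a processing job until it reaches a terminal state."""
    while True:
        desc = sagemaker.describe_processing_job(ProcessingJobName=job_name)
        status = desc['ProcessingJobStatus']
        print(f"{job_name}: {status}")
        if status in ('Completed', 'Failed', 'Stopped'):
            return desc
        time.sleep(poll_seconds)

desc = wait_for_job('processing-job-1700000000')  # placeholder job name
if desc['ProcessingJobStatus'] == 'Failed':
    print(desc.get('FailureReason'))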
5. Code for SageMaker Processing Job
import os
import sys
import subprocess

# Install dependencies at container start-up; the requirements file is staged
# under /opt/ml/processing/code by the 'code' processing input.
subprocess.check_call(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "-r",
        "/opt/ml/processing/code/requirements.txt",
    ]
)

import torch
from sentence_transformers import SentenceTransformer
from PIL import Image
import numpy

def processor(input_dir, output_dir, folder):
    print("Loading model.....")
    model = SentenceTransformer("clip-ViT-B-32")
    print("Model loaded")

    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    print(f"Using device: {device}")

    # Embed every keyframe image in the input directory in one batch.
    image_paths = os.listdir(input_dir)
    images = []
    for image_path in image_paths:
        image = Image.open(os.path.join(input_dir, image_path))
        images.append(image)

    embeddings = model.encode(images)
    numpy.save(os.path.join(output_dir, f"{folder}.npy"), embeddings)
    print(f"Embeddings for {folder} saved")

if __name__ == "__main__":
    input_dir = "/opt/ml/processing/input"
    output_dir = "/opt/ml/processing/output"
    # Local testing
    # input_dir = "./dataset/keyframes/L01_V001"
    # output_dir = "./dataset/output"
    s3_out_dir = os.environ.get('S3_OUTDIR')
    if s3_out_dir is None:
        raise ValueError("S3_OUTDIR environment variable is not set")
    print("Starting processing.....")
    processor(input_dir, output_dir, s3_out_dir)
    print("Processing complete")
- The requirements.txt file contains the required libraries for the processing job.
sentence_transformers
pillow
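The saved .npy file holds one CLIP vector per keyframe, so it can be queried directly with text. A minimal search sketch, assuming the embeddings file for a video (e.g. L01_V001.npy from the embeddings/ prefix) has been downloaded locally; the query string is a placeholder:

import numpy
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("clip-ViT-B-32")

# Load the per-keyframe image embeddings produced by the processing job.
image_embeddings = numpy.load("L01_V001.npy")

# CLIP embeds text into the same vector space, so a text query can rank keyframes.
query_embedding = model.encode("a person riding a bicycle")
scores = util.cos_sim(query_embedding, image_embeddings)[0]
best = int(scores.argmax())
print(f"Best-matching keyframe index: {best} (score {float(scores[best]):.3f})")

Row i of the array corresponds to the i-th file returned by os.listdir in the processing script, so keep (or sort) that listing order if you need to map scores back to specific keyframe files.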