Embedding Processing with SageMaker

1. Prepare Script for Embedding Processing

  1. Create a folder embedding-processing and navigate to it.
|-- main.ipynb
|-- requirements.txt
`-- script.py
  2. Create a main.ipynb file in the embedding-processing directory.
Import Libraries
import boto3
import sagemaker
from PIL import Image
from io import BytesIO
import matplotlib.pyplot as plt
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
Set Up SageMaker Session
s3 = boto3.client('s3')
sess = sagemaker.Session()
BUCKET_NAME = "ai-challenge-2024"
PREFIX = "keyframes/"
OUTPUT_PREFIX = "keyframes-processed/"
FOLDERS = []
response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=PREFIX, Delimiter='/')

# Collect the keyframe subfolders under the prefix
if 'CommonPrefixes' in response:
    FOLDERS = [prefix['Prefix'] for prefix in response['CommonPrefixes']]
else:
    print("No subfolders found.")
Show Sample Image
# Get example files from the first subfolder
example_folder = FOLDERS[0]
files = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=example_folder)

plt.figure(figsize=(20, 10))

for i in range(1, 5):
    example_file = files['Contents'][i]['Key']
    example_img = s3.get_object(Bucket=BUCKET_NAME, Key=example_file)["Body"].read()
    img = Image.open(BytesIO(example_img))
    plt.subplot(1, 4, i)
    plt.imshow(img)

plt.show()

Sample Image

Script Processor
%%writefile script.py
import os
import sys
import subprocess

subprocess.check_call(
    [
        sys.executable,
        "-m",
        "pip",
        "install",
        "-r",
        "/opt/ml/processing/code/requirements.txt",
    ]
)
import torch
from sentence_transformers import SentenceTransformer
from PIL import Image
import numpy


def processor(input_dir, output_dir):
    print("Loading model.....")
    model = SentenceTransformer("clip-ViT-B-32")
    print("Model loaded")
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)
    print(f"Using device: {device}")

    folders = os.listdir(input_dir)

    for folder in folders:
        # Skip the "code" folder that ScriptProcessor adds
        # (it holds '/opt/ml/processing/input/code/script.py', not keyframes)
        if folder == "code":
            continue
        # Sort so the saved embeddings follow keyframe order
        image_paths = sorted(os.listdir(os.path.join(input_dir, folder)))
        images = []
        for image_path in image_paths:
            image = Image.open(os.path.join(input_dir, folder, image_path))
            images.append(image)

        embeddings = model.encode(images)

        numpy.save(os.path.join(output_dir, f"{folder}.npy"), embeddings)
        print(f"Embeddings for {folder} saved")


if __name__ == "__main__":
    input_dir = "/opt/ml/processing/input"
    output_dir = "/opt/ml/processing/output"

    # Local testing
    # input_dir = "./dataset/keyframes"
    # output_dir = "./dataset/output"

    print("Starting processing.....")

    processor(input_dir, output_dir)

    print("Processing complete")
requirements.txt
%%writefile requirements.txt
sentence_transformers
pillow
Upload requirements.txt to S3
s3.upload_file("requirements.txt", BUCKET_NAME, "code/requirements.txt")
Start processing job
role = sagemaker.get_execution_role()

image_uri = sagemaker.image_uris.retrieve(
    framework='pytorch',
    region=sess.boto_region_name,
    version='1.9',
    py_version='py38',
    instance_type='ml.m5.xlarge',  
    image_scope='training'
)

script_processor = ScriptProcessor(
    image_uri=image_uri,
    command=['python3'],
    role=role,
    instance_count=1, 
    instance_type='ml.m5.xlarge', 
    sagemaker_session=sess
)

# PREFIX and OUTPUT_PREFIX already end with '/', so don't append another slash
input_s3_uri = 's3://{}/{}'.format(BUCKET_NAME, PREFIX)
output_s3_uri = 's3://{}/{}'.format(BUCKET_NAME, OUTPUT_PREFIX)

processing_inputs = [
    ProcessingInput(
        source=input_s3_uri,
        destination='/opt/ml/processing/input'
    ),
    ProcessingInput(
        source='s3://{}/code/'.format(BUCKET_NAME),
        destination='/opt/ml/processing/code'
    )
]

processing_outputs = [
    ProcessingOutput(
        source='/opt/ml/processing/output',
        destination=output_s3_uri
    )
]

script_processor.run(
    code='script.py',
    inputs=processing_inputs,
    outputs=processing_outputs,
    logs=True
)

Note: If the job fails with a resource quota error for the instance type, you can request a quota increase through the AWS Service Quotas console.
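Alternatively, you can request the increase programmatically. Below is a minimal sketch using the boto3 Service Quotas client; the quota-name filter is an assumption, so verify the exact quota name for your instance type in the console before submitting.
import boto3

sq = boto3.client('service-quotas')

# Find the SageMaker quota entry for ml.m5.xlarge processing jobs
# (the name filter is an assumption; verify in the Service Quotas console)
matches = []
paginator = sq.get_paginator('list_service_quotas')
for page in paginator.paginate(ServiceCode='sagemaker'):
    for quota in page['Quotas']:
        name = quota['QuotaName'].lower()
        if 'ml.m5.xlarge' in name and 'processing' in name:
            matches.append(quota)

for quota in matches:
    print(quota['QuotaName'], quota['QuotaCode'], quota['Value'])

# Request an increase for the first match (adjust DesiredValue as needed)
if matches:
    sq.request_service_quota_increase(
        ServiceCode='sagemaker',
        QuotaCode=matches[0]['QuotaCode'],
        DesiredValue=2.0,
    )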

SageMaker Processing Job

2. Monitor Processing Job

You can monitor the processing job by navigating to the SageMaker console and selecting the Processing jobs tab. You will see the job status, logs, and other details related to the processing job.
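You can also poll the job from the notebook. A minimal sketch with the boto3 SageMaker client, listing recent processing jobs and checking the status of the newest one:
import boto3

sm = boto3.client('sagemaker')

# List the most recent processing jobs to find the job name
recent = sm.list_processing_jobs(SortBy='CreationTime', SortOrder='Descending', MaxResults=5)
for job in recent['ProcessingJobSummaries']:
    print(job['ProcessingJobName'], '-', job['ProcessingJobStatus'])

# Inspect the newest job in detail
desc = sm.describe_processing_job(
    ProcessingJobName=recent['ProcessingJobSummaries'][0]['ProcessingJobName']
)
print(desc['ProcessingJobStatus'])  # InProgress | Completed | Failed | Stopping | Stopped
if desc['ProcessingJobStatus'] == 'Failed':
    print(desc['FailureReason'])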

SageMaker Processing Job

3. Check Output Data in S3

Once the processing job is complete, you can check the output data in the S3 bucket. Navigate to the S3 console and open the keyframes-processed/ prefix to view the processed embeddings.
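As a quick sanity check from the same notebook (reusing the s3 client and bucket variables defined earlier), you can list the output files and load one embeddings array; clip-ViT-B-32 produces 512-dimensional embeddings:
import numpy as np
from io import BytesIO

# List the processed embedding files
response = s3.list_objects_v2(Bucket=BUCKET_NAME, Prefix=OUTPUT_PREFIX)
keys = [obj['Key'] for obj in response.get('Contents', [])]
print(keys[:5])

# Load one .npy file straight from S3 and inspect its shape
body = s3.get_object(Bucket=BUCKET_NAME, Key=keys[0])['Body'].read()
embeddings = np.load(BytesIO(body))
print(embeddings.shape)  # (num_keyframes, 512)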

S3 Output Data