Setup

1 Setup Infrastructure

1.1 Create ECR repo semantic-repo

# ECR repository named "semantic-repo", tagged with var.common_tags.
resource "aws_ecr_repository" "private_repo" {
  name = "semantic-repo"
  
  tags = var.common_tags
}

# Exposes the repository URL of aws_ecr_repository.private_repo.
output "ecr_repo_url" {
  value = aws_ecr_repository.private_repo.repository_url
}

1.2 Fetch OpenSearch domain

# Looks up an existing OpenSearch domain by the name given in
# var.opensearch_domain_name (data source: nothing is created here).
data "aws_opensearch_domain" "semantic_search_domain" {
  domain_name = var.opensearch_domain_name
}

1.3 Create VPC

#################################
# VPC
#################################
# VPC with DNS support and DNS hostnames enabled.
# NOTE(review): 172.0.0.0/16 lies outside the RFC 1918 private range
# 172.16.0.0/12 - confirm this CIDR is intentional.
resource "aws_vpc" "main" {
  cidr_block = "172.0.0.0/16"
  enable_dns_support = true
  enable_dns_hostnames = true
  tags = var.common_tags
}

#################################
# Internet Gateway
#################################
# Internet gateway attached to aws_vpc.main.
resource "aws_internet_gateway" "main" {
  vpc_id = aws_vpc.main.id
  tags   = var.common_tags
}

#################################
# Public Subnets
#################################
# Two public subnets, 172.0.0.0/24 and 172.0.1.0/24 (third octet = count.index);
# instances launched in them get a public IP by default.
# NOTE(review): no availability_zone is set, so the AZ of each subnet is not
# pinned here - confirm whether the two subnets must land in different AZs.
resource "aws_subnet" "public" {
  count = 2
  vpc_id = aws_vpc.main.id
  cidr_block = "172.0.${count.index}.0/24"
  map_public_ip_on_launch = true
  tags = var.common_tags
}

#################################
# Private Subnets
#################################
# Two private subnets, 172.0.10.0/24 and 172.0.11.0/24
# (third octet = count.index + 10, which keeps them clear of the public /24s).
# NOTE(review): no availability_zone is set - confirm AZ placement requirements.
resource "aws_subnet" "private" {
  count = 2
  vpc_id = aws_vpc.main.id
  cidr_block = "172.0.${count.index + 10}.0/24"
  tags = var.common_tags
}

#################################
# Route Tables
#################################
# Public route table: sends all IPv4 traffic (0.0.0.0/0) to the internet gateway.
resource "aws_route_table" "public" {
  vpc_id = aws_vpc.main.id
  route {
    cidr_block = "0.0.0.0/0"
    gateway_id = aws_internet_gateway.main.id
  }
  tags = var.common_tags
}

# Associates each of the two public subnets with the public route table.
# The count here must stay in step with the count on aws_subnet.public.
resource "aws_route_table_association" "public" {
  count = 2
  subnet_id = aws_subnet.public[count.index].id
  route_table_id = aws_route_table.public.id
}

# Private route table: no route blocks are declared here.
# NOTE(review): with no NAT or gateway route, the private subnets presumably
# have no outbound internet path - confirm that is intended.
resource "aws_route_table" "private" {
  vpc_id = aws_vpc.main.id
  tags = var.common_tags
}

# Associates each of the two private subnets with the private route table.
# The count here must stay in step with the count on aws_subnet.private.
resource "aws_route_table_association" "private" {
  count = 2
  subnet_id = aws_subnet.private[count.index].id
  route_table_id = aws_route_table.private.id
}

#################################
# Security Groups
#################################
# Security group in aws_vpc.main: all outbound traffic allowed; inbound TCP
# 80, 443 and 22 allowed from any IPv4 address.
resource "aws_security_group" "default" {
  vpc_id = aws_vpc.main.id
  # Allow all outbound traffic (protocol "-1" = every protocol, any port)
  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }
  # Allow http traffic
    ingress {
        from_port   = 80
        to_port     = 80
        protocol    = "tcp"
        cidr_blocks = ["0.0.0.0/0"]
    }

    # Allow https traffic
    ingress {
        from_port   = 443
        to_port     = 443
        protocol    = "tcp"
        cidr_blocks = ["0.0.0.0/0"]
    }

    # Allow ssh traffic
    # NOTE(review): SSH is open to 0.0.0.0/0 - consider restricting the source
    # range to known addresses.
    ingress {
        from_port   = 22
        to_port     = 22
        protocol    = "tcp"
        cidr_blocks = ["0.0.0.0/0"]
    }
  tags = var.common_tags
}

# ID of aws_vpc.main.
output "vpc_id" {
  value = aws_vpc.main.id
  
}

# IDs of all public subnets, as a list (splat over the counted resource).
output "public_subnet_ids" {
  value = aws_subnet.public[*].id
  
}

# IDs of all private subnets, as a list (splat over the counted resource).
output "private_subnet_ids" {
  value = aws_subnet.private[*].id
  
}

# ID of aws_security_group.default.
output "security_group_id" {
  value = aws_security_group.default.id
  
}

2 Setup ECR Image, SageMaker Endpoint

2.1 Build Docker image for processing data

  1. Create a file Dockerfile in the root directory of the project
# Image that runs extract_keyframes.py: Python 3.9 with ffmpeg and boto3.
FROM python:3.9-slim

# Install ffmpeg and boto3 in a single layer. The apt package lists are removed
# and pip's download cache is disabled so neither ends up in the image.
RUN apt-get update && \
    apt-get install -y ffmpeg && \
    rm -rf /var/lib/apt/lists/* && \
    pip install --no-cache-dir boto3

COPY extract_keyframes.py /app/extract_keyframes.py

WORKDIR /app

# Relative script path resolves against WORKDIR (/app).
ENTRYPOINT ["python", "extract_keyframes.py"]
  1. Create a file extract_keyframes.py in the root directory of the project
import os
import boto3
import subprocess
from botocore.exceptions import NoCredentialsError
import json

# Runtime configuration, read once from the process environment.
S3_BUCKET = os.getenv("S3_BUCKET")
INPUT_VIDEO_KEY = os.getenv("INPUT_VIDEO_KEY")
OUTPUT_FOLDER_PREFIX = os.getenv("OUTPUT_FOLDER_PREFIX", "output/")
# S3 key prefix under which the frame-map CSV is uploaded.
OUTPUT_MAP_FOLDER = "mapkeyframes"

# boto3 clients shared by the helpers in this module.
s3 = boto3.client("s3")
lambda_client = boto3.client("lambda")

def download_from_s3(bucket, key, download_path):
    """Download object ``key`` from ``bucket`` to the local ``download_path``.

    Prints a confirmation line on success. When botocore reports missing
    credentials, prints an error line and re-raises the exception.
    """
    try:
        s3.download_file(bucket, key, download_path)
    except NoCredentialsError:
        print("Error: Not authorized")
        raise
    else:
        print(f"Downloaded {key} from {bucket}")

def upload_to_s3(bucket, file_path, s3_key):
    """Upload the local file ``file_path`` to ``bucket`` under ``s3_key``.

    Prints a confirmation line on success. When botocore reports missing
    credentials, prints an error line and re-raises the exception.
    """
    try:
        s3.upload_file(file_path, bucket, s3_key)
    except NoCredentialsError:
        print("Error: Not authorized")
        raise
    else:
        print(f"Uploaded {file_path} to {bucket}/{s3_key}")

def extract_keyframes(video_path, output_folder):
    """Extract the I-frames of ``video_path`` as JPEG files in ``output_folder``.

    The folder is created if it does not exist. Frames are written by ffmpeg
    as ``001.jpg``, ``002.jpg``, ... (``%03d`` pattern).

    Args:
        video_path: Path of the local video file to read.
        output_folder: Directory that receives the extracted JPEG files.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
        FileNotFoundError: If the ``ffmpeg`` executable cannot be found.
    """
    os.makedirs(output_folder, exist_ok=True)

    cmd = [
        'ffmpeg', '-i', video_path,
        # Keep only I-frames; the backslash stops ffmpeg's filter parser from
        # treating the comma as a filter separator.
        '-vf', 'select=eq(pict_type\\,I)',
        '-vsync', 'vfr', '-q:v', '2', f'{output_folder}/%03d.jpg'
    ]
    # check=True: fail loudly instead of continuing with an empty or partial
    # keyframe folder when ffmpeg reports an error.
    subprocess.run(cmd, check=True)

def generate_map_csv(keyframes_folder, csv_path, video_name):
    """Write a CSV mapping each keyframe number to its image path.

    Every ``*.jpg`` file in ``keyframes_folder`` contributes one row
    ``<frame number>,<path>``, where the frame number is the integer before
    the first ``.`` in the file name. Rows are ordered by numeric frame
    number, so ``1000.jpg`` follows ``999.jpg`` rather than ``100.jpg``.

    Args:
        keyframes_folder: Directory containing the extracted ``.jpg`` frames.
        csv_path: Destination path of the CSV file (overwritten if present).
        video_name: Unused; kept so existing callers keep working.

    Raises:
        ValueError: If a ``.jpg`` file name does not start with an integer.
    """
    keyframes = [
        (int(image.split('.')[0]), image)
        for image in os.listdir(keyframes_folder)
        if image.endswith('.jpg')
    ]
    # Sort on the parsed number, not the file name: a lexical sort would put
    # "1000.jpg" before "101.jpg".
    keyframes.sort()

    with open(csv_path, 'w') as f:
        f.write('FrameNumber,ImagePath\n')
        f.writelines(
            f"{frame_number},{os.path.join(keyframes_folder, image)}\n"
            for frame_number, image in keyframes
        )
    print(f"Created {csv_path}")

def invoke_second_lambda(bucket_name, video_name):
    """Asynchronously invoke the ``embedding_data`` Lambda function.

    The JSON payload carries ``bucket_name`` and ``video_name``. The call uses
    the ``Event`` invocation type, and both the payload and the invoke
    response are printed.
    """
    payload = dict(bucket_name=bucket_name, video_name=video_name)
    print(f"Invoke second lambda with payload: {payload}")

    body = json.dumps(payload)
    response = lambda_client.invoke(
        FunctionName="embedding_data",
        InvocationType='Event',
        Payload=body,
    )
    print(f"Start Embedding ....: {response}")

def main():
    """Run the keyframe pipeline for the video named by ``INPUT_VIDEO_KEY``.

    Steps: download the video from ``S3_BUCKET`` to ``/tmp``, extract its
    keyframes, write the frame-map CSV, upload the keyframes under
    ``OUTPUT_FOLDER_PREFIX`` and the CSV under ``OUTPUT_MAP_FOLDER``, then
    call ``invoke_second_lambda``.

    Raises:
        RuntimeError: If ``S3_BUCKET`` or ``INPUT_VIDEO_KEY`` is not set.
    """
    # Fail early with a clear message; otherwise an unset INPUT_VIDEO_KEY
    # surfaces as a TypeError inside os.path.basename.
    required = (("S3_BUCKET", S3_BUCKET), ("INPUT_VIDEO_KEY", INPUT_VIDEO_KEY))
    missing = [name for name, value in required if not value]
    if missing:
        raise RuntimeError(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )

    # Download video
    video_name = os.path.basename(INPUT_VIDEO_KEY)
    video_path = f'/tmp/{video_name}'
    download_from_s3(S3_BUCKET, INPUT_VIDEO_KEY, video_path)

    # The video's base name (without extension) names the local folder, the
    # S3 sub-folder and the CSV file.
    output_folder_name = os.path.splitext(video_name)[0]
    output_folder = f'/tmp/{output_folder_name}'

    extract_keyframes(video_path, output_folder)

    csv_filename = f'map-keyframe-{output_folder_name}.csv'
    csv_path = f'/tmp/{csv_filename}'
    generate_map_csv(output_folder, csv_path, output_folder_name)

    # Upload keyframes and csv to S3
    for image in os.listdir(output_folder):
        upload_to_s3(S3_BUCKET, os.path.join(output_folder, image), f'{OUTPUT_FOLDER_PREFIX}{output_folder_name}/{image}')

    upload_to_s3(S3_BUCKET, csv_path, f'{OUTPUT_MAP_FOLDER}/{csv_filename}')

    # Invoke second lambda
    invoke_second_lambda(S3_BUCKET, output_folder_name)

if __name__ == "__main__":
    main()
  1. Build the Docker image: cd to the root directory of the project and run the following command
# Build the local image "ecs-video" from the Dockerfile in the current directory.
# NOTE(review): the leading "@" is Makefile recipe syntax; drop it when running
# these lines directly in a shell.
@echo "Building ECS Video"
docker build -t ecs-video -f Dockerfile .
  1. Push the Docker image to ECR: either follow the push instructions shown for the repository in the ECR console, or run the following commands
# Log Docker in to ECR in ap-southeast-1, tag the local ecs-video:latest image
# as semantic-repo:ecs-video, and push it.
# Replace <aws account id> with your AWS account ID before running.
@echo "Pushing ECS Video to ECR"
aws ecr get-login-password --region ap-southeast-1 | docker login --username AWS --password-stdin <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com
docker tag ecs-video:latest <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com/semantic-repo:ecs-video
docker push <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com/semantic-repo:ecs-video
@echo "Pushed ECS Video to ECR"

2.2 Create Sagemaker Endpoint

  1. Structure of the code
|-- code
|   |-- inference_code.py
|   `-- requirements.txt
`-- setup.py
  1. Create a file inference_code.py in the code folder
from sentence_transformers import SentenceTransformer
import json
import os


def model_fn(model_dir):
    """Load the SentenceTransformer saved in the ``model`` subfolder of ``model_dir``."""
    return SentenceTransformer(os.path.join(model_dir, "model"))

def input_fn(request_body, request_content_type):
    """Turn a request body into the input handed to ``predict_fn``.

    For ``application/json`` the body is parsed and its ``"text"`` field is
    returned; for any other content type the body is returned unchanged.
    """
    if request_content_type != "application/json":
        return request_body

    return json.loads(request_body)["text"]
    


def predict_fn(input_data, model):
    """Return the result of ``model.encode`` applied to ``input_data``."""
    return model.encode(input_data)


def output_fn(prediction, content_type):
    """Serialize ``prediction`` for the response.

    For ``application/json`` returns a JSON string of the form
    ``{"embeddings": prediction.tolist()}``; for any other content type
    returns the plain string ``"content type not supported"``.
    """
    if content_type != "application/json":
        return "content type not supported"

    body = {"embeddings": prediction.tolist()}
    return json.dumps(body)
  1. Create a file requirements.txt in the code folder
sentence_transformers
  1. Create a file setup.py in the root directory of the project.
  • Before running setup.py, you need to create the IAM role sagemaker-local, which the SageMaker endpoint uses to access the S3 bucket
from sentence_transformers import SentenceTransformer
import tarfile
from sagemaker.pytorch import PyTorchModel
import time
import os

# Step 1 (about 5 minutes): load the 'clip-ViT-B-32' SentenceTransformer and
# save it to the local ./model directory.
model = SentenceTransformer('clip-ViT-B-32')
model.save('model')


model_path = 'model/'
code_path = 'code/'

# Step 2 (about 5 minutes): pack the saved model and the inference code into
# model/model.tar.gz.
# NOTE(review): the archive is written inside model_path, which is itself added
# to the archive - confirm the resulting tarball layout is what the endpoint
# expects.
zipped_model_path = os.path.join(model_path, "model.tar.gz")
with tarfile.open(zipped_model_path, "w:gz") as tar:
    tar.add(model_path)
    tar.add(code_path)

# Step 3 (about 20 minutes): create the SageMaker model and deploy it to an
# endpoint whose name carries a UTC timestamp.
# `model` is rebound here from the SentenceTransformer to the PyTorchModel.
# Replace <aws account id> in the role ARN with your AWS account ID.
endpoint_name = "clip-model-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
model = PyTorchModel(
    entry_point="inference_code.py",
    model_data=zipped_model_path,
    role="arn:aws:iam::<aws account id>:role/sagemaker-local",
    framework_version="1.5",
    py_version="py3",
)

# Deploys one ml.m5.xlarge instance behind the endpoint.
predictor = model.deploy(
    initial_instance_count=1, instance_type="ml.m5.xlarge", endpoint_name=endpoint_name
)

# The whole script takes about 30 minutes in total.
  1. Run the setup.py file
python setup.py

Wait for about 30 minutes for the endpoint to be created