ecr-repo
resource "aws_ecr_repository" "private_repo" {
name = "semantic-repo"
tags = var.common_tags
}
output "ecr_repo_url" {
value = aws_ecr_repository.private_repo.repository_url
}
data "aws_opensearch_domain" "semantic_search_domain" {
domain_name = var.opensearch_domain_name
}
#################################
# VPC
#################################
resource "aws_vpc" "main" {
cidr_block = "172.0.0.0/16"
enable_dns_support = true
enable_dns_hostnames = true
tags = var.common_tags
}
#################################
# Internet Gateway
#################################
resource "aws_internet_gateway" "main" {
vpc_id = aws_vpc.main.id
tags = var.common_tags
}
#################################
# Public Subnets
#################################
resource "aws_subnet" "public" {
count = 2
vpc_id = aws_vpc.main.id
cidr_block = "172.0.${count.index}.0/24"
map_public_ip_on_launch = true
tags = var.common_tags
}
#################################
# Private Subnets
#################################
resource "aws_subnet" "private" {
count = 2
vpc_id = aws_vpc.main.id
cidr_block = "172.0.${count.index + 10}.0/24"
tags = var.common_tags
}
#################################
# Route Tables
#################################
resource "aws_route_table" "public" {
vpc_id = aws_vpc.main.id
route {
cidr_block = "0.0.0.0/0"
gateway_id = aws_internet_gateway.main.id
}
tags = var.common_tags
}
resource "aws_route_table_association" "public" {
count = 2
subnet_id = aws_subnet.public[count.index].id
route_table_id = aws_route_table.public.id
}
resource "aws_route_table" "private" {
vpc_id = aws_vpc.main.id
tags = var.common_tags
}
resource "aws_route_table_association" "private" {
count = 2
subnet_id = aws_subnet.private[count.index].id
route_table_id = aws_route_table.private.id
}
#################################
# Security Groups
#################################
resource "aws_security_group" "default" {
vpc_id = aws_vpc.main.id
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
# Allow http traffic
ingress {
from_port = 80
to_port = 80
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
# Allow https traffic
ingress {
from_port = 443
to_port = 443
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
# Allow ssh traffic
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
tags = var.common_tags
}
output "vpc_id" {
value = aws_vpc.main.id
}
output "public_subnet_ids" {
value = aws_subnet.public[*].id
}
output "private_subnet_ids" {
value = aws_subnet.private[*].id
}
output "security_group_id" {
value = aws_security_group.default.id
}
Dockerfile
in the root directory of the project
FROM python:3.9-slim

RUN apt-get update && \
    apt-get install -y ffmpeg && \
    pip install boto3

COPY extract_keyframes.py /app/extract_keyframes.py
WORKDIR /app

ENTRYPOINT ["python", "extract_keyframes.py"]
extract_keyframes.py
in the root directory of the project
import os
import boto3
import subprocess
from botocore.exceptions import NoCredentialsError
import json

S3_BUCKET = os.environ.get('S3_BUCKET')
INPUT_VIDEO_KEY = os.environ.get('INPUT_VIDEO_KEY')
OUTPUT_FOLDER_PREFIX = os.environ.get('OUTPUT_FOLDER_PREFIX', 'output/')
OUTPUT_MAP_FOLDER = 'mapkeyframes'

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')


def download_from_s3(bucket, key, download_path):
    try:
        s3.download_file(bucket, key, download_path)
        print(f"Downloaded {key} from {bucket}")
    except NoCredentialsError:
        print("Error: Not authorized")
        raise


def upload_to_s3(bucket, file_path, s3_key):
    try:
        s3.upload_file(file_path, bucket, s3_key)
        print(f"Uploaded {file_path} to {bucket}/{s3_key}")
    except NoCredentialsError:
        print("Error: Not authorized")
        raise


def extract_keyframes(video_path, output_folder):
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)
    # Keep only I-frames (keyframes) and write them as numbered JPEGs
    cmd = [
        'ffmpeg', '-i', video_path, '-vf', 'select=eq(pict_type\\,I)',
        '-vsync', 'vfr', '-q:v', '2', f'{output_folder}/%03d.jpg'
    ]
    subprocess.run(cmd, check=True)  # raise if ffmpeg fails instead of continuing silently


def generate_map_csv(keyframes_folder, csv_path, video_name):
    with open(csv_path, 'w') as f:
        f.write('FrameNumber,ImagePath\n')
        for image in sorted(os.listdir(keyframes_folder)):
            if image.endswith('.jpg'):
                frame_number = int(image.split('.')[0])
                image_path = os.path.join(keyframes_folder, image)
                f.write(f"{frame_number},{image_path}\n")
    print(f"Created {csv_path}")


def invoke_second_lambda(bucket_name, video_name):
    payload = {
        'bucket_name': bucket_name,
        'video_name': video_name
    }
    print(f"Invoke second lambda with payload: {payload}")
    response = lambda_client.invoke(
        FunctionName="embedding_data",
        InvocationType='Event',
        Payload=json.dumps(payload)
    )
    print(f"Start Embedding ....: {response}")


def main():
    # Download video
    video_name = os.path.basename(INPUT_VIDEO_KEY)
    video_path = f'/tmp/{video_name}'
    download_from_s3(S3_BUCKET, INPUT_VIDEO_KEY, video_path)

    output_folder_name = os.path.splitext(video_name)[0]
    output_folder = f'/tmp/{output_folder_name}'
    extract_keyframes(video_path, output_folder)

    csv_filename = f'map-keyframe-{output_folder_name}.csv'
    csv_path = f'/tmp/{csv_filename}'
    generate_map_csv(output_folder, csv_path, output_folder_name)

    # Upload keyframes and csv to S3
    for image in os.listdir(output_folder):
        upload_to_s3(S3_BUCKET, os.path.join(output_folder, image),
                     f'{OUTPUT_FOLDER_PREFIX}{output_folder_name}/{image}')
    upload_to_s3(S3_BUCKET, csv_path, f'{OUTPUT_MAP_FOLDER}/{csv_filename}')

    # Invoke second lambda
    invoke_second_lambda(S3_BUCKET, output_folder_name)


if __name__ == "__main__":
    main()
@echo "Building ECS Video"
docker build -t ecs-video -f Dockerfile .
@echo "Pushing ECS Video to ECR"
aws ecr get-login-password --region ap-southeast-1 | docker login --username AWS --password-stdin <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com
docker tag ecs-video:latest <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com/semantic-repo:ecs-video
docker push <aws account id>.dkr.ecr.ap-southeast-1.amazonaws.com/semantic-repo:ecs-video
@echo "Pushed ECS Video to ECR"
|-- code
| |-- inference_code.py
| `-- requirements.txt
`-- setup.py
inference_code.py
in the code folder
from sentence_transformers import SentenceTransformer
import json
import os


def model_fn(model_dir):
    model_path = os.path.join(model_dir, "model")
    model = SentenceTransformer(model_path)
    return model


def input_fn(request_body, request_content_type):
    if request_content_type == "application/json":
        input_data = json.loads(request_body)
    else:
        return request_body
    return input_data["text"]


def predict_fn(input_data, model):
    embeddings = model.encode(input_data)
    return embeddings


def output_fn(prediction, content_type):
    if content_type == "application/json":
        return json.dumps({"embeddings": prediction.tolist()})
    else:
        return "content type not supported"
requirements.txt
in the code folder
sentence_transformers
setup.py
in the root directory of the project
Before running setup.py, you need to create an IAM role named sagemaker-local that lets the SageMaker endpoint access the S3 bucket.
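If you prefer to script that step rather than use the console, here is a minimal boto3 sketch (not one of the project files). It creates the sagemaker-local role with a SageMaker trust policy and, for simplicity, attaches the broad AmazonS3FullAccess and AmazonSageMakerFullAccess managed policies; in practice you would scope the S3 permissions down to your bucket.

import json
import boto3

iam = boto3.client("iam")

# Trust policy so the SageMaker service can assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sagemaker.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

role = iam.create_role(
    RoleName="sagemaker-local",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Broad managed policies for illustration only; narrow them for production
iam.attach_role_policy(
    RoleName="sagemaker-local",
    PolicyArn="arn:aws:iam::aws:policy/AmazonS3FullAccess",
)
iam.attach_role_policy(
    RoleName="sagemaker-local",
    PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
)

print(role["Role"]["Arn"])  # use this ARN for the role argument in setup.py

With the role in place, setup.py looks like this: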
from sentence_transformers import SentenceTransformer
import tarfile
from sagemaker.pytorch import PyTorchModel
import time
import os

# Wait for about 5 minutes
model = SentenceTransformer('clip-ViT-B-32')
model.save('model')

model_path = 'model/'
code_path = 'code/'

# Wait for about 5 minutes
zipped_model_path = os.path.join(model_path, "model.tar.gz")
with tarfile.open(zipped_model_path, "w:gz") as tar:
    tar.add(model_path)
    tar.add(code_path)

# Wait for about 20 minutes
endpoint_name = "clip-model-" + time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
model = PyTorchModel(
    entry_point="inference_code.py",
    model_data=zipped_model_path,
    role="arn:aws:iam::<aws account id>:role/sagemaker-local",
    framework_version="1.5",
    py_version="py3",
)
predictor = model.deploy(
    initial_instance_count=1, instance_type="ml.m5.xlarge", endpoint_name=endpoint_name
)
# Wait total of 30 minutes
# Wait total of 30 minutes
Run the setup.py file:
python setup.py
Wait for about 30 minutes for the endpoint to be created.
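Once the endpoint shows as InService, you can sanity-check it with a short boto3 call. This is an illustrative snippet (not one of the project files) that assumes the JSON contract defined in inference_code.py: a "text" field in the request and an "embeddings" field in the response. Replace the endpoint name with the endpoint_name generated in setup.py.

import json
import boto3

# Placeholder: use the endpoint_name printed/constructed in setup.py
ENDPOINT_NAME = "clip-model-2024-01-01-00-00-00"

runtime = boto3.client("sagemaker-runtime")

response = runtime.invoke_endpoint(
    EndpointName=ENDPOINT_NAME,
    ContentType="application/json",
    Accept="application/json",
    Body=json.dumps({"text": "a man riding a bicycle on the beach"}),
)

embedding = json.loads(response["Body"].read())["embeddings"]
print(len(embedding))  # 512 dimensions for clip-ViT-B-32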