Processing Data
1. Create ECS Cluster and Task Definition
# Container name shared by the ECS task definition and the Lambda function's
# CONTAINER_NAME environment variable, so both refer to the same container.
locals {
  container_name = "video-processing-container"
}
#################################
# ECS Cluster
#################################
# Cluster that the video-processing tasks are launched into.
resource "aws_ecs_cluster" "video_processing" {
  name = "video-processing"

  tags = var.common_tags
}
#################################
# ECS Task Definition
#################################
# Region the AWS provider is configured for. Used for the awslogs driver so the
# log group is always in the same region the task runs in, rather than a
# hard-coded region.
data "aws_region" "ecs_logs" {}

# Fargate task definition for the video-processing container.
resource "aws_ecs_task_definition" "video_processing" {
  family                   = "video-processing"
  execution_role_arn       = aws_iam_role.ecs_task_execution_role.arn
  task_role_arn            = aws_iam_role.ecs_task_role.arn
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]

  # Task-level size; the single container below is given the full allocation.
  cpu    = 2048
  memory = 6144

  depends_on = [
    aws_ecs_cluster.video_processing
  ]

  container_definitions = jsonencode([
    {
      name      = local.container_name
      image     = var.ecs_video_image
      cpu       = 2048
      memory    = 6144
      essential = true

      # Logging Configuration
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = "/ecs/video-processing"
          "awslogs-region"        = data.aws_region.ecs_logs.name
          "awslogs-stream-prefix" = "ecs"
          # Creating the group at start-up needs logs:CreateLogGroup on the
          # execution role.
          "awslogs-create-group" = "true"
          "max-buffer-size"      = "25m"
          "mode"                 = "non-blocking"
        }
      }

      # Port Mapping Configuration
      portMappings = [
        {
          containerPort = 80
          hostPort      = 80
          protocol      = "tcp"
        },
        {
          containerPort = 443
          hostPort      = 443
          protocol      = "tcp"
        }
      ]

      # Placeholder defaults; the Lambda handler overrides S3_BUCKET and
      # INPUT_VIDEO_KEY for every task it starts.
      environment = [
        {
          name  = "S3_BUCKET"
          value = "your-bucket-name"
        },
        {
          name  = "INPUT_VIDEO_KEY"
          value = "your-video-key"
        }
      ]
    }
  ])
}
2. Create IAM Roles for the Lambda Function, ECS Task Execution, and ECS Task
#################################
# Role for Lambda
#################################
# Trust policy: lets the Lambda service assume the role.
data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}
# Execution role used by the processing_data Lambda function.
resource "aws_iam_role" "lambda_role" {
  name               = "lambda_s3_ecs_role"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}
# Permissions for the Lambda function: read the video bucket, start and inspect
# ECS tasks, and write its own logs.
resource "aws_iam_policy" "lambda_policy" {
  name = "lambda_s3_ecs_policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket"
        ]
        Resource = [
          "arn:aws:s3:::${var.bucket_video_name}",
          "arn:aws:s3:::${var.bucket_video_name}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "ecs:RunTask",
          "ecs:DescribeTasks",
          "ecs:DescribeTaskDefinition"
        ]
        Resource = "*"
      },
      {
        # ecs:RunTask has to hand the task's roles to ECS. Only those two roles
        # may be passed, and only to the ECS tasks service; a wildcard here
        # would let the function pass any role in the account to any service.
        Effect = "Allow"
        Action = "iam:PassRole"
        Resource = [
          aws_iam_role.ecs_task_execution_role.arn,
          aws_iam_role.ecs_task_role.arn
        ]
        Condition = {
          StringEquals = {
            "iam:PassedToService" = "ecs-tasks.amazonaws.com"
          }
        }
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.lambda_role,
    aws_s3_bucket.video
  ]
}
# Attach the Lambda permissions policy to the Lambda execution role.
resource "aws_iam_role_policy_attachment" "lambda_role_attach" {
  policy_arn = aws_iam_policy.lambda_policy.arn
  role       = aws_iam_role.lambda_role.name

  depends_on = [
    aws_iam_policy.lambda_policy
  ]
}
#################################
# ECS Task Execution Role
#################################
# Trust policy: lets ECS tasks assume the task execution role.
data "aws_iam_policy_document" "ecs_task_execution_assume_role" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["ecs-tasks.amazonaws.com"]
    }
  }
}
# Role referenced as execution_role_arn by the video-processing task definition.
resource "aws_iam_role" "ecs_task_execution_role" {
  name               = "ecs_task_execution_role"
  assume_role_policy = data.aws_iam_policy_document.ecs_task_execution_assume_role.json
}
# Permissions for the task execution role: pull the container image from ECR
# and create/write the CloudWatch Logs group and streams.
resource "aws_iam_policy" "ecs_task_execution_policy" {
  name = "ecs_task_execution_policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        # Each action is listed once (ecr:GetDownloadUrlForLayer was duplicated).
        Action = [
          "ecr:GetAuthorizationToken",
          "ecr:BatchCheckLayerAvailability",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchGetImage",
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.ecs_task_execution_role
  ]
}
# Attach the execution policy to the ECS task execution role.
resource "aws_iam_role_policy_attachment" "ecs_task_execution_role_attach" {
  policy_arn = aws_iam_policy.ecs_task_execution_policy.arn
  role       = aws_iam_role.ecs_task_execution_role.name

  depends_on = [
    aws_iam_policy.ecs_task_execution_policy
  ]
}
#################################
# ECS Task Role
#################################
# Trust policy: lets ECS tasks assume the task role.
data "aws_iam_policy_document" "ecs_task_assume_role" {
  statement {
    effect  = "Allow"
    actions = ["sts:AssumeRole"]

    principals {
      type        = "Service"
      identifiers = ["ecs-tasks.amazonaws.com"]
    }
  }
}
# Role referenced as task_role_arn by the video-processing task definition.
resource "aws_iam_role" "ecs_task_role" {
  name               = "ecs_task_role"
  assume_role_policy = data.aws_iam_policy_document.ecs_task_assume_role.json
}
# Permissions for the task role: read/list/write the video bucket and write
# CloudWatch Logs.
resource "aws_iam_policy" "ecs_task_policy" {
  name = "ecs_task_policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "s3:GetObject",
          "s3:ListBucket",
          "s3:PutObject"
        ]
        Resource = [
          "arn:aws:s3:::${var.bucket_video_name}",
          "arn:aws:s3:::${var.bucket_video_name}/*"
        ]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.ecs_task_role,
    aws_s3_bucket.video
  ]
}
# Attach the task permissions policy to the ECS task role.
resource "aws_iam_role_policy_attachment" "ecs_task_role_attach" {
  policy_arn = aws_iam_policy.ecs_task_policy.arn
  role       = aws_iam_role.ecs_task_role.name

  depends_on = [
    aws_iam_policy.ecs_task_policy
  ]
}
# Policy that allows invoking Lambda functions; it is attached to the ECS task
# role so the running container can call Lambda.
resource "aws_iam_policy" "lambda_invoke_policy" {
  name = "lambda_invoke_policy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "lambda:InvokeFunction"
        ]
        Resource = "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.ecs_task_role
  ]
}
# Attach the Lambda invoke policy to the ECS task role.
resource "aws_iam_role_policy_attachment" "lambda_invoke_role_attach" {
  policy_arn = aws_iam_policy.lambda_invoke_policy.arn
  role       = aws_iam_role.ecs_task_role.name

  depends_on = [
    aws_iam_policy.lambda_invoke_policy
  ]
}
3. Create Lambda Function
#################################
# AWS Lambda Function
#################################
# Lambda function that starts the ECS video-processing task. Its environment
# carries the cluster, task definition, networking, and container name that the
# handler passes to ecs:RunTask.
resource "aws_lambda_function" "processing_data" {
  function_name = "processing_data"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"

  # python3.8 has reached end of support on AWS Lambda; use a supported runtime.
  runtime = "python3.12"

  filename = "${path.root}/src/processing-data/lambda/lambda_function.zip"
  # Without a hash Terraform only tracks the file path, so a rebuilt zip would
  # never be redeployed.
  source_code_hash = filebase64sha256("${path.root}/src/processing-data/lambda/lambda_function.zip")

  timeout     = 60
  memory_size = 128

  environment {
    variables = {
      ECS_CLUSTER_NAME    = aws_ecs_cluster.video_processing.name
      ECS_TASK_DEFINITION = aws_ecs_task_definition.video_processing.family
      # Only the first subnet is handed to the function.
      SUBNET_ID         = var.subnet_ids[0]
      CONTAINER_NAME    = local.container_name
      SECURITY_GROUP_ID = var.security_group_id
    }
  }

  depends_on = [
    aws_iam_role.lambda_role,
    aws_ecs_cluster.video_processing,
    aws_ecs_task_definition.video_processing
  ]
}
#################################
# Lambda Permission to Allow S3 to Invoke
#################################
# Resource-based permission: the S3 service may invoke the function, but only
# for events originating from the video bucket.
resource "aws_lambda_permission" "allow_s3_invocation" {
  statement_id  = "AllowS3InvokeLambda"
  function_name = aws_lambda_function.processing_data.function_name
  action        = "lambda:InvokeFunction"
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.video.arn
}
output "lambda_processing_data_arn" {
  description = "ARN of the processing_data Lambda function."
  value       = aws_lambda_function.processing_data.arn
}
4. Create the S3 Video Bucket and Its Folders (video, keyframes, embeddings, metadata)
#################################
# Video Bucket
#################################
# Bucket that holds the videos and the folders created below.
resource "aws_s3_bucket" "video" {
  bucket = var.bucket_video_name

  tags = var.common_tags
}
#################################
# Create folder in S3 bucket
#################################
# Top-level folder prefixes to create in the video bucket.
locals {
  folder_names = [
    "video",
    "keyframes",
    "embeddings",
    "metadata",
  ]
}
# Create one zero-byte object per folder name, with a trailing "/" so the
# prefix shows up as a folder in the S3 console.
#
# Uses aws_s3_object, the replacement for the deprecated aws_s3_bucket_object,
# and keys for_each by folder name instead of list index so that reordering or
# removing an entry does not recreate the other objects.
# NOTE(review): the resource type and instance keys change, so objects already
# tracked in state under aws_s3_bucket_object need a state move or re-import.
resource "aws_s3_object" "create_folders" {
  for_each = toset(local.folder_names)

  bucket = aws_s3_bucket.video.id
  key    = "${each.value}/"
}
output "video_bucket_id" {
  description = "ID of the video S3 bucket."
  value       = aws_s3_bucket.video.id
}
output "video_bucket_arn" {
  description = "ARN of the video S3 bucket."
  value       = aws_s3_bucket.video.arn
}
5. Code for Lambda Function
import json
import boto3
import os
# AWS service clients, created once when the module is imported.
ecs_client = boto3.client("ecs")
s3_client = boto3.client("s3")

# Task launch settings read from the function's environment variables.
# Each value is None when its variable is not set.
CLUSTER_NAME = os.getenv("ECS_CLUSTER_NAME")
TASK_DEFINITION = os.getenv("ECS_TASK_DEFINITION")
SUBNET_ID = os.getenv("SUBNET_ID")
SECURITY_GROUP_ID = os.getenv("SECURITY_GROUP_ID")
CONTAINER_NAME = os.getenv("CONTAINER_NAME")
def lambda_handler(event, context):
    """Start one Fargate video-processing task for each uploaded S3 object.

    Args:
        event: S3 event notification payload. Every entry of
            ``event["Records"]`` supplies a bucket name and an object key.
        context: Lambda context object (unused).

    Returns:
        dict: ``{"statusCode": 200, "body": "ECS Task Invoked Successfully"}``
        once every record has been handled.

    Raises:
        KeyError: If the event does not have the expected S3 record layout.
        RuntimeError: If ECS reports a failure or starts no task for an object.
    """
    # Imported locally so the handler does not rely on the module's import block.
    from urllib.parse import unquote_plus

    print(event)

    # A single notification can carry several records; handle all of them
    # instead of only the first.
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # S3 URL-encodes object keys in event notifications (a space arrives
        # as "+"), so decode before handing the key to the container.
        key = unquote_plus(record["s3"]["object"]["key"])
        print(f"Bucket: {bucket}")
        print(f"Key: {key}")

        # Keys ending in "/" are folder placeholder objects, not videos.
        if key.endswith("/"):
            print(f"Skipping folder placeholder: {key}")
            continue

        response = ecs_client.run_task(
            cluster=CLUSTER_NAME,
            taskDefinition=TASK_DEFINITION,
            launchType="FARGATE",
            networkConfiguration={
                "awsvpcConfiguration": {
                    "subnets": [SUBNET_ID],
                    "securityGroups": [SECURITY_GROUP_ID],
                    "assignPublicIp": "ENABLED",
                }
            },
            overrides={
                "containerOverrides": [
                    {
                        "name": CONTAINER_NAME,
                        "environment": [
                            {"name": "S3_BUCKET", "value": bucket},
                            {"name": "INPUT_VIDEO_KEY", "value": key},
                            {"name": "OUTPUT_FOLDER_PREFIX", "value": "keyframes/"},
                        ],
                    }
                ]
            },
        )
        print("ECS task response: ")
        print(response)

        # run_task can return normally while reporting placement or capacity
        # problems in "failures"; surface that instead of claiming success.
        failures = response.get("failures")
        if failures or not response.get("tasks"):
            raise RuntimeError(
                f"ECS did not start a task for s3://{bucket}/{key}: {failures}"
            )

    return {"statusCode": 200, "body": "ECS Task Invoked Successfully"}