Processing Data

1. Create ECS Cluster and Task Definition

locals {
  container_name = "video-processing-container"
}

#################################
# ECS Cluster
#################################
resource "aws_ecs_cluster" "video_processing" {
  name = "video-processing"
  tags = var.common_tags
}


#################################
# ECS Task Definition
#################################
resource "aws_ecs_task_definition" "video_processing" {
  family                   = "video-processing"
  execution_role_arn       = aws_iam_role.ecs_task_execution_role.arn
  task_role_arn            = aws_iam_role.ecs_task_role.arn
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = 2048
  memory                   = 6144




  depends_on = [
    aws_ecs_cluster.video_processing
  ]

  container_definitions = jsonencode([
    {
      name   = local.container_name
      image  = var.ecs_video_image
      cpu    = 2048
      memory = 6144

      # Logging Configuration
      logConfiguration = {
        logDriver = "awslogs"
        options = {
          "awslogs-group"         = "/ecs/video-processing"
          "awslogs-region"        = "ap-southeast-1"
          "awslogs-stream-prefix" = "ecs"
          "awslogs-create-group"  = "true"
          "max-buffer-size"       = "25m"
          "mode"                  = "non-blocking"
        }
      }

      # Port Mapping Configuration
      portMappings = [
        {
          containerPort = 80 
          hostPort      = 80 
          protocol      = "tcp"
        },
        {
          containerPort = 443 
          hostPort      = 443 
          protocol      = "tcp"
        }
      ]


      essential = true
      environment = [
        {
          name  = "S3_BUCKET"
          value = "your-bucket-name"
        },
        {
          name  = "INPUT_VIDEO_KEY"
          value = "your-video-key"
        }
      ]
    }
  ])
}

2. Create Role for ECS Task and Role for Lambda



#################################
# Role for Lambda
#################################
data "aws_iam_policy_document" "lambda_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "lambda_s3_ecs_role"
  assume_role_policy = data.aws_iam_policy_document.lambda_assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "lambda_s3_ecs_policy"
  policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [
      {
        "Effect" : "Allow",
        "Action" : [
          "s3:GetObject",
          "s3:ListBucket"
        ],
        "Resource" : [
          "arn:aws:s3:::${var.bucket_video_name}",
          "arn:aws:s3:::${var.bucket_video_name}/*"
        ]
      },
      {
        "Effect" : "Allow",
        "Action" : [
          "ecs:RunTask",
          "ecs:DescribeTasks",
          "ecs:DescribeTaskDefinition",
          "iam:PassRole"
        ],
        "Resource" : "*"
      },
      {
        "Effect" : "Allow",
        "Action" : [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource" : "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.lambda_role,
    aws_s3_bucket.video
  ]
}

resource "aws_iam_role_policy_attachment" "lambda_role_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
  depends_on = [
    aws_iam_policy.lambda_policy
  ]
}


#################################
# ECS Task Execution Role
#################################
data "aws_iam_policy_document" "ecs_task_execution_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["ecs-tasks.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "ecs_task_execution_role" {
  name               = "ecs_task_execution_role"
  assume_role_policy = data.aws_iam_policy_document.ecs_task_execution_assume_role.json
}

resource "aws_iam_policy" "ecs_task_execution_policy" {
  name = "ecs_task_execution_policy"
  policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [
      {
        "Effect" : "Allow",
        "Action" : [
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchCheckLayerAvailability",
          "ecr:GetDownloadUrlForLayer",
          "ecr:BatchGetImage",
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "ecr:GetAuthorizationToken"
        ],
        "Resource" : "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.ecs_task_execution_role
  ]
}

resource "aws_iam_role_policy_attachment" "ecs_task_execution_role_attach" {
  role       = aws_iam_role.ecs_task_execution_role.name
  policy_arn = aws_iam_policy.ecs_task_execution_policy.arn

  depends_on = [
    aws_iam_policy.ecs_task_execution_policy
  ]
}


#################################
# ECS Task Role
#################################
data "aws_iam_policy_document" "ecs_task_assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["ecs-tasks.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "ecs_task_role" {
  name               = "ecs_task_role"
  assume_role_policy = data.aws_iam_policy_document.ecs_task_assume_role.json
}

resource "aws_iam_policy" "ecs_task_policy" {
  name = "ecs_task_policy"

  depends_on = [
    aws_iam_role.ecs_task_role,
    aws_s3_bucket.video
  ]

  policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [
      {
        "Effect" : "Allow",
        "Action" : [
          "s3:GetObject",
          "s3:ListBucket",
          "s3:PutObject"
        ],
        "Resource" : [
          "arn:aws:s3:::${var.bucket_video_name}",
          "arn:aws:s3:::${var.bucket_video_name}/*"
        ]
      },
      {
        "Effect" : "Allow",
        "Action" : [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource" : "*"
      }
    ]
  })


}

resource "aws_iam_role_policy_attachment" "ecs_task_role_attach" {
  role       = aws_iam_role.ecs_task_role.name
  policy_arn = aws_iam_policy.ecs_task_policy.arn

  depends_on = [
    aws_iam_policy.ecs_task_policy
  ]
}

# add invoke lambda policy to the role
resource "aws_iam_policy" "lambda_invoke_policy" {
  name = "lambda_invoke_policy"
  policy = jsonencode({
    "Version" : "2012-10-17",
    "Statement" : [
      {
        "Effect" : "Allow",
        "Action" : [
          "lambda:InvokeFunction"
        ],
        "Resource" : "*"
      }
    ]
  })

  depends_on = [
    aws_iam_role.ecs_task_role
  ]
}

resource "aws_iam_role_policy_attachment" "lambda_invoke_role_attach" {
  role       = aws_iam_role.ecs_task_role.name
  policy_arn = aws_iam_policy.lambda_invoke_policy.arn

  depends_on = [
    aws_iam_policy.lambda_invoke_policy
  ]
}

3. Create Lambda Function

#################################
# AWS Lambda Function
#################################
resource "aws_lambda_function" "processing_data" {
  function_name = "processing_data"
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/processing-data/lambda/lambda_function.zip"
  timeout       = 60
  memory_size   = 128

  environment {
    variables = {
      ECS_CLUSTER_NAME = aws_ecs_cluster.video_processing.name
      ECS_TASK_DEFINITION = aws_ecs_task_definition.video_processing.family
      SUBNET_ID = var.subnet_ids[0]
      CONTAINER_NAME = local.container_name
      SECURITY_GROUP_ID = var.security_group_id
    }
  }

  depends_on = [
    aws_iam_role.lambda_role,
    aws_ecs_cluster.video_processing,
    aws_ecs_task_definition.video_processing
  ]
}

#################################
# Lambda Permission to Allow S3 to Invoke
#################################
resource "aws_lambda_permission" "allow_s3_invocation" {
  statement_id  = "AllowS3InvokeLambda"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.processing_data.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.video.arn
}


output "lambda_processing_data_arn" {
  value = aws_lambda_function.processing_data.arn
  
}

4. Create S3 Bucket Folders for Video and Processed Video

#################################
# Video Bucket
#################################
resource "aws_s3_bucket" "video" {
  bucket = var.bucket_video_name
  tags   = var.common_tags
}


#################################
# Create folder in S3 bucket
#################################
locals {
  folder_names = ["video", "keyframes", "embeddings", "metadata"]
}

resource "aws_s3_bucket_object" "create_folders" {
  for_each = { for idx, folder_name in local.folder_names : idx => folder_name }

  bucket = aws_s3_bucket.video.id
  key    = "${each.value}/"
}

output "video_bucket_id" {
  value = aws_s3_bucket.video.id
  
}

output "video_bucket_arn" {
  value = aws_s3_bucket.video.arn
  
}

5. Code for Lambda Function

import json
import boto3
import os

ecs_client = boto3.client("ecs")
s3_client = boto3.client("s3")

CLUSTER_NAME = os.environ.get("ECS_CLUSTER_NAME")
TASK_DEFINITION = os.environ.get("ECS_TASK_DEFINITION")
SUBNET_ID = os.environ.get("SUBNET_ID")
SECURITY_GROUP_ID = os.environ.get("SECURITY_GROUP_ID")
CONTAINER_NAME = os.environ.get("CONTAINER_NAME")


def lambda_handler(event, context):
    print(event)

    bucket = event["Records"][0]["s3"]["bucket"]["name"]
    key = event["Records"][0]["s3"]["object"]["key"]

    print(f"Bucket: {bucket}")
    print(f"Key: {key}")

    response = ecs_client.run_task(
        cluster=CLUSTER_NAME,
        taskDefinition=TASK_DEFINITION,
        launchType="FARGATE",
        networkConfiguration={
            "awsvpcConfiguration": {
                "subnets": [SUBNET_ID],
                "securityGroups": [SECURITY_GROUP_ID],
                "assignPublicIp": "ENABLED",
            }
        },
        overrides={
            "containerOverrides": [
                {
                    "name": CONTAINER_NAME,
                    "environment": [
                        {"name": "S3_BUCKET", "value": bucket},
                        {"name": "INPUT_VIDEO_KEY", "value": key},
                        {"name": "OUTPUT_FOLDER_PREFIX", "value": "keyframes/"},
                    ],
                }
            ]
        },
    )

    print("ECS task response: ")
    print(response)

    return {"statusCode": 200, "body": "ECS Task Invoked Successfully"}