Vector Database

1. Create the IAM roles for the index Lambda and the query Lambda

#################################
# Lambda Query Role
#################################
# Identity of the credentials Terraform runs with; its account_id is
# interpolated into the ARNs of the IAM policies in this file.
data "aws_caller_identity" "current" {}


# Execution role of the query Lambda ("query-vectordb").
resource "aws_iam_role" "lambda_opensearch_sagemaker_role" {
  name = "LambdaOpenSearchSageMakerRoleQuery"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action    = "sts:AssumeRole"
        Effect    = "Allow"
        Principal = { Service = "lambda.amazonaws.com" }
      }
    ]
  })
}

# Grants the es:ESHttp* data-plane actions (POST/GET/PUT/DELETE) on
# OpenSearch Service domains owned by this account.
resource "aws_iam_policy" "opensearch_policy" {
  name        = "OpenSearchAccessPolicy"
  description = "Allow Lambda to access OpenSearch domains"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "es:ESHttpPost",
          "es:ESHttpGet",
          "es:ESHttpPut",
          "es:ESHttpDelete"
        ],
        # Scoped to this account's domains (any region) instead of "*".
        # The trailing wildcard also matches "<domain>/<path>" sub-resources.
        # TODO: narrow to the specific domain ARN once it is available here.
        Resource = "arn:aws:es:*:${data.aws_caller_identity.current.account_id}:domain/*"
      }
    ]
  })
}

# Grants sagemaker:InvokeEndpoint on SageMaker endpoints owned by this account.
resource "aws_iam_policy" "sagemaker_policy" {
  name        = "SageMakerInvokePolicy"
  description = "Allow Lambda to invoke SageMaker endpoint"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "sagemaker:InvokeEndpoint"
        ],
        # Scoped to this account's endpoints (any region) instead of "*".
        # TODO: narrow to endpoint/<name> for var.sagemaker_endpoint_name
        # (NOTE(review): SageMaker ARNs appear to use the lower-cased name — verify).
        Resource = "arn:aws:sagemaker:*:${data.aws_caller_identity.current.account_id}:endpoint/*"
      }
    ]
  })
}

# Allows creating CloudWatch Logs groups/streams and writing log events
# anywhere in this account's ap-southeast-1 CloudWatch Logs.
resource "aws_iam_policy" "cloudwatch_logs_policy" {
  description = "Allow Lambda to write logs to CloudWatch"
  name        = "CloudWatchLogsPolicy"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Effect = "Allow"
        # NOTE(review): the region is hard-coded; deploying elsewhere needs this changed.
        Resource = "arn:aws:logs:ap-southeast-1:${data.aws_caller_identity.current.account_id}:*"
      }
    ]
  })
}

# Attach the OpenSearch, SageMaker and CloudWatch Logs customer-managed
# policies to the query Lambda's execution role.
resource "aws_iam_role_policy_attachment" "attach_opensearch_policy" {
  policy_arn = aws_iam_policy.opensearch_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}

resource "aws_iam_role_policy_attachment" "attach_sagemaker_policy" {
  policy_arn = aws_iam_policy.sagemaker_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}

resource "aws_iam_role_policy_attachment" "attach_cloudwatch_logs_policy" {
  policy_arn = aws_iam_policy.cloudwatch_logs_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}


#################################
# Lambda Index Role
#################################
# Execution role of the indexing Lambda ("index-vectordb").
resource "aws_iam_role" "lambda_opensearch_sagemaker_role_index" {
  # NOTE(review): the name says "Query" although this is the index role;
  # renaming it would make Terraform replace the role.
  name = "LambdaOpenSearchSageMakerRoleQueryIndex"

  # Trust policy: only the Lambda service may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect    = "Allow",
        Principal = { Service = "lambda.amazonaws.com" },
        Action    = "sts:AssumeRole"
      }
    ]
  })
}

# AWS-managed policies attached to the indexing Lambda's role.

# Basic Lambda execution permissions (CloudWatch Logs).
resource "aws_iam_role_policy_attachment" "lambda_basic_execution_policy" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}

# Read access to S3; the function downloads the embeddings objects.
# NOTE(review): this managed policy covers every bucket in the account;
# consider a custom policy limited to the embeddings bucket.
resource "aws_iam_role_policy_attachment" "s3_read_only_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}

# Full OpenSearch Service access. Replace with a custom (least-privilege) policy if needed.
resource "aws_iam_role_policy_attachment" "opensearch_access_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonOpenSearchServiceFullAccess"
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}

2. Create the Lambda functions for indexing and querying

#################################
# AWS Lambda Function Query
#################################
# Query Lambda: reads the SageMaker endpoint name from its environment.
resource "aws_lambda_function" "query" {
  function_name = "query-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role.arn
  handler       = "lambda_function.lambda_handler"
  # NOTE(review): python3.8 is past end of support on AWS Lambda; upgrading
  # also requires rebuilding any layer for the new Python version.
  runtime  = "python3.8"
  filename = "${path.root}/src/vectordb/lambda-query/lambda_function.zip"
  # Hash of the deployment package. Without it Terraform only compares the
  # path, so a rebuilt zip at the same location is never redeployed.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-query/lambda_function.zip")
  timeout          = 60
  memory_size      = 128

  environment {
    variables = {
      SAGEMAKER_ENDPOINT_NAME = var.sagemaker_endpoint_name
    }
  }

  # Explicit ordering on the role (already implied by the role ARN reference).
  depends_on = [
    aws_iam_role.lambda_opensearch_sagemaker_role,
  ]
}

output "lambda_function_name" {
  description = "Name of the query Lambda function."
  value       = aws_lambda_function.query.function_name
}

output "lambda_invoke_arn" {
  description = "Invoke ARN of the query Lambda function (used e.g. by API Gateway integrations)."
  value       = aws_lambda_function.query.invoke_arn
}

#################################
# AWS Lambda Function Indexing
#################################
# Indexing Lambda: triggered by S3 uploads of embeddings files and writes
# them to OpenSearch using the settings passed in its environment.
resource "aws_lambda_function" "index" {
  function_name = "index-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role_index.arn
  handler       = "lambda_function.lambda_handler"
  # NOTE(review): python3.8 is past end of support on AWS Lambda; upgrading
  # also requires rebuilding the numpy/opensearch-py layer for that version.
  runtime  = "python3.8"
  filename = "${path.root}/src/vectordb/lambda-index/lambda_function.zip"
  # Redeploy whenever the package contents change (the path itself never does).
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_function.zip")
  timeout          = 300
  memory_size      = 1024
  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
    # aws_lambda_layer_version.aws4auth_layer_zip.arn
  ]

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_domain_endpoint
      INDEX_NAME          = var.index_name
      USERNAME            = var.username
      # NOTE(review): stored in plain text in the function configuration;
      # consider Secrets Manager or SSM Parameter Store.
      PASSWORD = var.password
    }
  }
}



# Uploads the layer zip to S3 (the key keeps its historical "lamda" spelling so
# the existing object is not moved).
# NOTE(review): aws_s3_bucket_object is deprecated in AWS provider v4+ in
# favour of aws_s3_object.
resource "aws_s3_bucket_object" "opensearch_numpy_layer_zip" {
  bucket = var.bucket_name
  key    = "lambda-layer/lamda_layer.zip"
  source = "${path.root}/src/vectordb/lambda-index/lambda_layer.zip"
  # Re-upload when the local zip changes; `source` alone only tracks the path.
  # (An MD5 etag does not match objects encrypted with SSE-KMS.)
  etag = filemd5("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}

# Layer providing numpy and opensearch-py, published from the uploaded zip.
resource "aws_lambda_layer_version" "opensearch_numpy_layer" {
  layer_name          = "opensearch-numpy-layer"
  s3_bucket           = var.bucket_name
  s3_key              = aws_s3_bucket_object.opensearch_numpy_layer_zip.key
  compatible_runtimes = ["python3.8"]
  description         = "Lambda layer with numpy and opensearch-py"
  # Publish a new layer version whenever the zip contents change.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}
3. Create the S3 event notifications

#################################
# Lambda Permission to Allow S3 to Invoke
#################################
# Resource-based permission letting S3 invoke the indexing Lambda for
# events from the bucket identified by var.bucket_arn.
resource "aws_lambda_permission" "allow_s3_invocation_embeddings" {
  statement_id  = "AllowS3InvokeLambda"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.index.function_name
  principal     = "s3.amazonaws.com"
  source_arn    = var.bucket_arn
  # S3 bucket ARNs contain no account ID, so also require the bucket to belong
  # to this account. This guards against the bucket name being re-created by
  # someone else.
  # NOTE(review): assumes the bucket lives in the same account Terraform runs as.
  source_account = data.aws_caller_identity.current.account_id
}


#################################
# Embeddings Bucket Notification
#################################

# S3 -> Lambda triggers for the bucket:
#   embeddings/*.npy -> indexing Lambda (index-vectordb)
#   video/*.mp4      -> processing Lambda (var.lambda_processing_data_arn)
# This resource owns the bucket's whole notification configuration, so every
# trigger on the bucket must be declared here.
resource "aws_s3_bucket_notification" "embeddings_bucket_notification" {
  bucket = var.bucket_name

  lambda_function {
    lambda_function_arn = aws_lambda_function.index.arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".npy"
    filter_prefix       = "embeddings/"
  }

  lambda_function {
    lambda_function_arn = var.lambda_processing_data_arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".mp4"
    filter_prefix       = "video/"
  }

  # S3 validates each destination when the configuration is saved, so the
  # invoke permission must exist first.
  # NOTE(review): the processing Lambda's permission is presumably declared
  # elsewhere; add it here too if it is in this module.
  depends_on = [
    aws_lambda_permission.allow_s3_invocation_embeddings,
  ]
}
4. Code for the Lambda functions
  • Code for the index Lambda (Python)
import json
import boto3
import os
import numpy
from opensearchpy import OpenSearch, RequestsHttpConnection , helpers

# Settings passed in as Lambda environment variables (see the Terraform
# `environment` block of the index function); each is None when unset.
OPENSEARCH_ENDPOINT = os.getenv("OPENSEARCH_ENDPOINT")
OPENSEARCH_INDEX = os.getenv("INDEX_NAME")
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")

# Clients are built once at import time and shared by all invocations served
# by the same execution environment.
s3_client = boto3.client("s3")
# NOTE(review): if OPENSEARCH_ENDPOINT is a bare hostname (no "https://" and no
# port), confirm the client actually connects on port 443.
client = OpenSearch(
    hosts=[OPENSEARCH_ENDPOINT],
    connection_class=RequestsHttpConnection,
    # HTTP basic authentication with the configured user.
    http_auth=(USERNAME, PASSWORD),
    use_ssl=True,
    verify_certs=True,
    timeout=120,
    retry_on_timeout=True,
    max_retries=5,
)

def create_index(client, index_name=None, dimension=512):
    """Create the k-NN index that stores one embedding document per frame.

    The index enables k-NN and maps:
      * ``image_embedding`` - ``knn_vector`` (faiss HNSW, L2 distance),
      * ``video`` - text,
      * ``image`` - integer.

    Args:
        client: OpenSearch client; only ``client.indices.create`` is used.
        index_name: Name of the index to create. Defaults to
            ``OPENSEARCH_INDEX`` (the ``INDEX_NAME`` environment variable).
        dimension: Length of the ``image_embedding`` vectors (default 512).

    Returns:
        The response returned by ``client.indices.create``.
    """
    if index_name is None:
        index_name = OPENSEARCH_INDEX

    index_body = {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "image_embedding": {
                    "type": "knn_vector",
                    "dimension": dimension,
                    "method": {
                        "name": "hnsw",
                        "engine": "faiss",
                        "space_type": "l2",
                        "parameters": {
                            "ef_construction": 256,
                            "m": 48,
                        },
                    },
                },
                "video": {"type": "text"},
                "image": {"type": "integer"},
            }
        },
    }

    # Create the index.
    # NOTE(review): concurrent invocations can both pass the caller's
    # `indices.exists` check; the second `create` then fails because the index
    # already exists. TODO: decide whether to tolerate that error.
    response = client.indices.create(index=index_name, body=index_body)
    print(f"Index created: {response}")
    return response


def format_video_name(video_name):
    """Return the last path component of ``video_name`` cut at its first dot.

    Example: "embeddings/L01_V001.npy" -> "L01_V001".
    """
    _, _, basename = video_name.rpartition("/")
    stem, _, _ = basename.partition(".")
    return stem


def format_image_name(image_name):
    """Return the file stem of ``image_name`` zero-padded to at least 3 chars.

    Accepts a path/file name or a bare number. Non-string input such as an
    ``int`` frame index is converted with ``str`` first. The directory part and
    everything from the first dot onward are dropped.

    Examples: 2 -> "002", "2" -> "002", "frames/15.jpg" -> "015",
    "1234" -> "1234".
    """
    stem = str(image_name).split("/")[-1].split(".")[0]
    return stem.zfill(3)


def _index_s3_object(bucket, key):
    """Download one embeddings ``.npy`` object from S3 and bulk-index its rows.

    Each row of the loaded array becomes one OpenSearch document containing
    the embedding, the zero-padded row number (``image``) and the video name
    derived from ``key``.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Decoded object key, e.g. ``embeddings/L01_V001.npy``.

    Returns:
        The result of ``helpers.bulk``, or ``None`` when the array is empty.
    """
    import io

    print(f"Bucket: {bucket}")
    print(f"Key: {key}")
    video_name = format_video_name(key)

    payload = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    # numpy.load needs a path or file-like object; raw bytes would be treated as
    # a file name, so wrap them in an in-memory buffer. allow_pickle keeps its
    # default (False), which is appropriate for data read from S3.
    # NOTE(review): assumes a 2-D array with one row per frame - TODO confirm.
    embeddings = numpy.load(io.BytesIO(payload)).tolist()
    if not embeddings:
        print(f"No embeddings found in s3://{bucket}/{key}; nothing to index")
        return None
    print("Embeddings: ", embeddings[0])

    # Create mapping of embeddings to video names
    docs = [
        {
            "_op_type": "index",
            "_index": OPENSEARCH_INDEX,
            # Include the video name: every file restarts at row 0, so bare row
            # numbers would make each video overwrite the previous one's docs.
            "_id": f"{video_name}_{i + 1}",
            "_source": {
                "image_embedding": embedding,
                # format_image_name expects a name string, so pass the row index as one.
                "image": format_image_name(str(i)),
                "video": video_name,
            },
        }
        for i, embedding in enumerate(embeddings)
    ]
    print("Creating docs: OK")

    response = helpers.bulk(client, docs)
    print("Response: ", response)
    return response


def lambda_handler(event, context):
    """Lambda entry point: index every embeddings object named in an S3 event.

    In this deployment the function is triggered by ``s3:ObjectCreated:*``
    notifications filtered to ``embeddings/*.npy``. All records in the event
    are processed, not only the first.

    Args:
        event: S3 event notification payload containing a ``Records`` list.
        context: Lambda context object (unused).

    Returns:
        dict: ``{"statusCode": 200, "body": json.dumps("Indexing complete!")}``.
    """
    from urllib.parse import unquote_plus

    print(event)

    # Create index if not exists
    if not client.indices.exists(index=OPENSEARCH_INDEX):
        create_index(client)

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys in S3 event notifications are URL-encoded (for example a
        # space arrives as "+"), so decode before calling GetObject.
        key = unquote_plus(record["s3"]["object"]["key"])
        _index_s3_object(bucket, key)

    return {
        "statusCode": 200,
        "body": json.dumps("Indexing complete!"),
    }
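  • Code for the query Lambda — note: what follows is the revised Terraform for the Lambda functions and layer from step 2 (the query Lambda now also receives the OpenSearch settings and the layer), not the query function's Python code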
#################################
# AWS Lambda Function Query
#################################
# Query Lambda: receives the SageMaker endpoint name and the OpenSearch
# connection settings through its environment, plus the numpy/opensearch-py layer.
resource "aws_lambda_function" "query" {
  function_name = "query-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role.arn
  handler       = "lambda_function.lambda_handler"
  # NOTE(review): python3.8 is past end of support on AWS Lambda; upgrading
  # also requires rebuilding the layer for the new Python version.
  runtime  = "python3.8"
  filename = "${path.root}/src/vectordb/lambda-query/lambda_function.zip"
  # Hash of the deployment package. Without it Terraform only compares the
  # path, so a rebuilt zip at the same location is never redeployed.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-query/lambda_function.zip")
  timeout          = 60
  memory_size      = 128

  environment {
    variables = {
      SAGEMAKER_ENDPOINT_NAME = var.sagemaker_endpoint_name
      OPENSEARCH_ENDPOINT     = var.opensearch_domain_endpoint
      INDEX_NAME              = var.index_name
      USERNAME                = var.username
      # NOTE(review): stored in plain text in the function configuration;
      # consider Secrets Manager or SSM Parameter Store.
      PASSWORD = var.password
    }
  }

  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
  ]

  # Explicit ordering on the role (already implied by the role ARN reference).
  depends_on = [
    aws_iam_role.lambda_opensearch_sagemaker_role,
  ]
}

output "lambda_function_name" {
  description = "Name of the query Lambda function."
  value       = aws_lambda_function.query.function_name
}

output "lambda_invoke_arn" {
  description = "Invoke ARN of the query Lambda function (used e.g. by API Gateway integrations)."
  value       = aws_lambda_function.query.invoke_arn
}

#################################
# AWS Lambda Function Indexing
#################################
# Indexing Lambda: triggered by S3 uploads of embeddings files and writes
# them to OpenSearch using the settings passed in its environment.
resource "aws_lambda_function" "index" {
  function_name = "index-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role_index.arn
  handler       = "lambda_function.lambda_handler"
  # NOTE(review): python3.8 is past end of support on AWS Lambda; upgrading
  # also requires rebuilding the numpy/opensearch-py layer for that version.
  runtime  = "python3.8"
  filename = "${path.root}/src/vectordb/lambda-index/lambda_function.zip"
  # Redeploy whenever the package contents change (the path itself never does).
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_function.zip")
  timeout          = 300
  memory_size      = 1024
  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
  ]

  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_domain_endpoint
      INDEX_NAME          = var.index_name
      USERNAME            = var.username
      # NOTE(review): stored in plain text in the function configuration;
      # consider Secrets Manager or SSM Parameter Store.
      PASSWORD = var.password
    }
  }
}



# Uploads the layer zip to S3 (the key keeps its historical "lamda" spelling so
# the existing object is not moved).
# NOTE(review): aws_s3_bucket_object is deprecated in AWS provider v4+ in
# favour of aws_s3_object.
resource "aws_s3_bucket_object" "opensearch_numpy_layer_zip" {
  bucket = var.bucket_name
  key    = "lambda-layer/lamda_layer.zip"
  source = "${path.root}/src/vectordb/lambda-index/lambda_layer.zip"
  # Re-upload when the local zip changes; `source` alone only tracks the path.
  # (An MD5 etag does not match objects encrypted with SSE-KMS.)
  etag = filemd5("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}

# Layer providing numpy and opensearch-py, published from the uploaded zip.
resource "aws_lambda_layer_version" "opensearch_numpy_layer" {
  layer_name          = "opensearch-numpy-layer"
  s3_bucket           = var.bucket_name
  s3_key              = aws_s3_bucket_object.opensearch_numpy_layer_zip.key
  compatible_runtimes = ["python3.8"]
  description         = "Lambda layer with numpy and opensearch-py"
  # Publish a new layer version whenever the zip contents change.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}
5. Build the Lambda layer
cd src/vectordb/lambda-index
# Start clean: `zip -r` updates an existing archive instead of replacing it,
# so files from an earlier build would otherwise linger in the layer.
rm -rf python lambda_layer.zip
# Install Linux x86_64 CPython 3.8 wheels so compiled packages such as numpy
# match the Lambda runtime (python3.8, default x86_64 architecture) rather
# than the machine running this script.
pip install -r requirements.txt -t python \
    --platform manylinux2014_x86_64 \
    --implementation cp \
    --python-version 3.8 \
    --only-binary=:all:
zip -r lambda_layer.zip python