Vector Database
1. Create the IAM roles for the query Lambda and the index Lambda
#################################
# Lambda Query Role
#################################
# Identity of the caller Terraform runs as; its account_id is interpolated into policy ARNs in this file.
data "aws_caller_identity" "current" {}
# Execution role for the query Lambda. The trust policy lets only the Lambda
# service assume it; permissions are added through the policy attachments.
resource "aws_iam_role" "lambda_opensearch_sagemaker_role" {
  name = "LambdaOpenSearchSageMakerRoleQuery"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      },
    ]
  })
}
# Grants HTTP access (GET/POST/PUT/DELETE) to OpenSearch domains.
# The resource is scoped to domains owned by this account in ap-southeast-1
# (the same region the CloudWatch Logs policy in this file is pinned to)
# instead of the previous "*" wildcard, resolving the TODO.
# NOTE(review): narrow "domain/*" to the specific domain name once it is known.
resource "aws_iam_policy" "opensearch_policy" {
  name        = "OpenSearchAccessPolicy"
  description = "Allow Lambda to access OpenSearch domains"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "es:ESHttpPost",
          "es:ESHttpGet",
          "es:ESHttpPut",
          "es:ESHttpDelete"
        ],
        Resource = "arn:aws:es:ap-southeast-1:${data.aws_caller_identity.current.account_id}:domain/*"
      }
    ]
  })
}
# Allows invoking SageMaker inference endpoints.
# The resource is scoped to endpoints owned by this account in ap-southeast-1
# (the same region the CloudWatch Logs policy in this file is pinned to)
# instead of the previous "*" wildcard, resolving the TODO.
# NOTE(review): narrow "endpoint/*" to the specific endpoint name once it is known.
resource "aws_iam_policy" "sagemaker_policy" {
  name        = "SageMakerInvokePolicy"
  description = "Allow Lambda to invoke SageMaker endpoint"

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
          "sagemaker:InvokeEndpoint"
        ],
        Resource = "arn:aws:sagemaker:ap-southeast-1:${data.aws_caller_identity.current.account_id}:endpoint/*"
      }
    ]
  })
}
# Lets the function create its log group/stream and write log events.
# Limited to CloudWatch Logs resources of this account in ap-southeast-1.
resource "aws_iam_policy" "cloudwatch_logs_policy" {
  name        = "CloudWatchLogsPolicy"
  description = "Allow Lambda to write logs to CloudWatch"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Resource = "arn:aws:logs:ap-southeast-1:${data.aws_caller_identity.current.account_id}:*"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ]
      },
    ]
  })
}
# Attach the three customer-managed policies to the query Lambda role.

# OpenSearch HTTP access.
resource "aws_iam_role_policy_attachment" "attach_opensearch_policy" {
  policy_arn = aws_iam_policy.opensearch_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}

# SageMaker endpoint invocation.
resource "aws_iam_role_policy_attachment" "attach_sagemaker_policy" {
  policy_arn = aws_iam_policy.sagemaker_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}

# CloudWatch Logs write access.
resource "aws_iam_role_policy_attachment" "attach_cloudwatch_logs_policy" {
  policy_arn = aws_iam_policy.cloudwatch_logs_policy.arn
  role       = aws_iam_role.lambda_opensearch_sagemaker_role.name
}
#################################
# Lambda Index Role
#################################
# Execution role for the index Lambda. Only the Lambda service may assume it;
# permissions come from the AWS-managed policies attached to this role.
resource "aws_iam_role" "lambda_opensearch_sagemaker_role_index" {
  name = "LambdaOpenSearchSageMakerRoleQueryIndex"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
        Action = "sts:AssumeRole"
      },
    ]
  })
}
# AWS-managed policies attached to the index Lambda role.

# Basic Lambda execution (AWSLambdaBasicExecutionRole).
resource "aws_iam_role_policy_attachment" "lambda_basic_execution_policy" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}

# Read-only access to S3 (AmazonS3ReadOnlyAccess).
resource "aws_iam_role_policy_attachment" "s3_read_only_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}

# Full OpenSearch Service access.
resource "aws_iam_role_policy_attachment" "opensearch_access_policy" {
  policy_arn = "arn:aws:iam::aws:policy/AmazonOpenSearchServiceFullAccess" # Replace with a custom policy if needed
  role       = aws_iam_role.lambda_opensearch_sagemaker_role_index.name
}
2. Create the Lambda functions for indexing and querying
#################################
# AWS Lambda Function Query
#################################
# Query Lambda: packaged from a local zip and run with the query role.
# NOTE(review): python3.8 is past end of support on AWS Lambda — confirm and
# plan a runtime upgrade.
resource "aws_lambda_function" "query" {
  function_name = "query-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/vectordb/lambda-query/lambda_function.zip"

  # Without a hash Terraform cannot tell that the zip was rebuilt and would
  # never redeploy the function code.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-query/lambda_function.zip")

  timeout     = 60
  memory_size = 128

  environment {
    variables = {
      SAGEMAKER_ENDPOINT_NAME = var.sagemaker_endpoint_name
    }
  }

  # The role itself is already an implicit dependency through `role`; what is
  # not implied is that its permissions are attached before the function exists.
  depends_on = [
    aws_iam_role_policy_attachment.attach_opensearch_policy,
    aws_iam_role_policy_attachment.attach_sagemaker_policy,
    aws_iam_role_policy_attachment.attach_cloudwatch_logs_policy,
  ]
}
# Values exported for the query Lambda.
output "lambda_function_name" {
  description = "Name of the query Lambda function."
  value       = aws_lambda_function.query.function_name
}

output "lambda_invoke_arn" {
  description = "Invoke ARN of the query Lambda function."
  value       = aws_lambda_function.query.invoke_arn
}
#################################
# AWS Lambda Function Indexing
#################################
# Index Lambda: packaged from a local zip, run with the index role, and given
# the numpy/opensearch-py layer.
# NOTE(review): python3.8 is past end of support on AWS Lambda — confirm and
# plan a runtime upgrade (the layer must be rebuilt for the new runtime).
resource "aws_lambda_function" "index" {
  function_name = "index-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role_index.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/vectordb/lambda-index/lambda_function.zip"

  # Without a hash Terraform cannot tell that the zip was rebuilt and would
  # never redeploy the function code.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_function.zip")

  timeout     = 300
  memory_size = 1024

  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
    # aws_lambda_layer_version.aws4auth_layer_zip.arn
  ]

  # NOTE(review): PASSWORD is stored as a plain-text environment variable;
  # consider a secrets store and marking the variable as sensitive.
  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_domain_endpoint
      INDEX_NAME          = var.index_name
      USERNAME            = var.username
      PASSWORD            = var.password
    }
  }
}
# Uploads the locally built layer archive to S3.
resource "aws_s3_bucket_object" "opensearch_numpy_layer_zip" {
  bucket = var.bucket_name
  key    = "lambda-layer/lamda_layer.zip"
  source = "${path.root}/src/vectordb/lambda-index/lambda_layer.zip"

  # Re-upload whenever the local archive changes; without a content hash the
  # object is only uploaded once and later rebuilds are ignored.
  source_hash = filemd5("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}

# Publishes the uploaded archive as a Lambda layer.
resource "aws_lambda_layer_version" "opensearch_numpy_layer" {
  layer_name = "opensearch-numpy-layer"
  s3_bucket  = var.bucket_name
  s3_key     = aws_s3_bucket_object.opensearch_numpy_layer_zip.key

  # Publish a new layer version whenever the archive content changes.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")

  compatible_runtimes = ["python3.8"]
  description         = "Lambda layer with numpy and opensearch-py"
}
- Create S3 notifications
#################################
# Lambda Permission to Allow S3 to Invoke
#################################
# Resource-based permission letting S3 invoke the index Lambda, restricted to
# events originating from the bucket identified by var.bucket_arn.
resource "aws_lambda_permission" "allow_s3_invocation_embeddings" {
  statement_id  = "AllowS3InvokeLambda"
  function_name = aws_lambda_function.index.function_name
  action        = "lambda:InvokeFunction"
  principal     = "s3.amazonaws.com"
  source_arn    = var.bucket_arn
}
#################################
# Embeddings Bucket Notification
#################################
# Routes S3 "object created" events to Lambda:
#   embeddings/*.npy -> index Lambda
#   video/*.mp4      -> data-processing Lambda (ARN supplied by variable)
resource "aws_s3_bucket_notification" "embeddings_bucket_notification" {
  bucket = var.bucket_name

  lambda_function {
    lambda_function_arn = aws_lambda_function.index.arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".npy"
    filter_prefix       = "embeddings/"
  }

  lambda_function {
    lambda_function_arn = var.lambda_processing_data_arn
    events              = ["s3:ObjectCreated:*"]
    filter_suffix       = ".mp4"
    filter_prefix       = "video/"
  }

  # S3 validates that it may invoke each destination when the notification is
  # created, so the invoke permission must exist first. Nothing in this
  # resource references the permission, hence the explicit dependency.
  # NOTE(review): the permission for var.lambda_processing_data_arn is not
  # defined in this file — confirm it is created before this resource.
  depends_on = [
    aws_lambda_permission.allow_s3_invocation_embeddings,
  ]
}
- Code for the indexing Lambda function (`lambda_function.py`)
import io
import json
import os
from urllib.parse import unquote_plus

import boto3
import numpy
from opensearchpy import OpenSearch, RequestsHttpConnection, helpers
# Connection settings, read from the process environment at import time.
OPENSEARCH_ENDPOINT = os.getenv("OPENSEARCH_ENDPOINT")
OPENSEARCH_INDEX = os.getenv("INDEX_NAME")
USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")

# Both clients are built once, when the module is imported.
s3_client = boto3.client("s3")

# NOTE(review): if OPENSEARCH_ENDPOINT is a bare hostname (no "https://" and no
# port), opensearch-py presumably falls back to port 9200 — confirm the value
# includes the scheme or port expected by the domain.
_OPENSEARCH_OPTIONS = {
    "hosts": [OPENSEARCH_ENDPOINT],
    "http_auth": (USERNAME, PASSWORD),  # basic auth with the configured user
    "use_ssl": True,
    "verify_certs": True,
    "connection_class": RequestsHttpConnection,
    "timeout": 120,
    "max_retries": 5,
    "retry_on_timeout": True,
}
client = OpenSearch(**_OPENSEARCH_OPTIONS)
def create_index(self, index_name=None):
    """Create the k-NN index that stores the image embeddings.

    The previous body used ``self.client`` and ``self.index_name``, but the
    only caller passes the OpenSearch client itself, which has neither
    attribute, so the call failed with AttributeError.

    Args:
        self: The OpenSearch client to create the index with. The parameter
            name is kept so existing positional callers keep working.
        index_name: Name of the index to create. Defaults to the module-level
            ``OPENSEARCH_INDEX``.
    """
    if index_name is None:
        index_name = OPENSEARCH_INDEX

    index_body = {
        "settings": {"index": {"knn": True}},
        "mappings": {
            "properties": {
                "image_embedding": {
                    "type": "knn_vector",
                    "dimension": 512,
                    "method": {
                        "name": "hnsw",
                        "engine": "faiss",
                        "space_type": "l2",
                        "parameters": {
                            "ef_construction": 256,
                            "m": 48,
                        },
                    },
                },
                "video": {"type": "text"},
                "image": {"type": "integer"},
            }
        },
    }
    # Create the index
    response = self.indices.create(index=index_name, body=index_body)
    print(f"Index created: {response}")
def format_video_name(video_name):
    """Return the last path segment of ``video_name`` up to its first dot.

    Example: ``"embeddings/L01_V001.npy"`` -> ``"L01_V001"``.
    """
    last_segment = video_name.rsplit("/", 1)[-1]
    stem, _, _ = last_segment.partition(".")
    return stem
def format_image_name(image_name):
    """Return the image number as a string zero-padded to three characters.

    Accepts either a path-like string or a plain integer. The handler passes
    the integer frame position, which previously failed because ``int`` has no
    ``split`` method.

    Examples: ``2`` -> ``"002"``, ``"frames/2.jpg"`` -> ``"002"``.
    """
    # Normalise to text first so integer positions are handled like names.
    return str(image_name).split("/")[-1].split(".")[0].zfill(3)
def _build_bulk_actions(embeddings, video_name):
    """Build one OpenSearch bulk "index" action per embedding row of a video."""
    return [
        {
            "_op_type": "index",
            "_index": OPENSEARCH_INDEX,
            # Prefix the id with the video name: a bare counter restarts at 1
            # for every file, so each new video overwrote the previous one.
            "_id": f"{video_name}_{position + 1}",
            "_source": {
                "image_embedding": embedding,
                # Passed as text so the formatter's string handling applies.
                # NOTE(review): the image number is 0-based while the id
                # suffix is 1-based — confirm which numbering is intended.
                "image": format_image_name(str(position)),
                "video": video_name,
            },
        }
        for position, embedding in enumerate(embeddings)
    ]


def lambda_handler(event, context):
    """Index the embeddings of every ``.npy`` object named in an S3 event.

    Args:
        event: S3 notification event; each entry of ``event["Records"]``
            names a bucket and an object key.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded confirmation message.

    Raises:
        KeyError: if the event does not have the S3 notification structure.
    """
    print(event)

    # Create index if not exists
    if not client.indices.exists(index=OPENSEARCH_INDEX):
        create_index(client)

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 events (e.g. spaces as "+").
        key = unquote_plus(record["s3"]["object"]["key"])
        print(f"Bucket: {bucket}")
        print(f"Key: {key}")

        video_name = format_video_name(key)

        payload = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
        # numpy.load needs a file-like object (or a path), not raw bytes.
        embeddings = numpy.load(io.BytesIO(payload)).tolist()
        if not embeddings:
            print(f"No embeddings found in {key}; nothing to index")
            continue
        print("Embeddings: ", embeddings[0])

        # Create mapping of embeddings to video names
        docs = _build_bulk_actions(embeddings, video_name)
        print("Creating docs: OK")

        response = helpers.bulk(client, docs)
        print("Response: ", response)

    return {
        "statusCode": 200,
        "body": json.dumps("Indexing complete!"),
    }
#################################
# AWS Lambda Function Query (updated: adds the OpenSearch settings and the shared layer)
#################################
# Query Lambda: packaged from a local zip, run with the query role, and given
# the numpy/opensearch-py layer plus the OpenSearch connection settings.
# NOTE(review): python3.8 is past end of support on AWS Lambda — confirm and
# plan a runtime upgrade (the layer must be rebuilt for the new runtime).
resource "aws_lambda_function" "query" {
  function_name = "query-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/vectordb/lambda-query/lambda_function.zip"

  # Without a hash Terraform cannot tell that the zip was rebuilt and would
  # never redeploy the function code.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-query/lambda_function.zip")

  timeout     = 60
  memory_size = 128

  # NOTE(review): PASSWORD is stored as a plain-text environment variable;
  # consider a secrets store and marking the variable as sensitive.
  environment {
    variables = {
      SAGEMAKER_ENDPOINT_NAME = var.sagemaker_endpoint_name
      OPENSEARCH_ENDPOINT     = var.opensearch_domain_endpoint
      INDEX_NAME              = var.index_name
      USERNAME                = var.username
      PASSWORD                = var.password
    }
  }

  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
  ]

  # The role itself is already an implicit dependency through `role`; what is
  # not implied is that its permissions are attached before the function exists.
  depends_on = [
    aws_iam_role_policy_attachment.attach_opensearch_policy,
    aws_iam_role_policy_attachment.attach_sagemaker_policy,
    aws_iam_role_policy_attachment.attach_cloudwatch_logs_policy,
  ]
}
# Values exported for the query Lambda.
output "lambda_function_name" {
  description = "Name of the query Lambda function."
  value       = aws_lambda_function.query.function_name
}

output "lambda_invoke_arn" {
  description = "Invoke ARN of the query Lambda function."
  value       = aws_lambda_function.query.invoke_arn
}
#################################
# AWS Lambda Function Indexing
#################################
# Index Lambda: packaged from a local zip, run with the index role, and given
# the numpy/opensearch-py layer.
# NOTE(review): python3.8 is past end of support on AWS Lambda — confirm and
# plan a runtime upgrade (the layer must be rebuilt for the new runtime).
resource "aws_lambda_function" "index" {
  function_name = "index-vectordb"
  role          = aws_iam_role.lambda_opensearch_sagemaker_role_index.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.8"
  filename      = "${path.root}/src/vectordb/lambda-index/lambda_function.zip"

  # Without a hash Terraform cannot tell that the zip was rebuilt and would
  # never redeploy the function code.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_function.zip")

  timeout     = 300
  memory_size = 1024

  layers = [
    aws_lambda_layer_version.opensearch_numpy_layer.arn,
  ]

  # NOTE(review): PASSWORD is stored as a plain-text environment variable;
  # consider a secrets store and marking the variable as sensitive.
  environment {
    variables = {
      OPENSEARCH_ENDPOINT = var.opensearch_domain_endpoint
      INDEX_NAME          = var.index_name
      USERNAME            = var.username
      PASSWORD            = var.password
    }
  }
}
# Uploads the locally built layer archive to S3.
resource "aws_s3_bucket_object" "opensearch_numpy_layer_zip" {
  bucket = var.bucket_name
  key    = "lambda-layer/lamda_layer.zip"
  source = "${path.root}/src/vectordb/lambda-index/lambda_layer.zip"

  # Re-upload whenever the local archive changes; without a content hash the
  # object is only uploaded once and later rebuilds are ignored.
  source_hash = filemd5("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")
}

# Publishes the uploaded archive as a Lambda layer.
resource "aws_lambda_layer_version" "opensearch_numpy_layer" {
  layer_name = "opensearch-numpy-layer"
  s3_bucket  = var.bucket_name
  s3_key     = aws_s3_bucket_object.opensearch_numpy_layer_zip.key

  # Publish a new layer version whenever the archive content changes.
  source_code_hash = filebase64sha256("${path.root}/src/vectordb/lambda-index/lambda_layer.zip")

  compatible_runtimes = ["python3.8"]
  description         = "Lambda layer with numpy and opensearch-py"
}
- Create Lambda Layers
# Build the layer archive (lambda_layer.zip) in src/vectordb/lambda-index.
# Assumes the current directory is the one that contains src/ — TODO confirm.
cd src/vectordb/lambda-index
# Install the packages listed in requirements.txt into the ./python directory.
pip install -r requirements.txt -t python
# Archive the python/ directory recursively into lambda_layer.zip.
zip -r lambda_layer.zip python