Create Index and Indexing Data With Python SDK

1 Prepare Script to Create Index and Indexing Data

  1. Create a Python script to create an index and index data in the OpenSearch domain. The script will perform the following tasks:
    • Delete the index if it exists.
    • Create an index with KNN settings.
    • Create a map embedding from the embeddings directory.
    • Bulk add documents to the index from the map embedding jsonl file.
  • Before running the script, make sure to download the embeddings from the s3 bucket to the local machine. The embeddings are stored in the embeddings directory.
import os
import json
import numpy
from opensearchpy import OpenSearch, helpers , RequestsHttpConnection


class SemanticSearch:
    """Create a KNN-enabled OpenSearch index and bulk-load image embeddings.

    Intended workflow: delete any stale index, create the index with a
    ``knn_vector`` mapping, serialize the ``.npy`` embedding files into a
    bulk-action JSONL file, then bulk-index that file in batches.
    """

    def __init__(self, host, port, user, password, index_name):
        """Open an authenticated HTTPS connection to the OpenSearch domain.

        Args:
            host: OpenSearch endpoint hostname (no scheme).
            port: HTTPS port, typically 443.
            user: Basic-auth user name.
            password: Basic-auth password.
            index_name: Name of the index every other method operates on.
        """
        self.client = OpenSearch(
            hosts=[{"host": host, "port": port}],
            http_auth=(user, password),
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            # Bulk requests with large embedding payloads can be slow, so use
            # a generous timeout and retry on timeout.
            timeout=120,
            max_retries=5,
            retry_on_timeout=True,
        )
        self.index_name = index_name

    def delete_index(self):
        """Best-effort delete of the index; prints errors instead of raising.

        A failure (typically because the index does not exist yet) is
        reported but deliberately not propagated, so the script also runs
        against a fresh domain.
        """
        try:
            response = self.client.indices.delete(index=self.index_name)
            print(f"Index '{self.index_name}' deleted successfully:", response)
        except Exception as e:  # best-effort: the index may not exist yet
            print(f"Error deleting index '{self.index_name}':", e)

    def create_index(self):
        """Create the index with a 512-d HNSW/faiss ``knn_vector`` mapping."""
        index_body = {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    "image_embedding": {
                        "type": "knn_vector",
                        "dimension": 512,
                        "method": {
                            "name": "hnsw",
                            "engine": "faiss",
                            "space_type": "l2",
                            "parameters": {
                                "ef_construction": 256,
                                "m": 48,
                            },
                        },
                    },
                    "video": {"type": "text"},
                    "image": {"type": "integer"},
                }
            },
        }

        # Create the index
        response = self.client.indices.create(index=self.index_name, body=index_body)
        print(f"Index created: {response}")

    def create_map_embedding(self, embedding_dir, output_path="map_embedding.jsonl"):
        """Serialize every numpy file in ``embedding_dir`` into bulk actions.

        Each row of each numpy file becomes one bulk "index" action carrying
        a sequential ``_id`` (starting at 1), the source file name under
        ``video`` and the 1-based row number under ``image``. Actions are
        streamed straight to ``output_path`` (one JSON object per line)
        instead of being accumulated in memory first.

        Args:
            embedding_dir: Directory containing the ``.npy`` embedding files.
            output_path: Destination JSONL file. The default keeps the
                original behavior of writing ``map_embedding.jsonl`` in the
                current working directory.
        """
        doc_id = 0
        with open(output_path, "w") as f:
            # Sort for a deterministic file order so document ids are
            # reproducible across runs (os.listdir order is arbitrary).
            for file in sorted(os.listdir(embedding_dir)):
                embeddings = numpy.load(os.path.join(embedding_dir, file))
                for i, embedding in enumerate(embeddings):
                    doc_id += 1
                    action = {
                        "_op_type": "index",
                        "_index": self.index_name,
                        "_id": doc_id,
                        "_source": {
                            "image_embedding": embedding.tolist(),
                            "image": i + 1,
                            "video": file,
                        },
                    }
                    f.write(json.dumps(action))
                    f.write("\n")

    def bulk_add_documents(self, file_path, batch_size=1000):
        """Bulk-index the JSONL bulk actions stored in ``file_path``.

        Args:
            file_path: Path to a JSONL file of bulk actions (one per line).
            batch_size: Number of actions per bulk request (default 1000,
                matching the original hard-coded value).
        """
        actions = []
        with open(file_path, "r") as f:
            for line in f:
                actions.append(json.loads(line.strip()))
                if len(actions) >= batch_size:
                    helpers.bulk(self.client, actions)
                    print("Batch added: ", len(actions))
                    actions = []

        # Flush — and, for consistency with full batches, log — the remainder.
        if actions:
            helpers.bulk(self.client, actions)
            print("Batch added: ", len(actions))



# Example: run the full indexing pipeline with the SemanticSearch class.
config = {
    "host": "<your-opensearch-endpoint>",
    "port": 443,
    "user": "<your-opensearch-user>",
    "password": "<your-opensearch-password>",
    "index_name": "semantic-index",
}

pipeline = SemanticSearch(**config)
pipeline.delete_index()
pipeline.create_index()
pipeline.create_map_embedding("embeddings")
pipeline.bulk_add_documents("map_embedding.jsonl")
  1. Explain the code:
    • delete_index() : Deletes the index if it already exists.
    • create_index() : Creates the index with the KNN configuration (dimension = 512, method = hnsw, engine = faiss, space_type = l2).
    • create_map_embedding() : Builds the embedding map from the embedding files in the embeddings directory and writes it in JSONL format.
    • bulk_add_documents() : Adds the documents from the JSONL file to the index in batches of 1000.

Monitoring the Cluster

1. Check Cluster Health

  1. Navigate to OpenSearch by clicking on the Domain menu, click on the domain name semantic-search-domain.
  2. In the Overview tab, you can see the Cluster health status. Cluster Health
  • After indexing the data, we will see that the total document count is 105k. Cluster Health