import os
import json
import numpy
from opensearchpy import OpenSearch, helpers , RequestsHttpConnection
class SemanticSearch:
    """Build and populate an OpenSearch k-NN index of image embeddings.

    Each indexed document stores one embedding vector (``image_embedding``)
    together with the name of the file it came from (``video``) and the
    1-based row position of the vector inside that file (``image``).
    """

    def __init__(self, host, port, user, password, index_name):
        """Create an HTTPS OpenSearch client using basic authentication.

        Args:
            host: OpenSearch endpoint host name.
            port: OpenSearch endpoint port.
            user: Basic-auth user name.
            password: Basic-auth password.
            index_name: Name of the index every other method operates on.
        """
        self.client = OpenSearch(
            hosts=[{"host": host, "port": port}],
            http_auth=(user, password),
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=120,
            max_retries=5,
            retry_on_timeout=True,
        )
        self.index_name = index_name

    def delete_index(self):
        """Delete ``self.index_name`` and print the outcome.

        Best-effort: any exception is printed rather than raised. Presumably
        this is so a missing index on a first run does not abort the script
        before ``create_index`` runs.
        """
        try:
            response = self.client.indices.delete(index=self.index_name)
            print(f"Index '{self.index_name}' deleted successfully:", response)
        except Exception as e:
            print(f"Error deleting index '{self.index_name}':", e)

    def create_index(self):
        """Create ``self.index_name`` with a 512-dimensional k-NN vector field.

        ``image_embedding`` is indexed as an HNSW graph on the faiss engine
        with L2 distance (``ef_construction=256``, ``m=48``). ``video`` is
        mapped as text and ``image`` as an integer. The client's response is
        printed.
        """
        index_body = {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    "image_embedding": {
                        "type": "knn_vector",
                        "dimension": 512,
                        "method": {
                            "name": "hnsw",
                            "engine": "faiss",
                            "space_type": "l2",
                            "parameters": {
                                "ef_construction": 256,
                                "m": 48,
                            },
                        },
                    },
                    "video": {"type": "text"},
                    "image": {"type": "integer"},
                }
            },
        }
        # Create the index
        response = self.client.indices.create(index=self.index_name, body=index_body)
        print(f"Index created: {response}")

    def create_map_embedding(self, embedding_dir, output_path="map_embedding.jsonl"):
        """Convert per-file embedding arrays into a JSONL file of bulk actions.

        Every regular file in ``embedding_dir`` is loaded with ``numpy.load``.
        Each row of the loaded array becomes one ``index`` action that targets
        ``self.index_name``, with a sequential 1-based ``_id``. Files are
        processed in sorted name order so the ``_id`` assignment is the same
        on every run (``os.listdir`` returns entries in arbitrary order).

        Args:
            embedding_dir: Directory containing the embedding files. Each file
                is assumed to hold a 2-D array with one embedding per row
                (TODO: confirm against whatever produces these files).
            output_path: Destination JSONL file. Defaults to
                ``map_embedding.jsonl`` in the current working directory.
        """
        # Load every array before opening the output. A bad input file then
        # aborts the run without truncating an earlier output. Keeping NumPy
        # arrays, instead of converting every row to a Python list up front,
        # also keeps peak memory far lower.
        loaded = []
        for file in sorted(os.listdir(embedding_dir)):
            path = os.path.join(embedding_dir, file)
            if not os.path.isfile(path):
                continue  # skip subdirectories and other non-file entries
            loaded.append((file, numpy.load(path)))

        doc_id = 0
        with open(output_path, "w", encoding="utf-8") as f:
            for file, embeddings in loaded:
                for i, embedding in enumerate(embeddings):
                    doc_id += 1
                    action = {
                        "_op_type": "index",
                        "_index": self.index_name,
                        "_id": doc_id,
                        "_source": {
                            "image_embedding": embedding.tolist(),
                            "image": i + 1,
                            "video": file,
                        },
                    }
                    f.write(json.dumps(action))
                    f.write("\n")

    def bulk_add_documents(self, file_path, batch_size=1000):
        """Send the bulk actions stored in a JSONL file to OpenSearch.

        Each non-blank line must be one JSON-encoded action, such as those
        written by ``create_map_embedding``. Actions are sent with
        ``helpers.bulk`` in batches of at most ``batch_size``, and the size of
        every batch sent is printed.

        Args:
            file_path: Path to the JSONL file of bulk actions.
            batch_size: Maximum number of actions per ``helpers.bulk`` call.
        """
        actions = []
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue  # tolerate blank lines instead of failing in json.loads
                actions.append(json.loads(line))
                if len(actions) >= batch_size:
                    helpers.bulk(self.client, actions)
                    print("Batch added: ", len(actions))
                    actions = []
        # Flush the remainder that did not fill a whole batch.
        if actions:
            helpers.bulk(self.client, actions)
            print("Batch added: ", len(actions))
# Example usage of SemanticSearch: drop and recreate the index, convert the
# arrays under ./embeddings to bulk actions, then upload them.
# Connection settings may be supplied through environment variables so real
# credentials need not live in source; the defaults are the placeholders.
# NOTE(review): these statements run at import time (including deleting the
# index); consider moving them under `if __name__ == "__main__":`.
host = os.environ.get("OPENSEARCH_HOST", "<your-opensearch-endpoint>")
port = int(os.environ.get("OPENSEARCH_PORT", "443"))
user = os.environ.get("OPENSEARCH_USER", "<your-opensearch-user>")
password = os.environ.get("OPENSEARCH_PASSWORD", "<your-opensearch-password>")
index_name = os.environ.get("OPENSEARCH_INDEX", "semantic-index")
search_system = SemanticSearch(host, port, user, password, index_name)
search_system.delete_index()
search_system.create_index()
search_system.create_map_embedding("embeddings")
search_system.bulk_add_documents("map_embedding.jsonl")