向量数据库索引技术详解

向量数据库索引技术详解

为什么需要索引?🤔

在向量数据库中,如果没有索引,查询时需要遍历所有向量计算相似度(暴力搜索),这在大规模数据集上是不可接受的。索引的目的是:

  1. 加速检索

    • 减少比较次数
    • 提高查询效率
    • 降低资源消耗
  2. 支持规模化

    • 处理海量数据
    • 维持稳定性能
    • 优化内存使用

主流索引类型 📚

1. 树形索引

KD-Tree

from sklearn.neighbors import KDTree
import numpy as np

# 创建示例数据
X = np.random.random((1000, 128))  # 1000个128维向量

# 构建KD树
tree = KDTree(X, leaf_size=40)

# 查询最近邻
query_point = np.random.random(128)
dist, ind = tree.query([query_point], k=5)  # 查找最近的5个点

优点:

  • 构建简单
  • 内存友好
  • 适合低维

缺点:

  • 维度灾难
  • 不适合高维
  • 更新成本高

2. 哈希索引

LSH (局部敏感哈希)

import numpy as np
from datasketch import MinHashLSH

# 创建 LSH 索引
lsh = MinHashLSH(threshold=0.8, num_perm=128)

# 添加数据
for i, vec in enumerate(vectors):
    lsh.insert(f"id_{i}", vec)

# 查询相似项
results = lsh.query(query_vector)

优点:

  • 速度快
  • 内存效率高
  • 支持流式更新

缺点:

  • 精度损失
  • 需要多表
  • 参数敏感

3. 量化索引

PQ (乘积量化)

import faiss
import numpy as np

# 准备数据
d = 128  # 向量维度
nb = 10000  # 数据量
np.random.seed(1234)
xb = np.random.random((nb, d)).astype('float32')

# 创建 PQ 索引
nlist = 100  # 聚类中心数量
m = 8  # 子空间数量
quantizer = faiss.IndexFlatL2(d)
index = faiss.IndexIVFPQ(quantizer, d, nlist, m, 8)

# 训练索引
index.train(xb)
index.add(xb)

# 搜索
k = 4  # 返回最近的4个结果
D, I = index.search(xb[:5], k)

优点:

  • 压缩效果好
  • 检索快速
  • 内存占用小

缺点:

  • 精度损失
  • 训练复杂
  • 更新困难

4. 图索引

HNSW (分层可导航小世界图)

from hnswlib import Index

# 初始化索引
dim = 128  # 向量维度
num_elements = 10000  # 数据量

# 创建索引
index = Index(space='l2', dim=dim)
index.init_index(
    max_elements=num_elements,
    ef_construction=200,
    M=16
)

# 添加数据
index.add_items(data)

# 搜索
labels, distances = index.knn_query(query_data, k=5)

优点:

  • 性能最优
  • 内存友好
  • 支持增量

缺点:

  • 内存消耗大
  • 参数复杂
  • 删除困难

索引构建策略 🔨

1. 数据预处理

def preprocess_vectors(vectors, normalize=True, remove_outliers=True):
    """向量预处理"""
    if normalize:
        # L2 归一化
        norms = np.linalg.norm(vectors, axis=1)
        vectors = vectors / norms[:, np.newaxis]
    
    if remove_outliers:
        # IQR方法去除异常值
        q1 = np.percentile(vectors, 25, axis=0)
        q3 = np.percentile(vectors, 75, axis=0)
        iqr = q3 - q1
        mask = ~((vectors < (q1 - 1.5 * iqr)) | 
                 (vectors > (q3 + 1.5 * iqr))).any(axis=1)
        vectors = vectors[mask]
    
    return vectors

2. 分批构建

def build_index_batches(vectors, batch_size=1000):
    """分批构建索引"""
    index = faiss.IndexFlatL2(vectors.shape[1])
    
    for i in range(0, len(vectors), batch_size):
        batch = vectors[i:i + batch_size]
        index.add(batch)
        print(f"已添加 {i + len(batch)} 个向量")
    
    return index

3. 并行训练

def train_index_parallel(vectors, n_threads=4):
    """并行训练索引"""
    # 设置线程数
    faiss.omp_set_num_threads(n_threads)
    
    # 创建和训练索引
    nlist = 100
    quantizer = faiss.IndexFlatL2(vectors.shape[1])
    index = faiss.IndexIVFFlat(quantizer, vectors.shape[1], nlist)
    
    index.train(vectors)
    index.add(vectors)
    
    return index

索引更新机制 🔄

1. 增量更新

class IncrementalIndex:
    def __init__(self, dim):
        self.main_index = faiss.IndexFlatL2(dim)
        self.buffer_index = faiss.IndexFlatL2(dim)
        self.buffer_size = 1000
        
    def add(self, vectors):
        """添加新向量"""
        if self.buffer_index.ntotal + len(vectors) >= self.buffer_size:
            # 合并到主索引
            self._merge_indexes()
        self.buffer_index.add(vectors)
    
    def _merge_indexes(self):
        """合并索引"""
        if self.buffer_index.ntotal > 0:
            faiss.normalize_L2(self.buffer_index)
            self.main_index.add(
                faiss.vector_to_array(self.buffer_index)
            )
            self.buffer_index.reset()
    
    def search(self, query, k):
        """搜索"""
        # 在两个索引中分别搜索
        D1, I1 = self.main_index.search(query, k)
        D2, I2 = self.buffer_index.search(query, k)
        
        # 合并结果
        D = np.concatenate([D1, D2], axis=1)
        I = np.concatenate([I1, I2], axis=1)
        
        # 取最好的 k 个结果
        idx = np.argsort(D, axis=1)[:, :k]
        D = np.take_along_axis(D, idx, axis=1)
        I = np.take_along_axis(I, idx, axis=1)
        
        return D, I

2. 删除处理

class DeletableIndex:
    def __init__(self, dim):
        self.index = faiss.IndexFlatL2(dim)
        self.id_map = {}  # 映射原始ID到索引位置
        self.deleted = set()  # 已删除的ID集合
    
    def add(self, vectors, ids):
        """添加向量"""
        for i, id in enumerate(ids):
            if id not in self.deleted:
                pos = self.index.ntotal
                self.index.add(vectors[i:i+1])
                self.id_map[id] = pos
    
    def delete(self, ids):
        """标记删除"""
        self.deleted.update(ids)
    
    def search(self, query, k):
        """搜索(排除已删除的结果)"""
        D, I = self.index.search(query, k + len(self.deleted))
        
        # 过滤已删除的结果
        valid_results = []
        for distances, indices in zip(D, I):
            valid = [(d, i) for d, i in zip(distances, indices)
                    if i not in self.deleted][:k]
            valid_results.append(valid)
        
        return np.array(valid_results)
    
    def rebuild(self):
        """重建索引(清理已删除的向量)"""
        valid_vectors = []
        valid_ids = []
        
        for id, pos in self.id_map.items():
            if id not in self.deleted:
                vector = faiss.vector_to_array(
                    self.index.reconstruct(pos)
                )
                valid_vectors.append(vector)
                valid_ids.append(id)
        
        # 重新创建索引
        self.index = faiss.IndexFlatL2(self.index.d)
        self.id_map = {}
        self.deleted = set()
        
        # 添加有效向量
        self.add(np.array(valid_vectors), valid_ids)

维护优化 🛠️

1. 性能监控

class IndexMonitor:
    def __init__(self, index):
        self.index = index
        self.metrics = {
            'query_times': [],
            'memory_usage': [],
            'accuracy': []
        }
    
    def benchmark_query(self, queries, ground_truth, k=10):
        """性能测试"""
        start_time = time.time()
        results = self.index.search(queries, k)
        query_time = time.time() - start_time
        
        # 计算准确率
        accuracy = self._calculate_accuracy(results, ground_truth)
        
        # 记录指标
        self.metrics['query_times'].append(query_time)
        self.metrics['accuracy'].append(accuracy)
        
        return {
            'query_time': query_time,
            'accuracy': accuracy,
            'qps': len(queries) / query_time
        }
    
    def _calculate_accuracy(self, results, ground_truth):
        """计算准确率"""
        correct = 0
        total = len(results) * len(results[0])
        
        for pred, true in zip(results, ground_truth):
            correct += len(set(pred) & set(true))
        
        return correct / total

2. 自动调优

def auto_tune_index(index, train_data, val_data, param_grid):
    """自动调优索引参数"""
    best_score = 0
    best_params = None
    
    for params in param_grid:
        # 使用当前参数配置
        index.set_params(**params)
        
        # 训练索引
        index.train(train_data)
        
        # 评估性能
        score = evaluate_index(index, val_data)
        
        if score > best_score:
            best_score = score
            best_params = params
    
    # 使用最佳参数
    index.set_params(**best_params)
    return best_params, best_score

3. 定期维护

class IndexMaintenance:
    def __init__(self, index):
        self.index = index
        self.last_maintenance = time.time()
        self.maintenance_interval = 86400  # 24小时
    
    def need_maintenance(self):
        """检查是否需要维护"""
        return (time.time() - self.last_maintenance) > self.maintenance_interval
    
    def perform_maintenance(self):
        """执行维护任务"""
        if self.need_maintenance():
            # 1. 备份索引
            self._backup_index()
            
            # 2. 整理碎片
            self._defragment()
            
            # 3. 重建索引(如果需要)
            if self._fragmentation_ratio() > 0.3:
                self._rebuild_index()
            
            # 4. 更新维护时间
            self.last_maintenance = time.time()
    
    def _backup_index(self):
        """备份索引"""
        backup_path = f"index_backup_{int(time.time())}"
        faiss.write_index(self.index, backup_path)
    
    def _defragment(self):
        """整理碎片"""
        # 实现碎片整理逻辑
        pass
    
    def _fragmentation_ratio(self):
        """计算碎片率"""
        # 实现碎片率计算逻辑
        return 0.0
    
    def _rebuild_index(self):
        """重建索引"""
        # 实现索引重建逻辑
        pass

最佳实践 💡

1. 选择索引类型

  • 数据量小于100万:使用 HNSW
  • 数据量100万-1000万:使用 IVF + PQ
  • 数据量大于1000万:使用分布式索引

2. 调优建议

  • 内存受限:使用 PQ 压缩
  • 速度优先:使用 HNSW
  • 精度优先:使用 IVF + Flat

3. 注意事项

  • 定期监控性能
  • 合理设置参数
  • 做好容量规划
  • 建立备份机制

小结 📝

本章我们深入学习了向量数据库的索引技术:

  1. 索引类型

    • 树形索引
    • 哈希索引
    • 量化索引
    • 图索引
  2. 构建策略

    • 数据预处理
    • 分批构建
    • 并行训练
  3. 更新机制

    • 增量更新
    • 删除处理
    • 重建优化
  4. 维护优化

    • 性能监控
    • 自动调优
    • 定期维护

下一章,我们将学习如何优化向量数据库的查询性能!