Source code for blockingpy.gpu_faiss_blocker

"""GPU Faiss Blocker."""

from __future__ import annotations

import gc
import logging
import os
import random
import warnings
from typing import Any, Literal

import faiss
import numpy as np
import pandas as pd

from .base import BlockingMethod
from .data_handler import DataHandler
from .helper_functions import rearrange_array

logger = logging.getLogger(__name__)

_IndexType = Literal["flat", "ivf", "ivfpq", "cagra"]


[docs] class GPUFaissBlocker(BlockingMethod): """ A class for performing blocking using the FAISS (Facebook AI Similarity Search) algorithms that are GPU-accelerated. Parameters ---------- None Attributes ---------- index : faiss.Index (gpu) The FAISS index used for nearest neighbor search x_columns : array-like or None Column names of the reference dataset METRIC_MAP : dict Mapping of distance metric names to FAISS metric types See Also -------- BlockingMethod : Abstract base class defining the blocking interface Notes ----- The available Index types from FAISS are: 'flat', 'ivf', 'ivfpq' and 'cagra'. For more details about the FAISS library and implementation, see: https://github.com/facebookresearch/faiss """
[docs] def __init__(self) -> None: self.index: faiss.Index self.x_columns: list[str] self.METRIC_MAP: dict[str, int] = { "euclidean": faiss.METRIC_L2, "l2": faiss.METRIC_L2, "inner_product": faiss.METRIC_INNER_PRODUCT, "cosine": faiss.METRIC_INNER_PRODUCT, }
[docs] def block( # noqa: PLR0915, PLR0912 self, x: DataHandler, y: DataHandler, k: int, verbose: bool | None, controls: dict[str, Any], ) -> pd.DataFrame: """ Perform blocking using the GPU FAISS algorithms. Parameters ---------- x : DataHandler Reference dataset containing features for indexing y : DataHandler Query dataset to find nearest neighbors for k : int Number of nearest neighbors to find verbose : bool, optional If True, print detailed progress information controls : dict Algorithm control parameters. Returns ------- pandas.DataFrame DataFrame containing the blocking results with columns: - 'y': indices from query dataset - 'x': indices of matched items from reference dataset - 'dist': distances to matched items """ logger.setLevel(logging.INFO if verbose else logging.WARNING) faiss_ctl = controls["gpu_faiss"] distance: str = faiss_ctl.get("distance", "cosine") index_type: _IndexType = faiss_ctl.get("index_type", "flat") k_search: int = faiss_ctl.get("k_search", 30) path: str | None = faiss_ctl.get("path") train_size: int | None = faiss_ctl.get("train_size") seed: int | None = controls.get("random_seed") if seed is not None: random.seed(seed) np.random.seed(seed) if index_type not in {"flat", "ivf", "ivfpq", "cagra"}: raise ValueError("index_type must be one of 'flat', 'ivf', 'ivfpq', 'cagra'") self.x_columns = list(x.cols) metric = self.METRIC_MAP[distance] x_arr = x.to_dense(dtype=np.float32) y_arr = y.to_dense(dtype=np.float32) del x, y gc.collect() if distance == "cosine": faiss.normalize_L2(x_arr) faiss.normalize_L2(y_arr) d = x_arr.shape[1] index_cpu = None trainable = False gpu_res = faiss.StandardGpuResources() if index_type == "flat": index_cpu = faiss.IndexFlat(d, metric) elif index_type == "ivf": nlist = faiss_ctl.get("ivf_nlist", 100) quantiser = faiss.IndexFlat(d, metric) index_cpu = faiss.IndexIVFFlat(quantiser, d, nlist, metric) trainable = True if seed is not None: self._apply_faiss_seeds(index_cpu, seed) elif index_type == "ivfpq": nlist = faiss_ctl.get("ivfpq_nlist", 100) m = faiss_ctl.get("ivfpq_m", 8) nbits = faiss_ctl.get("ivfpq_nbits", 8) NBITS_SUPPORTED_VAL = 8 if nbits != NBITS_SUPPORTED_VAL: raise ValueError("FAISS GPU IVFPQ requires ivfpq_nbits == 8.") if d % m != 0: raise ValueError(f"Dimension d={d} must be divisible by m={m} for IVFPQ index.") quantiser = faiss.IndexFlat(d, metric) index_cpu = faiss.IndexIVFPQ(quantiser, d, nlist, m, nbits) trainable = True if seed is not None: self._apply_faiss_seeds(index_cpu, seed) elif index_type == "cagra": cctl = faiss_ctl.get("cagra", {}) CfgCls = getattr(faiss, "GpuIndexCagraConfig", None) if CfgCls is not None: cagra_cfg = CfgCls() if hasattr(cagra_cfg, "graph_degree"): cagra_cfg.graph_degree = int(cctl.get("graph_degree", 64)) if hasattr(cagra_cfg, "intermediate_graph_degree"): cagra_cfg.intermediate_graph_degree = int( cctl.get("intermediate_graph_degree", 128) ) build_algo = str(cctl.get("build_algo", "ivf_pq")).lower() if hasattr(cagra_cfg, "build_algo"): if build_algo == "nn_descent" and hasattr(CfgCls, "NN_DESCENT"): cagra_cfg.build_algo = CfgCls.NN_DESCENT if hasattr(cagra_cfg, "nn_descent_niter"): cagra_cfg.nn_descent_niter = int(cctl.get("nn_descent_niter", 20)) elif hasattr(CfgCls, "IVF_PQ"): cagra_cfg.build_algo = CfgCls.IVF_PQ try: self.index = faiss.GpuIndexCagra(gpu_res, d, metric, cagra_cfg) except TypeError: self.index = faiss.GpuIndexCagra(gpu_res, d, metric) else: self.index = faiss.GpuIndexCagra(gpu_res, d, metric) else: raise ValueError("Unsupported index_type") if trainable and index_cpu is not None and not index_cpu.is_trained: if train_size is not None and train_size < x_arr.shape[0]: sample_idx = np.random.choice(x_arr.shape[0], train_size, replace=False) train_data = x_arr[sample_idx] else: train_data = x_arr index_cpu.train(train_data) if train_size is not None and train_size < x_arr.shape[0]: del train_data gc.collect() if index_type in {"flat", "ivf", "ivfpq"}: if index_type == "flat": self.index = faiss.index_cpu_to_gpu(gpu_res, 0, index_cpu) else: co = faiss.GpuClonerOptions() if index_type == "ivfpq": ivfpq_useFloat16 = faiss_ctl.get("ivfpq_useFloat16", False) ivfpq_usePrecomputed = faiss_ctl.get("ivfpq_usePrecomputed", False) ivfpq_reserveVecs = faiss_ctl.get("ivfpq_reserveVecs", 0) ivfpq_use_cuvs = faiss_ctl.get("ivfpq_use_cuvs", False) if hasattr(co, "useFloat16"): co.useFloat16 = ivfpq_useFloat16 if hasattr(co, "usePrecomputed"): co.usePrecomputed = ivfpq_usePrecomputed if hasattr(co, "reserveVecs"): co.reserveVecs = ivfpq_reserveVecs if hasattr(co, "use_cuvs"): co.use_cuvs = ivfpq_use_cuvs self.index = faiss.index_cpu_to_gpu(gpu_res, 0, index_cpu, co) if index_type == "ivf": self.index.nprobe = faiss_ctl.get("ivf_nprobe", 10) if index_type == "ivfpq": self.index.nprobe = faiss_ctl.get("ivfpq_nprobe", 10) del index_cpu gc.collect() if index_type == "cagra": if ( hasattr(self.index, "is_trained") and hasattr(self.index, "train") and not self.index.is_trained ): self.index.train(x_arr) if getattr(self.index, "ntotal", 0) == 0 and hasattr(self.index, "add"): self.index.add(x_arr) else: self.index.add(x_arr) x_total = x_arr.shape[0] del x_arr gc.collect() if k_search > x_total: warnings.warn( f"k_search={k_search} exceeds reference size ({x_total}); clipping.", category=UserWarning, stacklevel=2, ) k_search = x_total if index_type == "cagra": SPCls = getattr(faiss, "SearchParametersCagra", None) if SPCls is not None: sp = SPCls() def _set(name: str, val: Any) -> None: if hasattr(sp, name): setattr(sp, name, val) cctl = faiss_ctl.get("cagra", {}) _set("itopk_size", int(cctl.get("itopk_size", 64))) _set("max_queries", int(cctl.get("max_queries", 0))) _set("team_size", int(cctl.get("team_size", 0))) _set("search_width", int(cctl.get("search_width", 1))) _set("min_iterations", int(cctl.get("min_iterations", 0))) _set("max_iterations", int(cctl.get("max_iterations", 0))) _set("thread_block_size", int(cctl.get("thread_block_size", 0))) _set("hashmap_min_bitlen", int(cctl.get("hashmap_min_bitlen", 0))) _set( "hashmap_max_fill_rate", float(cctl.get("hashmap_max_fill_rate", 0.5)), ) _set("num_random_samplings", int(cctl.get("num_random_samplings", 1))) _set("seed", int(cctl.get("seed", seed if seed is not None else 0x128394))) algo_s = str(cctl.get("algo", "auto")).lower() if hasattr(SPCls, "AUTO"): if algo_s == "single_cta" and hasattr(SPCls, "SINGLE_CTA"): sp.algo = SPCls.SINGLE_CTA elif algo_s == "multi_cta" and hasattr(SPCls, "MULTI_CTA"): sp.algo = SPCls.MULTI_CTA else: sp.algo = SPCls.AUTO hm_s = str(cctl.get("hashmap_mode", "auto")).lower() if hasattr(SPCls, "AUTO"): if hm_s == "small" and hasattr(SPCls, "SMALL"): sp.hashmap_mode = SPCls.SMALL elif hm_s == "hash" and hasattr(SPCls, "HASH"): sp.hashmap_mode = SPCls.HASH else: sp.hashmap_mode = SPCls.AUTO distances, indices = self.index.search(y_arr, k_search, params=sp) else: distances, indices = self.index.search(y_arr, k_search) else: distances, indices = self.index.search(y_arr, k_search) y_n = y_arr.shape[0] del y_arr gc.collect() if distance == "cosine" and index_type != "ivfpq": distances = (1 - distances) / 2 K_VAL = 2 if k == K_VAL: indices, distances = rearrange_array(indices, distances) if path: self._save_index(path) result_df = pd.DataFrame( { "y": np.arange(y_n), "x": indices[:, k - 1], "dist": distances[:, k - 1], } ) del distances, indices gc.collect() return result_df
def _save_index(self, path: str) -> None: if not os.path.isdir(path): raise ValueError("Provided path is not a directory") if isinstance(self.index, getattr(faiss, "GpuIndexCagra", ())): raise ValueError("Saving CAGRA indices is not supported in this build.") index_cpu = faiss.index_gpu_to_cpu(self.index) faiss_path = os.path.join(path, "index.faiss") cols_path = os.path.join(path, "index-colnames.txt") faiss.write_index(index_cpu, faiss_path) with open(cols_path, "w", encoding="utf-8") as fh: fh.write("\n".join(self.x_columns)) del index_cpu gc.collect() def _apply_faiss_seeds(self, idx: faiss.Index, s: int) -> None: try: if hasattr(idx, "cp") and hasattr(idx.cp, "seed"): idx.cp.seed = int(s) if hasattr(idx, "pq") and hasattr(idx.pq, "cp") and hasattr(idx.pq.cp, "seed"): idx.pq.cp.seed = int(s) except Exception: pass