Source code for blockingpy.voyager_blocker

"""
Contains the VoyagerBlocker class for performing blocking using the
Voyager algorithm from Spotify.
"""

import logging
import os
import warnings
from typing import Any

import numpy as np
import pandas as pd
from voyager import Index, Space

from .base import BlockingMethod
from .data_handler import DataHandler
from .helper_functions import rearrange_array

logger = logging.getLogger(__name__)


[docs] class VoyagerBlocker(BlockingMethod): """ A class for performing blocking using the Voyager algorithm from Spotify. This class implements blocking functionality using Spotify's Voyager algorithm for efficient approximate nearest neighbor search. It supports multiple distance metrics and is designed for high-dimensional data. Parameters ---------- None Attributes ---------- index : voyager.Index or None The Voyager index used for nearest neighbor search x_columns : array-like or None Column names of the reference dataset METRIC_MAP : dict Mapping of distance metric names to Voyager Space types See Also -------- BlockingMethod : Abstract base class defining the blocking interface voyager.Index : The underlying Voyager index implementation Raises ------ ValueError If path is provided but incorrect Notes ----- For more details about the Voyager algorithm and implementation, see: https://github.com/spotify/voyager """ METRIC_MAP: dict[str, Space] = { "euclidean": Space.Euclidean, "cosine": Space.Cosine, "inner_product": Space.InnerProduct, }
[docs] def __init__(self) -> None: """ Initialize the VoyagerBlocker instance. Creates a new VoyagerBlocker with empty index. """ self.index: Index self.x_columns: list[str]
[docs] def block( self, x: DataHandler, y: DataHandler, k: int, verbose: bool | None, controls: dict[str, Any], ) -> pd.DataFrame: """ Perform blocking using the Voyager algorithm. Parameters ---------- x : DataHandler Reference dataset containing features for indexing y : DataHandler Query dataset to find nearest neighbors for k : int Number of nearest neighbors to find verbose : bool, optional If True, print detailed progress information controls : dict Algorithm control parameters with the following structure: { 'random_seed': int, 'voyager': { 'distance': str, 'k_search': int, 'path': str, 'M': int, 'ef_construction': int, 'max_elements': int, 'num_threads': int, 'query_ef': int } } Returns ------- pandas.DataFrame DataFrame containing the blocking results with columns: - 'y': indices from query dataset - 'x': indices of matched items from reference dataset - 'dist': distances to matched items Notes ----- The algorithm uses a graph-based approach for approximate nearest neighbor search. The quality of approximation can be controlled through parameters like ef_construction and query_ef. """ logger.setLevel(logging.INFO if verbose else logging.WARNING) self.x_columns = list(x.cols) X = x.to_dense() Y = y.to_dense() distance = controls["voyager"].get("distance") space = self.METRIC_MAP[distance] k_search = controls["voyager"].get("k_search") path = controls["voyager"].get("path") seed = controls.get("random_seed") if seed is None: seed = 1 if X.shape[0] == 0: raise ValueError("Reference dataset `x` must not be empty.") if Y.shape[0] == 0: raise ValueError("Query dataset `y` must not be empty.") self.index = Index( space=space, num_dimensions=X.shape[1], M=controls["voyager"].get("M"), ef_construction=controls["voyager"].get("ef_construction"), random_seed=seed, max_elements=controls["voyager"].get("max_elements"), ) logger.info("Building index...") self.index.add_items( X, num_threads=controls["voyager"].get("num_threads"), ) logger.info("Querying index...") l_ind_nns = np.zeros(Y.shape[0], dtype=int) l_ind_dist = np.zeros(Y.shape[0]) if k_search > X.shape[0]: original_k_search = k_search k_search = min(k_search, X.shape[0]) warnings.warn( f"k_search ({original_k_search}) is larger than the number of reference points " f"({X.shape[0]}). Adjusted k_search to {k_search}.", category=UserWarning, stacklevel=2, ) all_neighbor_ids, all_distances = self.index.query( vectors=Y, k=k_search, num_threads=controls["voyager"].get("num_threads"), query_ef=controls["voyager"].get("query_ef"), ) K_VAL = 2 if k == K_VAL: all_neighbor_ids, all_distances = rearrange_array(all_neighbor_ids, all_distances) l_ind_nns = all_neighbor_ids[:, k - 1] l_ind_dist = all_distances[:, k - 1] if path: self._save_index(path) result = pd.DataFrame( { "y": np.arange(Y.shape[0]), "x": l_ind_nns, "dist": l_ind_dist, } ) logger.info("Process completed successfully.") return result
def _save_index(self, path: str) -> None: """ Save the Voyager index and column names to files. Parameters ---------- path : str Directory path where the files will be saved Raises ------ ValueError If the provided path is incorrect Notes ----- Creates two files: - 'index.voyager': The Voyager index file - 'index-colnames.txt': A text file with column names """ if not os.path.exists(os.path.dirname(path)): raise ValueError("Provided path is incorrect") path_voy = os.path.join(path, "index.voyager") path_voy_cols = os.path.join(path, "index-colnames.txt") logger.info(f"Writing an index to {path_voy}") self.index.save(path_voy) with open(path_voy_cols, "w", encoding="utf-8") as f: f.write("\n".join(self.x_columns))