Source code for blockingpy.nnd_blocker

"""Contains the NNDBlocker class for blocking using the Nearest Neighbor Descent algorithm."""

import logging
import warnings
from typing import Any

import numpy as np
import pandas as pd
import pynndescent

from .base import BlockingMethod
from .data_handler import DataHandler
from .helper_functions import rearrange_array

logger = logging.getLogger(__name__)


[docs] class NNDBlocker(BlockingMethod): """ A blocker class that uses the Nearest Neighbor Descent (NND) algorithm. This class implements blocking functionality using the pynndescent library's NNDescent algorithm for efficient approximate nearest neighbor search. Parameters ---------- None Attributes ---------- index : pynndescent.NNDescent or None The NNDescent index used for querying See Also -------- BlockingMethod : Abstract base class defining the blocking interface pynndescent.NNDescent : The underlying nearest neighbor descent implementation Notes ----- For more details about the algorithm and implementation, see: https://pynndescent.readthedocs.io/en/latest/api.html https://github.com/lmcinnes/pynndescent """
[docs] def __init__(self) -> None: """ Initialize the NNDBlocker instance. Creates a new NNDBlocker with empty index. """ self.index: pynndescent.NNDescent
[docs] def block( self, x: DataHandler, y: DataHandler, k: int, verbose: bool | None, controls: dict[str, Any], ) -> pd.DataFrame: """ Perform blocking using the NND algorithm. Parameters ---------- x : DataHandler Reference dataset containing features for indexing y : DataHandler Query dataset to find nearest neighbors for k : int Number of nearest neighbors to find verbose : bool, optional If True, print detailed progress information controls : dict Algorithm control parameters with the following structure: { 'random_seed': int, 'nnd': { 'metric': str, 'k_search': int, 'metric_kwds': dict, 'n_threads': int, 'tree_init': bool, 'n_trees': int, 'leaf_size': int, 'pruning_degree_multiplier': float, 'diversify_prob': float, 'init_graph': array-like or None, 'init_dist': array-like or None, 'low_memory': bool, 'max_candidates': int, 'max_rptree_depth': int, 'n_iters': int, 'delta': float, 'compressed': bool, 'parallel_batch_queries': bool, 'epsilon': float } } Returns ------- pandas.DataFrame DataFrame containing the blocking results with columns: - 'y': indices from query dataset - 'x': indices of matched items from reference dataset - 'dist': distances to matched items Notes ----- The algorithm builds an approximate nearest neighbor index using random projection trees and neighbor descent. The quality of the approximation can be controlled through various parameters such as n_trees, n_iters, and epsilon. """ logger.setLevel(logging.INFO if verbose else logging.WARNING) distance = controls["nnd"].get("metric") k_search = controls["nnd"].get("k_search") X = x.data Y = y.data if k_search > X.shape[0]: original_k_search = k_search k_search = min(k_search, X.shape[0]) warnings.warn( f"k_search ({original_k_search}) is larger than the number of reference points " f"({X.shape[0]}). Adjusted k_search to {k_search}.", category=UserWarning, stacklevel=2, ) logger.info(f"Initializing NND index with {distance} metric.") self.index = pynndescent.NNDescent( data=X, n_neighbors=k_search, metric=distance, metric_kwds=controls["nnd"].get("metric_kwds"), verbose=verbose, n_jobs=controls["nnd"].get("n_threads"), tree_init=controls["nnd"].get("tree_init"), n_trees=controls["nnd"].get("n_trees"), leaf_size=controls["nnd"].get("leaf_size"), pruning_degree_multiplier=controls["nnd"].get("pruning_degree_multiplier"), diversify_prob=controls["nnd"].get("diversify_prob"), init_graph=controls["nnd"].get("init_graph"), init_dist=controls["nnd"].get("init_dist"), low_memory=controls["nnd"].get("low_memory"), max_candidates=controls["nnd"].get("max_candidates"), max_rptree_depth=controls["nnd"].get("max_rptree_depth"), n_iters=controls["nnd"].get("n_iters"), delta=controls["nnd"].get("delta"), compressed=controls["nnd"].get("compressed"), parallel_batch_queries=controls["nnd"].get("parallel_batch_queries"), random_state=controls.get("random_seed"), ) logger.info("Querying index...") l_1nn = self.index.query(query_data=Y, k=k_search, epsilon=controls["nnd"].get("epsilon")) indices = l_1nn[0] distances = l_1nn[1] K_VAL = 2 if k == K_VAL: indices, distances = rearrange_array(indices, distances) result = pd.DataFrame( { "y": np.arange(Y.shape[0]), "x": indices[:, k - 1], "dist": distances[:, k - 1], } ) logger.info("Process completed successfully.") return result