"""
Contains the BlockingResult class for analyzing and printing
blocking results.
"""
from collections import Counter
import numpy as np
import pandas as pd
[docs]
class BlockingResult:
"""
A class to represent and analyze the results of a blocking operation.
This class provides functionality to analyze and evaluate blocking results,
including calculation of reduction ratios, metrics evaluation.
Parameters
----------
x_df : pandas.DataFrame
DataFrame containing blocking results with columns ['x', 'y', 'block', 'dist']
ann : str
The blocking method used (e.g., 'nnd', 'hnsw', 'annoy', etc.)
deduplication : bool
Whether the blocking was performed for deduplication
true_blocks : pandas.DataFrame, optional
DataFrame with true blocks to calculate evaluation metrics
n_original_records : tuple[int, int]
Number of records in the original dataset(s)
eval_metrics : pandas.Series, optional
Evaluation metrics if true blocks were provided
confusion : pandas.DataFrame, optional
Confusion matrix if true blocks were provided
colnames_xy : numpy.ndarray
Column names used in the blocking process
reduction_ratio : float, optional
Pre-calculated reduction ratio (default None)
Attributes
----------
result : pandas.DataFrame
The blocking results containing ['x', 'y', 'block', 'dist'] columns
method : str
Name of the blocking method used
deduplication : bool
Indicates if this was a deduplication operation
metrics : pandas.Series or None
Evaluation metrics if true blocks were provided
confusion : pandas.DataFrame or None
Confusion matrix if true blocks were provided
colnames : numpy.ndarray
Names of columns used in blocking
n_original_records : tuple[int, int]
Number of records in the original dataset(s)
reduction_ratio : float
Reduction ratio calculated for the blocking method
Notes
-----
The class provides methods for calculating reduction ratio and formatting
evaluation metrics for blocking quality assessment.
"""
def __init__( # noqa: PLR0913
self,
x_df: pd.DataFrame,
ann: str,
deduplication: bool,
n_original_records: tuple[int, int | None],
true_blocks: pd.DataFrame | None,
eval_metrics: pd.Series | None,
confusion: pd.DataFrame | None,
colnames_xy: np.ndarray,
reduction_ratio: float | None = None,
) -> None:
"""Initialize a BlockingResult instance."""
self.result = x_df[["x", "y", "block", "dist"]]
self.method = ann
self.deduplication = deduplication
self.metrics = eval_metrics if true_blocks is not None else None
self.confusion = confusion if true_blocks is not None else None
self.colnames = colnames_xy
self.n_original_records = n_original_records
self.reduction_ratio = reduction_ratio
def __repr__(self) -> str:
"""
Provide a concise representation of the blocking result.
Returns
-------
str
A string representation showing method and deduplication status
"""
return f"BlockingResult(method={self.method}, deduplication={self.deduplication})"
def __str__(self) -> str:
"""
Create a detailed string representation of the blocking result.
Returns
-------
str
A formatted string containing:
- Basic information about the blocking
- Block size distribution
- Evaluation metrics (if available)
Notes
-----
The output includes reduction ratio and detailed block size statistics.
If evaluation metrics are available, they are included in the output.
"""
if self.deduplication:
block_sizes = self.result.groupby("block").apply(
lambda x: len(pd.concat([x["x"], x["y"]]).unique())
)
else:
block_sizes = (
self.result.groupby("block").agg({"x": "nunique", "y": "nunique"}).sum(axis=1)
)
block_size_dist = Counter(block_sizes.values)
output = []
output.append("=" * 56)
output.append(f"Blocking based on the {self.method} method.")
output.append(f"Number of blocks: {len(block_sizes)}")
output.append(f"Number of columns created for blocking: {len(self.colnames)}")
output.append(f"Reduction ratio: {self.reduction_ratio:.6f}")
output.append("=" * 56)
output.append("Distribution of the size of the blocks:")
output.append(f"{'Block Size':>10} | {'Number of Blocks':<15}")
for size, count in sorted(block_size_dist.items()):
output.append(f"{size:>10} | {count:<15}")
if self.metrics is not None:
output.append("=" * 56)
output.append("Evaluation metrics (standard):")
metrics = self._format_metrics()
for name, value in metrics.items():
output.append(f"{name} : {value}")
return "\n".join(output)
[docs]
def add_block_column(
self,
df_left: pd.DataFrame,
df_right: pd.DataFrame | None = None,
id_col_left: str | None = None,
id_col_right: str | None = None,
block_col: str = "block",
) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]:
"""
Attach block IDs back onto the original DataFrame(s), filling any
records with no assignment into their own singleton blocks.
- **Deduplication**: pass only `df_left`; returns one DataFrame.
- **Record-linkage**: pass both `df_left` and `df_right`; returns
a tuple `(left_with_blocks, right_with_blocks)`.
Parameters
----------
df_left
If dedup: your input DataFrame. If rec-lin: the “x” DataFrame.
df_right
If rec-lin: the “y” DataFrame. Otherwise None.
id_col_left
Column in `df_left` matching integer index into `self.result.x`;
if None, uses the DataFrame's positional index.
id_col_right
Column in `df_right` matching integer index into `self.result.y`;
if None, uses that DataFrame's positional index.
block_col
Name of the new block-ID column.
Returns
-------
Single DataFrame (dedup) or tuple of two DataFrames (rec-lin).
Examples
--------
>>> x = blocking_result.add_block_column(org_x_df) # dedup
>>> x, y = blocking_result.add_block_column(org_x_df, org_y_df) # rec-lin
"""
def _fill_orphans(out: pd.DataFrame, col: str, start_id: int) -> int:
mask = out[col].isna()
n = int(mask.sum())
if n > 0:
new_ids = range(start_id, start_id + n)
out.loc[mask, col] = list(new_ids)
start_id += n
out[col] = out[col].astype("int64")
return start_id
max_block = int(self.result["block"].max()) + 1
if df_right is None:
mapping = (
self.result.melt(id_vars="block", value_vars=["x", "y"], value_name="rec-id-map")
.drop_duplicates("rec-id-map")
.set_index("rec-id-map")["block"]
)
out = df_left.copy()
if id_col_left:
out[block_col] = out[id_col_left].map(mapping)
else:
out[block_col] = out.index.map(mapping)
out[block_col] = out[block_col].astype("Int64")
_fill_orphans(out, block_col, max_block)
return out
map_x = self.result[["x", "block"]].drop_duplicates("x").set_index("x")["block"]
map_y = self.result[["y", "block"]].drop_duplicates("y").set_index("y")["block"]
left = df_left.copy()
if id_col_left:
left[block_col] = left[id_col_left].map(map_x)
else:
left[block_col] = left.index.map(map_x)
left[block_col] = left[block_col].astype("Int64")
max_block = _fill_orphans(left, block_col, max_block)
right = df_right.copy()
if id_col_right:
right[block_col] = right[id_col_right].map(map_y)
else:
right[block_col] = right.index.map(map_y)
right[block_col] = right[block_col].astype("Int64")
_fill_orphans(right, block_col, max_block)
return left, right
def _format_metrics(self) -> dict[str, float]:
"""
Format the evaluation metrics for display.
Returns
-------
dict
Dictionary of metric names and formatted values as percentages,
rounded to 4 decimal places
Notes
-----
Returns an empty dictionary if no metrics are available.
Values are multiplied by 100 to convert to percentages.
"""
if self.metrics is None:
return {}
self.metrics.index = self.metrics.index.map(str)
self.metrics = self.metrics.astype(float) if self.metrics is not None else None
return {
name: round(value * 100.0, 4)
for name, value in zip(self.metrics.index, self.metrics.values, strict=False)
}