Source code for rics.mapping.support

"""Functions and classes used by the ``Mapper`` for handling score matrices.

.. warning::

   This module is considered an implementation detail, and may change without notice.
"""

import logging
import warnings
from collections import defaultdict as _defaultdict
from contextlib import contextmanager as _contextmanager
from dataclasses import dataclass as _dataclass
from typing import Dict, Generator, Generic as _Generic, Iterable, List, Optional, Tuple

import numpy as np
import pandas as pd

from rics.mapping import Cardinality as _Cardinality, DirectionalMapping as _DirectionalMapping
from rics.mapping.types import CandidateType, ValueType

# Logger hierarchy, rooted at ``<package>.Mapper``:
#   <package>.Mapper.accept            - one message per accepted match.
#   <package>.Mapper.accept.details    - which rejected matches an accepted match superseded.
#   <package>.Mapper.unmapped.details  - why a value ended up without any match.
_MAPPER_LOGGER = logging.getLogger(__package__).getChild("Mapper")
ACCEPT_LOGGER = _MAPPER_LOGGER.getChild("accept")
SUPERSESSION_LOGGER = ACCEPT_LOGGER.getChild("details")
UNMAPPED_LOGGER = _MAPPER_LOGGER.getChild("unmapped").getChild("details")

# Emitted once at import time. ``stacklevel=2`` attributes the warning to the code that imports this module
# instead of to this line, which is what a user needs in order to find the offending import.
warnings.warn(
    "This module is considered an implementation detail, and may change without notice.",
    UserWarning,
    stacklevel=2,
)


@_contextmanager
def enable_verbose_debug_messages() -> Generator[None, None, None]:  # typing.ContextManager doesn't work?
    """Switch on verbose DEBUG-level logger messages for the duration of a ``with`` block.

    The ``VERBOSE`` flags of the filter, heuristic and score function modules are set to ``True`` on entry and
    reset to the values they had before on exit, also when the block raises.

    Returns a context manager. Calling the function without the ``with`` statement does nothing.

    >>> from rics.mapping import Mapper, support
    >>> with support.enable_verbose_debug_messages():
    ...     Mapper().apply('ab', candidates='abc')
    """  # noqa: DAR301
    from rics.mapping import filter_functions, heuristic_functions, score_functions

    modules = (filter_functions, heuristic_functions, score_functions)
    # Snapshot the current flags so that exactly these values can be restored afterwards.
    previous = [module.VERBOSE for module in modules]

    try:
        for module in modules:
            module.VERBOSE = True
        yield
    finally:
        for module, flag in zip(modules, previous):
            module.VERBOSE = flag
class MatchScores:
    """High-level selection operations.

    Args:
        scores: A score matrix, where ``scores.index`` are values and ``scores.columns`` are treated as the
            candidates.
        min_score: Minimum score required to make a `value -> candidate` match.
    """

    def __init__(self, scores: pd.DataFrame, min_score: float) -> None:
        self._min_score = min_score
        self._matrix = scores

    def to_directional_mapping(
        self, cardinality: Optional[_Cardinality] = None
    ) -> _DirectionalMapping[ValueType, CandidateType]:
        """Create a ``DirectionalMapping`` with a given target ``Cardinality``.

        Args:
            cardinality: Explicit cardinality to set, see :attr:`~rics.mapping.DirectionalMapping.cardinality`.
                If ``None``, use the actual cardinality when selecting all matches with scores :attr:`above`
                the minimum.

        Returns:
            A ``DirectionalMapping``.
        """
        matches: List[MatchScores.Record[ValueType, CandidateType]]
        rejections: List[MatchScores.Reject[ValueType, CandidateType]]
        matches, rejections = self._match(cardinality)

        left_to_right = _defaultdict(list)
        for record in matches:
            # The accept message is emitted at DEBUG level, so guard on DEBUG; nothing is built otherwise.
            if ACCEPT_LOGGER.isEnabledFor(logging.DEBUG):
                reason = "(short-circuit or override)" if record.score == np.inf else f">= {self._min_score}"
                ACCEPT_LOGGER.debug(f"Accepted: {record} {reason}.")

                # Supersession details refer to "this match", so they are only shown after the accept message.
                if rejections and SUPERSESSION_LOGGER.isEnabledFor(logging.DEBUG):
                    supersedes = [
                        rr for rr in rejections if record in (rr.superseding_value, rr.superseding_candidate)
                    ]
                    if supersedes:
                        s = "\n".join(" " + rr.explain(self._min_score) for rr in supersedes)
                        SUPERSESSION_LOGGER.debug(f"This match supersedes {len(supersedes)} other matches:\n{s}")

            left_to_right[record.value].append(record.candidate)

        if rejections and UNMAPPED_LOGGER.isEnabledFor(logging.DEBUG):
            # Values in the score matrix index that did not receive any candidate.
            unmapped_values = set(self._matrix.index.difference(left_to_right))
            for value in unmapped_values:
                value_reasons = "\n".join(
                    f" {rr.explain(self._min_score, full=True)}" for rr in rejections if rr.record.value == value
                )
                UNMAPPED_LOGGER.debug(f"Could not map {value=}:\n{value_reasons}")

        return _DirectionalMapping(
            cardinality=cardinality,
            # Iterate the matrix index (not the dict) so that keys follow the order of the score matrix.
            left_to_right={
                value: tuple(left_to_right[value]) for value in self._matrix.index if value in left_to_right
            },
            _verify=False,
        )

    def _match(
        self, cardinality: Optional[_Cardinality] = None
    ) -> Tuple[
        List["MatchScores.Record[ValueType, CandidateType]"],
        List["MatchScores.Reject[ValueType, CandidateType]"],
    ]:
        """Select matches for `cardinality`; return ``(accepted records, rejections)``.

        Rejections are only collected when one of the detail loggers has DEBUG enabled. Otherwise the second
        element is an empty list.
        """
        rejections: Optional[List[MatchScores.Reject[ValueType, CandidateType]]] = None
        records: List["MatchScores.Record[ValueType, CandidateType]"] = self.above()

        if SUPERSESSION_LOGGER.isEnabledFor(logging.DEBUG) or UNMAPPED_LOGGER.isEnabledFor(logging.DEBUG):
            rejections = []
            # Below-threshold records are only needed to explain rejections in the debug output.
            records.extend(self.below())

        if cardinality is _Cardinality.OneToOne:
            matches = self._select_one_to_one(records, rejections)
        elif cardinality is _Cardinality.OneToMany:
            matches = self._select_one_to_many(records, rejections)
        elif cardinality is _Cardinality.ManyToOne:
            matches = self._select_many_to_one(records, rejections)
        else:
            matches = self._select_many_to_many(records, rejections)

        # The selectors are generators which fill `rejections` while being consumed, so they must be exhausted
        # before `rejections` is read.
        accepted = list(matches)
        return accepted, rejections or []

    def _get_sorted(self) -> pd.Series:
        """Stack the score matrix into a ``(value, candidate)``-indexed series, highest scores first."""
        return self._matrix.stack().sort_values(ascending=False)

    def above(self) -> List["MatchScores.Record[ValueType, CandidateType]"]:
        """Get all records with scores `above` the threshold."""
        s = self._get_sorted()
        return self._from_series(s[s >= self._min_score])

    def below(self) -> List["MatchScores.Record[ValueType, CandidateType]"]:
        """Get all records with scores `below` the threshold."""
        s = self._get_sorted()
        return self._from_series(s[s < self._min_score])

    @_dataclass(frozen=True)
    class Record(_Generic[ValueType, CandidateType]):
        """Data concerning a match."""

        value: ValueType
        """A hashable value."""
        candidate: CandidateType
        """A hashable candidate."""
        score: float
        """Likeness score computed by some scoring function."""

        def __str__(self) -> str:
            return f"{self.value!r} -> '{self.candidate}'; score={self.score:.3f}"

    @classmethod
    def _from_series(cls, s: pd.Series) -> List[Record[ValueType, CandidateType]]:
        """Convert a series with ``(value, candidate)`` index entries and score values into records."""
        return [MatchScores.Record(value, candidate, score) for (value, candidate), score in s.items()]

    @_dataclass(frozen=True)
    class Reject(_Generic[ValueType, CandidateType]):
        """Data concerning the rejection of a match."""

        record: "MatchScores.Record[ValueType, CandidateType]"
        superseding_value: Optional["MatchScores.Record[ValueType, CandidateType]"] = None
        superseding_candidate: Optional["MatchScores.Record[ValueType, CandidateType]"] = None

        def explain(self, min_score: float, full: bool = False) -> str:  # pragma: no cover
            """Create a string which explains the rejection.

            Args:
                min_score: Minimum score to accept a match.
                full: If ``True`` show full information about superseding matches.

            Returns:
                An explanatory string.
            """
            if self.record.score == -np.inf:
                # The first superseding record with an infinite score, checking the value before the
                # candidate. A single code path keeps the message identical regardless of which one it is.
                overriding = next(
                    (
                        r
                        for r in (self.superseding_value, self.superseding_candidate)
                        if r is not None and r.score == np.inf
                    ),
                    None,
                )
                if overriding is None:
                    why = " (filtered)"
                else:
                    extra = f": {overriding}" if full else ""
                    why = f" (superseded by short-circuit or override{extra})"
            elif self.record.score < min_score:
                why = f" < {min_score} (below threshold)"
            else:
                ands = []
                if self.superseding_value:
                    extra = f": {self.superseding_value}" if full else ""
                    ands.append(f"value={self.superseding_value.value!r}{extra}")
                if self.superseding_candidate:
                    extra = f": {self.superseding_candidate}" if full else ""
                    ands.append(f"candidate={self.superseding_candidate.candidate!r}{extra}")
                why = f" (superseded on {' and '.join(ands)})"

            return f"{self.record}{why}."

    def _select_one_to_one(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: Optional[List[Reject[ValueType, CandidateType]]] = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records in order, using every value and every candidate at most once.

        Skipped records are appended to `rejections` when a list is given.
        """
        mvs: Dict[ValueType, MatchScores.Record[ValueType, CandidateType]] = {}
        mcs: Dict[CandidateType, MatchScores.Record[ValueType, CandidateType]] = {}

        for record in records:
            if record.score < self._min_score or record.value in mvs or record.candidate in mcs:
                if rejections is not None:  # pragma: no cover
                    rejections.append(
                        MatchScores.Reject(
                            record,
                            superseding_value=mvs.get(record.value),
                            superseding_candidate=mcs.get(record.candidate),
                        )
                    )
                continue

            mvs[record.value] = record
            mcs[record.candidate] = record
            yield record

    def _select_one_to_many(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: Optional[List[Reject[ValueType, CandidateType]]] = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records in order, using every candidate at most once.

        Skipped records are appended to `rejections` when a list is given.
        """
        mcs: Dict[CandidateType, MatchScores.Record[ValueType, CandidateType]] = {}

        for record in records:
            if record.score < self._min_score or record.candidate in mcs:
                if rejections is not None:  # pragma: no cover
                    rejections.append(MatchScores.Reject(record, superseding_candidate=mcs.get(record.candidate)))
                continue

            mcs[record.candidate] = record
            yield record

    def _select_many_to_one(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: Optional[List[Reject[ValueType, CandidateType]]] = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield records in order, using every value at most once.

        Skipped records are appended to `rejections` when a list is given.
        """
        mvs: Dict[ValueType, MatchScores.Record[ValueType, CandidateType]] = {}

        for record in records:
            if record.score < self._min_score or record.value in mvs:
                if rejections is not None:  # pragma: no cover
                    rejections.append(MatchScores.Reject(record, superseding_value=mvs.get(record.value)))
                continue

            mvs[record.value] = record
            yield record

    def _select_many_to_many(
        self,
        records: Iterable[Record[ValueType, CandidateType]],
        rejections: Optional[List[Reject[ValueType, CandidateType]]] = None,
    ) -> Iterable[Record[ValueType, CandidateType]]:
        """Yield every record whose score is not below the minimum score.

        Skipped records are appended to `rejections` when a list is given.
        """
        for record in records:  # pragma: no cover
            if record.score < self._min_score:
                if rejections is not None:
                    rejections.append(MatchScores.Reject(record))
                continue

            yield record