# Source code for validatex.core.expectation

"""
Expectation base classes and registry.

This module defines the base Expectation class and the global registry
that maps expectation type names to their implementation classes.
"""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Type

from validatex.core.result import ExpectationResult

# ---------------------------------------------------------------------------
# Global Expectation Registry
# ---------------------------------------------------------------------------

# Process-wide mapping from an expectation type name to the class that
# implements it. Written by register_expectation(); read by
# get_expectation_class() and list_expectations().
_EXPECTATION_REGISTRY: Dict[str, Type["Expectation"]] = {}


def register_expectation(cls: Type["Expectation"]) -> Type["Expectation"]:
    """Decorator that registers an expectation class by its *type_name*.

    The registry key is the first non-empty string found in this order:

    1. ``expectation_type`` defined directly on *cls* -- either a plain
       string, or a not-yet-processed ``dataclasses.field(...)`` whose
       ``default`` is a string (the latter is what the class body holds when
       this decorator runs *before* ``@dataclass``).
    2. The ``default`` of the ``expectation_type`` entry in
       ``cls.__dataclass_fields__`` (possibly inherited from a base class).
    3. ``getattr(cls, "expectation_type")``.
    4. ``cls.__name__``.

    An existing registration under the same name is replaced.

    Parameters
    ----------
    cls : Type[Expectation]
        The class to register.

    Returns
    -------
    Type[Expectation]
        *cls* itself, so the function can be used as a class decorator.
    """
    # Local import: the module header only imports ``dataclass`` and ``field``.
    from dataclasses import Field

    # Look at the class's *own* namespace first. Reading only
    # ``__dataclass_fields__`` would pick up the inherited base default when
    # the subclass has not been processed by ``@dataclass`` yet, or when it
    # overrides ``expectation_type`` with a plain, un-annotated attribute.
    own = vars(cls).get("expectation_type")
    if isinstance(own, Field):
        own = own.default

    dc_field = getattr(cls, "__dataclass_fields__", {}).get("expectation_type")
    candidates = (
        own,
        dc_field.default if dc_field is not None else None,
        getattr(cls, "expectation_type", None),
    )

    # Only a real, non-empty string is a usable key; this also rejects the
    # ``dataclasses.MISSING`` sentinel carried by fields without a default.
    name = next(
        (value for value in candidates if isinstance(value, str) and value),
        cls.__name__,
    )

    _EXPECTATION_REGISTRY[name] = cls
    return cls
def get_expectation_class(name: str) -> Type["Expectation"]:
    """Return the expectation class registered under *name*.

    Parameters
    ----------
    name : str
        A registered expectation type name.

    Returns
    -------
    Type[Expectation]
        The class stored in the registry for *name*.

    Raises
    ------
    ValueError
        If *name* is not registered. The message lists every registered
        type name in sorted order.
    """
    # Happy path first; everything below builds the error.
    if name in _EXPECTATION_REGISTRY:
        return _EXPECTATION_REGISTRY[name]

    available = ", ".join(sorted(_EXPECTATION_REGISTRY))
    raise ValueError(
        f"Unknown expectation type '{name}'. " f"Available types: {available}"
    )
def list_expectations() -> List[str]:
    """Return the names of all registered expectation types, sorted.

    Returns
    -------
    List[str]
        A new list; mutating it does not affect the registry.
    """
    names = list(_EXPECTATION_REGISTRY)
    names.sort()
    return names
# ---------------------------------------------------------------------------
# Base Expectation
# ---------------------------------------------------------------------------
@dataclass
class Expectation(ABC):
    """
    Abstract base class for all expectations.

    Subclasses must:
      1. Override ``expectation_type`` with a unique string id.
      2. Implement :meth:`_validate_pandas`. :meth:`_validate_spark` and
         :meth:`_validate_sql` are optional overrides; their defaults raise
         ``NotImplementedError``.

    Attributes
    ----------
    expectation_type : str
        Identifier of the expectation kind. Not an ``__init__`` argument.
    column : Optional[str]
        Column the expectation applies to, if any.
    kwargs : Dict[str, Any]
        Expectation-specific parameters.
    meta : Dict[str, Any]
        Free-form metadata, forwarded to results by :meth:`_build_result`.
    """

    expectation_type: str = field(init=False, default="base_expectation")
    column: Optional[str] = None
    kwargs: Dict[str, Any] = field(default_factory=dict)
    meta: Dict[str, Any] = field(default_factory=dict)

    # -- public API --------------------------------------------------------

    def validate(self, data: Any, engine: str = "pandas") -> "ExpectationResult":
        """
        Run this expectation against *data* using the specified engine.

        Parameters
        ----------
        data : Any
            The object handed to the engine-specific hook
            (``_validate_pandas``, ``_validate_spark`` or ``_validate_sql``).
        engine : str
            ``"pandas"``, ``"spark"`` or ``"sql"``.

        Returns
        -------
        ExpectationResult
            The hook's result. If the hook raises, or *engine* is not one of
            the supported values, a failed result carrying the error text is
            returned instead of propagating the exception.
        """
        try:
            if engine == "pandas":
                return self._validate_pandas(data)
            elif engine == "spark":
                return self._validate_spark(data)
            elif engine == "sql":
                return self._validate_sql(data)
            else:
                raise ValueError(f"Unsupported engine: {engine}")
        except Exception as exc:
            # Deliberate catch-all: any failure (including an unsupported
            # engine or an unimplemented hook) is reported as an unsuccessful
            # result rather than raised to the caller.
            return ExpectationResult(
                expectation_type=self.expectation_type,
                success=False,
                column=self.column,
                details={"error": str(exc)},
                exception_info=str(exc),
            )

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dictionary (for YAML / JSON configs).

        ``column``, ``kwargs`` and ``meta`` are only included when set /
        non-empty. ``kwargs`` and ``meta`` are shallow-copied so that editing
        the returned dictionary does not modify this expectation.
        """
        d: Dict[str, Any] = {
            "expectation_type": self.expectation_type,
        }
        if self.column is not None:
            d["column"] = self.column
        if self.kwargs:
            d["kwargs"] = dict(self.kwargs)
        if self.meta:
            d["meta"] = dict(self.meta)
        return d

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Expectation":
        """Deserialize from a dictionary.

        The concrete class is looked up from ``d["expectation_type"]`` via
        :func:`get_expectation_class`.

        Raises
        ------
        KeyError
            If ``"expectation_type"`` is missing from *d*.
        ValueError
            If the expectation type is not registered.
        """
        exp_cls = get_expectation_class(d["expectation_type"])
        # ``or {}`` tolerates an explicit ``null`` in the config; ``dict(...)``
        # keeps the new instance from sharing mutable state with the input.
        kwargs = dict(d.get("kwargs") or {})
        meta = dict(d.get("meta") or {})
        return exp_cls(column=d.get("column"), kwargs=kwargs, meta=meta)

    # -- hooks for subclasses ----------------------------------------------

    @abstractmethod
    def _validate_pandas(self, df: Any) -> "ExpectationResult":
        """Validate using Pandas engine. Must be implemented by subclasses."""
        ...

    def _validate_spark(self, df: Any) -> "ExpectationResult":
        """Validate using PySpark engine. Optional override."""
        raise NotImplementedError(f"{self.expectation_type} does not support PySpark yet.")

    def _validate_sql(self, engine: Any) -> "ExpectationResult":
        """Validate using SQLAlchemy engine. Optional override."""
        raise NotImplementedError(f"{self.expectation_type} does not support SQL yet.")

    # -- helpers -----------------------------------------------------------

    def _build_result(
        self,
        success: bool,
        observed_value: Any = None,
        element_count: int = 0,
        unexpected_count: int = 0,
        unexpected_percent: float = 0.0,
        unexpected_values: Optional[List[Any]] = None,
        details: Optional[Dict[str, Any]] = None,
    ) -> "ExpectationResult":
        """Convenience builder for :class:`ExpectationResult`.

        Fills in ``expectation_type``, ``column`` and ``meta`` from this
        instance; ``unexpected_values`` and ``details`` default to an empty
        list / dict when not given.
        """
        return ExpectationResult(
            expectation_type=self.expectation_type,
            success=success,
            column=self.column,
            observed_value=observed_value,
            element_count=element_count,
            unexpected_count=unexpected_count,
            unexpected_percent=unexpected_percent,
            unexpected_values=unexpected_values or [],
            details=details or {},
            meta=self.meta,
        )

    def __repr__(self) -> str:
        """Compact representation showing the type plus any column / kwargs."""
        parts = [f"type={self.expectation_type!r}"]
        if self.column:
            parts.append(f"column={self.column!r}")
        if self.kwargs:
            parts.append(f"kwargs={self.kwargs!r}")
        return f"Expectation({', '.join(parts)})"