"""
Column-level expectations.
Each class is a self-contained expectation that validates properties
of individual columns in a DataFrame.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
import pandas as pd
from validatex.core.expectation import Expectation, register_expectation
from validatex.core.result import ExpectationResult
# ---------------------------------------------------------------------------
# 1. expect_column_to_exist
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnToExist(Expectation):
    """Expect a column to exist in the DataFrame.

    Every backend reports the column names it actually found as
    ``observed_value`` so that a failing result shows what was available.
    """

    expectation_type: str = field(init=False, default="expect_column_to_exist")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Check ``self.column`` against the columns of a pandas DataFrame."""
        exists = self.column in df.columns
        return self._build_result(
            success=exists,
            observed_value=list(df.columns),
            details={"column_exists": exists},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Check ``self.column`` against ``df.columns`` of a Spark DataFrame."""
        available = list(df.columns)
        exists = self.column in available
        return self._build_result(
            success=exists,
            # Reported for parity with the pandas backend.
            observed_value=available,
            details={"column_exists": exists},
        )

    def _validate_sql(self, sql_source: Any) -> ExpectationResult:
        """Check ``self.column`` against the result columns of a SQL source.

        ``sql_source`` is unpacked as ``(engine, query_or_table)``.
        """
        from sqlalchemy import text

        engine, query_or_table = sql_source
        # Fetch at most one row: only the result's column headers are needed.
        query = f"SELECT * FROM ({query_or_table}) AS subquery LIMIT 1"
        with engine.connect() as conn:
            result = conn.execute(text(query))
            # Materialise the names while the connection is still open.
            available = list(result.keys())
        exists = str(self.column) in available
        return self._build_result(
            success=exists,
            # Reported for parity with the pandas backend.
            observed_value=available,
            details={"column_exists": exists},
        )
# ---------------------------------------------------------------------------
# 2. expect_column_to_not_be_null
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnToNotBeNull(Expectation):
    """Expect a column to contain no null values.

    Each backend counts all rows and the null entries of ``self.column``;
    the nulls are reported as the unexpected elements and the expectation
    succeeds only when that count is zero.
    """

    expectation_type: str = field(init=False, default="expect_column_to_not_be_null")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Count nulls with ``Series.isnull`` over every row of ``df``."""
        missing = int(df[self.column].isnull().sum())
        row_total = len(df)
        if row_total > 0:
            missing_pct = missing / row_total * 100
        else:
            # No rows: avoid dividing by zero.
            missing_pct = 0.0
        return self._build_result(
            success=(missing == 0),
            observed_value=missing,
            element_count=row_total,
            unexpected_count=missing,
            unexpected_percent=missing_pct,
            details={"null_count": missing, "total_count": row_total},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count nulls by filtering the Spark DataFrame on ``isNull``."""
        from pyspark.sql import functions as F

        row_total = df.count()
        is_missing = F.col(str(self.column)).isNull()
        missing = df.filter(is_missing).count()
        if row_total > 0:
            missing_pct = missing / row_total * 100
        else:
            missing_pct = 0.0
        return self._build_result(
            success=(missing == 0),
            observed_value=missing,
            element_count=row_total,
            unexpected_count=missing,
            unexpected_percent=missing_pct,
            details={"null_count": missing, "total_count": row_total},
        )

    def _validate_sql(self, sql_source: Any) -> ExpectationResult:
        """Count rows and nulls with a single aggregate query.

        ``sql_source`` is unpacked as ``(engine, query_or_table)``.
        """
        from sqlalchemy import text

        engine, query_or_table = sql_source
        column_name = str(self.column)
        query = (
            f"SELECT COUNT(*) as total, "
            f"SUM(CASE WHEN {column_name} IS NULL THEN 1 ELSE 0 END) as nulls "
            f"FROM ({query_or_table}) AS subquery"
        )
        with engine.connect() as conn:
            row = conn.execute(text(query)).fetchone()

        # A missing row or a NULL/zero aggregate falls back to 0.
        row_total = int(row.total) if row and row.total else 0
        missing = int(row.nulls) if row and row.nulls else 0
        if row_total > 0:
            missing_pct = missing / row_total * 100
        else:
            missing_pct = 0.0
        return self._build_result(
            success=(missing == 0),
            observed_value=missing,
            element_count=row_total,
            unexpected_count=missing,
            unexpected_percent=missing_pct,
            details={"null_count": missing, "total_count": row_total},
        )
# ---------------------------------------------------------------------------
# 3. expect_column_values_to_be_unique
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToBeUnique(Expectation):
    """Expect all values in a column to be unique (no duplicates).

    NOTE(review): the backends do not count the same thing. pandas flags
    every row whose value repeats, Spark and SQL report
    ``total - distinct``, and the SQL ``COUNT(col)`` skips NULLs while the
    pandas/Spark totals include every row. Confirm which is intended.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_be_unique")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Flag every row whose value occurs more than once in the column."""
        row_total = len(df)
        # keep=False marks all occurrences of a repeated value, not just
        # the second and later ones.
        repeated = df[self.column].duplicated(keep=False)
        repeated_count = int(repeated.sum())
        if row_total > 0:
            repeated_pct = repeated_count / row_total * 100
        else:
            repeated_pct = 0.0
        # At most 20 distinct offending values are reported.
        sample = df.loc[repeated, self.column].unique().tolist()[:20]
        return self._build_result(
            success=(repeated_count == 0),
            observed_value=f"{row_total - repeated_count} unique out of {row_total}",
            element_count=row_total,
            unexpected_count=repeated_count,
            unexpected_percent=repeated_pct,
            unexpected_values=sample,
            details={"duplicate_count": repeated_count},
        )

    def _validate_sql(self, sql_source: Any) -> ExpectationResult:
        """Compare ``COUNT(col)`` with ``COUNT(DISTINCT col)``.

        ``sql_source`` is unpacked as ``(engine, query_or_table)``.
        """
        from sqlalchemy import text

        engine, query_or_table = sql_source
        column_name = str(self.column)
        query = (
            f"SELECT COUNT({column_name}) as total, "
            f"COUNT(DISTINCT {column_name}) as distinct_count "
            f"FROM ({query_or_table}) AS subquery"
        )
        with engine.connect() as conn:
            row = conn.execute(text(query)).fetchone()

        # A missing row or a NULL/zero aggregate falls back to 0.
        counted = int(row.total) if row and row.total else 0
        distinct_total = int(row.distinct_count) if row and row.distinct_count else 0
        surplus = counted - distinct_total
        if counted > 0:
            surplus_pct = surplus / counted * 100
        else:
            surplus_pct = 0.0
        return self._build_result(
            success=(surplus == 0),
            observed_value=f"{distinct_total} unique values out of {counted}",
            element_count=counted,
            unexpected_count=surplus,
            unexpected_percent=surplus_pct,
            details={"duplicate_count": surplus},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Compare the row count with the distinct count of the column."""
        row_total = df.count()
        distinct_total = df.select(self.column).distinct().count()
        surplus = row_total - distinct_total
        if row_total > 0:
            surplus_pct = surplus / row_total * 100
        else:
            surplus_pct = 0.0
        return self._build_result(
            success=(surplus == 0),
            observed_value=f"{distinct_total} unique out of {row_total}",
            element_count=row_total,
            unexpected_count=surplus,
            unexpected_percent=surplus_pct,
            details={"duplicate_count": surplus, "distinct_count": distinct_total},
        )
# ---------------------------------------------------------------------------
# 4. expect_column_values_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToBeBetween(Expectation):
    """Expect column values to fall within [min_value, max_value].

    Recognised ``kwargs``:
        min_value: lower bound, or ``None`` for no lower bound.
        max_value: upper bound, or ``None`` for no upper bound.
        strict_min: when true, values equal to ``min_value`` are unexpected.
        strict_max: when true, values equal to ``max_value`` are unexpected.

    Null values are dropped before checking and are not part of
    ``element_count``.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_be_between")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Build boolean masks for values below/above the bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        strict_min = self.kwargs.get("strict_min", False)
        strict_max = self.kwargs.get("strict_max", False)

        series = df[self.column].dropna()
        total = len(series)

        # An absent bound contributes an all-False mask.
        if min_val is None:
            mask_low = pd.Series(False, index=series.index)
        elif strict_min:
            mask_low = series <= min_val
        else:
            mask_low = series < min_val

        if max_val is None:
            mask_high = pd.Series(False, index=series.index)
        elif strict_max:
            mask_high = series >= max_val
        else:
            mask_high = series > max_val

        unexpected_mask = mask_low | mask_high
        unexpected_count = int(unexpected_mask.sum())
        pct = (unexpected_count / total * 100) if total > 0 else 0.0
        # At most 20 offending values are reported.
        unexpected_vals = series[unexpected_mask].tolist()[:20]
        return self._build_result(
            success=(unexpected_count == 0),
            observed_value={
                "min": series.min() if total > 0 else None,
                "max": series.max() if total > 0 else None,
            },
            element_count=total,
            unexpected_count=unexpected_count,
            unexpected_percent=pct,
            unexpected_values=unexpected_vals,
            details={
                "min_value": min_val,
                "max_value": max_val,
                "strict_min": strict_min,
                "strict_max": strict_max,
            },
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count non-null rows that violate either bound."""
        from functools import reduce
        import operator

        from pyspark.sql import functions as F

        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        strict_min = self.kwargs.get("strict_min", False)
        strict_max = self.kwargs.get("strict_max", False)

        col = F.col(str(self.column))
        filtered = df.filter(col.isNotNull())
        total = filtered.count()

        # One violation predicate per bound that was actually supplied.
        conditions = []
        if min_val is not None:
            conditions.append(col <= min_val if strict_min else col < min_val)
        if max_val is not None:
            conditions.append(col >= max_val if strict_max else col > max_val)

        if conditions:
            combined = reduce(operator.or_, conditions)
            unexpected_count = filtered.filter(combined).count()
        else:
            unexpected_count = 0

        pct = (unexpected_count / total * 100) if total > 0 else 0.0
        stats = filtered.select(F.min(str(self.column)), F.max(str(self.column))).first()
        return self._build_result(
            success=(unexpected_count == 0),
            observed_value={"min": stats[0], "max": stats[1]},
            element_count=total,
            unexpected_count=unexpected_count,
            unexpected_percent=pct,
            # The strict flags are included for parity with the pandas backend.
            details={
                "min_value": min_val,
                "max_value": max_val,
                "strict_min": strict_min,
                "strict_max": strict_max,
            },
        )
# ---------------------------------------------------------------------------
# 5. expect_column_values_to_be_in_set
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToBeInSet(Expectation):
    """Expect every value in a column to be a member of a given set.

    The allowed values are read from ``kwargs["value_set"]`` (empty when
    absent). Null values are dropped before the membership check.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_be_in_set")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Flag non-null values that ``Series.isin`` does not find in the set."""
        allowed = set(self.kwargs.get("value_set", []))
        present = df[self.column].dropna()
        checked = len(present)

        outside = ~present.isin(allowed)
        outside_count = int(outside.sum())
        if checked > 0:
            outside_pct = outside_count / checked * 100
        else:
            outside_pct = 0.0
        # At most 20 distinct offending values are reported.
        offenders = present[outside].unique().tolist()[:20]

        return self._build_result(
            success=(outside_count == 0),
            observed_value={"unique_values": present.nunique()},
            element_count=checked,
            unexpected_count=outside_count,
            unexpected_percent=outside_pct,
            unexpected_values=offenders,
            details={"value_set": list(allowed)},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count non-null rows whose value is outside the allowed list."""
        from pyspark.sql import functions as F

        allowed = list(self.kwargs.get("value_set", []))
        target = F.col(str(self.column))
        present = df.filter(target.isNotNull())
        checked = present.count()

        outside_count = present.filter(~target.isin(allowed)).count()
        if checked > 0:
            outside_pct = outside_count / checked * 100
        else:
            outside_pct = 0.0

        return self._build_result(
            success=(outside_count == 0),
            element_count=checked,
            unexpected_count=outside_count,
            unexpected_percent=outside_pct,
            details={"value_set": allowed},
        )
# ---------------------------------------------------------------------------
# 6. expect_column_values_to_not_be_in_set
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToNotBeInSet(Expectation):
    """Expect no value in a column to be a member of the given set.

    The forbidden values are read from ``kwargs["value_set"]`` (empty when
    absent). Null values are dropped before the membership check.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_not_be_in_set")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Flag non-null values that ``Series.isin`` finds in the forbidden set."""
        banned = set(self.kwargs.get("value_set", []))
        present = df[self.column].dropna()
        checked = len(present)

        hits = present.isin(banned)
        hit_count = int(hits.sum())
        if checked > 0:
            hit_pct = hit_count / checked * 100
        else:
            hit_pct = 0.0
        # At most 20 distinct offending values are reported.
        offenders = present[hits].unique().tolist()[:20]

        return self._build_result(
            success=(hit_count == 0),
            element_count=checked,
            unexpected_count=hit_count,
            unexpected_percent=hit_pct,
            unexpected_values=offenders,
            details={"forbidden_set": list(banned)},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count non-null rows whose value is in the forbidden list."""
        from pyspark.sql import functions as F

        banned = list(self.kwargs.get("value_set", []))
        target = F.col(str(self.column))
        present = df.filter(target.isNotNull())
        checked = present.count()

        hit_count = present.filter(target.isin(banned)).count()
        if checked > 0:
            hit_pct = hit_count / checked * 100
        else:
            hit_pct = 0.0

        return self._build_result(
            success=(hit_count == 0),
            element_count=checked,
            unexpected_count=hit_count,
            unexpected_percent=hit_pct,
            details={"forbidden_set": banned},
        )
# ---------------------------------------------------------------------------
# 7. expect_column_values_to_match_regex
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToMatchRegex(Expectation):
    """Expect column values to match a given regular expression.

    The pattern is read from ``kwargs["regex"]`` and defaults to ``".*"``.
    Null values are dropped and the rest are compared as strings. The
    pandas backend uses ``re.search`` and the Spark backend uses ``rlike``.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_match_regex")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Apply the compiled pattern to each non-null value as a string."""
        regex = self.kwargs.get("regex", ".*")
        texts = df[self.column].dropna().astype(str)
        checked = len(texts)

        # Compile once; the pattern is reused for every element.
        compiled = re.compile(regex)

        def _found(value):
            return bool(compiled.search(value))

        matched = texts.apply(_found)
        mismatch_count = int((~matched).sum())
        if checked > 0:
            mismatch_pct = mismatch_count / checked * 100
        else:
            mismatch_pct = 0.0
        # At most 20 offending values are reported.
        offenders = texts[~matched].tolist()[:20]

        return self._build_result(
            success=(mismatch_count == 0),
            element_count=checked,
            unexpected_count=mismatch_count,
            unexpected_percent=mismatch_pct,
            unexpected_values=offenders,
            details={"regex": regex},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count non-null rows whose string form fails ``rlike(regex)``."""
        from pyspark.sql import functions as F

        regex = self.kwargs.get("regex", ".*")
        target = F.col(str(self.column))
        present = df.filter(target.isNotNull())
        checked = present.count()

        no_match = ~target.cast("string").rlike(regex)
        mismatch_count = present.filter(no_match).count()
        if checked > 0:
            mismatch_pct = mismatch_count / checked * 100
        else:
            mismatch_pct = 0.0

        return self._build_result(
            success=(mismatch_count == 0),
            element_count=checked,
            unexpected_count=mismatch_count,
            unexpected_percent=mismatch_pct,
            details={"regex": regex},
        )
# ---------------------------------------------------------------------------
# 8. expect_column_values_to_be_of_type
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToBeOfType(Expectation):
    """Expect a column's dtype to match the expected type string.

    ``kwargs["expected_type"]`` is compared case-insensitively as a
    substring of the backend's type name (for example ``"int"`` matches
    ``"int64"``). A missing, empty or ``None`` ``expected_type`` fails the
    expectation: an empty string is a substring of every type name and
    would otherwise pass unconditionally.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_be_of_type")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare against ``str(df[column].dtype)``."""
        expected_type = self.kwargs.get("expected_type", "")
        actual_type = str(df[self.column].dtype)
        # Normalise None/non-string input so .lower() cannot fail.
        needle = str(expected_type or "").lower()
        success = bool(needle) and needle in actual_type.lower()
        return self._build_result(
            success=success,
            observed_value=actual_type,
            details={"expected_type": expected_type, "actual_type": actual_type},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Compare against ``str(df.schema[column].dataType)``."""
        expected_type = self.kwargs.get("expected_type", "")
        actual_type = str(df.schema[self.column].dataType)
        # Normalise None/non-string input so .lower() cannot fail.
        needle = str(expected_type or "").lower()
        success = bool(needle) and needle in actual_type.lower()
        return self._build_result(
            success=success,
            observed_value=actual_type,
            details={"expected_type": expected_type, "actual_type": actual_type},
        )
# ---------------------------------------------------------------------------
# 9. expect_column_values_to_be_dateutil_parseable
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValuesToBeDateutilParseable(Expectation):
    """Expect column values to be parseable as dates.

    Parsing is delegated to ``pd.to_datetime(..., errors="coerce")``; a
    non-null value that comes back null after conversion is counted as
    unparseable. Only a pandas backend is defined in this class.
    """

    expectation_type: str = field(init=False, default="expect_column_values_to_be_dateutil_parseable")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Convert the non-null values and count those that fail."""
        present = df[self.column].dropna()
        checked = len(present)

        # errors="coerce" turns unparseable entries into NaT instead of raising.
        parsed = pd.to_datetime(present, errors="coerce")
        failed = parsed.isnull()
        failed_count = int(failed.sum())
        if checked > 0:
            failed_pct = failed_count / checked * 100
        else:
            failed_pct = 0.0
        # At most 20 offending values are reported.
        offenders = present[failed].tolist()[:20]

        return self._build_result(
            success=(failed_count == 0),
            element_count=checked,
            unexpected_count=failed_count,
            unexpected_percent=failed_pct,
            unexpected_values=offenders,
            details={"unparseable_count": failed_count},
        )
# ---------------------------------------------------------------------------
# 10. expect_column_value_lengths_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnValueLengthsToBeBetween(Expectation):
    """Expect string lengths in a column to be within [min_value, max_value].

    Recognised ``kwargs``:
        min_value: minimum length; missing or ``None`` means 0.
        max_value: maximum length; missing or ``None`` means unbounded.

    Null values are dropped and the rest are measured as strings.
    """

    expectation_type: str = field(init=False, default="expect_column_value_lengths_to_be_between")

    def _length_bounds(self) -> tuple[Any, Any]:
        """Return ``(min_len, max_len)`` with ``None`` replaced by defaults."""
        min_len = self.kwargs.get("min_value")
        max_len = self.kwargs.get("max_value")
        # An explicit None would make the comparisons below raise TypeError,
        # so it is treated the same as a missing key.
        if min_len is None:
            min_len = 0
        if max_len is None:
            max_len = float("inf")
        return min_len, max_len

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Measure each non-null value with ``Series.str.len``."""
        min_len, max_len = self._length_bounds()
        series = df[self.column].dropna().astype(str)
        total = len(series)
        lengths = series.str.len()
        unexpected_mask = (lengths < min_len) | (lengths > max_len)
        unexpected_count = int(unexpected_mask.sum())
        pct = (unexpected_count / total * 100) if total > 0 else 0.0
        # At most 20 offending values are reported.
        unexpected_vals = series[unexpected_mask].tolist()[:20]
        return self._build_result(
            success=(unexpected_count == 0),
            observed_value={
                "min_length": int(lengths.min()) if total > 0 else None,
                "max_length": int(lengths.max()) if total > 0 else None,
            },
            element_count=total,
            unexpected_count=unexpected_count,
            unexpected_percent=pct,
            unexpected_values=unexpected_vals,
            details={"min_value": min_len, "max_value": max_len},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Count non-null rows whose string length is out of range."""
        from pyspark.sql import functions as F

        min_len, max_len = self._length_bounds()
        col = F.col(str(self.column))
        filtered = df.filter(col.isNotNull())
        total = filtered.count()
        length_col = F.length(col.cast("string"))
        out_of_range = (length_col < min_len) | (length_col > max_len)
        unexpected_count = filtered.filter(out_of_range).count()
        pct = (unexpected_count / total * 100) if total > 0 else 0.0
        return self._build_result(
            success=(unexpected_count == 0),
            element_count=total,
            unexpected_count=unexpected_count,
            unexpected_percent=pct,
            details={"min_value": min_len, "max_value": max_len},
        )
# ---------------------------------------------------------------------------
# 11. expect_column_max_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnMaxToBeBetween(Expectation):
    """Expect the maximum value of a column to be between min_value and max_value.

    Either bound may be ``None`` to leave that side open. When the column
    has no maximum (the aggregate is NaN in pandas or ``None`` in Spark)
    the expectation fails instead of passing vacuously or raising on the
    comparison.
    """

    expectation_type: str = field(init=False, default="expect_column_max_to_be_between")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare ``Series.max()`` with the configured bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_max = df[self.column].max()

        # Comparisons with NaN are always False, which would otherwise let
        # a column without a maximum satisfy any bounds.
        success = not pd.isna(col_max)
        if success and min_val is not None and col_max < min_val:
            success = False
        if success and max_val is not None and col_max > max_val:
            success = False

        return self._build_result(
            success=success,
            observed_value=col_max,
            details={"min_value": min_val, "max_value": max_val, "column_max": col_max},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Compare the Spark ``max`` aggregate with the configured bounds."""
        from pyspark.sql import functions as F

        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_max = df.agg(F.max(str(self.column))).first()[0]

        # The aggregate is None when there is nothing to aggregate; comparing
        # None with a number would raise TypeError.
        success = col_max is not None
        if success and min_val is not None and col_max < min_val:
            success = False
        if success and max_val is not None and col_max > max_val:
            success = False

        return self._build_result(
            success=success,
            observed_value=col_max,
            details={"min_value": min_val, "max_value": max_val, "column_max": col_max},
        )
# ---------------------------------------------------------------------------
# 12. expect_column_min_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnMinToBeBetween(Expectation):
    """Expect the minimum value of a column to be between min_value and max_value.

    Either bound may be ``None`` to leave that side open. When the column
    has no minimum (the aggregate is NaN in pandas or ``None`` in Spark)
    the expectation fails instead of passing vacuously or raising on the
    comparison.
    """

    expectation_type: str = field(init=False, default="expect_column_min_to_be_between")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare ``Series.min()`` with the configured bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_min = df[self.column].min()

        # Comparisons with NaN are always False, which would otherwise let
        # a column without a minimum satisfy any bounds.
        success = not pd.isna(col_min)
        if success and min_val is not None and col_min < min_val:
            success = False
        if success and max_val is not None and col_min > max_val:
            success = False

        return self._build_result(
            success=success,
            observed_value=col_min,
            details={"min_value": min_val, "max_value": max_val, "column_min": col_min},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Compare the Spark ``min`` aggregate with the configured bounds."""
        from pyspark.sql import functions as F

        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_min = df.agg(F.min(str(self.column))).first()[0]

        # The aggregate is None when there is nothing to aggregate; comparing
        # None with a number would raise TypeError.
        success = col_min is not None
        if success and min_val is not None and col_min < min_val:
            success = False
        if success and max_val is not None and col_min > max_val:
            success = False

        return self._build_result(
            success=success,
            observed_value=col_min,
            details={"min_value": min_val, "max_value": max_val, "column_min": col_min},
        )
# ---------------------------------------------------------------------------
# 13. expect_column_mean_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnMeanToBeBetween(Expectation):
    """Expect the mean value of a numeric column to fall within bounds.

    Either bound may be ``None`` to leave that side open. When the mean is
    undefined (NaN in pandas, ``None`` in Spark) the expectation fails
    instead of passing vacuously or raising on the comparison. The
    reported mean is rounded to 4 decimal places.
    """

    expectation_type: str = field(init=False, default="expect_column_mean_to_be_between")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare ``Series.mean()`` with the configured bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_mean = float(df[self.column].mean())

        # Comparisons with NaN are always False, which would otherwise let
        # an undefined mean satisfy any bounds.
        success = not pd.isna(col_mean)
        if success and min_val is not None and col_mean < min_val:
            success = False
        if success and max_val is not None and col_mean > max_val:
            success = False

        rounded = round(col_mean, 4)
        return self._build_result(
            success=success,
            observed_value=rounded,
            details={
                "min_value": min_val,
                "max_value": max_val,
                "column_mean": rounded,
            },
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Compare the Spark ``mean`` aggregate with the configured bounds."""
        from pyspark.sql import functions as F

        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_mean = df.agg(F.mean(str(self.column))).first()[0]

        # The aggregate is None when there is nothing to average; comparing
        # None with a number would raise TypeError.
        success = col_mean is not None
        if success and min_val is not None and col_mean < min_val:
            success = False
        if success and max_val is not None and col_mean > max_val:
            success = False

        # Test for None explicitly: a truthiness check would report a
        # legitimate mean of 0.0 as None.
        rounded = round(col_mean, 4) if col_mean is not None else None
        return self._build_result(
            success=success,
            observed_value=rounded,
            # "column_mean" is included for parity with the pandas backend.
            details={
                "min_value": min_val,
                "max_value": max_val,
                "column_mean": rounded,
            },
        )
# ---------------------------------------------------------------------------
# 14. expect_column_stdev_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnStdevToBeBetween(Expectation):
    """Expect the standard deviation of a column to fall within bounds.

    Either bound may be ``None`` to leave that side open. The value comes
    from ``Series.std()``; when it is NaN (for example with fewer than two
    non-null values) the expectation fails instead of passing vacuously.
    The reported value is rounded to 4 decimal places. Only a pandas
    backend is defined in this class.
    """

    expectation_type: str = field(init=False, default="expect_column_stdev_to_be_between")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare ``Series.std()`` with the configured bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        col_std = float(df[self.column].std())

        # Comparisons with NaN are always False, which would otherwise let
        # an undefined standard deviation satisfy any bounds.
        success = not pd.isna(col_std)
        if success and min_val is not None and col_std < min_val:
            success = False
        if success and max_val is not None and col_std > max_val:
            success = False

        rounded = round(col_std, 4)
        return self._build_result(
            success=success,
            observed_value=rounded,
            details={
                "min_value": min_val,
                "max_value": max_val,
                "column_stdev": rounded,
            },
        )
# ---------------------------------------------------------------------------
# 15. expect_column_distinct_values_to_be_in_set
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnDistinctValuesToBeInSet(Expectation):
    """Expect all distinct values in a column to be in the given set.

    The allowed values are read from ``kwargs["value_set"]`` (empty when
    absent). Nulls are ignored. Counts and percentages are expressed in
    distinct values, not rows.
    """

    expectation_type: str = field(init=False, default="expect_column_distinct_values_to_be_in_set")

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compare the set of non-null distinct values with ``value_set``."""
        value_set = set(self.kwargs.get("value_set", []))
        actual_values = set(df[self.column].dropna().unique().tolist())
        unexpected = actual_values - value_set
        total_distinct = len(actual_values)
        pct = (len(unexpected) / total_distinct * 100) if total_distinct > 0 else 0.0
        return self._build_result(
            success=(len(unexpected) == 0),
            # At most 20 values are reported in each list.
            observed_value={"distinct_values": list(actual_values)[:20]},
            element_count=total_distinct,
            unexpected_count=len(unexpected),
            unexpected_percent=pct,
            unexpected_values=list(unexpected)[:20],
            details={"value_set": list(value_set)},
        )

    def _validate_spark(self, df: Any) -> ExpectationResult:
        """Collect the distinct values and compare them with ``value_set``."""
        value_set = set(self.kwargs.get("value_set", []))
        row_list = df.select(self.column).distinct().collect()
        actual_values = {row[0] for row in row_list if row[0] is not None}
        unexpected = actual_values - value_set
        total_distinct = len(actual_values)
        pct = (len(unexpected) / total_distinct * 100) if total_distinct > 0 else 0.0
        return self._build_result(
            success=(len(unexpected) == 0),
            observed_value={"distinct_values": list(actual_values)[:20]},
            # element_count and unexpected_percent are reported for parity
            # with the pandas backend.
            element_count=total_distinct,
            unexpected_count=len(unexpected),
            unexpected_percent=pct,
            unexpected_values=list(unexpected)[:20],
            details={"value_set": list(value_set)},
        )
# ---------------------------------------------------------------------------
# 16. expect_column_proportion_of_unique_values_to_be_between
# ---------------------------------------------------------------------------
[docs]
@register_expectation
@dataclass
class ExpectColumnProportionOfUniqueValuesToBeBetween(Expectation):
    """Expect the proportion of unique values in a column to fall within bounds.

    The proportion is ``nunique / count`` over the non-null values, or 0.0
    when there are none.

    Recognised ``kwargs``:
        min_value: lower bound; missing or ``None`` means 0.0.
        max_value: upper bound; missing or ``None`` means 1.0.
    """

    expectation_type: str = field(
        init=False,
        default="expect_column_proportion_of_unique_values_to_be_between",
    )

    def _validate_pandas(self, df: pd.DataFrame) -> ExpectationResult:
        """Compute the unique-value proportion and compare it with the bounds."""
        min_val = self.kwargs.get("min_value")
        max_val = self.kwargs.get("max_value")
        # An explicit None would make the chained comparison raise TypeError,
        # so it is treated the same as a missing key.
        if min_val is None:
            min_val = 0.0
        if max_val is None:
            max_val = 1.0

        series = df[self.column].dropna()
        total = len(series)
        unique_count = series.nunique()
        proportion = (unique_count / total) if total > 0 else 0.0
        success = min_val <= proportion <= max_val
        rounded = round(proportion, 4)
        return self._build_result(
            success=success,
            observed_value=rounded,
            element_count=total,
            details={
                "unique_count": unique_count,
                "total_count": total,
                "proportion": rounded,
                "min_value": min_val,
                "max_value": max_val,
            },
        )