# Copyright (C) 2023 - 2026 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Validation framework for PyDYNA keywords and decks."""
from abc import ABC, abstractmethod
import collections
from enum import Enum
import logging
from typing import TYPE_CHECKING, Callable, List
from ansys.dyna.core.pre.errors import (
DuplicateIDError,
DuplicateKeywordError,
RequiredFieldError,
ValidationError,
)
if TYPE_CHECKING:
from ansys.dyna.core.lib.deck import Deck
[docs]
logger = logging.getLogger(__name__)
class ValidationSeverity(Enum):
"""Severity levels for validation rules."""
ERROR = "error"
WARNING = "warning"
INFO = "info"
class ValidationResult:
"""Result of a validation operation."""
def __init__(self):
"""Initialize an empty validation result."""
self.errors: List[ValidationError] = []
self.warnings: List[ValidationError] = []
self.info: List[ValidationError] = []
def add_error(self, error: ValidationError) -> None:
"""Add an error to the result.
Parameters
----------
error : ValidationError
The validation error to add.
"""
if error.severity == ValidationSeverity.ERROR.value:
self.errors.append(error)
elif error.severity == ValidationSeverity.WARNING.value:
self.warnings.append(error)
else:
self.info.append(error)
def has_errors(self) -> bool:
"""Check if there are any errors.
Returns
-------
bool
True if there are errors, False otherwise.
"""
return len(self.errors) > 0
def has_warnings(self) -> bool:
"""Check if there are any warnings.
Returns
-------
bool
True if there are warnings, False otherwise.
"""
return len(self.warnings) > 0
@property
def is_valid(self) -> bool:
"""Check if validation passed (no errors).
Returns
-------
bool
True if no errors, False otherwise.
"""
return not self.has_errors()
def get_summary(self) -> str:
"""Get a summary of validation results.
Returns
-------
str
Summary string with counts of errors, warnings, and info messages.
"""
parts = []
if self.errors:
parts.append(f"{len(self.errors)} error(s)")
if self.warnings:
parts.append(f"{len(self.warnings)} warning(s)")
if self.info:
parts.append(f"{len(self.info)} info message(s)")
return ", ".join(parts) if parts else "All validations passed"
def raise_if_errors(self) -> None:
"""Raise an exception if there are any errors.
Raises
------
ValidationError
If there are any errors in the result.
"""
if self.has_errors():
msg_parts = [f"Validation failed with {len(self.errors)} error(s):"]
for error in self.errors[:10]: # Limit to first 10 errors
msg_parts.append(f" - {str(error)}")
if len(self.errors) > 10:
msg_parts.append(f" ... and {len(self.errors) - 10} more")
raise ValidationError("\n".join(msg_parts))
class Validator(ABC):
"""Base class for all validators."""
def __init__(self, severity: ValidationSeverity = ValidationSeverity.ERROR):
"""Initialize the validator.
Parameters
----------
severity : ValidationSeverity
Severity level for violations found by this validator.
"""
self.severity = severity
@abstractmethod
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Validate the deck.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
pass
@abstractmethod
def get_name(self) -> str:
"""Get the name of this validator.
Returns
-------
str
Validator name for logging and registration.
"""
pass
class RequiredFieldValidator(Validator):
"""Validator that checks for required fields in keywords."""
def __init__(self, keyword_pattern: str, field_name: str, severity: ValidationSeverity = ValidationSeverity.ERROR):
"""Initialize the required field validator.
Parameters
----------
keyword_pattern : str
Pattern to match keyword types (e.g., "DEFINE_CURVE" matches all DEFINE_CURVE_* keywords).
field_name : str
Name of the required field.
severity : ValidationSeverity
Severity level for violations.
"""
super().__init__(severity)
self.keyword_pattern = keyword_pattern
self.field_name = field_name
def get_name(self) -> str:
"""Get the name of this validator."""
return f"RequiredField[{self.keyword_pattern}.{self.field_name}]"
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Validate that required field exists and is not None.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
logger.debug(f"Running {self.get_name()} validator")
checked_count = 0
for kwd in deck._keywords:
if isinstance(kwd, str):
continue # Skip string keywords
# Match by keyword property or class name (remove underscores for comparison)
matches = False
if hasattr(kwd, "keyword"):
# For "DEFINE_CURVE", match if keyword+subkeyword contains the pattern
full_keyword = f"{kwd.keyword}_{kwd.subkeyword}" if hasattr(kwd, "subkeyword") else kwd.keyword
matches = (
self.keyword_pattern == full_keyword
or self.keyword_pattern == kwd.keyword
or self.keyword_pattern in full_keyword
)
if not matches:
# Fallback to class name matching (remove underscores)
kwd_type_normalized = type(kwd).__name__.replace("_", "")
pattern_normalized = self.keyword_pattern.replace("_", "")
matches = pattern_normalized.upper() in kwd_type_normalized.upper()
if matches:
checked_count += 1
if hasattr(kwd, self.field_name):
value = getattr(kwd, self.field_name)
if value is None:
error = RequiredFieldError(kwd, self.field_name)
error.severity = self.severity.value
result.add_error(error) # add_error auto-sorts by severity
logger.warning(f"Required field validation failed: {error}")
logger.debug(f"{self.get_name()} checked {checked_count} keywords")
class UniqueIDValidator(Validator):
"""Validator that checks for unique ID fields across keyword types."""
def __init__(self, keyword_type: str, field_name: str, severity: ValidationSeverity = ValidationSeverity.ERROR):
"""Initialize the unique ID validator.
Parameters
----------
keyword_type : str
Exact keyword type name (e.g., "SECTION").
field_name : str
Name of the ID field that must be unique.
severity : ValidationSeverity
Severity level for violations.
"""
super().__init__(severity)
self.keyword_type = keyword_type
self.field_name = field_name
def get_name(self) -> str:
"""Get the name of this validator."""
return f"UniqueID[{self.keyword_type}.{self.field_name}]"
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Validate that ID field is unique across keywords of the specified type.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
logger.debug(f"Running {self.get_name()} validator")
ids = []
keywords = deck[self.keyword_type] # Returns KeywordCollection
for kwd in keywords:
if not hasattr(kwd, self.field_name):
logger.warning(f"Keyword type {self.keyword_type} does not have field {self.field_name}")
continue
id_value = getattr(kwd, self.field_name)
if id_value is not None: # Only check non-None values
ids.append(id_value)
duplicates = [id_val for id_val, count in collections.Counter(ids).items() if count > 1]
if duplicates:
error = DuplicateIDError(self.keyword_type, self.field_name, duplicates)
error.severity = self.severity.value
result.add_error(error) # add_error auto-sorts by severity
logger.warning(f"Unique ID validation failed: {error}")
logger.debug(f"{self.get_name()} checked {len(keywords)} keywords, found {len(duplicates)} duplicates")
class KeywordValidator(Validator):
"""Validator that calls _is_valid() on all keywords."""
def __init__(self, severity: ValidationSeverity = ValidationSeverity.ERROR):
"""Initialize the keyword validator.
Parameters
----------
severity : ValidationSeverity
Severity level for violations.
"""
super().__init__(severity)
def get_name(self) -> str:
"""Get the name of this validator."""
return "KeywordValid"
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Validate all keywords using their _is_valid() method.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
logger.debug(f"Running {self.get_name()} validator")
checked_count = 0
for kwd in deck._keywords:
if isinstance(kwd, str):
continue # Skip string keywords
checked_count += 1
is_valid, msg = kwd._is_valid()
if not is_valid:
error = ValidationError(f"{kwd} is not valid due to {msg}")
error.severity = self.severity.value
result.add_error(error) # add_error auto-sorts by severity
logger.warning(f"Keyword validation failed: {error}")
logger.debug(f"{self.get_name()} checked {checked_count} keywords")
# Keywords that should appear at most once in a deck.
# This list covers common CONTROL keywords and other singleton keywords.
# Users can write custom validators if additional keywords need uniqueness checks.
[docs]
GLOBALLY_UNIQUE_KEYWORDS = frozenset(
[
("CONTROL", "ACCURACY"),
("CONTROL", "BULK_VISCOSITY"),
("CONTROL", "CONTACT"),
("CONTROL", "CPU"),
("CONTROL", "DYNAMIC_RELAXATION"),
("CONTROL", "ENERGY"),
("CONTROL", "HOURGLASS"),
("CONTROL", "IMPLICIT_AUTO"),
("CONTROL", "IMPLICIT_DYNAMICS"),
("CONTROL", "IMPLICIT_GENERAL"),
("CONTROL", "IMPLICIT_SOLUTION"),
("CONTROL", "IMPLICIT_SOLVER"),
("CONTROL", "OUTPUT"),
("CONTROL", "PARALLEL"),
("CONTROL", "SHELL"),
("CONTROL", "SOLID"),
("CONTROL", "SOLUTION"),
("CONTROL", "TERMINATION"),
("CONTROL", "THERMAL_SOLVER"),
("CONTROL", "THERMAL_TIMESTEP"),
("CONTROL", "TIMESTEP"),
]
)
class GloballyUniqueKeywordValidator(Validator):
"""Validator that checks for keywords that should appear at most once in a deck."""
def __init__(self, severity: ValidationSeverity = ValidationSeverity.ERROR):
"""Initialize the globally unique keyword validator.
Parameters
----------
severity : ValidationSeverity
Severity level for violations.
"""
super().__init__(severity)
def get_name(self) -> str:
"""Get the name of this validator."""
return "GloballyUniqueKeyword"
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Validate that globally unique keywords appear at most once.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
logger.debug(f"Running {self.get_name()} validator")
keyword_counts: dict = {}
for kwd in deck._keywords:
if isinstance(kwd, str):
continue
key = (getattr(kwd, "keyword", None), getattr(kwd, "subkeyword", None))
if key in GLOBALLY_UNIQUE_KEYWORDS:
keyword_counts[key] = keyword_counts.get(key, 0) + 1
duplicates_found = 0
for (keyword_type, subkeyword), count in keyword_counts.items():
if count > 1:
duplicates_found += 1
error = DuplicateKeywordError(keyword_type, subkeyword, count)
error.severity = self.severity.value
result.add_error(error)
logger.warning(f"Globally unique keyword validation failed: {error}")
logger.debug(
f"{self.get_name()} checked {len(keyword_counts)} unique keyword types, found {duplicates_found} duplicates"
)
class CustomValidator(Validator):
"""Validator that wraps a custom validation function."""
def __init__(
self,
name: str,
func: Callable[["Deck", ValidationResult], None],
severity: ValidationSeverity = ValidationSeverity.ERROR,
):
"""Initialize a custom validator.
Parameters
----------
name : str
Name of the validator.
func : Callable
Validation function that takes (deck, result) and adds errors to result.
severity : ValidationSeverity
Default severity level for violations.
"""
super().__init__(severity)
self._name = name
self.func = func
def get_name(self) -> str:
"""Get the name of this validator."""
return self._name
def validate(self, deck: "Deck", result: ValidationResult) -> None:
"""Run the custom validation function.
Parameters
----------
deck : Deck
The deck to validate.
result : ValidationResult
Result object to accumulate validation errors.
"""
logger.debug(f"Running custom validator: {self._name}")
self.func(deck, result)
class ValidatorRegistry:
"""Registry for managing validators."""
def __init__(self):
"""Initialize the validator registry."""
self._validators: List[Validator] = []
self._default_validators_registered = False
def register(self, validator: Validator) -> None:
"""Register a validator.
Parameters
----------
validator : Validator
The validator to register.
"""
logger.debug(f"Registering validator: {validator.get_name()}")
self._validators.append(validator)
def register_custom(
self,
name: str,
func: Callable[["Deck", ValidationResult], None],
severity: ValidationSeverity = ValidationSeverity.ERROR,
) -> None:
"""Register a custom validation function.
Parameters
----------
name : str
Name of the validator.
func : Callable
Validation function that takes (deck, result) and adds errors to result.
severity : ValidationSeverity
Default severity level for violations.
"""
validator = CustomValidator(name, func, severity)
self.register(validator)
def unregister(self, validator_name: str) -> bool:
"""Unregister a validator by name.
Parameters
----------
validator_name : str
Name of the validator to remove.
Returns
-------
bool
True if validator was found and removed, False otherwise.
"""
original_length = len(self._validators)
self._validators = [v for v in self._validators if v.get_name() != validator_name]
removed = len(self._validators) < original_length
if removed:
logger.debug(f"Unregistered validator: {validator_name}")
return removed
def clear(self) -> None:
"""Remove all validators from the registry."""
logger.debug(f"Clearing {len(self._validators)} validators")
self._validators.clear()
self._default_validators_registered = False
def get_all(self) -> List[Validator]:
"""Get all registered validators.
Returns
-------
List[Validator]
List of all registered validators.
"""
return self._validators.copy()
def validate(self, deck: "Deck") -> ValidationResult:
"""Run all registered validators on a deck.
Parameters
----------
deck : Deck
The deck to validate.
Returns
-------
ValidationResult
Result containing all errors, warnings, and info messages.
"""
result = ValidationResult()
logger.info(f"Running {len(self._validators)} validators on deck")
for validator in self._validators:
try:
validator.validate(deck, result)
except Exception as e:
logger.error(f"Validator {validator.get_name()} raised exception: {e}", exc_info=True)
error = ValidationError(
f"Validator '{validator.get_name()}' failed with exception: {e}",
severity=ValidationSeverity.ERROR.value,
)
result.add_error(error)
logger.info(f"Validation complete: {result.get_summary()}")
return result
def register_default_validators(self) -> None:
"""Register the default set of validators for LS-DYNA decks."""
if self._default_validators_registered:
logger.debug("Default validators already registered, skipping")
return
logger.info("Registering default validators")
# Keyword _is_valid() validator (legacy check)
self.register(KeywordValidator(ValidationSeverity.ERROR))
# Required field validators for DEFINE_CURVE_* keywords
self.register(RequiredFieldValidator("DEFINE_CURVE", "lcid", ValidationSeverity.ERROR))
# Required field validators for SECTION_* keywords
self.register(RequiredFieldValidator("SECTION", "secid", ValidationSeverity.ERROR))
# Unique ID validators
self.register(UniqueIDValidator("SECTION", "secid", ValidationSeverity.ERROR))
# Globally unique keyword validator (e.g., CONTROL_TIMESTEP should appear at most once)
self.register(GloballyUniqueKeywordValidator(ValidationSeverity.ERROR))
# Note: We don't enforce unique lcid for DEFINE_CURVE because multiple curves
# can legally share the same ID in some LS-DYNA workflows
self._default_validators_registered = True
logger.info(f"Registered {len(self._validators)} default validators")