# Copyright (C) 2021 - 2024 ANSYS, Inc. and/or its affiliates.
# SPDX-License-Identifier: MIT
#
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import io
import typing
import ansys.dyna.core
from ansys.dyna.core.lib.encrypted_keyword import EncryptedKeyword
from ansys.dyna.core.lib.format_type import format_type
from ansys.dyna.core.lib.import_handler import ImportContext, ImportHandler
from ansys.dyna.core.lib.keyword_base import KeywordBase
[docs]
class DeckLoaderResult:
"""A class containing the result of an attempted deck load."""
def __init__(self):
self._unprocessed_keywords = []
[docs]
def add_unprocessed_keyword(self, name):
self._unprocessed_keywords.append(name)
[docs]
def get_summary(self) -> str:
summary = io.StringIO()
for unprocessed_keyword in self._unprocessed_keywords:
summary.write(f"Failed to process: {unprocessed_keyword}\n")
return summary.getvalue()
def _get_kwd_class_and_format(keyword_name: str) -> str:
# handle spaces in keyword_name, such as
# *ELEMENT_SOLID (ten nodes format) => *ELEMENT_SOLID
# the spaces are used as hints for LSPP but not needed
# by the dyna solver
from ansys.dyna.core.keywords.keyword_classes.type_mapping import TypeMapping
keyword_name = keyword_name.split()[0]
title_tokens = keyword_name.split("_")
# Handling title is a hack right now. Should be able to find the correct
# keyword object type given any prefix and suffix options
if keyword_name.endswith("-"):
format = format_type.standard
keyword_name = keyword_name[:-1]
elif keyword_name.endswith("+"):
format = format_type.long
keyword_name = keyword_name[:-1]
else:
format = format_type.default
keyword_object_type = TypeMapping.get(keyword_name, None)
while keyword_object_type is None:
if len(title_tokens) == 0:
break
title_tokens = title_tokens[:-1]
keyword_name = "_".join(title_tokens)
keyword_object_type = TypeMapping.get(keyword_name, None)
return keyword_object_type, format
def _update_iterstate(line: str):
if line.startswith("*KEYWORD"):
return IterState.KEYWORD_BLOCK
if line.startswith("*TITLE"):
return IterState.TITLE
if line.startswith("*END"):
return IterState.END
if "BEGIN PGP MESSAGE" in line:
return IterState.ENCRYPTED
return IterState.KEYWORDS
def _update_deck_format(block: typing.List[str], deck: "ansys.dyna.core.deck.Deck") -> None:
assert len(block) == 1
line = block[0].upper()
if "LONG" in line:
format_setter = line[line.find("LONG") + 4 :].strip()
tokens = format_setter.split("=")
assert len(tokens) >= 2
format = tokens[1]
if format == "S":
deck.format = format_type.default
if format == "K":
deck.format = format_type.standard
if format == "Y":
deck.format = format_type.long
def _update_deck_comment(block: typing.List[str], deck: "ansys.dyna.core.deck.Deck") -> None:
def remove_comment_symbol(line: str):
if not line.startswith("$"):
raise Exception("Only comments can precede *KEYWORD")
return line[1:]
block_without_comment_symbol = [remove_comment_symbol(line) for line in block]
deck.comment_header = "\n".join(block_without_comment_symbol)
def _before_import(
block: typing.List[str],
keyword: str,
keyword_data: str,
import_handlers: typing.List[ImportHandler],
context: ImportContext,
) -> bool:
if len(import_handlers) == 0:
return True
assert context != None
s = io.StringIO()
s.write(keyword_data)
s.seek(0)
for handler in import_handlers:
if not handler.before_import(context, keyword, s):
return False
s.seek(0)
return True
def _update_deck_title(block: typing.List[str], deck: "ansys.dyna.core.deck.Deck") -> None:
block = [line for line in block if not line.startswith("$")]
assert len(block) == 2, "Title block can only have one line"
deck.title = block[1]
def _on_error(error, import_handlers: typing.List[ImportHandler]):
for handler in import_handlers:
handler.on_error(error)
def _after_import(keyword, import_handlers: typing.List[ImportHandler], context: ImportContext):
for handler in import_handlers:
handler.after_import(context, keyword)
def _handle_keyword(
block: typing.List[str],
deck: "ansys.dyna.core.deck.Deck",
import_handlers: typing.List[ImportHandler],
result: DeckLoaderResult,
context: ImportContext,
) -> None:
keyword = block[0].strip()
keyword_data = "\n".join(block)
do_import = _before_import(block, keyword, keyword_data, import_handlers, context)
if not do_import:
return
keyword_object_type, format = _get_kwd_class_and_format(keyword)
if keyword_object_type == None:
result.add_unprocessed_keyword(keyword)
deck.append(keyword_data)
_after_import(keyword_data, import_handlers, context)
else:
import ansys.dyna.core.keywords
keyword_object: KeywordBase = getattr(ansys.dyna.core.keywords, keyword_object_type)()
if format == format_type.default:
format = deck.format
keyword_object.format = format
try:
keyword_object.loads(keyword_data, deck.parameters)
deck.append(keyword_object)
_after_import(keyword_object, import_handlers, context)
except Exception as e:
_on_error(e, import_handlers)
result.add_unprocessed_keyword(keyword)
deck.append(keyword_data)
_after_import(keyword_data, import_handlers, context)
def _handle_block(
iterstate: int,
deck: "ansys.dyna.core.deck.Deck",
block: typing.List[str],
import_handlers: typing.List[ImportHandler],
result: DeckLoaderResult,
context: ImportContext,
) -> bool:
if iterstate == IterState.END:
return True
if iterstate == IterState.USERCOMMENT:
_update_deck_comment(block, deck)
elif iterstate == IterState.KEYWORD_BLOCK:
_update_deck_format(block, deck)
elif iterstate == IterState.TITLE:
_update_deck_title(block, deck)
else:
_handle_keyword(block, deck, import_handlers, result, context)
return False
def _try_load_deck_from_buffer(
deck: "ansys.dyna.core.deck.Deck",
buffer: typing.TextIO,
result: DeckLoaderResult,
context: typing.Optional[ImportContext],
import_handlers: typing.List[ImportHandler],
) -> None:
iterstate = IterState.USERCOMMENT
block = []
encrypted_section = None
while True:
close_previous_block = False
try:
line = buffer.readline()
if len(line) == 0:
_handle_block(iterstate, deck, block, import_handlers, result, context)
return
line = line.rstrip("\n")
if line.startswith("*"):
close_previous_block = True
if "BEGIN PGP MESSAGE" in line:
close_previous_block = True
encrypted_section = io.StringIO()
if "END PGP MESSAGE" in line:
kwd = EncryptedKeyword()
kwd.data = encrypted_section.getvalue()
encrypted_section = None
deck.append(kwd)
_after_import(kwd, import_handlers, context)
break
if close_previous_block:
# handle the previous block
end = _handle_block(iterstate, deck, block, import_handlers, result, context)
if end:
return
# set the new iterstate, start building the next block
iterstate = _update_iterstate(line)
block = [line]
else:
if iterstate == IterState.ENCRYPTED:
encrypted_section.write(line)
encrypted_section.write("\n")
if iterstate == IterState.KEYWORD_BLOCK:
# reset back to user comment after the keyword line?
iterstate = IterState.USERCOMMENT
block = []
else:
block.append(line)
except StopIteration:
_handle_block(iterstate, deck, block, import_handlers, result, context)
return
[docs]
def load_deck(
deck: "ansys.dyna.core.deck.Deck",
text: str,
context: typing.Optional[ImportContext],
import_handlers: typing.List[ImportHandler],
) -> DeckLoaderResult:
result = DeckLoaderResult()
buffer = io.StringIO()
buffer.write(text)
buffer.seek(0)
_try_load_deck_from_buffer(deck, buffer, result, context, import_handlers)
return result
[docs]
def load_deck_from_buffer(
deck: "ansys.dyna.core.deck.Deck",
buffer: typing.TextIO,
context: typing.Optional[ImportContext],
import_handlers: typing.List[ImportHandler],
) -> DeckLoaderResult:
result = DeckLoaderResult()
_try_load_deck_from_buffer(deck, buffer, result, context, import_handlers)
return result