# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
#
# This file is part of Pogona.
#
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pogona. If not, see <https://www.gnu.org/licenses/>.
from abc import ABCMeta, abstractmethod
from typing import List, Optional, Set
import os
import re
import numpy as np
import logging
import pogona as pg
import pogona.properties as prop
LOG = logging.getLogger(__name__)
class Object(pg.Component, metaclass=ABCMeta):
    """
    Abstract base class for scene objects whose flow is described by a
    vector field loaded from OpenFOAM simulation cases.

    Subclasses have to provide `_openfoam_cases_subpath`,
    `get_mesh_index()`, `get_fallback_mesh_index()`,
    `process_changed_inlet_flow_rate()`, and `get_outlet_area()`.
    """

    object_id = prop.IntProperty(-1, required=False)
    """Object index, set by the scene manager."""
    # TODO: hide from user

    translation = prop.VectorProperty([0, 0, 0], required=True)
    rotation = prop.VectorProperty([0, 0, 0], required=True)
    scale = prop.VectorProperty([1, 1, 1], required=True)

    openfoam_cases_path = prop.StrProperty("", required=True)
    """
    Path to all OpenFOAM simulation cases.
    Subclasses may use this as the base path to find their respective
    meshes via `get_path()`.
    """

    use_sensor_subscriptions = prop.EnumProperty(
        str(pg.SensorSubscriptionsUsage.USE_DEFAULT.name),
        name='use_sensor_subscriptions',
        required=False,
        enum_class=pg.SensorSubscriptionsUsage,
    )
    """
    Notify subscribed sensors based on object mesh cells.
    This speeds up simulation.
    Only disable this if you are constrained by memory or if the object
    does not have a mesh in the first place.
    If None, use the SensorManager's default_use_sensor_subscriptions.
    """

    dummy_boundary_points = prop.EnumProperty(
        str(pg.DummyBoundaryPointsVariant.NONE.name),
        name='dummy_boundary_points',
        required=False,
        enum_class=pg.DummyBoundaryPointsVariant,
    )
    """
    Whether to insert dummy points at the boundary of the mesh,
    and where exactly.
    This may help with interpolation.
    """

    def __init__(self):
        """Set up default (empty) state; no mesh is loaded here."""
        super().__init__()

        self._vector_field_manager: Optional['pg.VectorFieldManager'] = None
        """This object's VectorFieldManager, set by child classes."""
        self._walls_patch_names: Set[str] = {
            'walls',
            'yConnectorPatch',
            'tubePatch'
        }
        """
        A superset of names of wall patches.
        Required by the VectorFieldParser:
        "Which patches to consider as boundaries
        (typically does not include inlets and outlets)."
        """
        self._transformation: Optional['pg.Transformation'] = None
        """
        Transformation of this object in the scene.
        """
        self.inlets: List[str] = []
        """Names of the inlets in the OpenFOAM mesh."""
        self.outlets: List[str] = []
        """Names of the outlets in the OpenFOAM mesh."""
        self.name: str = "Generic Object"
        """
        The name of this type of object.
        Not a unique identifier like Component.component_name!
        """
        self._is_active = True
        """
        If True, indicates that fluid inside this Object is moving.
        """
        self._default_time_str: str = "latest"
        """
        Default sub-folder of the OpenFOAM simulation results to use.
        If "latest", will search for the sub-folder which name is a
        floating point number and has the greatest value.
        """

        self._ignored_arguments.update({
            # These arguments are passed to all components present in the
            # scene. A (mesh) object won't need them, but we also don't
            # want to see warnings if these arguments are passed anyway:
            'geometry',
            'shape',
            'visualization_scale',
        })

    def initialize(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            init_stage: 'pg.InitStages'
    ):
        """
        Perform the work of this Object for the given initialization stage.

        :param simulation_kernel: Kernel giving access to the mesh manager
            and the scene manager.
        :param init_stage: The stage currently being processed.
        """
        super().initialize(simulation_kernel, init_stage)
        if init_stage == pg.InitStages.CHECK_ARGUMENTS:
            # Properties are final at this point, so the transformation
            # can be derived from them.
            self._transformation = pg.Transformation(
                translation=np.array(self.translation),
                rotation=np.array(self.rotation),
                scaling=np.array(self.scale)
            )
        if init_stage == pg.InitStages.CREATE_DATA_STRUCTURES:
            self.load_current_vector_field(simulation_kernel)
        if init_stage == pg.InitStages.BUILD_SCENE:
            simulation_kernel.get_scene_manager().add_object(self)

    @property
    @abstractmethod
    def _openfoam_cases_subpath(self):
        """
        In which folder of `openfoam_cases_path` to find the cases for this
        Object class.
        `get_path()` will be relative to
        `<openfoam_cases_path>/<openfoam_cases_subpath>`.
        """
        pass

    @property
    def is_active(self):
        """Whether this Object is currently flagged as active."""
        return self._is_active

    @property
    def walls_patch_names(self):
        """The set of patch names treated as walls for this Object."""
        return self._walls_patch_names

    def get_vector_field_manager(self) -> Optional['pg.VectorFieldManager']:
        """
        :returns: This Object's VectorFieldManager,
            or None if this Object is inactive.
        """
        if not self._is_active:
            return None
        return self._vector_field_manager

    def get_current_mesh_local(self) -> Optional[np.ndarray]:
        """
        :returns: The cell centres of the current mesh as reported by the
            VectorFieldManager's `get_cell_centres_local()`,
            or None if this Object is inactive.
        """
        if not self._is_active:
            return None
        return self._vector_field_manager.get_cell_centres_local()

    def get_current_mesh_global(self) -> Optional[np.ndarray]:
        """
        :returns: The cell centres of the current mesh as reported by the
            VectorFieldManager's `get_cell_centres_global()`,
            or None if this Object is inactive.
        """
        if not self._is_active:
            return None
        return self._vector_field_manager.get_cell_centres_global()

    def get_closest_cell_centre_id(
            self,
            position_global: np.ndarray
    ) -> Optional[np.ndarray]:
        """
        :returns: Array of closest cell centers to `position`.
            If there's only one position, output is squeezed.
            None if this Object is inactive.
            See documentation of cKDTree.query().
        """
        if not self.is_active:
            return None
        return self._vector_field_manager.get_closest_cell_centre_id(
            position_global=position_global
        )

    def get_flow(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            position_global: np.ndarray,
            sim_time: float
    ):
        """
        Look up the flow at a position.

        :param simulation_kernel: Passed through to the VectorFieldManager.
        :param position_global: Position to query, passed through to the
            VectorFieldManager.
        :param sim_time: Not used by this base implementation.
        :returns: A zero vector of three floats if this Object is inactive,
            otherwise whatever the VectorFieldManager's
            `get_flow_by_position()` returns.
        """
        if not self._is_active:
            # `np.float` was a deprecated alias of the builtin `float` and
            # has been removed from NumPy; the builtin yields the same dtype.
            return np.array([0, 0, 0], dtype=float)
        return self._vector_field_manager.get_flow_by_position(
            simulation_kernel=simulation_kernel,
            position_global=position_global
        )

    def load_current_vector_field(
            self,
            simulation_kernel: 'pg.SimulationKernel'
    ):
        """
        Load the vector field for the current mesh index and replace this
        Object's VectorFieldManager with one wrapping the new field.

        Does nothing if this Object is inactive.

        :param simulation_kernel: Kernel whose mesh manager loads the field.
        :raises AssertionError: If a vector field was loaded before and the
            mesh of the new one has a different number of cells.
        """
        if not self._is_active:
            # This saves us the extra step of having to load a mesh
            # with all 0s.
            return

        old_mesh_size = (
            len(self._vector_field_manager.get_mesh())
            if self._vector_field_manager is not None
            else None
        )
        vector_field = simulation_kernel.get_mesh_manager().load_vector_field(
            openfoam_sim_path=self.get_path(),
            mesh_index=self.get_mesh_index(),
            walls_patch_names=self._walls_patch_names,
            dummy_boundary_points=pg.DummyBoundaryPointsVariant[
                self.dummy_boundary_points],
        )
        # Wrap the new field *before* comparing sizes: measuring
        # `self._vector_field_manager` at this point would measure the old
        # mesh a second time, so the check below could never trigger.
        new_manager = pg.VectorFieldManager(
            vector_field=vector_field,
            transformation=self._transformation
        )
        if old_mesh_size is not None:
            # Comparison will only apply to states of this Object in which
            # self._is_active was True.
            new_mesh_size = len(new_manager.get_mesh())
            if old_mesh_size != new_mesh_size:
                raise AssertionError(
                    f"Object {self.name} loaded a new vector field, "
                    f"but the new mesh ({new_mesh_size} cells) doesn't match "
                    f"the old one ({old_mesh_size} cells)."
                )
        self._vector_field_manager = new_manager

    @abstractmethod
    def get_mesh_index(self) -> str:
        """
        :returns: A unique string for the current configuration of this object.
            Used for caching.
        """
        pass

    @abstractmethod
    def get_fallback_mesh_index(self) -> Optional[str]:
        """
        If an Object supports sensor subscriptions, some OpenFOAM case
        with a non-zero flow rate should exist (i.e., a vector field for
        which this Object has `self.is_active == True`).
        If this object is initially inactive, however, we still need to know
        where to find this mesh for the initialization of the
        SensorManager.

        :returns: A valid mesh index independent of this Object's
            `is_active` status.
            None if this Object does not support sensor subscriptions.
        """
        pass

    def get_path(self, use_latest_time_step=False, fallback=False) -> str:
        """
        :param use_latest_time_step: Find the latest time step in path.
            If False, use this Object's `_default_time_str`,
            which may still be set to 'latest', which has the same effect.
        :param fallback: Use `get_fallback_mesh_index` instead of
            `get_mesh_index`.
        :return: Path to the OpenFOAM files for this object.
            Empty string if this Object is inactive and `fallback` is False.
        :raises ValueError: If `fallback` is True but there is no fallback
            mesh index.
        """
        if not self._is_active and not fallback:
            return ''
        mesh_index = (
            self.get_mesh_index() if not fallback
            else self.get_fallback_mesh_index()
        )
        if fallback and mesh_index is None:
            raise ValueError(
                "Tried to call `get_path` with the fallback flag enabled, "
                "but this Object does not have a fallback mesh index."
            )
        path = os.path.join(
            self.openfoam_cases_path,
            self._openfoam_cases_subpath,
            mesh_index
        )
        if use_latest_time_step or self._default_time_str == 'latest':
            return os.path.join(path, self.find_latest_time_step(path))
        return os.path.join(path, self._default_time_str)

    @abstractmethod
    def process_changed_inlet_flow_rate(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            inlet_name: str,
            flow_rate: float
    ):
        """
        Update the flow rate inside this Object based on the changed flow
        rate of the inlet with name `inlet_name`,
        then propagate this change to any Object connected to this Object's
        outlets.
        Should also update `is_active` accordingly.
        """
        pass

    @abstractmethod
    def get_outlet_area(self, outlet_name: str) -> (
            'pg.Geometry',
            'pg.Transformation'
    ):
        """
        :param outlet_name: Name of the outlet to describe.
        :returns: A geometry and a transformation for that outlet
            (per the return annotation; semantics are defined by
            subclasses).
        """
        pass

    @staticmethod
    def find_latest_time_step(path: str) -> str:
        """
        :return: The highest-valued entry of path that is named
            only after a floating point value.
            E.g., if there are '{path}/0/' and '{path}/0.1/', this will return
            '0.1'.
        :raises ValueError: If no entry of path has such a name.
        """
        # Float pattern from
        # https://docs.python.org/3/library/re.html#simulating-scanf:
        float_name = re.compile(r'^[-+]?(\d+(\.\d*)?|\.\d+)([eE][-+]?\d+)?$')
        # NOTE(review): entries are matched by name only; a regular file
        # with a numeric name would be accepted as well.
        candidates = [
            entry for entry in os.listdir(path) if float_name.match(entry)
        ]
        if not candidates:
            raise ValueError(
                f"Could not find a subfolder for the latest time step "
                f"in \"{path}\"."
            )
        # Compare by numeric value, not lexicographically ('10' > '9').
        return max(candidates, key=float)