Source code for pogona.sensor_manager

# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
#
# This file is part of Pogona.
#
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pogona.  If not, see <https://www.gnu.org/licenses/>.

from typing import List
import logging
import enum
import numpy as np

import pogona as pg
import pogona.properties as prop

LOG = logging.getLogger(__name__)


[docs]class SensorSubscriptionsUsage(enum.Enum): ENABLED = 1 DISABLED = 2 USE_DEFAULT = 3
[docs]class SensorManager(pg.Component): default_use_sensor_subscriptions = prop.BoolProperty(True, required=False) """ Notify subscribed sensors based on object mesh cells. This speeds up simulation. Only disable this if you are constrained by memory. Individual objects can override this setting. """ use_range_queries = prop.BoolProperty(True, required=False) """ When determining possible sensor subscriptions, execute range queries on the Object's k-d tree instead of iterating the entire mesh. Use this if all sensors occupy a relatively small volume compared to each mesh. Performance may degrade for large sensors. """ component_name = prop.StrProperty("sensor_manager", required=False) """Since this component is not created by the config, choose a name"""
[docs] def __init__(self): super().__init__() self._sensors: List[pg.Sensor] = [] self._sensor_subscriptions: List[List[List[int]]] = [] """Indexable by object ID, cell ID; holds a list of sensor IDs"""
[docs] def initialize( self, simulation_kernel: 'pg.SimulationKernel', init_stage: 'pg.InitStages' ): super().initialize(simulation_kernel, init_stage) if init_stage == pg.InitStages.CREATE_SENSOR_SUBSCRIPTIONS: # Initialize the subscription array for i, obj in enumerate( simulation_kernel.get_scene_manager().get_all_objects()): if obj.object_id != len(self._sensor_subscriptions): raise AssertionError( "Creating the sensor subscription array failed " f"because the ID of object \"{obj.component_name}\" " f"= {obj.object_id}, which has index {i} in the list " "of objects, does not match the length of the " "list of sensor subscriptions " f"= {len(self._sensor_subscriptions)}. " "Maybe it was somehow attached twice?" ) # Ensure that there is a mesh to add subscriptions to, # even if the object is inactive: fallback_mesh_index = obj.get_fallback_mesh_index() if not obj.is_active and fallback_mesh_index is not None: # Try to temporarily load a fallback mesh, if it exists. LOG.debug( "Ensure that a mesh exists even if object is " "inactive by loading a fallback mesh for " f"object \"{obj.component_name}\". " f"{fallback_mesh_index=}" ) vector_field = ( simulation_kernel.get_mesh_manager().load_vector_field( openfoam_sim_path=obj.get_path(fallback=True), mesh_index=fallback_mesh_index, walls_patch_names=obj.walls_patch_names, dummy_boundary_points=obj.dummy_boundary_points, ) ) vfm = pg.VectorFieldManager( vector_field=vector_field, transformation=obj.get_transformation() ) mesh_global = vfm.get_cell_centres_global() else: # Object is likely active, or it doesn't support # subscriptions. # In the latter case, mesh_global will be None. mesh_global = obj.get_current_mesh_global() if (mesh_global is None or not self._uses_sensor_subscriptions(obj)): # The object does not support subscriptions. self._sensor_subscriptions.append([]) else: # Ensure that for every mesh cell, # we have an array of subscriptions ready self._sensor_subscriptions.append( [[] for _ in range(len(mesh_global))]) # Write subscriptions to the array for sensor in self._sensors: if self.use_range_queries: # Use k-d tree range queries to determine possible # cells cell_ids = self._get_mesh_subset_ids( obj, sensor) LOG.debug( f"Range query subset of size " f"{len(cell_ids)}. " f"Original size: {len(mesh_global)} " f"while subscribing sensor " f"\"{sensor.component_name}\" to object " f"\"{obj.component_name}\"." ) else: # Just ask for all cell ids cell_ids = [[] for _ in range(len(mesh_global))] for cell_id in cell_ids: cell_centre = mesh_global[cell_id] if sensor.is_inside_sensor_zone(cell_centre): self._sensor_subscriptions[obj.object_id][ cell_id].append(sensor.sensor_id)
[docs] def register_sensor( self, sensor: 'pg.Sensor' ): """ :param sensor: :return: nothing """ sensor.sensor_id = len(self._sensors) self._sensors.append(sensor) LOG.debug(f"Registered new sensor \"{sensor.component_name}\"")
def _uses_sensor_subscriptions(self, obj: 'pg.Object'): """ :return: True iff obj is supposed to use sensor subscriptions. obj.use_sensor_subscriptions has precedence over self.default_use_sensor_subscriptions. """ return obj.is_active and ( ( obj.use_sensor_subscriptions == pg.SensorSubscriptionsUsage.USE_DEFAULT and self.default_use_sensor_subscriptions ) or ( obj.use_sensor_subscriptions == pg.SensorSubscriptionsUsage.ENABLED ) ) def _get_mesh_subset_ids(self, obj: "pg.Object", sensor: 'pg.Sensor'): # Query radius: All shapes are confined to a box with # side lengths 1 centered around (0, 0, 0). # Use the distance to one of the corners of this box # as the radius: radius = np.sqrt( (sensor.transformation.scaling[0] / 2) ** 2 + (sensor.transformation.scaling[1] / 2) ** 2 + (sensor.transformation.scaling[2] / 2) ** 2 ) return obj.get_vector_field_manager().kd_tree_global.query_ball_point( # Center point of this sensor: x=sensor.transformation.translation, # Query radius: r=radius ) def _get_subscribed_sensors( self, simulation_kernel: 'pg.SimulationKernel', molecule: 'pg.Molecule', ): """ :param simulation_kernel: :param molecule: :return: List of sensors subscribed to the current cell of the given molecule if sensor subscriptions are active, otherwise returns the list of all sensors. """ # Just use all sensors if there is no related object_id if molecule.object_id is None: return self._sensors obj = simulation_kernel.get_scene_manager().get_all_objects()[ molecule.object_id] if self._uses_sensor_subscriptions(obj) and len( self._sensor_subscriptions[molecule.object_id]) > 0: # Find out which sensors are subscribed to this particular cell subscribed_sensor_ids = ( self._sensor_subscriptions[molecule.object_id][ molecule.cell_id] ) return [ self._sensors[sensor_id] for sensor_id in subscribed_sensor_ids ] else: # Just notify all sensors. # Either we do not want to use sensor subscriptions at all, # or the object in which the molecule is does not provide a mesh. return self._sensors
[docs] def process_molecule_moving_before( self, simulation_kernel: 'pg.SimulationKernel', molecule: 'pg.Molecule', ): """ Called before updating the position of a particle within a time step. Useful for SensorTeleporting, for example, which should consider the position of particles right after they have been spawned. :param simulation_kernel: :param molecule: :return: """ subscribed_sensors = self._get_subscribed_sensors( simulation_kernel=simulation_kernel, molecule=molecule, ) for sensor in subscribed_sensors: sensor.process_molecule_moving_before( simulation_kernel=simulation_kernel, molecule=molecule, )
[docs] def process_molecule_moving_after( self, simulation_kernel: 'pg.SimulationKernel', molecule: 'pg.Molecule', ): """ Called after the position of a particle has been updated. Useful for updating regular sensors. :param simulation_kernel: :param molecule: :return: """ subscribed_sensors = self._get_subscribed_sensors( simulation_kernel=simulation_kernel, molecule=molecule, ) for sensor in subscribed_sensors: sensor.process_molecule_moving_after( simulation_kernel=simulation_kernel, molecule=molecule, )