# Source code for pogona.sensor_counting

# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
#
# This file is part of Pogona.
#
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pogona.  If not, see <https://www.gnu.org/licenses/>.

import os.path
import csv
import logging

import pogona as pg
import pogona.properties as prop

LOG = logging.getLogger(__name__)


class SensorCounting(pg.Sensor):
    """
    Sensor that counts molecules inside its sensor zone and logs the
    count of each time step to a CSV file.

    The counter is incremented in `process_molecule_moving_after` and
    written out (then reset) in `process_new_time_step` during the
    logging notification stage.
    """

    log_folder = prop.StrProperty("sensor_data", required=False)
    """The file `sensor[<component name>].csv` will be created in here."""

    def __init__(self):
        super().__init__()
        self._counts = 0
        """Number of molecules in this sensor so far in this time step."""
        self._csv_writer = None
        self._csv_file = None

    def initialize(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            init_stage: 'pg.InitStages'
    ):
        """
        Create the log folder and the CSV log file in the matching
        initialization stages.

        :param simulation_kernel: Kernel whose `results_dir` is the base
            path onto which `log_folder` is joined.
        :param init_stage: Current initialization stage. Only
            `CREATE_FOLDERS` and `CREATE_FILES` trigger any work here.
        """
        super().initialize(simulation_kernel, init_stage)
        if init_stage == pg.InitStages.CREATE_FOLDERS:
            os.makedirs(
                os.path.join(simulation_kernel.results_dir, self.log_folder),
                exist_ok=True
            )
        elif init_stage == pg.InitStages.CREATE_FILES:
            # Fall back to the sensor's id if the component still carries
            # the generic placeholder name.
            name = (
                self.id
                if self.component_name == "Generic component"
                else self.component_name
            )
            # newline='' is required by the csv module: the writer emits
            # its own line terminators, and without it platforms that
            # translate line endings would produce spurious blank rows.
            self._csv_file = open(
                os.path.join(
                    simulation_kernel.results_dir,
                    self.log_folder,
                    f"sensor[{name}].csv"
                ),
                mode='w',
                newline=''
            )
            fieldnames = ['sim_time', 'molecule_count']
            self._csv_writer = csv.writer(
                self._csv_file,
                delimiter=',',
                quotechar='"',
                quoting=csv.QUOTE_MINIMAL
            )
            self._csv_writer.writerow(fieldnames)

    def finalize(self, simulation_kernel: 'pg.SimulationKernel'):
        """
        Close the CSV log file, if one was opened.

        :param simulation_kernel: Unused here.
        """
        # The file only exists once the CREATE_FILES stage has run;
        # do not fail during teardown if it never did.
        if self._csv_file is not None:
            self._csv_file.close()

    def process_molecule_moving_after(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            molecule: 'pg.Molecule'
    ):
        """
        Count the molecule if its position lies inside the sensor zone.

        :param simulation_kernel: Unused here.
        :param molecule: Molecule whose `position` is tested against
            the sensor zone.
        """
        if self.is_inside_sensor_zone(position_global=molecule.position):
            self._counts += 1

    def process_new_time_step(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            notification_stage: 'pg.NotificationStages',
    ):
        """
        Write the current count to the CSV file and reset the counter.

        Does nothing unless `notification_stage` is `LOGGING`.

        :param simulation_kernel: Kernel queried for the simulation time
            that is written alongside the count.
        :param notification_stage: Current notification stage.
        """
        if notification_stage != pg.NotificationStages.LOGGING:
            return
        # Lazy %-style arguments: the message is only formatted if
        # debug logging is actually enabled.
        LOG.debug(
            "Sensor %s: %s molecules in zone.", self.id, self._counts
        )
        # TODO(jdrees): Is the sim_time off by one time step?
        #  Investigate and refactor if necessary
        self._csv_writer.writerow([
            simulation_kernel.get_simulation_time(),
            self._counts
        ])
        self._counts = 0