# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
#
# This file is part of Pogona.
#
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pogona. If not, see <https://www.gnu.org/licenses/>.
import pogona as pg
import pogona.properties as prop
import logging
import numpy as np
from typing import Optional, List, cast
LOG = logging.getLogger(__name__)
[docs]class Injector(pg.Component):
"""
An injector spawns particles within its defined volume in every time
step in which it is turned on.
Usually an injector is controlled by an instance of
ModulationOOK or a similar component, which determines when
to switch the injector on or off and which also controls the flow
speed in connected objects.
"""
shape = prop.EnumProperty(
default='NONE',
name='shape',
required=True,
enum_class=pg.Shapes,
)
"""
Shape of this injector.
Valid shapes are defined in the `pg.Shapes` enum.
"""
translation = prop.VectorProperty([0, 0, 0], required=True)
rotation = prop.VectorProperty([0, 0, 0], required=True)
scale = prop.VectorProperty([1, 1, 1], required=True)
attached_object = prop.ComponentReferenceProperty(
"",
required=False,
can_be_empty=True,
)
"""
Component name of the attached object.
This has to be set if you want newly spawned particles to be influenced
by a vector field!
"""
injection_amount = prop.IntProperty(0, required=False)
"""
*Number* of molecules spawned in every
time step while the injector is active.
"""
seed = prop.StrProperty('', required=False)
"""
Seed for the pseudo-random number generator.
If empty, the random number generator of the simulation kernel
will be used.
If 'random', a random seed will be used for initialization.
Will be converted to int otherwise.
"""
[docs] def __init__(self):
super().__init__()
self._attached_object: Optional['pg.Object'] = None
self._turned_on = False
self._burst_on = False
self._rng = np.random.RandomState(seed=None)
self._geometry: Optional['pg.Geometry'] = None
"""Geometry of this injector."""
self._transformation: Optional['pg.Transformation'] = None
"""Transformation of this injector in the scene."""
[docs] def initialize(
self,
simulation_kernel: 'pg.SimulationKernel',
init_stage: 'pg.InitStages'
):
super().initialize(simulation_kernel, init_stage)
if init_stage == pg.InitStages.CHECK_ARGUMENTS:
if self.seed == 'random':
self._rng = np.random.RandomState(seed=None)
elif self.seed != '':
self._rng = np.random.RandomState(seed=int(self.seed))
else:
self._rng = simulation_kernel.get_random_number_generator()
self._geometry = pg.Geometry(shape=pg.Shapes[self.shape])
self._transformation = pg.Transformation(
translation=np.array(self.translation),
rotation=np.array(self.rotation),
scaling=np.array(self.scale)
)
if (self.attached_object != ''
and self.attached_object
not in simulation_kernel.get_components()):
raise ValueError(
"No object component with the name "
f"{self.attached_object} attached to the simulation "
"kernel could be found."
)
elif init_stage == pg.InitStages.BUILD_SCENE:
self._attached_object = cast(
'pg.Object',
simulation_kernel.get_components()[self.attached_object]
)
[docs] def turn_on(self):
"""
Turn on the injector and start spawning particles.
Usually called in the NotificationStages.MODULATION stage,
so before SPAWNING in the current time step.
:return:
"""
# Start injecting molecules every time step
self._turned_on = True
[docs] def turn_off(self):
# Stop injecting molecules
self._turned_on = False
[docs] def inject_burst(self):
# Inject molecules in the next time step only
self._burst_on = True
[docs] def set_geometry(self, geometry: pg.Geometry):
self._geometry = geometry
[docs] def generate_points_local(self) -> List[np.ndarray]:
return [np.array((0, 0, 0)) for _ in range(self.injection_amount)]
[docs] def process_new_time_step(
self,
simulation_kernel: 'pg.SimulationKernel',
notification_stage: 'pg.NotificationStages',
):
if notification_stage != pg.NotificationStages.SPAWNING:
return
if self._turned_on or self._burst_on:
# Only inject bursts once
self._burst_on = False
LOG.debug(f"Injecting {self.injection_amount} new molecules")
if self._geometry.shape == pg.Shapes.POINT:
points_local = self.generate_points_local()
elif self._geometry.shape == pg.Shapes.CUBE:
points_local = pg.util.get_random_points_in_cube_local(
n=self.injection_amount,
rng=self._rng,
)
else:
points_local = pg.util.get_random_points_in_geometry_local(
n=self.injection_amount,
geometry=self._geometry,
rng=self._rng,
)
new_points_global = self._transformation.apply_to_points(
points_local
)
# TODO: transformation should probably be applied earlier!
# see issue #160
for new_point_global in new_points_global:
injected_molecule = pg.Molecule(
position=new_point_global,
velocity=np.zeros(3),
object_id=self._attached_object.object_id,
)
simulation_kernel.get_molecule_manager().add_molecule(
injected_molecule
)