Source code for pogona.modulation_ook

# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
#
# This file is part of Pogona.
#
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Pogona.  If not, see <https://www.gnu.org/licenses/>.

import pogona as pg
import pogona.properties as prop
import numpy as np
import logging
from typing import Optional

LOG = logging.getLogger(__name__)


class ModulationOOK(pg.Modulation):
    """
    On-off keying modulation that drives an injector through a pump.

    For every symbol of the bit stream that is not '0', an injection is
    started on the attached pump and the attached injector is either
    turned on (default) or asked for a single burst (`use_burst`).
    Symbols equal to '0' do not trigger anything.

    The attributes `_start_time`, `_bitstream` and `_bitstream_duration`
    read by this class are not defined here; presumably they are provided
    by `pg.Modulation` -- confirm against the base class.
    """

    injection_volume = prop.FloatProperty(0, required=False)
    """
    Injection volume for each bit in litres.
    If this is given, `injection_duration` should be 0 as it will be
    determined by the pump model.
    In this case, `pause_duration` will be equal to the cycle duration
    and independent of how long an injection takes.
    Take care to not let injections overlap.
    """
    injection_duration = prop.FloatProperty(0, required=False)
    """
    Duration of one injection in seconds.
    If a bit stream is given, the injection_duration specifies the
    duration of the injection for each individual bit.
    """
    pause_duration = prop.FloatProperty(0, required=False)
    """
    Time in seconds to wait *after* an injection
    (with `injection_duration`) before injecting the next pulse or,
    if `ascii_sequence` or `bit_sequence` is given,
    the time to wait before injecting the next bit.
    """
    attached_injector = prop.ComponentReferenceProperty(
        '',
        required=True,
        can_be_empty=False,
    )
    """Component name of the attached injector."""
    attached_destructor = prop.ComponentReferenceProperty(
        '',
        required=False,
        can_be_empty=True,
    )
    """
    Component name of the attached destructor.
    """
    attached_pump = prop.ComponentReferenceProperty(
        '',
        required=True,
        can_be_empty=False,
    )
    """
    Component name of the attached pump.
    A pump is required for coordinating the timing of an injection.
    Consider using an ObjectPumpTimed if you are not dealing with
    injection volumes or fluid vector fields in general.
    """
    use_burst = prop.BoolProperty(False, required=False)
    """
    By default, the attached injector is turned on in every time step.
    If use_burst is True, however, it will only be turned on
    at the beginning of an injection.
    """

    def __init__(self) -> None:
        """Create the modulation with unresolved component references."""
        super().__init__()
        # Component instances; resolved from their names in `initialize`.
        self._attached_injector: Optional['pg.Injector'] = None
        self._attached_destructor: Optional['pg.SensorDestructing'] = None
        self._attached_pump: Optional['pg.objects.ObjectPumpVolume'] = None
        # True from the start of an injection until the first processed
        # time step in which the pump is found to be inactive again.
        self._is_injecting = False
        # Index of the last pulse for which an injection was started;
        # -1 means that no injection has been started yet.
        self._most_recent_pulse_number = -1

    def initialize(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            init_stage: 'pg.InitStages'
    ) -> None:
        """
        Validate the configured component names and resolve them.

        In the `CHECK_ARGUMENTS` stage, the injector and pump names (and
        the destructor name, if it is not empty) must be known to the
        simulation kernel. In the `BUILD_SCENE` stage, the names are
        replaced by the actual component instances.

        :param simulation_kernel: Kernel whose components are looked up.
        :param init_stage: Initialization stage that is being processed.
        :raises ValueError: If a configured component name is unknown
            to the simulation kernel.
        """
        super().initialize(simulation_kernel, init_stage)
        if init_stage == pg.InitStages.CHECK_ARGUMENTS:
            components = simulation_kernel.get_components()
            # Half-mandatory arguments:
            if self.attached_injector not in components:
                raise ValueError(
                    "No injector component with the name "
                    f"{self.attached_injector} attached to the simulation "
                    "kernel could be found."
                )
            if self.attached_pump not in components:
                raise ValueError(
                    "No pump component with the name "
                    f"\"{self.attached_pump}\" attached to the simulation "
                    "kernel could be found.\n"
                    "Available components are: "
                    + ", ".join(components)
                )
            if (self.attached_destructor != ""
                    and self.attached_destructor not in components):
                raise ValueError(
                    "No destructor component with the name "
                    f"\"{self.attached_destructor}\" attached to the "
                    "simulation kernel could be found."
                )
        elif init_stage == pg.InitStages.BUILD_SCENE:
            components = simulation_kernel.get_components()
            # The destructor is optional, so only resolve it if one
            # has been configured.
            if (self.attached_destructor is not None
                    and self.attached_destructor != ""):
                self._attached_destructor = components[
                    self.attached_destructor
                ]
            self._attached_injector = components[self.attached_injector]
            self._attached_pump = components[self.attached_pump]

    @property
    def symbol_duration(self) -> float:
        """Duration of one symbol in seconds (injection plus pause)."""
        return self.injection_duration + self.pause_duration

    def _start_injection(
            self,
            simulation_kernel: 'pg.SimulationKernel',
    ) -> None:
        """Ask the attached pump to start an injection."""
        self._attached_pump.start_injection(
            simulation_kernel=simulation_kernel,
        )

    def process_new_time_step(
            self,
            simulation_kernel: 'pg.SimulationKernel',
            notification_stage: 'pg.NotificationStages',
    ) -> None:
        """
        Start or finish an injection for the current simulation time.

        Only the `MODULATION` notification stage is handled. Nothing
        happens if the transmission has no start time, if the bit stream
        has already been transmitted, or if the current symbol is '0'.

        :param simulation_kernel: Kernel providing the simulation time.
        :param notification_stage: Stage for which this call is made.
        :raises ValueError: If `symbol_duration` is not positive while a
            bit stream is being transmitted.
        """
        if notification_stage != pg.NotificationStages.MODULATION:
            return
        if self._start_time == np.inf:
            # No transmission is scheduled, nothing to do
            return
        sim_time = simulation_kernel.get_simulation_time()
        if sim_time > self._start_time + self._bitstream_duration:
            # We have finished transmitting the stream, nothing to do
            return

        symbol_duration = self.symbol_duration
        if symbol_duration <= 0:
            # Both durations default to 0; fail with a clear message
            # instead of an arithmetic error in the division below.
            raise ValueError(
                "The symbol duration (injection_duration + "
                "pause_duration) must be positive for transmitting a "
                f"bit stream, but it is {symbol_duration}."
            )

        # 0-based index of the pulse for the current sim_time.
        # If `self.bit_sequence` is set, this includes pulses for which
        # the sequence has a value of 0.
        pulse_number = int(np.floor(
            (sim_time - self._start_time) / symbol_duration
        ))
        pulse_beginning = self._start_time + pulse_number * symbol_duration

        if (pulse_number >= len(self._bitstream)
                or self._bitstream[pulse_number] == '0'):
            return

        if (pulse_beginning <= sim_time
                # Prevent restarting during an injection:
                and not self._is_injecting
                # Prevent restarting after an injection has ended:
                and pulse_number > self._most_recent_pulse_number):
            # Then start a new injection:
            self._is_injecting = True
            self._most_recent_pulse_number = pulse_number
            if not self.use_burst:
                # TODO: write pytest w/ and w/o burst
                self._attached_injector.turn_on()
            else:
                self._attached_injector.inject_burst()
            # NOTE(review): the nesting of this check was lost in the
            # flattened source; it is assumed to apply to both the
            # burst and the non-burst case -- confirm.
            if self._attached_destructor is not None:
                self._attached_destructor.turn_off()
            self._start_injection(
                simulation_kernel=simulation_kernel,
            )

        # Check if this is the last time step of an injection (!= cycle):
        if self._is_injecting and not self._attached_pump.is_active:
            self._is_injecting = False
            if not self.use_burst:
                self._attached_injector.turn_off()
            else:
                # NOTE(review): the nesting of this `else` was lost in
                # the flattened source; it is bound to the nearest `if`
                # here, i.e. the destructor is only turned back on in
                # burst mode -- confirm against the upstream file.
                if self._attached_destructor is not None:
                    self._attached_destructor.turn_on()
        # self._attached_pump will process this time step after all
        # Modulation instances have done the same.