Source code for pogona.modulation

# Pogona
# Copyright (C) 2020 Data Communications and Networking (TKN), TU Berlin
# This file is part of Pogona.
# Pogona is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Pogona is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with Pogona.  If not, see <>.

import pogona as pg
import numpy as np
import logging
from typing import Optional, Callable
from abc import ABCMeta, abstractmethod

LOG = logging.getLogger(__name__)

[docs]class Modulation(pg.Component, metaclass=ABCMeta):
[docs] def __init__(self): super().__init__() self._calling_bitstream_generator: Optional[ 'pg.BitstreamGenerator'] = None self._is_transmitting = False """ Whether or not a bitstream is currently being transmitted """ self._start_time: float = np.inf """ Simulation time in seconds at which we started transmitting the current bitstream. Initially set to infinity so that nothing will be transmitted before the first call to transmit_bitstream(). """ self._bitstream: str = "" """ Currently transmitting bit stream """ self._bitstream_duration: float = 0 """ Duration of the currently transmitting bit stream in seconds """ self._finish_callback: Optional[Callable] = None """ Function that should get called upon transmission finish """
@property @abstractmethod def symbol_duration(self): pass
[docs] def transmit_bitstream( self, bitstream: str, simulation_kernel: 'pg.SimulationKernel', finish_callback: Optional[Callable]): if self._is_transmitting: raise AssertionError("Trying to transmit two bitstreams at once") self._is_transmitting = True self._start_time = simulation_kernel.get_simulation_time() self._bitstream = bitstream self._finish_callback = lambda: finish_callback(simulation_kernel) self._bitstream_duration = self.symbol_duration * len(bitstream)
def _finish_transmission(self): self._is_transmitting = False if self._finish_callback is not None: self._finish_callback()