Source code for opticomlib.typing

"""
.. rubric:: Classes
.. autosummary::

    global_variables
    binary_sequence
    electrical_signal
    optical_signal
    eye
"""

from numpy.fft import fft, ifft, fftfreq, fftshift, ifftshift
from pympler.asizeof import asizeof as sizeof

import numpy as np
from scipy.constants import c, pi
from scipy.ndimage import gaussian_filter
from scipy.special import expit
import scipy.signal as sg

import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection

from typing import Literal, Any, Iterable

import warnings

from .utils import (
    str2array, 
    dbm, 
    si, 
    upfir,
    eyediagram,
    ComplexNumber,
    RealNumber,
    IntegerNumber,
    tic, toc,
)

from .logger import logging, HierLogger
# Module-level logger (a ``HierLogger``) created with this module's name.
logger = HierLogger(__name__)
# Short aliases for the standard ``logging`` level constants.
INFO, DEBUG, WARNING = logging.INFO, logging.DEBUG, logging.WARNING

# Tuple of container types treated as "array-like"
# (presumably meant for ``isinstance`` checks -- confirm against callers).
Array_Like = (list, tuple, np.ndarray)











class NULLType:
    """Placeholder for "no data" that behaves like an absorbing zero.

    The arithmetic rules implemented here are:

    * ``NULL + x`` and ``x + NULL`` return ``x``;
    * ``NULL - x`` returns ``-x`` and ``x - NULL`` returns ``x``;
    * ``NULL * x``, ``x * NULL``, ``NULL / x``, ``NULL // x``, ``NULL ** x``
      and ``-NULL`` return ``NULL``;
    * any attribute access (``NULL.real``) and any call (``NULL.conj()``)
      return ``NULL``, so chains such as ``NULL.real.imag`` collapse to ``NULL``;
    * NumPy ufuncs and array functions involving ``NULL`` return ``NULL``,
      except ``np.add`` / ``np.subtract`` which follow the rules above.

    Notes
    -----
    Because :meth:`__getattr__` answers every missing attribute,
    ``hasattr(NULL, name)`` is true for any non-special ``name``.
    """

    def __add__(self, other):  # NULL + n -> n
        return other
    __radd__ = __add__  # n + NULL -> n

    def __mul__(self, other):  # NULL * n -> NULL
        return self
    __rmul__ = __mul__  # n * NULL -> NULL

    def __repr__(self):
        return "NULL"

    def __str__(self):
        return "NULL"

    def __sub__(self, other):  # NULL - n -> -n
        return -other
    __rsub__ = __add__  # n - NULL -> n

    def __neg__(self):  # -NULL -> NULL
        return self

    def __truediv__(self, other):  # NULL / n -> NULL
        return self
    __floordiv__ = __truediv__  # NULL // n -> NULL

    def __pow__(self, other):  # NULL ** n -> NULL
        return self

    def __array_function__(self, func, types, args, kwargs):
        # Any NumPy array function that receives NULL yields NULL.
        return self

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        """Resolve NumPy ufuncs that involve ``NULL``.

        ``np.add`` and ``np.subtract`` treat ``NULL`` as the additive identity
        whichever side it appears on (this is what makes ``ndarray + NULL``
        and ``ndarray - NULL`` return the array). Every other ufunc, ufunc
        method, or call using ``out=`` returns ``NULL``.
        """
        if method == '__call__' and not kwargs.get('out') and len(inputs) == 2:
            lhs, rhs = inputs
            null_on_left = isinstance(lhs, NULLType)
            if ufunc is np.add:
                # NULL + x -> x ; x + NULL -> x
                return rhs if null_on_left else lhs
            if ufunc is np.subtract:
                # NULL - x -> -x (same rule as __sub__) ; x - NULL -> x
                return -rhs if null_on_left else lhs
        return self

    def __getattr__(self, name):
        # Return self so that any attribute access returns NULL
        # This also allows chaining: NULL.real.imag -> NULL
        return self

    def __call__(self, *args, **kwargs):
        # Return self so that any method call returns NULL
        # This handles cases like NULL.conj() -> NULL
        return self


# Shared instance used throughout the module (compared with ``is`` / ``is not``).
NULL = NULLType()











@logger.auto_indent_methods
class global_variables():
    r"""**Global Variables (gv)**

    This object stores the simulation-wide parameters required across the pipeline. It keeps track of
    the sampling frequency, slot rate, samples per slot, number of simulated slots, optical
    wavelength/frequency, preferred plotting style, and logging verbosity.

    .. Note::

        A slot is taken as the smallest time unit representing a binary value of the signal.
        For example, in PPM a bit is not the same as a slot. However, in OOK a bit and a slot are the same.

    This class doesn't need to be instantiated; it is already exposed as ``gv``. Use the
    :meth:`__call__` method (e.g. ``gv(**kwargs)``) to refresh parameters, update Matplotlib's
    style via ``plt_style`` or adjust the logger through ``verbose``.

    .. rubric:: Attributes
    .. autosummary::

        ~global_variables.sps
        ~global_variables.R
        ~global_variables.fs
        ~global_variables.wavelength
        ~global_variables.f0
        ~global_variables.N
        ~global_variables.dt
        ~global_variables.t
        ~global_variables.dw
        ~global_variables.w
        ~global_variables.plt_style
        ~global_variables.verbose

    .. rubric:: Methods
    .. autosummary::

        __call__
        print
        default

    Examples
    --------
    >>> gv(R=10e9, sps=8, N=100).print()

    ::

        ------------------------------
        ***    Global Variables    ***
        ------------------------------
            sps : 8
            R : 1.00e+10
            fs : 8.00e+10
            λ0 : 1.55e-06
            f0 : 1.93e+14
            N : 100
            dt : 1.25e-11
            t : [0.00e+00 1.25e-11 2.50e-11 ... 9.97e-09 9.99e-09 1.00e-08]
            dw : 6.28e+08
         Config
         ------
            plt_style : "fast"
            verbose : None

    New variables can also be defined through ``**kwargs``. If fewer than two of the arguments
    ``sps``, ``fs`` and ``R`` are provided, a warning is logged and the current (default)
    values are used for the missing ones.

    >>> gv(alpha=0.5, beta=0.3).print()

    ::

        ------------------------------
        ***    Global Variables    ***
        ------------------------------
            sps : 16
            R : 1.00e+09
            fs : 1.60e+10
            λ0 : 1.55e-06
            f0 : 1.93e+14
            N : 128
            dt : 6.25e-11
            t : [0.00e+00 6.25e-11 1.25e-10 ... 1.28e-07 1.28e-07 1.28e-07]
            dw : 4.91e+07
         Config
         ------
            plt_style : "fast"
            verbose : None
         Custom
         ------
            alpha : 0.5
            beta : 0.3
    """

    # Names of the built-in parameters; every other instance attribute is
    # treated as a user-defined ("Custom") variable.
    _BUILTIN_FIELDS = ('sps', 'R', 'fs', 'wavelength', 'f0', 'N', 'dt', 'dw',
                       't', 'w', 'plt_style', 'verbose')

    def __init__(self):
        self.sps = 16
        """Number of samples per slot, ``16`` by default."""
        self.R = 1e9
        """Slot rate in Hz, ``1e9`` by default."""
        self.fs = self.R*self.sps
        """Sampling frequency in Samples/s, ``R*sps=16e9`` by default."""
        self.dt = 1/self.fs
        """Time step in seconds, ``1/fs=62.5e-12`` by default."""
        self.wavelength = 1550e-9
        """Optical communication central wavelength in meters, ``1550e-9`` by default."""
        self.f0 = c/self.wavelength
        """Optical communication central frequency in Hz, ``c/wavelength=193.4e12`` by default."""
        self.N = 128
        """Number of slots to simulate (128 by default)."""
        self.t = np.linspace(0, self.N*self.sps*self.dt, self.N*self.sps, endpoint=True)
        """Time array in seconds"""
        self.dw = 2*pi*self.fs/(self.N*self.sps)
        """Frequency step in rad/s"""
        self.w = 2*pi*fftshift(fftfreq(self.N*self.sps))*self.fs
        """Frequency array in rad/s"""
        self.plt_style = 'fast'
        """Matplotlib plot style, ``"fast"`` by default."""
        plt.style.use(self.plt_style)
        self.verbose = None
        """Logging verbosity level, ``None`` by default.
        Can be set to ``DEBUG`` or 10, ``INFO`` or 20, ``WARNING`` or 30."""

    def __str__(self):
        """Return the formatted parameter report printed by :meth:`print`."""
        # Padding chosen so the banner is 30 characters wide, as in the class examples.
        title = 3*'*' + '    Global Variables    ' + 3*'*'
        sub = len(title)*'-'

        # Read this instance's attributes (not the module-level ``gv``), so the
        # report is correct for any instance.
        others = [name for name in vars(self) if name not in self._BUILTIN_FIELDS]

        msg = f'\n{sub}\n{title}\n{sub}\n\t' + \
            f'sps : {self.sps}\n\t' + \
            f'R : {self.R:.2e}\n\t' + \
            f'fs : {self.fs:.2e}\n\t' + \
            f'λ0 : {self.wavelength:.2e}\n\t' + \
            f'f0 : {self.f0:.2e}\n\t' + \
            f'N : {self.N}\n\t' + \
            f'dt : {self.dt:.2e}\n\t' + \
            f't : {self.t}\n\t' + \
            f'dw : {self.dw:.2e}\n'

        msg += f' Config\n ------\n\t' + \
            f'plt_style : "{self.plt_style}"\n\t' + \
            f'verbose : {self.verbose}\n'

        if others:
            msg += ' Custom\n ------\n\t' + \
                '\n\t'.join([f'{name} : {getattr(self, name)}' for name in others]) + '\n'
        return msg

    def print(self):
        """ Prints the global variables in a formatted manner"""
        # Apply the compact array format only while printing, so NumPy's
        # global print options are left untouched afterwards.
        with np.printoptions(precision=2, threshold=20):
            print(self)

    def __call__(
        self,
        sps: int=None,
        R: float=None,
        fs: float=None,
        wavelength: float=1550e-9,
        N: int=None,
        plt_style : Literal['ggplot', 'bmh', 'dark_background', 'fast', 'default']='fast',
        verbose=None,
        **kargs
    ) -> Any:
        """
        Configures the instance with the provided parameters.

        Parameters
        ----------
        sps : int, optional
            Samples per slot (rounded to the nearest integer).
        R : float, optional
            Slot rate in Hz.
        fs : float, optional
            Sampling frequency in Samples/s.
        wavelength : float, optional
            Wavelength in meters. Default is 1550e-9. Note that the wavelength is
            reset to this default on every call that does not pass it.
        N : int, optional
            Number of slots to simulate. Keeps its current value when omitted.
        plt_style : str, optional
            Matplotlib plot style. Default is "fast".
        verbose : int | None, optional
            Verbosity level for logging. When not ``None`` it is stored and applied
            to the module logger.
        **kargs : dict
            Additional custom parameters, stored as attributes of the instance.

        Returns
        -------
        gv
            The instance itself.

        Notes
        -----
        In the absence of parameters, the current values are used. Missing parameters are
        calculated from the provided ones, prioritizing the current value of **gv.R** when
        more than one of **sps**, **fs**, and **R** is not provided. When both **sps** and
        **R** are given, **fs** is always computed as ``R*sps`` and a passed **fs** is ignored.
        """
        logger.debug('setting gv()')

        if verbose is not None :
            self.verbose = verbose
            logger.logger.setLevel(self.verbose)

        # Resolve the (sps, R, fs) triplet from whichever values were supplied.
        if sps:
            self.sps = int(np.round(sps))
            if R:
                self.R = R
                self.fs = R*self.sps
            elif fs:
                self.fs = fs
                self.R = fs/self.sps
            else:
                logger.warning("'R' set to default value (%.2e bits/s)", self.R)
                self.fs = self.R*self.sps
        elif R:
            self.R = R
            if fs:
                self.fs = fs
                self.sps = int(np.round(fs/R))
            else:
                logger.warning("'sps' set to default value (%d S/bit)", self.sps)
                self.fs = R*self.sps
        elif fs:
            logger.warning("'R' set to default value (%.2e bits/s)", self.R)
            self.fs = fs
            self.sps = int(np.round(fs/self.R))
        else:
            logger.warning("'sps', 'R' and 'fs' will be set to default values (%d S/bit, %.2e bits/s, %.2e Hz)", self.sps, self.R, self.fs)

        self.dt = 1/self.fs
        self.N = N if N is not None else self.N
        self._set_t_dw_w()

        self.wavelength = wavelength
        self.f0 = c/wavelength

        # Only touch Matplotlib's rcParams when the style actually changes.
        if plt_style != self.plt_style:
            self.plt_style = plt_style
            plt.rcdefaults()
            plt.style.use(self.plt_style)

        logger.info('Global variables set to, sps: %d, R: %.2e, fs: %.2e, N: %d, wavelength: %.2e', self.sps, self.R, self.fs, self.N, self.wavelength)

        if kargs:
            for key, value in kargs.items():
                setattr(self, key, value)
        return self

    def _set_t_dw_w(self):
        """Recompute the time array ``t``, the step ``dw`` and the array ``w`` from ``N``, ``sps`` and ``fs``."""
        # NOTE(review): with ``endpoint=True`` the spacing of ``t`` is
        # ``N*sps/fs/(N*sps - 1)``, slightly larger than ``dt`` -- confirm this is intended.
        self.t = np.linspace(0, self.N*self.sps/self.fs, self.N*self.sps, endpoint=True)
        self.dw = 2*pi*self.fs/(self.N*self.sps)
        self.w = 2*pi*fftshift(fftfreq(self.N*self.sps))*self.fs

    def default(self):
        """ Return all parameters to default values.

        Built-in parameters are reset, the Matplotlib style and the logger level
        are restored, and non-callable custom attributes added through
        ``gv(**kwargs)`` are removed.

        Returns
        -------
        gv
            The instance itself.
        """
        logger.debug('resetting gv to default()')

        self.sps = 16
        self.R = 1e9
        self.fs = self.R*self.sps
        self.dt = 1/self.fs
        self.wavelength = 1550e-9
        self.f0 = c/self.wavelength
        self.N = 128
        self._set_t_dw_w()
        self.plt_style = 'fast'
        plt.rcdefaults()
        plt.style.use(self.plt_style)
        self.verbose = None
        # Reset logger level to default (NOTSET allows propagation to parent)
        logger.logger.setLevel(logging.NOTSET)

        # Collect custom attributes from this instance's own namespace, so only
        # names that can really be deleted from ``self`` are selected.
        custom = [
            name for name, value in vars(self).items()
            if not callable(value)
            and not name.startswith("__")
            and name not in self._BUILTIN_FIELDS
        ]

        logger.info('Global variables set to default, sps: %d, R: %.2e, fs: %.2e, N: %d, wavelength: %.2e', self.sps, self.R, self.fs, self.N, self.wavelength)

        for name in custom:
            delattr(self, name)
        return self


gv = global_variables()
@logger.auto_indent_methods
class binary_sequence():
    r"""**Binary Sequence**

    This class provides methods and attributes to work with binary sequences.
    The binary sequence can be provided as a string, list, tuple, or numpy array.

    .. rubric:: Attributes
    .. autosummary::

        ~binary_sequence.data
        ~binary_sequence.execution_time
        ~binary_sequence.ones
        ~binary_sequence.zeros
        ~binary_sequence.size
        ~binary_sequence.type
        ~binary_sequence.sizeof

    .. rubric:: Methods
    .. autosummary::

        prbs
        print
        to_numpy
        flip
        hamming_distance
        dac
        plot

    .. table:: **Implemented Operators**
       :widths: 10 90
       :align: center

       +--------------------------------+--------------------------------------------------------------+
       | Operator                       | Description                                                  |
       +================================+==============================================================+
       | ``~``                          | ``~a`` NOT operation, bit by bit.                            |
       +--------------------------------+--------------------------------------------------------------+
       | ``&``                          | ``a & b`` AND operation, bit by bit                          |
       +--------------------------------+--------------------------------------------------------------+
       | ``|``                          | ``a | b`` OR operation, bit by bit                           |
       +--------------------------------+--------------------------------------------------------------+
       | ``^``                          | ``a ^ b`` XOR operation, bit by bit                          |
       +--------------------------------+--------------------------------------------------------------+
       | ``+``                          | ``a + b`` concatenate ``a ∪ b``; ``b + a`` concatenate       |
       |                                | ``b ∪ a``.                                                   |
       +--------------------------------+--------------------------------------------------------------+
       | ``*``                          | ``a * n`` (n > 1 integer) repeats ``a`` n times; ``a * b``   |
       |                                | equivalent to ``&`` operator.                                |
       +--------------------------------+--------------------------------------------------------------+
       | ``==``                         | ``a == b`` compares elements, returning a ``binary_sequence``|
       |                                | mask of matches.                                             |
       +--------------------------------+--------------------------------------------------------------+
       | ``!=``                         | ``a != b`` compares elements, returning a ``binary_sequence``|
       |                                | mask of differences.                                         |
       +--------------------------------+--------------------------------------------------------------+
       | ``[:]``                        | ``a[i]`` returns the integer value at index ``i``;           |
       |                                | ``a[i:j]`` returns a sliced ``binary_sequence``.             |
       +--------------------------------+--------------------------------------------------------------+
       | ``<``, ``<=``, ``>``, ``>=``   | Not implemented                                              |
       | ``-``, ``/``, ``//``, ``<<``,  |                                                              |
       | ``>>``,                        |                                                              |
       +--------------------------------+--------------------------------------------------------------+
    """

    def __init__(self, data: str | Iterable):
        """Build the sequence from a string, an array-like or another ``binary_sequence``.

        Raises
        ------
        ValueError
            If any value differs from 0 and 1, or if the data has more than one dimension.
        """
        logger.debug('%s.__init__(%s)', self.__class__.__name__, type(data).__name__)

        if isinstance(data, binary_sequence):
            data = data.data
        elif isinstance(data, str):
            data = str2array(data)
        else:
            data = np.array(data)

        if not np.all((data == 0) | (data == 1)):
            raise ValueError("The array must contain only 0's and 1's!")
        if data.ndim > 1:
            raise ValueError(f"Binary sequence must be 1D array, invalid shape {data.shape}")
        if data.ndim == 0 and data.size == 1:
            # Promote a scalar to a one-element sequence.
            data = data[np.newaxis]

        self.data = data.astype(np.uint8)
        """The binary sequence data, a 1D numpy array of ``uint8`` values (0 or 1)."""
        self.execution_time = 0.
        """The execution time of the last operation performed on the binary sequence."""

    def __str__(self, title: str=None):
        """Return the formatted report shown by :meth:`print`; ``title`` overrides the class name."""
        logger.debug('__str__()')
        if title is None:
            title = self.__class__.__name__

        title = 3*'*' + f' {title} ' + 3*'*'
        sub = len(title)*'-'

        # Limit the array rendering locally instead of changing NumPy's global print options.
        with np.printoptions(precision=0, threshold=100):
            data = str(self.data)

        msg = f'\n{sub}\n{title}\n{sub}\n\t' + \
            f'data : {data} (shape: {self.data.shape})\n\t' + \
            f'ones : {self.ones}\n\t' + \
            f'zeros : {self.zeros}\n\t' + \
            f'size : {self.sizeof} bytes\n\t' + \
            f'time : {si(self.execution_time, "s", 2)}\n'
        return msg

    def __repr__(self):
        logger.debug('__repr__()')
        with np.printoptions(threshold=100):
            return f'binary_sequence({str(self.data)})'

    def __len__(self):
        logger.debug('__len__()')
        return self.size

    def __array__(self, dtype=None):
        """Return the array representation of the binary sequence.

        This method provides the basic array conversion for NumPy compatibility. It returns the data.
        This is the fundamental protocol that allows the object to be converted to a NumPy array when
        needed, enabling direct use in NumPy functions that expect array-like objects.

        Unlike ``__array_ufunc__`` and ``__array_function__``, this method is called for basic array
        conversion and does not handle specific NumPy operations - it simply provides the underlying
        data as an array.

        Parameters
        ----------
        dtype : np.dtype, optional
            Desired data type of the array. When omitted, the stored ``uint8`` array is returned.

        Returns
        -------
        np.ndarray
            The array representation of the binary data.
        """
        logger.debug('__array__()')
        arr = self.data
        if dtype is not None:
            # Honor the requested dtype; no copy is made when it already matches.
            arr = arr.astype(dtype, copy=False)
        return arr

    def __getattr__(self, name):
        """Delegate attribute access to the underlying NumPy array for array-like methods.

        This method allows instances of ``binary_sequence`` to access NumPy array methods
        (like max, min, sum, etc.) directly as if they were arrays. If the requested attribute
        is a method or property of np.ndarray, it is looked up on the array representation
        of this object.

        Parameters
        ----------
        name : str
            The name of the attribute being accessed.

        Returns
        -------
        result
            The attribute taken from the array representation.

        Raises
        ------
        AttributeError
            If the attribute is not found in np.ndarray, is a special (``__x__``) name, or the
            instance has no ``data`` yet.
        """
        logger.debug('__getattr__(%s)', name)
        # ``ndarray`` itself has a ``data`` attribute, so delegating while
        # ``self.data`` is still missing would recurse forever; require it first.
        has_data = 'data' in self.__dict__
        # Check if the attribute exists in np.ndarray and is not a private/internal attribute
        if has_data and hasattr(np.ndarray, name) and not name.startswith('__'):
            logger.debug("Delegating attribute '%s' to ndarray for %s", name, self.__class__.__name__)
            return getattr(self.__array__(), name)
        raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

    def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
        """Handle NumPy universal functions (ufuncs) by converting the object to an array.

        This method is specifically designed for NumPy's universal functions, which are element-wise
        operations like np.add, np.sin, np.multiply, etc. When a ufunc is called on an instance of this
        class, this method intercepts the call, converts any instances of this class in the inputs to
        arrays using ``__array__()``, and then applies the ufunc to the converted arrays.

        ``np.add`` and ``np.multiply`` with a ``binary_sequence`` on the right-hand side are routed to
        ``__radd__`` (concatenation) and ``__mul__`` so that ``ndarray + seq`` and ``ndarray * seq``
        keep the operator semantics of this class.

        Parameters
        ----------
        ufunc : numpy.ufunc
            The NumPy universal function being called.
        method : str
            The method of the ufunc (e.g., '__call__', 'reduce').
        *inputs : tuple
            The input arguments to the ufunc.
        **kwargs : dict
            Keyword arguments passed to the ufunc.

        Returns
        -------
        result
            The result of applying the ufunc to the converted inputs, wrapped in a
            ``binary_sequence`` when it is an array that forms a valid binary sequence;
            otherwise the raw result.
        """
        logger.debug('__array_ufunc__(%s, %s)', ufunc.__name__, method)

        if method == '__call__' and not kwargs.get('out'):
            if ufunc == np.add:
                lhs, rhs = inputs
                if isinstance(rhs, binary_sequence):
                    return rhs.__radd__(lhs)
            if ufunc == np.multiply:
                lhs, rhs = inputs
                if isinstance(rhs, binary_sequence):
                    return rhs.__mul__(lhs)

        # Convert inputs that are instances of this class to arrays
        new_inputs = [
            inp.__array__() if isinstance(inp, self.__class__) else inp
            for inp in inputs
        ]

        result = getattr(ufunc, method)(*new_inputs, **kwargs)

        try:
            if isinstance(result, np.ndarray):
                return binary_sequence(result)
        except (ValueError, TypeError) as e:
            # Not a valid binary sequence (e.g. values other than 0/1): return the plain result.
            logger.debug('Failed to convert ufunc result to binary_sequence: %s', e)
        return result

    def __array_function__(self, func, types, args, kwargs):
        """Handle NumPy array functions by converting the object to an array.

        This method is called for NumPy array functions that are not universal functions (ufuncs).
        It handles higher-level array operations like np.sum, np.mean, np.concatenate, np.fft.fft,
        and other array manipulation functions. When such a function is called on an instance of
        this class, this method converts any instances in the arguments to arrays and then calls
        the function with the converted arguments.

        Key differences from other protocols:

        - ``__array__``: Provides basic array conversion without handling specific operations.
        - ``__array_ufunc__``: Handles element-wise universal functions (ufuncs) like np.add, np.sin, etc.
        - ``__array_function__``: Handles higher-level array functions and transformations.

        This enables instances to be used directly in functions like np.sum(x), np.mean(x),
        np.concatenate([x, y]), etc., without manual conversion.

        Parameters
        ----------
        func : callable
            The NumPy array function being called (e.g., np.sum, np.mean).
        types : tuple
            The types of all arguments passed to the function.
        args : tuple
            The positional arguments passed to the function.
        kwargs : dict
            The keyword arguments passed to the function.

        Returns
        -------
        result
            The result of applying the NumPy array function to the converted arguments, wrapped
            in a ``binary_sequence`` when it is an array that forms a valid binary sequence;
            otherwise the raw result.
        """
        logger.debug('__array_function__(%s)', func.__name__)

        def _convert(obj):
            # Recursively replace instances (also inside lists, tuples and dicts) by their arrays.
            if isinstance(obj, self.__class__):
                return obj.__array__()
            elif isinstance(obj, (list, tuple)):
                return type(obj)(_convert(item) for item in obj)
            elif isinstance(obj, dict):
                return {k: _convert(v) for k, v in obj.items()}
            else:
                return obj

        # Convert args and kwargs that contain instances to arrays
        new_args = _convert(args)
        new_kwargs = _convert(kwargs)

        result = func(*new_args, **new_kwargs)

        try:
            if isinstance(result, np.ndarray):
                return binary_sequence(result)
        except (ValueError, TypeError) as e:
            logger.debug('Failed to convert array_function result to binary_sequence: %s', e)
        return result

    def __getitem__(self, slice: int | slice):
        """Get a slice of the binary sequence (``self[slice]``).

        Parameters
        ----------
        slice : :obj:`int` or :obj:`slice`
            The slice to get. NumPy integer indices are treated like :obj:`int`.

        Returns
        -------
        :obj:`int` or :obj:`binary_sequence`
            The value of the slot if `slice` is an integer, or a new binary sequence object with the result of the slice.
        """
        logger.debug('__getitem__(%s)', slice)
        if isinstance(slice, (int, np.integer)):
            return self.data[slice]
        return binary_sequence(self.data[slice])

    def __eq__(self, other):
        logger.debug('__eq__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return binary_sequence(self.data == other.data)

    def __ne__(self, other):
        logger.debug('__ne__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return binary_sequence(self.data != other.data)

    def __add__(self, other):
        logger.debug('__add__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        out = np.concatenate((self.data, other.data))
        return binary_sequence(out)

    def __radd__(self, other):
        logger.debug('__radd__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        out = np.concatenate((other.data, self.data))
        return binary_sequence(out)

    def __mul__(self, other):
        logger.debug('__mul__(%s)', type(other).__name__)
        if isinstance(other, (int, np.integer)) and other > 1:
            # Repeat the sequence other times
            repeated = np.tile(self.data, other)
            return binary_sequence(repeated)
        else:
            # Convert other to binary_sequence if necessary
            if not isinstance(other, binary_sequence):
                other = binary_sequence(other)
            result = self.data * other.data
            return binary_sequence(result)
    __rmul__ = __mul__

    def __invert__(self):
        logger.debug('__invert__()')
        return binary_sequence(~self.data.astype(bool))

    def __or__(self, other):
        logger.debug('__or__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return binary_sequence(self.data | other.data)
    __ror__ = __or__

    def __and__(self, other):
        logger.debug('__and__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return binary_sequence(self.data & other.data)
    __rand__ = __and__

    def __xor__(self, other):
        logger.debug('__xor__(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return binary_sequence(self.data ^ other.data)
    __rxor__ = __xor__

    # properties
    @property
    def ones(self):
        """Number of ones in the binary sequence."""
        x = np.sum(self.data==1)
        logger.debug('ones: %d', x)
        return x

    @property
    def zeros(self):
        """Number of zeros in the binary sequence."""
        x = np.sum(self.data == 0)
        logger.debug('zeros: %d', x)
        return x

    @property
    def size(self):
        """Number of slots of the binary sequence."""
        x = self.data.size
        logger.debug('size: %d', x)
        return x

    @property
    def type(self):
        """Object type."""
        x = type(self)
        logger.debug('type: %s', x.__name__)
        return x

    @property
    def sizeof(self):
        """Memory size of object in bytes."""
        logger.debug('sizeof')
        x = sizeof(self)
        logger.debug('sizeof: %d bytes', x)
        return x

    # static methods
    @staticmethod
    def prbs(
        order: int,
        len: int=None,
        seed: int=None,
        return_seed: bool=False
    ):
        r"""Pseudorandom binary sequence generator (PRBS) (*static method*).

        Parameters
        ----------
        order : :obj:`int`, {7, 9, 11, 15, 20, 23, 31}
            degree of the generating pseudorandom polynomial
        len : :obj:`int`, optional
            length of output binary sequence. Default is ``2**order - 1``.
        seed : :obj:`int`, optional
            seed of the generator (initial state of the LFSR). It must be provided if you want to
            continue the sequence. Default is 2**order-1.
        return_seed : :obj:`bool`, optional
            If True, the last state of LFSR is returned. Default is False.

        Returns
        -------
        out : :obj:`binary_sequence`
            generated pseudorandom binary sequence if `return_seed` is False
        out, last_seed : :obj:`tuple` of (:obj:`binary_sequence`, :obj:`int`)
            generated pseudorandom binary sequence and last state of LFSR if `return_seed`

        Raises
        ------
        ValueError
            If `order` is not one of the supported values, or `len` is not positive.
        TypeError
            If `len` is not an integer.

        Warns
        -----
        UserWarning
            If `seed` is 0 modulo ``2**order``; the seed is replaced by 1.
        """
        tic()
        taps = {
            7: [7, 6],
            9: [9, 5],
            11: [11, 9],
            15: [15, 14],
            20: [20, 3],
            23: [23, 18],
            31: [31, 28],
        }

        # Validate `order` before it is used as an exponent / shift amount.
        if order not in taps:
            raise ValueError(
                "The parameter `order` must be one of the following values (7, 9, 11, 15, 20, 23, 31)."
            )

        seed = seed % (2**order) if seed is not None else (1 << order) - 1
        if seed == 0:
            # The all-zero state would lock the LFSR.
            seed = 1
            warnings.warn(
                "The seed can't be 0 or a multiple of 2**order. It has been changed to 1.",
                UserWarning,
            )

        if len is not None:
            if not isinstance(len, int):
                raise TypeError("The parameter `len` must be an integer.")
            elif len <= 0:
                raise ValueError(
                    "The parameter `len` must be an integer greater than cero."
                )
        else:
            len = 2**order - 1

        prbs = np.empty((len,), dtype=np.uint8)  # Preallocate memory for the PRBS
        lfsr = seed  # initial state of the LFSR
        tap1, tap2 = np.array(taps[order]) - 1
        mask = (1 << order) - 1  # keeps the register `order` bits wide

        for index in range(len):
            prbs[index] = lfsr & 1
            new = ((lfsr >> tap1) ^ (lfsr >> tap2)) & 1
            lfsr = ((lfsr << 1) | new) & mask

        output = binary_sequence(prbs)
        output.execution_time = toc()
        if not return_seed:
            return output
        return output, lfsr

    # methods
    def print(self, msg: str=None):
        """Print object parameters.

        Parameters
        ----------
        msg : str, optional
            top message to show

        Returns
        -------
        :obj:`binary_sequence`
            The same object.
        """
        logger.debug('print()')
        print(self.__str__(msg))
        return self

    def to_numpy(self, dtype: np.dtype | None = None) -> np.ndarray:
        """Return a NumPy representation of the binary sequence.

        This method is similar to ``__array__``; the difference is that some libraries, such as
        matplotlib, call this method to get the numpy array. A new array is returned.
        """
        logger.debug("to_numpy(dtype=%s)", dtype)
        return np.array(self.data, dtype=dtype)

    def flip(self):
        """Invert the binary sequence. Equivalent to the ``~`` operator.

        Returns
        -------
        binary_sequence
            A new binary sequence object with the result of the inversion.
        """
        logger.debug('flip()')
        return ~self

    def hamming_distance(self, other):
        """Calculate the Hamming distance to another binary sequence of the same length.

        Parameters
        ----------
        other : :obj:`str` or :obj:`binary_sequence` or :obj:`Array_Like`
            The binary sequence to compare.

        Returns
        -------
        :obj:`int`
            The Hamming distance between the two binary sequences.
        """
        logger.debug('hamming_distance(%s)', type(other).__name__)
        if not isinstance(other, binary_sequence):
            other = binary_sequence(other)
        return np.sum(self != other)

    def dac(self, h: np.ndarray):
        """Apply upsampling and FIR filtering to the binary sequence for digital-to-analog conversion.

        This method passes the binary data, the filter ``h`` and the global samples per slot
        (``gv.sps``) as upsampling factor to ``upfir``, and wraps the output in an
        ``electrical_signal``.

        Parameters
        ----------
        h : :obj:`np.ndarray`
            The FIR filter impulse response to use for shaping the signal.

        Returns
        -------
        :obj:`electrical_signal`
            The resulting electrical signal built from the ``upfir`` output.
        """
        logger.debug('dac()')
        return electrical_signal(upfir(x=self.data, h=h, up=gv.sps))

    def plot(self, **kwargs):
        """Plot the binary sequence using matplotlib.

        A new figure is created and the sequence is drawn as a step plot.

        Parameters
        ----------
        **kwargs : :obj:`dict`
            Additional keyword arguments forwarded to ``Axes.step``.

        Returns
        -------
        :obj:`binary_sequence`
            The same object.
        """
        logger.debug('plot()')
        _, ax = plt.subplots()
        ax.step(np.arange(self.size), self.data, where='post', **kwargs)
        ax.set_xlabel('Index')
        ax.set_ylabel('Value')
        ax.set_title('Binary Sequence')
        ax.set_yticks([0, 1])
        ax.grid(True)
        return self
[docs] @logger.auto_indent_methods class electrical_signal(): """**Electrical Signal** This class provides methods and attributes to work with electrical signals. It has overloaded operators necessary to properly interpret the ``+``, ``-``, ``*``, ``/``, ``**``, and comparison operations as any numpy array. .. rubric:: Attributes .. autosummary:: ~electrical_signal.signal ~electrical_signal.noise ~electrical_signal.execution_time ~electrical_signal.size ~electrical_signal.real ~electrical_signal.imag ~electrical_signal.type ~electrical_signal.fs ~electrical_signal.sps ~electrical_signal.dt ~electrical_signal.t ~electrical_signal.sizeof .. rubric:: Methods .. autosummary:: __init__ __call__ print to_numpy conj sum w f abs power normalize phase filter plot psd plot_eye grid legend show .. table:: **Implemented Operators** :widths: 10 90 :align: center +--------------------------------+--------------------------------------------------------------------------------------------------+ | Operator | Description | +================================+==================================================================================================+ | ``+`` | ``a + b`` adds signals and noises element-wise; | | | ``sig = (a.signal + b.signal), noi = (a.noise + b.noise)`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``-`` | ``a - b`` subtracts signals and noises element-wise; | | | ``sig = (a.signal - b.signal), noi = (a.noise - b.noise)`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``*`` | ``a * b`` multiplies two ``electrical_signal``; | | | ``sig = (a.signal*b.signal)`` | | | ``noi = (a.signal*b.noise + a.noise*b.signal + a.noise*b.noise)`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``/`` | ``a / 
n`` divides signal and noise by a scalar; | | | ``sig = (a.signal/n), noi = (a.signal/n)`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``//`` | ``a // n`` floor divides signal and noise by a scalar; | | | ``sig = (a.signal//n), noi = (a.signal//n)`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``**`` | ``a ** n`` raises ``electrical_signal`` to a power; | | | ``- n=1 --> sig = (a.signal), noi = (a.noise)``; | | | ``- n=2 --> sig = (a.signal**2), noi = (2*a.signal*a.noise + a.noise**2)`` | | | ``- n=other --> sig = (a.signal + a.noise)**n, noi=NULL`` | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``>`` | ``a > b`` compares signals element-wise, returns | | | ``binary_sequence`` mask. | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``<`` | ``a < b`` compares signals element-wise, returns | | | ``binary_sequence`` mask. | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``==`` | ``a == b`` compares signals element-wise, returns | | | ``np.ndarray`` mask. | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``[:]`` | ``a[i]`` returns the value at index ``i``; | | | ``a[i:j]`` returns a sliced ``electrical_signal``. | +--------------------------------+--------------------------------------------------------------------------------------------------+ | ``-`` (unary) | ``-a`` negates signal and noise. | +--------------------------------+--------------------------------------------------------------------------------------------------+ """
[docs] def __init__(self, signal: str | Iterable, noise: str | Iterable = NULL, dtype: np.dtype=None) -> None: """ Initialize the electrical signal object. Parameters ---------- signal : :obj:`str` or 1D array_like or scalar The signal values. noise : :obj:`str` or 1D array_like or scalar, optional The noise values. Defaults to ``NULL``. dtype : :obj:`np.dtype`, optional The desired data type for the signal and noise arrays. If not provided, the data type will be inferred from the input data. Defaults to ``None``. Notes ----- The signal and noise can be provided as a string, in which case it will be converted to a ``numpy.array`` using the :func:`str2array` function. For example: .. code-block:: python >>> electrical_signal('1 2 3,4,5') # separate values by space or comma indistinctly electrical_signal(signal=[1 2 3 4 5], noise=NULL) >>> electrical_signal('1+2j, 3+4j, 5+6j') # complex values electrical_signal(signal=[1.+2.j 3.+4.j 5.+6.j], noise=NULL) """ sig, noi = signal, noise if self.__class__ == electrical_signal: logger.debug("%s.__init__()", self.__class__.__name__) if isinstance(signal, electrical_signal): sig, noi = signal.signal, signal.noise if noise is not NULL: _, noi_ = self._prepare_arrays(sig, noise, dtype) noi = noi + noi_ else: sig, noi = self._prepare_arrays(signal, noise, dtype) if sig.ndim > 1 or sig.size < 1: raise ValueError(f"Signal must be scalar or 1D array for electrical_signal, invalid shape {sig.shape}") if sig.ndim == 0: sig = sig[np.newaxis] if noi is not NULL: noi = noi[np.newaxis] self.signal = sig """The signal values, a 1D array-like values.""" self.noise = noi """The noise values, a 1D array-like values.""" self.execution_time = 0. """The execution time of the last operation performed."""
def __str__(self, title: str=None):
    """Return a multi-line summary (values, shapes, powers, size, dtype, memory, time).

    Parameters
    ----------
    title : :obj:`str`, optional
        Header text. Defaults to the class name.
    """
    logger.debug("__str__()")
    if title is None:
        title = self.__class__.__name__

    title = 3*'*' + f' {title} ' + 3*'*'
    sub = len(title)*'-'
    tab = 3*' '

    # Print options are scoped to this call so that formatting a signal does
    # not permanently change NumPy's global print configuration.
    with np.printoptions(precision=3, threshold=20):
        if self.signal.ndim == 1:
            signal = str(self.signal)
            noise = str(self.noise)
        else:
            signal = str(self.signal).replace('\n', '\n'+tab + 11*' ')
            noise = str(self.noise).replace('\n', '\n'+tab + 11*' ')

        pw_sig_w = self.power('W', 'signal')
        pw_sig_dbm = dbm(pw_sig_w)
        pw_noi_w = self.power('W', 'noise')
        pw_noi_dbm = dbm(pw_noi_w)
        pw_all_w = self.power('W', 'all')
        pw_all_dbm = dbm(pw_all_w)

        msg = f'\n{sub}\n{title}\n{sub}\n'+ tab + \
            f'signal: {signal} (shape: {self.shape})\n'+ tab + \
            f'noise: {noise} (shape: {self.shape if self.noise is not NULL else None})\n'+ tab + \
            f"pow_signal: {si(pw_sig_w, 'W', 1)} ({pw_sig_dbm:.1f} dBm)\n"+ tab + \
            f"pow_noise: {si(pw_noi_w, 'W', 1)} ({pw_noi_dbm:.1f} dBm)\n"+ tab + \
            f"pow_total: {si(pw_all_w, 'W', 1)} ({pw_all_dbm:.1f} dBm)\n"+ tab + \
            f'len: {self.size}\n' + tab + \
            f'elem_type: {self.dtype}\n' + tab + \
            f'mem_size: {self.sizeof} bytes\n' + tab + \
            f'time: {si(self.execution_time, "s", 2)}\n'
    return msg

def __repr__(self):
    """Return a compact textual representation of the object."""
    logger.debug("__repr__()")
    # Scoped print options (see ``__str__``): no global side effect.
    with np.printoptions(precision=3, threshold=20):
        if self.noise is not NULL:
            return f'electrical_signal({str(self.signal)})'
        return f'electrical_signal(signal={str(self.signal)},\n\t\t noise={str(self.noise)})'

def __len__(self):
    """Return ``self.size``."""
    logger.debug("__len__()")
    return self.size

def __iter__(self):
    """Iterate over the samples of ``signal + noise``."""
    logger.debug("__iter__()")
    return iter(self.__array__())

def __array__(self, dtype=None):
    """Return ``signal + noise`` as an array, optionally cast to ``dtype``."""
    logger.debug("__array__()")
    arr = self.signal + self.noise
    if dtype is not None:
        arr = arr.astype(dtype)
    return arr

def __getattr__(self, name):
    """Delegate unknown, non-dunder attributes to the ``signal + noise`` array.

    Raises
    ------
    AttributeError
        If ``np.ndarray`` has no such attribute or the name starts with ``__``.
    """
    logger.debug("__getattr__('%s')", name)
    # Check if the attribute exists in np.ndarray and is not a private/internal attribute
    if hasattr(np.ndarray, name) and not name.startswith('__'):
        logger.debug("Delegating attribute '%s' to ndarray for %s", name, self.__class__.__name__)
        return getattr(self.__array__(), name)
    logger.debug("Attribute '%s' not found in %s", name, self.__class__.__name__)
    raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

def __array_ufunc__(self, ufunc, method, *inputs, **kwargs):
    """NumPy ufunc hook.

    ``np.add``, ``np.subtract`` and ``np.multiply`` with a signal object on
    the right-hand side are routed to the class operators so that signal and
    noise stay separated. Any other ufunc runs on ``signal + noise`` and an
    array result of compatible dimensionality is wrapped back in the class
    (its noise is then ``NULL``).
    """
    logger.debug("__array_ufunc__(%s, %s)", getattr(ufunc, '__name__', ufunc), method)
    if method == '__call__' and not kwargs.get('out'):
        if ufunc == np.add:
            lhs, rhs = inputs
            if isinstance(rhs, electrical_signal):
                return rhs.__add__(lhs)
        if ufunc == np.subtract:
            lhs, rhs = inputs
            if isinstance(rhs, electrical_signal):
                return (-rhs).__add__(lhs)
        if ufunc == np.multiply:
            lhs, rhs = inputs
            if isinstance(rhs, electrical_signal):
                return rhs.__mul__(lhs)

    # Convert inputs that are instances of this class to arrays
    new_inputs = [
        inp.__array__() if isinstance(inp, self.__class__) else inp
        for inp in inputs
    ]

    # Call the ufunc
    result = getattr(ufunc, method)(*new_inputs, **kwargs)

    # If the result is an array with compatible shape, wrap it in the class
    if isinstance(result, np.ndarray):
        if self.__class__ == electrical_signal and result.ndim == 1:
            return self.__class__(result)
        elif self.__class__ == optical_signal and result.ndim in [1, 2]:
            return self.__class__(result)
        else:
            return result
    return result

def __array_function__(self, func, types, args, kwargs):
    """NumPy function hook: run ``func`` on plain arrays and re-wrap compatible results."""
    logger.debug("__array_function__(%s, %s, %s)", getattr(func, '__name__', func), args, kwargs)

    @logger.auto_indent
    def _convert(obj):
        # Recursively replace instances of this class by their array form.
        logger.debug("_convert(%s)", type(obj).__name__)
        if isinstance(obj, self.__class__):
            return obj.__array__()
        elif isinstance(obj, (list, tuple)):
            return type(obj)(_convert(item) for item in obj)
        elif isinstance(obj, dict):
            return {k: _convert(v) for k, v in obj.items()}
        else:
            return obj

    # Convert args and kwargs that contain instances to arrays
    new_args = _convert(args)
    new_kwargs = _convert(kwargs)

    result = func(*new_args, **new_kwargs)

    # If the result is an array with compatible shape, wrap it in the class
    if isinstance(result, np.ndarray):
        if self.__class__ == electrical_signal and result.ndim == 1:
            return self.__class__(result)
        elif self.__class__ == optical_signal and result.ndim in [1, 2]:
            return self.__class__(result)
        else:
            return result
    return result

def __add__(self, other):
    """Add signals and noises separately."""
    logger.debug("__add__()")
    other, _ = self._parse(other)
    sig = self.signal + other.signal
    noi = self.noise + other.noise
    return self.__class__(sig, noi)

def __radd__(self, other):
    """Reflected addition; same as ``__add__``."""
    logger.debug("__radd__()")
    return self.__add__(other)

def __neg__(self):
    """Negate signal and noise."""
    logger.debug("__neg__()")
    sig = -self.signal
    noi = -self.noise
    return self.__class__(sig, noi)

def __sub__(self, other):
    """Return ``self + (-other)``."""
    logger.debug("__sub__()")
    other, _ = self._parse(other)
    return self + (-other)

def __rsub__(self, other):
    """Return ``(-self) + other``."""
    logger.debug("__rsub__()")
    other, _ = self._parse(other)
    return (-self) + other

def __mul__(self, other):
    """Multiply two signals; every cross term that involves noise goes to the noise."""
    logger.debug("__mul__()")
    other, _ = self._parse(other)
    sig = self.signal*other.signal
    noi = self.signal*other.noise + self.noise*other.signal + self.noise*other.noise
    return self.__class__(sig, noi)

def __rmul__(self, other):
    """Reflected multiplication; same as ``__mul__``."""
    logger.debug("__rmul__()")
    return self.__mul__(other)

def __truediv__(self, number: int):
    """Divide signal and noise by a scalar.

    Raises
    ------
    TypeError
        If ``number`` is not a ``ComplexNumber``.
    ZeroDivisionError
        If ``number`` equals zero.
    """
    logger.debug("__truediv__()")
    if not isinstance(number, ComplexNumber):
        logger.error("Division by unsupported type %s", type(number))
        raise TypeError(f"Can't divide electrical_signal by type {type(number)}")
    if number == 0:
        logger.error("Attempted division by zero in %s", self.__class__.__name__)
        raise ZeroDivisionError("Can't divide electrical_signal by zero")
    return self.__class__(self.signal / number, self.noise / number)

def __floordiv__(self, other):
    """Divide by a scalar, then apply ``np.floor`` to signal and noise separately."""
    logger.debug("__floordiv__()")
    x = (self/other)
    return self.__class__( np.floor(x.signal), np.floor(x.noise) )

def __getitem__(self, key):
    """Index or slice the signal.

    A slice returns a new object. An integer index (Python or NumPy integer)
    returns the raw signal sample when the noise is ``NULL``, otherwise a
    one-sample object holding signal and noise.

    Raises
    ------
    TypeError
        If ``key`` is neither a slice nor an integer.
    """
    logger.debug("__getitem__(%s)", key)
    if isinstance(key, slice):
        if self.noise is NULL:
            return self.__class__( self.signal[key] )
        return self.__class__( self.signal[key], self.noise[key] )
    # np.integer is accepted too, so indices coming from NumPy
    # (np.argmax, np.arange, ...) work like plain ints.
    elif isinstance(key, (int, np.integer)):
        if self.noise is NULL:
            return self.signal[key]
        return self.__class__( self.signal[key], self.noise[key] )
    raise TypeError(f"Invalid argument type. {key} of type {type(key)}")

def __gt__(self, other):
    """Element-wise ``>`` on ``signal + noise``; returns a ``binary_sequence``."""
    logger.debug("__gt__()")
    other, _ = self._parse(other)
    x_r = self.signal + self.noise
    x_l = other.signal + other.noise
    return binary_sequence(x_r > x_l)

def __lt__(self, other):
    """Element-wise ``<``, evaluated as ``other - self > 0``."""
    logger.debug("__lt__()")
    return other - self > 0

def __eq__(self, other):
    """Element-wise ``==`` on ``signal + noise``; returns the raw comparison result."""
    logger.debug("__eq__()")
    other, _ = self._parse(other)
    x_r = self.signal + self.noise
    x_l = other.signal + other.noise
    return x_r == x_l

def __pow__(self, other):
    """Raise the signal to a real power.

    Powers 1 and 2 keep signal and noise separated; power 0 yields ones with
    ``NULL`` noise; any other power is applied to ``signal + noise`` and the
    resulting noise is ``NULL``.

    Raises
    ------
    TypeError
        If ``other`` is not a ``RealNumber``.
    """
    logger.debug("__pow__()")
    if not isinstance(other, RealNumber):
        raise TypeError(f"Can't exponentiate electrical_signal by type {type(other)}")
    if other == 0:
        sig = np.ones_like(self.signal)
        noi = NULL
    elif other == 1:
        sig = self.signal
        noi = self.noise
    elif other == 2:
        sig = self.signal**2
        noi = 2*self.signal*self.noise + self.noise**2
    else:
        sig = (self.signal + self.noise) ** other
        noi = NULL
    return self.__class__(sig, noi)
def __call__(self, domain: Literal['t','w', 'f'], shift: bool=False):
    """
    Transform signal and noise to the requested domain with the FFT.

    Parameters
    ----------
    domain : {'t', 'w', 'f'}
        Target domain. ``'w'`` and ``'f'`` apply ``fft``; ``'t'`` applies ``ifft``.
    shift : :obj:`bool`, optional
        If True, apply ``fftshift`` (for ``'w'``/``'f'``) or ``ifftshift``
        (for ``'t'``) to the transformed arrays.

    Returns
    -------
    new_obj : :obj:`electrical_signal` or :obj:`optical_signal`
        New object of the same class holding the transformed signal and noise.

    Raises
    ------
    ValueError
        If ``domain`` is not one of ``'t'``, ``'w'``, ``'f'``.
    """
    logger.debug("__call__(domain='%s', shift=%s)", domain, shift)

    # Pick the transform and its matching shift once, then apply to both parts.
    if domain == 'w' or domain == 'f':
        transform, reorder = fft, fftshift
    elif domain == 't':
        transform, reorder = ifft, ifftshift
    else:
        raise ValueError("`domain` must be one of the following values ('t', 'w', 'f')")

    signal = transform(self.signal, axis=-1)
    noise = transform(self.noise, axis=-1)

    if shift:
        signal = reorder(signal, axes=-1)
        noise = reorder(noise, axes=-1)

    return self.__class__(signal, noise)
# properties
@property
def index(self) -> np.ndarray:
    """Sample indices, ``np.arange(self.signal.size)``."""
    logger.debug("index")
    return np.arange(self.signal.size)

@property
def size(self) -> int:
    """Number of samples of the electrical signal."""
    logger.debug("size")
    return self.signal.size

@property
def real(self) -> "electrical_signal":
    """New object holding the real parts of signal and noise."""
    logger.debug("real")
    return self.__class__(self.signal.real, self.noise.real)

@property
def imag(self) -> "electrical_signal":
    """New object holding the imaginary parts of signal and noise."""
    logger.debug("imag")
    return self.__class__(self.signal.imag, self.noise.imag)

@property
def type(self):
    """Object type."""
    logger.debug("type")
    return type(self)

@property
def sizeof(self):
    """Memory size of object in bytes."""
    logger.debug("sizeof")
    return sizeof(self)

@property
def fs(self):
    """Sampling frequency of the electrical signal (read from ``gv.fs``)."""
    logger.debug("fs()")
    return gv.fs

@property
def sps(self):
    """Samples per slot of the electrical signal (read from ``gv.sps``)."""
    logger.debug("sps()")
    return gv.sps

@property
def dt(self):
    """Time step of the electrical signal (read from ``gv.dt``)."""
    logger.debug("dt")
    return gv.dt

@property
def t(self):
    """Time array for the electrical signal: the first ``size`` entries of ``gv.t``."""
    logger.debug("t")
    return gv.t[:self.size]

# static and private methods
@staticmethod # can be used without instantiating the class, eg: electrical_signal._prepare_arrays()
def _prepare_arrays(signal, noise, dtype):
    """Convert ``signal`` and ``noise`` to arrays with a common dtype.

    Parameters
    ----------
    signal, noise : str or array_like
        Raw values; strings go through ``str2array``. ``noise`` may be ``NULL``,
        in which case it is returned untouched.
    dtype : np.dtype or None
        Target dtype. When ``None`` and noise is given, ``np.result_type`` of
        both arrays is used.

    Returns
    -------
    tuple
        ``(signal, noise)``.

    Raises
    ------
    ValueError
        If noise is given and its shape differs from the signal shape.
    """
    logger.debug("_prepare_arrays()")

    @logger.auto_indent
    def _convert_to_array(value, dtype, text=''):
        # `dtype` is accepted but not used here; casting is done by the caller below.
        logger.debug("_convert_to_array(%s(%s))", text, type(value).__name__)
        if isinstance(value, str):
            return str2array(value)
        return np.array(value)

    signal = _convert_to_array(signal, dtype, 'signal')

    if noise is not NULL:
        noise = _convert_to_array(noise, dtype, 'noise')

        if dtype is None:
            arrays_type = np.result_type(signal, noise) # obtain the most comprehensive type
        else:
            arrays_type = dtype

        signal = signal.astype(arrays_type)
        noise = noise.astype(arrays_type)

        if signal.shape != noise.shape:
            raise ValueError(f"`signal` and `noise` must have the same shape, mismatch shapes {signal.shape} and {noise.shape}!")
    else:
        if dtype is not None:
            signal = signal.astype(dtype)
    return signal, noise

def _parse(self, other):
    """Coerce the right-hand operand of a binary operation to this class.

    Returns
    -------
    tuple
        ``(other, dtype)``: ``other`` as an instance of this class (a sliced
        copy ``other[:]`` when it already was one) and the ``np.result_type``
        of both signals.

    Raises
    ------
    ValueError
        If the sizes differ and neither operand has size 1.
    """
    logger.debug("_parse()")
    if not isinstance(other, self.type):
        other = self.__class__(other)
    else:
        other = other[:]

    if self.size != other.size:
        l_min = min(self.size, other.size)
        l_max = max(self.size, other.size)
        # Different sizes are only allowed when one operand is a single sample.
        if l_min != 1 and l_min != l_max:
            raise ValueError(f"Can't operate '{self.__class__.__name__}'s with shapes {self.shape} and {other.shape}")

    dtype = np.result_type(self.signal, other.signal)
    return other, dtype

# public methods
def print(self, msg: str=None):
    """Print the object summary produced by ``__str__``.

    Parameters
    ----------
    msg : :obj:`str`, optional
        Top message (title) to show.

    Returns
    -------
    self : electrical_signal
        The same object.
    """
    logger.debug("print()")
    summary = self.__str__(msg)
    print(summary)
    return self
def to_numpy(self, dtype: np.dtype | None = None, copy: bool = False) -> np.ndarray:
    """Return a NumPy representation of the electrical signal (signal + noise).

    Parameters
    ----------
    dtype : :obj:`np.dtype`, optional
        Desired dtype of the returned array. Defaults to ``None`` (unchanged).
    copy : :obj:`bool`, optional
        If True, always return a fresh copy. If False (default), a copy is
        made only when one is required (for example to satisfy ``dtype``).

    Returns
    -------
    :obj:`np.ndarray`
        Array with the values of ``signal + noise``.
    """
    logger.debug("to_numpy(dtype=%s, copy=%s)", dtype, copy)
    data = self.signal + self.noise
    if copy:
        return np.array(data, dtype=dtype, copy=True)
    # np.array(..., copy=False) raises ValueError on NumPy >= 2 whenever a copy
    # is unavoidable; np.asarray keeps the "copy only if needed" semantics.
    return np.asarray(data, dtype=dtype)
def conj(self):
    """Return the complex conjugate of the electrical signal.

    Returns
    -------
    :obj:`electrical_signal`
        New object of the same class with conjugated signal and noise.
    """
    logger.debug("conj()")
    conj_signal = self.signal.conj()
    conj_noise = self.noise.conj()
    return self.__class__(conj_signal, conj_noise)
def sum(self, axis: int=None):
    """Sum signal and noise over an axis.

    Parameters
    ----------
    axis : :obj:`int`, optional
        Axis along which to sum. Defaults to ``None`` (sum over the entire array).

    Returns
    -------
    :obj:`electrical_signal`
        New object of the same class built from the summed signal and noise
        (noise stays ``NULL`` when it was ``NULL``).
    """
    logger.debug("sum(axis=%s)", axis)
    total_signal = self.signal.sum(axis=axis)
    if self.noise is not NULL:
        total_noise = self.noise.sum(axis=axis)
    else:
        total_noise = NULL
    return self.__class__(total_signal, total_noise)
def w(self, shift: bool=False):
    """Return the angular frequency axis (rad/s) for spectrum representation.

    Parameters
    ----------
    shift : :obj:`bool`, optional
        If True, apply ``fftshift`` to the axis.

    Returns
    -------
    :obj:`np.ndarray`
        ``fftfreq(self.size, gv.dt)*2*pi``, optionally shifted.
    """
    omega = fftfreq(self.size, gv.dt)*2*pi
    return fftshift(omega, axes=-1) if shift else omega
def f(self, shift: bool=False):
    """Return the frequency axis (Hz) for spectrum representation.

    Parameters
    ----------
    shift : :obj:`bool`, optional
        Forwarded to :meth:`w`; if True the axis is fft-shifted.

    Returns
    -------
    :obj:`np.ndarray`
        The angular frequency axis from :meth:`w` divided by ``2*pi``.
    """
    angular = self.w(shift)
    return angular/(2*pi)
def abs(self, of: Literal['signal','noise','all']='all'):
    """Get absolute value of ``signal``, ``noise`` or ``signal+noise``.

    Parameters
    ----------
    of : :obj:`str`, optional
        Which part to take the absolute value of (case-insensitive).
        ``'all'`` uses ``signal+noise``.

    Returns
    -------
    out : same class as the object
        For ``'signal'`` and ``'noise'`` a new object wrapping the absolute
        values (zeros when the noise is ``NULL``); for ``'all'`` the result
        of ``np.abs(self)``.

    Raises
    ------
    TypeError
        If ``of`` is not a string.
    ValueError
        If ``of`` is not one of ``'signal'``, ``'noise'``, ``'all'``.
    """
    logger.debug("abs(of='%s')", of)
    if not isinstance(of, str):
        raise TypeError('`of` must be a string.')

    target = of.lower()

    if target == 'signal':
        return self.__class__(np.abs(self.signal))

    if target == 'noise':
        if self.noise is NULL:
            # No noise: real-valued zeros with the signal's shape.
            return self.__class__(np.zeros_like(self.signal.real))
        return self.__class__(np.abs(self.noise))

    if target == 'all':
        return np.abs(self)

    raise ValueError('`of` must be one of the following values ("signal", "noise", "all")')
def power(self, unit : Literal['W', 'dBm']='W', of: Literal['signal','noise','all']='all'):
    """Get the mean power of the electrical signal.

    The power is ``np.mean(self.abs(of)**2, axis=-1)``.

    Parameters
    ----------
    unit : :obj:`str`, optional
        ``'W'`` for Watts or ``'dBm'`` (case-insensitive).
    of : :obj:`str`, optional
        Part to measure: ``'signal'``, ``'noise'`` or ``'all'`` (``signal+noise``).

    Returns
    -------
    The mean power in the requested unit (``dbm(p)`` for ``'dBm'``).

    Raises
    ------
    ValueError
        If ``of`` or ``unit`` is not one of the accepted values.
    """
    logger.debug("power(unit='%s', of='%s')", unit, of)

    if of.lower() not in ['signal', 'noise', 'all']:
        raise ValueError('`of` must be one of the following values ("signal", "noise", "all")')

    mean_power = np.mean(self.abs(of)**2, axis=-1)

    unit_key = unit.lower()
    if unit_key == 'w':
        return mean_power
    if unit_key == 'dbm':
        return dbm(mean_power)
    raise ValueError('`unit` must be one of the following values ("W", "dBm")')
def normalize(self, by: Literal['power', 'amplitude']='power'):
    """Return a normalized copy of the signal.

    Parameters
    ----------
    by : :obj:`str`, optional
        ``'power'`` divides by the square root of the signal power;
        ``'amplitude'`` divides by the maximum absolute signal value.

    Returns
    -------
    :obj:`electrical_signal`
        The normalized electrical signal.

    Raises
    ------
    ValueError
        If ``by`` is not ``'power'`` or ``'amplitude'``.
    """
    logger.debug("normalize(by='%s')", by)
    copy_ = self[:]

    if by == 'power':
        signal_power = copy_.power('W', 'signal')
        return copy_ / signal_power**0.5

    if by == 'amplitude':
        peak = copy_.abs('signal').max()
        return copy_ / peak

    raise ValueError('`by` must be one of the following values ("power", "amplitude")')
def phase(self):
    """Get phase of the electrical signal: ``unwrap(angle(signal+noise))``.

    Returns
    -------
    The result of ``np.unwrap(np.angle(self))``.
    """
    logger.debug("phase()")
    wrapped = np.angle(self)
    return np.unwrap(wrapped)
def filter(self, h: np.ndarray):
    """Apply an FIR filter of impulse response **h** to the electrical signal.

    Signal and noise are filtered separately with
    ``scipy.signal.fftconvolve(..., h, mode='same')``.

    Parameters
    ----------
    h : :obj:`np.ndarray`
        The FIR filter impulse response.

    Returns
    -------
    :obj:`electrical_signal`
        New object with the filtered signal and noise (noise stays ``NULL``
        when it was ``NULL``).
    """
    logger.debug("filter()")
    filtered_signal = sg.fftconvolve(self.signal, h, mode='same')
    filtered_noise = sg.fftconvolve(self.noise, h, mode='same') if self.noise is not NULL else NULL
    return self.__class__(filtered_signal, filtered_noise)
def plot(self, fmt: str | list='-', n: int=None, xlabel: str=None, ylabel: str=None, grid: bool=False, hold: bool=True, show: bool=False, **kwargs: dict):
    r"""Plot signal in time domain.

    The first ``n`` samples of the object are handed to
    ``matplotlib.pyplot.plot`` against ``gv.t`` scaled to nanoseconds.

    Parameters
    ----------
    fmt : :obj:`str` or :obj:`list`, optional
        Format style of line. Example ``'b-.'``, Defaults to ``'-'``.
    n : :obj:`int`, optional
        Number of samples to plot. Defaults to ``min(self.size, gv.t.size)``.
    xlabel : :obj:`str`, optional
        X-axis label. Defaults to ``'Time [ns]'``.
    ylabel : :obj:`str`, optional
        Y-axis label. Defaults to ``'Amplitude [V]'``
    grid : :obj:`bool`, optional
        If True, draw a vertical line every ``gv.sps`` samples plus a
        horizontal grid. Defaults to ``False``.
    hold : :obj:`bool`, optional
        If False, a new figure is created first. Defaults to ``True``.
    show : :obj:`bool`, optional
        If True, call ``plt.show()`` at the end. Defaults to ``False``.
    \*\*kwargs : :obj:`dict`
        Additional keyword arguments compatible with ``matplotlib.pyplot.plot()``.

    Returns
    -------
    :obj:`electrical_signal`
        The same object.
    """
    logger.debug("plot()")

    if n is None:
        n = min(self.size, gv.t.size)
    t = gv.t[:n]*1e9
    y = self[:n]

    if not ylabel:
        ylabel = 'Amplitude [V]'
    if not xlabel:
        xlabel = 'Time [ns]'

    label = kwargs.pop('label', None)

    if not hold:
        plt.figure()

    plt.plot(t, y, fmt, **kwargs, label=label)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)

    if grid:
        # One dashed line at the start of every slot, plus one closing line.
        for slot_start in t[:n*gv.sps][::gv.sps]:
            plt.axvline(slot_start, color='gray', ls='--', alpha=0.3, lw=1)
        plt.axvline(t[-1] + gv.dt*1e9, color='gray', ls='--', alpha=0.3, lw=1)
        plt.grid(alpha=0.3, axis='y')

    if label is not None:
        plt.legend()

    if show:
        plt.show()
    return self
def psd(self, fmt: str | list='-', mode: Literal['x','y','both']='x', n: int=None, xlabel: str=None, ylabel: str=None, yscale: Literal['linear', 'dbm']='dbm', grid: bool=False, hold: bool=True, show: bool=False, **kwargs: dict):
    """Plot Power Spectral Density (PSD) of the electrical/optical signal.

    The spectrum is estimated with ``scipy.signal.welch`` on the ``signal``
    part of the first ``n`` samples (two-sided, ``scaling='spectrum'``, no
    detrending), with segments of at most 2048 samples.

    Parameters
    ----------
    fmt : :obj:`str` or :obj:`list`
        Format style of line. Example ``'b-.'``. Defaults to ``'-'``.
    mode : :obj:`str`, optional
        Polarization mode to show (for optical signals). Defaults to ``'x'``.

        - ``'x'`` plot polarization x.
        - ``'y'`` plot polarization y.
        - ``'both'`` plot both polarizations x and y in the same figure.
    n : :obj:`int`, optional
        Number of samples to use. Defaults to ``min(self.size, gv.t.size)``.
    xlabel : :obj:`str`, optional
        X-axis label. Defaults to ``'Frequency [GHz]'``.
    ylabel : :obj:`str`, optional
        Y-axis label. Defaults to ``'Power [dBm]'`` if ``yscale='dbm'`` or ``'Power [mW]'`` if ``yscale='linear'``.
    yscale : :obj:`str`, {'linear', 'dbm'}, optional
        Kind of Y-axis plot. Defaults to ``'dbm'``.
    grid : :obj:`bool`, optional
        If show grid. Defaults to ``False``.
    hold : :obj:`bool`, optional
        If False, a new figure is created first. Defaults to ``True``.
    show : :obj:`bool`, optional
        If True, call ``plt.show()`` at the end. Defaults to ``False``.
    **kwargs : :obj:`dict`
        Additional matplotlib arguments.

    Returns
    -------
    :obj:`electrical_signal`
        The same object.

    Raises
    ------
    TypeError
        If ``yscale`` or (for two polarizations) ``mode`` is invalid.
    ValueError
        If ``label`` is neither a string nor a list/tuple in ``'both'`` mode.
    """
    logger.debug("psd()")

    if n is None:
        n = min(self.size, gv.t.size)

    samples = self[:n].signal
    n_last = samples.shape[-1]
    nperseg = 2048 if n_last > 2048 else n_last

    freq, spectrum = sg.welch(samples, fs=gv.fs*1e-9, nperseg=nperseg, scaling='spectrum', return_onesided=False, detrend=False)
    freq = fftshift(freq)
    spectrum = fftshift(spectrum, axes=-1)

    if yscale == 'linear':
        spectrum = spectrum*1e3
        ylabel = ylabel if ylabel else 'Power [mW]'
        ylim = (-0.1,)
    elif yscale == 'dbm':
        spectrum = dbm(spectrum)
        ylabel = ylabel if ylabel else 'Power [dBm]'
        ylim = (-100,)
    else:
        raise TypeError('`yscale` must be one of the following values ("linear", "dbm")')

    single_fmt_warning = '`fmt` must be a string for single polarization signals, using default value.'
    n_pol = getattr(self, 'n_pol', 1)

    if n_pol == 1:
        if not isinstance(fmt, str):
            warnings.warn(single_fmt_warning)
            fmt = '-'
        args = (freq, spectrum, fmt)
    elif mode == 'x' or mode == 'y':
        if not isinstance(fmt, str):
            warnings.warn(single_fmt_warning)
            fmt = '-'
        pol = 0 if mode == 'x' else 1
        args = (freq, spectrum[pol], fmt)
    elif mode == 'both':
        if isinstance(fmt, (list, tuple)):
            fmt_x, fmt_y = fmt[0], fmt[1]
        elif isinstance(fmt, str):
            fmt_x = fmt_y = fmt
        else:
            warnings.warn('`fmt` must be a string or a list of strings for both polarizations signals, using default value.')
            fmt_x = fmt_y = '-'
        args = (freq, spectrum[0], fmt_x, freq, spectrum[1], fmt_y)
    else:
        raise TypeError('argument `mode` should be ("x", "y" or "both")')

    # `label` is only extracted here for the two-curve case; otherwise it
    # stays in kwargs and is forwarded to plt.plot unchanged.
    if mode == 'both' and n_pol > 1:
        label = kwargs.pop('label', None)
    else:
        label = None

    if not hold:
        plt.figure()

    ls = plt.plot(*args, **kwargs)
    plt.ylabel(ylabel)
    plt.xlabel(xlabel if xlabel else 'Frequency [GHz]')
    plt.xlim( -3.5*gv.R*1e-9, 3.5*gv.R*1e-9 )
    plt.ylim( *ylim )

    if grid:
        plt.grid(alpha=0.3)

    if label is not None:
        if isinstance(label, str):
            ls[0].set_label(label + ' X')
            ls[1].set_label(label + ' Y')
        elif isinstance(label, (list, tuple)):
            ls[0].set_label(label[0])
            ls[1].set_label(label[1])
        else:
            raise ValueError('`label` must be a string or a list of strings.')
        plt.legend()

    if show:
        plt.show()
    return self
def plot_eye(self, n_traces=None, cmap='jet', N_grid_bins=200, grid_sigma=5, style: Literal['line', 'dot', 'density']='dot', ax=None, **plot_kw):
    r"""Plot a colored eye diagram by delegating to ``eyediagram``.

    Parameters
    ----------
    n_traces : :obj:`int`, optional
        Maximum number of traces to plot, capped at 4096. If ``None``, the cap
        itself is used. Defaults to ``None``.
    cmap : :obj:`str`, optional
        Name of the matplotlib colormap. Defaults to ``'jet'``.
    N_grid_bins : :obj:`int`, optional
        Number of bins for the density histogram. Defaults to ``200``.
    grid_sigma : :obj:`float`, optional
        Sigma for the Gaussian filter applied to the density. Defaults to ``5``.
    style : Literal['line', 'dot', 'density'], optional
        Style of the eye diagram plot:

        - 'line': Plots traces as lines.
        - 'dot': Plots traces as dots.
        - 'density': Plots density map only.
    ax : :obj:`matplotlib.axes.Axes`, optional
        Axes object to plot on. If ``None``, creates new figure and axes. Defaults to ``None``.
    \*\*plot_kw : :obj:`dict`, optional
        Additional plotting parameters, forwarded untouched to ``eyediagram``:

        *Figure parameters (used only if ax is ``None``):*

        - **figsize** : :obj:`tuple`, default ``(10, 6)``
        - **dpi** : :obj:`int`, default ``100``

        *Line collection parameters:*

        - **linewidth** : :obj:`float`, default ``0.75``
        - **alpha** : :obj:`float`, default ``0.25``
        - **capstyle** : :obj:`str`, default ``'round'``
        - **joinstyle** : :obj:`str`, default ``'round'``

        *Axes formatting parameters:*

        - **xlabel** : :obj:`str`, default ``"Time (2-symbol segment)"``
        - **ylabel** : :obj:`str`, default ``"Amplitude"``
        - **title** : :obj:`str`, default ``"Eye Diagram ({num_traces} traces)"``
        - **grid** : bool, default ``True``
        - **grid_alpha** : :obj:`float`, default ``0.3``
        - **xlim** : :obj:`tuple`, optional (xmin, xmax)
        - **ylim** : :obj:`tuple`, optional (ymin, ymax)
        - **tight_layout** : :obj:`bool`, default ``True``

        *Display parameters:*

        - **show** : :obj:`bool`, default ``True`` (whether to call ``plt.show()``)

    Returns
    -------
    :obj:`electrical_signal`
        The same object; ``execution_time`` is increased by the elapsed time.
    """
    tic()
    logger.debug("plot_eye()")

    MAX_TRACES = 4096
    # Never request more than MAX_TRACES traces, whatever the caller asked for.
    n_traces = MAX_TRACES if n_traces is None else min(n_traces, MAX_TRACES)

    eyediagram(self, gv.sps, n_traces, cmap, N_grid_bins, grid_sigma, style, ax, **plot_kw)

    self.execution_time += toc()
    return self
def grid(self, **kwargs):
    r"""Add grid to the plot.

    Parameters
    ----------
    \*\*kwargs : :obj:`dict`
        Keyword arguments forwarded to ``plt.grid``. ``alpha`` defaults to
        ``0.3`` when not given.

    Returns
    -------
    self : :obj:`electrical_signal`
        The same object.
    """
    logger.debug("grid()")
    kwargs.setdefault('alpha', 0.3)
    plt.grid(**kwargs)
    return self
def legend(self, *args, **kwargs):
    r"""Add a legend to the plot.

    Parameters
    ----------
    \*args : :obj:`iterable`
        Positional arguments forwarded to ``plt.legend``.
    \*\*kwargs : :obj:`dict`
        Keyword arguments forwarded to ``plt.legend``.

    Returns
    -------
    self : :obj:`electrical_signal`
        The same object.
    """
    logger.debug("legend()")
    plt.legend(*args, **kwargs)
    return self
def show(self):
    """Show plots by calling ``plt.show()``.

    Returns
    -------
    self : :obj:`electrical_signal`
        The same object.
    """
    logger.debug("show()")
    plt.show()
    return self
@logger.auto_indent_methods
class optical_signal(electrical_signal):
    """**Optical Signal**

    Bases: :obj:`electrical_signal`

    This class provides methods and attributes to work with optical signals.
    Attributes and some methods are inherited from the :obj:`electrical_signal` class.

    .. rubric:: Attributes
    .. autosummary::

        ~optical_signal.signal
        ~optical_signal.noise
        ~optical_signal.execution_time

    .. rubric:: Methods
    .. autosummary::

        __init__
        plot
    """

    def __init__(self, signal: str | Iterable, noise: str | Iterable = NULL, n_pol: Literal[1, 2] = None, dtype: np.dtype=None):
        """ Initialize the optical signal object.

        Parameters
        ----------
        signal : :obj:`str` or array_like (1D, 2D) or scalar
            The signal values. A signal object is also accepted.
        noise : :obj:`str` or array_like (1D, 2D) or scalar, optional
            The noise values, default is ``NULL``.
        n_pol : :obj:`int`, optional
            Number of polarizations (1 or 2). When ``None`` it is inferred
            from the input: scalars and 1D inputs give one polarization,
            2D inputs give two.
        dtype : :obj:`np.dtype`, optional
            Data type forwarded to ``_prepare_arrays``.

        Raises
        ------
        ValueError
            If the signal shape is not scalar, 1D or 2D with at most two
            rows, or if ``n_pol`` is not 1 or 2.
        """
        if self.__class__ == optical_signal:
            logger.debug("%s.__init__(n_pol=%s, dtype=%s)", self.__class__.__name__, n_pol, dtype)

        if isinstance(signal, (electrical_signal, optical_signal)):
            sig, noi = signal.signal, signal.noise
            if noise is not NULL:
                _, noi_ = self._prepare_arrays(signal, noise, dtype)
                noi = noi + noi_
        else:
            sig, noi = self._prepare_arrays(signal, noise, dtype)

        if sig.ndim>2 or (sig.ndim>1 and sig.shape[0]>2) or sig.size<1:
            raise ValueError(f"Signal must be a scalar, 1D or 2D array for optical_signal, invalid shape {sig.shape}")

        if n_pol is not None and n_pol not in [1, 2]:
            raise ValueError("n_pol must be either 1 or 2")

        # Reshape the data so that it matches the requested/inferred n_pol.
        if sig.ndim == 0:
            if n_pol is None or n_pol == 1:
                sig = sig[np.newaxis]
                if noi is not NULL:
                    noi = noi[np.newaxis]
                n_pol=1
            else:
                # Scalar duplicated on both polarizations.
                sig = np.array([[sig], [sig]])
                if noi is not NULL:
                    noi = np.array([[noi], [noi]])
        elif sig.ndim == 1:
            if n_pol is None or n_pol == 1:
                n_pol = 1
            else:
                # 1D input duplicated on both polarizations.
                sig = np.array([sig, sig])
                if noi is not NULL:
                    noi = np.array([noi, noi])
        elif sig.ndim == 2 and sig.shape[0] == 1:
            if n_pol is None or n_pol == 2:
                sig = np.tile(sig, (2, 1))
                if noi is not NULL:
                    noi = np.tile(noi, (2, 1))
                n_pol = 2
            else:
                sig = sig[0]
                if noi is not NULL:
                    noi = noi[0]
        elif sig.ndim == 2 and sig.shape[0] == 2:
            if n_pol is None or n_pol == 2:
                n_pol = 2
            else:
                # Only the first row is kept when one polarization is requested.
                sig = sig[0]
                if noi is not NULL:
                    noi = noi[0]

        self.n_pol = n_pol
        super().__init__(sig, noi, dtype=dtype)

    def __repr__(self):
        """Return a compact textual representation of the object."""
        logger.debug("__repr__()")
        # Scoped print options: NumPy's global print configuration is left untouched.
        with np.printoptions(precision=1, threshold=20):
            if self.noise is not NULL:
                signal = str(self.signal).replace('\n', '\n' + 15*' ')
                return f'optical_signal({signal})'

            signal = str(self.signal).replace('\n', '\n' + 22*' ')
            noise = str(self.noise).replace('\n', '\n' + 22*' ')
            return f'optical_signal(signal={signal}\n' + 16*' '+ f'noise={noise})'

    def __str__(self, title: str=None):
        """Return a multi-line summary; powers are listed per polarization when not scalar.

        Parameters
        ----------
        title : :obj:`str`, optional
            Header text. Defaults to the class name.
        """
        logger.debug("__str__()")
        if title is None:
            title = self.__class__.__name__

        title = 3*'*' + f' {title} ' + 3*'*'
        sub = len(title)*'-'
        tab = 3*' '

        # Scoped print options: NumPy's global print configuration is left untouched.
        with np.printoptions(precision=3, threshold=20):
            if self.signal.ndim == 1:
                signal = str(self.signal)
                noise = str(self.noise)
            else:
                signal = str(self.signal).replace('\n', '\n'+tab + 11*' ')
                noise = str(self.noise).replace('\n', '\n'+tab + 11*' ')

            # Get power values
            pw_sig_w = self.power('W', 'signal')
            pw_sig_dbm = self.power('dbm', 'signal')
            pw_noise_w = self.power('W', 'noise')
            pw_noise_dbm = self.power('dbm', 'noise')
            pw_all_w = self.power('W', 'all')
            pw_all_dbm = self.power('dbm', 'all')

            # Format power strings
            if np.isscalar(pw_sig_w):
                pow_sig_str = f"{si(pw_sig_w, 'W', 1)} ({pw_sig_dbm:.1f} dBm)"
                pow_noise_str = f"{si(pw_noise_w, 'W', 1)} ({pw_noise_dbm:.1f} dBm)"
                pow_all_str = f"{si(pw_all_w, 'W', 1)} ({pw_all_dbm:.1f} dBm)"
            else:
                # For multi-polarization
                pow_sig_str = ', '.join([f"Pol{i}: {si(pw_sig_w[i], 'W', 1)} ({pw_sig_dbm[i]:.1f} dBm)" for i in range(len(pw_sig_w))])
                pow_noise_str = ', '.join([f"Pol{i}: {si(pw_noise_w[i], 'W', 1)} ({pw_noise_dbm[i]:.1f} dBm)" for i in range(len(pw_noise_w))])
                pow_all_str = ', '.join([f"Pol{i}: {si(pw_all_w[i], 'W', 1)} ({pw_all_dbm[i]:.1f} dBm)" for i in range(len(pw_all_w))])

            msg = f'\n{sub}\n{title}\n{sub}\n'+ tab + \
                f'signal: {signal} (shape: {self.shape})\n'+ tab + \
                f'noise: {noise} (shape: {self.shape if self.noise is not NULL else None})\n'+ tab + \
                f'pow_signal: {pow_sig_str}\n'+ tab + \
                f'pow_noise: {pow_noise_str}\n'+ tab + \
                f'pow_total: {pow_all_str}\n'+ tab + \
                f'elem_type: {self.dtype}\n' + tab + \
                f'mem_size: {self.sizeof} bytes\n' + tab + \
                f'time: {si(self.execution_time, "s", 2)}\n'
        return msg

    def __getitem__(self, key):
        """Index the optical signal.

        Supported keys:

        - ``(pol_idx, time_idx)`` tuple: select polarization and samples.
        - slice: select samples on every polarization.
        - anything else: a sample index for one polarization, or a
          polarization index for two polarizations.

        Raises
        ------
        IndexError
            If a tuple key does not have exactly two entries, or a second
            polarization is requested from a single-polarization signal.
        """
        logger.debug("__getitem__(%s)", key)
        if isinstance(key, tuple):
            if len(key) != 2:
                raise IndexError('Too many indices for optical_signal object.')
            pol_idx, time_idx = key
            if self.n_pol == 1 and pol_idx not in [0, -1, slice(None)]:
                raise IndexError('Optical signal has only one polarization (index 0).')

            sig = self.signal[pol_idx, time_idx] if self.n_pol == 2 else self.signal[time_idx]
            if self.noise is not NULL:
                noi = self.noise[pol_idx, time_idx] if self.n_pol == 2 else self.noise[time_idx]
            elif isinstance(time_idx, int):
                # `sig` has already been indexed with `time_idx`; indexing it a
                # second time raised IndexError or returned the wrong element.
                return sig
            else:
                noi = NULL
            return self.__class__(sig, noi, n_pol=1 if sig.ndim!=2 else self.n_pol)

        elif isinstance(key, slice):
            if self.n_pol == 1:
                sig = self.signal[key]
                if self.noise is not NULL:
                    noi = self.noise[key]
                else:
                    noi = NULL
            else:
                sig = self.signal[:, key]
                if self.noise is not NULL:
                    noi = self.noise[:, key]
                else:
                    noi = NULL
            return self.__class__(sig, noi, n_pol=self.n_pol)
        else:
            if self.n_pol == 1:
                sig = self.signal[key]
                if self.noise is not NULL:
                    noi = self.noise[key]
                else:
                    return sig
            else:
                sig = self.signal[key, :]
                if self.noise is not NULL:
                    noi = self.noise[key, :]
                else:
                    noi = NULL
            return self.__class__(sig, noi, n_pol=1 if sig.ndim!=2 else self.n_pol)

    def __gt__(self, other):
        """Not supported for optical signals."""
        raise NotImplementedError('The > operator is not implemented for optical_signal objects.')

    def __lt__(self, other):
        """Not supported for optical signals."""
        raise NotImplementedError('The < operator is not implemented for optical_signal objects.')

    @property
    def size(self) -> int:
        """Number of samples of one polarization of the optical signal."""
        logger.debug("size")
        if self.n_pol == 1:
            return self.signal.size
        else:
            return self.signal[0].size

    def plot(self, fmt: str | list='-', mode: Literal['field', 'power'] = 'power', n: int=None, xlabel: str=None, ylabel: str=None, grid: bool=False, hold: bool=True, show: bool=False, **kwargs: dict):
        r"""Plot signal in time domain.

        For optical_signal: plots the intensity/power or field.

        Parameters
        ----------
        fmt : :obj:`str`, optional
            Format style of line. Example ``'b-.'``, Defaults to ``'-'``.
            Non-string values trigger a warning and fall back to ``'-'``.
        mode : :obj:`str`, optional
            Plot mode. ``'field'``, ``'power'`` (default).

            - ``'field'`` plot real and imaginary parts of the field (one polarization only).
            - ``'power'`` plot power/intensity (one or two polarizations).
        n : :obj:`int`, optional
            Number of samples to plot. Defaults to ``min(self.size, gv.t.size)``.
        xlabel : :obj:`str`, optional
            X-axis label. Defaults to ``'Time [ns]'``.
        ylabel : :obj:`str`, optional
            Y-axis label. Defaults to ``'Power [mW]'`` for power, ``'Field [W**0.5]'`` for field.
        grid : :obj:`bool`, optional
            If show grid. Defaults to ``False``.
        hold : :obj:`bool`, optional
            If False, a new figure is created first. Defaults to ``True``.
        show : :obj:`bool`, optional
            If True, call ``plt.show()`` at the end. Defaults to ``False``.
        \*\*kwargs : :obj:`dict`
            Additional keyword arguments compatible with ``matplotlib.pyplot.plot()``
            (used in ``'power'`` mode only).

        Returns
        -------
        self : optical_signal
            The same object.

        Raises
        ------
        ValueError
            If ``mode`` is invalid, or ``'field'`` is requested for a
            two-polarization signal.
        """
        logger.debug("plot()")

        n = min(self.size, gv.t.size) if n is None else n
        t = gv.t[:n]*1e9
        y = self[:n]

        if not hold:
            plt.figure()

        xlabel = xlabel if xlabel else 'Time [ns]'

        # Optical signal: plot intensity or field
        if mode == 'power':
            I = y.abs('all')**2 * 1e3  # mW

            if y.n_pol == 1:
                if not isinstance(fmt, str):
                    warnings.warn('`fmt` must be a string for single polarization signals, using default value.')
                    fmt = '-'
                args = (t, I, fmt)
                label = kwargs.pop('label', None)
                plt.plot(*args, **kwargs, label=label)
                legend = True if label is not None else False
            else:
                if not isinstance(fmt, str):
                    warnings.warn('`fmt` must be a string for multi-polarization signals, using default value.')
                    fmt = '-'
                args0 = (t, I[0], fmt)  # power of the first polarization
                args1 = (t, I[1], fmt)  # power of the second polarization
                label = kwargs.pop('label', None)
                plt.plot(*args0, **kwargs, label='Pol X' if label is None else label + ' Pol X')
                plt.plot(*args1, **kwargs, label='Pol Y' if label is None else label + ' Pol Y')
                legend = True
            ylabel = ylabel if ylabel else 'Power [mW]'

        elif mode == 'field':
            if y.n_pol > 1:
                raise ValueError('`field` mode is only supported for single polarization signals.')

            if not isinstance(fmt, str):
                warnings.warn('`fmt` must be a string for field mode, using default value.')
                fmt = '-'
            label = kwargs.pop('label', None)
            plt.plot(t, y.real, fmt, label='Real' if label is None else label + 'Real')
            plt.plot(t, y.imag, fmt, label='Imag' if label is None else label + 'Imag')
            legend = True
            ylabel = ylabel if ylabel else r'Field [$\sqrt{W}$]'
        else:
            raise ValueError('`mode` must be one of ("power", "field") for optical signals.')

        plt.xlabel(xlabel)
        plt.ylabel(ylabel)

        if grid:
            # One dashed line at the start of every slot, plus one closing line.
            for t_ in t[:n*gv.sps][::gv.sps]:
                plt.axvline(t_, color='gray', ls='--', alpha=0.3, lw=1)
            plt.axvline(t[-1] + gv.dt*1e9, color='gray', ls='--', alpha=0.3, lw=1)
            plt.grid(alpha=0.3, axis='y')

        if legend:
            plt.legend()

        if show:
            plt.show()
        return self
class EyeShowOptions():
    """Switches selecting which overlays an eye-diagram plot draws.

    Each flag left as ``None`` takes the value of ``all_none``; a flag given
    explicitly keeps its own value.

    Parameters
    ----------
    averages, threshold, cross_points, legends, t_opt, histogram : :obj:`bool`, optional
        Individual overlay switches. Default ``None`` (use ``all_none``).
    all_none : :obj:`bool`, optional
        Value assigned to every switch that was left as ``None``. Defaults to ``False``.
    """

    def __init__(self,
                 averages : bool = None,
                 threshold : bool = None,
                 cross_points : bool = None,
                 legends : bool = None,
                 t_opt : bool = None,
                 histogram : bool = None,
                 all_none : bool = False
                 ):
        requested = (
            ('averages', averages),
            ('threshold', threshold),
            ('cross_points', cross_points),
            ('legends', legends),
            ('t_opt', t_opt),
            ('histogram', histogram),
        )
        for name, flag in requested:
            # Only an explicit None falls back to `all_none`; False is preserved.
            setattr(self, name, all_none if flag is None else flag)
@logger.auto_indent_methods
class eye():
    """**Eye Diagram Parameters**.

    This object contains the parameters of an eye diagram and methods to plot it.

    .. rubric:: Methods
    .. autosummary::

        __init__
        __str__
        print
        plot
        show

    Attributes
    ----------
    t : :obj:`np.ndarray`
        The time values resampled. Shape (Nx1).
    y : :obj:`np.ndarray`
        The signal values resampled. Shape (Nx1).
    dt : :obj:`float`
        Time between samples.
    sps : :obj:`int`
        Samples per slot.
    t_left : :obj:`float`
        Cross time of left edge.
    t_right : :obj:`float`
        Cross time of right edge.
    t_opt : :obj:`float`
        Optimal time decision.
    t_dist : :obj:`float`
        Time between slots.
    t_span0 : :obj:`float`
        t_opt - t_dist*5%.
    t_span1 : :obj:`float`
        t_opt + t_dist*5%.
    y_top : :obj:`np.ndarray`
        Samples of signal above threshold and within t_span0 and t_span1.
    y_bot : :obj:`np.ndarray`
        Samples of signal below threshold and within t_span0 and t_span1.
    mu0 : :obj:`float`
        Mean of y_bot.
    mu1 : :obj:`float`
        Mean of y_top.
    s0 : :obj:`float`
        Standard deviation of y_bot.
    s1 : :obj:`float`
        Standard deviation of y_top.
    er : :obj:`float`
        Extinction ratio.
    eye_h : :obj:`float`
        Eye height.
    """

    def __init__(self, **kwargs: dict):
        r"""
        Initialize the eye diagram object.

        Every keyword argument is stored as an attribute of the instance. The
        ``empty`` attribute is set to ``True`` when no arguments are given and
        to ``False`` otherwise.

        Parameters
        ----------
        \*\*kwargs : :obj:`dict`, optional
            Dictionary with the eye diagram parameters.
        """
        logger.debug("%s.__init__()", self.__class__.__name__)
        if kwargs:
            for key, value in kwargs.items():
                setattr(self, key, value)
            self.empty = False
        else:
            self.empty = True

    def __str__(self, title: str=None):
        """Return a formatted string with the eye diagram data.

        Parameters
        ----------
        title : :obj:`str`, optional
            Header text. Defaults to the class name.

        Returns
        -------
        :obj:`str`
            One ``key : value`` row per instance attribute, followed by the
            execution time when the ``execution_time`` attribute is present.

        Raises
        ------
        ValueError
            If the object is empty.
        """
        logger.debug("__str__()")
        if self.empty:
            raise ValueError('Empty eye diagram object.')

        if title is None:
            title = self.__class__.__name__

        title = 3*'*' + f' {title} ' + 3*'*'
        sub = len(title)*'-'

        # Scope the compact array formatting to this call so that numpy's
        # global print options are restored afterwards.
        with np.printoptions(precision=1, threshold=10):
            rows = [f'{key} : {value}' for key, value in self.__dict__.items() if key != 'execution_time']

        msg = f'\n{sub}\n{title}\n{sub}\n ' + '\n '.join(rows)

        # The time row is only meaningful when the attribute was provided.
        elapsed = getattr(self, 'execution_time', None)
        if elapsed is not None:
            msg += f'\n time : {si(elapsed, "s", 1)}'
        return msg + '\n'

    def print(self, msg: str=None):
        """Print object parameters.

        Parameters
        ----------
        msg : :obj:`str`, optional
            Top message to show.

        Returns
        -------
        self: :obj:`eye`
            Same object
        """
        logger.debug("print()")
        print(self.__str__(msg))
        return self

    def plot(self,
             show_options: EyeShowOptions=None,
             hlines: list=None,
             vlines: list=None,
             style: Literal['dark', 'light']='dark',
             cmap: Literal['viridis', 'plasma', 'inferno', 'cividis', 'magma', 'winter']='winter',
             smooth: bool=True,
             title: str = '',
             savefig: str=None,
             ax = None
             ):
        """ Plot eye diagram.

        Parameters
        ----------
        show_options : :obj:`typing.EyeShowOptions`, optional
            Options to show in the plot. Defaults to ``EyeShowOptions()``,
            i.e. every optional overlay disabled.
        hlines : :obj:`list`, optional
            A list of amplitude values at which horizontal lines will be drawn.
        vlines : :obj:`list`, optional
            A list of time values at which vertical lines will be drawn.
        style : :obj:`str`, optional
            Plot style. 'dark' or 'light'.
        cmap : :obj:`str`, optional
            Colormap to plot.
        smooth : :obj:`bool`, optional
            If ``True`` (default) draw the eye as a Gaussian-smoothed 2D
            histogram image; otherwise draw the individual traces colored by
            the smoothed density.
        title : :obj:`str`, optional
            Title of plot.
        savefig : :obj:`str`, optional
            Name of the file to save the plot. If None, the plot is not saved.
            Input just the name of the file without extension (extension is .png by default).
        ax : optional
            Existing matplotlib axes to draw on. When given, the style context
            is not applied. It is ignored when ``show_options.histogram`` is
            set, because a new two-panel figure is created in that case.

        Returns
        -------
        self: :obj:`eye`
            Same object

        Raises
        ------
        ValueError
            If the object is empty.
        TypeError
            If ``style`` is not ``'dark'`` or ``'light'``.
        """
        logger.debug("plot()")
        if self.empty:
            raise ValueError('Empty eye diagram object.')

        # Build per-call defaults instead of sharing objects across calls.
        if show_options is None:
            show_options = EyeShowOptions()
        hlines = [] if hlines is None else hlines
        vlines = [] if vlines is None else vlines

        ## SETTINGS
        if style == 'dark':
            style_context = 'dark_background'
            t_opt_color = '#60FF86'
            means_color = 'white'
        elif style == 'light':
            style_context = 'default'
            t_opt_color = 'green'
            means_color = '#5A5A5A'
        else:
            raise TypeError("The `style` argument must be one of the following values ('dark', 'light')")

        dt = self.dt

        # Use style context manager only when creating new axes to avoid global state pollution.
        # When ax is provided by caller, respect their style settings.
        from contextlib import nullcontext
        style_mgr = plt.style.context(style_context) if ax is None else nullcontext()

        # NOTE(review): the extent of this `with` block is reconstructed from a
        # whitespace-collapsed listing -- confirm against the original file.
        with style_mgr:
            if show_options.histogram:
                _, ax = plt.subplots(1,2, gridspec_kw={'width_ratios': [4,1], 'wspace': 0.03}, figsize=(8,5))
            elif ax is None:
                _, ax = plt.subplots(1,1)
                ax = [ax, ax]
            else:
                ax = [ax, ax]

            if title:
                plt.suptitle(f'Eye diagram {title}')

            ax[0].set_xlim(-1-dt,1)
            ax[0].set_ylim(self.mu0-4*self.s0, self.mu1+4*self.s1)
            ax[0].set_ylabel(r'Amplitude [V]', fontsize=12)
            ax[0].grid(color='grey', ls='--', lw=0.5, alpha=0.5)
            ax[0].set_xticks([-1,-0.5,0,0.5,1])
            ax[0].set_xlabel(r'Time [$t/T_{slot}$]', fontsize=12)

            if show_options.t_opt:
                ax[0].axvline(self.t_opt, color = t_opt_color, ls = '--', alpha = 0.7)
                ax[0].axvline(self.t_span0, color = t_opt_color, ls = '-', alpha = 0.4)
                ax[0].axvline(self.t_span1, color = t_opt_color, ls = '-', alpha = 0.4)

            # crossing points
            if show_options.cross_points:
                # NOTE(review): truthiness test kept as in the original; it also
                # skips a crossing amplitude of exactly 0 -- confirm the sentinel
                # used when no crossing was found.
                if self.y_right and self.y_left:
                    ax[0].plot([self.t_left, self.t_right], [self.y_left, self.y_right], 'xr')

            # threshold
            if show_options.threshold:
                ax[0].axhline(self.threshold, c='r', ls='--')
                if show_options.histogram:
                    ax[1].axhline(self.threshold, c='r', ls='--', label='th')
                    if show_options.legends:
                        ax[1].legend()

            # horizontal lines
            for hl in hlines:
                ax[0].axhline(hl, c='y')
                if show_options.histogram:
                    ax[1].axhline(hl, c='y')

            # vertical lines
            for vl in vlines:
                ax[0].axvline(vl, c='y')
                if show_options.histogram:
                    ax[1].axvline(vl, c='y')

            # legend
            if show_options.legends:
                ax[0].legend([r'$t_{opt}$'], fontsize=12, loc='upper right')

            # means
            if show_options.averages:
                ax[0].axhline(self.mu1, color = means_color, ls = ':', alpha = 0.7)
                ax[0].axhline(self.mu0, color = means_color, ls = '-.', alpha = 0.7)
                if show_options.histogram:
                    ax[1].axhline(self.mu1, color = means_color, ls = ':', alpha = 0.7, label=r'$\mu_1$')
                    ax[1].axhline(self.mu0, color = means_color, ls = '-.', alpha = 0.7, label=r'$\mu_0$')
                    if show_options.legends:
                        ax[1].legend()

            if show_options.histogram:
                ax[1].sharey(ax[0])
                ax[1].tick_params(axis='x', which='both', length=0, labelbottom=False)
                ax[1].tick_params(axis='y', which='both', length=0, labelleft=False)
                ax[1].grid(color='grey', ls='--', lw=0.5, alpha=0.5)

            ## ADD PLOTS
            y_ = np.roll(self.y, -self.sps//2)[self.sps//2 : -self.sps//2]  # shift signal to center
            t_ = self.t[:-self.sps]

            N = 350  # number of bins
            heatmap, xedges, yedges = np.histogram2d(t_, y_, bins=N)
            heatmap_smooth = gaussian_filter(heatmap, sigma=3)

            if smooth:
                extent = [xedges[0], xedges[-1], yedges[0], yedges[-1]]
                vmin, vmax = heatmap.min(), heatmap.max()
                # Sigmoid ramp centred at 5% of the count range: bins with few
                # counts become nearly transparent, denser bins approach 0.8.
                alpha_values = expit((heatmap_smooth - (vmin + 0.05 * (vmax - vmin))) * 100 / (vmax - vmin)).T*0.8
                ax[0].imshow(
                    heatmap_smooth.T,
                    extent=extent,
                    origin='lower',
                    aspect='auto',
                    alpha=alpha_values,
                    cmap=cmap,
                    interpolation='bicubic',
                    resample=True,
                )
            else:
                # Map each sample to its bin of the smoothed histogram and use
                # the normalized density there as the sample's color value.
                t_norm = (t_ - t_.min()) / (t_.max() - t_.min())
                y_norm = (y_ - y_.min()) / (y_.max() - y_.min())
                it = np.clip((t_norm * (N - 1)).astype(int), 0, N - 1)
                iy = np.clip((y_norm * (N - 1)).astype(int), 0, N - 1)
                color_values = heatmap_smooth[it, iy]
                color_values = (color_values - color_values.min()) / (color_values.max() - color_values.min())

                # Cut the signal into traces of 2*sps samples each.
                trace_len = 2*self.sps
                t = t_[:trace_len]
                n_traces = len(y_) // trace_len
                Y_reshaped = y_[:n_traces * trace_len].reshape(-1, trace_len)
                color_values_reshaped = color_values[:n_traces * trace_len].reshape(-1, trace_len)

                colormap = getattr(plt.cm, cmap)
                for trace_colors, trace in zip(color_values_reshaped, Y_reshaped):
                    points = np.array([t, trace]).T.reshape(-1, 1, 2)
                    segments = np.concatenate([points[:-1], points[1:]], axis=1)
                    # Assign a color to each segment according to the grid
                    colors = colormap(trace_colors[:-1])
                    lc = LineCollection(segments, colors=colors, linewidth=1, alpha=0.05)
                    ax[0].add_collection(lc)

            if show_options.histogram:
                if smooth:
                    # Sum the 10 time bins around the middle of the histogram
                    # (bins 170..179 for N = 350) into an amplitude profile.
                    mid = N // 2
                    ax[1].plot(heatmap_smooth[mid - 5:mid + 5].sum(axis=0), np.linspace(y_.min(), y_.max(), N), color=t_opt_color)
                else:
                    ax[1].hist(  # plot vertical histogram
                        y_[(t_>self.t_opt-0.05*self.t_dist) & (t_<self.t_opt+0.05*self.t_dist)],
                        bins=200,
                        density=True,
                        orientation = 'horizontal',
                        color = t_opt_color,
                        alpha = 0.9,
                        histtype='step',
                    )

            if savefig:
                if savefig.endswith('.png'):
                    plt.savefig(savefig, dpi=300)
                else:
                    plt.savefig(savefig)

        return self

    def show(self):
        """Show plot

        Returns
        -------
        self : :obj:`eye`
            The same object.
        """
        logger.debug("show()")
        plt.show()
        return self