"""
.. rubric:: Functions
.. autosummary::
dec2bin
str2array
get_time
tic
toc
db
dbm
idb
idbm
gaus
Q
phase
tau_g
dispersion
bode
rcos
si
norm
nearest
theory_BER
p_ase
average_voltages
noise_variances
optimum_threshold
shortest_int
eyediagram
rcos_pulse
gauss_pulse
nrz_pulse
upfir
phase_estimator
get_psd
"""
import re
import numpy as np
import timeit, time as tm
import scipy.special as sp # type: ignore
import scipy.signal as sg # type: ignore
from typing import Literal, Union
from scipy.constants import pi, c, h, e, k as kB # type: ignore
import matplotlib.pyplot as plt
from numpy.fft import fft, ifft, fftfreq, fftshift
import warnings
Array_Like = (list, tuple, np.ndarray)
from numbers import Integral as IntegerNumber
from numbers import Real as RealNumber # inlude integer numbers
from numbers import Complex as ComplexNumber # inlcude real numbers, is not exclusive
from collections.abc import Iterable
def _is_iterable_and_numpy_compatible(obj):
"""
Check if an object is iterable, can be converted into a NumPy array,
and contains only numeric values (complex or real values).
Parameters
----------
obj : any
The input object to be checked.
Returns
-------
bool
True if the object is iterable, can be transformed into a NumPy array,
and contains only numeric values. False otherwise.
Notes
-----
- The function first verifies if `obj` is iterable.
- If it is not iterable, the function returns False immediately.
- If `obj` can be converted into a NumPy array (`np.array(obj)`),
it proceeds to check the numerical validity.
- `numbers.Complex` is used to ensure all elements are numeric.
- The function supports various iterable types like lists, tuples, and NumPy arrays.
- It excludes non-numeric elements such as strings and mixed-type collections.
"""
# Check if the object is iterable
is_iterable = isinstance(obj, Iterable)
if not is_iterable:
return False
try:
array = np.array(obj) # Try to convert it into a NumPy array
except Exception:
return False
result = all(isinstance(x, ComplexNumber) for x in array.flatten())
return result
def _is_numeric(obj):
return isinstance(obj, ComplexNumber)
def _is_real(obj):
return isinstance(obj, RealNumber)
def _is_integer(obj):
return isinstance(obj, IntegerNumber)
[docs]
def dec2bin(num: int, digits: int=8):
r"""
Converts an integer to its binary representation.
Parameters
----------
num : int
Integer to convert.
digits : int, default: 8
Number of bits of the binary representation.
Returns
-------
binary_sequence
Binary representation of ``num`` of length ``digits``.
Raises
------
ValueError
If ``num`` is not an integer number.
ValueError
If ``num`` is too large to be represented with ``digits`` bits.
Example
-------
.. code-block:: python
>>> dec2bin(5, 4)
array([0, 1, 0, 1], dtype=uint8)
"""
if not _is_integer(num):
raise ValueError('`num` must be an integer number.')
binary = np.zeros(digits, np.uint8)
if num > 2**digits-1: raise ValueError(f'The number is too large to be represented with {digits} bits.')
i = digits - 1
while num > 0 and i >= 0:
binary[i] = num % 2
num //= 2
i -= 1
return binary
def _get_type_array_from_str(string):
# just 0 and 1 without +/- --> bool
if re.match(r'^[0-1,;\s]+$', string):
return bool
# just numbers with +/- and without dots --> int
if re.match(r'^[0-9,;\-\+\s]+$', string):
return int
# just numbers with +/- and dots --> float
if re.match(r'^[0-9,;.\+\-\s]+$', string):
return float
# just numbers with +/-, dots and j --> complex
if re.match(r'^[0-9,;.\+\-\sji]+$', string):
return complex
return None
[docs]
def str2array(string: str, dtype: bool | int | float | complex | None = None):
r"""
Converts a string to array of numbers. Use comma (``,``) or whitespace (`` ``) as element separators and semicolon (``;``) as row separator.
Also, ``i`` or ``j`` can be used to represent the imaginary unit.
Parameters
----------
string : :obj:`str`
String to convert.
dtype : :obj:`type`, optional
Data type of the output array.
If ``dtype`` is not given, the data type is determined from the input string.
If ``dtype`` is given, the data output is cast to the given type.
Allowed values are ``bool``, ``int``, ``float`` and ``complex``.
Returns
-------
arr : :obj:`np.ndarray`
Numeric array.
Raises
------
ValueError
If the string contains invalid characters.
Example
-------
For binary numbers, string must contain only 0 and 1.
Only in this case, sequence don't need to be separated by commas or spaces although it is allowed.
>>> str2array('101')
array([ True, False, True])
>>> str2array('1 0 1; 0 1 0')
array([[ True, False, True],
[False, True, False]])
Special case
>>> str2array('1 0 1 10')
array([True, False, True, True, False])
>>> str2array('1 0 1 10', dtype=int)
array([ 1, 0, 1, 10])
>>> str2array('1 0 1 10', dtype=float)
array([ 1., 0., 1., 10.])
>>> str2array('1 0 1 10', dtype=complex)
array([ 1.+0.j, 0.+0.j, 1.+0.j, 10.+0.j])
For integer and float numbers
>>> str2array('1 2 3 4')
array([1, 2, 3, 4])
>>> str2array('1.1 2.2 3.3 4.4')
array([1.1, 2.2, 3.3, 4.4])
For complex numbers
>>> str2array('1+2j 3-4i')
array([1.+2.j, 3.-4.j])
"""
_dtype = _get_type_array_from_str(string)
if _dtype == bool:
# for special cases when string = '10 100 1000' and dtype = int | float | complex
if dtype == int or dtype==float or dtype==complex:
strings = string.split(';')
if len(strings) == 1:
arr = np.array(re.split(r'[,\s]+', strings[0].strip()), dtype=dtype)
else:
arr = np.array([re.split(r'[,\s]+', item.strip()) for item in strings], dtype=dtype)
else:
strings = string.replace(' ', '').replace(',', '').split(';')
if len(strings) == 1:
arr = np.array(list(strings[0])).astype(_dtype)
else:
arr = np.array([list(item) for item in strings]).astype(_dtype)
elif _dtype == int or _dtype==float:
strings = string.split(';')
if len(strings) == 1:
arr = np.array(re.split(r'[,\s]+', strings[0].strip()), dtype=_dtype)
else:
arr = np.array([re.split(r'[,\s]+', item.strip()) for item in strings], dtype=_dtype)
elif _dtype == complex:
strings = string.replace('i','j').split(';')
if len(strings) == 1:
arr = np.array(re.split(r'[,\s]+', strings[0].strip()), dtype=_dtype)
else:
arr = np.array([re.split(r'[,\s]+', item.strip()) for item in strings], dtype=_dtype)
else:
raise ValueError('The string contains invalid characters and can\'t be converted to an array.')
if dtype:
with warnings.catch_warnings():
warnings.simplefilter("ignore", np.ComplexWarning)
return arr.astype(dtype)
return arr
[docs]
def get_time(line_of_code: str, n:int):
r"""
Get the average time of execution of a line of code.
Parameters
----------
line_of_code : str
Line of code to execute.
n : int
Number of iterations.
Returns
-------
time : :obj:`float`
Average time of execution, in seconds.
Example
-------
.. code-block:: python
>>> get_time('for i in range(1000): pass', 1000)
1.1955300000010993e-05
"""
return timeit.timeit(line_of_code, number=n)/n
class _Timer:
def __init__(self):
self.tic_stack = []
def tic(self):
self.tic_stack.append(tm.time())
def toc(self):
if not self.tic_stack:
raise Exception("toc() called without a matching tic()")
start_time = self.tic_stack.pop()
return tm.time() - start_time
# Crear una instancia singleton de Timer
_timer_instance = _Timer()
[docs]
def tic():
r"""
Start a timer. Create a global variable with the current time.
Then you can use toc() to get the elapsed time.
Example
-------
.. code-block:: python
>>> tic() # wait some time
>>> toc()
2.687533378601074
"""
_timer_instance.tic()
[docs]
def toc():
r"""Stop a timer. Get the elapsed time since the last call to tic().
Returns
-------
time : :obj:`float`
Elapsed time, in seconds.
Example
-------
.. code-block:: python
>>> tic() # wait some time
>>> toc()
2.687533378601074
"""
return _timer_instance.toc()
[docs]
def db(x):
r"""Calculates the logarithm in base 10 of the input x and multiplies it by 10.
.. math:: \text{db} = 10\log_{10}{x}
Parameters
----------
x : Number or Array_Like
Input value (``x>=0``).
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
dB value.
Raises
------
TypeError
If ``x`` is not a `number`, `list`, `tuple` or `ndarray`.
ValueError
If ``x`` or ``any(x) < 0``.
Example
-------
.. code-block:: python
>>> db(1)
0.0
>>> db([1,2,3,4])
array([0. , 3.01029996, 4.77121255, 6.02059991])
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
if (x<0).any():
raise ValueError('Some values of input array are negative.')
elif _is_numeric(x):
if x<0:
raise ValueError('`x` must be positive.')
else:
raise TypeError('`x` must be a number or an array_like with positive values.')
warnings.filterwarnings("ignore", category=RuntimeWarning) # to avoid warning when x=0
return 10*np.log10(x)
[docs]
def dbm(x):
r"""Calculates dBm from Watts.
.. math:: \text{dbm} = 10\log_{10}{x}+30
Parameters
----------
x : Number or Array_Like
Input value (``x>=0``).
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
dBm value. If ``x`` is a number, then the output is a :obj:`float`. If ``x`` is an array_like, then the output is an :obj:`np.ndarray`.
Raises
------
TypeError
If ``x`` is not a `number`, `list`, `tuple` or `ndarray`.
ValueError
If ``x`` or ``any(x) < 0``.
Example
-------
.. code-block:: python
>>> dbm(1)
30.0
>>> dbm([1,2,3,4])
array([30. , 33.01029996, 34.77121255, 36.02059991])
"""
return db(x) + 30
[docs]
def idb(x):
r"""Calculates the number value from a dB value.
.. math:: y = 10^{\frac{x}{10}}
Parameters
----------
x : Number or Array_Like
Input value.
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
Number value. If ``x`` is a number, then the output is a :obj:`float`. If ``x`` is an array_like, then the output is an :obj:`np.ndarray`.
Example
-------
.. code-block:: python
>>> idb(3)
1.9952623149688795
>>> idb([0,3,6,9])
array([1. , 1.99526231, 3.98107171, 7.94328235])
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
elif not _is_numeric(x):
raise TypeError('The input value must be a number or an array_like.')
return 10**(x/10)
[docs]
def idbm(x):
r"""Calculates the power value in Watts from a dBm value.
.. math:: y = 10^{(\frac{x}{10}-3)}
Parameters
----------
x : Number or Array_Like
Input value.
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
Power value in Watts. If ``x`` is a number, then the output is a :obj:`float`. If ``x`` is an array_like, then the output is an :obj:`np.ndarray`.
Example
-------
.. code-block:: python
>>> idbm(0)
0.001
>>> idbm([0,3,6,9])
array([0.001 , 0.00199526, 0.00398107, 0.00794328])
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
elif not _is_numeric(x):
raise TypeError('The input value must be a number or an array_like.')
return 10**(x/10-3)
[docs]
def gaus(x, mu: float=None, std: float=None):
r"""Gaussian function.
.. math:: \text{gaus}(x) = \frac{1}{\sigma\sqrt{2\pi}}e^{-\frac{(x-\mu)^2}{2\sigma^2}}
Parameters
----------
x : Number or Array_Like
Input value.
mu : :obj:`float`, default: 0
Mean.
std : :obj:`float`, default: 1
Standard deviation.
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
Gaussian function value. If ``x`` is a number, then the output is a :obj:`float`. If ``x`` is an array_like, then the output is an :obj:`np.ndarray`.
Examples
--------
.. code-block:: python
>>> gaus(0, 0, 1)
0.3989422804014327
>>> gaus([0,1,2,3], 0, 1)
array([0.39894228, 0.24197072, 0.05399097, 0.00443185])
.. plot::
:include-source:
:alt: Gaussian function
:align: center
from opticomlib import gaus
import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(-5, 5, 1000)
y = gaus(x, 0, 1)
plt.figure(figsize=(8, 5))
plt.plot(x, y, 'r', lw=2)
plt.ylabel('y')
plt.xlabel('x')
plt.grid(alpha=0.3)
plt.show()
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
elif not _is_numeric(x):
raise TypeError('The input value must be a number or an array_like.')
if mu is None: mu = 0
if std is None: std = 1
return 1/std/(2*pi)**0.5*np.exp(-0.5*(x-mu)**2/std**2)
[docs]
def Q(x):
r"""
Q-function.
.. math:: Q(x) = \frac{1}{2}\text{erfc}\left( \frac{x}{\sqrt{2}} \right)
Parameters
----------
x : Numper or Array_Like
Input value.
Returns
-------
out : :obj:`float` or :obj:`np.ndarray`
Q(x) values. If ``x`` is a number, then the output is a :obj:`float`. If ``x`` is an array_like, then the output is an :obj:`np.ndarray`.
Examples
--------
.. code-block:: python
>>> Q(0)
0.5
>>> Q([0,1,2,3])
array([0.5 , 0.15865525, 0.02275013, 0.0013499 ])
.. plot::
:include-source:
:alt: Gaussian function
:align: center
from opticomlib import Q
import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(-5, 5, 1000)
plt.figure(figsize=(8, 5))
plt.plot(x, Q(x), 'r', lw=3, label='Q(x)')
plt.plot(x, Q(-x), 'b', lw=3, label='Q(-x)')
plt.ylabel('y')
plt.xlabel('x')
plt.legend()
plt.grid()
plt.show()
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
elif not _is_numeric(x):
raise TypeError('The input value must be a number or an array_like.')
return 0.5*sp.erfc(x/2**0.5)
[docs]
def phase(x: np.ndarray, zero_ref_index: int=None):
r"""
Calculate the unwrapped phase the signal.
Parameters
----------
x : :obj:`np.ndarray`
Signal to calculate the phase
zero_ref_index : int, default: ``None``
Position of signal to take zero phase reference. By default zero phase reference is set to position 0 of x.
Returns
-------
phase : :obj:`np.ndarray`
Unwrapped phase in radians.
Examples
--------
.. plot::
:include-source:
:alt: Gaussian function
:align: center
from opticomlib import phase
import matplotlib.pyplot as plt
import numpy as np
t = np.linspace(-5, 5, 1000)
y = np.exp(1j*t**2)
phi = phase(y)
plt.figure(figsize=(8, 5))
plt.plot(t, phi, 'r', lw=2)
plt.ylabel('phase [rad]')
plt.xlabel('t')
plt.grid(alpha=0.3)
plt.show()
"""
if not _is_iterable_and_numpy_compatible(x):
raise TypeError('The input value must be an array_like.')
phase_ = np.angle(x)
if zero_ref_index is not None:
offset = phase_[zero_ref_index]
else:
offset = 0
return np.unwrap(phase_) - offset
[docs]
def tau_g(x: np.ndarray, fs: float):
r"""
Calculate the group delay of a frequency response.
Parameters
----------
x : :obj:`np.ndarray`
Frequency response of a system.
fs : :obj:`float`
Sampling frequency of the system.
Returns
-------
tau: :obj:`np.ndarray`
Group delay of the system, in [ps].
Examples
--------
.. plot::
:include-source:
:alt: Gaussian function
:align: center
from opticomlib import tau_g
import matplotlib.pyplot as plt
import numpy as np
t = np.linspace(-5, 5, 1000)
y = np.exp(1j*t**2)
phi = tau_g(y, 1e2)
plt.figure(figsize=(8, 5))
plt.plot(t, phi, 'r', lw=2)
plt.ylabel(r'$\tau_g$ [ps]')
plt.xlabel('t')
plt.grid(alpha=0.3)
plt.show()
"""
if not _is_iterable_and_numpy_compatible(x):
raise TypeError('The input value must be an array_like.')
dw = 2*pi*fs/x.size
phase_ = phase(x)
return np.diff(phase_, prepend=phase_[0])/dw * 1e12
[docs]
def dispersion(x: np.ndarray, fs: float, f0: float):
"""
Calculate the dispersion of a frequency response.
Parameters
----------
x : :obj:`np.ndarray`
Frequency response of a system.
fs : :obj:`float`
Sampling frequency of the system.
f0 : :obj:`float`
Center frequency of the system.
Returns
-------
D : :obj:`np.ndarray`
Cumulative dispersion of the system, in [ps/nm].
"""
if not _is_iterable_and_numpy_compatible(x):
raise TypeError('The input value must be an array_like.')
f = fftshift(fftfreq(x.size, d=1/fs))
dλ = np.diff(c/(f+f0))[0]*1e9
tau_g_ = tau_g(x, fs)
D = np.diff(tau_g_, prepend=tau_g_[0])/dλ
return D
[docs]
def bode(H: np.ndarray,
fs: float,
f0: float=None,
xaxis: Literal['f','w','lambda']='f',
disp: bool=False,
yscale : Literal['linear', 'db']='linear',
ret: bool=False,
retAxes: bool=False,
show_: bool=True,
xlim: tuple=None):
r"""
Plot the Bode plot of a given transfer function H (magnitude, phase and group delay).
Parameters
----------
H : :obj:`np.ndarray`
The transfer function.
fs : :obj:`float`
The sampling frequency.
f0 : :obj:`float`, default: None
The center frequency. If not None, dispersion are also plotted.
xaxis : :obj:`str`, default: 'f'
The x-axis (frequency, angular velocity, wavelength).
disp : :obj:`bool`, default: False
Whether to plot the dispersion.
ret : :obj:`bool`, default: False
Whether to return the plotted data.
show_ : :obj:`bool`, default: True
Whether to display the plot.
style : :obj:`str`, default: 'dark'
The plot style.
Returns
-------
(f, H, phase, tau_g) : :obj:`np.ndarray`
A tuple containing the frequency, magnitude, phase, and group delay if ``ret=True``.
Raises
------
ValueError
If style is not "dark" or "light".
Example
-------
.. code-block:: python
>>> from opticomlib import bode
>>> H, phase, tau_g = bode(H, fs, ret=True, show_=False)
"""
if not isinstance(H, np.ndarray):
raise ValueError('`H` must be a numpy.ndarray.')
f = fftshift(fftfreq(H.size, d=1/fs))
w = 2*pi*f
if xaxis == 'f':
x = f*1e-9
xlabel = 'Frequency [GHz]'
elif xaxis == 'w':
x = w*1e-9
xlabel = r'$\omega$ [Grad/s]'
elif xaxis == 'lambda':
if not f0:
raise ValueError('`f0` must be specify for determine lambda vector.')
x = (c/(f+f0) - c/f0)*1e9
xlabel = r'$\lambda$ [nm]'
nplots = 4 if disp and f0 else 3
if yscale=='db':
y = db(np.abs(H)**2)
ylabel = r'$|H(\omega)|^2$ [dB]'
ylim = (-60, 1)
pad = 32
elif yscale=='linear':
y = np.abs(H)**2
ylabel = r'$|H(\omega)|^2$'
ylim = (-0.1, 1.1)
pad = 20
else:
raise ValueError('`yscale` must be "linear" or "db".')
_, axs = plt.subplots(nplots, 1, figsize=(8, 6), sharex=True, gridspec_kw={'hspace': 0.02})
plt.suptitle('Frequency Response')
axs[0].plot(x, y, 'r', lw=2)
axs[0].set_ylabel(ylabel, rotation=0, labelpad=pad)
axs[0].grid(alpha=0.3)
axs[0].yaxis.set_label_position("left")
axs[0].yaxis.tick_right()
axs[0].set_ylim(*ylim)
axs[1].plot(x, phase(H), 'b', lw=2)
axs[1].set_ylabel(r'$\phi$ [rad]', rotation=0, labelpad=pad)
axs[1].grid(alpha=0.3)
axs[1].yaxis.set_label_position("left")
axs[1].yaxis.tick_right()
axs[2].plot(x, sg.medfilt(tau_g(H, fs), 7), 'g', lw=2)
axs[2].set_ylabel(r'$\tau_g$ [ps]', rotation=0, labelpad=pad)
axs[2].grid(alpha=0.3)
axs[2].yaxis.set_label_position("left")
axs[2].yaxis.tick_right()
if disp:
if not f0:
raise ValueError('`f0` must be specify to determine dispersion.')
axs[3].plot(x, sg.medfilt(dispersion(H, fs, f0), 7), 'm', lw=2)
axs[3].set_ylabel(r'D [ps/nm]', rotation=0, labelpad=28)
axs[3].set_xlabel(xlabel)
axs[3].grid(alpha=0.3)
axs[3].yaxis.set_label_position("left")
axs[3].yaxis.tick_right()
else:
axs[2].set_xlabel(xlabel)
if xlim:
plt.xlim(xlim)
if retAxes:
return axs
if show_:
plt.show()
if ret:
return f, H, phase, tau_g
[docs]
def rcos(x, alpha, T):
r"""
Raised cosine spectrum function.
Parameters
----------
x : Number or Array_Like
Input values.
alpha : :obj:`float`
Roll-off factor.
T : :obj:`float`
Symbol period.
Returns
-------
:obj:`np.ndarray`
Raised cosine function.
Example
-------
https://en.wikipedia.org/wiki/Raised-cosine_filter
.. plot::
:include-source:
:alt: Raised cosine function
:align: center
from opticomlib import rcos
import matplotlib.pyplot as plt
import numpy as np
T = 1
x = np.linspace(-1.5/T, 1.5/T, 1000)
plt.figure(figsize=(8, 5))
for alpha in [0, 0.25, 0.5, 1]:
plt.plot(x, rcos(x, alpha, T), label=r'$\alpha$ = {}'.format(alpha))
plt.ylabel('y')
plt.xlabel('x')
plt.legend()
plt.grid(alpha=0.3)
plt.show()
"""
first_condition = np.abs(x) <= (1-alpha)/(2*T)
second_condition = (np.abs(x)>(1-alpha)/(2*T)) & (np.abs(x)<=(1+alpha)/(2*T))
third_condition = np.abs(x) > (1+alpha)/(2*T)
if _is_numeric(x):
return 1 if first_condition else 0 if third_condition else 0.5*(1+np.cos(pi*T/alpha*(np.abs(x)-(1-alpha)/(2*T))))
if not _is_iterable_and_numpy_compatible(x):
raise TypeError('`x` must be a number or an array_like.')
x = np.array(x)
y = np.zeros_like(x)
y[ first_condition ] = 1
if alpha != 0:
y[ second_condition ] = 0.5*(1+np.cos(pi*T/alpha*(np.abs(x[second_condition])-(1-alpha)/(2*T))))
return y
[docs]
def si(x, unit: Literal['m','s']='s', k: int=1):
r"""
Unit of measure classifier.
Parameters
----------
x : int | float
Number to classify.
unit : str, default: 's'
Unit of measure. Valid options are {'s', 'm', 'Hz', 'rad', 'bit', 'byte', 'W', 'V', 'A', 'F', 'H', 'Ohm'}.
k : int, default: 1
Precision of the output.
Returns
-------
str
String with number and unit.
Example
-------
.. code-block:: python
>>> si(0.002, 's')
'2.0 ms'
>>> si(1e9, 'Hz')
'1.0 GHz'
"""
if not _is_numeric(x):
raise TypeError('`x` must be a numeric value.')
if 1e12<= x:
return f'{x*1e-9:.{k}f} T{unit}'
if 1e9<= x <1e12:
return f'{x*1e-9:.{k}f} G{unit}'
if 1e6<= x <1e9:
return f'{x*1e-6:.{k}f} M{unit}'
if 1e3<= x <1e6:
return f'{x*1e-3:.{k}f} k{unit}'
if 1<= x <1e3:
return f'{x:.{k}f} {unit}'
if 1e-3<= x <1:
return f'{x*1e3:.{k}f} m{unit}'
if 1e-6<= x <1e-3:
return f'{x*1e6:.{k}f} μ{unit}'
if 1e-9<= x <1e-6:
return f'{x*1e9:.{k}f} n{unit}'
if 1e-12<= x <1e-9:
return f'{x*1e12:.{k}f} p{unit}'
if 1e-15<= x <1e-12:
return f'{x*1e15:.{k}f} f{unit}'
if x == 0:
return f'{x:.{k}f} {unit}'
[docs]
def norm(x):
"""
Normalize an array by dividing each element by the maximum value in the array.
Parameters
----------
x : Array_Like
Input array to be normalized.
Returns
-------
out : np.ndarray
Normalized array.
Raises
------
ValueError
If ``x`` is not an `array_like`.
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
else:
raise TypeError('`x` must be an array_like.')
return x/x.max()
[docs]
def nearest(x, a):
"""
Find the value of X closest to a.
Parameters
----------
X : Array_Like
Input array.
A : Number or Array_Like
Reference value or values to find in X.
Returns
-------
out : Number
Nearest value in the array.
Raises
------
ValueError
If ``x`` is not an `array_like`.
If ``a`` is not a `number`.
"""
if _is_iterable_and_numpy_compatible(x):
x = np.array(x)
else:
raise ValueError('`x` must be an array_like.')
if _is_iterable_and_numpy_compatible(a):
return x[np.argmin(
np.abs(
np.repeat([x], len(a), axis=0) - np.reshape(a, (-1, 1))
),
axis=1,
)]
if _is_numeric(a):
return x[np.argmin(np.abs(x - a))]
raise ValueError('`A` must be a number or an array_like')
[docs]
def nearest_index(X, A):
"""
Find the indices of X for the values of X closest to the values of A.
Parameters
----------
X : Array_Like
Input array.
A : Number or Array_Like
Value or values to find in X.
Returns
-------
out : Number or np.ndarray
Indices of the nearest values in the array.
Raises
------
TypeError
If ``X`` is not an `array_like`.
"""
if _is_iterable_and_numpy_compatible(X):
X = np.array(X)
else:
raise TypeError('`X` must be an array_like.')
if _is_iterable_and_numpy_compatible(A):
return np.argmin(
np.abs(
np.repeat([X], len(A), axis=0) - np.reshape(A, (-1, 1))
),
axis=1,
)
if _is_numeric(A):
return np.argmin(np.abs(X - A))
raise TypeError('`A` must be a number or an array_like')
[docs]
def p_ase(
amplify=True,
wavelength=1550e-9,
G=None,
NF=None,
BW_opt=None,
):
"""
Calculate the ASE noise power [Watts].
Parameters
----------
amplify : bool, default: True
If use an EDFA or not at the receiver (before PIN).
wavelength : float, default: 1550e-9
Wavelength of the signal.
G : float
Gain of EDFA, in [dB]. Only used if `amplify=True`. This parameter is mandatory.
NF : float
Noise Figure of EDFA, in [dB]. Only used if `amplify=True`. Mandatory.
BW_opt : float
Bandwidth of optical filter that is placed after the EDFA, in [Hz]. Only used if `amplify=True`. Mandatory.
Returns
-------
p_ase : float
ASE optical noise power, in [W].
"""
if amplify:
if not (G is not None and NF is not None and BW_opt is not None):
raise ValueError('`G`, `NF` and `BW_opt` must be specify.')
nf = idb(NF)
g = idb(G)
f0 = c/wavelength
p_ase = nf * h * f0 * (g - 1) * BW_opt # ASE optical noise
else:
p_ase = 0
return p_ase
[docs]
def average_voltages(
P_avg,
modulation: Literal['ook', 'ppm'],
M=None,
ER=np.inf,
amplify=True,
wavelength=1550e-9,
G=None,
NF=None,
BW_opt=None,
r=1.0,
R_L=50,
):
"""
Calculate the average voltages of the ON and OFF slots [Voltages].
Parameters
----------
P_avg : float
Average Received input optical Power (in [dBm]).
modulation : Literal['ook', 'ppm']
Kind of modulation format {'ook', 'ppm'}, more modulations in future...
M : int
Order of M-ary PPM (a power of 2). Only needed if `modulation='ppm'`.
ER : float, default: np.inf
Extinction Ratio of the input optical signal, in [dB].
amplify : bool, default: True
If use an EDFA or not at the receiver (before PIN).
wavelength : float, default: 1550e-9
Wavelength of the signal.
G : float
Gain of EDFA, in [dB]. Only used if `amplify=True`. This parameter is mandatory.
NF : float
Noise Figure of EDFA, in [dB]. Only used if `amplify=True`. Mandatory.
BW_opt : float
Bandwidth of optical filter that is placed after the EDFA, in [Hz]. Only used if `amplify=True`. Mandatory.
r : float, default: 1.0
Responsivity of photo-detector.
R_L : float, default: 50
Load resistance of photo-detector, in [Ω].
Returns
-------
mu: np.ndarray
Average voltage of ON and OFF slots. mu[0] is the OFF slot and mu[1] is the ON slot.
mu_ASE: float
ASE voltage offset.
"""
M = 2 if modulation.lower() == 'ook' else M
er = idb(ER) # extinction ratio
p_avg = idbm(P_avg) # average input power, in [W]
if amplify:
if G is None: raise ValueError("G must be provided if amplify=True")
g = idb(G) # gain of EDFA
else:
g = 1.0
p_ON = p_avg * M / (1 + (M-1)/er) # ON slot average optical power, without amplification
p_OFF = p_ON/er # OFF slot average optical power, without amplification
mu_ASE = r * p_ase(amplify, wavelength, G, NF, BW_opt) * R_L # ASE voltage offset
mu = r * g * np.array([p_OFF, p_ON]) * R_L + mu_ASE # average voltage of ON and OFF slots
return mu, mu_ASE
[docs]
def noise_variances(
P_avg,
modulation: Literal['ook', 'ppm'],
M=None,
ER=np.inf,
amplify=True,
wavelength=1550e-9,
G=None,
NF=None,
BW_opt=None,
r=1.0,
BW_el=5e9,
R_L=50,
T=300,
NF_el = 0
):
"""
Calculate the theoretical noise variances for OFF and ON slots, include sig-ase, ase-ase, thermal and shot noises [V^2].
If ``amplify=False`` only thermal and shot are calculated.
Parameters
----------
P_avg: float
Average Received input optical Power (in [dBm]).
modulation: Literal['ook', 'ppm']
Kind of modulation format {'ook', 'ppm'}, more modulations in future...
M: int
Order of M-ary PPM (a power of 2). Only needed if `modulation='ppm'`.
ER: float
Extinction Ratio of the input optical signal, in [dB].
amplify: bool
If use an EDFA or not at the receiver (before PIN). Default: `False`.
wavelength: float
Central frequency of communication, in [Hz]. Only used if `amplify=True`. Default: `1550 nm`.
G: float
Gain of EDFA, in [dB]. Only used if `amplify=True`. This parameter is mandatory.
NF: float
Noise Figure of EDFA, in [dB]. Only used if `amplify=True`. Mandatory.
BW_opt: float
Bandwidth of optical filter that is placed after the EDFA, in [Hz]. Only used if `amplify=True`. Mandatory.
r: float
Responsivity of photo-detector. Default: 1.0 [A/W]
BW_el: float
Bandwidth of photo-detector or electrical filter, in [Hz]. Default: 5e9 [Hz].
R_L: float
Load resistance of photo-detector, in [Ω]. Default: 50 [Ω].
T: float
Temperature of photo-detector, in [K]. Default: 300 [K].
NF_el: float
Equivalent Noise Figure of electric circuit, in [dB]. Default: 0 [dB]
"""
mu, mu_ASE = average_voltages(P_avg, modulation, M, ER, amplify, wavelength, G, NF, BW_opt, r, R_L)
nf_el = idb(NF_el)
if amplify:
l = BW_el/BW_opt
S_sig_ase_i = 2 * mu_ASE * (mu-mu_ASE) * l # signal-ase beating noise variance, in [V^2]
S_ase_ase = mu_ASE**2 * (1 - l/2) * l # ase-ase beating noise variance, in [V^2]
else:
S_sig_ase_i = 0
S_ase_ase = 0
S_th = 4 * kB * T * BW_el * R_L # thermal noise variance, in [V^2]
S_sh_i = 2 * e * mu * BW_el * R_L # shot noise variance, in [V^2]
S = (S_th + S_sig_ase_i + S_ase_ase + S_sh_i) * nf_el # variance of ON and OFF slots
return S
[docs]
def optimum_threshold(mu0,mu1,S0,S1, modulation: Literal['ook', 'ppm'], M=None):
"""
Calculate the optimum threshold for binary modulation formats.
Parameters
----------
mu0: float
Average voltage of OFF slot.
mu1: float
Average voltage of ON slot.
S0: float
Noise variance of OFF slot.
S1: float
Noise variance of ON slot.
modulation: str
Modulation format
M: int
PPM order
Returns
-------
threshold: float
Optimum threshold value.
"""
M = 2 if modulation.lower() == 'ook' else M
if S1 == S0:
return (mu0 + mu1) / 2
s1=S1**0.5
s0=S0**0.5
threshold = 1/(S1-S0)*(mu0*S1 - mu1*S0 + s1*s0*np.sqrt((mu1-mu0)**2 + 2*(S1-S0)*np.log(s1/s0*(M-1))))
return threshold
[docs]
def theory_BER(
P_avg,
modulation: Literal['ook', 'ppm'],
M=None,
decision=None,
threshold=None,
ER=np.inf,
amplify=False,
f0=193.4145e12,
G=None,
NF=None,
BW_opt=None,
r=1.0,
BW_el=5e9,
R_L=50,
T=300,
NF_el=0
):
"""
This function calculates the bit error rate (BER) for an OPTICAL RECEIVER, based on a PIN photodetector, from the average input power.
It also allows consider the effects of an EDFA amplifier in the results.
If ``amplify==False``, thermal and shot noise of PIN will be consider in BER calculation.
If ``amplify==True``, signal-ase and ase-ase beating noises generated by the EDFA are consider too.
In addition, parameter ``NF_el != 0`` (electrical noise figure) can be used to represent another sources of noise after detection, like electrical amplifiers, etc.
Parameters
----------
P_avg: float
Average Received input optical Power (in [dBm]).
modulation: Literal['ook', 'ppm']
Kind of modulation format {'ook', 'ppm'}, more modulations in future...
M: int
Order of M-ary PPM (a power of 2). Only needed if `modulation='ppm'`.
decision: Literal['hard', 'soft']
Kind of PPM decision. 'hard' decision use the optimum threshold to separate ON and OFF slots. 'soft' decision
use the Maximum a Posteriori (MAP) and it outperform 'hard' decision. Only needed if `modulation='ppm'`.
threshold: float
Threshold for decision. Only needed if (`modulation='ook'`) or (`modulation='ppm` and `decision='hard'`). Value
must be in (0, 1), without include edges. By default optimum threshold is used.
ER: float
Extinction Ratio of the input optical signal, in [dB].
amplify: bool
If use an EDFA or not at the receiver (before PIN). Default: `False`.
f0: float
Central frequency of communication, in [Hz]. Only used if `amplify=True`. Default: `193.4 THz` corresponding to `1550 nm`.
G: float
Gain of EDFA, in [dB]. Only used if `amplify=True`. This parameter is mandatory.
NF: float
Noise Figure of EDFA, in [dB]. Only used if `amplify=True`. Mandatory.
BW_opt: float
Bandwidth of optical filter that is placed after the EDFA, in [Hz]. Only used if `amplify=True`. Mandatory.
r: float
Responsivity of photo-detector. Default: 1.0 [A/W]
BW_el: float
Bandwidth of photo-detector or electrical filter, in [Hz]. Default: 5e9 [Hz].
R_L: float
Load resistance of photo-detector, in [Ω]. Default: 50 [Ω].
T: float
Temperature of photo-detector, in [K]. Default: 300 [K].
NF_el: float
Equivalent Noise Figure of electric circuit, in [dB]. Default: 0 [dB]
Returns
-------
BER : float
Theoretical Bit Error rate of the system specified.
Notes
-----
The bandwidths are used only for the determination of the noise contribution to the BER value, i.e. the signal
distortion due to these bandwidth is not taken into account. Therefore, the bandwidths ``BW_el`` and ``BW_opt`` are
not considered to affect the transmitted signal.
Example
-------
.. plot::
:include-source:
:alt: Raised cosine function
:align: center
from opticomlib import theory_BER
import matplotlib.pyplot as plt
import numpy as np
x = np.linspace(-40, -20, 1000) # Average input optical power [dB]
plt.figure(figsize=(8, 6))
plt.semilogy(x, theory_BER(P_avg=x, modulation='ook'), label='OOK')
plt.semilogy(x, theory_BER(P_avg=x, modulation='ppm', M=4, decision='soft'), label='4-PPM (soft)')
plt.semilogy(x, theory_BER(P_avg=x, modulation='ppm', M=4, decision='hard'), label='4-PPM (hard)')
plt.xlabel(r'$P_{avg}$')
plt.ylabel('BER')
plt.legend()
plt.grid(alpha=0.3)
plt.ylim(1e-9,)
plt.show()
"""
from scipy.constants import h, k as kB, e, pi
from scipy.integrate import quad
@np.vectorize(otypes=[np.float64]) # se creó esta función envoltorio para que se mostrara bien luego en la documentación.
def temp(
P_avg,
modulation: Literal['ook', 'ppm'],
M=None,
decision=None,
threshold=None,
ER=np.inf,
amplify=False,
f0=193.4145e12,
G=None,
NF=None,
BW_opt=None,
r=1.0,
BW_el=5e9,
R_L=50,
T=300,
NF_el=0
):
if amplify:
if G is None:
raise ValueError('Enter the EDFA gain "G" in [dB].')
if NF is None:
raise ValueError('Enter the EDFA noise figure "NF" in [dB].')
if BW_opt is None:
raise ValueError('Enter the bandwidth of the optical filter "BW_opt" in [Hz].')
g = idb(G)
nf = idb(NF)
l = BW_el/BW_opt
p_ase = nf * h * f0 * (g - 1) * BW_opt # ASE optical noise
mu_ASE = r * p_ase * R_L # ASE voltage offset
else:
g = 1
l = 1
mu_ASE = 0
M = 2 if modulation.lower() == 'ook' else M
er = idb(ER) # extinction ratio
nf_el = idb(NF_el) # electrical noise figure
p_avg = idbm(P_avg) # average input power, in [W]
p_ON = p_avg * M / (1 + (M-1)/er) # ON slot average optical power, without amplification
p_OFF = p_ON/er # OFF slot average optical power, without amplification
mu_ON = r * g * p_ON * R_L + mu_ASE # ON slot voltage
mu_OFF = r * g * p_OFF * R_L + mu_ASE # OFF slot voltage
S_sig_ase_i = 2 * mu_ASE * np.array([(mu - mu_ASE) for mu in [mu_OFF, mu_ON]]) * l # signal-ase beating noise variance, in [V^2]
S_ase_ase = mu_ASE**2 * (1 - l/2) * l # ase-ase beating noise variance, in [V^2]
S_th = 4 * kB * T * BW_el * R_L * nf_el # thermal noise variance, in [V^2]
S_sh_i = 2 * e * np.array([mu_OFF, mu_ON]) * BW_el * R_L # shot noise variance, in [V^2]
s = (S_th + S_sig_ase_i + S_ase_ase + S_sh_i)**0.5 # santandar desviation of ON and OFF slots
if modulation.lower() == 'ppm':
if M is None:
raise ValueError('Enter a value for "M".')
if M<2 or (M & (M - 1)):
raise ValueError('The parameter "M" must be a power of 2 greater than or equal to 2.')
if decision.lower()=='hard':
SER = lambda x: 1 - Q((x-mu_ON)/s[1]) * (1-Q((x-mu_OFF)/s[0]))**(M-1) # hard decision
if threshold is not None:
if threshold<=0 or threshold>=1:
raise ValueError('The threshold value must be in the range (0, 1).')
SER = SER(threshold * mu_ON + (1 - threshold) * mu_OFF)
else:
SER = SER(np.linspace(mu_OFF, mu_ON, 5000)).min()
elif decision.lower()=='soft':
SER = 1-1/(2*pi)**0.5*quad(lambda x: (1-Q((mu_ON-mu_OFF+s[1]*x)/s[0]))**(M-1)*np.exp(-x**2/2),-np.inf,np.inf)[0]
else:
raise ValueError('decision must be "hard" or "soft"')
BER = SER * M/2/(M-1)
elif modulation.lower() == 'ook':
BER = lambda x: 0.5*(Q((mu_ON-x)/s[1]) + Q((x-mu_OFF)/s[0]))
if threshold is not None:
if threshold<=0 or threshold>=1:
raise ValueError('The threshold value must be in the range (0, 1).')
BER = BER(threshold * mu_ON + (1 - threshold) * mu_OFF)
else:
BER = BER(np.linspace(mu_OFF, mu_ON, 5000)).min()
else:
raise KeyError(f'The modulation type "{modulation}" is invalid.')
return BER
return temp(P_avg, modulation, M, decision, threshold, ER, amplify, f0, G, NF, BW_opt, r, BW_el, R_L, T, NF_el)
[docs]
def shortest_int(x: np.ndarray, percent: float=50) -> tuple[float, float]:
r"""
Estimation of the shortest interval of x values, containing {``percent``}% of the samples in 'x'.
Parameters
----------
x : ndarray
Data of not complex values. If data is complex, real part will be taken.
percent : real number
percent of of data.
Returns
-------
tuple[float, float]
The shortest interval containing 50% of the samples in 'data'.
"""
if not _is_iterable_and_numpy_compatible(x):
raise TypeError('`x` must be an array_like.')
if not _is_real(percent) or percent <= 0 or percent >100:
raise ValueError('`percent` must be a real number between (0, 100].')
diff_lag = (
lambda data, lag: data[lag:] - data[:-lag]
) # Difference between two elements of an array separated by a distance 'lag'
x = np.sort(x.real)
lag = int(len(x) * percent/100)
# Validate that lag is at least 1 to ensure diff_lag can operate correctly
if lag < 1:
raise ValueError(
f"Computed lag ({lag}) must be at least 1. "
f"The provided percent ({percent}%) is too small for the length of x ({len(x)}). "
f"Choose a larger percent value (at least {100/len(x):.4f}%)."
)
diff = diff_lag(x, lag)
i = np.where(np.abs(diff - np.min(diff)) < 1e-10)[0]
if len(i) > 1:
i = int(np.mean(i))
return np.array((x[i], x[i + lag]))
# --- Función para Aplicar el Filtro Gaussiano Optimizado ---
[docs]
def apply_optimized_gaussian_filter(t, signal, T_bit):
"""
Applies a Gaussian filter to a NRZ signal. The filter is optimized based on the bit duration, to a sigma of 0.139 * T_bit.
Args:
t (np.ndarray): Time vector corresponding to signal_in.
signal (np.ndarray): NRZ input signal.
T_bit (float): Bit duration, used as a reference for sigma.
Returns:
np.ndarray: The filtered signal with scaled amplitude.
"""
dt = t[1] - t[0]
if dt <= 0:
raise ValueError("El paso de tiempo dt debe ser positivo.")
# Calcular parámetros del kernel basados en el sigma óptimo
std_dev_points = T_bit * 0.139 / dt
# Determinar el tamaño del kernel. Debe ser impar y lo suficientemente grande.
# Un tamaño basado en 6*sigma_en_puntos cubre >99.7% de la gaussiana.
kernel_size = int(6 * std_dev_points)
if kernel_size % 2 == 0:
kernel_size += 1 # Asegurar que es impar para un centro claro
if kernel_size < 3: # Asegurar un tamaño mínimo para el kernel
kernel_size = 3
# Asegurar que el kernel no sea más grande que la señal de entrada (menos un pequeño margen)
max_possible_kernel_size = len(signal) - 2
if kernel_size > max_possible_kernel_size:
print(f"Advertencia: El tamaño calculado del kernel ({kernel_size}) es mayor que la señal ({len(signal)}). Reduciendo a {max_possible_kernel_size}.")
kernel_size = max_possible_kernel_size
if kernel_size < 3:
print("Error: El tamaño de la señal es demasiado pequeño para aplicar un filtro significativo.")
return np.zeros_like(signal) # Devolver ceros si no se puede filtrar
# Crear el kernel gaussiano usando la función corregida
from scipy.signal.windows import gaussian
gaussian_kernel = gaussian(kernel_size, std=std_dev_points)
# Normalizar el kernel para preservar el área (o nivel DC) de la señal
kernel_sum = np.sum(gaussian_kernel)
gaussian_kernel /= kernel_sum
# Realizar la convolución
# mode='same' asegura que la salida tenga el mismo tamaño y esté centrada
from scipy.signal import convolve
convolved_signal = convolve(signal, gaussian_kernel, mode='same')
return convolved_signal
[docs]
def eyediagram(y, sps, n_traces=None, cmap='viridis',
N_grid_bins=200, grid_sigma=5, style: Literal['line', 'dot', 'density']='dot', ax=None, **plot_kw):
r"""Plots a colored eye diagram, internally calculating color density.
Parameters
----------
y : np.ndarray
Full amplitude array (1D).
sps : int
Samples per symbol. Used to segment the eye traces.
n_traces : int, optional
Maximum number of traces to plot. If None, all available traces
will be plotted. Defaults to None.
cmap : str, optional
Name of the matplotlib colormap. Defaults to 'viridis'.
N_grid_bins : int, optional
Number of bins for the density histogram. Defaults to 200.
grid_sigma : float, optional
Sigma for the Gaussian filter applied to the density. Defaults to 5.
style : Literal['line', 'dot', 'density'], optional
Style of the eye diagram plot:
- 'line': Plots traces as lines.
- 'dot': Plots traces as dots.
- 'density': Plots density map only.
ax : matplotlib.axes.Axes, optional
Axes object to plot on. If None, creates new figure and axes.
Defaults to None.
\*\*plot_kw : dict, optional
Additional plotting parameters:
Figure parameters (used only if ax is None):
- figsize : tuple, default (10, 6)
- dpi : int, default 100
Line collection parameters:
- linewidth : float, default 0.75
- alpha : float, default 0.25
- capstyle : str, default 'round'
- joinstyle : str, default 'round'
Axes formatting parameters:
- xlabel : str, default "Time (2-symbol segment)"
- ylabel : str, default "Amplitude"
- title : str, default "Eye Diagram ({num_traces} traces)"
- grid : bool, default True
- grid_alpha : float, default 0.3
- xlim : tuple, optional (xmin, xmax)
- ylim : tuple, optional (ymin, ymax)
- tight_layout : bool, default True
Display parameters:
- show : bool, default False (whether to call plt.show())
Returns
-------
matplotlib.axes.Axes
The axes object containing the eye diagram plot.
"""
# Truncate signals to avoid edge artifacts
# Remove sps//2 samples from each end
start_idx = sps // 2
end_idx = len(y) - sps // 2
if start_idx >= end_idx:
raise ValueError(f"Signal too short for truncation. Need at least {sps} samples, got {len(y)}.")
Y_truncated = y[start_idx:end_idx]
# Eye diagram parameters
num_points_per_trace = 2 * sps
if len(Y_truncated) < num_points_per_trace:
raise ValueError(f"Need at least {num_points_per_trace} points for eye diagram, got {len(Y_truncated)} after truncation.")
available_traces = len(Y_truncated) // num_points_per_trace
num_traces_to_plot = min(available_traces, n_traces) if n_traces is not None else available_traces
if num_traces_to_plot == 0:
raise ValueError(f"Not enough points to form even one trace of {num_points_per_trace} points after truncation.")
X_truncated = np.kron(np.ones(num_traces_to_plot), np.linspace(-1, 1 - 1/sps, num_points_per_trace))
Y_truncated = Y_truncated[:num_traces_to_plot * num_points_per_trace]
# Get colormap
try:
cmap_obj = getattr(plt.cm, cmap)
except AttributeError:
print(f"Warning: Colormap '{cmap}' not found. Using 'viridis' by default.")
cmap_obj = plt.cm.viridis
# Extract plotting parameters with defaults
# Figure parameters
figsize = plot_kw.get('figsize', None)
dpi = plot_kw.get('dpi', 100)
# Axes formatting parameters
xlabel = plot_kw.get('xlabel', "Time (2-symbol segment)")
ylabel = plot_kw.get('ylabel', "Amplitude")
title_template = plot_kw.get('title', "Eye Diagram ({num_traces} traces)")
grid = plot_kw.get('grid', True)
grid_alpha = plot_kw.get('grid_alpha', 0.3)
xlim = plot_kw.get('xlim', None)
ylim = plot_kw.get('ylim', None)
tight_layout = plot_kw.get('tight_layout', True)
# Display parameters
show = plot_kw.get('show', False)
# Calculate ranges using truncated signals
min_x, max_x = X_truncated.min(), X_truncated.max()
min_y, max_y = Y_truncated.min(), Y_truncated.max()
# Normalize coordinates (handle edge cases)
X_norm = np.zeros_like(X_truncated) if max_x == min_x else (X_truncated - min_x) / (max_x - min_x)
Y_norm = np.zeros_like(Y_truncated) if max_y == min_y else (Y_truncated - min_y) / (max_y - min_y)
# Create density grid using truncated signals
from scipy.ndimage import gaussian_filter
grid_density, _, _ = np.histogram2d(X_truncated, Y_truncated, bins=N_grid_bins)
grid_density = gaussian_filter(grid_density, sigma=grid_sigma)
# Map points to grid indices
ix_grid = np.clip((X_norm * (N_grid_bins - 1)).astype(int), 0, N_grid_bins - 1)
iy_grid = np.clip((Y_norm * (N_grid_bins - 1)).astype(int), 0, N_grid_bins - 1)
# Get and normalize color values
color_values = grid_density[ix_grid, iy_grid]
color_range = color_values.max() - color_values.min()
color_values_norm = np.zeros_like(color_values) if color_range == 0 else (color_values - color_values.min()) / color_range
# Prepare data for plotting using truncated signals
x_eye_trace = X_truncated[:num_points_per_trace]
Y_reshaped = Y_truncated.reshape(num_traces_to_plot, num_points_per_trace)
color_reshaped = color_values_norm.reshape(num_traces_to_plot, num_points_per_trace)
# Create plot or use existing axes
if ax is None:
fig, ax = plt.subplots(figsize=figsize, dpi=dpi)
created_fig = True
else:
created_fig = False
# Plot eye traces
if style == 'dot':
s = plot_kw.get('s', 0.1)
alpha = plot_kw.get('alpha', 0.9)
plt.scatter(X_truncated, Y_truncated, c=color_values_norm, cmap=cmap_obj, s=s, alpha=alpha)
elif style == 'density':
extent = [min_x, max_x, min_y, max_y]
ax.imshow(grid_density.T, extent=extent, origin='lower', aspect='auto', cmap=cmap_obj)
elif style == 'line':
from matplotlib.collections import LineCollection
# Line collection parameters
alpha = plot_kw.get('alpha', 0.05)
linewidth = plot_kw.get('linewidth', 1)
capstyle = plot_kw.get('capstyle', 'round')
joinstyle = plot_kw.get('joinstyle', 'round')
for i in range(num_traces_to_plot):
# Create line segments
points = np.array([x_eye_trace, Y_reshaped[i]]).T.reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)
if len(segments) > 0:
colors = cmap_obj(color_reshaped[i][:len(segments)])
lc = LineCollection(segments, colors=colors, linewidth=linewidth,
alpha=alpha, capstyle=capstyle, joinstyle=joinstyle)
ax.add_collection(lc)
else:
raise ValueError(f"Invalid style '{style}'. Choose from 'line', 'dot', or 'density'.")
# Format plot
ax.set_xlim(xlim if xlim is not None else (-1,1))
ax.set_ylim(ylim if ylim is not None else (min_y, max_y))
ax.set_xlabel(xlabel)
ax.set_ylabel(ylabel)
# Format title with number of traces
title_formatted = title_template.format(num_traces=num_traces_to_plot)
ax.set_title(title_formatted)
if grid:
ax.grid(True, alpha=grid_alpha)
if created_fig and tight_layout:
plt.tight_layout()
if created_fig and show:
plt.show()
return ax
[docs]
def rcos_pulse(beta, span, sps, shape='sqrt'):
"""Generate a raised cosine or root raised cosine filter impulse response.
This function replicates the MATLAB `rcosdesign()` function, generating the impulse response
of a raised cosine or root raised cosine filter used in digital communications for
pulse shaping.
Parameters
----------
beta : float
Roll-off factor, must be between 0 and 1.
span : int
Number of symbols. The filter length will be span * sps + 1.
sps : int
Samples per symbol.
shape : str, optional
Filter shape, either 'normal' for raised cosine or 'sqrt' for root raised cosine.
Default is 'sqrt'.
Returns
-------
numpy.ndarray
The filter impulse response, normalized to unit energy.
Raises
------
ValueError
If beta is not in [0, 1] or shape is not 'normal' or 'sqrt'.
Notes
-----
The filter is normalized such that the sum of squares of the coefficients is 1.
For beta=0, it reduces to a `sinc()` function.
Examples
--------
>>> import numpy as np
>>> h = rcos(0.5, 6, 64, 'sqrt')
>>> h.shape
(384,)
"""
if not (0 <= beta <= 1):
raise ValueError("beta must be in [0, 1]")
if shape.lower() not in ('sqrt', 'normal'):
raise ValueError("shape must be 'sqrt' or 'normal'")
N = span * sps
t = np.linspace(-span/2, span/2, N+1)
if beta == 0:
p = np.sinc(t)
elif shape.lower() == 'normal':
sinc_t = np.sinc(t)
cos_term = np.cos(np.pi * beta * t)
den = 1 - (2 * beta * t)**2
p = np.divide(sinc_t * cos_term, den, out=np.zeros_like(den), where=den != 0)
# Handle the singularity at t = 1/(2*beta)
special_mask = np.abs(den) < 1e-8
if np.any(special_mask):
p[special_mask] = (np.pi / 4) * np.sinc(1 / (2 * beta))
else: # sqrt
t_abs = np.abs(t)
p = np.zeros_like(t)
# Special case at t=0
mask_zero = t_abs < 1e-8
p[mask_zero] = (1 - beta) + 4 * beta / np.pi
# Special case at t = 1/(4*beta)
special_t_sqrt = 1 / (4 * beta)
mask_special = np.abs(t_abs - special_t_sqrt) < 1e-8
if np.any(mask_special):
p[mask_special] = (beta / np.sqrt(2)) * (
(1 + 2 / np.pi) * np.sin(np.pi / (4 * beta)) +
(1 - 2 / np.pi) * np.cos(np.pi / (4 * beta))
)
# General case
mask_general = ~mask_zero & ~mask_special
if np.any(mask_general):
ti = t[mask_general]
num = np.sin(np.pi * ti * (1 - beta)) + 4 * beta * ti * np.cos(np.pi * ti * (1 + beta))
den = np.pi * ti * (1 - (4 * beta * ti)**2)
p[mask_general] = num / den
return p
[docs]
def gauss_pulse(span, sps, T=1, m=1, c=0.0):
"""Generate a Gaussian or Super-Gaussian filter.
This function generates the impulse response of a Gaussian filter used in digital
communications to reduce bandwidth and minimize intersymbol interference.
Parameters
----------
span : int
Number of symbols. The filter length will be span * sps + 1.
sps : int
Samples per symbol.
T : float
Full Width at Half Maximum (FWHM) of the pulse in symbols. Default is 1.
m : int
Super-Gaussian order. Default is 1 (standard Gaussian).
c : float
Chirp parameter. Default is 0.0 (no chirp).
Returns
-------
numpy.ndarray
The impulse response of the Gaussian filter, normalized to unit energy.
Notes
-----
The filter is normalized such that the sum of squares of the coefficients is 1.
The Gaussian shape helps reduce intersymbol interference in modulation systems.
Examples
--------
>>> import numpy as np
>>> h = gauss(6, 8, 1.0)
>>> print(h.shape)
(49,)
>>> print('Energy:', np.sum(h**2))
1.0
"""
N = span * sps
t = np.linspace(-span/2, span/2, N+1)
alpha = 2*np.sqrt(np.log(2)) / T
p = np.exp(- (alpha * (1 + 1j*c) * t)**(2*m))
return p
[docs]
def nrz_pulse(span, sps, T):
"""Generate a Non-Return-to-Zero (NRZ) pulse shape.
This function generates the impulse response of a Non-Return-to-Zero (NRZ) pulse shape, commonly used in digital communications for pulse shaping.
Parameters
----------
span : int
Number of symbols. The filter length will be span * sps + 1.
sps : int
Samples per symbol.
T : float
Duration of the NRZ pulse in symbols.
Returns
-------
numpy.ndarray
The impulse response of the NRZ pulse shape.
"""
N = span * sps
t = np.linspace(-span/2, span/2, N+1)
p = np.where((t >= -T/2) & (t < T/2), 1.0, 0.0)
return p
[docs]
def upfir(x, h, up=1):
"""Replicate MATLAB's upfirdn function for upsampling and FIR filtering.
This function performs upsampling of the input signal and then applies an FIR filter,
partially replicating the functionality of MATLAB's `upfirdn`.
Parameters
----------
x : array_like
Input signal to process.
h : array_like
Impulse response of the FIR filter.
up : int, optional
Upsampling factor. Default is 1 (no upsampling).
Returns
-------
numpy.ndarray
The filtered signal after upsampling, convolution and downsampling.
Notes
-----
Upsampling is performed by inserting zeros between the input signal samples.
Convolution is then applied with the filter using `mode='same'` to maintain length. Finally,
downsampling is performed by selecting every `dn`-th sample from the filtered signal.
"""
# Upsample
xu = np.zeros(len(x) * up)
xu[up//2::up] = x
# Filtrado
y = sg.fftconvolve(xu, h, mode='same')
return y
[docs]
def phase_estimator(t, x, f):
r"""
Estimates the phase and amplitude of a sinusoid with known frequency.
Parameters
----------
t : array_like
Times in seconds, shape (N,).
x : array_like
Sampled signal, shape (N,).
f : float
Frequency in Hz.
Returns
-------
phi : float
Estimated phase in radians.
amp : float
Estimated amplitude.
Notes
-----
The model is :math:`x[n] = A \cos(2\pi f t[n] + \phi) + noise`, solved by linear regression
over :math:`[\cos(\omega t), \sin(\omega t)]`.
"""
x = np.asarray(x).ravel()
t = np.asarray(t).ravel()
if t.shape != x.shape:
raise ValueError("t and x must have same shape")
w = 2 * np.pi * f
C = np.cos(w * t)
S = np.sin(w * t)
G = np.column_stack((C, S)) # diseño: x ≈ a*C + b*S
# IRLS con pérdida Huber: iteratively reweighted LS
theta = np.linalg.lstsq(G, x, rcond=None)[0]
max_iter = 50
huber_delta = 0.2
for _ in range(max_iter):
r = x - G.dot(theta)
absr = np.abs(r)
# pesos Huber
wght = np.ones_like(r)
mask = absr > huber_delta
wght[mask] = huber_delta / absr[mask]
W = np.sqrt(wght)
Gw = G * W[:, None]
xw = x * W
theta_new, _, _, _ = np.linalg.lstsq(Gw, xw, rcond=None)
if np.linalg.norm(theta_new - theta) < 1e-20:
theta = theta_new
break
theta = theta_new
a, b = float(theta[0]), float(theta[1])
# recordatorio: cos(ωt + φ) = cosφ*cosωt - sinφ*sinωt
# por tanto a = A cosφ ; b = -A sinφ
phi = np.arctan2(-b, a) # devuelve fase en (-pi,pi]
amp = np.hypot(a, b) # amplitud
return phi, amp
[docs]
def get_psd(signal, fs, nperseg=None):
"""
Calculate the Power Spectral Density (PSD) of a signal using Welch's method.
Parameters
----------
signal : Array_Like
Input signal.
fs : float
Sampling frequency in Hz.
nperseg : int, optional
Length of each segment for Welch's method. If None, defaults to 2048 or the length of the signal, whichever is smaller.
Returns
-------
f : np.ndarray
Frequency array.
psd : np.ndarray
Power Spectral Density.
"""
if hasattr(signal, 'signal'):
sig = signal.signal
elif _is_iterable_and_numpy_compatible(signal):
sig = np.array(signal)
else:
raise TypeError("signal must be array_like or have a .signal attribute")
nperseg = nperseg if nperseg is not None else min(2048, len(sig))
# fs is passed in GHz to get f in GHz
f, psd = sg.welch(sig, fs=fs, nperseg=nperseg, scaling='spectrum', return_onesided=False, detrend=False)
f, psd = fftshift(f), fftshift(psd, axes=-1)
return f, psd