""" **Description**
A complex frequency filter adapts and discriminates the phase of a forward
complex coefficient to produce a reference signal, which estimates a normalized
frequency of a primary signal which is normalized to unity magnitude. A
normalized frequency and rate of adaptation are specified.
.. math::
f_{n} = \\frac{\\tan^{-1}(\\ b_{n}\\ ) }{\\pi}
.. math::
x_{n} = \\frac{d_{n}}{|\\ d_{n}\\ |}
.. math::
y_{n} = b_{n} x_{n-1}
.. math::
e_{n} = d_{n} - y_{n}
.. math::
b_{0} = e^{\\ j\\ \\pi\\ f_{0}}
.. math::
b_{n+1} = b_{n} + \\mu e_{n} x_{n}^{*}
**Example**
.. code-block:: python
from diamondback import ComplexExponentialFilter
import numpy
x = numpy.linspace( 0.0, 0.1, 128 )
# Create a primary signal.
d = ComplexExponentialFilter( 0.0 ).filter( x )
# Create an instance.
obj = ComplexFrequencyFilter( frequency = 0.0, rate = 0.1 )
# Filter a primary signal.
obj.reset( d[ 0 ] )
y, e, b = obj.filter( d )
**License**
`BSD-3C. <https://github.com/larryturner/diamondback/blob/master/license>`_
© 2018 - 2024 Larry Turner, Schneider Electric Industries SAS. All rights reserved.
**Author**
Larry Turner, Schneider Electric, AI Hub, 2018-02-01.
"""
from diamondback.filters.FirFilter import FirFilter
from typing import Tuple, Union
import math
import numpy
import scipy
[docs]
class ComplexFrequencyFilter( FirFilter ) :
""" Complex frequency filter.
"""
@property
def frequency( self ) :
return self._frequency
@frequency.setter
def frequency( self, frequency : float ) :
if ( ( frequency < -1.0 ) or ( frequency > 1.0 ) ) :
raise ValueError( f'Frequency = {frequency} Expected Frequency in [ -1.0, 1.0 ]' )
self.b[ 0 ] = numpy.exp( 1j * math.pi * frequency )
self._frequency = frequency
@property
def rate( self ) :
return self._rate
@rate.setter
def rate( self, rate : float ) :
if ( ( rate < 0.0 ) or ( rate > 1.0 ) ) :
raise ValueError( f'Rate = {rate} Expected Rate in [ 0.0, 1.0 ]' )
self._rate = rate
def __init__( self, frequency : float, rate : float ) -> None :
""" Initialize.
Arguments :
frequency : float - frequency normalized to Nyquist in [ -1.0, 1.0 ).
rate : float - in [ 0.0, 1.0 ].
"""
if ( ( frequency < -1.0 ) or ( frequency > 1.0 ) ) :
raise ValueError( f'Frequency = {frequency} Expected Frequency in [ -1.0, 1.0 ]' )
if ( ( rate < 0.0 ) or ( rate > 1.0 ) ) :
raise ValueError( f'Rate = {rate} Expected Rate in [ 0.0, 1.0 ]' )
super( ).__init__( b = numpy.ones( 1, complex ), s = numpy.ones( 1, complex ) )
self._frequency = frequency
self._rate = rate
[docs]
def filter( self, d : Union[ list, numpy.ndarray ] ) -> Tuple[ numpy.ndarray, numpy.ndarray, numpy.ndarray ] : # type: ignore
""" Filters a primary signal and produces a reference signal.
Signals are Hilbert transformed to complex as necessary.
Arguments :
d : Union[ list, numpy.ndarray ] - primary signal.
Returns :
y : numpy.ndarray - reference signal.
e : numpy.ndarray - error signal.
b : numpy.ndarray - forward coefficient.
"""
if ( not isinstance( d, numpy.ndarray ) ) :
d = numpy.array( list( d ) )
if ( not len( d ) ) :
raise ValueError( f'D = {d}' )
if ( not numpy.iscomplex( d ).any( ) ) :
d = scipy.signal.hilbert( d )
x = abs( d ) # type: ignore
x[ numpy.isclose( x, 0.0 ) ] = 1.0
x = d / x
y, e, b = numpy.zeros( len( x ) ), numpy.zeros( len( x ), complex ), numpy.zeros( len( x ), complex )
for ii in range( 0, len( x ) ) :
y[ ii ] = numpy.angle( self.b[ 0 ] ) / math.pi
e[ ii ] = x[ ii ] - self.b[ 0 ] * self.s[ 0 ]
b[ ii ] = self.b[ 0 ]
self.b[ 0 ] += self.rate * e[ ii ] * numpy.conjugate( self.s[ 0 ] )
self.s[ 0 ] = x[ ii ]
return y, e, b
[docs]
def reset( self, x : complex ) -> None :
""" Modifies a state to minimize edge effects by assuming persistent
operation at a specified primary incident condition.
Arguments :
x : complex - incident signal.
"""
if ( not numpy.isscalar( x ) ) :
raise ValueError( f'X = {x}' )
if ( numpy.isclose( x, 0.0 ) ) :
self.s[ 0 ] = 1.0
else :
self.s[ 0 ] = x / abs( x ) # type: ignore