""" **Description**
A Finite Impulse Response (FIR) filter realizes a discrete difference
equation as a function of a forward coefficient array and a state array
of a specified order, consuming an incident signal and producing a
reference signal.
.. math::
y_{n} = \\sum_{i = 0}^{N} b_{i,n} x_{n-i} = \\sum_{i = 1}^{N} b_{i,n} s_{i,n} + b_{0,n} x_{n}
.. math::
s_{1,n+1} = x_{n}\\qquad\\quad s_{i,n+1} = s_{i-1,n}
A reset may minimize edge effects at a discontinuity by assuming
persistent operation at a specified incident signal condition.
.. math::
s_{i,n} = x_{n}
A frequency response is expressed as a function of a forward
coefficient array.
.. math::
H_{z,n} = \\sum_{i = 0}^{N} b_{i,n} z^{-i}
A forward coefficient array and state array of a specified order are
defined to realize specified constraints. A style, frequency,
order, count, complement, and gain are electively specified.
Alternatively, a forward coefficient array and state array may be explicitly
defined to ignore constraints.
Frequency corresponds to a -3 dB frequency response normalized relative
to Nyquist.
Style is in ( 'Blackman', 'Hamming', 'Hann', 'Kaiser' ).
* | 'Blackman' filters demonstrate low resolution and spectral leakage
| with improved rate of attenuation.
* | 'Hamming' filters demonstrate minimal nearest side lobe magnitude
| response.
* | 'Hann' filters demonstrate high resolution and spectral leakage.
* | 'Kaiser' filters demonstrate flexible resolution and spectral
| leakage dependent upon a beta value of a Bessel function of the
| first kind, with beta equal to 7.0.
Order must even to ensure a Type I form linear phase solution.
Count is a quantity of filters of a specified order concatenated to
form an aggregate frequency response in cascade form.
Complement effectively constructs a mirror image of a specified
frequency response.
**Example**
.. code-block:: python
from diamondback import FirFilter
import numpy
# Create an instance with constraints.
obj = FirFilter( style = 'Kaiser', frequency = 0.1, order = 32, count = 1 )
# Create an instance with coefficients.
obj = FirFilter( b = obj.b )
# Estimate frequency response, group delay, and roots.
y, f = obj.response( length = 8192, count = 1 )
y, f = obj.delay( length = 8192, count = 1 )
p, z = obj.roots( count = 1 )
# Filter an incident signal.
x = numpy.random.rand( 128 ) * 2.0 - 1.0
obj.reset( x[ 0 ] )
y = obj.filter( x )
**License**
`BSD-3C. <https://github.com/larryturner/diamondback/blob/master/license>`_
© 2018 - 2024 Larry Turner, Schneider Electric Industries SAS. All rights reserved.
**Author**
Larry Turner, Schneider Electric, AI Hub, 2018-01-23.
"""
from typing import Tuple, Union
import math
import numpy
import scipy.signal
import warnings
[docs]
class FirFilter( object ) :
""" Finite Impulse Response ( FIR ) filter.
"""
STYLE = ( 'Blackman', 'Hamming', 'Hann', 'Kaiser' )
@property
def b( self ) :
return self._b
@b.setter
def b( self, b : Union[ list, numpy.ndarray ] ) :
self._b = b
@property
def s( self ) :
return self._s
@s.setter
def s( self, s : Union[ list, numpy.ndarray ] ) :
self._s = s
def __init__( self, style : str = '', frequency : float = 0.0, order : int = 0, count : int = 1, complement : bool = False, gain : float = 1.0,
b : Union[ list, numpy.ndarray ] = [ ], s : Union[ list, numpy.ndarray ] = [ ] ) -> None :
""" Initialize.
Specify constraints including style, frequency, and order.
Alternatively, a forward coefficient array may be explicitly defined
to ignore constraints.
Labels should be used to avoid ambiguity between constraints and
coefficients.
Arguments :
style : str - in ( 'Blackman', 'Hamming', 'Hann', 'Kaiser' ).
frequency : float - frequency normalized to Nyquist in ( 0.0, 1.0 ).
order : int - order per instance.
count : int - instances per cascade.
complement : bool - complement response.
gain : float - gain.
b : Union[ list, numpy.ndarray ] - forward coefficient.
s : Union[ list, numpy.ndarray ] - state.
"""
if ( not len( b ) ) :
style = style.title( )
if ( style not in FirFilter.STYLE ) :
raise ValueError( f'style = {style} Expected Style in {FirFilter.STYLE}' )
if ( ( frequency <= 0.0 ) or ( frequency >= 1.0 ) ) :
raise ValueError( f'Frequency = {frequency} Expected Frequency in ( 0.0, 1.0 )' )
if ( order < 0 ) :
raise ValueError( f'Order = {order} Expected Order in [ 0, inf )' )
if ( count <= 0 ) :
raise ValueError( f'Count = {count} Expected Count in ( 0, inf )' )
if ( complement ) :
frequency = 1.0 - frequency
if ( style == 'Kaiser' ) :
window = ( style.lower( ), 7.0 )
else :
window = style.lower( ) # type: ignore
beta, eps, error = 10.0, float( numpy.finfo( float ).eps ), numpy.inf
index, rate, scale = 500 * ( 1 + ( count > 2 ) ), 2.5e-2, 1.0
for _ in range( 0, index ) :
with warnings.catch_warnings( ) :
warnings.simplefilter( 'ignore' )
v = scipy.signal.firwin( order + 1, scale * frequency, None, window, True, True, 1.0 )
if ( numpy.isnan( v ).any( ) ) :
raise ValueError( f'V = {v}' )
x = numpy.exp( 1j * math.pi * frequency )
e = ( 2.0 ** ( -0.5 ) ) - ( abs( numpy.polyval( v, x ) ) ** count )
if ( abs( e ) < error ) :
b, error = v, abs( e )
if ( error < ( 10.0 * eps ) ) :
break
scale = numpy.maximum( scale + rate * math.tanh( beta * e ), eps )
if ( complement ) :
b *= numpy.array( [ ( ( -1.0 ) ** x ) for x in range( 0, len( b ) ) ] )
b /= sum( b * numpy.array( [ ( ( -1.0 ) ** x ) for x in range( 0, len( b ) ) ] ) )
b *= gain # type: ignore
if ( not isinstance( b, numpy.ndarray ) ) :
b = numpy.array( list( b ) )
if ( not len( b ) ) :
raise ValueError( f'B = {b}' )
if ( not isinstance( s, numpy.ndarray ) ) :
s = numpy.array( list( s ) )
if ( len( b ) < len( s ) ) :
b = numpy.concatenate( ( b, numpy.zeros( len( s ) - len( b ) ) ) )
if ( len( s ) < len( b ) ) :
s = numpy.concatenate( ( s, numpy.zeros( len( b ) - len( s ) ) ) )
super( ).__init__( )
self._b, self._s = numpy.array( b ), numpy.array( s, type( b[ 0 ] ) )
[docs]
def delay( self, length : int = 8192, count : int = 1 ) -> Tuple[ numpy.ndarray, numpy.ndarray ] :
""" Estimates group delay and produces a reference signal.
Arguments :
length : int.
count : int.
Returns :
y : numpy.ndarray - reference signal.
f : numpy.ndarray - frequency normalized to Nyquist in [ -1.0, 1.0 ).
"""
if ( length <= 0 ) :
raise ValueError( f'Length = {length} Expected Length in ( 0, inf )' )
if ( count <= 0 ) :
raise ValueError( f'Count = {count} Expected Count in ( 0, inf )' )
with warnings.catch_warnings( ) :
warnings.simplefilter( 'ignore' )
y, f = scipy.signal.group_delay( ( self.b, [ 1.0 ] ), length, True )[ 1 ], numpy.linspace( -1.0, 1.0 - 2.0 / length, length )
y = numpy.concatenate( ( y[ len( y ) // 2 : ], y[ : len( y ) // 2 ] ) ) * count
if ( length > 2 ) :
y[ 0 ] = y[ 1 ] * 2.0 - y[ 2 ]
return y, f
[docs]
def filter( self, x : Union[ list, numpy.ndarray ] ) -> numpy.ndarray :
""" Filters an incident signal and produces a reference signal.
Arguments :
x : Union[ list, numpy.ndarray ] - incident signal.
Returns :
y : numpy.ndarray - reference signal.
"""
if ( not isinstance( x, numpy.ndarray ) ) :
x = numpy.array( list( x ) )
if ( not len( x ) ) :
raise ValueError( f'X = {x}' )
y = numpy.zeros( len( x ), type( self.b[ 0 ] ) )
for ii in range( 0, len( x ) ) :
self.s[ 0 ] = x[ ii ]
y[ ii ] = self.b.dot( self.s )
if ( len( self.s ) > 1 ) :
self.s[ 1 : ] = self.s[ : -1 ]
return y
[docs]
def reset( self, x : Union[ complex, float ] ) -> None :
""" Modifies a state to minimize edge effects by assuming persistent
operation at a specified incident signal condition.
Arguments :
x : Union[ complex, float ] - incident signal.
"""
if ( not numpy.isscalar( x ) ) :
raise ValueError( f'X = {x}' )
self.s.fill( x )
[docs]
def response( self, length = 8192, count = 1 ) -> Tuple[ numpy.ndarray, numpy.ndarray ] :
""" Estimates frequency response and produces a reference signal.
Arguments :
length : int.
count : int.
Returns :
y : numpy.ndarray - reference signal.
f : numpy.ndarray - frequency normalized to Nyquist in [ -1.0, 1.0 ).
"""
if ( length <= 0 ) :
raise ValueError( f'Length = {length} Expected Length in ( 0, inf )' )
if ( count <= 0 ) :
raise ValueError( f'Count = {count} Expected Count in ( 0, inf )' )
y, f = scipy.signal.freqz( self.b, [ 1.0, 0.0 ], length, True )[ 1 ], numpy.linspace( -1.0, 1.0 - 2.0 / length, length )
y = numpy.concatenate( ( y[ len( y ) // 2 : ], y[ : len( y ) // 2 ] ) ) ** count
return y, f
[docs]
def roots( self, count = 1 ) -> Tuple[ numpy.ndarray, numpy.ndarray ] :
""" Estimates roots of a frequency response in poles and zeros.
Arguments :
count : int.
Returns :
p : numpy.ndarray - poles.
z : numpy.ndarray - zeros.
"""
z = numpy.tile( numpy.roots( self.b ), count )
return numpy.zeros( count * ( len( self.b ) - 1 ) ), z[ numpy.argsort( abs( z ) ) ]