Source code for diamondback.filters.PolyphaseRateFilter

""" **Description**
        A polyphase rate filter produces a reference signal which approximates
        an incident signal evaluated at an effective frequency equal to the
        product of an incident sample frequency and a specified rate.

        A polyphase rate filter synthesizes a polyphase filter bank with
        comprised of a sequence of low pass filters.  Each low pass filter in a
        filter bank realizes a common frequency response, with a fractional
        sample difference in group delay.  A stride is defined to be dependent
        upon a specified rate ratio and count.  An incident signal is filtered
        to reduce aliasing and decimated or interpolated to produce a reference
        signal, effectively modifying the sampling rate by a specified rate
        ratio.

        .. math::

            b_{k,i} = b^{M}[\\ k (\\ N\\ +\\ 1\\ )\\ +\\ i\\ ] \\qquad\\qquad k\\ :\\ [\\ 0,\\ K\\sim 256\\ )\\qquad\\ i\\ :\\ [\\ 0,\\ N\\sim 15 \\ ]

        .. math::

            y_{n} = \\sum_{i = 0}^{N} b_{k_{n},i}\\ x_{n-i} = \\sum_{i = 1}^{N} b_{k_{n},i}\\ s_{i,n} + b_{k_{n},0}\\ x_{n}

        .. math::

            s_{1,n+1} = x_{n}\\quad\\quad s_{i,n+1} = s_{i-1,n}

        A specified rate must be greater than zero, and less than or equal to
        the quantity of filters comprising a polyphase filter bank, supporting
        decimation and interpolation.

        Phase dither is present for a real rate, though error is accumulated to
        ensure that the specified rate is realized without bias.  Group delay
        may be addressed by latency compensation.

        .. math::

            \\phi_{n+1,Rate} = \\phi_{n,Rate}\\ +\\ \\frac{K}{\\scriptsize{Rate}}

        .. math::

            \\phi_{n+1,Rate}\\ \\geq\\ K\\qquad\\longrightarrow\\qquad \\phi_{n+1,Rate} = \\phi_{n+1,Rate}\\ -\\ K

        .. math::

            k_{n+1} = \\mod(\\ \\lfloor{\\ k_{n}\\ +\\ \\phi_{n+1,Rate}}\\rfloor,\\ M\\ )

        A reset may minimize edge effects at a discontinuity by assuming
        persistent operation at a specified incident signal condition.  Edge
        extension may also be applied to an incident signal.

        A polyphase rate filter may be the most appropriate option in
        applications which require fractional decimation and interpolation and
        are not highly sensitive to minimization of edge effects or latency due
        to continuous operation.

    **Example**

        .. code-block:: python

            from diamondback import ComplexExponentialFilter, PolyphaseRateFilter
            import math
            import numpy

            # Create an instance.

            obj = PolyphaseRateFilter( rate = 1.0 / math.pi )

            # Filter an incident signal.

            x = ComplexExponentialFilter( 0.0 ).filter( numpy.ones( 128 ) * 0.1 ).real
            obj.reset( x[ 0 ] )
            y = obj.filter( x )

    **License**
        `BSD-3C.  <https://github.com/larryturner/diamondback/blob/master/license>`_
        © 2018 - 2024 Larry Turner, Schneider Electric Industries SAS. All rights reserved.

    **Author**
        Larry Turner, Schneider Electric, AI Hub, 2018-03-19.
"""

from diamondback.filters.FirFilter import FirFilter
from typing import Union
import numpy

[docs] class PolyphaseRateFilter( object ) : """ Polyphase rate filter. """ B = numpy.zeros( ( 256, 15 ) ) @property def b( self ) : return PolyphaseRateFilter.B @property def rate( self ) : return self._rate @rate.setter def rate( self, rate : float ) : if ( ( rate < 0.0 ) or ( rate > PolyphaseRateFilter.B.shape[ 0 ] ) ) : raise ValueError( f'Rate = {rate} Expected Rate in [ 0.0, {PolyphaseRateFilter.B.shape[ 0 ]} ]' ) if ( not numpy.isclose( self.rate, rate ) ) : self._index = 0.0 self._rate = rate @property def s( self ) : return self._s @s.setter def s( self, s : Union[ list, numpy.ndarray ] ) : self._s = s def __init__( self, rate : float ) -> None : """ Initialize. Arguments : rate : float - ratio of effective frequency in ( 0.0, b.shape[ 0 ] ]. """ if ( ( rate < 0.0 ) or ( rate > PolyphaseRateFilter.B.shape[ 0 ] ) ) : raise ValueError( f'Rate = {rate} Expected Rate in [ 0.0, {PolyphaseRateFilter.B.shape[ 0 ]} ]' ) super( ).__init__( ) b = PolyphaseRateFilter.B rr, cc = b.shape if ( not b.any( ) ) : firfilter = FirFilter( style = 'Hann', frequency = 0.85 / rr, order = cc * rr - 1 ) b = numpy.reshape( firfilter.b, ( rr, cc ), 'F' ) for ii in range( 0, rr ) : b[ ii, : ] /= sum( b[ ii, : ] ) PolyphaseRateFilter.B = b self._index = 0 self._rate = rate self._s = numpy.zeros( cc )
[docs] def filter( self, x : Union[ list, numpy.ndarray ] ) -> numpy.ndarray : """ Filters an incident signal and produces a reference signal. Arguments : x : Union[ list, numpy.ndarray ] - incident signal. Returns : y : numpy.ndarray - reference signal. """ if ( not isinstance( x, numpy.ndarray ) ) : x = numpy.array( list( x ) ) if ( not len( x ) ) : raise ValueError( f'X = {x}' ) y = numpy.zeros( int( numpy.round( len( x ) * self.rate ) ) ) b = PolyphaseRateFilter.B rr = b.shape[ 0 ] ii, jj = 0, 0 while ( ( ii < len( x ) ) and ( jj < len( y ) ) ) : if ( self._index < rr ) : kk = min( int( numpy.round( self._index ) ), rr - 1 ) self._index += rr / self.rate self.s[ 0 ] = x[ ii ] y[ jj ] = b[ kk, : ].dot( self.s ) jj += 1 while ( ( ii < len( x ) ) and ( self._index >= rr ) ) : self.s[ 0 ] = x[ ii ] self.s[ 1 : ] = self.s[ : -1 ] self._index -= rr ii += 1 return y[ : min( jj, len( y ) ) ]
[docs] def reset( self, x : float ) -> None : """ Modifies a state to minimize edge effects by assuming persistent operation at a specified incident signal condition. Arguments : x : float - incident signal. """ if ( not numpy.isscalar( x ) ) : raise ValueError( f'X = {x}' ) if ( len( self.s ) > 1 ) : self.s.fill( x )