Source code for pybert.pybert

#! /usr/bin/env python

"""
Bit error rate tester (BERT) simulator, written in Python.

Original Author: David Banas <capn.freako@gmail.com>

Original Date:   17 June 2014

Testing by: Mark Marlett <mark.marlett@gmail.com>

This Python script provides a GUI interface to a BERT simulator, which
can be used to explore the concepts of serial communication link design.

Copyright (c) 2014 by David Banas; All rights reserved World wide.
"""

from __future__ import absolute_import

from traits.trait_base import ETSConfig
ETSConfig.toolkit = 'qt4'
# ETSConfig.toolkit = 'wx'

import pickle

from datetime        import datetime
from threading       import Thread, Event
from time            import sleep

from numpy           import array, linspace, zeros, histogram, mean, diff, \
                            transpose, shape, exp, real, pad, pi, resize, cos, \
                            where, sqrt, convolve, sinc, log10, ones
from numpy.fft       import fft, ifft
from numpy.random    import randint
from scipy.signal    import lfilter, iirfilter
from scipy.optimize  import minimize, minimize_scalar

from pyface.api      import FileDialog, OK
from traits.api      import HasTraits, Array, Range, Float, Int, Property, \
                            String, cached_property, Instance, HTML, List, \
                            Bool, File, Button, Enum
from traitsui.api    import View, Item, Group
from traitsui.message import auto_close_message, error, message

from chaco.api       import Plot, ArrayPlotData, VPlotContainer, \
                            GridPlotContainer, ColorMapper, Legend, \
                            OverlayPlotContainer, PlotAxis
from chaco.tools.api import PanTool, ZoomTool, LegendTool, TraitsTool, DragZoom
from enable.component_editor import ComponentEditor

from pyibisami.amimodel  import AMIModel, AMIModelInitializer
from pyibisami.ami_parse import AMIParamConfigurator

from pybert.pybert_view     import traits_view
from pybert.pybert_cntrl    import my_run_simulation, update_results, update_eyes
from pybert.pybert_util     import calc_gamma, calc_G, trim_impulse, import_channel, \
                            make_ctle, lfsr_bits, safe_log10, pulse_center
from pybert.pybert_plot     import make_plots
from pybert.pybert_help     import help_str
from pybert.pybert_cfg      import PyBertCfg

import pybert

debug = True

gDebugStatus   = False
gDebugOptimize = False
gMaxCTLEPeak   = 20.      # max. allowed CTLE peaking (dB) (when optimizing, only)
gMaxCTLEFreq   = 20.      # max. allowed CTLE peak frequency (GHz) (when optimizing, only)

# Default model parameters - Modify these to customize the default simulation.
# - Simulation Control
gBitRate        = 10      # (Gbps)
gNbits          = 8000    # number of bits to run
gPatLen         = 127     # repeating bit pattern length
gNspb           = 32      # samples per bit
gNumAve         = 1       # Number of bit error samples to average, when sweeping.
# - Channel Control
#     - parameters for Howard Johnson's "Metallic Transmission Model"
#     - (See "High Speed Signal Propagation", Sec. 3.1.)
#     - ToDo: These are the values for 24 guage twisted copper pair; need to add other options.
gRdc            = 0.1876  # Ohms/m
gw0             = 10.e6   # 10 MHz is recommended in Ch. 8 of his second book, in which UTP is described in detail.
gR0             = 1.452   # skin-effect resistance (Ohms/m)
gTheta0         = .02     # loss tangent
gZ0             = 100.    # characteristic impedance in LC region (Ohms)
gv0             = 0.67    # relative propagation velocity (c)
gl_ch           = 1.0     # cable length (m)
gRn             = 0.001   # standard deviation of Gaussian random noise (V) (Applied at end of channel, so as to appear white to Rx.)
# - Tx
gVod            = 1.0     # output drive strength (Vp)
gRs             = 100     # differential source impedance (Ohms)
gCout           = 0.50    # parasitic output capacitance (pF) (Assumed to exist at both 'P' and 'N' nodes.)
gPnMag          = 0.001     # magnitude of periodic noise (V)
gPnFreq         = 0.437   # frequency of periodic noise (MHz)
# - Rx
gRin            = 100     # differential input resistance
gCin            = 0.50    # parasitic input capacitance (pF) (Assumed to exist at both 'P' and 'N' nodes.)
gCac            = 1.      # a.c. coupling capacitance (uF) (Assumed to exist at both 'P' and 'N' nodes.)
gBW             = 12.     # Rx signal path bandwidth, assuming no CTLE action. (GHz)
gUseDfe         = True    # Include DFE when running simulation.
gDfeIdeal       = True    # DFE ideal summing node selector
gPeakFreq       = 5.      # CTLE peaking frequency (GHz)
gPeakMag        = 10.     # CTLE peaking magnitude (dB)
gCTLEOffset     = 0.      # CTLE d.c. offset (dB)
# - DFE
gDecisionScaler = 0.5
gNtaps          = 5
gGain           = 0.5
gNave           = 100
gDfeBW          = 12.     # DFE summing node bandwidth (GHz)
# - CDR
gDeltaT         = 0.1     # (ps)
gAlpha          = 0.01
gNLockAve       = 500     # number of UI used to average CDR locked status.
gRelLockTol     = .1      # relative lock tolerance of CDR.
gLockSustain    = 500
# - Analysis
gThresh         = 6       # threshold for identifying periodic jitter spectral elements (sigma)


[docs]class StoppableThread(Thread): """ Thread class with a stop() method. The thread itself has to check regularly for the stopped() condition. All PyBERT thread classes are subclasses of this class. """ def __init__(self): super(StoppableThread, self).__init__() self._stop_event = Event()
[docs] def stop(self): 'Called by thread invoker, when thread should be stopped prematurely.' self._stop_event.set()
[docs] def stopped(self): 'Should be called by thread (i.e. - subclass) periodically and, if this function returns True, thread should clean itself up and quit ASAP.' return self._stop_event.is_set()
[docs]class TxOptThread(StoppableThread): 'Used to run Tx tap weight optimization in its own thread, in order to preserve GUI responsiveness.'
[docs] def run(self): 'Run the Tx equalization optimization thread.' pybert = self.pybert if(self.update_status): pybert.status = "Optimizing Tx..." max_iter = pybert.max_iter old_taps = [] min_vals = [] max_vals = [] for tuner in pybert.tx_tap_tuners: if tuner.enabled: old_taps.append(tuner.value) min_vals.append(tuner.min_val) max_vals.append(tuner.max_val) cons = ({ 'type': 'ineq', 'fun' : lambda x: 0.7 - sum(abs(x)) }) bounds = zip(min_vals, max_vals) try: if(gDebugOptimize): res = minimize( self.do_opt_tx, old_taps, bounds=bounds, constraints=cons, options={'disp' : True, 'maxiter' : max_iter } ) else: res = minimize( self.do_opt_tx, old_taps, bounds=bounds, constraints=cons, options={'disp' : False, 'maxiter' : max_iter } ) if(self.update_status): if(res['success']): pybert.status = "Optimization succeeded." else: pybert.status = "Optimization failed: {}".format(res['message']) except Exception as err: pybert.status = err.message
def do_opt_tx(self, taps): sleep(0.001) # Give the GUI a chance to acknowledge user clicking the Abort button. if(self.stopped()): raise RuntimeError("Optimization aborted.") pybert = self.pybert tuners = pybert.tx_tap_tuners taps = list(taps) for tuner in tuners: if(tuner.enabled): tuner.value = taps.pop(0) return pybert.cost
[docs]class RxOptThread(StoppableThread): 'Used to run Rx tap weight optimization in its own thread, in order to preserve GUI responsiveness.'
[docs] def run(self): 'Run the Rx equalization optimization thread.' pybert = self.pybert pybert.status = "Optimizing Rx..." max_iter = pybert.max_iter try: if(gDebugOptimize): res = minimize_scalar(self.do_opt_rx, bounds=(0, gMaxCTLEPeak), method='Bounded', options={'disp' : True, 'maxiter' : max_iter} ) else: res = minimize_scalar(self.do_opt_rx, bounds=(0, gMaxCTLEPeak), method='Bounded', options={'disp' : False, 'maxiter' : max_iter} ) if(res['success']): pybert.status = "Optimization succeeded." else: pybert.status = "Optimization failed: {}".format(res['message']) except Exception as err: pybert.status = err.message
def do_opt_rx(self, peak_mag): sleep(0.001) # Give the GUI a chance to acknowledge user clicking the Abort button. if(self.stopped()): raise RuntimeError("Optimization aborted.") pybert = self.pybert pybert.peak_mag_tune = peak_mag return pybert.cost
[docs]class CoOptThread(StoppableThread): 'Used to run co-optimization in its own thread, in order to preserve GUI responsiveness.'
[docs] def run(self): 'Run the Tx/Rx equalization co-optimization thread.' pybert = self.pybert pybert.status = "Co-optimizing..." max_iter = pybert.max_iter try: if(gDebugOptimize): res = minimize_scalar(self.do_coopt, bounds=(0, gMaxCTLEPeak), method='Bounded', options={'disp' : True, 'maxiter' : max_iter} ) else: res = minimize_scalar(self.do_coopt, bounds=(0, gMaxCTLEPeak), method='Bounded', options={'disp' : False, 'maxiter' : max_iter} ) if(res['success']): pybert.status = "Optimization succeeded." else: pybert.status = "Optimization failed: {}".format(res['message']) except Exception as err: pybert.status = err.message
def do_coopt(self, peak_mag): sleep(0.001) # Give the GUI a chance to acknowledge user clicking the Abort button. if(self.stopped()): raise RuntimeError("Optimization aborted.") pybert = self.pybert pybert.peak_mag_tune = peak_mag if(any([pybert.tx_tap_tuners[i].enabled for i in range(len(pybert.tx_tap_tuners))])): while(pybert.tx_opt_thread and pybert.tx_opt_thread.isAlive()): sleep(0.001) pybert._do_opt_tx(update_status=False) while(pybert.tx_opt_thread and pybert.tx_opt_thread.isAlive()): sleep(0.001) return pybert.cost
[docs]class TxTapTuner(HasTraits): 'Object used to populate the rows of the Tx FFE tap tuning table.' name = String('(noname)') enabled = Bool(False) min_val = Float(0.0) max_val = Float(0.0) value = Float(0.0) steps = Int(0) # Non-zero means we want to sweep it. def __init__(self, name='(noname)', enabled = False, min_val = 0.0, max_val = 0.0, value = 0.0, steps = 0, ): 'Allows user to define properties, at instantiation.' # Super-class initialization is ABSOLUTELY NECESSARY, in order # to get all the Traits/UI machinery setup correctly. super(TxTapTuner, self).__init__() self.name = name self.enabled = enabled self.min_val = min_val self.max_val = max_val self.value = value self.steps = steps
[docs]class PyBERT(HasTraits): """ A serial communication link bit error rate tester (BERT) simulator with a GUI interface. Useful for exploring the concepts of serial communication link design. """ # Independent variables # - Simulation Control bit_rate = Range(low=0.1, high=100.0, value=gBitRate) #: (Gbps) nbits = Range(low=1000, high=10000000, value=gNbits) #: Number of bits to simulate. pattern_len = Range(low=7, high=10000000, value=gPatLen) #: PRBS pattern length. nspb = Range(low=2, high=256, value=gNspb) #: Signal vector samples per bit. eye_bits = Int(gNbits // 5) #: # of bits used to form eye. (Default = last 20%) mod_type = List([0]) #: 0 = NRZ; 1 = Duo-binary; 2 = PAM-4 num_sweeps = Int(1) #: Number of sweeps to run. sweep_num = Int(1) sweep_aves = Int(gNumAve) do_sweep = Bool(False) #: Run sweeps? (Default = False) # - Channel Control use_ch_file = Bool(False) #: Import channel description from file? (Default = False) padded = Bool(False) #: Zero pad imported Touchstone data? (Default = False) windowed = Bool(False) #: Apply windowing to the Touchstone data? (Default = False) ch_file = File('', entries=5, filter=['*.s4p', '*.S4P', '*.csv', '*.CSV', '*.txt', '*.TXT', '*.*']) #: Channel file name. impulse_length = Float(0.0) #: Impulse response length. (Determined automatically, when 0.) Rdc = Float(gRdc) #: Channel d.c. resistance (Ohms/m). w0 = Float(gw0) #: Channel transition frequency (rads./s). R0 = Float(gR0) #: Channel skin effect resistance (Ohms/m). Theta0 = Float(gTheta0) #: Channel loss tangent (unitless). Z0 = Float(gZ0) #: Channel characteristic impedance, in LC region (Ohms). v0 = Float(gv0) #: Channel relative propagation velocity (c). l_ch = Float(gl_ch) #: Channel length (m). # - EQ Tune tx_tap_tuners = List( [TxTapTuner(name='Pre-tap', enabled=True, min_val=-0.2, max_val=0.2, value=0.0), TxTapTuner(name='Post-tap1', enabled=False, min_val=-0.4, max_val=0.4, value=0.0), TxTapTuner(name='Post-tap2', enabled=False, min_val=-0.3, max_val=0.3, value=0.0), TxTapTuner(name='Post-tap3', enabled=False, min_val=-0.2, max_val=0.2, value=0.0), ] ) #: EQ optimizer list of TxTapTuner objects. rx_bw_tune = Float(gBW) #: EQ optimizer CTLE bandwidth (GHz). peak_freq_tune = Float(gPeakFreq) #: EQ optimizer CTLE peaking freq. (GHz). peak_mag_tune = Float(gPeakMag) #: EQ optimizer CTLE peaking mag. (dB). ctle_offset_tune = Float(gCTLEOffset) #: EQ optimizer CTLE d.c. offset (dB). ctle_mode_tune = Enum('Off', 'Passive', 'AGC', 'Manual') #: EQ optimizer CTLE mode ('Off', 'Passive', 'AGC', 'Manual'). use_dfe_tune = Bool(gUseDfe) #: EQ optimizer DFE select (Bool). n_taps_tune = Int(gNtaps) #: EQ optimizer # DFE taps. max_iter = Int(50) #: EQ optimizer max. # of optimization iterations. tx_opt_thread = Instance(TxOptThread) #: Tx EQ optimization thread. rx_opt_thread = Instance(RxOptThread) #: Rx EQ optimization thread. coopt_thread = Instance(CoOptThread) #: EQ co-optimization thread. # - Tx vod = Float(gVod) #: Tx differential output voltage (V) rs = Float(gRs) #: Tx source impedance (Ohms) cout = Range(low=0.001, value=gCout) #: Tx parasitic output capacitance (pF) pn_mag = Float(gPnMag) #: Periodic noise magnitude (V). pn_freq = Float(gPnFreq) #: Periodic noise frequency (MHz). rn = Float(gRn) #: Standard deviation of Gaussian random noise (V). tx_taps = List([TxTapTuner(name='Pre-tap', enabled=True, min_val=-0.2, max_val=0.2, value=0.0), TxTapTuner(name='Post-tap1', enabled=False, min_val=-0.4, max_val=0.4, value=0.0), TxTapTuner(name='Post-tap2', enabled=False, min_val=-0.3, max_val=0.3, value=0.0), TxTapTuner(name='Post-tap3', enabled=False, min_val=-0.2, max_val=0.2, value=0.0), ]) #: List of TxTapTuner objects. rel_power = Float(1.0) #: Tx power dissipation (W). tx_use_ami = Bool(False) #: (Bool) tx_use_getwave = Bool(False) #: (Bool) tx_has_getwave = Bool(False) #: (Bool) tx_ami_file = File('', entries=5, filter=['*.ami']) #: (File) tx_ami_valid = Bool(False) #: (Bool) tx_dll_file = File('', entries=5, filter=['*.dll', '*.so']) #: (File) tx_dll_valid = Bool(False) #: (Bool) # - Rx rin = Float(gRin) #: Rx input impedance (Ohm) cin = Range(low=0.001, value=gCin) #: Rx parasitic input capacitance (pF) cac = Float(gCac) #: Rx a.c. coupling capacitance (uF) use_ctle_file = Bool(False) #: For importing CTLE impulse/step response directly. ctle_file = File('', entries=5, filter=['*.csv']) #: CTLE response file (when use_ctle_file = True). rx_bw = Float(gBW) #: CTLE bandwidth (GHz). peak_freq = Float(gPeakFreq) #: CTLE peaking frequency (GHz) peak_mag = Float(gPeakMag) #: CTLE peaking magnitude (dB) ctle_offset = Float(gCTLEOffset) #: CTLE d.c. offset (dB) ctle_mode = Enum('Off', 'Passive', 'AGC', 'Manual') #: CTLE mode ('Off', 'Passive', 'AGC', 'Manual'). rx_use_ami = Bool(False) #: (Bool) rx_use_getwave = Bool(False) #: (Bool) rx_has_getwave = Bool(False) #: (Bool) rx_ami_file = File('', entries=5, filter=['*.ami']) #: (File) rx_ami_valid = Bool(False) #: (Bool) rx_dll_file = File('', entries=5, filter=['*.dll', '*.so']) #: (File) rx_dll_valid = Bool(False) #: (Bool) # - DFE use_dfe = Bool(gUseDfe) #: True = use a DFE (Bool). sum_ideal = Bool(gDfeIdeal) #: True = use an ideal (i.e. - infinite bandwidth) summing node (Bool). decision_scaler = Float(gDecisionScaler) #: DFE slicer output voltage (V). gain = Float(gGain) #: DFE error gain (unitless). n_ave = Float(gNave) #: DFE # of averages to take, before making tap corrections. n_taps = Int(gNtaps) #: DFE # of taps. _old_n_taps = n_taps sum_bw = Float(gDfeBW) #: DFE summing node bandwidth (Used when sum_ideal=False.) (GHz). # - CDR delta_t = Float(gDeltaT) #: CDR proportional branch magnitude (ps). alpha = Float(gAlpha) #: CDR integral branch magnitude (unitless). n_lock_ave = Int(gNLockAve) #: CDR # of averages to take in determining lock. rel_lock_tol = Float(gRelLockTol) #: CDR relative tolerance to use in determining lock. lock_sustain = Int(gLockSustain) #: CDR hysteresis to use in determining lock. # - Analysis thresh = Int(gThresh) #: Threshold for identifying periodic jitter components (sigma). # Misc. cfg_file = File('', entries=5, filter=['*.pybert_cfg']) #: PyBERT configuration data storage file (File). data_file = File('', entries=5, filter=['*.pybert_data']) #: PyBERT results data storage file (File). # Plots (plot containers, actually) plotdata = ArrayPlotData() plots_h = Instance(GridPlotContainer) plots_s = Instance(GridPlotContainer) plots_p = Instance(GridPlotContainer) plots_H = Instance(GridPlotContainer) plots_dfe = Instance(GridPlotContainer) plots_eye = Instance(GridPlotContainer) plots_jitter_dist = Instance(GridPlotContainer) plots_jitter_spec = Instance(GridPlotContainer) plots_bathtub = Instance(GridPlotContainer) # Status status = String("Ready.") #: PyBERT status (String). jitter_perf = Float(0.) total_perf = Float(0.) sweep_results = List([]) len_h = Int(0) chnl_dly = Float(0.) #: Estimated channel delay (s). bit_errs = Int(0) #: # of bit errors observed in last run. run_count = Int(0) # Used as a mechanism to force bit stream regeneration. # About ident = String('PyBERT v{} - a serial communication link design tool, written in Python.\n\n \ David Banas\n \ December 22, 2017\n\n \ Copyright (c) 2014 David Banas;\n \ All rights reserved World wide.'.format(pybert.__version__)) # Help instructions = help_str # Console console_log = String("PyBERT Console Log\n\n") # Dependent variables # - Handled by the Traits/UI machinery. (Should only contain "low overhead" variables, which don't freeze the GUI noticeably.) # # - Note: Don't make properties, which have a high calculation overhead, dependencies of other properties! # This will slow the GUI down noticeably. jitter_info = Property(HTML, depends_on=['jitter_perf']) perf_info = Property(HTML, depends_on=['total_perf']) status_str = Property(String, depends_on=['status']) sweep_info = Property(HTML, depends_on=['sweep_results']) tx_h_tune = Property(Array, depends_on=['tx_tap_tuners.value', 'nspui']) ctle_h_tune = Property(Array, depends_on=['peak_freq_tune', 'peak_mag_tune', 'rx_bw_tune', 'w', 'len_h', 'ctle_mode_tune', 'ctle_offset_tune', 'use_dfe_tune', 'n_taps_tune']) ctle_out_h_tune = Property(Array, depends_on=['tx_h_tune', 'ctle_h_tune', 'chnl_h']) cost = Property(Float, depends_on=['ctle_out_h_tune', 'nspui']) rel_opt = Property(Float, depends_on=['cost']) t = Property(Array, depends_on=['ui', 'nspb', 'nbits']) t_ns = Property(Array, depends_on=['t']) f = Property(Array, depends_on=['t']) w = Property(Array, depends_on=['f']) bits = Property(Array, depends_on=['pattern_len', 'nbits', 'run_count']) symbols = Property(Array, depends_on=['bits', 'mod_type', 'vod']) ffe = Property(Array, depends_on=['tx_taps.value', 'tx_taps.enabled']) ui = Property(Float, depends_on=['bit_rate', 'mod_type']) nui = Property(Int, depends_on=['nbits', 'mod_type']) nspui = Property(Int, depends_on=['nspb', 'mod_type']) eye_uis = Property(Int, depends_on=['eye_bits', 'mod_type']) dfe_out_p = Array() przf_err = Property(Float, depends_on=['dfe_out_p']) # Custom buttons, which we'll use in particular tabs. # (Globally applicable buttons, such as "Run" and "Ok", are handled more simply, in the View.) btn_rst_eq = Button(label = 'ResetEq') btn_save_eq = Button(label = 'SaveEq') btn_opt_tx = Button(label = 'OptTx') btn_opt_rx = Button(label = 'OptRx') btn_coopt = Button(label = 'CoOpt') btn_abort = Button(label = 'Abort') btn_cfg_tx = Button(label = 'Configure') btn_cfg_rx = Button(label = 'Configure') btn_save_cfg = Button(label = 'Save Config.') btn_load_cfg = Button(label = 'Load Config.') # Logger def log(self, msg): self.console_log += "\n[{}]: {}\n".format(datetime.now(), msg.strip()) def handle_error(self, err): self.log(err.message) if(debug): message(err.message + "\nPlease, check terminal for more information.", 'PyBERT Alert') raise else: message(err.message, 'PyBERT Alert') # Default initialization def __init__(self, run_simulation = True): """ Initial plot setup occurs here. In order to populate the data structure we need to construct the plots, we must run the simulation. Args: run_simulation(Bool): If true, run the simulation, as part of class initialization. This is provided as an argument for the sake of larger applications, which may be importing PyBERT for its attributes and methods, and may not want to run the full simulation. (Optional; default = True) """ # Super-class initialization is ABSOLUTELY NECESSARY, in order # to get all the Traits/UI machinery setup correctly. super(PyBERT, self).__init__() self.log("Started.") if(run_simulation): # Running the simulation will fill in the required data structure. my_run_simulation(self, initial_run=True) # Once the required data structure is filled in, we can create the plots. make_plots(self, n_dfe_taps = gNtaps) else: self.calc_chnl_h() # Prevents missing attribute error in _get_ctle_out_h_tune(). # Button handlers def _btn_rst_eq_fired(self): for i in range(4): self.tx_tap_tuners[i].value = self.tx_taps[i].value self.tx_tap_tuners[i].enabled = self.tx_taps[i].enabled self.peak_freq_tune = self.peak_freq self.peak_mag_tune = self.peak_mag self.rx_bw_tune = self.rx_bw self.ctle_mode_tune = self.ctle_mode self.ctle_offset_tune = self.ctle_offset self.use_dfe_tune = self.use_dfe self.n_taps_tune = self.n_taps def _btn_save_eq_fired(self): for i in range(4): self.tx_taps[i].value = self.tx_tap_tuners[i].value self.tx_taps[i].enabled = self.tx_tap_tuners[i].enabled self.peak_freq = self.peak_freq_tune self.peak_mag = self.peak_mag_tune self.rx_bw = self.rx_bw_tune self.ctle_mode = self.ctle_mode_tune self.ctle_offset = self.ctle_offset_tune self.use_dfe = self.use_dfe_tune self.n_taps = self.n_taps_tune def _btn_opt_tx_fired(self): if self.tx_opt_thread and self.tx_opt_thread.isAlive() \ or not any([self.tx_tap_tuners[i].enabled for i in range(len(self.tx_tap_tuners))]): pass else: self._do_opt_tx() def _do_opt_tx(self, update_status=True): self.tx_opt_thread = TxOptThread() self.tx_opt_thread.pybert = self self.tx_opt_thread.update_status = update_status self.tx_opt_thread.start() def _btn_opt_rx_fired(self): if self.rx_opt_thread and self.rx_opt_thread.isAlive() or self.ctle_mode_tune == "Off": pass else: self.rx_opt_thread = RxOptThread() self.rx_opt_thread.pybert = self self.rx_opt_thread.start() def _btn_coopt_fired(self): if self.coopt_thread and self.coopt_thread.isAlive(): pass else: self.coopt_thread = CoOptThread() self.coopt_thread.pybert = self self.coopt_thread.start() def _btn_abort_fired(self): if self.coopt_thread and self.coopt_thread.isAlive(): self.coopt_thread.stop() self.coopt_thread.join(10) if self.tx_opt_thread and self.tx_opt_thread.isAlive(): self.tx_opt_thread.stop() self.tx_opt_thread.join(10) if self.rx_opt_thread and self.rx_opt_thread.isAlive(): self.rx_opt_thread.stop() self.rx_opt_thread.join(10) def _btn_cfg_tx_fired(self): self._tx_cfg() def _btn_cfg_rx_fired(self): self._rx_cfg() def _btn_save_cfg_fired(self): dlg = FileDialog(action='save as', wildcard='*.pybert_cfg', default_path=self.cfg_file) if dlg.open() == OK: the_PyBertCfg = PyBertCfg(self) try: with open(dlg.path, 'wt') as the_file: pickle.dump(the_PyBertCfg, the_file) self.cfg_file = dlg.path except Exception as err: err.message = "The following error occured:\n\t{}\nThe configuration was NOT saved.".format(err.message) self.handle_error(err) def _btn_load_cfg_fired(self): dlg = FileDialog(action='open', wildcard='*.pybert_cfg', default_path=self.cfg_file) if dlg.open() == OK: try: with open(dlg.path, 'rt') as the_file: the_PyBertCfg = pickle.load(the_file) if(type(the_PyBertCfg) is not PyBertCfg): raise Exception("The data structure read in is NOT of type: PyBertCfg!") for prop, value in vars(the_PyBertCfg).iteritems(): if(prop == 'tx_taps'): i = 0 for (enabled, val) in value: setattr(self.tx_taps[i], 'enabled', enabled) setattr(self.tx_taps[i], 'value', val) i += 1 else: setattr(self, prop, value) self.cfg_file = dlg.path except Exception as err: err.message = "The following error occured:\n\t{}\nThe configuration was NOT loaded.".format(err.message) self.handle_error(err) # Independent variable setting intercepts # (Primarily, for debugging.) def _set_ctle_peak_mag_tune(self, val): if(val > gMaxCTLEPeak or val < 0.): raise RunTimeException("CTLE peak magnitude out of range!") else: self.peak_mag_tune = val # Dependent variable definitions @cached_property def _get_t(self): """ Calculate the system time vector, in seconds. """ ui = self.ui nspui = self.nspui nui = self.nui t0 = ui / nspui npts = nui * nspui return array([i * t0 for i in range(npts)]) @cached_property def _get_t_ns(self): """ Calculate the system time vector, in ns. """ return self.t * 1.e9 @cached_property def _get_f(self): """ Calculate the frequency vector appropriate for indexing non-shifted FFT output, in Hz. # (i.e. - [0, f0, 2 * f0, ... , fN] + [-(fN - f0), -(fN - 2 * f0), ... , -f0] """ t = self.t npts = len(t) f0 = 1. / (t[1] * npts) half_npts = npts // 2 return array([i * f0 for i in range(half_npts + 1)] + [(half_npts - i) * -f0 for i in range(1, half_npts)]) @cached_property def _get_w(self): """ Calculate the frequency vector appropriate for indexing non-shifted FFT output, in rads./sec. """ return 2 * pi * self.f @cached_property def _get_bits(self): """ Generate the bit stream. """ pattern_len = self.pattern_len nbits = self.nbits mod_type = self.mod_type[0] bits = [] seed = randint(128) while(not seed): # We don't want to seed our LFSR with zero. seed = randint(128) bit_gen = lfsr_bits([7, 6], seed) for i in range(pattern_len - 4): bits.append(bit_gen.next()) # The 4-bit prequels, below, are to ensure that the first zero crossing # in the actual slicer input signal occurs. This is necessary, because # we assume it does, when aligning the ideal and actual signals for # jitter calculation. # # We may want to talk to Mike Steinberger, of SiSoft, about his # correlation based approach to this alignment chore. It's # probably more robust. if(mod_type == 1): # Duo-binary precodes, using XOR. return resize(array([0, 0, 1, 0] + bits), nbits) else: return resize(array([0, 0, 1, 1] + bits), nbits) @cached_property def _get_ui(self): """ Returns the "unit interval" (i.e. - the nominal time span of each symbol moving through the channel). """ mod_type = self.mod_type[0] bit_rate = self.bit_rate * 1.e9 ui = 1. / bit_rate if(mod_type == 2): # PAM-4 ui *= 2. return ui @cached_property def _get_nui(self): """ Returns the number of unit intervals in the test vectors. """ mod_type = self.mod_type[0] nbits = self.nbits nui = nbits if(mod_type == 2): # PAM-4 nui /= 2 return nui @cached_property def _get_nspui(self): """ Returns the number of samples per unit interval. """ mod_type = self.mod_type[0] nspb = self.nspb nspui = nspb if(mod_type == 2): # PAM-4 nspui *= 2 return nspui @cached_property def _get_eye_uis(self): """ Returns the number of unit intervals to use for eye construction. """ mod_type = self.mod_type[0] eye_bits = self.eye_bits eye_uis = eye_bits if(mod_type == 2): # PAM-4 eye_uis /= 2 return eye_uis @cached_property def _get_ideal_h(self): """ Returns the ideal link impulse response. """ ui = self.ui nspui = self.nspui t = self.t mod_type = self.mod_type[0] ideal_type = self.ideal_type[0] t = array(t) - t[-1] / 2. if(ideal_type == 0): # delta ideal_h = zeros(len(t)) ideal_h[len(t) / 2] = 1. elif(ideal_type == 1): # sinc ideal_h = sinc(t / (ui / 2.)) elif(ideal_type == 2): # raised cosine ideal_h = (cos(pi * t / (ui / 2.)) + 1.) / 2. ideal_h = where(t < -ui / 2., zeros(len(t)), ideal_h) ideal_h = where(t > ui / 2., zeros(len(t)), ideal_h) else: raise Exception("PyBERT._get_ideal_h(): ERROR: Unrecognized ideal impulse response type.") if(mod_type == 1): # Duo-binary relies upon the total link impulse response to perform the required addition. ideal_h = 0.5 * (ideal_h + pad(ideal_h[:-nspui], (nspui, 0), 'constant', constant_values=(0, 0))) return ideal_h @cached_property def _get_symbols(self): """ Generate the symbol stream. """ mod_type = self.mod_type[0] vod = self.vod bits = self.bits if (mod_type == 0): # NRZ symbols = 2 * bits - 1 elif(mod_type == 1): # Duo-binary symbols = [bits[0]] for bit in bits[1:]: # XOR pre-coding prevents infinite error propagation. symbols.append(bit ^ symbols[-1]) symbols = 2 * array(symbols) - 1 elif(mod_type == 2): # PAM-4 symbols = [] for bits in zip(bits[0::2], bits[1::2]): if(bits == (0,0)): symbols.append(-1.) elif(bits == (0,1)): symbols.append(-1./3.) elif(bits == (1,0)): symbols.append(1./3.) else: symbols.append(1.) else: raise Exception("ERROR: _get_symbols(): Unknown modulation type requested!") return array(symbols) * vod @cached_property def _get_ffe(self): """ Generate the Tx pre-emphasis FIR numerator. """ tap_tuners = self.tx_taps taps = [] for tuner in tap_tuners: if(tuner.enabled): taps.append(tuner.value) else: taps.append(0.0) taps.insert(1, 1.0 - sum(map(abs, taps))) # Assume one pre-tap. return taps @cached_property def _get_jitter_info(self): try: isi_chnl = self.isi_chnl * 1.e12 dcd_chnl = self.dcd_chnl * 1.e12 pj_chnl = self.pj_chnl * 1.e12 rj_chnl = self.rj_chnl * 1.e12 isi_tx = self.isi_tx * 1.e12 dcd_tx = self.dcd_tx * 1.e12 pj_tx = self.pj_tx * 1.e12 rj_tx = self.rj_tx * 1.e12 isi_ctle = self.isi_ctle * 1.e12 dcd_ctle = self.dcd_ctle * 1.e12 pj_ctle = self.pj_ctle * 1.e12 rj_ctle = self.rj_ctle * 1.e12 isi_dfe = self.isi_dfe * 1.e12 dcd_dfe = self.dcd_dfe * 1.e12 pj_dfe = self.pj_dfe * 1.e12 rj_dfe = self.rj_dfe * 1.e12 isi_rej_tx = 1.e20 dcd_rej_tx = 1.e20 pj_rej_tx = 1.e20 rj_rej_tx = 1.e20 isi_rej_ctle = 1.e20 dcd_rej_ctle = 1.e20 pj_rej_ctle = 1.e20 rj_rej_ctle = 1.e20 isi_rej_dfe = 1.e20 dcd_rej_dfe = 1.e20 pj_rej_dfe = 1.e20 rj_rej_dfe = 1.e20 isi_rej_total = 1.e20 dcd_rej_total = 1.e20 pj_rej_total = 1.e20 rj_rej_total = 1.e20 if(isi_tx): isi_rej_tx = isi_chnl / isi_tx if(dcd_tx): dcd_rej_tx = dcd_chnl / dcd_tx if(pj_tx): pj_rej_tx = pj_chnl / pj_tx if(rj_tx): rj_rej_tx = rj_chnl / rj_tx if(isi_ctle): isi_rej_ctle = isi_tx / isi_ctle if(dcd_ctle): dcd_rej_ctle = dcd_tx / dcd_ctle if(pj_ctle): pj_rej_ctle = pj_tx / pj_ctle if(rj_ctle): rj_rej_ctle = rj_tx / rj_ctle if(isi_dfe): isi_rej_dfe = isi_ctle / isi_dfe if(dcd_dfe): dcd_rej_dfe = dcd_ctle / dcd_dfe if(pj_dfe): pj_rej_dfe = pj_ctle / pj_dfe if(rj_dfe): rj_rej_dfe = rj_ctle / rj_dfe if(isi_dfe): isi_rej_total = isi_chnl / isi_dfe if(dcd_dfe): dcd_rej_total = dcd_chnl / dcd_dfe if(pj_dfe): pj_rej_total = pj_tx / pj_dfe if(rj_dfe): rj_rej_total = rj_tx / rj_dfe info_str = '<H1>Jitter Rejection by Equalization Component</H1>\n' info_str += '<H2>Tx Preemphasis</H2>\n' info_str += '<TABLE border="1">\n' info_str += '<TR align="center">\n' info_str += "<TH>Jitter Component</TH><TH>Input (ps)</TH><TH>Output (ps)</TH><TH>Rejection (dB)</TH>\n" info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">ISI</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (isi_chnl, isi_tx, 10. * safe_log10(isi_rej_tx)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">DCD</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (dcd_chnl, dcd_tx, 10. * safe_log10(dcd_rej_tx)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Pj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>n/a</TD>\n' % \ (pj_chnl, pj_tx) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Rj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>n/a</TD>\n' % \ (rj_chnl, rj_tx) info_str += "</TR>\n" info_str += "</TABLE>\n" info_str += '<H2>CTLE</H2>\n' info_str += '<TABLE border="1">\n' info_str += '<TR align="center">\n' info_str += "<TH>Jitter Component</TH><TH>Input (ps)</TH><TH>Output (ps)</TH><TH>Rejection (dB)</TH>\n" info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">ISI</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (isi_tx, isi_ctle, 10. * safe_log10(isi_rej_ctle)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">DCD</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (dcd_tx, dcd_ctle, 10. * safe_log10(dcd_rej_ctle)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Pj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (pj_tx, pj_ctle, 10. * safe_log10(pj_rej_ctle)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Rj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (rj_tx, rj_ctle, 10. * safe_log10(rj_rej_ctle)) info_str += "</TR>\n" info_str += "</TABLE>\n" info_str += '<H2>DFE</H2>\n' info_str += '<TABLE border="1">\n' info_str += '<TR align="center">\n' info_str += "<TH>Jitter Component</TH><TH>Input (ps)</TH><TH>Output (ps)</TH><TH>Rejection (dB)</TH>\n" info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">ISI</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (isi_ctle, isi_dfe, 10. * safe_log10(isi_rej_dfe)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">DCD</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (dcd_ctle, dcd_dfe, 10. * safe_log10(dcd_rej_dfe)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Pj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (pj_ctle, pj_dfe, 10. * safe_log10(pj_rej_dfe)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Rj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (rj_ctle, rj_dfe, 10. * safe_log10(rj_rej_dfe)) info_str += "</TR>\n" info_str += "</TABLE>\n" info_str += '<H2>TOTAL</H2>\n' info_str += '<TABLE border="1">\n' info_str += '<TR align="center">\n' info_str += "<TH>Jitter Component</TH><TH>Input (ps)</TH><TH>Output (ps)</TH><TH>Rejection (dB)</TH>\n" info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">ISI</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (isi_chnl, isi_dfe, 10. * safe_log10(isi_rej_total)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">DCD</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (dcd_chnl, dcd_dfe, 10. * safe_log10(dcd_rej_total)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Pj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (pj_tx, pj_dfe, 10. * safe_log10(pj_rej_total)) info_str += "</TR>\n" info_str += '<TR align="right">\n' info_str += '<TD align="center">Rj</TD><TD>%6.3f</TD><TD>%6.3f</TD><TD>%4.1f</TD>\n' % \ (rj_tx, rj_dfe, 10. * safe_log10(rj_rej_total)) info_str += "</TR>\n" info_str += "</TABLE>\n" except: raise info_str = '<H1>Jitter Rejection by Equalization Component</H1>\n' info_str += "Sorry, an error occured.\n" return info_str @cached_property def _get_perf_info(self): info_str = '<H2>Performance by Component</H2>\n' info_str += ' <TABLE border="1">\n' info_str += ' <TR align="center">\n' info_str += ' <TH>Component</TH><TH>Performance (Msmpls./min.)</TH>\n' info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">Channel</TD><TD>%6.3f</TD>\n' % (self.channel_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">Tx Preemphasis</TD><TD>%6.3f</TD>\n' % (self.tx_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">CTLE</TD><TD>%6.3f</TD>\n' % (self.ctle_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">DFE</TD><TD>%6.3f</TD>\n' % (self.dfe_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">Jitter Analysis</TD><TD>%6.3f</TD>\n' % (self.jitter_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center"><strong>TOTAL</strong></TD><TD><strong>%6.3f</strong></TD>\n' % (self.total_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' <TR align="right">\n' info_str += ' <TD align="center">Plotting</TD><TD>%6.3f</TD>\n' % (self.plotting_perf * 60.e-6) info_str += ' </TR>\n' info_str += ' </TABLE>\n' return info_str @cached_property def _get_sweep_info(self): sweep_results = self.sweep_results info_str = '<H2>Sweep Results</H2>\n' info_str += ' <TABLE border="1">\n' info_str += ' <TR align="center">\n' info_str += ' <TH>Pretap</TH><TH>Posttap</TH><TH>Mean(bit errors)</TH><TH>StdDev(bit errors)</TH>\n' info_str += ' </TR>\n' for item in sweep_results: info_str += ' <TR align="center">\n' info_str += ' <TD>%+06.3f</TD><TD>%+06.3f</TD><TD>%d</TD><TD>%d</TD>\n' % (item[0], item[1], item[2], item[3]) info_str += ' </TR>\n' info_str += ' </TABLE>\n' return info_str @cached_property def _get_status_str(self): status_str = "%-20s | Perf. (Ms/m): %4.1f" % (self.status, self.total_perf * 60.e-6) dly_str = " | ChnlDly (ns): %5.3f" % (self.chnl_dly * 1.e9) err_str = " | BitErrs: %d" % self.bit_errs pwr_str = " | TxPwr (W): %4.2f" % self.rel_power status_str += dly_str + err_str + pwr_str try: jit_str = " | Jitter (ps): ISI=%6.3f DCD=%6.3f Pj=%6.3f Rj=%6.3f" % \ (self.isi_dfe * 1.e12, self.dcd_dfe * 1.e12, self.pj_dfe * 1.e12, self.rj_dfe * 1.e12) except: jit_str = " | (Jitter not available.)" status_str += jit_str return status_str @cached_property def _get_tx_h_tune(self): nspui = self.nspui tap_tuners = self.tx_tap_tuners taps = [] for tuner in tap_tuners: if(tuner.enabled): taps.append(tuner.value) else: taps.append(0.0) taps.insert(1, 1.0 - sum(map(abs, taps))) # Assume one pre-tap. h = sum([[x] + list(zeros(nspui - 1)) for x in taps], []) return h @cached_property def _get_ctle_h_tune(self): w = self.w len_h = self.len_h rx_bw = self.rx_bw_tune * 1.e9 peak_freq = self.peak_freq_tune * 1.e9 peak_mag = self.peak_mag_tune offset = self.ctle_offset_tune mode = self.ctle_mode_tune w_dummy, H = make_ctle(rx_bw, peak_freq, peak_mag, w, mode, offset) h = real(ifft(H))[:len_h] h *= abs(H[0]) / sum(h) return h @cached_property def _get_ctle_out_h_tune(self): chnl_h = self.chnl_h tx_h = self.tx_h_tune ctle_h = self.ctle_h_tune tx_out_h = convolve(tx_h, chnl_h) h = convolve(ctle_h, tx_out_h) return h @cached_property def _get_cost(self): nspui = self.nspui h = self.ctle_out_h_tune mod_type = self.mod_type[0] s = h.cumsum() p = s - pad(s[:-nspui], (nspui,0), 'constant', constant_values=(0,0)) (clock_pos, thresh) = pulse_center(p, nspui) if(clock_pos == -1): return 1.0 # Returning a large cost lets it know it took a wrong turn. clocks = thresh * ones(len(p)) if(mod_type == 1): # Handle duo-binary. clock_pos -= nspui // 2 clocks[clock_pos] = 0. if(mod_type == 1): # Handle duo-binary. clocks[clock_pos + nspui] = 0. # Cost is simply ISI minus main lobe amplitude. # Note: post-cursor ISI is NOT included in cost, when we're using the DFE. isi = 0. ix = clock_pos - nspui while(ix >= 0): clocks[ix] = 0. isi += abs(p[ix]) ix -= nspui ix = clock_pos + nspui if(mod_type == 1): # Handle duo-binary. ix += nspui while(ix < len(p)): clocks[ix] = 0. if(not self.use_dfe_tune): isi += abs(p[ix]) ix += nspui if(self.use_dfe_tune): for i in range(self.n_taps_tune): if(clock_pos + nspui * (1 + i) < len(p)): p[clock_pos + nspui * (0.5 + i) :] -= p[clock_pos + nspui * (1 + i)] self.plotdata.set_data('ctle_out_h_tune', p) self.plotdata.set_data('clocks_tune', clocks) if(mod_type == 1): # Handle duo-binary. return isi - p[clock_pos] - p[clock_pos + nspui] + 2. * abs(p[clock_pos + nspui] - p[clock_pos]) else: return isi - p[clock_pos] @cached_property def _get_rel_opt(self): return -self.cost @cached_property def _get_przf_err(self): p = self.dfe_out_p nspui = self.nspui n_taps = self.n_taps (clock_pos, thresh) = pulse_center(p, nspui) err = 0 for i in range(n_taps): err += p[clock_pos + (i+1)*nspui] ** 2 return err / p[clock_pos]**2 # Changed property handlers. def _status_str_changed(self): if(gDebugStatus): print self.status_str def _use_dfe_changed(self, new_value): if(new_value == False): for i in range(1, 4): self.tx_taps[i].enabled = True else: for i in range(1, 4): self.tx_taps[i].enabled = False def _use_dfe_tune_changed(self, new_value): if(new_value == False): for i in range(1, 4): self.tx_tap_tuners[i].enabled = True else: for i in range(1, 4): self.tx_tap_tuners[i].enabled = False def _tx_ami_file_changed(self, new_value): try: self.tx_ami_valid = False with open(new_value) as pfile: pcfg = AMIParamConfigurator(pfile.read()) self.log("Parsing Tx AMI file, '{}'...\n{}".format(new_value, pcfg.ami_parsing_errors)) self.tx_has_getwave = pcfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists']) self._tx_cfg = pcfg self.tx_ami_valid = True except Exception as err: err.message = 'Failed to open and/or parse AMI file!\n{}'.format(err.message) self.handle_error(err) def _tx_dll_file_changed(self, new_value): try: self.tx_dll_valid = False model = AMIModel(str(new_value)) self._tx_model = model self.tx_dll_valid = True except Exception as err: err.message = 'Failed to open DLL/SO file!\n{}'.format(err.message) self.handle_error(err) def _rx_ami_file_changed(self, new_value): try: self.rx_ami_valid = False with open(new_value) as pfile: pcfg = AMIParamConfigurator(pfile.read()) self.log("Parsing Rx AMI file, '{}'...\n{}".format(new_value, pcfg.ami_parsing_errors)) self.rx_has_getwave = pcfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists']) self._rx_cfg = pcfg self.rx_ami_valid = True except Exception as err: err.message = 'Failed to open and/or parse AMI file!\n{}'.format(err.message) self.handle_error(err) def _rx_dll_file_changed(self, new_value): try: self.rx_dll_valid = False model = AMIModel(str(new_value)) self._rx_model = model self.rx_dll_valid = True except Exception as err: err.message = 'Failed to open DLL/SO file!\n{}'.format(err.message) self.handle_error(err) # This function has been pulled outside of the standard Traits/UI "depends_on / @cached_property" mechanism, # in order to more tightly control when it executes. I wasn't able to get truly lazy evaluation, and # this was causing noticeable GUI slowdown.
[docs] def calc_chnl_h(self): """ Calculates the channel impulse response. Also sets, in 'self': - chnl_dly: group delay of channel - start_ix: first element of trimmed response - t_ns_chnl: the x-values, in ns, for plotting 'chnl_h' - chnl_H: channel frequency response - chnl_s: channel step response - chnl_p: channel pulse response """ t = self.t ts = t[1] nspui = self.nspui impulse_length = self.impulse_length * 1.e-9 if(self.use_ch_file): chnl_h = import_channel(self.ch_file, ts, self.padded, self.windowed) if(chnl_h[-1] > (max(chnl_h) / 2.)): # step response? chnl_h = diff(chnl_h) # impulse response is derivative of step response. chnl_h /= sum(chnl_h) # Normalize d.c. to one. chnl_dly = t[where(chnl_h == max(chnl_h))[0][0]] chnl_h.resize(len(t)) chnl_H = fft(chnl_h) else: l_ch = self.l_ch v0 = self.v0 * 3.e8 R0 = self.R0 w0 = self.w0 Rdc = self.Rdc Z0 = self.Z0 Theta0 = self.Theta0 w = self.w Rs = self.rs Cs = self.cout * 1.e-12 RL = self.rin Cp = self.cin * 1.e-12 CL = self.cac * 1.e-6 chnl_dly = l_ch / v0 gamma, Zc = calc_gamma(R0, w0, Rdc, Z0, v0, Theta0, w) H = exp(-l_ch * gamma) chnl_H = 2. * calc_G(H, Rs, Cs, Zc, RL, Cp, CL, w) # Compensating for nominal /2 divider action. chnl_h = real(ifft(chnl_H)) min_len = 10 * nspui max_len = 100 * nspui if(impulse_length): min_len = max_len = impulse_length / ts chnl_h, start_ix = trim_impulse(chnl_h, min_len=min_len, max_len=max_len) chnl_h /= sum(chnl_h) # a temporary crutch. temp = chnl_h.copy() temp.resize(len(t)) chnl_trimmed_H = fft(temp) chnl_s = chnl_h.cumsum() chnl_p = chnl_s - pad(chnl_s[:-nspui], (nspui,0), 'constant', constant_values=(0,0)) self.chnl_h = chnl_h self.len_h = len(chnl_h) self.chnl_dly = chnl_dly self.chnl_H = chnl_H self.chnl_trimmed_H = chnl_trimmed_H self.start_ix = start_ix self.t_ns_chnl = array(t[start_ix : start_ix + len(chnl_h)]) * 1.e9 self.chnl_s = chnl_s self.chnl_p = chnl_p return chnl_h
# So that we can be used in stand-alone, or imported, fashion. def main(): PyBERT().configure_traits(view=traits_view) if __name__ == '__main__': main()