# Source code for pybert.pybert_cntrl

"""
Default controller definition for PyBERT class.

Original author: David Banas <capn.freako@gmail.com>  

Original date:   August 24, 2014 (Copied from pybert.py, as part of a major code cleanup.)

Copyright (c) 2014 David Banas; all rights reserved World wide.
"""

import sys, traceback

from time         import clock, sleep

from chaco.api       import Plot
from chaco.tools.api import PanTool, ZoomTool

from numpy        import sign, sin, pi, array, linspace, zeros, ones, repeat
from numpy        import where, sqrt, histogram, arange, append, square, shape
from numpy        import diff, log10, correlate, convolve, mean, resize, real
from numpy        import transpose, cumsum, diff, std, pad, concatenate
from numpy.random import normal
from numpy.fft    import fft, ifft
from scipy        import signal
from scipy.signal import lfilter, iirfilter, freqz, fftconvolve

from dfe          import DFE
from cdr          import CDR

from pyibisami.amimodel import AMIModel, AMIModelInitializer
from pybert_util  import find_crossings, make_ctle, calc_jitter, moving_average, calc_eye, import_channel

DEBUG           = False
MIN_BATHTUB_VAL = 1.e-18

gFc     = 1.e6     # Corner frequency of high-pass filter used to model capacitive coupling of periodic noise.

def my_run_sweeps(self):
    """
    Runs the simulation sweeps.

    When ``self.do_sweep`` is false, a single simulation is run (with plot
    updating). Otherwise, one list of candidate values is assembled per Tx
    tap, and the simulation is run ``self.sweep_aves`` times for every
    combination of those values (with plot updating disabled).

    Per-tap candidate values:
        - disabled tap:                 ``[0.]``
        - enabled tap, ``steps`` falsy: ``[tap.value]``
        - enabled tap, ``steps`` set:   ``arange(min_val, max_val, (max_val - min_val) / steps)``,
          i.e. - ``steps`` points over the half-open interval ``[min_val, max_val)``.

    Results are left on the instance:
        - ``self.num_sweeps``:    total number of simulation runs planned.
        - ``self.sweep_num``:     1-based index of the run in progress.
        - ``self.sweep_results``: list of ``(sweep, mean(bit_errs), std(bit_errs))`` tuples,
          where ``sweep`` is the list of tap values used.

    The ``value`` of each tap in ``self.tx_taps`` is overwritten and is left
    at the last combination tried.

    Args:
        self(PyBERT): Reference to an instance of the *PyBERT* class.

    """

    # Function-scope import, so that this routine does not depend on the file's import block.
    from itertools import product

    sweep_aves = self.sweep_aves
    do_sweep   = self.do_sweep
    tx_taps    = self.tx_taps

    if not do_sweep:
        my_run_simulation(self)
        return

    # Assemble the list of desired values for each sweepable parameter.
    sweep_vals = []
    for tap in tx_taps:
        if tap.enabled:
            if tap.steps:
                sweep_vals.append(list(arange(tap.min_val, tap.max_val, (tap.max_val - tap.min_val) / tap.steps)))
            else:
                sweep_vals.append([tap.value])
        else:
            sweep_vals.append([0.])

    # Run the sweep, using the lists assembled, above.
    # 'product' varies the last tap fastest, which is the same ordering the
    # former hand-written four-deep comprehension produced, but it works for
    # any number of taps.
    sweeps          = [list(combo) for combo in product(*sweep_vals)]
    num_sweeps      = sweep_aves * len(sweeps)
    self.num_sweeps = num_sweeps
    sweep_results   = []
    sweep_num       = 1
    for sweep in sweeps:
        for tap, tap_val in zip(tx_taps, sweep):
            tap.value = tap_val
        bit_errs = []
        for _ in range(sweep_aves):
            self.sweep_num = sweep_num
            my_run_simulation(self, update_plots=False)
            bit_errs.append(self.bit_errs)
            sweep_num += 1
        sweep_results.append((sweep, mean(bit_errs), std(bit_errs)))
    self.sweep_results = sweep_results
def my_run_simulation(self, initial_run=False, update_plots=True):
    """
    Runs the simulation.

    The signal path is processed in stages; each stage sits in its own
    ``try`` block, which sets ``self.status`` to name the failing stage:

        1. Channel:  ideal signal, ideal crossings, and channel output.
        2. Tx:       IBIS-AMI model or built-in FFE, plus periodic and random noise.
        3. CTLE:     IBIS-AMI model, response imported from file, or built-in model.
        4. DFE/CDR:  equalization, clock recovery, and bit error counting.
        5. Jitter:   jitter analysis at the channel, Tx, CTLE and DFE outputs.
        6. Plots:    optional plot data update.

    Stages 1-4 and 6 re-raise after setting the status. Stage 5 does *not*
    re-raise (the ``raise`` is commented out, in the original), so a jitter
    analysis failure leaves the jitter attributes at whatever was stored
    before the failure and the run continues on to plotting.

    Results (waveforms, impulse/step/pulse/frequency responses, jitter data,
    and per-stage throughput figures) are stored as attributes of *self*.
    The function returns early, with ``self.status = "Simulation Error."``,
    when an IBIS-AMI model can provide neither an impulse response nor a
    usable ``GetWave``.

    Args:
        self(PyBERT): Reference to an instance of the *PyBERT* class.
        initial_run(Bool): If True, don't update the eye diagrams, since
            they haven't been created, yet. (Optional; default = False.)
        update_plots(Bool): If True, update the plots, after simulation
            completes. This option can be used by larger scripts, which
            import *pybert*, in order to avoid graphical back-end
            conflicts and speed up this function's execution time.
            (Optional; default = True.)

    """

    num_sweeps = self.num_sweeps
    sweep_num  = self.sweep_num

    start_time  = clock()
    self.status = 'Running channel...(sweep %d of %d)' % (sweep_num, num_sweeps)

    self.run_count += 1  # Force regeneration of bit stream.

    # Pull class variables into local storage, performing unit conversion where necessary.
    # NOTE(review): several of these locals are never referenced below. The reads are kept,
    # because they may be computed attributes of the PyBERT instance - confirm before pruning.
    t               = self.t
    t_ns            = self.t_ns
    f               = self.f
    w               = self.w
    bits            = self.bits
    symbols         = self.symbols
    ffe             = self.ffe
    nbits           = self.nbits
    nui             = self.nui
    bit_rate        = self.bit_rate * 1.e9
    eye_bits        = self.eye_bits
    eye_uis         = self.eye_uis
    nspb            = self.nspb
    nspui           = self.nspui
    rn              = self.rn
    pn_mag          = self.pn_mag
    pn_freq         = self.pn_freq * 1.e6
    Vod             = self.vod
    Rs              = self.rs
    Cs              = self.cout * 1.e-12
    RL              = self.rin
    CL              = self.cac * 1.e-6
    Cp              = self.cin * 1.e-12
    R0              = self.R0
    w0              = self.w0
    Rdc             = self.Rdc
    Z0              = self.Z0
    v0              = self.v0 * 3.e8
    Theta0          = self.Theta0
    l_ch            = self.l_ch
    pattern_len     = self.pattern_len
    rx_bw           = self.rx_bw * 1.e9
    peak_freq       = self.peak_freq * 1.e9
    peak_mag        = self.peak_mag
    ctle_offset     = self.ctle_offset
    ctle_mode       = self.ctle_mode
    delta_t         = self.delta_t * 1.e-12
    alpha           = self.alpha
    ui              = self.ui
    n_taps          = self.n_taps
    gain            = self.gain
    n_ave           = self.n_ave
    decision_scaler = self.decision_scaler
    n_lock_ave      = self.n_lock_ave
    rel_lock_tol    = self.rel_lock_tol
    lock_sustain    = self.lock_sustain
    bandwidth       = self.sum_bw * 1.e9
    rel_thresh      = self.thresh
    mod_type        = self.mod_type[0]

    try:
        # Calculate misc. values.
        fs = bit_rate * nspb
        Ts = t[1]
        ts = Ts

        # Generate the ideal over-sampled signal.
        #
        # Duo-binary is problematic, in that it requires convolution with the ideal duobinary
        # impulse response, in order to produce the proper ideal signal.
        x = repeat(symbols, nspui)
        self.x = x
        if mod_type == 1:  # Handle duo-binary case.
            duob_h = array(([0.5] + [0.] * (nspui - 1)) * 2)
            x = convolve(x, duob_h)[:len(t)]
        self.ideal_signal = x

        # Find the ideal crossing times, for subsequent jitter analysis of transmitted signal.
        ideal_xings = find_crossings(t, x, decision_scaler, min_delay=(ui / 2.), mod_type=mod_type)
        self.ideal_xings = ideal_xings

        # Calculate the channel output.
        #
        # Note: We're not using 'self.ideal_signal', because we rely on the system response to
        #       create the duobinary waveform. We only create it explicitly, above,
        #       so that we'll have an ideal reference for comparison.
        chnl_h   = self.calc_chnl_h()
        chnl_out = convolve(self.x, chnl_h)[:len(x)]

        self.channel_perf = nbits * nspb / (clock() - start_time)
        split_time        = clock()
        self.status       = 'Running Tx...(sweep %d of %d)' % (sweep_num, num_sweeps)
    except Exception:
        self.status = 'Exception: channel'
        raise

    self.chnl_out   = chnl_out
    self.chnl_out_H = fft(chnl_out)

    # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the Tx.
    try:
        if self.tx_use_ami:
            # Note: Within the PyBERT computational environment, we use normalized impulse responses,
            #       which have units of (V/ts), where 'ts' is the sample interval. However, IBIS-AMI models expect
            #       units of (V/s). So, we have to scale accordingly, as we transit the boundary between these two worlds.
            tx_cfg = self._tx_cfg  # Grab the 'AMIParamConfigurator' instance for this model.
            # Get the model invoked and initialized, except for 'channel_response', which
            # we need to do several different ways, in order to gather all the data we need.
            tx_param_dict = tx_cfg.input_ami_params
            tx_model_init = AMIModelInitializer(tx_param_dict)
            tx_model_init.sample_interval = ts  # Must be set, before 'channel_response'!
            # Start with a delta function, to capture the model's impulse response.
            tx_model_init.channel_response = [1. / ts] + [0.] * (len(chnl_h) - 1)
            tx_model_init.bit_time = ui
            tx_model = AMIModel(self.tx_dll_file)
            tx_model.initialize(tx_model_init)
            self.log("Tx IBIS-AMI model initialization results:\nInput parameters: {}\nOutput parameters: {}\nMessage: {}".format(
                        tx_model.ami_params_in, tx_model.ami_params_out, tx_model.msg))
            # NOTE(review): 'error' is not among the imports visible in this file;
            # confirm that it is brought into scope elsewhere, or these branches raise NameError.
            if tx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse']):
                tx_h = array(tx_model.initOut) * ts
            elif not tx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists']):
                error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n "
                      "I cannot continue.\nThis condition is supposed to be caught sooner in the flow.")
                self.status = "Simulation Error."
                return
            elif not self.tx_use_getwave:
                error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n "
                      "I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert')
                self.status = "Simulation Error."
                return
            if self.tx_use_getwave:
                # For GetWave, use a step to extract the model's native properties.
                # Position the input edge at the center of the vector, in
                # order to minimize high frequency artifactual energy
                # introduced by frequency domain processing in some models.
                half_len = len(chnl_h) // 2
                tx_s = tx_model.getWave(array([0.] * half_len + [1.] * half_len))
                # Shift the result back to the correct location, extending the last sample.
                tx_s = pad(tx_s[half_len:], (0, half_len), 'edge')
                tx_h = diff(concatenate((array([0.0]), tx_s)))  # Without the leading 0, we miss the pre-tap.
                tx_out = tx_model.getWave(self.x)
            else:  # Init()-only.
                tx_s   = tx_h.cumsum()
                tx_out = convolve(tx_h, self.x)[:len(self.x)]
        else:
            # - Generate the ideal, post-preemphasis signal.
            # To consider: use 'scipy.interp()'. This is what Mark does, in order to induce jitter in the Tx output.
            ffe_out        = convolve(symbols, ffe)[:len(symbols)]
            self.rel_power = mean(ffe_out ** 2)  # Store the relative average power dissipated in the Tx.
            tx_out         = repeat(ffe_out, nspui)  # oversampled output
            # - Calculate the responses.
            # - (The Tx is unique in that the calculated responses aren't used to form the output.
            #    This is partly due to the out of order nature in which we combine the Tx and channel,
            #    and partly due to the fact that we're adding noise to the Tx output.)
            # Using sum to concatenate. (The comprehension variable is deliberately *not* named 'x':
            # under Python 2, it would leak and clobber the signal vector of that name.)
            tx_h = array(sum([[tap] + list(zeros(nspui - 1)) for tap in ffe], []))
            tx_h.resize(len(chnl_h))
            tx_s = tx_h.cumsum()
        # The frequency response is needed for both the AMI and the built-in Tx model.
        temp = tx_h.copy()
        temp.resize(len(w))
        tx_H  = fft(temp)
        tx_H *= tx_s[-1] / abs(tx_H[0])

        # - Generate the uncorrelated periodic noise. (Assume capacitive coupling.)
        #   - Generate the ideal rectangular aggressor waveform.
        pn_period = 1. / pn_freq
        pn_samps  = int(pn_period / Ts + 0.5)
        pn        = zeros(pn_samps)
        pn[pn_samps // 2:] = pn_mag
        pn        = resize(pn, len(tx_out))
        #   - High pass filter it. (Simulating capacitive coupling.)
        (b, a) = iirfilter(2, gFc / (fs / 2), btype='highpass')
        pn     = lfilter(b, a, pn)[:len(pn)]

        # - Add the uncorrelated periodic and random noise to the Tx output.
        tx_out += pn
        tx_out += normal(scale=rn, size=(len(tx_out),))

        # - Convolve w/ channel.
        tx_out_h = convolve(tx_h, chnl_h)[:len(chnl_h)]
        temp     = tx_out_h.copy()
        temp.resize(len(w))
        tx_out_H = fft(temp)
        rx_in    = convolve(tx_out, chnl_h)[:len(tx_out)]

        self.tx_s     = tx_s
        self.tx_out   = tx_out
        self.rx_in    = rx_in
        self.tx_out_s = tx_out_h.cumsum()
        self.tx_out_p = self.tx_out_s[nspui:] - self.tx_out_s[:-nspui]
        self.tx_H     = tx_H
        self.tx_h     = tx_h
        self.tx_out_H = tx_out_H
        self.tx_out_h = tx_out_h

        self.tx_perf = nbits * nspb / (clock() - split_time)
        split_time   = clock()
        self.status  = 'Running CTLE...(sweep %d of %d)' % (sweep_num, num_sweeps)
    except Exception:
        self.status = 'Exception: Tx'
        raise

    # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the CTLE.
    try:
        if self.rx_use_ami:
            rx_cfg = self._rx_cfg  # Grab the 'AMIParamConfigurator' instance for this model.
            # Get the model invoked and initialized, except for 'channel_response', which
            # we need to do several different ways, in order to gather all the data we need.
            rx_param_dict = rx_cfg.input_ami_params
            rx_model_init = AMIModelInitializer(rx_param_dict)
            rx_model_init.sample_interval = ts  # Must be set, before 'channel_response'!
            rx_model_init.channel_response = tx_out_h / ts
            rx_model_init.bit_time = ui
            rx_model = AMIModel(self.rx_dll_file)
            rx_model.initialize(rx_model_init)
            self.log("Rx IBIS-AMI model initialization results:\nInput parameters: {}\nMessage: {}\nOutput parameters: {}".format(
                        rx_model.ami_params_in, rx_model.msg, rx_model.ami_params_out))
            if rx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse']):
                ctle_out_h = array(rx_model.initOut) * ts
            elif not rx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists']):
                error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n "
                      "I cannot continue.\nThis condition is supposed to be caught sooner in the flow.")
                self.status = "Simulation Error."
                return
            elif not self.rx_use_getwave:
                error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n "
                      "I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert')
                self.status = "Simulation Error."
                return
            if self.rx_use_getwave:
                # The whole waveform is processed in a single GetWave() call.
                # ('clock_times' returned here is superseded by the DFE/CDR result, below.)
                ctle_out, clock_times = rx_model.getWave(rx_in, len(rx_in))
                self.log(rx_model.ami_params_out)

                # Estimate the CTLE response, by de-embedding the (windowed) input from the (windowed) output.
                ctle_H     = fft(ctle_out * signal.hann(len(ctle_out))) / fft(rx_in * signal.hann(len(rx_in)))
                ctle_h     = real(ifft(ctle_H)[:len(chnl_h)])
                ctle_out_h = convolve(ctle_h, tx_out_h)[:len(chnl_h)]
            else:  # Init() only.
                ctle_out_h_padded = pad(ctle_out_h, (nspb, len(rx_in) - nspb - len(ctle_out_h)),
                                        'linear_ramp', end_values=(0., 0.))
                tx_out_h_padded   = pad(tx_out_h, (nspb, len(rx_in) - nspb - len(tx_out_h)),
                                        'linear_ramp', end_values=(0., 0.))
                ctle_H   = fft(ctle_out_h_padded) / fft(tx_out_h_padded)
                ctle_h   = real(ifft(ctle_H)[:len(chnl_h)])
                ctle_out = convolve(rx_in, ctle_h)[:len(tx_out)]
            ctle_s = ctle_h.cumsum()
        else:
            if self.use_ctle_file:
                ctle_h = import_channel(self.ctle_file, ts)
                if max(abs(ctle_h)) < 100.:  # step response?
                    ctle_h = diff(ctle_h)    # impulse response is derivative of step response.
                else:
                    ctle_h *= ts             # Normalize to (V/sample)
                ctle_h.resize(len(t))
                ctle_H  = fft(ctle_h)
                ctle_H *= sum(ctle_h) / ctle_H[0]
            else:
                w_dummy, ctle_H = make_ctle(rx_bw, peak_freq, peak_mag, w, ctle_mode, ctle_offset)
                ctle_h  = real(ifft(ctle_H))[:len(chnl_h)]
                ctle_h *= abs(ctle_H[0]) / sum(ctle_h)
            ctle_out  = convolve(rx_in, ctle_h)[:len(tx_out)]
            ctle_out -= mean(ctle_out)  # Force zero mean.
            if self.ctle_mode == 'AGC':  # Automatic gain control engaged?
                ctle_out *= 2. * decision_scaler / ctle_out.ptp()
            ctle_s     = ctle_h.cumsum()
            ctle_out_h = convolve(tx_out_h, ctle_h)[:len(tx_out_h)]

        self.ctle_s = ctle_s
        # Take the first sample at, or above, half the peak of the cumulative impulse response, as the delay.
        ctle_out_h_main_lobe = where(ctle_out_h >= max(ctle_out_h) / 2.)[0]
        if len(ctle_out_h_main_lobe):
            conv_dly_ix = ctle_out_h_main_lobe[0]
        else:
            # Must be an integer, since it is used to index 't', below.
            conv_dly_ix = int(self.chnl_dly / Ts)
        conv_dly   = t[conv_dly_ix]
        ctle_out_s = ctle_out_h.cumsum()
        temp       = ctle_out_h.copy()
        temp.resize(len(w))
        ctle_out_H = fft(temp)

        # - Store local variables to class instance.
        self.ctle_out_s = ctle_out_s
        # Consider changing this; it could be sensitive to insufficient "front porch" in the CTLE output step response.
        self.ctle_out_p  = self.ctle_out_s[nspui:] - self.ctle_out_s[:-nspui]
        self.ctle_H      = ctle_H
        self.ctle_h      = ctle_h
        self.ctle_out_H  = ctle_out_H
        self.ctle_out_h  = ctle_out_h
        self.ctle_out    = ctle_out
        self.conv_dly    = conv_dly
        self.conv_dly_ix = conv_dly_ix

        self.ctle_perf = nbits * nspb / (clock() - split_time)
        split_time     = clock()
        self.status    = 'Running DFE/CDR...(sweep %d of %d)' % (sweep_num, num_sweeps)
    except Exception:
        self.status = 'Exception: Rx'
        raise

    # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the DFE.
    try:
        if self.use_dfe:
            dfe = DFE(n_taps, gain, delta_t, alpha, ui, nspui, decision_scaler, mod_type,
                      n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain,
                      bandwidth=bandwidth, ideal=self.sum_ideal)
        else:
            # DFE disabled: same object, but with zero gain and an ideal summing node.
            dfe = DFE(n_taps, 0., delta_t, alpha, ui, nspui, decision_scaler, mod_type,
                      n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain,
                      bandwidth=bandwidth, ideal=True)
        (dfe_out, tap_weights, ui_ests, clocks, lockeds, clock_times, bits_out) = dfe.run(t, ctle_out)
        bits_out = array(bits_out)

        # Align the received bits with the transmitted ones, via correlation over the last 'eye_bits' bits,
        # and count the mismatches.
        auto_corr = 1. * correlate(bits_out[(nbits - eye_bits):], bits[(nbits - eye_bits):], mode='same') \
                       / sum(bits[(nbits - eye_bits):])
        auto_corr = auto_corr[len(auto_corr) // 2:]
        self.auto_corr = auto_corr
        bit_dly  = where(auto_corr == max(auto_corr))[0][0]
        bits_ref = bits[(nbits - eye_bits):]
        bits_tst = bits_out[(nbits + bit_dly - eye_bits):]
        if len(bits_ref) > len(bits_tst):
            bits_ref = bits_ref[:len(bits_tst)]
        elif len(bits_tst) > len(bits_ref):
            bits_tst = bits_tst[:len(bits_ref)]
        bit_errs = where(bits_tst ^ bits_ref)[0]
        self.bit_errs = len(bit_errs)

        # Build the DFE impulse response from the final set of adapted tap weights.
        dfe_h = array([1.] + list(zeros(nspb - 1)) +
                      sum([[-weight] + list(zeros(nspb - 1)) for weight in tap_weights[-1]], []))
        dfe_h.resize(len(ctle_out_h))
        temp = dfe_h.copy()
        temp.resize(len(w))
        dfe_H      = fft(temp)
        self.dfe_s = dfe_h.cumsum()
        dfe_out_H  = ctle_out_H * dfe_H
        dfe_out_h  = convolve(ctle_out_h, dfe_h)[:len(ctle_out_h)]
        dfe_out_s  = dfe_out_h.cumsum()
        self.dfe_out_p = dfe_out_s - pad(dfe_out_s[:-nspui], (nspui, 0), 'constant', constant_values=(0, 0))
        self.dfe_H     = dfe_H
        self.dfe_h     = dfe_h
        self.dfe_out_H = dfe_out_H
        self.dfe_out_h = dfe_out_h
        self.dfe_out_s = dfe_out_s
        self.dfe_out   = dfe_out

        self.dfe_perf = nbits * nspb / (clock() - split_time)
        split_time    = clock()
        self.status   = 'Analyzing jitter...(sweep %d of %d)' % (sweep_num, num_sweeps)
    except Exception:
        self.status = 'Exception: DFE'
        raise

    # Save local variables to class instance for state preservation, performing unit conversion where necessary.
    self.adaptation  = tap_weights
    self.ui_ests     = array(ui_ests) * 1.e12  # (ps)
    self.clocks      = clocks
    self.lockeds     = lockeds
    self.clock_times = clock_times

    # Analyze the jitter.
    # These are cleared before the analysis, because a failure in the block below is not re-raised.
    self.thresh_tx                = array([])
    self.jitter_ext_tx            = array([])
    self.jitter_tx                = array([])
    self.jitter_spectrum_tx       = array([])
    self.jitter_ind_spectrum_tx   = array([])
    self.thresh_ctle              = array([])
    self.jitter_ext_ctle          = array([])
    self.jitter_ctle              = array([])
    self.jitter_spectrum_ctle     = array([])
    self.jitter_ind_spectrum_ctle = array([])
    self.thresh_dfe               = array([])
    self.jitter_ext_dfe           = array([])
    self.jitter_dfe               = array([])
    self.jitter_spectrum_dfe      = array([])
    self.jitter_ind_spectrum_dfe  = array([])
    self.f_MHz_dfe                = array([])
    self.jitter_rejection_ratio   = array([])

    try:
        if mod_type == 1:  # Handle duo-binary case.
            pattern_len *= 2  # Because, the XOR pre-coding can invert every other pattern rep.
        if mod_type == 2:  # Handle PAM-4 case.
            if pattern_len % 2:
                pattern_len *= 2  # Because, the bits are taken in pairs, to form the symbols.

        # - channel output
        actual_xings = find_crossings(t, chnl_out, decision_scaler, mod_type=mod_type)
        (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext,
         thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs,
         hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
        self.t_jitter                 = t_jitter
        self.isi_chnl                 = isi
        self.dcd_chnl                 = dcd
        self.pj_chnl                  = pj
        self.rj_chnl                  = rj
        self.thresh_chnl              = thresh
        self.jitter_chnl              = hist
        self.jitter_ext_chnl          = hist_synth
        self.jitter_bins              = bin_centers
        self.jitter_spectrum_chnl     = jitter_spectrum
        self.jitter_ind_spectrum_chnl = jitter_ind_spectrum
        self.f_MHz                    = array(spectrum_freqs) * 1.e-6

        # - Tx output
        actual_xings = find_crossings(t, rx_in, decision_scaler, mod_type=mod_type)
        (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext,
         thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs,
         hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
        self.isi_tx                 = isi
        self.dcd_tx                 = dcd
        self.pj_tx                  = pj
        self.rj_tx                  = rj
        self.thresh_tx              = thresh
        self.jitter_tx              = hist
        self.jitter_ext_tx          = hist_synth
        self.jitter_spectrum_tx     = jitter_spectrum
        self.jitter_ind_spectrum_tx = jitter_ind_spectrum

        # - CTLE output
        actual_xings = find_crossings(t, ctle_out, decision_scaler, mod_type=mod_type)
        (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext,
         thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs,
         hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
        self.isi_ctle                 = isi
        self.dcd_ctle                 = dcd
        self.pj_ctle                  = pj
        self.rj_ctle                  = rj
        self.thresh_ctle              = thresh
        self.jitter_ctle              = hist
        self.jitter_ext_ctle          = hist_synth
        self.jitter_spectrum_ctle     = jitter_spectrum
        self.jitter_ind_spectrum_ctle = jitter_ind_spectrum

        # - DFE output
        ignore_until = (nui - eye_uis) * ui + 0.75 * ui  # 0.5 was causing an occasional misalignment.
        # A comprehension is used, rather than filter(), so that a real sequence
        # is handed to array() under both Python 2 and Python 3.
        ideal_xings  = array([xing for xing in ideal_xings if xing > ignore_until])
        min_delay    = ignore_until + conv_dly
        actual_xings = find_crossings(t, dfe_out, decision_scaler, min_delay=min_delay,
                                      mod_type=mod_type, rising_first=False)
        (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext,
         thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs,
         hist, hist_synth, bin_centers) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings, actual_xings, rel_thresh)
        self.isi_dfe                 = isi
        self.dcd_dfe                 = dcd
        self.pj_dfe                  = pj
        self.rj_dfe                  = rj
        self.thresh_dfe              = thresh
        self.jitter_dfe              = hist
        self.jitter_ext_dfe          = hist_synth
        self.jitter_spectrum_dfe     = jitter_spectrum
        self.jitter_ind_spectrum_dfe = jitter_ind_spectrum
        self.f_MHz_dfe               = array(spectrum_freqs) * 1.e-6
        # The jitter rejection ratio is only a placeholder (all zeros), sized to match the DFE jitter spectrum.
        dfe_spec = self.jitter_spectrum_dfe
        self.jitter_rejection_ratio  = zeros(len(dfe_spec))

        self.jitter_perf = nbits * nspb / (clock() - split_time)
        self.total_perf  = nbits * nspb / (clock() - start_time)
        split_time       = clock()
        self.status      = 'Updating plots...(sweep %d of %d)' % (sweep_num, num_sweeps)
    except Exception:
        # Deliberately not re-raised: a jitter analysis failure does not abort the run.
        self.status = 'Exception: jitter'

    # Update plots.
    try:
        if update_plots:
            update_results(self)
            if not initial_run:
                update_eyes(self)

        self.plotting_perf = nbits * nspb / (clock() - split_time)
        self.status = 'Ready.'
    except Exception:
        self.status = 'Exception: plotting'
        raise
# Plot updating
def _calc_bathtub(jitter_ext, half_len):
    """
    Build a bathtub curve from an extrapolated jitter distribution.

    The last *half_len* bins are accumulated from the end inward and placed
    first (reversed), followed by the running sum of the first
    *half_len + 1* bins. Values below ``MIN_BATHTUB_VAL`` are replaced by
    ``0.1 * MIN_BATHTUB_VAL``, to avoid Chaco log scale plot weirdness.

    Args:
        jitter_ext(array): Extrapolated jitter distribution.
        half_len(int): Number of bins taken from the tail of *jitter_ext*.

    Returns:
        array: The (linear scale) bathtub curve.

    """

    left_side = list(cumsum(jitter_ext[-1 : -(half_len + 1) : -1]))
    left_side.reverse()
    bathtub = array(left_side + list(cumsum(jitter_ext[:half_len + 1])))
    return where(bathtub < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub)), bathtub)


def update_results(self):
    """
    Updates all plot data used by GUI.

    Simulation results stored on *self* are copied into ``self.plotdata``,
    after whatever scaling the plots need (ns, ps, GHz, MHz, dB). The
    "DFE Adaptation" plot is rebuilt when the number of DFE taps differs
    from ``self._old_n_taps``.

    Args:
        self(PyBERT): Reference to an instance of the *PyBERT* class.

    """

    # Copy globals into local namespace.
    ui           = self.ui
    samps_per_ui = self.nspui
    eye_uis      = self.eye_uis
    num_ui       = self.nui
    clock_times  = self.clock_times
    f            = self.f
    t            = self.t
    t_ns         = self.t_ns
    t_ns_chnl    = self.t_ns_chnl
    n_taps       = self.n_taps

    Ts = t[1]
    # Only the last 'eye_uis' unit intervals are used for the eye diagrams.
    ignore_until = (num_ui - eye_uis) * ui
    ignore_samps = (num_ui - eye_uis) * samps_per_ui

    # Misc.
    f_GHz     = f[:len(f) // 2] / 1.e9
    len_f_GHz = len(f_GHz)
    self.plotdata.set_data("f_GHz",     f_GHz[1:])
    self.plotdata.set_data("t_ns",      t_ns)
    self.plotdata.set_data("t_ns_chnl", t_ns_chnl)

    # DFE.
    tap_weights = transpose(array(self.adaptation))  # One row per tap, after transposition.
    for i, tap_weight in enumerate(tap_weights, 1):
        self.plotdata.set_data("tap%d_weights" % i, tap_weight)
    # The row length is taken from the array shape, rather than from the loop
    # variable, so that this also works when there are no taps to iterate over.
    self.plotdata.set_data("tap_weight_index", list(range(tap_weights.shape[-1])))
    if self._old_n_taps != n_taps:
        # The number of taps changed; rebuild the adaptation plot, with one line per tap.
        new_plot = Plot(self.plotdata, auto_colors=['red', 'orange', 'yellow', 'green', 'blue', 'purple'], padding_left=75)
        for i in range(self.n_taps):
            new_plot.plot(("tap_weight_index", "tap%d_weights" % (i + 1)), type="line", color="auto", name="tap%d" % (i + 1))
        new_plot.title = "DFE Adaptation"
        new_plot.tools.append(PanTool(new_plot, constrain=True, constrain_key=None, constrain_direction='x'))
        zoom9 = ZoomTool(new_plot, tool_mode="range", axis='index', always_on=False)
        new_plot.overlays.append(zoom9)
        new_plot.legend.visible = True
        new_plot.legend.align = 'ul'
        self.plots_dfe.remove(self._dfe_plot)
        self.plots_dfe.insert(1, new_plot)
        self._dfe_plot = new_plot
        self._old_n_taps = n_taps

    # Recovered clock statistics, gathered from the first lock indication onward.
    clock_pers = diff(clock_times)
    start_t    = t[where(self.lockeds)[0][0]]
    start_ix   = where(clock_times > start_t)[0][0]
    (bin_counts, bin_edges) = histogram(clock_pers[start_ix:], bins=100)
    bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.
    clock_spec  = fft(clock_pers[start_ix:])
    clock_spec  = abs(clock_spec[:len(clock_spec) // 2])  # Floor division: a slice bound must be an integer.
    spec_freqs  = arange(len(clock_spec)) / (2. * len(clock_spec))  # In this case, fNyquist = half the bit rate.
    clock_spec /= clock_spec[1:].mean()  # Normalize the mean non-d.c. value to 0 dB.
    self.plotdata.set_data("clk_per_hist_bins", bin_centers * 1.e12)  # (ps)
    self.plotdata.set_data("clk_per_hist_vals", bin_counts)
    self.plotdata.set_data("clk_spec",          10. * log10(clock_spec[1:]))  # Omit the d.c. value.
    self.plotdata.set_data("clk_freqs",         spec_freqs[1:])
    self.plotdata.set_data("dfe_out",           self.dfe_out)
    self.plotdata.set_data("ui_ests",           self.ui_ests)
    self.plotdata.set_data("clocks",            self.clocks)
    self.plotdata.set_data("lockeds",           self.lockeds)

    # Impulse responses
    self.plotdata.set_data("chnl_h",     self.chnl_h     * 1.e-9 / Ts)  # Re-normalize to (V/ns), for plotting.
    self.plotdata.set_data("tx_h",       self.tx_h       * 1.e-9 / Ts)
    self.plotdata.set_data("tx_out_h",   self.tx_out_h   * 1.e-9 / Ts)
    self.plotdata.set_data("ctle_h",     self.ctle_h     * 1.e-9 / Ts)
    self.plotdata.set_data("ctle_out_h", self.ctle_out_h * 1.e-9 / Ts)
    self.plotdata.set_data("dfe_h",      self.dfe_h      * 1.e-9 / Ts)
    self.plotdata.set_data("dfe_out_h",  self.dfe_out_h  * 1.e-9 / Ts)

    # Step responses
    self.plotdata.set_data("chnl_s",     self.chnl_s)
    self.plotdata.set_data("tx_s",       self.tx_s)
    self.plotdata.set_data("tx_out_s",   self.tx_out_s)
    self.plotdata.set_data("ctle_s",     self.ctle_s)
    self.plotdata.set_data("ctle_out_s", self.ctle_out_s)
    self.plotdata.set_data("dfe_s",      self.dfe_s)
    self.plotdata.set_data("dfe_out_s",  self.dfe_out_s)

    # Pulse responses
    self.plotdata.set_data("chnl_p",     self.chnl_p)
    self.plotdata.set_data("tx_out_p",   self.tx_out_p)
    self.plotdata.set_data("ctle_out_p", self.ctle_out_p)
    self.plotdata.set_data("dfe_out_p",  self.dfe_out_p)

    # Outputs
    # ("dfe_out" was already stored, above.)
    self.plotdata.set_data("ideal_signal", self.ideal_signal)
    self.plotdata.set_data("chnl_out",     self.chnl_out)
    self.plotdata.set_data("tx_out",       self.rx_in)  # The "Tx output" plot shows the signal at the Rx input.
    self.plotdata.set_data("ctle_out",     self.ctle_out)
    self.plotdata.set_data("auto_corr",    self.auto_corr)

    # Frequency responses
    self.plotdata.set_data("chnl_H",         20. * log10(abs(self.chnl_H[1 : len_f_GHz])))
    self.plotdata.set_data("chnl_trimmed_H", 20. * log10(abs(self.chnl_trimmed_H[1 : len_f_GHz])))
    self.plotdata.set_data("tx_H",           20. * log10(abs(self.tx_H[1 : len_f_GHz])))
    self.plotdata.set_data("tx_out_H",       20. * log10(abs(self.tx_out_H[1 : len_f_GHz])))
    self.plotdata.set_data("ctle_H",         20. * log10(abs(self.ctle_H[1 : len_f_GHz])))
    self.plotdata.set_data("ctle_out_H",     20. * log10(abs(self.ctle_out_H[1 : len_f_GHz])))
    self.plotdata.set_data("dfe_H",          20. * log10(abs(self.dfe_H[1 : len_f_GHz])))
    self.plotdata.set_data("dfe_out_H",      20. * log10(abs(self.dfe_out_H[1 : len_f_GHz])))

    # Jitter distributions
    jitter_ext_chnl = self.jitter_ext_chnl  # These are used, again, in bathtub curve generation, below.
    jitter_ext_tx   = self.jitter_ext_tx
    jitter_ext_ctle = self.jitter_ext_ctle
    jitter_ext_dfe  = self.jitter_ext_dfe
    self.plotdata.set_data("jitter_bins",     array(self.jitter_bins) * 1.e12)
    self.plotdata.set_data("jitter_chnl",     self.jitter_chnl)
    self.plotdata.set_data("jitter_ext_chnl", jitter_ext_chnl)
    self.plotdata.set_data("jitter_tx",       self.jitter_tx)
    self.plotdata.set_data("jitter_ext_tx",   jitter_ext_tx)
    self.plotdata.set_data("jitter_ctle",     self.jitter_ctle)
    self.plotdata.set_data("jitter_ext_ctle", jitter_ext_ctle)
    self.plotdata.set_data("jitter_dfe",      self.jitter_dfe)
    self.plotdata.set_data("jitter_ext_dfe",  jitter_ext_dfe)

    # Jitter spectrums
    # (Plotted in dB, relative to one unit interval; the d.c. bin is omitted.)
    log10_ui = log10(ui)
    self.plotdata.set_data("f_MHz",     self.f_MHz[1:])
    self.plotdata.set_data("f_MHz_dfe", self.f_MHz_dfe[1:])
    self.plotdata.set_data("jitter_spectrum_chnl",     10. * (log10(self.jitter_spectrum_chnl[1:])     - log10_ui))
    self.plotdata.set_data("jitter_ind_spectrum_chnl", 10. * (log10(self.jitter_ind_spectrum_chnl[1:]) - log10_ui))
    self.plotdata.set_data("thresh_chnl",              10. * (log10(self.thresh_chnl[1:])              - log10_ui))
    self.plotdata.set_data("jitter_spectrum_tx",       10. * (log10(self.jitter_spectrum_tx[1:])       - log10_ui))
    self.plotdata.set_data("jitter_ind_spectrum_tx",   10. * (log10(self.jitter_ind_spectrum_tx[1:])   - log10_ui))
    self.plotdata.set_data("thresh_tx",                10. * (log10(self.thresh_tx[1:])                - log10_ui))
    self.plotdata.set_data("jitter_spectrum_ctle",     10. * (log10(self.jitter_spectrum_ctle[1:])     - log10_ui))
    self.plotdata.set_data("jitter_ind_spectrum_ctle", 10. * (log10(self.jitter_ind_spectrum_ctle[1:]) - log10_ui))
    self.plotdata.set_data("thresh_ctle",              10. * (log10(self.thresh_ctle[1:])              - log10_ui))
    self.plotdata.set_data("jitter_spectrum_dfe",      10. * (log10(self.jitter_spectrum_dfe[1:])      - log10_ui))
    self.plotdata.set_data("jitter_ind_spectrum_dfe",  10. * (log10(self.jitter_ind_spectrum_dfe[1:])  - log10_ui))
    self.plotdata.set_data("thresh_dfe",               10. * (log10(self.thresh_dfe[1:])               - log10_ui))
    self.plotdata.set_data("jitter_rejection_ratio", self.jitter_rejection_ratio[1:])

    # Bathtubs
    # The split point is derived from the channel distribution and applied to all four curves.
    half_len = len(jitter_ext_chnl) // 2  # Floor division: used as a slice bound.
    self.plotdata.set_data("bathtub_chnl", log10(_calc_bathtub(jitter_ext_chnl, half_len)))
    self.plotdata.set_data("bathtub_tx",   log10(_calc_bathtub(jitter_ext_tx,   half_len)))
    self.plotdata.set_data("bathtub_ctle", log10(_calc_bathtub(jitter_ext_ctle, half_len)))
    self.plotdata.set_data("bathtub_dfe",  log10(_calc_bathtub(jitter_ext_dfe,  half_len)))

    # Eyes
    width  = 2 * samps_per_ui
    xs     = linspace(-ui * 1.e12, ui * 1.e12, width)
    height = 100
    y_max    = 1.1 * max(abs(array(self.chnl_out)))
    eye_chnl = calc_eye(ui, samps_per_ui, height, self.chnl_out[ignore_samps:], y_max)
    y_max    = 1.1 * max(abs(array(self.rx_in)))
    eye_tx   = calc_eye(ui, samps_per_ui, height, self.rx_in[ignore_samps:], y_max)
    y_max    = 1.1 * max(abs(array(self.ctle_out)))
    eye_ctle = calc_eye(ui, samps_per_ui, height, self.ctle_out[ignore_samps:], y_max)
    # The DFE eye is built from the recovered clock times, so skip those falling inside the ignored interval.
    i = 0
    while clock_times[i] <= ignore_until:
        i += 1
        assert i < len(clock_times), "ERROR: Insufficient coverage in 'clock_times' vector."
    y_max   = 1.1 * max(abs(array(self.dfe_out)))
    eye_dfe = calc_eye(ui, samps_per_ui, height, self.dfe_out, y_max, clock_times[i:])
    self.plotdata.set_data("eye_index", xs)
    self.plotdata.set_data("eye_chnl",  eye_chnl)
    self.plotdata.set_data("eye_tx",    eye_tx)
    self.plotdata.set_data("eye_ctle",  eye_ctle)
    self.plotdata.set_data("eye_dfe",   eye_dfe)
def update_eyes(self):
    """
    Update the heat plots representing the eye diagrams.

    For each of the four eye plots held in ``self.plots_eye.components``,
    the image index grid and both axis ranges are reset, and the plot is
    invalidated. The horizontal span is always +/- one unit interval, in
    picoseconds. The vertical span is +/- 1.1 times the peak magnitude of
    the signal shown in that plot:

        ===========  ==============
        component    signal
        ===========  ==============
        0            ``chnl_out``
        1            ``rx_in``
        2            ``ctle_out``
        3            ``dfe_out``
        ===========  ==============

    Component 2 previously re-used the vertical grid computed for
    ``dfe_out``; it now gets its own, computed the same way as for the
    other three.

    Args:
        self(PyBERT): Reference to an instance of the *PyBERT* class.

    """

    ui           = self.ui
    samps_per_ui = self.nspui

    width  = 2 * samps_per_ui
    height = 100
    xs     = linspace(-ui * 1.e12, ui * 1.e12, width)

    # Signals are listed in the order of the eye plots they scale.
    eye_signals = (self.chnl_out, self.rx_in, self.ctle_out, self.dfe_out)
    for ix, eye_signal in enumerate(eye_signals):
        y_max = 1.1 * max(abs(array(eye_signal)))
        ys    = linspace(-y_max, y_max, height)
        eye_plot = self.plots_eye.components[ix]
        eye_plot.components[0].index.set_data(xs, ys)
        eye_plot.x_axis.mapper.range.low  = xs[0]
        eye_plot.x_axis.mapper.range.high = xs[-1]
        eye_plot.y_axis.mapper.range.low  = ys[0]
        eye_plot.y_axis.mapper.range.high = ys[-1]
        eye_plot.invalidate_draw()

    self.plots_eye.request_redraw()