Source code for pybert.pybert_cntrl

"""
Default controller definition for PyBERT class.

Original author: David Banas <capn.freako@gmail.com>  

Original date:   August 24, 2014 (Copied from `pybert.py', as part of a major code cleanup.)

Copyright (c) 2014 David Banas; all rights reserved World wide.
"""

from time         import clock, sleep

from chaco.api       import Plot
from chaco.tools.api import PanTool, ZoomTool

from numpy        import sign, sin, pi, array, linspace, zeros, ones, repeat, where, sqrt, histogram, arange, append
from numpy        import diff, log10, correlate, convolve, mean, resize, real, transpose, cumsum, diff, std, pad
from numpy.random import normal
from numpy.fft    import fft, ifft
from scipy.signal import lfilter, iirfilter, freqz, fftconvolve

from dfe          import DFE
from cdr          import CDR

from pyibisami.amimodel import AMIModel, AMIModelInitializer
from pybert_util  import find_crossings, make_ctle, calc_jitter, moving_average, calc_eye, import_qucs_csv

DEBUG           = False
MIN_BATHTUB_VAL = 1.e-18

gFc     = 1.e6     # Corner frequency of high-pass filter used to model capacitive coupling of periodic noise.

[docs]def my_run_sweeps(self): """ Runs the simulation sweeps. Args: self(PyBERT): Reference to an instance of the *PyBERT* class. """ sweep_aves = self.sweep_aves do_sweep = self.do_sweep tx_taps = self.tx_taps if(do_sweep): # Assemble the list of desired values for each sweepable parameter. sweep_vals = [] for tap in tx_taps: if(tap.enabled): if(tap.steps): sweep_vals.append(list(arange(tap.min_val, tap.max_val, (tap.max_val - tap.min_val) / tap.steps))) else: sweep_vals.append([tap.value]) else: sweep_vals.append([0.]) # Run the sweep, using the lists assembled, above. sweeps = [[w, x, y, z] for w in sweep_vals[0] for x in sweep_vals[1] for y in sweep_vals[2] for z in sweep_vals[3] ] num_sweeps = sweep_aves * len(sweeps) self.num_sweeps = num_sweeps sweep_results = [] sweep_num = 1 for sweep in sweeps: for i in range(4): self.tx_taps[i].value = sweep[i] bit_errs = [] for i in range(sweep_aves): self.sweep_num = sweep_num my_run_simulation(self, update_plots=False) bit_errs.append(self.bit_errs) sweep_num += 1 sweep_results.append((sweep, mean(bit_errs), std(bit_errs))) self.sweep_results = sweep_results else: my_run_simulation(self)
[docs]def my_run_simulation(self, initial_run=False, update_plots=True): """ Runs the simulation. Args: self(PyBERT): Reference to an instance of the *PyBERT* class. initial_run(Bool): If True, don't update the eye diagrams, since they haven't been created, yet. (Optional; default = False.) update_plots(Bool): If True, update the plots, after simulation completes. This option can be used by larger scripts, which import *pybert*, in order to avoid graphical back-end conflicts and speed up this function's execution time. (Optional; default = True.) """ num_sweeps = self.num_sweeps sweep_num = self.sweep_num start_time = clock() self.status = 'Running channel...(sweep %d of %d)' % (sweep_num, num_sweeps) self.run_count += 1 # Force regeneration of bit stream. # Pull class variables into local storage, performing unit conversion where necessary. t = self.t t_ns = self.t_ns f = self.f w = self.w bits = self.bits symbols = self.symbols ffe = self.ffe nbits = self.nbits nui = self.nui bit_rate = self.bit_rate * 1.e9 eye_bits = self.eye_bits eye_uis = self.eye_uis nspb = self.nspb nspui = self.nspui rn = self.rn pn_mag = self.pn_mag pn_freq = self.pn_freq * 1.e6 Vod = self.vod Rs = self.rs Cs = self.cout * 1.e-12 RL = self.rin CL = self.cac * 1.e-6 Cp = self.cin * 1.e-12 R0 = self.R0 w0 = self.w0 Rdc = self.Rdc Z0 = self.Z0 v0 = self.v0 * 3.e8 Theta0 = self.Theta0 l_ch = self.l_ch pattern_len = self.pattern_len rx_bw = self.rx_bw * 1.e9 peak_freq = self.peak_freq * 1.e9 peak_mag = self.peak_mag ctle_offset = self.ctle_offset ctle_mode = self.ctle_mode delta_t = self.delta_t * 1.e-12 alpha = self.alpha ui = self.ui n_taps = self.n_taps gain = self.gain n_ave = self.n_ave decision_scaler = self.decision_scaler n_lock_ave = self.n_lock_ave rel_lock_tol = self.rel_lock_tol lock_sustain = self.lock_sustain bandwidth = self.sum_bw * 1.e9 rel_thresh = self.thresh mod_type = self.mod_type[0] # Calculate misc. values. fs = bit_rate * nspb Ts = t[1] ts = Ts # Generate the ideal over-sampled signal. # # Duo-binary is problematic, in that it requires convolution with the ideal duobinary # impulse response, in order to produce the proper ideal signal. x = repeat(symbols, nspui) self.x = x if(mod_type == 1): # Handle duo-binary case. duob_h = array(([0.5] + [0.] * (nspui - 1)) * 2) x = convolve(x, duob_h)[:len(t)] self.ideal_signal = x # Find the ideal crossing times, for subsequent jitter analysis of transmitted signal. ideal_xings = find_crossings(t, x, decision_scaler, min_delay=(ui / 2.), mod_type=mod_type) self.ideal_xings = ideal_xings # Calculate the channel output. # # Note: We're not using 'self.ideal_signal', because we rely on the system response to # create the duobinary waveform. We only create it explicitly, above, # so that we'll have an ideal reference for comparison. chnl_h = self.calc_chnl_h() chnl_out = convolve(self.x, chnl_h)[:len(x)] self.channel_perf = nbits * nspb / (clock() - start_time) split_time = clock() self.status = 'Running Tx...(sweep %d of %d)' % (sweep_num, num_sweeps) # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the Tx. if(self.tx_use_ami): tx_cfg = self._tx_cfg # Grab the 'AMIParamConfigurator' instance for this model. # Get the model invoked and initialized, except for 'channel_response', which # we need to do several different ways, in order to gather all the data we need. tx_param_dict = tx_cfg.input_ami_params tx_model_init = AMIModelInitializer(tx_param_dict) tx_model_init.sample_interval = ts # Must be set, before 'channel_response'! tx_model_init.channel_response = [1.] + [0.] * (len(chnl_h) - 1) # Start with a delta function, to capture the model's impulse response. tx_model_init.bit_time = ui tx_model = AMIModel(self.tx_dll_file) tx_model.initialize(tx_model_init) self.log("Tx IBIS-AMI model initialization results:\nInput parameters: {}\nOutput parameters: {}\nMessage: {}".format( tx_model.ami_params_in, tx_model.ami_params_out, tx_model.msg)) if(tx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse'])): tx_h = array(tx_model.initOut) elif(not tx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists'])): error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n \ I cannot continue.\nThis condition is supposed to be caught sooner in the flow.") self.status = "Simulation Error." return elif(not self.tx_use_getwave): error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n \ I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert') self.status = "Simulation Error." return if(self.tx_use_getwave): # For GetWave, use a step to extract the model's native properties. # Position the input edge at the center of the vector, in # order to minimize high frequency artifactual energy # introduced by frequency domain processing in some models. half_len = len(chnl_h) // 2 tx_s = tx_model.getWave([0.] * half_len + [1.] * half_len) # Shift the result back to the correct location, extending the last sample. tx_s = pad(tx_s[half_len:], (0, half_len), 'edge') tx_h = diff(tx_s) tx_out = tx_model.getWave(x) else: tx_s = tx_h.cumsum() tx_out = convolve(tx_h, self.x)[:len(self.x)] else: # - Generate the ideal, post-preemphasis signal. # To consider: use 'scipy.interp()'. This is what Mark does, in order to induce jitter in the Tx output. ffe_out = convolve(symbols, ffe)[:len(symbols)] self.rel_power = mean(ffe_out ** 2) # Store the relative average power dissipated in the Tx. tx_out = repeat(ffe_out, nspui) # oversampled output # - Calculate the responses. # - (The Tx is unique in that the calculated responses aren't used to form the output. # This is partly due to the out of order nature in which we combine the Tx and channel, # and partly due to the fact that we're adding noise to the Tx output.) tx_h = array(sum([[x] + list(zeros(nspui - 1)) for x in ffe], [])) # Using sum to concatenate. tx_h.resize(len(chnl_h)) tx_s = tx_h.cumsum() temp = tx_h.copy() temp.resize(len(w)) tx_H = fft(temp) tx_H *= tx_s[-1] / abs(tx_H[0]) # - Generate the uncorrelated periodic noise. (Assume capacitive coupling.) # - Generate the ideal rectangular aggressor waveform. pn_period = 1. / pn_freq pn_samps = int(pn_period / Ts + 0.5) pn = zeros(pn_samps) pn[pn_samps // 2:] = pn_mag pn = resize(pn, len(tx_out)) # - High pass filter it. (Simulating capacitive coupling.) (b, a) = iirfilter(2, gFc/(fs/2), btype='highpass') pn = lfilter(b, a, pn)[:len(pn)] # - Add the uncorrelated periodic and random noise to the Tx output. tx_out += pn tx_out += normal(scale=rn, size=(len(tx_out),)) # - Convolve w/ channel. tx_out_h = convolve(tx_h, chnl_h)[:len(chnl_h)] temp = tx_out_h.copy() temp.resize(len(w)) tx_out_H = fft(temp) rx_in = convolve(tx_out, chnl_h)[:len(tx_out)] self.tx_s = tx_s self.tx_out = tx_out self.rx_in = rx_in self.tx_out_s = tx_out_h.cumsum() self.tx_out_p = self.tx_out_s[nspui:] - self.tx_out_s[:-nspui] self.tx_H = tx_H self.tx_h = tx_h self.tx_out_H = tx_out_H self.tx_out_h = tx_out_h self.tx_perf = nbits * nspb / (clock() - split_time) split_time = clock() self.status = 'Running CTLE...(sweep %d of %d)' % (sweep_num, num_sweeps) # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the CTLE. if(self.rx_use_ami): rx_cfg = self._rx_cfg # Grab the 'AMIParamConfigurator' instance for this model. # Get the model invoked and initialized, except for 'channel_response', which # we need to do several different ways, in order to gather all the data we need. rx_param_dict = rx_cfg.input_ami_params rx_model_init = AMIModelInitializer(rx_param_dict) rx_model_init.sample_interval = ts # Must be set, before 'channel_response'! rx_model_init.channel_response = [1.] + [0.] * (len(chnl_h) - 1) # Start with a delta function, to capture the model's impulse response. rx_model_init.bit_time = ui rx_model = AMIModel(self.rx_dll_file) rx_model.initialize(rx_model_init) self.log("Rx IBIS-AMI model initialization results:\nInput parameters: {}\nOutput parameters: {}\nMessage: {}".format( rx_model.ami_params_in, rx_model.ami_params_out, rx_model.msg)) if(rx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse'])): ctle_h = array(rx_model.initOut) elif(not rx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists'])): error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n \ I cannot continue.\nThis condition is supposed to be caught sooner in the flow.") self.status = "Simulation Error." return elif(not self.rx_use_getwave): error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n \ I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert') self.status = "Simulation Error." return if(self.rx_use_getwave): # For GetWave, use a step to extract the model's native properties. # In case the model is adaptive, place this step at the end # of the actual signal vector. And make it large amplitude # and d.c. balanced, to avoid triggering d.c. blocking # behavior, or DFE oscillation. # Tacking on the step, composed of step_len '0's followed by step_len '1's. step_len = 64 step_mag = 0.2 wave_in = append(rx_in.copy(), array([-step_mag / 2.] * step_len * nspui + [step_mag / 2.] * step_len * nspui)) input_len = len(wave_in) row_size = 128 # Some models are persnickity about integral power of 2. idx = 0 # Holds the starting index of the next processing chunk. wave_out = [] while(idx < input_len): if((input_len - idx) >= row_size): wave_out.extend(rx_model.getWave(wave_in[idx : idx + row_size], row_size)) else: wave_out.extend(rx_model.getWave(wave_in[idx :].resize(row_size), row_size)) self.log("AMI output parameters from GetWave call:\n{}".format(rx_model.ami_params_out)) idx += row_size ctle_out = array(wave_out[:-2 * step_len * nspui]) ctle_s = array(wave_out[-step_len * nspui:]) ctle_s = (ctle_s - ctle_s[0]) / step_mag # Compensating for choice of input amplitude/offset. # Calculate the associated impulse response. ctle_h = diff(ctle_s) ctle_h.resize(len(t)) else: ctle_s = ctle_h.cumsum() ctle_out = convolve(ctle_h, rx_in)[:len(rx_in)] ctle_h.resize(len(t)) ctle_H = fft(ctle_h) ctle_H *= ctle_s[-1] / abs(ctle_H[0]) else: if(self.use_ctle_file): ctle_h = import_qucs_csv(self.ctle_file, ts) if(max(abs(ctle_h)) < 100.): # step response? ctle_h = diff(ctle_h) # impulse response is derivative of step response. else: ctle_h *= ts # Normalize to (V/sample) ctle_h.resize(len(t)) ctle_H = fft(ctle_h) ctle_H *= sum(ctle_h) / ctle_H[0] else: w_dummy, ctle_H = make_ctle(rx_bw, peak_freq, peak_mag, w, ctle_mode, ctle_offset) ctle_h = real(ifft(ctle_H))[:len(chnl_h)] ctle_h *= abs(ctle_H[0]) / sum(ctle_h) ctle_out = convolve(rx_in, ctle_h)[:len(tx_out)] ctle_out -= mean(ctle_out) # Force zero mean. if(self.ctle_mode == 'AGC'): # Automatic gain control engaged? ctle_out *= 2. * decision_scaler / ctle_out.ptp() ctle_s = ctle_h.cumsum() self.ctle_s = ctle_s ctle_out_h = convolve(tx_out_h, ctle_h)[:len(tx_out_h)] ctle_out_h_main_lobe = where(ctle_out_h >= max(ctle_out_h) / 2.)[0] if(len(ctle_out_h_main_lobe)): conv_dly_ix = ctle_out_h_main_lobe[0] else: conv_dly_ix = self.chnl_dly / Ts conv_dly = t[conv_dly_ix] ctle_out_s = ctle_out_h.cumsum() temp = ctle_out_h.copy() temp.resize(len(w)) ctle_out_H = fft(temp) # self.rel_opt = -self.cost # Triggers update to EQ tuning plot data. # - Store local variables to class instance. self.ctle_out_s = ctle_out_s # Consider changing this; it could be sensitive to insufficient "front porch" in the CTLE output step response. self.ctle_out_p = self.ctle_out_s[nspui:] - self.ctle_out_s[:-nspui] self.ctle_H = ctle_H self.ctle_h = ctle_h self.ctle_out_H = ctle_out_H self.ctle_out_h = ctle_out_h self.ctle_out = ctle_out self.conv_dly = conv_dly self.conv_dly_ix = conv_dly_ix self.ctle_perf = nbits * nspb / (clock() - split_time) split_time = clock() self.status = 'Running DFE/CDR...(sweep %d of %d)' % (sweep_num, num_sweeps) # Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the DFE. if(self.use_dfe): dfe = DFE(n_taps, gain, delta_t, alpha, ui, nspui, decision_scaler, mod_type, n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain, bandwidth=bandwidth, ideal=self.sum_ideal) else: dfe = DFE(n_taps, 0., delta_t, alpha, ui, nspui, decision_scaler, mod_type, n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain, bandwidth=bandwidth, ideal=True) (dfe_out, tap_weights, ui_ests, clocks, lockeds, clock_times, bits_out) = dfe.run(t, ctle_out) bits_out = array(bits_out) auto_corr = 1. * correlate(bits_out[(nbits - eye_bits):], bits[(nbits - eye_bits):], mode='same') \ / sum(bits[(nbits - eye_bits):]) auto_corr = auto_corr[len(auto_corr) // 2 :] self.auto_corr = auto_corr bit_dly = where(auto_corr == max(auto_corr))[0][0] n_extra = len(bits) - len(bits_out) bit_errs = where(bits_out[(nbits - eye_bits + bit_dly):] ^ bits[(nbits - eye_bits) : len(bits_out) - bit_dly])[0] self.bit_errs = len(bit_errs) dfe_h = array([1.] + list(zeros(nspb - 1)) + sum([[-x] + list(zeros(nspb - 1)) for x in tap_weights[-1]], [])) dfe_h.resize(len(ctle_out_h)) temp = dfe_h.copy() temp.resize(len(w)) dfe_H = fft(temp) self.dfe_s = dfe_h.cumsum() dfe_out_H = ctle_out_H * dfe_H dfe_out_h = convolve(ctle_out_h, dfe_h)[:len(ctle_out_h)] dfe_out_s = dfe_out_h.cumsum() # self.dfe_out_p = self.dfe_out_s[nspui:] - self.dfe_out_s[:-nspui] self.dfe_out_p = dfe_out_s - pad(dfe_out_s[:-nspui], (nspui,0), 'constant', constant_values=(0,0)) self.dfe_H = dfe_H self.dfe_h = dfe_h self.dfe_out_H = dfe_out_H self.dfe_out_h = dfe_out_h self.dfe_out_s = dfe_out_s self.dfe_out = dfe_out self.dfe_perf = nbits * nspb / (clock() - split_time) split_time = clock() self.status = 'Analyzing jitter...(sweep %d of %d)' % (sweep_num, num_sweeps) # Analyze the jitter. if(mod_type == 1): # Handle duo-binary case. pattern_len *= 2 # Because, the XOR pre-coding can invert every other pattern rep. if(mod_type == 2): # Handle PAM-4 case. if(pattern_len % 2): pattern_len *= 2 # Because, the bits are taken in pairs, to form the symbols. # - channel output try: actual_xings = find_crossings(t, chnl_out, decision_scaler, mod_type=mod_type) (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \ thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \ hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh) self.t_jitter = t_jitter self.isi_chnl = isi self.dcd_chnl = dcd self.pj_chnl = pj self.rj_chnl = rj self.thresh_chnl = thresh self.jitter_chnl = hist self.jitter_ext_chnl = hist_synth self.jitter_bins = bin_centers self.jitter_spectrum_chnl = jitter_spectrum self.jitter_ind_spectrum_chnl = jitter_ind_spectrum self.f_MHz = array(spectrum_freqs) * 1.e-6 except: raise # - Tx output try: actual_xings = find_crossings(t, rx_in, decision_scaler, mod_type=mod_type) (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \ thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \ hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh) self.isi_tx = isi self.dcd_tx = dcd self.pj_tx = pj self.rj_tx = rj self.thresh_tx = thresh self.jitter_tx = hist self.jitter_ext_tx = hist_synth self.jitter_spectrum_tx = jitter_spectrum self.jitter_ind_spectrum_tx = jitter_ind_spectrum except: self.thresh_tx = array([]) self.jitter_ext_tx = array([]) self.jitter_tx = array([]) self.jitter_spectrum_tx = array([]) self.jitter_ind_spectrum_tx = array([]) # - CTLE output try: actual_xings = find_crossings(t, ctle_out, decision_scaler, mod_type=mod_type) (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \ thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \ hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh) self.isi_ctle = isi self.dcd_ctle = dcd self.pj_ctle = pj self.rj_ctle = rj self.thresh_ctle = thresh self.jitter_ctle = hist self.jitter_ext_ctle = hist_synth self.jitter_spectrum_ctle = jitter_spectrum self.jitter_ind_spectrum_ctle = jitter_ind_spectrum except: self.thresh_ctle = array([]) self.jitter_ext_ctle = array([]) self.jitter_ctle = array([]) self.jitter_spectrum_ctle = array([]) self.jitter_ind_spectrum_ctle = array([]) # - DFE output try: ignore_until = (nui - eye_uis) * ui + 0.75 * ui # 0.5 was causing an occasional misalignment. ideal_xings = array(filter(lambda x: x > ignore_until, list(ideal_xings))) min_delay = ignore_until + conv_dly actual_xings = find_crossings(t, dfe_out, decision_scaler, min_delay=min_delay, mod_type=mod_type, rising_first=False, ) (jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \ thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \ hist, hist_synth, bin_centers) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings, actual_xings, rel_thresh) self.isi_dfe = isi self.dcd_dfe = dcd self.pj_dfe = pj self.rj_dfe = rj self.thresh_dfe = thresh self.jitter_dfe = hist self.jitter_ext_dfe = hist_synth self.jitter_spectrum_dfe = jitter_spectrum self.jitter_ind_spectrum_dfe = jitter_ind_spectrum self.f_MHz_dfe = array(spectrum_freqs) * 1.e-6 skip_factor = nbits / eye_bits ctle_spec = self.jitter_spectrum_ctle dfe_spec = self.jitter_spectrum_dfe self.jitter_rejection_ratio = zeros(len(dfe_spec)) except: raise self.thresh_dfe = array([]) self.jitter_ext_dfe = array([]) self.jitter_dfe = array([]) self.jitter_spectrum_dfe = array([]) self.jitter_ind_spectrum_dfe = array([]) self.f_MHz_dfe = array([]) self.jitter_rejection_ratio = array([]) self.jitter_perf = nbits * nspb / (clock() - split_time) self.total_perf = nbits * nspb / (clock() - start_time) split_time = clock() self.status = 'Updating plots...(sweep %d of %d)' % (sweep_num, num_sweeps) # Save local variables to class instance for state preservation, performing unit conversion where necessary. self.chnl_out = chnl_out self.adaptation = tap_weights self.ui_ests = array(ui_ests) * 1.e12 # (ps) self.clocks = clocks self.lockeds = lockeds self.clock_times = clock_times # Update plots. if(update_plots): update_results(self) if(not initial_run): update_eyes(self) self.plotting_perf = nbits * nspb / (clock() - split_time) self.status = 'Ready.'
# Plot updating
[docs]def update_results(self): """ Updates all plot data used by GUI. Args: self(PyBERT): Reference to an instance of the *PyBERT* class. """ # Copy globals into local namespace. ui = self.ui samps_per_ui = self.nspui eye_uis = self.eye_uis num_ui = self.nui clock_times = self.clock_times f = self.f t = self.t t_ns = self.t_ns t_ns_chnl = self.t_ns_chnl conv_dly_ix = self.conv_dly_ix n_taps = self.n_taps Ts = t[1] ignore_until = (num_ui - eye_uis) * ui ignore_samps = (num_ui - eye_uis) * samps_per_ui # Misc. f_GHz = f[:len(f) // 2] / 1.e9 len_f_GHz = len(f_GHz) self.plotdata.set_data("f_GHz", f_GHz[1:]) self.plotdata.set_data("t_ns", t_ns) self.plotdata.set_data("t_ns_chnl", t_ns_chnl) # DFE. tap_weights = transpose(array(self.adaptation)) i = 1 for tap_weight in tap_weights: self.plotdata.set_data("tap%d_weights" % i, tap_weight) i += 1 self.plotdata.set_data("tap_weight_index", range(len(tap_weight))) if(self._old_n_taps != n_taps): new_plot = Plot(self.plotdata, auto_colors=['red', 'orange', 'yellow', 'green', 'blue', 'purple'], padding_left=75) for i in range(self.n_taps): new_plot.plot(("tap_weight_index", "tap%d_weights" % (i + 1)), type="line", color="auto", name="tap%d"%(i+1)) new_plot.title = "DFE Adaptation" new_plot.tools.append(PanTool(new_plot, constrain=True, constrain_key=None, constrain_direction='x')) zoom9 = ZoomTool(new_plot, tool_mode="range", axis='index', always_on=False) new_plot.overlays.append(zoom9) new_plot.legend.visible = True new_plot.legend.align = 'ul' self.plots_dfe.remove(self._dfe_plot) self.plots_dfe.insert(1, new_plot) self._dfe_plot = new_plot self._old_n_taps = n_taps clock_pers = diff(clock_times) start_t = t[where(self.lockeds)[0][0]] start_ix = where(clock_times > start_t)[0][0] (bin_counts, bin_edges) = histogram(clock_pers[start_ix:], bins=100) bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2. clock_spec = fft(clock_pers[start_ix:]) clock_spec = abs(clock_spec[:len(clock_spec) / 2]) spec_freqs = arange(len(clock_spec)) / (2. * len(clock_spec)) # In this case, fNyquist = half the bit rate. clock_spec /= clock_spec[1:].mean() # Normalize the mean non-d.c. value to 0 dB. self.plotdata.set_data("clk_per_hist_bins", bin_centers * 1.e12) # (ps) self.plotdata.set_data("clk_per_hist_vals", bin_counts) self.plotdata.set_data("clk_spec", 10. * log10(clock_spec[1:])) # Omit the d.c. value. self.plotdata.set_data("clk_freqs", spec_freqs[1:]) self.plotdata.set_data("dfe_out", self.dfe_out) self.plotdata.set_data("ui_ests", self.ui_ests) self.plotdata.set_data("clocks", self.clocks) self.plotdata.set_data("lockeds", self.lockeds) # Impulse responses self.plotdata.set_data("chnl_h", self.chnl_h * 1.e-9 / Ts) # Re-normalize to (V/ns), for plotting. self.plotdata.set_data("tx_h", self.tx_h * 1.e-9 / Ts) self.plotdata.set_data("tx_out_h", self.tx_out_h * 1.e-9 / Ts) self.plotdata.set_data("ctle_h", self.ctle_h * 1.e-9 / Ts) self.plotdata.set_data("ctle_out_h", self.ctle_out_h * 1.e-9 / Ts) self.plotdata.set_data("dfe_h", self.dfe_h * 1.e-9 / Ts) self.plotdata.set_data("dfe_out_h", self.dfe_out_h * 1.e-9 / Ts) # Step responses self.plotdata.set_data("chnl_s", self.chnl_s) self.plotdata.set_data("tx_s", self.tx_s) self.plotdata.set_data("tx_out_s", self.tx_out_s) self.plotdata.set_data("ctle_s", self.ctle_s) self.plotdata.set_data("ctle_out_s", self.ctle_out_s) self.plotdata.set_data("dfe_s", self.dfe_s) self.plotdata.set_data("dfe_out_s", self.dfe_out_s) # Pulse responses self.plotdata.set_data("chnl_p", self.chnl_p) self.plotdata.set_data("tx_out_p", self.tx_out_p) self.plotdata.set_data("ctle_out_p", self.ctle_out_p) self.plotdata.set_data("dfe_out_p", self.dfe_out_p) # Outputs self.plotdata.set_data("ideal_signal", self.ideal_signal) self.plotdata.set_data("chnl_out", self.chnl_out) self.plotdata.set_data("tx_out", self.rx_in) self.plotdata.set_data("ctle_out", self.ctle_out) self.plotdata.set_data("dfe_out", self.dfe_out) self.plotdata.set_data("auto_corr", self.auto_corr) # Frequency responses self.plotdata.set_data("chnl_H", 20. * log10(abs(self.chnl_H [1 : len_f_GHz]))) self.plotdata.set_data("tx_H", 20. * log10(abs(self.tx_H [1 : len_f_GHz]))) self.plotdata.set_data("tx_out_H", 20. * log10(abs(self.tx_out_H [1 : len_f_GHz]))) self.plotdata.set_data("ctle_H", 20. * log10(abs(self.ctle_H [1 : len_f_GHz]))) self.plotdata.set_data("ctle_out_H", 20. * log10(abs(self.ctle_out_H[1 : len_f_GHz]))) self.plotdata.set_data("dfe_H", 20. * log10(abs(self.dfe_H [1 : len_f_GHz]))) self.plotdata.set_data("dfe_out_H", 20. * log10(abs(self.dfe_out_H [1 : len_f_GHz]))) # Jitter distributions jitter_ext_chnl = self.jitter_ext_chnl # These are used, again, in bathtub curve generation, below. jitter_ext_tx = self.jitter_ext_tx jitter_ext_ctle = self.jitter_ext_ctle jitter_ext_dfe = self.jitter_ext_dfe self.plotdata.set_data("jitter_bins", array(self.jitter_bins) * 1.e12) self.plotdata.set_data("jitter_chnl", self.jitter_chnl) self.plotdata.set_data("jitter_ext_chnl", jitter_ext_chnl) self.plotdata.set_data("jitter_tx", self.jitter_tx) self.plotdata.set_data("jitter_ext_tx", jitter_ext_tx) self.plotdata.set_data("jitter_ctle", self.jitter_ctle) self.plotdata.set_data("jitter_ext_ctle", jitter_ext_ctle) self.plotdata.set_data("jitter_dfe", self.jitter_dfe) self.plotdata.set_data("jitter_ext_dfe", jitter_ext_dfe) # Jitter spectrums log10_ui = log10(ui) self.plotdata.set_data("f_MHz", self.f_MHz[1:]) self.plotdata.set_data("f_MHz_dfe", self.f_MHz_dfe[1:]) self.plotdata.set_data("jitter_spectrum_chnl", 10. * (log10(self.jitter_spectrum_chnl [1:]) - log10_ui)) self.plotdata.set_data("jitter_ind_spectrum_chnl", 10. * (log10(self.jitter_ind_spectrum_chnl [1:]) - log10_ui)) self.plotdata.set_data("thresh_chnl", 10. * (log10(self.thresh_chnl [1:]) - log10_ui)) self.plotdata.set_data("jitter_spectrum_tx", 10. * (log10(self.jitter_spectrum_tx [1:]) - log10_ui)) self.plotdata.set_data("jitter_ind_spectrum_tx", 10. * (log10(self.jitter_ind_spectrum_tx [1:]) - log10_ui)) self.plotdata.set_data("thresh_tx", 10. * (log10(self.thresh_tx [1:]) - log10_ui)) self.plotdata.set_data("jitter_spectrum_ctle", 10. * (log10(self.jitter_spectrum_ctle [1:]) - log10_ui)) self.plotdata.set_data("jitter_ind_spectrum_ctle", 10. * (log10(self.jitter_ind_spectrum_ctle [1:]) - log10_ui)) self.plotdata.set_data("thresh_ctle", 10. * (log10(self.thresh_ctle [1:]) - log10_ui)) self.plotdata.set_data("jitter_spectrum_dfe", 10. * (log10(self.jitter_spectrum_dfe [1:]) - log10_ui)) self.plotdata.set_data("jitter_ind_spectrum_dfe", 10. * (log10(self.jitter_ind_spectrum_dfe [1:]) - log10_ui)) self.plotdata.set_data("thresh_dfe", 10. * (log10(self.thresh_dfe [1:]) - log10_ui)) self.plotdata.set_data("jitter_rejection_ratio", self.jitter_rejection_ratio[1:]) # Bathtubs half_len = len(jitter_ext_chnl) / 2 # - Channel bathtub_chnl = list(cumsum(jitter_ext_chnl[-1 : -(half_len + 1) : -1])) bathtub_chnl.reverse() bathtub_chnl = array(bathtub_chnl + list(cumsum(jitter_ext_chnl[:half_len + 1]))) bathtub_chnl = where(bathtub_chnl < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_chnl)), bathtub_chnl) # To avoid Chaco log scale plot wierdness. self.plotdata.set_data("bathtub_chnl", log10(bathtub_chnl)) # - Tx bathtub_tx = list(cumsum(jitter_ext_tx[-1 : -(half_len + 1) : -1])) bathtub_tx.reverse() bathtub_tx = array(bathtub_tx + list(cumsum(jitter_ext_tx[:half_len + 1]))) bathtub_tx = where(bathtub_tx < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_tx)), bathtub_tx) # To avoid Chaco log scale plot wierdness. self.plotdata.set_data("bathtub_tx", log10(bathtub_tx)) # - CTLE bathtub_ctle = list(cumsum(jitter_ext_ctle[-1 : -(half_len + 1) : -1])) bathtub_ctle.reverse() bathtub_ctle = array(bathtub_ctle + list(cumsum(jitter_ext_ctle[:half_len + 1]))) bathtub_ctle = where(bathtub_ctle < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_ctle)), bathtub_ctle) # To avoid Chaco log scale plot wierdness. self.plotdata.set_data("bathtub_ctle", log10(bathtub_ctle)) # - DFE bathtub_dfe = list(cumsum(jitter_ext_dfe[-1 : -(half_len + 1) : -1])) bathtub_dfe.reverse() bathtub_dfe = array(bathtub_dfe + list(cumsum(jitter_ext_dfe[:half_len + 1]))) bathtub_dfe = where(bathtub_dfe < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_dfe)), bathtub_dfe) # To avoid Chaco log scale plot wierdness. self.plotdata.set_data("bathtub_dfe", log10(bathtub_dfe)) # Eyes width = 2 * samps_per_ui xs = linspace(-ui * 1.e12, ui * 1.e12, width) height = 100 y_max = 1.1 * max(abs(array(self.chnl_out))) eye_chnl = calc_eye(ui, samps_per_ui, height, self.chnl_out[ignore_samps:], y_max) y_max = 1.1 * max(abs(array(self.rx_in))) eye_tx = calc_eye(ui, samps_per_ui, height, self.rx_in[ignore_samps:], y_max) y_max = 1.1 * max(abs(array(self.ctle_out))) eye_ctle = calc_eye(ui, samps_per_ui, height, self.ctle_out[ignore_samps:], y_max) i = 0 while(clock_times[i] <= ignore_until): i += 1 assert i < len(clock_times), "ERROR: Insufficient coverage in 'clock_times' vector." y_max = 1.1 * max(abs(array(self.dfe_out))) eye_dfe = calc_eye(ui, samps_per_ui, height, self.dfe_out, y_max, clock_times[i:]) self.plotdata.set_data("eye_index", xs) self.plotdata.set_data("eye_chnl", eye_chnl) self.plotdata.set_data("eye_tx", eye_tx) self.plotdata.set_data("eye_ctle", eye_ctle) self.plotdata.set_data("eye_dfe", eye_dfe)
[docs]def update_eyes(self): """ Update the heat plots representing the eye diagrams. Args: self(PyBERT): Reference to an instance of the *PyBERT* class. """ ui = self.ui samps_per_ui = self.nspui width = 2 * samps_per_ui height = 100 xs = linspace(-ui * 1.e12, ui * 1.e12, width) y_max = 1.1 * max(abs(array(self.chnl_out))) ys = linspace(-y_max, y_max, height) self.plots_eye.components[0].components[0].index.set_data(xs, ys) self.plots_eye.components[0].x_axis.mapper.range.low = xs[0] self.plots_eye.components[0].x_axis.mapper.range.high = xs[-1] self.plots_eye.components[0].y_axis.mapper.range.low = ys[0] self.plots_eye.components[0].y_axis.mapper.range.high = ys[-1] self.plots_eye.components[0].invalidate_draw() y_max = 1.1 * max(abs(array(self.rx_in))) ys = linspace(-y_max, y_max, height) self.plots_eye.components[1].components[0].index.set_data(xs, ys) self.plots_eye.components[1].x_axis.mapper.range.low = xs[0] self.plots_eye.components[1].x_axis.mapper.range.high = xs[-1] self.plots_eye.components[1].y_axis.mapper.range.low = ys[0] self.plots_eye.components[1].y_axis.mapper.range.high = ys[-1] self.plots_eye.components[1].invalidate_draw() y_max = 1.1 * max(abs(array(self.dfe_out))) ys = linspace(-y_max, y_max, height) self.plots_eye.components[3].components[0].index.set_data(xs, ys) self.plots_eye.components[3].x_axis.mapper.range.low = xs[0] self.plots_eye.components[3].x_axis.mapper.range.high = xs[-1] self.plots_eye.components[3].y_axis.mapper.range.low = ys[0] self.plots_eye.components[3].y_axis.mapper.range.high = ys[-1] self.plots_eye.components[3].invalidate_draw() self.plots_eye.components[2].components[0].index.set_data(xs, ys) self.plots_eye.components[2].x_axis.mapper.range.low = xs[0] self.plots_eye.components[2].x_axis.mapper.range.high = xs[-1] self.plots_eye.components[2].y_axis.mapper.range.low = ys[0] self.plots_eye.components[2].y_axis.mapper.range.high = ys[-1] self.plots_eye.components[2].invalidate_draw() self.plots_eye.request_redraw()