"""
Default controller definition for PyBERT class.
Original author: David Banas <capn.freako@gmail.com>
Original date: August 24, 2014 (Copied from `pybert.py', as part of a major code cleanup.)
Copyright (c) 2014 David Banas; all rights reserved World wide.
"""
from time import clock, sleep
from chaco.api import Plot
from chaco.tools.api import PanTool, ZoomTool
from numpy import sign, sin, pi, array, linspace, zeros, ones, repeat, where, sqrt, histogram, arange, append
from numpy import diff, log10, correlate, convolve, mean, resize, real, transpose, cumsum, diff, std, pad
from numpy.random import normal
from numpy.fft import fft, ifft
from scipy.signal import lfilter, iirfilter, freqz, fftconvolve
from dfe import DFE
from cdr import CDR
from pyibisami.amimodel import AMIModel, AMIModelInitializer
from pybert_util import find_crossings, make_ctle, calc_jitter, moving_average, calc_eye, import_qucs_csv
DEBUG = False
MIN_BATHTUB_VAL = 1.e-18
gFc = 1.e6 # Corner frequency of high-pass filter used to model capacitive coupling of periodic noise.
[docs]def my_run_sweeps(self):
"""
Runs the simulation sweeps.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
"""
sweep_aves = self.sweep_aves
do_sweep = self.do_sweep
tx_taps = self.tx_taps
if(do_sweep):
# Assemble the list of desired values for each sweepable parameter.
sweep_vals = []
for tap in tx_taps:
if(tap.enabled):
if(tap.steps):
sweep_vals.append(list(arange(tap.min_val, tap.max_val, (tap.max_val - tap.min_val) / tap.steps)))
else:
sweep_vals.append([tap.value])
else:
sweep_vals.append([0.])
# Run the sweep, using the lists assembled, above.
sweeps = [[w, x, y, z] for w in sweep_vals[0]
for x in sweep_vals[1]
for y in sweep_vals[2]
for z in sweep_vals[3]
]
num_sweeps = sweep_aves * len(sweeps)
self.num_sweeps = num_sweeps
sweep_results = []
sweep_num = 1
for sweep in sweeps:
for i in range(4):
self.tx_taps[i].value = sweep[i]
bit_errs = []
for i in range(sweep_aves):
self.sweep_num = sweep_num
my_run_simulation(self, update_plots=False)
bit_errs.append(self.bit_errs)
sweep_num += 1
sweep_results.append((sweep, mean(bit_errs), std(bit_errs)))
self.sweep_results = sweep_results
else:
my_run_simulation(self)
[docs]def my_run_simulation(self, initial_run=False, update_plots=True):
"""
Runs the simulation.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
initial_run(Bool): If True, don't update the eye diagrams, since
they haven't been created, yet. (Optional; default = False.)
update_plots(Bool): If True, update the plots, after simulation
completes. This option can be used by larger scripts, which
import *pybert*, in order to avoid graphical back-end
conflicts and speed up this function's execution time.
(Optional; default = True.)
"""
num_sweeps = self.num_sweeps
sweep_num = self.sweep_num
start_time = clock()
self.status = 'Running channel...(sweep %d of %d)' % (sweep_num, num_sweeps)
self.run_count += 1 # Force regeneration of bit stream.
# Pull class variables into local storage, performing unit conversion where necessary.
t = self.t
t_ns = self.t_ns
f = self.f
w = self.w
bits = self.bits
symbols = self.symbols
ffe = self.ffe
nbits = self.nbits
nui = self.nui
bit_rate = self.bit_rate * 1.e9
eye_bits = self.eye_bits
eye_uis = self.eye_uis
nspb = self.nspb
nspui = self.nspui
rn = self.rn
pn_mag = self.pn_mag
pn_freq = self.pn_freq * 1.e6
Vod = self.vod
Rs = self.rs
Cs = self.cout * 1.e-12
RL = self.rin
CL = self.cac * 1.e-6
Cp = self.cin * 1.e-12
R0 = self.R0
w0 = self.w0
Rdc = self.Rdc
Z0 = self.Z0
v0 = self.v0 * 3.e8
Theta0 = self.Theta0
l_ch = self.l_ch
pattern_len = self.pattern_len
rx_bw = self.rx_bw * 1.e9
peak_freq = self.peak_freq * 1.e9
peak_mag = self.peak_mag
ctle_offset = self.ctle_offset
ctle_mode = self.ctle_mode
delta_t = self.delta_t * 1.e-12
alpha = self.alpha
ui = self.ui
n_taps = self.n_taps
gain = self.gain
n_ave = self.n_ave
decision_scaler = self.decision_scaler
n_lock_ave = self.n_lock_ave
rel_lock_tol = self.rel_lock_tol
lock_sustain = self.lock_sustain
bandwidth = self.sum_bw * 1.e9
rel_thresh = self.thresh
mod_type = self.mod_type[0]
# Calculate misc. values.
fs = bit_rate * nspb
Ts = t[1]
ts = Ts
# Generate the ideal over-sampled signal.
#
# Duo-binary is problematic, in that it requires convolution with the ideal duobinary
# impulse response, in order to produce the proper ideal signal.
x = repeat(symbols, nspui)
self.x = x
if(mod_type == 1): # Handle duo-binary case.
duob_h = array(([0.5] + [0.] * (nspui - 1)) * 2)
x = convolve(x, duob_h)[:len(t)]
self.ideal_signal = x
# Find the ideal crossing times, for subsequent jitter analysis of transmitted signal.
ideal_xings = find_crossings(t, x, decision_scaler, min_delay=(ui / 2.), mod_type=mod_type)
self.ideal_xings = ideal_xings
# Calculate the channel output.
#
# Note: We're not using 'self.ideal_signal', because we rely on the system response to
# create the duobinary waveform. We only create it explicitly, above,
# so that we'll have an ideal reference for comparison.
chnl_h = self.calc_chnl_h()
chnl_out = convolve(self.x, chnl_h)[:len(x)]
self.channel_perf = nbits * nspb / (clock() - start_time)
split_time = clock()
self.status = 'Running Tx...(sweep %d of %d)' % (sweep_num, num_sweeps)
# Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the Tx.
if(self.tx_use_ami):
tx_cfg = self._tx_cfg # Grab the 'AMIParamConfigurator' instance for this model.
# Get the model invoked and initialized, except for 'channel_response', which
# we need to do several different ways, in order to gather all the data we need.
tx_param_dict = tx_cfg.input_ami_params
tx_model_init = AMIModelInitializer(tx_param_dict)
tx_model_init.sample_interval = ts # Must be set, before 'channel_response'!
tx_model_init.channel_response = [1.] + [0.] * (len(chnl_h) - 1) # Start with a delta function, to capture the model's impulse response.
tx_model_init.bit_time = ui
tx_model = AMIModel(self.tx_dll_file)
tx_model.initialize(tx_model_init)
self.log("Tx IBIS-AMI model initialization results:\nInput parameters: {}\nOutput parameters: {}\nMessage: {}".format(
tx_model.ami_params_in, tx_model.ami_params_out, tx_model.msg))
if(tx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse'])):
tx_h = array(tx_model.initOut)
elif(not tx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists'])):
error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n \
I cannot continue.\nThis condition is supposed to be caught sooner in the flow.")
self.status = "Simulation Error."
return
elif(not self.tx_use_getwave):
error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n \
I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert')
self.status = "Simulation Error."
return
if(self.tx_use_getwave):
# For GetWave, use a step to extract the model's native properties.
# Position the input edge at the center of the vector, in
# order to minimize high frequency artifactual energy
# introduced by frequency domain processing in some models.
half_len = len(chnl_h) // 2
tx_s = tx_model.getWave([0.] * half_len + [1.] * half_len)
# Shift the result back to the correct location, extending the last sample.
tx_s = pad(tx_s[half_len:], (0, half_len), 'edge')
tx_h = diff(tx_s)
tx_out = tx_model.getWave(x)
else:
tx_s = tx_h.cumsum()
tx_out = convolve(tx_h, self.x)[:len(self.x)]
else:
# - Generate the ideal, post-preemphasis signal.
# To consider: use 'scipy.interp()'. This is what Mark does, in order to induce jitter in the Tx output.
ffe_out = convolve(symbols, ffe)[:len(symbols)]
self.rel_power = mean(ffe_out ** 2) # Store the relative average power dissipated in the Tx.
tx_out = repeat(ffe_out, nspui) # oversampled output
# - Calculate the responses.
# - (The Tx is unique in that the calculated responses aren't used to form the output.
# This is partly due to the out of order nature in which we combine the Tx and channel,
# and partly due to the fact that we're adding noise to the Tx output.)
tx_h = array(sum([[x] + list(zeros(nspui - 1)) for x in ffe], [])) # Using sum to concatenate.
tx_h.resize(len(chnl_h))
tx_s = tx_h.cumsum()
temp = tx_h.copy()
temp.resize(len(w))
tx_H = fft(temp)
tx_H *= tx_s[-1] / abs(tx_H[0])
# - Generate the uncorrelated periodic noise. (Assume capacitive coupling.)
# - Generate the ideal rectangular aggressor waveform.
pn_period = 1. / pn_freq
pn_samps = int(pn_period / Ts + 0.5)
pn = zeros(pn_samps)
pn[pn_samps // 2:] = pn_mag
pn = resize(pn, len(tx_out))
# - High pass filter it. (Simulating capacitive coupling.)
(b, a) = iirfilter(2, gFc/(fs/2), btype='highpass')
pn = lfilter(b, a, pn)[:len(pn)]
# - Add the uncorrelated periodic and random noise to the Tx output.
tx_out += pn
tx_out += normal(scale=rn, size=(len(tx_out),))
# - Convolve w/ channel.
tx_out_h = convolve(tx_h, chnl_h)[:len(chnl_h)]
temp = tx_out_h.copy()
temp.resize(len(w))
tx_out_H = fft(temp)
rx_in = convolve(tx_out, chnl_h)[:len(tx_out)]
self.tx_s = tx_s
self.tx_out = tx_out
self.rx_in = rx_in
self.tx_out_s = tx_out_h.cumsum()
self.tx_out_p = self.tx_out_s[nspui:] - self.tx_out_s[:-nspui]
self.tx_H = tx_H
self.tx_h = tx_h
self.tx_out_H = tx_out_H
self.tx_out_h = tx_out_h
self.tx_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = 'Running CTLE...(sweep %d of %d)' % (sweep_num, num_sweeps)
# Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the CTLE.
if(self.rx_use_ami):
rx_cfg = self._rx_cfg # Grab the 'AMIParamConfigurator' instance for this model.
# Get the model invoked and initialized, except for 'channel_response', which
# we need to do several different ways, in order to gather all the data we need.
rx_param_dict = rx_cfg.input_ami_params
rx_model_init = AMIModelInitializer(rx_param_dict)
rx_model_init.sample_interval = ts # Must be set, before 'channel_response'!
rx_model_init.channel_response = [1.] + [0.] * (len(chnl_h) - 1) # Start with a delta function, to capture the model's impulse response.
rx_model_init.bit_time = ui
rx_model = AMIModel(self.rx_dll_file)
rx_model.initialize(rx_model_init)
self.log("Rx IBIS-AMI model initialization results:\nInput parameters: {}\nOutput parameters: {}\nMessage: {}".format(
rx_model.ami_params_in, rx_model.ami_params_out, rx_model.msg))
if(rx_cfg.fetch_param_val(['Reserved_Parameters', 'Init_Returns_Impulse'])):
ctle_h = array(rx_model.initOut)
elif(not rx_cfg.fetch_param_val(['Reserved_Parameters', 'GetWave_Exists'])):
error("ERROR: Both 'Init_Returns_Impulse' and 'GetWave_Exists' are False!\n \
I cannot continue.\nThis condition is supposed to be caught sooner in the flow.")
self.status = "Simulation Error."
return
elif(not self.rx_use_getwave):
error("ERROR: You have elected not to use GetWave for a model, which does not return an impulse response!\n \
I cannot continue.\nPlease, select 'Use GetWave' and try again.", 'PyBERT Alert')
self.status = "Simulation Error."
return
if(self.rx_use_getwave):
# For GetWave, use a step to extract the model's native properties.
# In case the model is adaptive, place this step at the end
# of the actual signal vector. And make it large amplitude
# and d.c. balanced, to avoid triggering d.c. blocking
# behavior, or DFE oscillation.
# Tacking on the step, composed of step_len '0's followed by step_len '1's.
step_len = 64
step_mag = 0.2
wave_in = append(rx_in.copy(), array([-step_mag / 2.] * step_len * nspui + [step_mag / 2.] * step_len * nspui))
input_len = len(wave_in)
row_size = 128 # Some models are persnickity about integral power of 2.
idx = 0 # Holds the starting index of the next processing chunk.
wave_out = []
while(idx < input_len):
if((input_len - idx) >= row_size):
wave_out.extend(rx_model.getWave(wave_in[idx : idx + row_size], row_size))
else:
wave_out.extend(rx_model.getWave(wave_in[idx :].resize(row_size), row_size))
self.log("AMI output parameters from GetWave call:\n{}".format(rx_model.ami_params_out))
idx += row_size
ctle_out = array(wave_out[:-2 * step_len * nspui])
ctle_s = array(wave_out[-step_len * nspui:])
ctle_s = (ctle_s - ctle_s[0]) / step_mag # Compensating for choice of input amplitude/offset.
# Calculate the associated impulse response.
ctle_h = diff(ctle_s)
ctle_h.resize(len(t))
else:
ctle_s = ctle_h.cumsum()
ctle_out = convolve(ctle_h, rx_in)[:len(rx_in)]
ctle_h.resize(len(t))
ctle_H = fft(ctle_h)
ctle_H *= ctle_s[-1] / abs(ctle_H[0])
else:
if(self.use_ctle_file):
ctle_h = import_qucs_csv(self.ctle_file, ts)
if(max(abs(ctle_h)) < 100.): # step response?
ctle_h = diff(ctle_h) # impulse response is derivative of step response.
else:
ctle_h *= ts # Normalize to (V/sample)
ctle_h.resize(len(t))
ctle_H = fft(ctle_h)
ctle_H *= sum(ctle_h) / ctle_H[0]
else:
w_dummy, ctle_H = make_ctle(rx_bw, peak_freq, peak_mag, w, ctle_mode, ctle_offset)
ctle_h = real(ifft(ctle_H))[:len(chnl_h)]
ctle_h *= abs(ctle_H[0]) / sum(ctle_h)
ctle_out = convolve(rx_in, ctle_h)[:len(tx_out)]
ctle_out -= mean(ctle_out) # Force zero mean.
if(self.ctle_mode == 'AGC'): # Automatic gain control engaged?
ctle_out *= 2. * decision_scaler / ctle_out.ptp()
ctle_s = ctle_h.cumsum()
self.ctle_s = ctle_s
ctle_out_h = convolve(tx_out_h, ctle_h)[:len(tx_out_h)]
ctle_out_h_main_lobe = where(ctle_out_h >= max(ctle_out_h) / 2.)[0]
if(len(ctle_out_h_main_lobe)):
conv_dly_ix = ctle_out_h_main_lobe[0]
else:
conv_dly_ix = self.chnl_dly / Ts
conv_dly = t[conv_dly_ix]
ctle_out_s = ctle_out_h.cumsum()
temp = ctle_out_h.copy()
temp.resize(len(w))
ctle_out_H = fft(temp)
# self.rel_opt = -self.cost # Triggers update to EQ tuning plot data.
# - Store local variables to class instance.
self.ctle_out_s = ctle_out_s
# Consider changing this; it could be sensitive to insufficient "front porch" in the CTLE output step response.
self.ctle_out_p = self.ctle_out_s[nspui:] - self.ctle_out_s[:-nspui]
self.ctle_H = ctle_H
self.ctle_h = ctle_h
self.ctle_out_H = ctle_out_H
self.ctle_out_h = ctle_out_h
self.ctle_out = ctle_out
self.conv_dly = conv_dly
self.conv_dly_ix = conv_dly_ix
self.ctle_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = 'Running DFE/CDR...(sweep %d of %d)' % (sweep_num, num_sweeps)
# Generate the output from, and the incremental/cumulative impulse/step/frequency responses of, the DFE.
if(self.use_dfe):
dfe = DFE(n_taps, gain, delta_t, alpha, ui, nspui, decision_scaler, mod_type,
n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain,
bandwidth=bandwidth, ideal=self.sum_ideal)
else:
dfe = DFE(n_taps, 0., delta_t, alpha, ui, nspui, decision_scaler, mod_type,
n_ave=n_ave, n_lock_ave=n_lock_ave, rel_lock_tol=rel_lock_tol, lock_sustain=lock_sustain,
bandwidth=bandwidth, ideal=True)
(dfe_out, tap_weights, ui_ests, clocks, lockeds, clock_times, bits_out) = dfe.run(t, ctle_out)
bits_out = array(bits_out)
auto_corr = 1. * correlate(bits_out[(nbits - eye_bits):], bits[(nbits - eye_bits):], mode='same') \
/ sum(bits[(nbits - eye_bits):])
auto_corr = auto_corr[len(auto_corr) // 2 :]
self.auto_corr = auto_corr
bit_dly = where(auto_corr == max(auto_corr))[0][0]
n_extra = len(bits) - len(bits_out)
bit_errs = where(bits_out[(nbits - eye_bits + bit_dly):] ^ bits[(nbits - eye_bits) : len(bits_out) - bit_dly])[0]
self.bit_errs = len(bit_errs)
dfe_h = array([1.] + list(zeros(nspb - 1)) + sum([[-x] + list(zeros(nspb - 1)) for x in tap_weights[-1]], []))
dfe_h.resize(len(ctle_out_h))
temp = dfe_h.copy()
temp.resize(len(w))
dfe_H = fft(temp)
self.dfe_s = dfe_h.cumsum()
dfe_out_H = ctle_out_H * dfe_H
dfe_out_h = convolve(ctle_out_h, dfe_h)[:len(ctle_out_h)]
dfe_out_s = dfe_out_h.cumsum()
# self.dfe_out_p = self.dfe_out_s[nspui:] - self.dfe_out_s[:-nspui]
self.dfe_out_p = dfe_out_s - pad(dfe_out_s[:-nspui], (nspui,0), 'constant', constant_values=(0,0))
self.dfe_H = dfe_H
self.dfe_h = dfe_h
self.dfe_out_H = dfe_out_H
self.dfe_out_h = dfe_out_h
self.dfe_out_s = dfe_out_s
self.dfe_out = dfe_out
self.dfe_perf = nbits * nspb / (clock() - split_time)
split_time = clock()
self.status = 'Analyzing jitter...(sweep %d of %d)' % (sweep_num, num_sweeps)
# Analyze the jitter.
if(mod_type == 1): # Handle duo-binary case.
pattern_len *= 2 # Because, the XOR pre-coding can invert every other pattern rep.
if(mod_type == 2): # Handle PAM-4 case.
if(pattern_len % 2):
pattern_len *= 2 # Because, the bits are taken in pairs, to form the symbols.
# - channel output
try:
actual_xings = find_crossings(t, chnl_out, decision_scaler, mod_type=mod_type)
(jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \
thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \
hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
self.t_jitter = t_jitter
self.isi_chnl = isi
self.dcd_chnl = dcd
self.pj_chnl = pj
self.rj_chnl = rj
self.thresh_chnl = thresh
self.jitter_chnl = hist
self.jitter_ext_chnl = hist_synth
self.jitter_bins = bin_centers
self.jitter_spectrum_chnl = jitter_spectrum
self.jitter_ind_spectrum_chnl = jitter_ind_spectrum
self.f_MHz = array(spectrum_freqs) * 1.e-6
except:
raise
# - Tx output
try:
actual_xings = find_crossings(t, rx_in, decision_scaler, mod_type=mod_type)
(jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \
thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \
hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
self.isi_tx = isi
self.dcd_tx = dcd
self.pj_tx = pj
self.rj_tx = rj
self.thresh_tx = thresh
self.jitter_tx = hist
self.jitter_ext_tx = hist_synth
self.jitter_spectrum_tx = jitter_spectrum
self.jitter_ind_spectrum_tx = jitter_ind_spectrum
except:
self.thresh_tx = array([])
self.jitter_ext_tx = array([])
self.jitter_tx = array([])
self.jitter_spectrum_tx = array([])
self.jitter_ind_spectrum_tx = array([])
# - CTLE output
try:
actual_xings = find_crossings(t, ctle_out, decision_scaler, mod_type=mod_type)
(jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \
thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \
hist, hist_synth, bin_centers) = calc_jitter(ui, nui, pattern_len, ideal_xings, actual_xings, rel_thresh)
self.isi_ctle = isi
self.dcd_ctle = dcd
self.pj_ctle = pj
self.rj_ctle = rj
self.thresh_ctle = thresh
self.jitter_ctle = hist
self.jitter_ext_ctle = hist_synth
self.jitter_spectrum_ctle = jitter_spectrum
self.jitter_ind_spectrum_ctle = jitter_ind_spectrum
except:
self.thresh_ctle = array([])
self.jitter_ext_ctle = array([])
self.jitter_ctle = array([])
self.jitter_spectrum_ctle = array([])
self.jitter_ind_spectrum_ctle = array([])
# - DFE output
try:
ignore_until = (nui - eye_uis) * ui + 0.75 * ui # 0.5 was causing an occasional misalignment.
ideal_xings = array(filter(lambda x: x > ignore_until, list(ideal_xings)))
min_delay = ignore_until + conv_dly
actual_xings = find_crossings(t, dfe_out, decision_scaler, min_delay=min_delay,
mod_type=mod_type,
rising_first=False,
)
(jitter, t_jitter, isi, dcd, pj, rj, jitter_ext, \
thresh, jitter_spectrum, jitter_ind_spectrum, spectrum_freqs, \
hist, hist_synth, bin_centers) = calc_jitter(ui, eye_uis, pattern_len, ideal_xings, actual_xings, rel_thresh)
self.isi_dfe = isi
self.dcd_dfe = dcd
self.pj_dfe = pj
self.rj_dfe = rj
self.thresh_dfe = thresh
self.jitter_dfe = hist
self.jitter_ext_dfe = hist_synth
self.jitter_spectrum_dfe = jitter_spectrum
self.jitter_ind_spectrum_dfe = jitter_ind_spectrum
self.f_MHz_dfe = array(spectrum_freqs) * 1.e-6
skip_factor = nbits / eye_bits
ctle_spec = self.jitter_spectrum_ctle
dfe_spec = self.jitter_spectrum_dfe
self.jitter_rejection_ratio = zeros(len(dfe_spec))
except:
raise
self.thresh_dfe = array([])
self.jitter_ext_dfe = array([])
self.jitter_dfe = array([])
self.jitter_spectrum_dfe = array([])
self.jitter_ind_spectrum_dfe = array([])
self.f_MHz_dfe = array([])
self.jitter_rejection_ratio = array([])
self.jitter_perf = nbits * nspb / (clock() - split_time)
self.total_perf = nbits * nspb / (clock() - start_time)
split_time = clock()
self.status = 'Updating plots...(sweep %d of %d)' % (sweep_num, num_sweeps)
# Save local variables to class instance for state preservation, performing unit conversion where necessary.
self.chnl_out = chnl_out
self.adaptation = tap_weights
self.ui_ests = array(ui_ests) * 1.e12 # (ps)
self.clocks = clocks
self.lockeds = lockeds
self.clock_times = clock_times
# Update plots.
if(update_plots):
update_results(self)
if(not initial_run):
update_eyes(self)
self.plotting_perf = nbits * nspb / (clock() - split_time)
self.status = 'Ready.'
# Plot updating
[docs]def update_results(self):
"""
Updates all plot data used by GUI.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
"""
# Copy globals into local namespace.
ui = self.ui
samps_per_ui = self.nspui
eye_uis = self.eye_uis
num_ui = self.nui
clock_times = self.clock_times
f = self.f
t = self.t
t_ns = self.t_ns
t_ns_chnl = self.t_ns_chnl
conv_dly_ix = self.conv_dly_ix
n_taps = self.n_taps
Ts = t[1]
ignore_until = (num_ui - eye_uis) * ui
ignore_samps = (num_ui - eye_uis) * samps_per_ui
# Misc.
f_GHz = f[:len(f) // 2] / 1.e9
len_f_GHz = len(f_GHz)
self.plotdata.set_data("f_GHz", f_GHz[1:])
self.plotdata.set_data("t_ns", t_ns)
self.plotdata.set_data("t_ns_chnl", t_ns_chnl)
# DFE.
tap_weights = transpose(array(self.adaptation))
i = 1
for tap_weight in tap_weights:
self.plotdata.set_data("tap%d_weights" % i, tap_weight)
i += 1
self.plotdata.set_data("tap_weight_index", range(len(tap_weight)))
if(self._old_n_taps != n_taps):
new_plot = Plot(self.plotdata, auto_colors=['red', 'orange', 'yellow', 'green', 'blue', 'purple'], padding_left=75)
for i in range(self.n_taps):
new_plot.plot(("tap_weight_index", "tap%d_weights" % (i + 1)), type="line", color="auto", name="tap%d"%(i+1))
new_plot.title = "DFE Adaptation"
new_plot.tools.append(PanTool(new_plot, constrain=True, constrain_key=None, constrain_direction='x'))
zoom9 = ZoomTool(new_plot, tool_mode="range", axis='index', always_on=False)
new_plot.overlays.append(zoom9)
new_plot.legend.visible = True
new_plot.legend.align = 'ul'
self.plots_dfe.remove(self._dfe_plot)
self.plots_dfe.insert(1, new_plot)
self._dfe_plot = new_plot
self._old_n_taps = n_taps
clock_pers = diff(clock_times)
start_t = t[where(self.lockeds)[0][0]]
start_ix = where(clock_times > start_t)[0][0]
(bin_counts, bin_edges) = histogram(clock_pers[start_ix:], bins=100)
bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2.
clock_spec = fft(clock_pers[start_ix:])
clock_spec = abs(clock_spec[:len(clock_spec) / 2])
spec_freqs = arange(len(clock_spec)) / (2. * len(clock_spec)) # In this case, fNyquist = half the bit rate.
clock_spec /= clock_spec[1:].mean() # Normalize the mean non-d.c. value to 0 dB.
self.plotdata.set_data("clk_per_hist_bins", bin_centers * 1.e12) # (ps)
self.plotdata.set_data("clk_per_hist_vals", bin_counts)
self.plotdata.set_data("clk_spec", 10. * log10(clock_spec[1:])) # Omit the d.c. value.
self.plotdata.set_data("clk_freqs", spec_freqs[1:])
self.plotdata.set_data("dfe_out", self.dfe_out)
self.plotdata.set_data("ui_ests", self.ui_ests)
self.plotdata.set_data("clocks", self.clocks)
self.plotdata.set_data("lockeds", self.lockeds)
# Impulse responses
self.plotdata.set_data("chnl_h", self.chnl_h * 1.e-9 / Ts) # Re-normalize to (V/ns), for plotting.
self.plotdata.set_data("tx_h", self.tx_h * 1.e-9 / Ts)
self.plotdata.set_data("tx_out_h", self.tx_out_h * 1.e-9 / Ts)
self.plotdata.set_data("ctle_h", self.ctle_h * 1.e-9 / Ts)
self.plotdata.set_data("ctle_out_h", self.ctle_out_h * 1.e-9 / Ts)
self.plotdata.set_data("dfe_h", self.dfe_h * 1.e-9 / Ts)
self.plotdata.set_data("dfe_out_h", self.dfe_out_h * 1.e-9 / Ts)
# Step responses
self.plotdata.set_data("chnl_s", self.chnl_s)
self.plotdata.set_data("tx_s", self.tx_s)
self.plotdata.set_data("tx_out_s", self.tx_out_s)
self.plotdata.set_data("ctle_s", self.ctle_s)
self.plotdata.set_data("ctle_out_s", self.ctle_out_s)
self.plotdata.set_data("dfe_s", self.dfe_s)
self.plotdata.set_data("dfe_out_s", self.dfe_out_s)
# Pulse responses
self.plotdata.set_data("chnl_p", self.chnl_p)
self.plotdata.set_data("tx_out_p", self.tx_out_p)
self.plotdata.set_data("ctle_out_p", self.ctle_out_p)
self.plotdata.set_data("dfe_out_p", self.dfe_out_p)
# Outputs
self.plotdata.set_data("ideal_signal", self.ideal_signal)
self.plotdata.set_data("chnl_out", self.chnl_out)
self.plotdata.set_data("tx_out", self.rx_in)
self.plotdata.set_data("ctle_out", self.ctle_out)
self.plotdata.set_data("dfe_out", self.dfe_out)
self.plotdata.set_data("auto_corr", self.auto_corr)
# Frequency responses
self.plotdata.set_data("chnl_H", 20. * log10(abs(self.chnl_H [1 : len_f_GHz])))
self.plotdata.set_data("tx_H", 20. * log10(abs(self.tx_H [1 : len_f_GHz])))
self.plotdata.set_data("tx_out_H", 20. * log10(abs(self.tx_out_H [1 : len_f_GHz])))
self.plotdata.set_data("ctle_H", 20. * log10(abs(self.ctle_H [1 : len_f_GHz])))
self.plotdata.set_data("ctle_out_H", 20. * log10(abs(self.ctle_out_H[1 : len_f_GHz])))
self.plotdata.set_data("dfe_H", 20. * log10(abs(self.dfe_H [1 : len_f_GHz])))
self.plotdata.set_data("dfe_out_H", 20. * log10(abs(self.dfe_out_H [1 : len_f_GHz])))
# Jitter distributions
jitter_ext_chnl = self.jitter_ext_chnl # These are used, again, in bathtub curve generation, below.
jitter_ext_tx = self.jitter_ext_tx
jitter_ext_ctle = self.jitter_ext_ctle
jitter_ext_dfe = self.jitter_ext_dfe
self.plotdata.set_data("jitter_bins", array(self.jitter_bins) * 1.e12)
self.plotdata.set_data("jitter_chnl", self.jitter_chnl)
self.plotdata.set_data("jitter_ext_chnl", jitter_ext_chnl)
self.plotdata.set_data("jitter_tx", self.jitter_tx)
self.plotdata.set_data("jitter_ext_tx", jitter_ext_tx)
self.plotdata.set_data("jitter_ctle", self.jitter_ctle)
self.plotdata.set_data("jitter_ext_ctle", jitter_ext_ctle)
self.plotdata.set_data("jitter_dfe", self.jitter_dfe)
self.plotdata.set_data("jitter_ext_dfe", jitter_ext_dfe)
# Jitter spectrums
log10_ui = log10(ui)
self.plotdata.set_data("f_MHz", self.f_MHz[1:])
self.plotdata.set_data("f_MHz_dfe", self.f_MHz_dfe[1:])
self.plotdata.set_data("jitter_spectrum_chnl", 10. * (log10(self.jitter_spectrum_chnl [1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_chnl", 10. * (log10(self.jitter_ind_spectrum_chnl [1:]) - log10_ui))
self.plotdata.set_data("thresh_chnl", 10. * (log10(self.thresh_chnl [1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_tx", 10. * (log10(self.jitter_spectrum_tx [1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_tx", 10. * (log10(self.jitter_ind_spectrum_tx [1:]) - log10_ui))
self.plotdata.set_data("thresh_tx", 10. * (log10(self.thresh_tx [1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_ctle", 10. * (log10(self.jitter_spectrum_ctle [1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_ctle", 10. * (log10(self.jitter_ind_spectrum_ctle [1:]) - log10_ui))
self.plotdata.set_data("thresh_ctle", 10. * (log10(self.thresh_ctle [1:]) - log10_ui))
self.plotdata.set_data("jitter_spectrum_dfe", 10. * (log10(self.jitter_spectrum_dfe [1:]) - log10_ui))
self.plotdata.set_data("jitter_ind_spectrum_dfe", 10. * (log10(self.jitter_ind_spectrum_dfe [1:]) - log10_ui))
self.plotdata.set_data("thresh_dfe", 10. * (log10(self.thresh_dfe [1:]) - log10_ui))
self.plotdata.set_data("jitter_rejection_ratio", self.jitter_rejection_ratio[1:])
# Bathtubs
half_len = len(jitter_ext_chnl) / 2
# - Channel
bathtub_chnl = list(cumsum(jitter_ext_chnl[-1 : -(half_len + 1) : -1]))
bathtub_chnl.reverse()
bathtub_chnl = array(bathtub_chnl + list(cumsum(jitter_ext_chnl[:half_len + 1])))
bathtub_chnl = where(bathtub_chnl < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_chnl)),
bathtub_chnl) # To avoid Chaco log scale plot wierdness.
self.plotdata.set_data("bathtub_chnl", log10(bathtub_chnl))
# - Tx
bathtub_tx = list(cumsum(jitter_ext_tx[-1 : -(half_len + 1) : -1]))
bathtub_tx.reverse()
bathtub_tx = array(bathtub_tx + list(cumsum(jitter_ext_tx[:half_len + 1])))
bathtub_tx = where(bathtub_tx < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_tx)),
bathtub_tx) # To avoid Chaco log scale plot wierdness.
self.plotdata.set_data("bathtub_tx", log10(bathtub_tx))
# - CTLE
bathtub_ctle = list(cumsum(jitter_ext_ctle[-1 : -(half_len + 1) : -1]))
bathtub_ctle.reverse()
bathtub_ctle = array(bathtub_ctle + list(cumsum(jitter_ext_ctle[:half_len + 1])))
bathtub_ctle = where(bathtub_ctle < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_ctle)),
bathtub_ctle) # To avoid Chaco log scale plot wierdness.
self.plotdata.set_data("bathtub_ctle", log10(bathtub_ctle))
# - DFE
bathtub_dfe = list(cumsum(jitter_ext_dfe[-1 : -(half_len + 1) : -1]))
bathtub_dfe.reverse()
bathtub_dfe = array(bathtub_dfe + list(cumsum(jitter_ext_dfe[:half_len + 1])))
bathtub_dfe = where(bathtub_dfe < MIN_BATHTUB_VAL, 0.1 * MIN_BATHTUB_VAL * ones(len(bathtub_dfe)),
bathtub_dfe) # To avoid Chaco log scale plot wierdness.
self.plotdata.set_data("bathtub_dfe", log10(bathtub_dfe))
# Eyes
width = 2 * samps_per_ui
xs = linspace(-ui * 1.e12, ui * 1.e12, width)
height = 100
y_max = 1.1 * max(abs(array(self.chnl_out)))
eye_chnl = calc_eye(ui, samps_per_ui, height, self.chnl_out[ignore_samps:], y_max)
y_max = 1.1 * max(abs(array(self.rx_in)))
eye_tx = calc_eye(ui, samps_per_ui, height, self.rx_in[ignore_samps:], y_max)
y_max = 1.1 * max(abs(array(self.ctle_out)))
eye_ctle = calc_eye(ui, samps_per_ui, height, self.ctle_out[ignore_samps:], y_max)
i = 0
while(clock_times[i] <= ignore_until):
i += 1
assert i < len(clock_times), "ERROR: Insufficient coverage in 'clock_times' vector."
y_max = 1.1 * max(abs(array(self.dfe_out)))
eye_dfe = calc_eye(ui, samps_per_ui, height, self.dfe_out, y_max, clock_times[i:])
self.plotdata.set_data("eye_index", xs)
self.plotdata.set_data("eye_chnl", eye_chnl)
self.plotdata.set_data("eye_tx", eye_tx)
self.plotdata.set_data("eye_ctle", eye_ctle)
self.plotdata.set_data("eye_dfe", eye_dfe)
[docs]def update_eyes(self):
"""
Update the heat plots representing the eye diagrams.
Args:
self(PyBERT): Reference to an instance of the *PyBERT* class.
"""
ui = self.ui
samps_per_ui = self.nspui
width = 2 * samps_per_ui
height = 100
xs = linspace(-ui * 1.e12, ui * 1.e12, width)
y_max = 1.1 * max(abs(array(self.chnl_out)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[0].components[0].index.set_data(xs, ys)
self.plots_eye.components[0].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[0].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[0].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[0].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[0].invalidate_draw()
y_max = 1.1 * max(abs(array(self.rx_in)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[1].components[0].index.set_data(xs, ys)
self.plots_eye.components[1].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[1].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[1].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[1].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[1].invalidate_draw()
y_max = 1.1 * max(abs(array(self.dfe_out)))
ys = linspace(-y_max, y_max, height)
self.plots_eye.components[3].components[0].index.set_data(xs, ys)
self.plots_eye.components[3].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[3].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[3].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[3].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[3].invalidate_draw()
self.plots_eye.components[2].components[0].index.set_data(xs, ys)
self.plots_eye.components[2].x_axis.mapper.range.low = xs[0]
self.plots_eye.components[2].x_axis.mapper.range.high = xs[-1]
self.plots_eye.components[2].y_axis.mapper.range.low = ys[0]
self.plots_eye.components[2].y_axis.mapper.range.high = ys[-1]
self.plots_eye.components[2].invalidate_draw()
self.plots_eye.request_redraw()