---
title: Utilities
keywords: fastai
sidebar: home_sidebar
summary: "Helper functions used throughout the library not related to timeseries data."
description: "Helper functions used throughout the library not related to timeseries data."
nb_path: "nbs/000_utils.ipynb"
---
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

computer_setup[source]

computer_setup(*pkgs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
computer_setup()
os             : Darwin
os version     : 19.6.0
python         : 3.6.13
tsai           : 0.2.20
fastai         : 2.5.2
fastcore       : 1.3.26
torch          : 1.9.0
n_cpus         : 8
device         : cpu
{% endraw %} {% raw %}
import matplotlib
import numpy as np
import pandas as pd
import scipy as sp
computer_setup(matplotlib, np, pd, sp)
os             : Darwin
os version     : 19.6.0
python         : 3.6.13
tsai           : 0.2.20
fastai         : 2.5.2
fastcore       : 1.3.26
matplotlib     : 3.3.4
numpy          : 1.19.5
pandas         : 1.1.5
scipy          : 1.5.4
torch          : 1.9.0
n_cpus         : 8
device         : cpu
{% endraw %} {% raw %}
{% endraw %} {% raw %}
fns = ['data', 'export', 'models']
for fn in fns: 
    path = Path('.')/fn
    if not os.path.exists(path): os.makedirs(path)
{% endraw %} {% raw %}

totensor[source]

totensor(o)

{% endraw %} {% raw %}

toarray[source]

toarray(o)

{% endraw %} {% raw %}

toL[source]

toL(o)

{% endraw %} {% raw %}

to3dtensor[source]

to3dtensor(o)

{% endraw %} {% raw %}

to2dtensor[source]

to2dtensor(o)

{% endraw %} {% raw %}

to1dtensor[source]

to1dtensor(o)

{% endraw %} {% raw %}

to3darray[source]

to3darray(o)

{% endraw %} {% raw %}

to2darray[source]

to2darray(o)

{% endraw %} {% raw %}

to1darray[source]

to1darray(o)

{% endraw %} {% raw %}

to3d[source]

to3d(o)

{% endraw %} {% raw %}

to2d[source]

to2d(o)

{% endraw %} {% raw %}

to1d[source]

to1d(o)

{% endraw %} {% raw %}

to2dPlus[source]

to2dPlus(o)

{% endraw %} {% raw %}

to3dPlus[source]

to3dPlus(o)

{% endraw %} {% raw %}

to2dPlusTensor[source]

to2dPlusTensor(o)

{% endraw %} {% raw %}

to2dPlusArray[source]

to2dPlusArray(o)

{% endraw %} {% raw %}

to3dPlusTensor[source]

to3dPlusTensor(o)

{% endraw %} {% raw %}

to3dPlusArray[source]

to3dPlusArray(o)

{% endraw %} {% raw %}

todtype[source]

todtype(dtype)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(100).astype(np.float32)
b = torch.from_numpy(a).float()
test_eq(totensor(a), b)
test_eq(a, toarray(b))
test_eq(to3dtensor(a).ndim, 3)
test_eq(to2dtensor(a).ndim, 2)
test_eq(to1dtensor(a).ndim, 1)
test_eq(to3darray(b).ndim, 3)
test_eq(to2darray(b).ndim, 2)
test_eq(to1darray(b).ndim, 1)
{% endraw %} {% raw %}

bytes2size[source]

bytes2size(size_bytes)

{% endraw %} {% raw %}

bytes2GB[source]

bytes2GB(byts)

{% endraw %} {% raw %}

get_size[source]

get_size(o, return_str=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(10, 5, 3)
test_eq(get_size(a, True), '1.3 KB')
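# bytes2size/bytes2GB presumably format/convert a raw byte count (assumption); just inspect the output
print(bytes2size(1024**2), bytes2GB(1024**3))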
{% endraw %} {% raw %}

delete_all_in_dir[source]

delete_all_in_dir(tgt_dir, exception=None)

{% endraw %} {% raw %}
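The snippet below is a minimal sketch of how delete_all_in_dir could be exercised, assuming it removes the contents of tgt_dir; the tmp_delete_test folder name is made up for this example.

import shutil
tmp_dir = Path('tmp_delete_test')
tmp_dir.mkdir(exist_ok=True)
(tmp_dir/'dummy.txt').touch()                  # create a throw-away file
delete_all_in_dir(tmp_dir)                     # remove everything inside tmp_dir
assert not (tmp_dir/'dummy.txt').exists()
if tmp_dir.exists(): shutil.rmtree(tmp_dir)    # clean up the temporary folder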
{% endraw %} {% raw %}

reverse_dict[source]

reverse_dict(dictionary)

{% endraw %} {% raw %}
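A quick check, assuming reverse_dict simply swaps keys and values:

d = {'a': 1, 'b': 2}
test_eq(reverse_dict(d), {1: 'a', 2: 'b'})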
{% endraw %} {% raw %}

is_tuple[source]

is_tuple(o)

{% endraw %} {% raw %}
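A minimal check, assuming is_tuple just tests whether its argument is a tuple:

test_eq(is_tuple((1, 2)), True)
test_eq(is_tuple([1, 2]), False)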
{% endraw %} {% raw %}

itemify[source]

itemify(*o, tup_id=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = [1, 2, 3]
b = [4, 5, 6]
print(itemify(a, b))
test_eq(len(itemify(a, b)), len(a))
a = [1, 2, 3]
b = None
print(itemify(a, b))
test_eq(len(itemify(a, b)), len(a))
a = [1, 2, 3]
b = [4, 5, 6]
c = None
print(itemify(a, b, c))
test_eq(len(itemify(a, b, c)), len(a))
[(1, 4), (2, 5), (3, 6)]
[(1,), (2,), (3,)]
[(1, 4), (2, 5), (3, 6)]
{% endraw %} {% raw %}

isnone[source]

isnone(o)

{% endraw %} {% raw %}

exists[source]

exists(o)

{% endraw %} {% raw %}

ifelse[source]

ifelse(a, b, c)

b if a is True else c

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.array(3)
test_eq(isnone(a), False)
test_eq(exists(a), True)
b = None
test_eq(isnone(b), True)
test_eq(exists(b), False)
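# ifelse returns b when the condition is True, otherwise c (see its docstring above)
test_eq(ifelse(True, 'b', 'c'), 'b')
test_eq(ifelse(False, 'b', 'c'), 'c')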
{% endraw %} {% raw %}

is_not_close[source]

is_not_close(a, b, eps=1e-05)

Is a not within eps of b

{% endraw %} {% raw %}

test_not_close[source]

test_not_close(a, b, eps=1e-05)

test that a is not within eps of b

{% endraw %} {% raw %}

test_type[source]

test_type(a, b)

{% endraw %} {% raw %}

test_ok[source]

test_ok(f, *args, **kwargs)

{% endraw %} {% raw %}

test_not_ok[source]

test_not_ok(f, *args, **kwargs)

{% endraw %} {% raw %}

test_error[source]

test_error(error, f, *args, **kwargs)

{% endraw %} {% raw %}

test_eq_nan[source]

test_eq_nan(a, b)

test that a==b excluding nan values (valid for torch.Tensor and np.ndarray)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

assert_fn[source]

assert_fn(*args, **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

test_gt[source]

test_gt(a, b)

test that a>b

{% endraw %} {% raw %}

test_ge[source]

test_ge(a, b)

test that a>=b

{% endraw %} {% raw %}

test_lt[source]

test_lt(a, b)

test that a<b

{% endraw %} {% raw %}

test_le[source]

test_le(a, b)

test that a<=b

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_ok(test_gt, 5, 4)
test_not_ok(test_gt, 4, 4)
test_ok(test_ge, 4, 4)
test_not_ok(test_ge, 3, 4)

test_ok(test_lt, 3, 4)
test_not_ok(test_lt, 4, 4)
test_ok(test_le, 4, 4)
test_not_ok(test_le, 5, 4)
{% endraw %} {% raw %}
t = torch.rand(100)
t[t<.5] = np.nan
test_ne(t, t)
test_eq_nan(t, t)
{% endraw %} {% raw %}

stack[source]

stack(o, axis=0, retain=True)

{% endraw %} {% raw %}

stack_pad[source]

stack_pad(o, padding_value=nan)

Converts an iterable into a numpy array, using padding if necessary

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = [[0,1,2], [4,5,6,7]]
test_eq(stack_pad(a).shape, (2, 4))
test_eq(type(stack_pad(a)), np.ndarray)
test_eq(np.isnan(stack_pad(a)).sum(), 1)
{% endraw %} {% raw %}
a = np.random.rand(2, 3, 4)
t = torch.from_numpy(a)
test_eq_type(stack(itemify(a, tup_id=0)), a)
test_eq_type(stack(itemify(t, tup_id=0)), t)
{% endraw %} {% raw %}

match_seq_len[source]

match_seq_len(*arrays)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(10, 5, 8)
b = np.random.rand(3, 5, 10)
c, d = match_seq_len(a, b)
test_eq(c.shape[-1], d.shape[-1])
{% endraw %} {% raw %}

random_shuffle[source]

random_shuffle(o, random_state=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.arange(10)
test_eq_type(random_shuffle(a, 1), np.array([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
t = torch.arange(10)
test_eq_type(random_shuffle(t, 1), tensor([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
l = list(a)
test_eq(random_shuffle(l, 1), [2, 9, 6, 4, 0, 3, 1, 7, 8, 5])
l2 = L(l)
test_eq_type(random_shuffle(l2, 1), L([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
{% endraw %} {% raw %}

cat2int[source]

cat2int(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.array(['b', 'a', 'a', 'b', 'a', 'b', 'a'])
test_eq_type(cat2int(a), TensorCategory([1, 0, 0, 1, 0, 1, 0]))
{% endraw %} {% raw %}
TensorBase([1,2,3])
TensorBase([1, 2, 3])
{% endraw %} {% raw %}

cycle_dl[source]

cycle_dl(dl)

{% endraw %} {% raw %}

cycle_dl_to_device[source]

cycle_dl_to_device(dl)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

cache_data[source]

cache_data(o, slice_len=10000, verbose=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

get_func_defaults[source]

get_func_defaults(f)

{% endraw %} {% raw %}
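A hedged usage sketch: get_func_defaults presumably returns the defaults of a function's arguments, so we just inspect its output for a function documented above (the exact return format is not asserted here).

print(get_func_defaults(get_size))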
{% endraw %} {% raw %}

get_idx_from_df_col_vals[source]

get_idx_from_df_col_vals(df, col, val_list)

{% endraw %} {% raw %}
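A small sketch, assuming get_idx_from_df_col_vals returns the index entries of df whose column col takes a value in val_list:

df = pd.DataFrame({'col': ['a', 'b', 'c', 'a', 'b']})
idx = get_idx_from_df_col_vals(df, 'col', ['a'])
test_eq(df.loc[idx, 'col'].unique().tolist(), ['a'])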
{% endraw %} {% raw %}

get_sublist_idxs[source]

get_sublist_idxs(aList, bList)

Get idxs that, when applied to aList, will return bList. aList must contain all values in bList.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = np.array([3, 5, 7, 1, 9, 8, 6, 2])
y = np.array([6, 1, 5, 7])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
x = np.array([3, 5, 7, 1, 9, 8, 6, 6, 2])
y = np.array([6, 1, 5, 7, 5])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
{% endraw %} {% raw %}

flatten_list[source]

flatten_list(l)

{% endraw %} {% raw %}
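A minimal check, assuming flatten_list flattens one level of nesting (the result is cast to a plain list so its exact type doesn't matter here):

test_eq(list(flatten_list([[1, 2], [3], [4, 5]])), [1, 2, 3, 4, 5])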
{% endraw %} {% raw %}

display_pd_df[source]

display_pd_df(df, max_rows:int=False, max_columns:int=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
old_max_rows, old_max_columns = pd.get_option('display.max_rows'), pd.get_option('display.max_columns')
df = pd.DataFrame(np.random.rand(70, 25))
display_pd_df(df, max_rows=2, max_columns=3)
test_eq(old_max_rows, pd.get_option('display.max_rows'))
test_eq(old_max_columns, pd.get_option('display.max_columns'))
0 ... 24
0 0.945335 ... 0.386681
... ... ... ...
69 0.859585 ... 0.942017

70 rows × 25 columns

{% endraw %} {% raw %}

ttest[source]

ttest(data1, data2, equal_var=False)

Calculates t-statistic and p-value based on 2 sample distributions

{% endraw %} {% raw %}

tscore[source]

tscore(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, 100)
b = np.random.normal(0.15, .5, 50)
plt.hist(a, 50)
plt.hist(b, 50)
plt.show()
ttest(a,b)
(3.5526044294612644, 0.000513558087294865)
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, 100)
t = torch.normal(0.5, 1, (100, ))
tscore(a), tscore(t)
(5.421348526403489, tensor(5.1050))
{% endraw %} {% raw %}

ttest_tensor[source]

ttest_tensor(a, b)

Differentiable PyTorch function equivalent to scipy.stats.ttest_ind with equal_var=False

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = torch.rand(100).requires_grad_(True) + .1
b = torch.rand(100).requires_grad_(True)
ttest_tensor(a, b)
tensor(1.7030, grad_fn=<DivBackward0>)
{% endraw %} {% raw %}

pcc[source]

pcc(a, b)

{% endraw %} {% raw %}

scc[source]

scc(a, b)

{% endraw %} {% raw %}
(-0.09043660167320663, -0.18088208820882087)
{% endraw %} {% raw %}

remove_fn[source]

remove_fn(fn, verbose=False)

Removes a file (fn) if it exists

{% endraw %} {% raw %}
{% endraw %} {% raw %}

npsave[source]

npsave(array_fn, array, verbose=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
fn = 'data/remove_fn_test.npy'
a = np.zeros(1)
npsave(fn, a)
del a
np.load(fn, mmap_mode='r+')
remove_fn(fn, True)
remove_fn(fn, True)
data/remove_fn_test.npy does not exist
saving data/remove_fn_test.npy...
...data/remove_fn_test.npy saved
data/remove_fn_test.npy file removed
data/remove_fn_test.npy does not exist
{% endraw %} {% raw %}

permute_2D[source]

permute_2D(array, axis=None)

Permute rows or columns in an array. This can be used, for example, in feature permutation

{% endraw %} {% raw %}
{% endraw %} {% raw %}
s = np.arange(100 * 50).reshape(100, 50) 
test_eq(permute_2D(s, axis=0).mean(0), s.mean(0))
test_ne(permute_2D(s, axis=0), s)
test_eq(permute_2D(s, axis=1).mean(1), s.mean(1))
test_ne(permute_2D(s, axis=1), s)
test_ne(permute_2D(s), s)
{% endraw %} {% raw %}

random_normal[source]

random_normal()

Returns a number between -1 and 1 with a normal distribution

{% endraw %} {% raw %}

random_half_normal[source]

random_half_normal()

Returns a number between 0 and 1 with a half-normal distribution

{% endraw %} {% raw %}

random_normal_tensor[source]

random_normal_tensor(shape=1, device=None)

Returns a tensor of a predefined shape between -1 and 1 with a normal distribution

{% endraw %} {% raw %}

random_half_normal_tensor[source]

random_half_normal_tensor(shape=1, device=None)

Returns a tensor of a predefined shape between 0 and 1 with a half-normal distribution

{% endraw %} {% raw %}
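A quick sanity check based on the docstrings above (bounded outputs); passing an int as the tensor shape is assumed to work like torch's size argument.

assert -1 <= random_normal() <= 1
assert 0 <= random_half_normal() <= 1
t = random_normal_tensor(10)
assert (t >= -1).all() and (t <= 1).all()
t = random_half_normal_tensor(10)
assert (t >= 0).all() and (t <= 1).all()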
{% endraw %} {% raw %}

default_dpi[source]

default_dpi()

{% endraw %} {% raw %}

get_plot_fig[source]

get_plot_fig(size=None, dpi=72)

{% endraw %} {% raw %}

fig2buf[source]

fig2buf(fig)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
default_dpi()
72
{% endraw %} {% raw %}

plot_scatter[source]

plot_scatter(x, y, deg=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(100)
b = np.random.rand(100)**2
plot_scatter(a, b)
{% endraw %} {% raw %}

get_idxs[source]

get_idxs(o, aList)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = random_shuffle(np.arange(100, 200))
b = np.random.choice(a, 10, False)
idxs = get_idxs(a, b)
test_eq(a[idxs], b)
{% endraw %} {% raw %}

apply_cmap[source]

apply_cmap(o, cmap)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(16, 1, 40, 50)
s = L(a.shape)
s[1] = 3
test_eq(L(apply_cmap(a, 'viridis').shape), s)

s[0] = 1
a = np.random.rand(1, 40, 50)
test_eq(L(apply_cmap(a, 'viridis').shape), s)
{% endraw %} {% raw %}

torch_tile[source]

torch_tile(a, n_tile, dim=0)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_eq(torch_tile(torch.arange(2), 3), tensor([0, 0, 0, 1, 1, 1]))
{% endraw %} {% raw %}

to_tsfresh_df[source]

to_tsfresh_df(ts)

Prepares a time series (Tensor/np.ndarray) to be used as a tsfresh dataset, to allow feature extraction

{% endraw %} {% raw %}
{% endraw %} {% raw %}
ts = torch.rand(16, 3, 20)
a = to_tsfresh_df(ts)
ts = ts.numpy()
b = to_tsfresh_df(ts)
{% endraw %} {% raw %}

pcorr[source]

pcorr(a, b)

{% endraw %} {% raw %}

scorr[source]

scorr(a, b)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

torch_diff[source]

torch_diff(t, lag=1, pad=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.arange(24).reshape(2,3,4)
test_eq(torch_diff(t, 1)[..., 1:].float().mean(), 1.)
test_eq(torch_diff(t, 2)[..., 2:].float().mean(), 2.)
{% endraw %} {% raw %}

get_outliers_IQR[source]

get_outliers_IQR(o, axis=None)

{% endraw %} {% raw %}

clip_outliers[source]

clip_outliers(o, axis=None)

{% endraw %} {% raw %}

get_percentile[source]

get_percentile(o, percentile, axis=None)

{% endraw %} {% raw %}

torch_clamp[source]

torch_clamp(o, min=None, max=None)

Clamp torch.Tensor using 1 or multiple dimensions

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.randn(2,3,100)
test_eq(type(get_outliers_IQR(t, -1)[0]), torch.Tensor)
a = np.random.randn(2,3,100)
test_eq(type(get_outliers_IQR(a, -1)[0]), np.ndarray)
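# hedged sketches for get_percentile and clip_outliers (assumed to behave like percentile-based clipping along the last axis)
assert (get_percentile(a, 25, -1) <= get_percentile(a, 75, -1)).all()
clipped = clip_outliers(a, -1)
assert clipped.min() >= a.min() and clipped.max() <= a.max()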
{% endraw %} {% raw %}

torch_slice_by_dim[source]

torch_slice_by_dim(t, index, dim=-1, **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.rand(5, 3)
index = torch.randint(0, 3, (5, 1))
# index = [[0, 2], [0, 1], [1, 2], [0, 2], [0, 1]]
torch_slice_by_dim(t, index)
tensor([[0.0267],
        [0.2523],
        [0.7502],
        [0.1589],
        [0.9777]])
{% endraw %} {% raw %}

torch_nanmean[source]

torch_nanmean(o, dim=None, keepdim=False)

There's currently no torch.nanmean function

{% endraw %} {% raw %}

torch_nanstd[source]

torch_nanstd(o, dim=None, keepdim=False)

There's currently no torch.nanstd function

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.rand(1000)
t[:100] = float('nan')
assert torch_nanmean(t).item() > 0
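# torch_nanstd should likewise ignore the nan values (assumed to mirror torch_nanmean's interface)
assert torch_nanstd(t).item() > 0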
{% endraw %} {% raw %}

concat[source]

concat(colls)

Concatenate all collections in colls

{% endraw %} {% raw %}
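A small sketch, assuming concat(colls) returns the items of all collections in order; the result is cast to a plain list so its exact return type doesn't matter here.

test_eq(list(concat([[0, 1], [2], [3, 4]])), [0, 1, 2, 3, 4])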
{% endraw %} {% raw %}

reduce_memory_usage[source]

reduce_memory_usage(df)

{% endraw %} {% raw %}
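A usage sketch, assuming reduce_memory_usage downcasts numeric columns where possible; the defensive fallback covers the case where the DataFrame is modified in place rather than returned.

df = pd.DataFrame({'a': np.arange(1000, dtype=np.int64),
                   'b': np.random.rand(1000).astype(np.float64)})
mem_before = df.memory_usage(deep=True).sum()
df2 = reduce_memory_usage(df)
if df2 is None: df2 = df                       # in case the df is modified in place
assert df2.memory_usage(deep=True).sum() <= mem_before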
{% endraw %} {% raw %}

cls_name[source]

cls_name(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_eq(cls_name(timer), 'Timer')
{% endraw %} {% raw %}

roll2d[source]

roll2d(o, roll1:Union[NoneType, list, int]=None, roll2:Union[NoneType, list, int]=None)

Rolls a 2D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

roll3d[source]

roll3d(o, roll1:Union[NoneType, list, int]=None, roll2:Union[NoneType, list, int]=None, roll3:Union[NoneType, list, int]=None)

Rolls a 3D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

random_roll2d[source]

random_roll2d(o, axis=())

Randomly rolls a 2D object along the indicated axes. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

random_roll3d[source]

random_roll3d(o, axis=(), replace=False)

Randomly rolls a 3D object along the indicated axes. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

rotate_axis0[source]

rotate_axis0(o, steps=1)

{% endraw %} {% raw %}

rotate_axis1[source]

rotate_axis1(o, steps=1)

{% endraw %} {% raw %}

rotate_axis2[source]

rotate_axis2(o, steps=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.tile(np.arange(10), 3).reshape(3, 10) * np.array([1, 10, 100]).reshape(-1, 1)
a
array([[  0,   1,   2,   3,   4,   5,   6,   7,   8,   9],
       [  0,  10,  20,  30,  40,  50,  60,  70,  80,  90],
       [  0, 100, 200, 300, 400, 500, 600, 700, 800, 900]])
{% endraw %} {% raw %}
roll2d(a, roll1=[2, 1, 0])
array([[  0, 100, 200, 300, 400, 500, 600, 700, 800, 900],
       [  0,  10,  20,  30,  40,  50,  60,  70,  80,  90],
       [  0,   1,   2,   3,   4,   5,   6,   7,   8,   9]])
{% endraw %} {% raw %}
roll2d(a, roll2=3)
array([[  7,   8,   9,   0,   1,   2,   3,   4,   5,   6],
       [ 70,  80,  90,   0,  10,  20,  30,  40,  50,  60],
       [700, 800, 900,   0, 100, 200, 300, 400, 500, 600]])
{% endraw %} {% raw %}
o = torch.arange(24).reshape(2,3,4)
test_eq(rotate_axis0(o)[1], o[0])
test_eq(rotate_axis1(o)[:,1], o[:,0])
test_eq(rotate_axis2(o)[...,1], o[...,0])
{% endraw %} {% raw %}

create_empty_array[source]

create_empty_array(shape, fname=None, path='./data', on_disk=True, dtype='float32', mode='r+', **kwargs)

mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
fname = 'X_on_disk'
shape = (100, 10, 10)
X = create_empty_array(shape, fname, on_disk=True, mode='r+')

chunksize = 10
pbar = progress_bar(range(math.ceil(len(X) / chunksize)), leave=False)
start = 0
for i in pbar: 
    end = min(start + chunksize, len(X))
    partial_data = np.random.rand(end - start, X.shape[1] , X.shape[2])
    X[start:end] = partial_data
    start = end
    del partial_data
    gc.collect()
filename = X.filename
del X
X = np.load(filename, mmap_mode='r+')
test_eq((X == 0).sum(), 0)
test_eq(X.shape, shape)
os.remove(X.filename)
{% endraw %} {% raw %}

np_save_compressed[source]

np_save_compressed(arr, fname=None, path='./data', verbose=False, **kwargs)

{% endraw %} {% raw %}

np_load_compressed[source]

np_load_compressed(fname=None, path='./data', **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
X1 = np.random.rand(10)
np_save_compressed(X1, 'X_comp', path='./data')
X2 = np_load_compressed('X_comp')
test_eq(X1, X2)
{% endraw %} {% raw %}

np2memmap[source]

np2memmap(arr, fname=None, path='./data', dtype='float32', mode='c', **kwargs)

Function that turns an ndarray into a memmap ndarray. mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
X1 = np.random.rand(10)
X2 = np2memmap(X1, 'X1_test')
test_eq(X1, X2)
test_ne(type(X1), type(X2))
{% endraw %} {% raw %}

torch_mean_groupby[source]

torch_mean_groupby(o, idxs)

Computes torch mean along axis 0, grouped by the idxs. You need to ensure that the idxs are in the same order as o.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
o = torch.arange(6*2*3).reshape(6, 2, 3).float()
idxs = np.array([[0,1,2,3], [2,3]], dtype=object)
output = torch_mean_groupby(o, idxs)
test_eq(o[:2], output[:2])
test_eq(o[2:4].mean(0), output[2])
test_eq(o[4:6].mean(0), output[3])
{% endraw %} {% raw %}

torch_flip[source]

torch_flip(t, dims=-1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.randn(2, 3, 4)
test_eq(torch.flip(t, (2,)), torch_flip(t, dims=-1))
{% endraw %} {% raw %}

torch_nan_to_num[source]

torch_nan_to_num(o, num=0, inplace=False)

{% endraw %} {% raw %}

torch_masked_to_num[source]

torch_masked_to_num(o, mask, num=0, inplace=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
x[:, :3][x[:, :3] < .5] = np.nan
nan_values = torch.isnan(x).sum()
y = torch_nan_to_num(x[:, :3], inplace=False)
test_eq(torch.isnan(y).sum(), 0)
test_eq(torch.isnan(x).sum(), nan_values)
torch_nan_to_num(x[:, :3], inplace=True)
test_eq(torch.isnan(x).sum(), 0)
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
mask = x[:, :3] > .5
x[:, :3] = torch_masked_to_num(x[:, :3], mask, num=0, inplace=False)
test_eq(x[:, :3][mask].sum(), 0)
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
mask = x[:, :3] > .5
torch_masked_to_num(x[:, :3], mask, num=0, inplace=True)
test_eq(x[:, :3][mask].sum(), 0)
{% endraw %} {% raw %}

mpl_trend[source]

mpl_trend(x, y, deg=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = np.sort(np.random.randint(0, 100, 100)/10)
y = np.random.rand(100) + np.linspace(0, 10, 100)
trend = mpl_trend(x, y)
plt.scatter(x, y)
plt.plot(x, trend, 'r')
plt.show()
{% endraw %} {% raw %}

int2digits[source]

int2digits(o, n_digits=None, normalize=True)

{% endraw %} {% raw %}

array2digits[source]

array2digits(o, n_digits=None, normalize=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
o = -9645
test_eq(int2digits(o, 6), np.array([ 0,  0, -.9, -.6, -.4, -.5]))

a = np.random.randint(-1000, 1000, 10)
test_eq(array2digits(a,5).shape, (10,5))
{% endraw %} {% raw %}

sincos_encoding[source]

sincos_encoding(seq_len, device=None, to_np=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
sin, cos = sincos_encoding(100)
plt.plot(sin.cpu().numpy())
plt.plot(cos.cpu().numpy())
plt.show()
{% endraw %} {% raw %}

linear_encoding[source]

linear_encoding(seq_len, device=None, to_np=False, lin_range=(-1, 1))

{% endraw %} {% raw %}
{% endraw %} {% raw %}
lin = linear_encoding(100)
plt.plot(lin.cpu().numpy())
plt.show()
{% endraw %} {% raw %}

encode_positions[source]

encode_positions(pos_arr, min_val=None, max_val=None, linear=False, lin_range=(-1, 1))

Encodes an array with positions using linear or sincos methods

{% endraw %} {% raw %}
{% endraw %} {% raw %}
n_samples = 10
length = 500
_a = []
for i in range(n_samples):
    a = np.arange(-4000, 4000, 10)
    mask = np.random.rand(len(a)) > .5
    a = a[mask]
    a = np.concatenate([a, np.array([np.nan] * (length - len(a)))])
    _a.append(a.reshape(-1,1))
a = np.concatenate(_a, -1).transpose(1,0)
sin, cos = encode_positions(a, linear=False)
test_eq(a.shape, (n_samples, length))
test_eq(sin.shape, (n_samples, length))
test_eq(cos.shape, (n_samples, length))
plt.plot(sin.T)
plt.plot(cos.T)
plt.xlim(0, 500)
plt.show()
{% endraw %} {% raw %}
n_samples = 10
length = 500
_a = []
for i in range(n_samples):
    a = np.arange(-4000, 4000, 10)
    mask = np.random.rand(len(a)) > .5
    a = a[mask]
    a = np.concatenate([a, np.array([np.nan] * (length - len(a)))])
    _a.append(a.reshape(-1,1))
a = np.concatenate(_a, -1).transpose(1,0)
lin = encode_positions(a, linear=True)
test_eq(a.shape, (n_samples, length))
test_eq(lin.shape, (n_samples, length))
plt.plot(lin.T)
plt.xlim(0, 500)
plt.show()
{% endraw %} {% raw %}

sort_generator[source]

sort_generator(generator, bs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
generator = (i for i in np.random.permutation(np.arange(1000000)).tolist())
l = list(sort_generator(generator, 512))
test_eq(l[:512], sorted(l[:512]))
{% endraw %} {% raw %}

get_subset_dict[source]

get_subset_dict(d, keys)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
keys = string.ascii_lowercase
values = np.arange(len(keys))
d = {k:v for k,v in zip(keys,values)}
test_eq(get_subset_dict(d, ['a', 'k', 'j', 'e']), {'a': 0, 'k': 10, 'j': 9, 'e': 4})
{% endraw %} {% raw %}
%%file mod_dev.py
a = 5
def sum(b): return a + b
Writing mod_dev.py
{% endraw %} {% raw %}
%%file mod_dev2.py
from fastcore.script import *
from tsai.imports import *

@call_parse
def add(
    path:  Param('path to A.', str)='',
    b:     Param('Integer.', int)=0,
):
    mod_A = import_file_as_module(path)
    output = mod_A.sum(b)
    print(output)
    return output
Writing mod_dev2.py
{% endraw %} {% raw %}
from mod_dev2 import *
test_eq(add('mod_dev.py', 3), 8)
8
{% endraw %} {% raw %}
r = !python mod_dev2.py --path "mod_dev.py" --b 3
test_eq(int(r[0]), 8)
{% endraw %} {% raw %}
if os.path.exists("mod_dev.py"): os.remove("mod_dev.py")
if os.path.exists("mod_dev2.py"): os.remove("mod_dev2.py")
{% endraw %} {% raw %}

chunks_calculator[source]

chunks_calculator(shape, dtype='float32', n_bytes=1073741824)

Function to calculate chunks for a given size of n_bytes (default = 1024**3 == 1GB). It guarantees > 50% of the chunk will be filled

{% endraw %} {% raw %}
{% endraw %} {% raw %}
shape = (1_000, 10, 1000)
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), False)

shape = (54684, 10, 1000)
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), (27342, -1, -1))
{% endraw %}