---
title: Utilities
keywords: fastai
sidebar: home_sidebar
summary: "Helper functions used throughout the library not related to timeseries data."
description: "Helper functions used throughout the library not related to timeseries data."
nb_path: "nbs/001_utils.ipynb"
---
{% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

is_nparray[source]

is_nparray(o)

{% endraw %} {% raw %}

is_tensor[source]

is_tensor(o)

{% endraw %} {% raw %}

is_zarr[source]

is_zarr(o)

{% endraw %} {% raw %}

is_dask[source]

is_dask(o)

{% endraw %} {% raw %}

is_memmap[source]

is_memmap(o)

{% endraw %} {% raw %}

is_slice[source]

is_slice(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
fns = ['data', 'export', 'models']
for fn in fns: 
    path = Path('.')/fn
    if not os.path.exists(path): os.makedirs(path)
{% endraw %} {% raw %}

totensor[source]

totensor(o)

{% endraw %} {% raw %}

toarray[source]

toarray(o)

{% endraw %} {% raw %}

toL[source]

toL(o)

{% endraw %} {% raw %}

to3dtensor[source]

to3dtensor(o)

{% endraw %} {% raw %}

to2dtensor[source]

to2dtensor(o)

{% endraw %} {% raw %}

to1dtensor[source]

to1dtensor(o)

{% endraw %} {% raw %}

to3darray[source]

to3darray(o)

{% endraw %} {% raw %}

to2darray[source]

to2darray(o)

{% endraw %} {% raw %}

to1darray[source]

to1darray(o)

{% endraw %} {% raw %}

to3d[source]

to3d(o)

{% endraw %} {% raw %}

to2d[source]

to2d(o)

{% endraw %} {% raw %}

to1d[source]

to1d(o)

{% endraw %} {% raw %}

to2dPlus[source]

to2dPlus(o)

{% endraw %} {% raw %}

to3dPlus[source]

to3dPlus(o)

{% endraw %} {% raw %}

to2dPlusTensor[source]

to2dPlusTensor(o)

{% endraw %} {% raw %}

to2dPlusArray[source]

to2dPlusArray(o)

{% endraw %} {% raw %}

to3dPlusTensor[source]

to3dPlusTensor(o)

{% endraw %} {% raw %}

to3dPlusArray[source]

to3dPlusArray(o)

{% endraw %} {% raw %}

todtype[source]

todtype(dtype)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(100).astype(np.float32)
b = torch.from_numpy(a).float()
test_eq(totensor(a), b)
test_eq(a, toarray(b))
test_eq(to3dtensor(a).ndim, 3)
test_eq(to2dtensor(a).ndim, 2)
test_eq(to1dtensor(a).ndim, 1)
test_eq(to3darray(b).ndim, 3)
test_eq(to2darray(b).ndim, 2)
test_eq(to1darray(b).ndim, 1)
{% endraw %} {% raw %}
data = np.random.rand(10, 20)
df = pd.DataFrame(data)
df['target'] = np.random.randint(0, 3, len(df))
X = df[df.columns[:-1]]
y = df['target']
test_eq(to3darray(X).shape, (10, 1, 20))
test_eq(toarray(y).shape, (10,))
{% endraw %} {% raw %}

bytes2size[source]

bytes2size(size_bytes:int, decimals=2)

Type Default Details
size_bytes int Number of bytes
decimals int 2 Number of decimals in the output
{% endraw %} {% raw %}

bytes2GB[source]

bytes2GB(byts)

{% endraw %} {% raw %}

get_size[source]

get_size(o, return_str:bool=True, decimals:int=2)

Type Default Details
o Any object
return_str bool True True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals int 2 Number of decimals in the output
{% endraw %} {% raw %}

get_dir_size[source]

get_dir_size(dir_path:str, return_str:bool=True, decimals:int=2, verbose:bool=False)

Type Default Details
dir_path str path to directory
return_str bool True True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals int 2 Number of decimals in the output
verbose bool False Controls verbosity
{% endraw %} {% raw %}

get_file_size[source]

get_file_size(file_path:str, return_str:bool=True, decimals:int=2)

Type Default Details
file_path str path to file
return_str bool True True returns size in human-readable format (KB, MB, GB, ...). False in bytes.
decimals int 2 Number of decimals in the output
{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(10, 5, 3)
test_eq(get_size(a, True), '1.3 KB')
{% endraw %} {% raw %}

is_file[source]

is_file(path)

{% endraw %} {% raw %}

is_dir[source]

is_dir(path)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_eq(is_file("001_utils.ipynb"), True)
test_eq(is_file("utils.ipynb"), False)
{% endraw %} {% raw %}

delete_all_in_dir[source]

delete_all_in_dir(tgt_dir, exception=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

reverse_dict[source]

reverse_dict(dictionary)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

is_tuple[source]

is_tuple(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

itemify[source]

itemify(*o, tup_id=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = [1, 2, 3]
b = [4, 5, 6]
print(itemify(a, b))
test_eq(len(itemify(a, b)), len(a))
a = [1, 2, 3]
b = None
print(itemify(a, b))
test_eq(len(itemify(a, b)), len(a))
a = [1, 2, 3]
b = [4, 5, 6]
c = None
print(itemify(a, b, c))
test_eq(len(itemify(a, b, c)), len(a))
[(1, 4), (2, 5), (3, 6)]
[(1,), (2,), (3,)]
[(1, 4), (2, 5), (3, 6)]
{% endraw %} {% raw %}

isnone[source]

isnone(o)

{% endraw %} {% raw %}

exists[source]

exists(o)

{% endraw %} {% raw %}

ifelse[source]

ifelse(a, b, c)

b if a is True else c

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.array(3)
test_eq(isnone(a), False)
test_eq(exists(a), True)
b = None
test_eq(isnone(b), True)
test_eq(exists(b), False)
{% endraw %} {% raw %}

is_not_close[source]

is_not_close(a, b, eps=1e-05)

Is a not within eps of b

{% endraw %} {% raw %}

test_not_close[source]

test_not_close(a, b, eps=1e-05)

test that a is not within eps of b

{% endraw %} {% raw %}

test_type[source]

test_type(a, b)

{% endraw %} {% raw %}

test_ok[source]

test_ok(f, *args, **kwargs)

{% endraw %} {% raw %}

test_not_ok[source]

test_not_ok(f, *args, **kwargs)

{% endraw %} {% raw %}

test_error[source]

test_error(error, f, *args, **kwargs)

{% endraw %} {% raw %}

test_eq_nan[source]

test_eq_nan(a, b)

test that a==b excluding nan values (valid for torch.Tensor and np.ndarray)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

assert_fn[source]

assert_fn(*args, **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

test_gt[source]

test_gt(a, b)

test that a>b

{% endraw %} {% raw %}

test_ge[source]

test_ge(a, b)

test that a>=b

{% endraw %} {% raw %}

test_lt[source]

test_lt(a, b)

test that a<b

{% endraw %} {% raw %}

test_le[source]

test_le(a, b)

test that a<=b

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_ok(test_gt, 5, 4)
test_not_ok(test_gt, 4, 4)
test_ok(test_ge, 4, 4)
test_not_ok(test_ge, 3, 4)

test_ok(test_lt, 3, 4)
test_not_ok(test_lt, 4, 4)
test_ok(test_le, 4, 4)
test_not_ok(test_le, 5, 4)
{% endraw %} {% raw %}
t = torch.rand(100)
t[t<.5] = np.nan
test_ne(t, t)
test_eq_nan(t, t)
{% endraw %} {% raw %}

stack[source]

stack(o, axis=0, retain=True)

{% endraw %} {% raw %}

stack_pad[source]

stack_pad(o, padding_value=nan)

Converts an iterable into a numpy array using padding if necessary

{% endraw %} {% raw %}
{% endraw %} {% raw %}
o = [[0,1,2], [4,5,6,7]]
test_eq(stack_pad(o).shape, (1, 2, 4))
test_eq(type(stack_pad(o)), np.ndarray)
test_eq(np.isnan(stack_pad(o)).sum(), 1)
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:13: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  del sys.path[0]
{% endraw %} {% raw %}
o = 3
print(stack_pad(o))
test_eq(stack_pad(o), np.array([[3.]]))
o = [4,5]
print(stack_pad(o))
test_eq(stack_pad(o), np.array([[4., 5.]]))
o = [[0,1,2], [4,5,6,7]]
print(stack_pad(o))
o = np.array([0, [1,2]], dtype=object)
print(stack_pad(o))
o = np.array([[[0], [10, 20], [100, 200, 300]], [[0, 1, 2, 3], [10, 20], [100]]], dtype=object)
print(stack_pad(o))
o = np.array([0, [10, 20]], dtype=object)
print(stack_pad(o))
[[3.]]
[[4. 5.]]
[[[ 0.  1.  2. nan]
  [ 4.  5.  6.  7.]]]
[[ 0. nan]
 [ 1.  2.]]
[[[  0.  nan  nan  nan]
  [ 10.  20.  nan  nan]
  [100. 200. 300.  nan]]

 [[  0.   1.   2.   3.]
  [ 10.  20.  nan  nan]
  [100.  nan  nan  nan]]]
[[ 0. nan]
 [10. 20.]]
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:13: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  del sys.path[0]
{% endraw %} {% raw %}
a = np.random.rand(2, 3, 4)
t = torch.from_numpy(a)
test_eq_type(stack(itemify(a, tup_id=0)), a)
test_eq_type(stack(itemify(t, tup_id=0)), t)
{% endraw %} {% raw %}

pad_sequences[source]

pad_sequences(o, maxlen:int=None, dtype:(str, type)=float64, padding:str='pre', truncating:str='pre', padding_value:float=nan)

Transforms an iterable with sequences into a 3d numpy array using padding or truncating sequences if necessary

Type Default Details
o Iterable object
maxlen int None Optional max length of the output. If None, max length of the longest individual sequence.
dtype (str, type) float64 Type of the output sequences. To pad sequences with variable length strings, you can use object.
padding str pre 'pre' or 'post' pad either before or after each sequence.
truncating str pre 'pre' or 'post' remove values from sequences larger than maxlen, either at the beginning or at the end of the sequences.
padding_value float nan Value used for padding.
{% endraw %} {% raw %}
{% endraw %}

This function transforms a list (of length n_samples) of sequences into a 3d numpy array of shape:

[n_samples x n_vars x seq_len]

seq_len is either the maxlen argument if provided, or the length of the longest sequence in the list.

Sequences that are shorter than seq_len are padded with padding_value until they are seq_len long.

Sequences longer than seq_len are truncated so that they fit the desired length.

The position where padding or truncation happens is determined by the arguments padding and truncating, respectively. Pre-padding or removing values from the beginning of the sequence is the default.

Input sequences to pad_sequences may have 1, 2 or 3 dimensions:

{% raw %}
a1 = np.arange(6)
a2 = np.arange(3) * 10
a3 = np.arange(2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=4, dtype=np.float64, padding='post', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 1, 4))
padded_o
array([[[  2.,   3.,   4.,   5.]],

       [[  0.,  10.,  20.,  nan]],

       [[  0., 100.,  nan,  nan]]])
{% endraw %} {% raw %}
a1 = np.arange(12).reshape(2, 6)
a2 = np.arange(6).reshape(2, 3) * 10
a3 = np.arange(4).reshape(2, 2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=4, dtype=np.float64, padding='post', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 4))
padded_o
array([[[  2.,   3.,   4.,   5.],
        [  8.,   9.,  10.,  11.]],

       [[  0.,  10.,  20.,  nan],
        [ 30.,  40.,  50.,  nan]],

       [[  0., 100.,  nan,  nan],
        [200., 300.,  nan,  nan]]])
{% endraw %} {% raw %}
a1 = np.arange(10).reshape(1, 2, 5)
a2 = np.arange(6).reshape(1, 2, 3) * 10
a3 = np.arange(4).reshape(1, 2, 2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=None, dtype=np.float64, padding='pre', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 5))
padded_o
array([[[  0.,   1.,   2.,   3.,   4.],
        [  5.,   6.,   7.,   8.,   9.]],

       [[ nan,  nan,   0.,  10.,  20.],
        [ nan,  nan,  30.,  40.,  50.]],

       [[ nan,  nan,  nan,   0., 100.],
        [ nan,  nan,  nan, 200., 300.]]])
{% endraw %} {% raw %}
a1 = np.arange(10).reshape(1, 2, 5)
a2 = np.arange(6).reshape(1, 2, 3) * 10
a3 = np.arange(4).reshape(1, 2, 2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=4, dtype=np.float64, padding='pre', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 4))
padded_o
array([[[  1.,   2.,   3.,   4.],
        [  6.,   7.,   8.,   9.]],

       [[ nan,   0.,  10.,  20.],
        [ nan,  30.,  40.,  50.]],

       [[ nan,  nan,   0., 100.],
        [ nan,  nan, 200., 300.]]])
{% endraw %} {% raw %}
a1 = np.arange(10).reshape(1, 2, 5)
a2 = np.arange(6).reshape(1, 2, 3) * 10
a3 = np.arange(4).reshape(1, 2, 2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=4, dtype=np.float64, padding='post', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 4))
padded_o
array([[[  1.,   2.,   3.,   4.],
        [  6.,   7.,   8.,   9.]],

       [[  0.,  10.,  20.,  nan],
        [ 30.,  40.,  50.,  nan]],

       [[  0., 100.,  nan,  nan],
        [200., 300.,  nan,  nan]]])
{% endraw %} {% raw %}
a1 = np.arange(10).reshape(1, 2, 5)
a2 = np.arange(6).reshape(1, 2, 3) * 10
a3 = np.arange(4).reshape(1, 2, 2) * 100
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=4, dtype=np.float64, padding='post', truncating='post', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 4))
padded_o
array([[[  0.,   1.,   2.,   3.],
        [  5.,   6.,   7.,   8.]],

       [[  0.,  10.,  20.,  nan],
        [ 30.,  40.,  50.,  nan]],

       [[  0., 100.,  nan,  nan],
        [200., 300.,  nan,  nan]]])
{% endraw %} {% raw %}
a1 = np.arange(12).reshape(1, 2, 6).tolist()
a2 = (np.arange(6).reshape(1, 2, 3) * 10).tolist()
a3 = (np.arange(4).reshape(1, 2, 2) * 100).tolist()
o  = [a1, a2, a3]
padded_o = pad_sequences(o, maxlen=None, dtype=np.float64, padding='post', truncating='pre', padding_value=np.nan)
test_eq(padded_o.shape, (3, 2, 6))
padded_o
array([[[  0.,   1.,   2.,   3.,   4.,   5.],
        [  6.,   7.,   8.,   9.,  10.,  11.]],

       [[  0.,  10.,  20.,  nan,  nan,  nan],
        [ 30.,  40.,  50.,  nan,  nan,  nan]],

       [[  0., 100.,  nan,  nan,  nan,  nan],
        [200., 300.,  nan,  nan,  nan,  nan]]])
{% endraw %} {% raw %}

match_seq_len[source]

match_seq_len(*arrays)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(10, 5, 8)
b = np.random.rand(3, 5, 10)
c, d = match_seq_len(a, b)
test_eq(c.shape[-1], d.shape[-1])
{% endraw %} {% raw %}

random_shuffle[source]

random_shuffle(o, random_state=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.arange(10)
test_eq_type(random_shuffle(a, 1), np.array([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
t = torch.arange(10)
test_eq_type(random_shuffle(t, 1), tensor([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
l = list(a)
test_eq(random_shuffle(l, 1), [2, 9, 6, 4, 0, 3, 1, 7, 8, 5])
l2 = L(l)
test_eq_type(random_shuffle(l2, 1), L([2, 9, 6, 4, 0, 3, 1, 7, 8, 5]))
{% endraw %} {% raw %}

cat2int[source]

cat2int(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.array(['b', 'a', 'a', 'b', 'a', 'b', 'a'])
test_eq_type(cat2int(a), TensorCategory([1, 0, 0, 1, 0, 1, 0]))
{% endraw %} {% raw %}
TensorBase([1,2,3])
TensorBase([1, 2, 3])
{% endraw %} {% raw %}

cycle_dl[source]

cycle_dl(dl, show_progress_bar=True)

{% endraw %} {% raw %}

cycle_dl_to_device[source]

cycle_dl_to_device(dl, show_progress_bar=True)

{% endraw %} {% raw %}

cycle_dl_estimate[source]

cycle_dl_estimate(dl, iters=10)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

cache_data[source]

cache_data(o, slice_len=10000, verbose=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

get_func_defaults[source]

get_func_defaults(f)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

get_idx_from_df_col_vals[source]

get_idx_from_df_col_vals(df, col, val_list)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

get_sublist_idxs[source]

get_sublist_idxs(aList, bList)

Get idxs that when applied to aList will return bList. aList must contain all values in bList

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = np.array([3, 5, 7, 1, 9, 8, 6, 2])
y = np.array([6, 1, 5, 7])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
x = np.array([3, 5, 7, 1, 9, 8, 6, 6, 2])
y = np.array([6, 1, 5, 7, 5])
idx = get_sublist_idxs(x, y)
test_eq(x[idx], y)
{% endraw %} {% raw %}

flatten_list[source]

flatten_list(l)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

display_pd_df[source]

display_pd_df(df, max_rows:Union[bool, int]=False, max_columns:Union[bool, int]=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
old_max_rows, old_max_columns = pd.get_option('display.max_rows'), pd.get_option('display.max_columns')
df = pd.DataFrame(np.random.rand(70, 25))
display_pd_df(df, max_rows=2, max_columns=3)
test_eq(old_max_rows, pd.get_option('display.max_rows'))
test_eq(old_max_columns, pd.get_option('display.max_columns'))
0 ... 24
0 0.372317 ... 0.631153
... ... ... ...
69 0.937259 ... 0.801330

70 rows × 25 columns

{% endraw %} {% raw %}

ttest[source]

ttest(data1, data2, equal_var=False)

Calculates t-statistic and p-value based on 2 sample distributions

{% endraw %} {% raw %}

kstest[source]

kstest(data1, data2, alternative='two-sided', mode='auto', by_axis=None)

Performs the two-sample Kolmogorov-Smirnov test for goodness of fit.

Parameters: data1, data2: two arrays of sample observations assumed to be drawn from a continuous distribution. Sample sizes can be different. alternative: {‘two-sided’, ‘less’, ‘greater’}, optional. Defines the null and alternative hypotheses. Default is ‘two-sided’. mode: {‘auto’, ‘exact’, ‘asymp’}, optional. Defines the method used for calculating the p-value. by_axis (optional, int): for arrays with more than 1 dimension, the test will be run for each variable in that axis if by_axis is not None.

{% endraw %} {% raw %}

tscore[source]

tscore(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, 100)
b = np.random.normal(0.15, .5, 50)
plt.hist(a, 50)
plt.hist(b, 50)
plt.show()
ttest(a,b)
(2.898275642196504, 0.004327466826415265)
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, (100,3))
b = np.random.normal(0.5, 1, (50,))
kstest(a,b)
(0.13333333333333333, 0.4068699278729948)
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, (100,3))
b = np.random.normal(0.15, .5, (50,))
kstest(a,b)
(0.36333333333333334, 1.59766620329016e-05)
{% endraw %} {% raw %}
data1 = np.random.normal(0,1,(100, 5, 3))
data2 = np.random.normal(0,2,(100, 5, 3))
kstest(data1, data2, by_axis=1)
([0.19, 0.20666666666666667, 0.20333333333333334, 0.19666666666666666, 0.19],
 [3.7746802141706127e-05,
  5.0749555303276145e-06,
  7.686752727943917e-06,
  1.727127220822253e-05,
  3.7746802141706127e-05])
{% endraw %} {% raw %}
a = np.random.normal(0.5, 1, 100)
t = torch.normal(0.5, 1, (100, ))
tscore(a), tscore(t)
(5.526223578225063, tensor(3.6844))
{% endraw %} {% raw %}

ttest_tensor[source]

ttest_tensor(a, b)

differentiable pytorch function equivalent to scipy.stats.ttest_ind with equal_var=False

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = torch.rand(100).requires_grad_(True) + .1
b = torch.rand(100).requires_grad_(True)
ttest_tensor(a, b)
tensor(3.1645, grad_fn=<DivBackward0>)
{% endraw %} {% raw %}

pcc[source]

pcc(a, b)

{% endraw %} {% raw %}

scc[source]

scc(a, b)

{% endraw %} {% raw %}
(-0.03971704705658001, -0.057941794179417944)
{% endraw %} {% raw %}

remove_fn[source]

remove_fn(fn, verbose=False)

Removes a file (fn) if it exists

{% endraw %} {% raw %}
{% endraw %} {% raw %}

npsave[source]

npsave(array_fn, array, verbose=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
fn = 'data/remove_fn_test.npy'
a = np.zeros(1)
npsave(fn, a)
del a
np.load(fn, mmap_mode='r+')
remove_fn(fn, True)
remove_fn(fn, True)
data/remove_fn_test.npy does not exist
saving data/remove_fn_test.npy...
...data/remove_fn_test.npy saved
data/remove_fn_test.npy file removed
data/remove_fn_test.npy does not exist
{% endraw %} {% raw %}

permute_2D[source]

permute_2D(array, axis=None)

Permute rows or columns in an array. This can be used, for example, in feature permutation

{% endraw %} {% raw %}
{% endraw %} {% raw %}
s = np.arange(100 * 50).reshape(100, 50) 
test_eq(permute_2D(s, axis=0).mean(0), s.mean(0))
test_ne(permute_2D(s, axis=0), s)
test_eq(permute_2D(s, axis=1).mean(1), s.mean(1))
test_ne(permute_2D(s, axis=1), s)
test_ne(permute_2D(s), s)
{% endraw %} {% raw %}

random_normal[source]

random_normal()

Returns a number between -1 and 1 with a normal distribution

{% endraw %} {% raw %}

random_half_normal[source]

random_half_normal()

Returns a number between 0 and 1 with a half-normal distribution

{% endraw %} {% raw %}

random_normal_tensor[source]

random_normal_tensor(shape=1, device=None)

Returns a tensor of a predefined shape between -1 and 1 with a normal distribution

{% endraw %} {% raw %}

random_half_normal_tensor[source]

random_half_normal_tensor(shape=1, device=None)

Returns a tensor of a predefined shape between 0 and 1 with a half-normal distribution

{% endraw %} {% raw %}
{% endraw %} {% raw %}

default_dpi[source]

default_dpi()

{% endraw %} {% raw %}

get_plot_fig[source]

get_plot_fig(size=None, dpi=72)

{% endraw %} {% raw %}

fig2buf[source]

fig2buf(fig)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
default_dpi()
72
{% endraw %} {% raw %}

plot_scatter[source]

plot_scatter(x, y, deg=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(100)
b = np.random.rand(100)**2
plot_scatter(a, b)
{% endraw %} {% raw %}

get_idxs[source]

get_idxs(o, aList)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = random_shuffle(np.arange(100, 200))
b = np.random.choice(a, 10, False)
idxs = get_idxs(a, b)
test_eq(a[idxs], b)
{% endraw %} {% raw %}

apply_cmap[source]

apply_cmap(o, cmap)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(16, 1, 40, 50)
s = L(a.shape)
s[1] = 3
test_eq(L(apply_cmap(a, 'viridis').shape), s)

s[0] = 1
a = np.random.rand(1, 40, 50)
test_eq(L(apply_cmap(a, 'viridis').shape), s)
{% endraw %} {% raw %}

torch_tile[source]

torch_tile(a, n_tile, dim=0)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_eq(torch_tile(torch.arange(2), 3), tensor([0, 1, 0, 1, 0, 1]))
{% endraw %} {% raw %}

to_tsfresh_df[source]

to_tsfresh_df(ts)

Prepares a time series (Tensor/ np.ndarray) to be used as a tsfresh dataset to allow feature extraction

{% endraw %} {% raw %}
{% endraw %} {% raw %}
ts = torch.rand(16, 3, 20)
a = to_tsfresh_df(ts)
ts = ts.numpy()
b = to_tsfresh_df(ts)
{% endraw %} {% raw %}

pcorr[source]

pcorr(a, b)

{% endraw %} {% raw %}

scorr[source]

scorr(a, b)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

torch_diff[source]

torch_diff(t, lag=1, pad=True, append=0)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.arange(24).reshape(2,3,4)
test_eq(torch_diff(t, 1)[..., 1:].float().mean(), 1.)
test_eq(torch_diff(t, 2)[..., 2:].float().mean(), 2.)
{% endraw %} {% raw %}

get_outliers_IQR[source]

get_outliers_IQR(o, axis=None, quantile_range=(25.0, 75.0))

{% endraw %} {% raw %}

clip_outliers[source]

clip_outliers(o, axis=None)

{% endraw %} {% raw %}

get_percentile[source]

get_percentile(o, percentile, axis=None)

{% endraw %} {% raw %}

torch_clamp[source]

torch_clamp(o, min=None, max=None)

Clamp torch.Tensor using 1 or multiple dimensions

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.randn(2,3,100)
test_eq(type(get_outliers_IQR(t, -1)[0]), torch.Tensor)
a = t.numpy()
test_eq(type(get_outliers_IQR(a, -1)[0]), np.ndarray)
test_close(get_percentile(t, 25).numpy(), get_percentile(a, 25))
{% endraw %} {% raw %}

get_robustscale_params[source]

get_robustscale_params(o, by_var=True, percentiles=(25, 75), eps=1e-06)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(16, 3, 100)
a[a>.8] = np.nan
median, IQR = get_robustscale_params(a, by_var=True, percentiles=(25, 75))
a_scaled = (a - median) / IQR
test_eq(a.shape, a_scaled.shape)
test_eq(np.isnan(median).sum(),0)
test_eq(np.isnan(IQR).sum(),0)
test_eq(np.isnan(a), np.isnan(a_scaled))
{% endraw %} {% raw %}

torch_slice_by_dim[source]

torch_slice_by_dim(t, index, dim=-1, **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.rand(5, 3)
index = torch.randint(0, 3, (5, 1))
# index = [[0, 2], [0, 1], [1, 2], [0, 2], [0, 1]]
torch_slice_by_dim(t, index)
tensor([[0.5917],
        [0.2177],
        [0.9194],
        [0.2101],
        [0.4193]])
{% endraw %} {% raw %}

torch_nanmean[source]

torch_nanmean(o, dim=None, keepdim=False)

There's currently no torch.nanmean function

{% endraw %} {% raw %}

torch_nanstd[source]

torch_nanstd(o, dim=None, keepdim=False)

There's currently no torch.nanstd function

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.rand(1000)
t[:100] = float('nan')
assert torch_nanmean(t).item() > 0
{% endraw %} {% raw %}

concat[source]

concat(*ls)

Concatenate tensors, arrays, lists, or tuples

{% endraw %} {% raw %}
{% endraw %} {% raw %}

reduce_memory_usage[source]

reduce_memory_usage(df)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

cls_name[source]

cls_name(o)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
test_eq(cls_name(timer), 'Timer')
{% endraw %} {% raw %}

roll2d[source]

roll2d(o, roll1:Union[NoneType, list, int]=None, roll2:Union[NoneType, list, int]=None)

Rolls a 2D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

roll3d[source]

roll3d(o, roll1:Union[NoneType, list, int]=None, roll2:Union[NoneType, list, int]=None, roll3:Union[NoneType, list, int]=None)

Rolls a 3D object on the indicated axis. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

random_roll2d[source]

random_roll2d(o, axis=(), replace=False)

Randomly rolls a 2D object along the indicated axes. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

random_roll3d[source]

random_roll3d(o, axis=(), replace=False)

Randomly rolls a 3D object along the indicated axes. This solution is based on https://stackoverflow.com/questions/20360675/roll-rows-of-a-matrix-independently

{% endraw %} {% raw %}

rotate_axis0[source]

rotate_axis0(o, steps=1)

{% endraw %} {% raw %}

rotate_axis1[source]

rotate_axis1(o, steps=1)

{% endraw %} {% raw %}

rotate_axis2[source]

rotate_axis2(o, steps=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.tile(np.arange(10), 3).reshape(3, 10) * np.array([1, 10, 100]).reshape(-1, 1)
a
array([[  0,   1,   2,   3,   4,   5,   6,   7,   8,   9],
       [  0,  10,  20,  30,  40,  50,  60,  70,  80,  90],
       [  0, 100, 200, 300, 400, 500, 600, 700, 800, 900]])
{% endraw %} {% raw %}
roll2d(a, roll1=[2, 1, 0])
array([[  0, 100, 200, 300, 400, 500, 600, 700, 800, 900],
       [  0,  10,  20,  30,  40,  50,  60,  70,  80,  90],
       [  0,   1,   2,   3,   4,   5,   6,   7,   8,   9]])
{% endraw %} {% raw %}
roll2d(a, roll2=3)
array([[  7,   8,   9,   0,   1,   2,   3,   4,   5,   6],
       [ 70,  80,  90,   0,  10,  20,  30,  40,  50,  60],
       [700, 800, 900,   0, 100, 200, 300, 400, 500, 600]])
{% endraw %} {% raw %}
o = torch.arange(24).reshape(2,3,4)
test_eq(rotate_axis0(o)[1], o[0])
test_eq(rotate_axis1(o)[:,1], o[:,0])
test_eq(rotate_axis2(o)[...,1], o[...,0])
{% endraw %} {% raw %}

chunks_calculator[source]

chunks_calculator(shape, dtype='float32', n_bytes=1073741824)

Function to calculate chunks for a given size of n_bytes (default = 1024**3 == 1GB). It guarantees > 50% of the chunk will be filled

{% endraw %} {% raw %}
{% endraw %} {% raw %}
shape = (1_000, 10, 1000)
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), False)

shape = (54684, 10, 1000)
dtype = 'float32'
test_eq(chunks_calculator(shape, dtype), (27342, -1, -1))
{% endraw %} {% raw %}

is_memory_shared[source]

is_memory_shared(a, b)

Check if 2 array-like objects share memory

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(2,3,4)
t1 = torch.from_numpy(a)
test_eq(is_memory_shared(a, t1), True)
a = np.random.rand(2,3,4)
t2 = torch.as_tensor(a)
test_eq(is_memory_shared(a, t2), True)
a = np.random.rand(2,3,4)
t3 = torch.tensor(a)
test_eq(is_memory_shared(a, t3), False)
{% endraw %} {% raw %}

assign_in_chunks[source]

assign_in_chunks(a, b, chunksize='auto', inplace=True, verbose=True)

Assigns values in b to an array-like object a using chunks to avoid memory overload.

The resulting a retains its dtype and shares its memory. a: array-like object. b: may be an integer, float, str, 'rand' (for random data), or another array-like object. chunksize: the size of the chunks. If 'auto', chunks will have around 1GB each.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(10,3,4).astype('float32')
a_dtype = a.dtype
a_id = id(a)
b = np.random.rand(10,3,4).astype('float64')
assign_in_chunks(a, b, chunksize=2, inplace=True, verbose=True)
test_close(a, b)
test_eq(a.dtype, a_dtype)
test_eq(id(a), a_id)

a = np.random.rand(10,3,4).astype('float32')
a_dtype = a.dtype
a_id = id(a)
b = 1
assign_in_chunks(a, b, chunksize=2, inplace=True, verbose=True)
test_eq(a, np.ones_like(a).astype(a.dtype))
test_eq(a.dtype, a_dtype)
test_eq(id(a), a_id)

a = np.random.rand(10,3,4).astype('float32')
a_dtype = a.dtype
a_id = id(a)
b = 0.5
assign_in_chunks(a, b, chunksize=2, inplace=True, verbose=True)
test_eq(a.dtype, a_dtype)
test_eq(id(a), a_id)

a = np.random.rand(10,3,4).astype('float32')
a_dtype = a.dtype
a_id = id(a)
b = 'rand'
assign_in_chunks(a, b, chunksize=2, inplace=True, verbose=True)
test_eq(a.dtype, a_dtype)
test_eq(id(a), a_id)
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:11: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  # This is added back by InteractiveShellApp.init_path()
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:22: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
{% endraw %} {% raw %}
a = np.random.rand(10,3,4).astype('float32')
b = np.random.rand(10,3,4).astype('float64')
c = assign_in_chunks(a, b, chunksize=2, inplace=False, verbose=True)
test_close(c, b)
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True)

a = np.random.rand(10,3,4).astype('float32')
b = 1
c = assign_in_chunks(a, b, chunksize=2, inplace=False, verbose=True)
test_eq(a, np.ones_like(a).astype(a.dtype))
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True)

a = np.random.rand(10,3,4).astype('float32')
b = 0.5
c = assign_in_chunks(a, b, chunksize=2, inplace=False, verbose=True)
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True)

a = np.random.rand(10,3,4).astype('float32')
b = 'rand'
c = assign_in_chunks(a, b, chunksize=2, inplace=False, verbose=True)
test_eq(a.dtype, c.dtype)
test_eq(is_memory_shared(a, c), True)
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:11: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  # This is added back by InteractiveShellApp.init_path()
/Users/nacho/opt/anaconda3/envs/py37torch110/lib/python3.7/site-packages/ipykernel_launcher.py:22: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
{% endraw %} {% raw %}

create_array[source]

create_array(shape, fname=None, path='./data', on_disk=True, dtype='float32', mode='r+', fill_value='rand', chunksize='auto', verbose=True, **kwargs)

mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only. fill_value: 'rand' (for random numbers), int or float. chunksize: 'auto' to calculate chunks of 1GB, or any integer (for a given number of samples).

{% endraw %} {% raw %}
{% endraw %} {% raw %}
fname = 'X_on_disk'
shape = (100, 10, 10)
X = create_array(shape, fname, on_disk=True, mode='r+')
test_ne(abs(X).sum(), 0)
os.remove(X.filename)
del X
{% endraw %} {% raw %}
# Create an on-disk array; the sum check confirms it starts out filled with zeros.
fname = 'X_on_disk'
shape = (100, 10, 10)
X = create_empty_array(shape, fname, on_disk=True, mode='r+')
test_eq(abs(X).sum(), 0)

# Fill the array in slices of `chunksize` samples so that only one chunk of
# random data is held in memory at a time.
chunksize = 10
pbar = progress_bar(range(math.ceil(len(X) / chunksize)), leave=False)
start = 0
for i in pbar: 
    # The last chunk may be shorter than `chunksize`.
    end = min(start + chunksize, len(X))
    partial_data = np.random.rand(end - start, X.shape[1] , X.shape[2])
    X[start:end] = partial_data
    start = end
    # Drop the temporary chunk before generating the next one.
    del partial_data
    gc.collect()
# Release the array and reload it from its file to check the data written above:
# no zeros remain and the shape is unchanged.
filename = X.filename
del X
X = np.load(filename, mmap_mode='r+')
test_eq((X == 0).sum(), 0)
test_eq(X.shape, shape)
# Clean up the file created by this example.
os.remove(X.filename)
del X
{% endraw %} {% raw %}

np_save_compressed[source]

np_save_compressed(arr, fname=None, path='./data', verbose=False, **kwargs)

{% endraw %} {% raw %}

np_load_compressed[source]

np_load_compressed(fname=None, path='./data', **kwargs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
X1 = np.random.rand(10)
np_save_compressed(X1, 'X_comp', path='./data')
X2 = np_load_compressed('X_comp')
test_eq(X1, X2)
{% endraw %} {% raw %}

np2memmap[source]

np2memmap(arr, fname=None, path='./data', dtype='float32', mode='c', **kwargs)

Function that turns an ndarray into a memmap ndarray. mode: ‘r’: Open existing file for reading only. ‘r+’: Open existing file for reading and writing. ‘w+’: Create or overwrite existing file for reading and writing. ‘c’: Copy-on-write: assignments affect data in memory, but changes are not saved to disk. The file on disk is read-only.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
X1 = np.random.rand(10)
X2 = np2memmap(X1, 'X1_test')
test_eq(X1, X2)
test_ne(type(X1), type(X2))
{% endraw %} {% raw %}

torch_mean_groupby[source]

torch_mean_groupby(o, idxs)

Computes the torch mean along axis 0 grouped by idxs. The idxs must have the same order as o.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
o = torch.arange(6*2*3).reshape(6, 2, 3).float()
idxs = np.array([[0,1,2,3], [2,3]], dtype=object)
output = torch_mean_groupby(o, idxs)
test_eq(o[:2], output[:2])
test_eq(o[2:4].mean(0), output[2])
test_eq(o[4:6].mean(0), output[3])
{% endraw %} {% raw %}

torch_flip[source]

torch_flip(t, dims=-1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
t = torch.randn(2, 3, 4)
test_eq(torch.flip(t, (2,)), torch_flip(t, dims=-1))
{% endraw %} {% raw %}

torch_nan_to_num[source]

torch_nan_to_num(o, num=0, inplace=False)

{% endraw %} {% raw %}

torch_masked_to_num[source]

torch_masked_to_num(o, mask, num=0, inplace=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
# Inject NaNs only into the x[:, :3] slice (wherever its values are below 0.5).
x[:, :3][x[:, :3] < .5] = np.nan
nan_values = torch.isnan(x).sum()
# inplace=False: the returned tensor has no NaNs while x keeps its NaN count.
y = torch_nan_to_num(x[:, :3], inplace=False)
test_eq(torch.isnan(y).sum(), 0)
test_eq(torch.isnan(x).sum(), nan_values)
# inplace=True: called on the slice, and afterwards x itself contains no NaNs.
torch_nan_to_num(x[:, :3], inplace=True)
test_eq(torch.isnan(x).sum(), 0)
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
mask = x[:, :3] > .5
x[:, :3] = torch_masked_to_num(x[:, :3], mask, num=0, inplace=False)
test_eq(x[:, :3][mask].sum(), 0)
{% endraw %} {% raw %}
x = torch.rand(2, 4, 6)
mask = x[:, :3] > .5
torch_masked_to_num(x[:, :3], mask, num=0, inplace=True)
test_eq(x[:, :3][mask].sum(), 0)
{% endraw %} {% raw %}

mpl_trend[source]

mpl_trend(x, y, deg=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = np.sort(np.random.randint(0, 100, 100)/10)
y = np.random.rand(100) + np.linspace(0, 10, 100)
trend = mpl_trend(x, y)
plt.scatter(x, y)
plt.plot(x, trend, 'r')
plt.show()
{% endraw %} {% raw %}

int2digits[source]

int2digits(o, n_digits=None, normalize=True)

{% endraw %} {% raw %}

array2digits[source]

array2digits(o, n_digits=None, normalize=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
o = -9645
test_eq(int2digits(o, 6), np.array([ 0,  0, -.9, -.6, -.4, -.5]))

a = np.random.randint(-1000, 1000, 10)
test_eq(array2digits(a,5).shape, (10,5))
{% endraw %} {% raw %}

sincos_encoding[source]

sincos_encoding(seq_len, device=None, to_np=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
sin, cos = sincos_encoding(100)
plt.plot(sin.cpu().numpy())
plt.plot(cos.cpu().numpy())
plt.show()
{% endraw %} {% raw %}

linear_encoding[source]

linear_encoding(seq_len, device=None, to_np=False, lin_range=(-1, 1))

{% endraw %} {% raw %}
{% endraw %} {% raw %}
lin = linear_encoding(100)
plt.plot(lin.cpu().numpy())
plt.show()
{% endraw %} {% raw %}

encode_positions[source]

encode_positions(pos_arr, min_val=None, max_val=None, linear=False, lin_range=(-1, 1))

Encodes an array with positions using a linear or sincos method.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Build `n_samples` rows of randomly selected positions, each NaN-padded to `length`,
# then check that the sin/cos encodings keep the (n_samples, length) shape.
n_samples = 10
length = 500
_a = []
for _ in range(n_samples):
    a = np.arange(-4000, 4000, 10)
    # Randomly keep about half of the candidate positions for this sample.
    mask = np.random.rand(len(a)) > .5
    # Cap at `length` so an unusually dense random mask can never yield a row
    # longer than the others (which would make the concatenation below fail).
    a = a[mask][:length]
    # Right-pad with NaN so every sample has exactly `length` entries.
    a = np.concatenate([a, np.full(length - len(a), np.nan)])
    _a.append(a.reshape(-1,1))
# Join the (length, 1) columns side by side and transpose to (n_samples, length).
a = np.concatenate(_a, -1).transpose(1,0)
sin, cos = encode_positions(a, linear=False)
test_eq(a.shape, (n_samples, length))
test_eq(sin.shape, (n_samples, length))
test_eq(cos.shape, (n_samples, length))
plt.plot(sin.T)
plt.plot(cos.T)
plt.xlim(0, 500)
plt.show()
{% endraw %} {% raw %}
# Build `n_samples` rows of randomly selected positions, each NaN-padded to `length`,
# then check that the linear encoding keeps the (n_samples, length) shape.
n_samples = 10
length = 500
_a = []
for _ in range(n_samples):
    a = np.arange(-4000, 4000, 10)
    # Randomly keep about half of the candidate positions for this sample.
    mask = np.random.rand(len(a)) > .5
    # Cap at `length` so an unusually dense random mask can never yield a row
    # longer than the others (which would make the concatenation below fail).
    a = a[mask][:length]
    # Right-pad with NaN so every sample has exactly `length` entries.
    a = np.concatenate([a, np.full(length - len(a), np.nan)])
    _a.append(a.reshape(-1,1))
# Join the (length, 1) columns side by side and transpose to (n_samples, length).
a = np.concatenate(_a, -1).transpose(1,0)
lin = encode_positions(a, linear=True)
test_eq(a.shape, (n_samples, length))
test_eq(lin.shape, (n_samples, length))
plt.plot(lin.T)
plt.xlim(0, 500)
plt.show()
{% endraw %} {% raw %}

sort_generator[source]

sort_generator(generator, bs)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
generator = (i for i in np.random.permutation(np.arange(1000000)).tolist())
l = list(sort_generator(generator, 512))
test_eq(l[:512], sorted(l[:512]))
{% endraw %} {% raw %}

get_subset_dict[source]

get_subset_dict(d, keys)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
keys = string.ascii_lowercase
values = np.arange(len(keys))
d = {k:v for k,v in zip(keys,values)}
test_eq(get_subset_dict(d, ['a', 'k', 'j', 'e']), {'a': 0, 'k': 10, 'j': 9, 'e': 4})
{% endraw %} {% raw %}

create_dir[source]

create_dir(directory, verbose=True)

{% endraw %} {% raw %}

remove_dir[source]

remove_dir(directory, verbose=True)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
# Create a nested directory and confirm it exists.
path = "wandb3/wandb2/wandb"
create_dir(path)
assert Path(path).exists()

# remove_dir is called with a list of paths here; afterwards none of them exist.
paths = ["wandb3/wandb2/wandb", "wandb3/wandb2", "wandb"]
remove_dir(paths)
for p in paths: 
    assert not Path(p).exists()

# The top-level directory is still present until it is removed explicitly.
path = "wandb3"
assert Path(path).exists()
remove_dir(path)
assert not Path(path).exists()
wandb3/wandb2/wandb directory created.
wandb3/wandb2/wandb directory removed.
wandb3/wandb2 directory removed.
wandb directory doesn't exist.
wandb3 directory removed.
{% endraw %} {% raw %}
create_dir('./test')
test directory created.
{% endraw %} {% raw %}
%%file ./test/mod_dev.py
a = 5
def fn(b): return a + b
Writing ./test/mod_dev.py
{% endraw %} {% raw %}
fname = "./test/mod_dev.py"
# Drop any leading "/", " " or "." characters (e.g. "./test/x.py" -> "test/x.py").
fname = fname.lstrip("/ .")
# Expose the file's folder on sys.path for the import, remembering whether this cell
# added it so that only that entry is removed afterwards (slicing off the last item
# unconditionally could drop an unrelated path when nothing was appended).
mod_dir = fname.rsplit('/', 1)[0] if '/' in fname else None
added_to_path = mod_dir is not None and mod_dir not in sys.path
if added_to_path: sys.path.append(mod_dir)
mod = import_file_as_module(fname)
test_eq(mod.fn(3), 8)
if added_to_path and mod_dir in sys.path: sys.path.remove(mod_dir)
remove_dir('./test/')
test directory removed.
{% endraw %} {% raw %}

class named_partial[source]

named_partial(name, func, *args, **kwargs)

Create a partial function with a name

{% endraw %} {% raw %}
{% endraw %} {% raw %}
def add_1(x, add=1):
    "Return `x` plus `add`."
    return x + add

# A regular partial computes the right value, but its string form is not the given name.
test_eq(add_1(1), 2)
add_2 = partial(add_1, add=2)
test_eq(add_2(2), 4)
test_ne(str(add_2), "add_2")
# named_partial computes the same value and its string form is the given name.
add_2 = named_partial('add_2', add_1, add=2)
test_eq(add_2(2), 4)
test_eq(str(add_2), "add_2")

class _A:
    "Callable object that adds a constant set at construction time."
    def __init__(self, add=1):
        self.add = add

    def __call__(self, x):
        return x + self.add

# The same comparison, this time wrapping a class instead of a function.
test_eq(_A()(1), 2)
_A2 = partial(_A, add=2)
test_eq(_A2()(1), 3)
test_ne(str(_A2), '_A2')
_A2 = named_partial('_A2', _A, add=2)
test_eq(_A2()(1), 3)
test_eq(str(_A2), '_A2')
{% endraw %} {% raw %}

yaml2dict[source]

yaml2dict(fname)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
%%file sweep_config.yaml

program: wandb_scripts/train_script.py          # (required) Path to training script.
method: bayes                                   # (required) Specify the search strategy: grid, random or bayes
parameters:                                     # (required) Specify parameters bounds to search.
   bs:
      values: [32, 64, 128]
   depth:
      values: [3, 6, 9, 12]
   fc_dropout:
      distribution: uniform
      min: 0.
      max: 0.5
   lr_max:
      values: [0.001, 0.003, 0.01, 0.03, 0.1]
   n_epoch:
      values: [10, 15, 20]
   nb_filters:
      values: [32, 64, 128]
name: LSST_sweep_01
metric: 
   name: accuracy                              # This must match one of the metrics in the training script
   goal: maximize
early_terminate: 
   type: hyperband
   min_iter: 3
project: LSST_wandb_hpo
Writing sweep_config.yaml
{% endraw %} {% raw %}
fname = "sweep_config.yaml"
sweep_config = yaml2dict(fname)
print(sweep_config)
test_eq(sweep_config.method, 'bayes')
test_eq(sweep_config['metric'], {'name': 'accuracy', 'goal': 'maximize'})
os.remove(fname)
{'program': 'wandb_scripts/train_script.py', 'method': 'bayes', 'parameters': {'bs': {'values': [32, 64, 128]}, 'depth': {'values': [3, 6, 9, 12]}, 'fc_dropout': {'distribution': 'uniform', 'min': 0.0, 'max': 0.5}, 'lr_max': {'values': [0.001, 0.003, 0.01, 0.03, 0.1]}, 'n_epoch': {'values': [10, 15, 20]}, 'nb_filters': {'values': [32, 64, 128]}}, 'name': 'LSST_sweep_01', 'metric': {'name': 'accuracy', 'goal': 'maximize'}, 'early_terminate': {'type': 'hyperband', 'min_iter': 3}, 'project': 'LSST_wandb_hpo'}
{% endraw %} {% raw %}

str2list[source]

str2list(o)

{% endraw %} {% raw %}

str2index[source]

str2index(o)

{% endraw %} {% raw %}

get_cont_cols[source]

get_cont_cols(df)

{% endraw %} {% raw %}

get_cat_cols[source]

get_cat_cols(df)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
{% endraw %} {% raw %}

get_mapping[source]

get_mapping(arr, dim=1, return_counts=False)

{% endraw %} {% raw %}

map_array[source]

map_array(arr, dim=1)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.asarray(alphabet[np.random.randint(0,15,30)]).reshape(10,3)
b = np.asarray(ALPHABET[np.random.randint(6,10,30)]).reshape(10,3)
x = concat(a,b,dim=1)
maps, counts = get_mapping(x, dim=1, return_counts=True)
x, maps, counts
(array([['c', 'o', 'j', 'I', 'J', 'J'],
        ['e', 'j', 'f', 'H', 'J', 'H'],
        ['m', 'l', 'a', 'H', 'I', 'G'],
        ['b', 'n', 'g', 'G', 'I', 'G'],
        ['o', 'c', 'c', 'G', 'G', 'I'],
        ['h', 'a', 'l', 'I', 'J', 'G'],
        ['g', 'g', 'l', 'G', 'J', 'I'],
        ['d', 'a', 'g', 'I', 'H', 'I'],
        ['a', 'k', 'h', 'J', 'J', 'I'],
        ['a', 'd', 'd', 'J', 'H', 'G']], dtype='<U1'),
 [(#9) ['a','b','c','d','e','g','h','m','o'],
  (#9) ['a','c','d','g','j','k','l','n','o'],
  (#8) ['a','c','d','f','g','h','j','l'],
  (#4) ['G','H','I','J'],
  (#4) ['G','H','I','J'],
  (#4) ['G','H','I','J']],
 [9, 9, 8, 4, 4, 4])
{% endraw %} {% raw %}
x = np.asarray(alphabet[np.random.randint(0,15,30)]).reshape(10,3)
x, map_array(x), map_array(x, 1)
(array([['e', 'c', 'f'],
        ['g', 'c', 'n'],
        ['b', 'l', 'k'],
        ['d', 'o', 'd'],
        ['c', 'a', 'n'],
        ['b', 'l', 'l'],
        ['m', 'h', 'f'],
        ['g', 'a', 'j'],
        ['j', 'l', 'm'],
        ['d', 'h', 'm']], dtype='<U1'),
 array([[3, 1, 1],
        [4, 1, 6],
        [0, 3, 3],
        [2, 4, 0],
        [1, 0, 6],
        [0, 3, 4],
        [6, 2, 1],
        [4, 0, 2],
        [5, 3, 5],
        [2, 2, 5]]),
 array([[3, 1, 1],
        [4, 1, 6],
        [0, 3, 3],
        [2, 4, 0],
        [1, 0, 6],
        [0, 3, 4],
        [6, 2, 1],
        [4, 0, 2],
        [5, 3, 5],
        [2, 2, 5]]))
{% endraw %} {% raw %}

log_tfm[source]

log_tfm(o, inplace=False)

Log transforms an array-like object with positive and/or negative values

{% endraw %} {% raw %}
{% endraw %} {% raw %}
arr = np.asarray([-1000, -100, -10, -1, 0, 1, 10, 100, 1000]).astype(float)
plt.plot(arr, log_tfm(arr, False))
plt.show()
{% endraw %} {% raw %}
t = tensor([-1000, -100, -10, -1, 0, 1, 10, 100, 1000]).float()
plt.plot(t, log_tfm(t, False))
plt.show()
{% endraw %} {% raw %}

to_sincos_time[source]

to_sincos_time(arr, max_value)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
arr = np.sort(np.random.rand(100) * 5)
arr_sin, arr_cos = to_sincos_time(arr, 5)
plt.scatter(arr, arr_sin)
plt.scatter(arr, arr_cos)
plt.show()
{% endraw %} {% raw %}

plot_feature_dist[source]

plot_feature_dist(X, percentiles=[0, 0.1, 0.5, 1, 5, 10, 25, 50, 75, 90, 95, 99, 99.5, 99.9, 100])

{% endraw %} {% raw %}
{% endraw %} {% raw %}
arr = np.random.rand(10, 3, 100)
plot_feature_dist(arr, percentiles=[0,0.1,0.5,1,5,10,25,50,75,90,95,99,99.5,99.9,100])
{% endraw %} {% raw %}

rolling_moving_average[source]

rolling_moving_average(o, window=2)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.arange(60).reshape(2,3,10).astype(float)
t = torch.arange(60).reshape(2,3,10).float()
test_close(rolling_moving_average(a, window=3), rolling_moving_average(t, window=3).numpy())
print(t)
print(rolling_moving_average(t, window=3))
tensor([[[ 0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.,  9.],
         [10., 11., 12., 13., 14., 15., 16., 17., 18., 19.],
         [20., 21., 22., 23., 24., 25., 26., 27., 28., 29.]],

        [[30., 31., 32., 33., 34., 35., 36., 37., 38., 39.],
         [40., 41., 42., 43., 44., 45., 46., 47., 48., 49.],
         [50., 51., 52., 53., 54., 55., 56., 57., 58., 59.]]])
tensor([[[ 0.0000,  0.5000,  1.0000,  2.0000,  3.0000,  4.0000,  5.0000,
           6.0000,  7.0000,  8.0000],
         [10.0000, 10.5000, 11.0000, 12.0000, 13.0000, 14.0000, 15.0000,
          16.0000, 17.0000, 18.0000],
         [20.0000, 20.5000, 21.0000, 22.0000, 23.0000, 24.0000, 25.0000,
          26.0000, 27.0000, 28.0000]],

        [[30.0000, 30.5000, 31.0000, 32.0000, 33.0000, 34.0000, 35.0000,
          36.0000, 37.0000, 38.0000],
         [40.0000, 40.5000, 41.0000, 42.0000, 43.0000, 44.0000, 45.0000,
          46.0000, 47.0000, 48.0000],
         [50.0000, 50.5000, 51.0000, 52.0000, 53.0000, 54.0000, 55.0000,
          56.0000, 57.0000, 58.0000]]])
{% endraw %} {% raw %}

ffill_sequence[source]

ffill_sequence(o)

Forward fills an array-like object along the sequence dimension.

{% endraw %} {% raw %}

bfill_sequence[source]

bfill_sequence(o)

Backward fills an array-like object along the sequence dimension.

{% endraw %} {% raw %}

fbfill_sequence[source]

fbfill_sequence(o)

Forward and backward fills an array-like object along the sequence dimension.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.arange(80).reshape(2, 4, 10).astype(float)
mask = np.random.rand(*a.shape)
a[mask > .8] = np.nan
t = torch.from_numpy(a)
t
tensor([[[ 0., nan, nan,  3.,  4.,  5.,  6.,  7.,  8.,  9.],
         [10., nan, 12., 13., 14., 15., 16., nan, 18., 19.],
         [nan, 21., nan, nan, nan, 25., 26., 27., 28., 29.],
         [30., 31., 32., nan, 34., 35., 36., 37., 38., 39.]],

        [[40., nan, nan, 43., 44., 45., nan, nan, 48., 49.],
         [50., 51., nan, 53., 54., 55., 56., 57., nan, 59.],
         [60., 61., 62., 63., 64., 65., 66., 67., 68., 69.],
         [70., nan, nan, nan, 74., 75., 76., 77., 78., 79.]]],
       dtype=torch.float64)
{% endraw %} {% raw %}
filled_a = ffill_sequence(a)
print(filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~m], ffill_sequence(t).numpy()[~m])
[[[ 0.  0.  0.  3.  4.  5.  6.  7.  8.  9.]
  [10. 10. 12. 13. 14. 15. 16. 16. 18. 19.]
  [nan 21. 21. 21. 21. 25. 26. 27. 28. 29.]
  [30. 31. 32. 32. 34. 35. 36. 37. 38. 39.]]

 [[40. 40. 40. 43. 44. 45. 45. 45. 48. 49.]
  [50. 51. 51. 53. 54. 55. 56. 57. 57. 59.]
  [60. 61. 62. 63. 64. 65. 66. 67. 68. 69.]
  [70. 70. 70. 70. 74. 75. 76. 77. 78. 79.]]]
{% endraw %} {% raw %}
filled_a = bfill_sequence(a)
print(filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~m], bfill_sequence(t).numpy()[~m])
[[[ 0.  3.  3.  3.  4.  5.  6.  7.  8.  9.]
  [10. 12. 12. 13. 14. 15. 16. 18. 18. 19.]
  [21. 21. 25. 25. 25. 25. 26. 27. 28. 29.]
  [30. 31. 32. 34. 34. 35. 36. 37. 38. 39.]]

 [[40. 43. 43. 43. 44. 45. 48. 48. 48. 49.]
  [50. 51. 53. 53. 54. 55. 56. 57. 59. 59.]
  [60. 61. 62. 63. 64. 65. 66. 67. 68. 69.]
  [70. 74. 74. 74. 74. 75. 76. 77. 78. 79.]]]
{% endraw %} {% raw %}
filled_a = fbfill_sequence(a)
print(filled_a)
m = np.isnan(filled_a)
test_eq(filled_a[~m], fbfill_sequence(t).numpy()[~m])
[[[ 0.  0.  0.  3.  4.  5.  6.  7.  8.  9.]
  [10. 10. 12. 13. 14. 15. 16. 16. 18. 19.]
  [21. 21. 21. 21. 21. 25. 26. 27. 28. 29.]
  [30. 31. 32. 32. 34. 35. 36. 37. 38. 39.]]

 [[40. 40. 40. 43. 44. 45. 45. 45. 48. 49.]
  [50. 51. 51. 53. 54. 55. 56. 57. 57. 59.]
  [60. 61. 62. 63. 64. 65. 66. 67. 68. 69.]
  [70. 70. 70. 70. 74. 75. 76. 77. 78. 79.]]]
{% endraw %} {% raw %}

dummify[source]

dummify(o:Union[ndarray, Tensor], by_var:bool=True, inplace:bool=False, skip:Optional[list]=None, random_state=None)

Shuffles an array-like object along all dimensions or dimension 1 (variables) if by_var is True.

{% endraw %} {% raw %}
{% endraw %} {% raw %}
arr = np.random.rand(2,3,10)
arr_original = arr.copy()
dummy_arr = dummify(arr)
test_ne(arr_original, dummy_arr)
test_eq(arr_original, arr)
dummify(arr, inplace=True)
test_ne(arr_original, arr)
{% endraw %} {% raw %}
t = torch.rand(2,3,10)
t_original = t.clone()
dummy_t = dummify(t)
test_ne(t_original, dummy_t)
test_eq(t_original, t)
dummify(t, inplace=True)
test_ne(t_original, t)
{% endraw %} {% raw %}

shuffle_along_axis[source]

shuffle_along_axis(o, axis=-1, random_state=None)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
X = np.arange(60).reshape(2,3,10) + 10
X_shuffled = shuffle_along_axis(X,(0, -1), random_state=23)
test_eq(X_shuffled, np.array([[[13, 15, 41, 14, 40, 49, 18, 42, 47, 46],
                               [28, 56, 53, 50, 52, 25, 24, 57, 51, 59],
                               [34, 30, 38, 35, 69, 66, 63, 67, 61, 62]],

                              [[19, 10, 11, 16, 43, 12, 17, 48, 45, 44],
                               [23, 20, 26, 22, 21, 27, 58, 29, 54, 55],
                               [36, 31, 39, 60, 33, 68, 37, 32, 65, 64]]]))
{% endraw %} {% raw %}

analyze_feature[source]

analyze_feature(feature, bins=100, density=False, feature_name=None, clip_outliers_plot=False, quantile_range=(25.0, 75.0), percentiles=[1, 25, 50, 75, 99], text_len=12, figsize=(10, 6))

{% endraw %} {% raw %}

analyze_array[source]

analyze_array(o, bins=100, density=False, feature_names=None, clip_outliers_plot=False, quantile_range=(25.0, 75.0), percentiles=[1, 25, 50, 75, 99], text_len=12, figsize=(10, 6))

{% endraw %} {% raw %}
{% endraw %} {% raw %}
x = np.random.normal(size=(1000))
analyze_array(x)
 array shape: (1000,)
       dtype: float64
  nan values: 0.0%
         max: 2.834536550458444
           1: -2.233252886632196
          25: -0.7130652632718217
          50: -0.010035608009108268
          75: 0.6745471889321126
          99: 2.147397831090856
         min: -2.795074934711634
 outlier min: -2.794483941577723
 outlier max: 2.755965867238014
    outliers: 0.2%
        mean: -0.01612382257136098
         std: 0.9823579834092578
 normal dist: True
{% endraw %} {% raw %}
x1 = np.random.normal(size=(1000,2))
x2 = np.random.normal(3, 5, size=(1000,2))
x = x1 + x2
analyze_array(x)
 array shape: (1000, 2)

  0  feature: 0

       dtype: float64
  nan values: 0.0%
         max: 20.196476714548915
           1: -9.482577029734582
          25: -0.23112040976077006
          50: 3.1406246568751244
          75: 6.554752047140204
          99: 14.547035923148922
         min: -13.69039751989469
 outlier min: -10.409929095112231
 outlier max: 16.733560732491664
    outliers: 1.2%
        mean: 3.1273432578218303
         std: 5.116288270936447
 normal dist: True
  1  feature: 1

       dtype: float64
  nan values: 0.0%
         max: 19.83438015695223
           1: -8.189544500980482
          25: 0.06425629228048195
          50: 3.2429118652926157
          75: 6.474789395503235
          99: 15.832992606037855
         min: -11.771309043369433
 outlier min: -9.551543362553646
 outlier max: 16.090589050337364
    outliers: 1.2%
        mean: 3.212784469110814
         std: 4.940812134347808
 normal dist: True
{% endraw %} {% raw %}

get_relpath[source]

get_relpath(path)

{% endraw %} {% raw %}
{% endraw %} {% raw %}

split_in_chunks[source]

split_in_chunks(o, chunksize, start=0, shuffle=False, drop_last=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.arange(5, 15)
test_eq(split_in_chunks(a, 3, drop_last=False), [array([5, 6, 7]), array([ 8,  9, 10]), array([11, 12, 13]), array([14])])
test_eq(split_in_chunks(a, 3, drop_last=True), [array([5, 6, 7]), array([ 8,  9, 10]), array([11, 12, 13])])
test_eq(split_in_chunks(a, 3, start=2, drop_last=True), [array([7, 8, 9]), array([10, 11, 12])])
{% endraw %} {% raw %}

save_object[source]

save_object(o, file_path, verbose=True)

{% endraw %} {% raw %}

load_object[source]

load_object(file_path)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
split = np.arange(100)
save_object(split, file_path='data/test')
split2 = load_object('data/test.pkl')
test_eq(split, split2)
data directory already exists.
ndarray saved as data/test.pkl
{% endraw %} {% raw %}
splits = L([[[0,1,2,3,4], [5,6,7,8,9]],[[10,11,12,13,14], [15,16,17,18,19]]])
save_object(splits, file_path=Path('data/test'))
splits2 = load_object('data/test')
test_eq(splits, splits2)
data directory already exists.
L saved as data/test.pkl
{% endraw %} {% raw %}

get_idxs_to_keep[source]

get_idxs_to_keep(o, cond, crit='all', invert=False, axis=(1, 2), keepdims=False)

{% endraw %} {% raw %}
{% endraw %} {% raw %}
a = np.random.rand(100, 2, 10)
a[a > .95] = np.nan
idxs_to_keep = get_idxs_to_keep(a, np.isfinite)
if idxs_to_keep.size>0: 
    test_eq(np.isnan(a[idxs_to_keep]).sum(), 0)
{% endraw %}