Source code for distil.active_learning_strategies.badge

from .strategy import Strategy

import math
import pdb
import random

import numpy as np
import torch
from torch import nn
from scipy import stats


def init_centers(X, K):
    # k-means++ style seeding over the gradient embeddings: probabilistically pick K
    # rows of X that are high-magnitude and far from the centers chosen so far.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    pdist = nn.PairwiseDistance(p=2)

    # Start with the point whose embedding has the largest L2 norm
    ind = np.argmax([np.linalg.norm(s, 2) for s in X])
    mu = [X[ind]]
    indsAll = [ind]
    centInds = [0.] * len(X)
    cent = 0

    while len(mu) < K:
        if len(mu) == 1:
            # First iteration: distance of every point to the initial center
            D2 = pdist(torch.from_numpy(X).to(device), torch.from_numpy(mu[-1]).to(device))
            D2 = torch.flatten(D2)
            D2 = D2.cpu().numpy().astype(float)
        else:
            # Distance of every point to the most recently added center
            newD = pdist(torch.from_numpy(X).to(device), torch.from_numpy(mu[-1]).to(device))
            newD = torch.flatten(newD)
            newD = newD.cpu().numpy().astype(float)

            # Keep, for each point, the distance to its closest center so far
            for i in range(len(X)):
                if D2[i] > newD[i]:
                    centInds[i] = cent
                    D2[i] = newD[i]

        if sum(D2) == 0.0:
            pdb.set_trace()

        # Sample the next center with probability proportional to the squared distance
        D2 = D2.ravel().astype(float)
        Ddist = (D2 ** 2) / sum(D2 ** 2)
        customDist = stats.rv_discrete(name='custm', values=(np.arange(len(D2)), Ddist))
        ind = customDist.rvs(size=1)[0]
        mu.append(X[ind])
        indsAll.append(ind)
        cent += 1

    return indsAll
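

# Illustrative sketch (not part of the original module): init_centers performs
# k-means++ style seeding over a matrix of gradient embeddings and returns K row
# indices that are both high-magnitude and mutually distant. With hypothetical
# toy data:
#
#     >>> toy_embeddings = np.random.randn(200, 32)        # 200 points, 32-dim embeddings
#     >>> seed_inds = init_centers(toy_embeddings, K=10)   # pick 10 diverse, large-norm points
#     >>> len(seed_inds)
#     10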

class BADGE(Strategy):
    """
    Implementation of Deep Batch Active Learning by Diverse, Uncertain Gradient Lower Bounds (BADGE)
    :footcite:`DBLP:journals/corr/abs-1906-03671` Strategy. This class extends
    :class:`active_learning_strategies.strategy.Strategy`.

    This method is based on the paper `Deep Batch Active Learning by Diverse, Uncertain Gradient
    Lower Bounds <https://arxiv.org/abs/1906.03671>`_. According to the paper, Batch Active learning
    by Diverse Gradient Embeddings (BADGE) samples groups of points that are disparate and of high
    magnitude when represented in a hallucinated gradient space, a strategy designed to incorporate
    both predictive uncertainty and sample diversity into every selected batch. Crucially, BADGE
    trades off between uncertainty and diversity without requiring any hand-tuned hyperparameters.
    At each round of selection, loss gradients are computed using the hypothesised labels; the
    points to be labeled are then selected by applying k-means++ to these loss gradients.

    Parameters
    ----------
    X: Numpy array
        Features of the labeled set of points
    Y: Numpy array
        Labels of the labeled set of points
    unlabeled_x: Numpy array
        Features of the unlabeled set of points
    net: class object
        Model architecture used for training. Could be an instance of the models defined in
        `distil.utils.models` or something similar.
    handler: class object
        It should be a subclass of torch.utils.data.Dataset, i.e., have __getitem__ and __len__
        methods implemented, so that it can be passed to a PyTorch DataLoader. Could be an instance
        of the handlers defined in `distil.utils.DataHandler` or something similar.
    nclasses: int
        No. of classes in the dataset
    args: dictionary
        This dictionary should have 'batch_size' as a key.
    """

    def __init__(self, X, Y, unlabeled_x, net, handler, nclasses, args):
        super(BADGE, self).__init__(X, Y, unlabeled_x, net, handler, nclasses, args)
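
    # Background sketch (illustrative only, not the base class's get_grad_embedding
    # implementation): for a softmax classifier with penultimate-layer features h(x)
    # and predicted probabilities p, the "hallucinated" gradient embedding of a point x
    # is the gradient of the cross-entropy loss with respect to the last-layer weights,
    # computed with the pseudo-label y_hat = argmax(p). For hypothetical h and p:
    #
    #     >>> h = np.random.randn(128)            # hypothetical penultimate-layer features
    #     >>> p = np.array([0.7, 0.2, 0.1])       # hypothetical softmax output, 3 classes
    #     >>> e = np.eye(3)[np.argmax(p)]         # one-hot pseudo-label
    #     >>> g = np.outer(p - e, h).ravel()      # embedding of dimension 3 * 128
    #
    # Uncertain points have large ||g|| and dissimilar points have dissimilar g, which
    # is exactly what the k-means++ seeding in init_centers exploits.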
    def select_per_batch(self, budget, batch_size):
        """
        Select points to label by using the per-batch BADGE strategy.

        Parameters
        ----------
        budget : int
            Number of indices to be selected from the unlabeled set
        batch_size : int
            Size of the batches to form

        Returns
        -------
        chosen: list
            List of selected data point indices with respect to unlabeled_x
        """

        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"

        # Compute gradient embeddings of each unlabeled point
        grad_embedding = self.get_grad_embedding(self.unlabeled_x, bias_grad=False)

        # Calculate number of batches to choose from, embedding dimension, and adjusted budget
        num_batches = math.ceil(grad_embedding.shape[0] / batch_size)
        embed_dim = grad_embedding.shape[1]
        batch_budget = math.ceil(budget / batch_size)

        # Randomly partition the unlabeled indices into batches (sampling without replacement)
        batch_indices_list = []
        draw_without_replacement = list(range(grad_embedding.shape[0]))

        while len(draw_without_replacement) > 0:
            if len(draw_without_replacement) < batch_size:
                # Copy the remaining indices so the removals below do not mutate the stored batch
                batch_random_sample = list(draw_without_replacement)
            else:
                batch_random_sample = random.sample(draw_without_replacement, batch_size)

            batch_indices_list.append(batch_random_sample)

            for index in batch_random_sample:
                draw_without_replacement.remove(index)

        # Instantiate the batch-average embedding tensor
        gradBatchEmbedding = torch.zeros([num_batches, embed_dim]).to(device)

        # Calculate the average vector embedding of each batch
        for i in range(num_batches):
            indices = batch_indices_list[i]
            vec_avg = torch.zeros(embed_dim).to(device)

            for index in indices:
                vec_avg = vec_avg + grad_embedding[index]

            vec_avg = vec_avg / len(indices)
            gradBatchEmbedding[i] = vec_avg

        # Run the k-means++ seeding on the batch embeddings using the adjusted budget
        chosen_batch = init_centers(gradBatchEmbedding.cpu().numpy(), batch_budget)

        # For each chosen batch, construct the list of indices to return
        chosen = []
        for batch_index in chosen_batch:
            indices_to_add = batch_indices_list[batch_index]
            chosen.extend(indices_to_add)

        return chosen
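
    # Hypothetical usage sketch (assumes an already constructed `strategy = BADGE(...)` instance):
    #
    #     >>> chosen = strategy.select_per_batch(budget=100, batch_size=10)
    #
    # Because selection happens over averaged per-batch embeddings, `chosen` contains whole
    # batches of indices, so its length is approximately (not exactly) the requested budget.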
    def select(self, budget):
        """
        Select the next set of points.

        Parameters
        ----------
        budget: int
            Number of indices to be returned for the next set

        Returns
        -------
        chosen: list
            List of selected data point indices with respect to unlabeled_x
        """
        gradEmbedding = self.get_grad_embedding(self.unlabeled_x, bias_grad=False)
        chosen = init_centers(gradEmbedding.cpu().numpy(), budget)
        return chosen
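

# Hypothetical end-to-end sketch (the model `net`, data handler `handler`, and the arrays
# X, Y, unlabeled_x are user-supplied placeholders, not names defined by this module):
#
#     >>> args = {'batch_size': 64}
#     >>> strategy = BADGE(X, Y, unlabeled_x, net, handler, nclasses=10, args=args)
#     >>> chosen = strategy.select(budget=100)      # indices into unlabeled_x
#     >>> to_annotate = unlabeled_x[chosen]         # send these points to an oracle for labels
#
# After labeling, the caller is expected to move the chosen points from the unlabeled pool
# into the labeled pool and retrain the model before the next selection round.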