Module ktrain.text.shallownlp.classifier
Expand source code
from .imports import *
from . import utils as U
__all__ = ['NBSVM']
class Classifier:
def __init__(self, model=None):
"""
instantiate a classifier with an optional previously-saved model
"""
self.model = model
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1,3), binary=True):
"""
```
create a model
Args:
ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
texts(list): list of texts
hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
hp_dict can also be used to supply arguments to CountVectorizer
ngram_range(tuple): default ngram_range.
overridden if 'ngram_range' in hp_dict
binary(bool): default value for binary argument to CountVectorizer.
overridden if 'binary' key in hp_dict
```
"""
lang = U.detect_lang(texts)
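# select a token pattern based on the detected language: Chinese keeps
# word tokens only, while other languages also treat each punctuation
# mark as its own token so punctuation can appear in n-gram features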
if U.is_chinese(lang):
token_pattern = r'(?u)\b\w+\b'
else:
token_pattern = r'\w+|[%s]' % string.punctuation
if ctype == 'nbsvm':
clf = NBSVM(C=hp_dict.get('C', 0.01),
alpha=hp_dict.get('alpha', 0.75),
beta=hp_dict.get('beta', 0.25),
fit_intercept=hp_dict.get('fit_intercept', False))
elif ctype == 'logreg':
clf = LogisticRegression(C=hp_dict.get('C', 0.1),
dual=hp_dict.get('dual', True),
penalty=hp_dict.get('penalty', 'l2'),
tol=hp_dict.get('tol', 1e-4),
intercept_scaling=hp_dict.get('intercept_scaling', 1),
solver=hp_dict.get('solver', 'liblinear'),
max_iter=hp_dict.get('max_iter', 100),
multi_class=hp_dict.get('multi_class', 'auto'),
warm_start=hp_dict.get('warm_start', False),
n_jobs=hp_dict.get('n_jobs', None),
l1_ratio=hp_dict.get('l1_ratio', None),
random_state=hp_dict.get('random_state', 42),
class_weight=hp_dict.get('class_weight', None)
)
elif ctype == 'sgdclassifier':
clf = SGDClassifier(loss=hp_dict.get('loss', 'hinge'),
penalty=hp_dict.get('penalty', 'l2'),
alpha=hp_dict.get('alpha', 1e-3),
random_state=hp_dict.get('random_state', 42),
max_iter=hp_dict.get('max_iter', 5), # scikit-learn default is 1000
tol=hp_dict.get('tol', None),
l1_ratio=hp_dict.get('l1_ratio', 0.15),
fit_intercept=hp_dict.get('fit_intercept', True),
epsilon=hp_dict.get('epsilon', 0.1),
n_jobs=hp_dict.get('n_jobs', None),
learning_rate=hp_dict.get('learning_rate', 'optimal'),
eta0=hp_dict.get('eta0', 0.0),
power_t=hp_dict.get('power_t', 0.5),
early_stopping=hp_dict.get('early_stopping', False),
validation_fraction=hp_dict.get('validation_fraction', 0.1),
n_iter_no_change=hp_dict.get('n_iter_no_change', 5),
warm_start=hp_dict.get('warm_start', False),
average=hp_dict.get('average', False),
class_weight=hp_dict.get('class_weight', None))
else:
raise ValueError('Unknown ctype: %s' % (ctype))
self.model = Pipeline([ ('vect', CountVectorizer(ngram_range=hp_dict.get('ngram_range', ngram_range),
binary=hp_dict.get('binary', binary),
token_pattern=token_pattern,
max_features=hp_dict.get('max_features', None),
max_df=hp_dict.get('max_df', 1.0),
min_df=hp_dict.get('min_df', 1),
stop_words=hp_dict.get('stop_words', None),
lowercase=hp_dict.get('lowercase', True),
strip_accents=hp_dict.get('strip_accents', None),
encoding=hp_dict.get('encoding', 'utf-8')
)),
('clf', clf) ])
return
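# Example (hypothetical usage, not part of the original source):
#   clf = Classifier()
#   clf.create_model('nbsvm', texts, hp_dict={'alpha': 0.9, 'ngram_range': (1, 2)})
# any CountVectorizer argument (e.g., 'stop_words', 'max_features') can also
# be passed through hp_dict, as the Pipeline construction above shows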
@classmethod
def load_texts_from_folder(cls, folder_path,
subfolders=None,
shuffle=True,
encoding=None):
"""
```
load text files from folder
Args:
folder_path(str): path to folder containing documents
The supplied folder should contain a subfolder
for each category, which will be used as the class label
subfolders(list): list of subfolders under folder_path to consider
Example: If folder_path contains subfolders pos, neg, and
unlabeled, then unlabeled folder can be ignored by
setting subfolders=['pos', 'neg']
shuffle(bool): If True, list of texts will be shuffled
encoding(str): encoding to use. default:None (auto-detected)
Returns:
tuple: (texts, labels, label_names)
```
"""
bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
texts = bunch.data
labels = bunch.target
label_names = bunch.target_names
#print('target names:')
#for idx, label_name in enumerate(bunch.target_names):
#print('\t%s:%s' % (idx, label_name))
# decode based on supplied encoding
if encoding is None:
encoding = U.detect_encoding(texts)
if encoding != 'utf-8':
print('detected encoding: %s' % (encoding))
try:
texts = [text.decode(encoding) for text in texts]
except Exception:
print('Decoding with %s failed on first attempt - retrying line-by-line with skips' % (encoding))
texts = U.decode_by_line(texts, encoding=encoding)
return (texts, labels, label_names)
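# Example (hypothetical folder layout, not from the original docs):
#   data/pos/*.txt  -> labeled 'pos'
#   data/neg/*.txt  -> labeled 'neg'
#   texts, labels, label_names = Classifier.load_texts_from_folder('data')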
@classmethod
def load_texts_from_csv(cls, csv_filepath, text_column='text', label_column='label',
sep=',', encoding=None):
"""
```
load texts from a CSV file
CSV should have at least two columns.
Example:
Text | Label
I love this movie. | positive
I hated this movie.| negative
Args:
csv_filepath(str): path to CSV file
text_column(str): name of column containing the texts. default:'text'
label_column(str): name of column containing the labels in string format
default:'label'
sep(str): character that separates columns in CSV. default:','
encoding(str): encoding to use. default:None (auto-detected)
Returns:
tuple: (texts, labels, label_names)
```
"""
if encoding is None:
with open(csv_filepath, 'rb') as f:
encoding = U.detect_encoding([f.read()])
if encoding != 'utf-8':
print('detected encoding: %s (if wrong, set manually)' % (encoding))
import pandas as pd
df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
texts = df[text_column].fillna('fillna').values
labels = df[label_column].values
le = LabelEncoder()
le.fit(labels)
labels = le.transform(labels)
return (texts, labels, le.classes_)
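# Example (hypothetical CSV, not from the original docs):
#   texts, labels, label_names = Classifier.load_texts_from_csv(
#       'reviews.csv', text_column='Text', label_column='Label')
# labels are returned integer-encoded; label_names maps each integer
# back to its original string label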
def fit(self, x_train, y_train, ctype='logreg'):
"""
```
train a classifier
Args:
x_train(list or np.ndarray): training texts
y_train(np.ndarray): training labels
ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
```
"""
lang = U.detect_lang(x_train)
if U.is_chinese(lang):
x_train = U.split_chinese(x_train)
if self.model is None:
self.create_model(ctype, x_train)
self.model.fit(x_train, y_train)
return self
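# Example (hypothetical usage): clf = Classifier().fit(x_train, y_train, ctype='nbsvm')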
def predict(self, x_test, return_proba=False):
"""
```
make predictions on text data
Args:
x_test(list or np.ndarray or str): array of texts on which to make predictions, or a single text string
return_proba(bool): If True, return class probabilities instead of predicted labels. default:False
```
"""
if return_proba and not hasattr(self.model['clf'], 'predict_proba'):
raise ValueError('%s does not support predict_proba' % (type(self.model['clf']).__name__))
if isinstance(x_test, str): x_test = [x_test]
lang = U.detect_lang(x_test)
if U.is_chinese(lang): x_test = U.split_chinese(x_test)
if self.model is None: raise ValueError('model is None - call fit or load to set the model')
if return_proba:
predicted = self.model.predict_proba(x_test)
else:
predicted = self.model.predict(x_test)
if len(predicted) == 1: predicted = predicted[0]
return predicted
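# Example (hypothetical usage):
#   clf.predict('I loved this movie.')               # single predicted label
#   clf.predict(['good', 'bad'], return_proba=True)  # class probabilities
# return_proba requires a model with predict_proba support (e.g., 'logreg')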
def predict_proba(self, x_test):
"""
predict class probabilities for text data
"""
return self.predict(x_test, return_proba=True)
def evaluate(self, x_test, y_test):
"""
```
evaluate accuracy on a test set
Args:
x_test(list or np.ndarray): test texts
y_test(np.ndarray): test labels
Returns:
float: accuracy of predictions
```
"""
predicted = self.predict(x_test)
return np.mean(predicted == y_test)
def save(self, filename):
"""
save model
"""
dump(self.model, filename)
def load(self, filename):
"""
load model
"""
self.model = load(filename)
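# Example (hypothetical usage): persist and restore a trained model
#   clf.save('/tmp/mymodel')
#   clf2 = Classifier(); clf2.load('/tmp/mymodel')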
def grid_search(self, params, x_train, y_train, n_jobs=-1):
"""
```
Performs grid search to find optimal set of hyperparameters
Args:
params (dict): A dictionary defining the space of the search.
Example for finding optimal value of alpha in NBSVM:
parameters = {
#'clf__C': (1e0, 1e-1, 1e-2),
'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
#'clf__fit_intercept': (True, False),
#'clf__beta' : (0.1, 0.25, 0.5, 0.9)
}
n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
```
"""
gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
gs_clf = gs_clf.fit(x_train, y_train)
#gs_clf.best_score_
for param_name in sorted(params.keys()):
print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
return
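# Example (hypothetical usage): search NBSVM's alpha on the training set
#   clf = Classifier()
#   clf.create_model('nbsvm', x_train)
#   clf.grid_search({'clf__alpha': (0.1, 0.5, 0.75, 1.0)}, x_train, y_train)
# parameter names carry a 'clf__' (or 'vect__') prefix to address the
# corresponding step of the underlying sklearn Pipeline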
class NBSVM(BaseEstimator, LinearClassifierMixin, SparseCoefMixin):
def __init__(self, alpha=1, C=1, beta=0.25, fit_intercept=False):
self.alpha = alpha
self.C = C
self.beta = beta
self.fit_intercept = fit_intercept
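# NBSVM (Wang & Manning, 2012, "Baselines and Bigrams") interpolates between
# a naive Bayes classifier and an SVM trained on NB-scaled count features:
#   alpha: additive smoothing used in the log-count ratios
#   C:     regularization strength of the underlying LinearSVC
#   beta:  interpolation weight (0 = pure NB weights, 1 = pure SVM weights)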
def fit(self, X, y):
self.classes_ = np.unique(y)
if len(self.classes_) == 2:
coef_, intercept_ = self._fit_binary(X, y)
self.coef_ = coef_
self.intercept_ = intercept_
else:
coef_, intercept_ = zip(*[
self._fit_binary(X, y == class_)
for class_ in self.classes_
])
self.coef_ = np.concatenate(coef_)
self.intercept_ = np.array(intercept_).flatten()
return self
def _fit_binary(self, X, y):
p = np.asarray(self.alpha + X[y == 1].sum(axis=0)).flatten()
q = np.asarray(self.alpha + X[y == 0].sum(axis=0)).flatten()
r = np.log(p/np.abs(p).sum()) - np.log(q/np.abs(q).sum())
b = np.log((y == 1).sum()) - np.log((y == 0).sum())
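# r is the smoothed log-count ratio log(p/||p||_1) - log(q/||q||_1), i.e. the
# naive Bayes weight vector; b is the log-odds of the class priors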
if isinstance(X, spmatrix):
indices = np.arange(len(r))
r_sparse = coo_matrix(
(r, (indices, indices)),
shape=(len(r), len(r))
)
X_scaled = X * r_sparse
else:
X_scaled = X * r
lsvc = LinearSVC(
C=self.C,
fit_intercept=self.fit_intercept,
max_iter=10000
).fit(X_scaled, y)
mean_mag = np.abs(lsvc.coef_).mean()
coef_ = (1 - self.beta) * mean_mag * r + \
self.beta * (r * lsvc.coef_)
intercept_ = (1 - self.beta) * mean_mag * b + \
self.beta * lsvc.intercept_
return coef_, intercept_
Classes
class NBSVM (alpha=1, C=1, beta=0.25, fit_intercept=False)
-
Base class for all estimators in scikit-learn.
Notes
All estimators should specify all the parameters that can be set at the class level in their __init__ as explicit keyword arguments (no *args or **kwargs).
Ancestors
- sklearn.base.BaseEstimator
- sklearn.linear_model._base.LinearClassifierMixin
- sklearn.base.ClassifierMixin
- sklearn.linear_model._base.SparseCoefMixin
Methods
def fit(self, X, y)
-
Fit the model to the document-term matrix X and label vector y.
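Usage example
NBSVM follows the scikit-learn estimator API, so it can be used on its own or as the final step of a Pipeline, exactly as Classifier.create_model() wires it up above. The sketch below is illustrative only: the toy data and hyperparameter values are assumptions, not taken from the library's documentation.
```
import numpy as np
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline

from ktrain.text.shallownlp.classifier import NBSVM

# toy binary data; the binary code path expects 0/1 integer labels
texts = ['I love this movie.', 'I hated this movie.',
         'great film', 'terrible film']
labels = np.array([1, 0, 1, 0])

# binarized uni- and bi-grams feed the NBSVM, mirroring create_model('nbsvm', ...)
pipe = Pipeline([
    ('vect', CountVectorizer(ngram_range=(1, 2), binary=True)),
    ('clf', NBSVM(C=0.01, alpha=0.75, beta=0.25, fit_intercept=False)),
])
pipe.fit(texts, labels)
print(pipe.predict(['I love this film']))  # expected: array([1])
```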