Module ktrain.text.shallownlp
Expand source code
from .classifier import Classifier
from .searcher import *
from .ner import NER
from .utils import sent_tokenize, extract_filenames, read_text
__all__ = ['Classifier',
'Searcher', 'search', 'find_chinese', 'find_arabic', 'find_russian', 'read_text',
'NER',
'sent_tokenize', 'extract_filenames', 'read_text']
Sub-modules
ktrain.text.shallownlp.classifier
ktrain.text.shallownlp.imports
ktrain.text.shallownlp.ner
ktrain.text.shallownlp.searcher
ktrain.text.shallownlp.utils
Functions
def extract_filenames(corpus_path, follow_links=False)
-
Expand source code
def extract_filenames(corpus_path, follow_links=False):
    if os.listdir(corpus_path) == []:
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            try:
                yield os.path.join(root, filename)
            except Exception:
                continue
def find_arabic(s)
-
Expand source code
def find_arabic(s):
    return re.findall(r'[\u0600-\u06FF]+', s)
def find_chinese(s)
-
Expand source code
def find_chinese(s):
    return re.findall(r'[\u4e00-\u9fff]+', s)
def find_russian(s)
-
Expand source code
def find_russian(s):
    return find_cyrillic(s)
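These helpers return the raw regex matches. A quick illustration with a made-up mixed-script string:

from ktrain.text import shallownlp as snlp

s = 'Hello 世界 and привет and مرحبا'
snlp.find_chinese(s)   # ['世界']
snlp.find_russian(s)   # ['привет']
snlp.find_arabic(s)    # ['مرحبا']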
def read_text(filename)
-
Expand source code
def read_text(filename):
    with open(filename, 'rb') as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except:
        U.vprint('Decoding with %s failed 1st attempt - using %s with skips' % (encoding, encoding),
                 verbose=verbose)
        decoded_text = decode_by_line(text, encoding=encoding)
    return decoded_text.strip()
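A minimal sketch combining extract_filenames and read_text to load every document under a folder (the path below is hypothetical):

from ktrain.text import shallownlp as snlp

for fpath in snlp.extract_filenames('/tmp/my_corpus'):
    text = snlp.read_text(fpath)   # bytes are decoded using an auto-detected encoding
    print(fpath, len(text))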
def search(query, doc, case_sensitive=False, keys=[], progress=False)
-
Expand source code
def search(query, doc, case_sensitive=False, keys=[], progress=False):
    searcher = Searcher(query)
    return searcher.search(doc, case_sensitive=case_sensitive, keys=keys, progress=progress)
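An illustrative use of this convenience function (the documents and keys are made up):

from ktrain.text import shallownlp as snlp

docs = ['The World Health Organization issued new guidance today.',
        'Stock markets rallied on Friday.']
snlp.search('World Health Organization', docs, keys=['doc1.txt', 'doc2.txt'])
# e.g., [('doc1.txt', 'World Health Organization', 1)]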
def sent_tokenize(text)
-
segment text into sentences
Expand source code
def sent_tokenize(text):
    """
    segment text into sentences
    """
    lang = detect_lang(text)
    sents = []
    if is_chinese(lang):
        for sent in re.findall(u'[^!?。\.\!\?]+[!?。\.\!\?]?', text, flags=re.U):
            sents.append(sent)
    else:
        for paragraph in segmenter.process(text):
            for sentence in paragraph:
                sents.append(" ".join([t.value for t in sentence]))
    return sents
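For example, on English input (the exact output shown is illustrative; tokens are re-joined with spaces):

from ktrain.text import shallownlp as snlp

snlp.sent_tokenize('This is the first sentence. And here is the second one.')
# e.g., ['This is the first sentence .', 'And here is the second one .']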
Classes
class Classifier (model=None)
-
instantiate a classifier with an optional previously-saved model
Expand source code
class Classifier:
    def __init__(self, model=None):
        """
        instantiate a classifier with an optional previously-saved model
        """
        self.model = None

    def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
        """
        ```
        create a model
        Args:
          ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
          texts(list): list of texts
          hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                         hp_dict can also be used to supply arguments to CountVectorizer
          ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
          binary(bool): default value for binary argument to CountVectorizer.
                        overridden if 'binary' key in hp_dict
        ```
        """
        lang = U.detect_lang(texts)
        if U.is_chinese(lang):
            token_pattern = r'(?u)\b\w+\b'
        else:
            token_pattern = r'\w+|[%s]' % string.punctuation
        if ctype == 'nbsvm':
            clf = NBSVM(C=hp_dict.get('C', 0.01),
                        alpha=hp_dict.get('alpha', 0.75),
                        beta=hp_dict.get('beta', 0.25),
                        fit_intercept=hp_dict.get('fit_intercept', False))
        elif ctype == 'logreg':
            clf = LogisticRegression(C=hp_dict.get('C', 0.1),
                                     dual=hp_dict.get('dual', True),
                                     penalty=hp_dict.get('penalty', 'l2'),
                                     tol=hp_dict.get('tol', 1e-4),
                                     intercept_scaling=hp_dict.get('intercept_scaling', 1),
                                     solver=hp_dict.get('solver', 'liblinear'),
                                     max_iter=hp_dict.get('max_iter', 100),
                                     multi_class=hp_dict.get('multi_class', 'auto'),
                                     warm_start=hp_dict.get('warm_start', False),
                                     n_jobs=hp_dict.get('n_jobs', None),
                                     l1_ratio=hp_dict.get('l1_ratio', None),
                                     random_state=hp_dict.get('random_state', 42),
                                     class_weight=hp_dict.get('class_weight', None))
        elif ctype == 'sgdclassifier':
            clf = SGDClassifier(loss=hp_dict.get('loss', 'hinge'),
                                penalty=hp_dict.get('penalty', 'l2'),
                                alpha=hp_dict.get('alpha', 1e-3),
                                random_state=hp_dict.get('random_state', 42),
                                max_iter=hp_dict.get('max_iter', 5),  # scikit-learn default is 1000
                                tol=hp_dict.get('tol', None),
                                l1_ratio=hp_dict.get('l1_ratio', 0.15),
                                fit_intercept=hp_dict.get('fit_intercept', True),
                                epsilon=hp_dict.get('epsilon', 0.1),
                                n_jobs=hp_dict.get('n_jobs', None),
                                learning_rate=hp_dict.get('learning_rate', 'optimal'),
                                eta0=hp_dict.get('eta0', 0.0),
                                power_t=hp_dict.get('power_t', 0.5),
                                early_stopping=hp_dict.get('early_stopping', False),
                                validation_fraction=hp_dict.get('validation_fraction', 0.1),
                                n_iter_no_change=hp_dict.get('n_iter_no_change', 5),
                                warm_start=hp_dict.get('warm_start', False),
                                average=hp_dict.get('average', False),
                                class_weight=hp_dict.get('class_weight', None))
        else:
            raise ValueError('Unknown ctype: %s' % (ctype))
        self.model = Pipeline([
            ('vect', CountVectorizer(ngram_range=hp_dict.get('ngram_range', ngram_range),
                                     binary=hp_dict.get('binary', binary),
                                     token_pattern=token_pattern,
                                     max_features=hp_dict.get('max_features', None),
                                     max_df=hp_dict.get('max_df', 1.0),
                                     min_df=hp_dict.get('min_df', 1),
                                     stop_words=hp_dict.get('stop_words', None),
                                     lowercase=hp_dict.get('lowercase', True),
                                     strip_accents=hp_dict.get('strip_accents', None),
                                     encoding=hp_dict.get('encoding', 'utf-8'))),
            ('clf', clf)])
        return

    @classmethod
    def load_texts_from_folder(cls, folder_path, subfolders=None, shuffle=True, encoding=None):
        """
        ```
        load text files from folder
        Args:
          folder_path(str): path to folder containing documents.
                            The supplied folder should contain a subfolder for each category,
                            which will be used as the class label
          subfolders(list): list of subfolders under folder_path to consider.
                            Example: If folder_path contains subfolders pos, neg, and unlabeled,
                            the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
          shuffle(bool): If True, list of texts will be shuffled
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
        texts = bunch.data
        labels = bunch.target
        label_names = bunch.target_names
        #print('target names:')
        #for idx, label_name in enumerate(bunch.target_names):
            #print('\t%s:%s' % (idx, label_name))

        # decode based on supplied encoding
        if encoding is None:
            encoding = U.detect_encoding(texts)
            if encoding != 'utf-8':
                print('detected encoding: %s' % (encoding))
        try:
            texts = [text.decode(encoding) for text in texts]
        except:
            print('Decoding with %s failed 1st attempt - using %s with skips' % (encoding, encoding))
            texts = U.decode_by_line(texts, encoding=encoding)
        return (texts, labels, label_names)

    @classmethod
    def load_texts_from_csv(cls, csv_filepath, text_column='text', label_column='label',
                            sep=',', encoding=None):
        """
        ```
        load text files from csv file
        CSV should have at least two columns.
        Example:
        Text               | Label
        I love this movie. | positive
        I hated this movie.| negative

        Args:
          csv_filepath(str): path to CSV file
          text_column(str): name of column containing the texts. default:'text'
          label_column(str): name of column containing the labels in string format. default:'label'
          sep(str): character that separates columns in CSV. default:','
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        if encoding is None:
            with open(csv_filepath, 'rb') as f:
                encoding = U.detect_encoding([f.read()])
                if encoding != 'utf-8':
                    print('detected encoding: %s (if wrong, set manually)' % (encoding))
        import pandas as pd
        df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
        texts = df[text_column].fillna('fillna').values
        labels = df[label_column].values
        le = LabelEncoder()
        le.fit(labels)
        labels = le.transform(labels)
        return (texts, labels, le.classes_)

    def fit(self, x_train, y_train, ctype='logreg'):
        """
        ```
        train a classifier
        Args:
          x_train(list or np.ndarray): training texts
          y_train(np.ndarray): training labels
          ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
        ```
        """
        lang = U.detect_lang(x_train)
        if U.is_chinese(lang):
            x_train = U.split_chinese(x_train)
        if self.model is None:
            self.create_model(ctype, x_train)
        self.model.fit(x_train, y_train)
        return self

    def predict(self, x_test, return_proba=False):
        """
        ```
        make predictions on text data
        Args:
          x_test(list or np.ndarray or str): array of texts on which to make predictions
                                             or a string representing text
        ```
        """
        if return_proba and not hasattr(self.model['clf'], 'predict_proba'):
            raise ValueError('%s does not support predict_proba' % (type(self.model['clf']).__name__))
        if isinstance(x_test, str):
            x_test = [x_test]
        lang = U.detect_lang(x_test)
        if U.is_chinese(lang):
            x_test = U.split_chinese(x_test)
        if self.model is None:
            raise ValueError('model is None - call fit or load to set the model')
        if return_proba:
            predicted = self.model.predict_proba(x_test)
        else:
            predicted = self.model.predict(x_test)
        if len(predicted) == 1:
            predicted = predicted[0]
        return predicted

    def predict_proba(self, x_test):
        """
        predict_proba
        """
        return self.predict(x_test, return_proba=True)

    def evaluate(self, x_test, y_test):
        """
        ```
        evaluate
        Args:
          x_test(list or np.ndarray): test texts
          y_test(np.ndarray): test labels
        ```
        """
        predicted = self.predict(x_test)
        return np.mean(predicted == y_test)

    def save(self, filename):
        """
        save model
        """
        dump(self.model, filename)

    def load(self, filename):
        """
        load model
        """
        self.model = load(filename)

    def grid_search(self, params, x_train, y_train, n_jobs=-1):
        """
        ```
        Performs grid search to find optimal set of hyperparameters
        Args:
          params (dict): A dictionary defining the space of the search.
                         Example for finding optimal value of alpha in NBSVM:
                         parameters = {
                                       #'clf__C': (1e0, 1e-1, 1e-2),
                                       'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                                       #'clf__fit_intercept': (True, False),
                                       #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                                      }
          n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
        ```
        """
        gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
        gs_clf = gs_clf.fit(x_train, y_train)
        #gs_clf.best_score_
        for param_name in sorted(params.keys()):
            print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
        return
Static methods
def load_texts_from_csv(csv_filepath, text_column='text', label_column='label', sep=',', encoding=None)
-
load text files from csv file. CSV should have at least two columns. Example:

Text               | Label
I love this movie. | positive
I hated this movie.| negative

Args:
  csv_filepath(str): path to CSV file
  text_column(str): name of column containing the texts. default:'text'
  label_column(str): name of column containing the labels in string format. default:'label'
  sep(str): character that separates columns in CSV. default:','
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_csv(cls, csv_filepath, text_column='text', label_column='label',
                        sep=',', encoding=None):
    """
    ```
    load text files from csv file
    CSV should have at least two columns.
    Example:
    Text               | Label
    I love this movie. | positive
    I hated this movie.| negative

    Args:
      csv_filepath(str): path to CSV file
      text_column(str): name of column containing the texts. default:'text'
      label_column(str): name of column containing the labels in string format. default:'label'
      sep(str): character that separates columns in CSV. default:','
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    if encoding is None:
        with open(csv_filepath, 'rb') as f:
            encoding = U.detect_encoding([f.read()])
            if encoding != 'utf-8':
                print('detected encoding: %s (if wrong, set manually)' % (encoding))
    import pandas as pd
    df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
    texts = df[text_column].fillna('fillna').values
    labels = df[label_column].values
    le = LabelEncoder()
    le.fit(labels)
    labels = le.transform(labels)
    return (texts, labels, le.classes_)
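A short sketch, assuming a hypothetical train.csv with 'text' and 'label' columns:

from ktrain.text import shallownlp as snlp

x, y, label_names = snlp.Classifier.load_texts_from_csv('train.csv',
                                                        text_column='text',
                                                        label_column='label')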
def load_texts_from_folder(folder_path, subfolders=None, shuffle=True, encoding=None)
-
load text files from folder

Args:
  folder_path(str): path to folder containing documents.
                    The supplied folder should contain a subfolder for each category,
                    which will be used as the class label
  subfolders(list): list of subfolders under folder_path to consider.
                    Example: If folder_path contains subfolders pos, neg, and unlabeled,
                    the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
  shuffle(bool): If True, list of texts will be shuffled
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_folder(cls, folder_path, subfolders=None, shuffle=True, encoding=None):
    """
    ```
    load text files from folder
    Args:
      folder_path(str): path to folder containing documents.
                        The supplied folder should contain a subfolder for each category,
                        which will be used as the class label
      subfolders(list): list of subfolders under folder_path to consider.
                        Example: If folder_path contains subfolders pos, neg, and unlabeled,
                        the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
      shuffle(bool): If True, list of texts will be shuffled
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
    texts = bunch.data
    labels = bunch.target
    label_names = bunch.target_names
    #print('target names:')
    #for idx, label_name in enumerate(bunch.target_names):
        #print('\t%s:%s' % (idx, label_name))

    # decode based on supplied encoding
    if encoding is None:
        encoding = U.detect_encoding(texts)
        if encoding != 'utf-8':
            print('detected encoding: %s' % (encoding))
    try:
        texts = [text.decode(encoding) for text in texts]
    except:
        print('Decoding with %s failed 1st attempt - using %s with skips' % (encoding, encoding))
        texts = U.decode_by_line(texts, encoding=encoding)
    return (texts, labels, label_names)
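A sketch assuming a hypothetical folder layout with one subfolder per class (e.g., data/pos, data/neg, data/unlabeled):

from ktrain.text import shallownlp as snlp

texts, labels, label_names = snlp.Classifier.load_texts_from_folder('data',
                                                                    subfolders=['pos', 'neg'])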
Methods
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True)
-
create a model

Args:
  ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
  texts(list): list of texts
  hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                 hp_dict can also be used to supply arguments to CountVectorizer
  ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
  binary(bool): default value for binary argument to CountVectorizer. overridden if 'binary' key in hp_dict
Expand source code
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
    """
    ```
    create a model
    Args:
      ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
      texts(list): list of texts
      hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                     hp_dict can also be used to supply arguments to CountVectorizer
      ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
      binary(bool): default value for binary argument to CountVectorizer.
                    overridden if 'binary' key in hp_dict
    ```
    """
    lang = U.detect_lang(texts)
    if U.is_chinese(lang):
        token_pattern = r'(?u)\b\w+\b'
    else:
        token_pattern = r'\w+|[%s]' % string.punctuation
    if ctype == 'nbsvm':
        clf = NBSVM(C=hp_dict.get('C', 0.01),
                    alpha=hp_dict.get('alpha', 0.75),
                    beta=hp_dict.get('beta', 0.25),
                    fit_intercept=hp_dict.get('fit_intercept', False))
    elif ctype == 'logreg':
        clf = LogisticRegression(C=hp_dict.get('C', 0.1),
                                 dual=hp_dict.get('dual', True),
                                 penalty=hp_dict.get('penalty', 'l2'),
                                 tol=hp_dict.get('tol', 1e-4),
                                 intercept_scaling=hp_dict.get('intercept_scaling', 1),
                                 solver=hp_dict.get('solver', 'liblinear'),
                                 max_iter=hp_dict.get('max_iter', 100),
                                 multi_class=hp_dict.get('multi_class', 'auto'),
                                 warm_start=hp_dict.get('warm_start', False),
                                 n_jobs=hp_dict.get('n_jobs', None),
                                 l1_ratio=hp_dict.get('l1_ratio', None),
                                 random_state=hp_dict.get('random_state', 42),
                                 class_weight=hp_dict.get('class_weight', None))
    elif ctype == 'sgdclassifier':
        clf = SGDClassifier(loss=hp_dict.get('loss', 'hinge'),
                            penalty=hp_dict.get('penalty', 'l2'),
                            alpha=hp_dict.get('alpha', 1e-3),
                            random_state=hp_dict.get('random_state', 42),
                            max_iter=hp_dict.get('max_iter', 5),  # scikit-learn default is 1000
                            tol=hp_dict.get('tol', None),
                            l1_ratio=hp_dict.get('l1_ratio', 0.15),
                            fit_intercept=hp_dict.get('fit_intercept', True),
                            epsilon=hp_dict.get('epsilon', 0.1),
                            n_jobs=hp_dict.get('n_jobs', None),
                            learning_rate=hp_dict.get('learning_rate', 'optimal'),
                            eta0=hp_dict.get('eta0', 0.0),
                            power_t=hp_dict.get('power_t', 0.5),
                            early_stopping=hp_dict.get('early_stopping', False),
                            validation_fraction=hp_dict.get('validation_fraction', 0.1),
                            n_iter_no_change=hp_dict.get('n_iter_no_change', 5),
                            warm_start=hp_dict.get('warm_start', False),
                            average=hp_dict.get('average', False),
                            class_weight=hp_dict.get('class_weight', None))
    else:
        raise ValueError('Unknown ctype: %s' % (ctype))
    self.model = Pipeline([
        ('vect', CountVectorizer(ngram_range=hp_dict.get('ngram_range', ngram_range),
                                 binary=hp_dict.get('binary', binary),
                                 token_pattern=token_pattern,
                                 max_features=hp_dict.get('max_features', None),
                                 max_df=hp_dict.get('max_df', 1.0),
                                 min_df=hp_dict.get('min_df', 1),
                                 stop_words=hp_dict.get('stop_words', None),
                                 lowercase=hp_dict.get('lowercase', True),
                                 strip_accents=hp_dict.get('strip_accents', None),
                                 encoding=hp_dict.get('encoding', 'utf-8'))),
        ('clf', clf)])
    return
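A sketch showing how hp_dict can override both classifier hyperparameters and CountVectorizer arguments (toy data; the values are illustrative, not recommendations):

from ktrain.text import shallownlp as snlp

texts = ['great movie', 'terrible movie', 'loved every minute', 'boring and slow']
labels = [1, 0, 1, 0]
clf = snlp.Classifier()
clf.create_model('logreg', texts, hp_dict={'C': 1.0, 'ngram_range': (1, 2), 'stop_words': 'english'})
clf.fit(texts, labels)   # fit reuses the pipeline configured above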
def evaluate(self, x_test, y_test)
-
evaluate

Args:
  x_test(list or np.ndarray): test texts
  y_test(np.ndarray): test labels
Expand source code
def evaluate(self, x_test, y_test):
    """
    ```
    evaluate
    Args:
      x_test(list or np.ndarray): test texts
      y_test(np.ndarray): test labels
    ```
    """
    predicted = self.predict(x_test)
    return np.mean(predicted == y_test)
def fit(self, x_train, y_train, ctype='logreg')
-
train a classifier

Args:
  x_train(list or np.ndarray): training texts
  y_train(np.ndarray): training labels
  ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
Expand source code
def fit(self, x_train, y_train, ctype='logreg'):
    """
    ```
    train a classifier
    Args:
      x_train(list or np.ndarray): training texts
      y_train(np.ndarray): training labels
      ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
    ```
    """
    lang = U.detect_lang(x_train)
    if U.is_chinese(lang):
        x_train = U.split_chinese(x_train)
    if self.model is None:
        self.create_model(ctype, x_train)
    self.model.fit(x_train, y_train)
    return self
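A minimal end-to-end sketch with toy data; in practice x_train/y_train would come from load_texts_from_folder or load_texts_from_csv, and evaluation would use held-out data:

from ktrain.text import shallownlp as snlp

x_train = ['I loved this movie', 'What a great film', 'Utterly boring', 'I hated it']
y_train = [1, 1, 0, 0]
clf = snlp.Classifier()
clf.fit(x_train, y_train, ctype='logreg')
clf.predict('An excellent and great film')   # e.g., 1
clf.evaluate(x_train, y_train)               # accuracy (here on the training set)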
def grid_search(self, params, x_train, y_train, n_jobs=-1)
-
Performs grid search to find optimal set of hyperparameters

Args:
  params (dict): A dictionary defining the space of the search.
                 Example for finding optimal value of alpha in NBSVM:
                 parameters = {
                               #'clf__C': (1e0, 1e-1, 1e-2),
                               'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                               #'clf__fit_intercept': (True, False),
                               #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                              }
  n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
Expand source code
def grid_search(self, params, x_train, y_train, n_jobs=-1):
    """
    ```
    Performs grid search to find optimal set of hyperparameters
    Args:
      params (dict): A dictionary defining the space of the search.
                     Example for finding optimal value of alpha in NBSVM:
                     parameters = {
                                   #'clf__C': (1e0, 1e-1, 1e-2),
                                   'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                                   #'clf__fit_intercept': (True, False),
                                   #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                                  }
      n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
    ```
    """
    gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
    gs_clf = gs_clf.fit(x_train, y_train)
    #gs_clf.best_score_
    for param_name in sorted(params.keys()):
        print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
    return
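Continuing the sketch above (a 'logreg' pipeline) with a realistically sized training set, parameters are addressed by their step name in the underlying scikit-learn Pipeline; the grid below is just an example:

params = {'clf__C': (0.01, 0.1, 1.0)}
clf.grid_search(params, x_train, y_train)   # prints the best value found for each parameter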
def load(self, filename)
-
load model
Expand source code
def load(self, filename):
    """
    load model
    """
    self.model = load(filename)
def predict(self, x_test, return_proba=False)
-
make predictions on text data

Args:
  x_test(list or np.ndarray or str): array of texts on which to make predictions or a string representing text
Expand source code
def predict(self, x_test, return_proba=False):
    """
    ```
    make predictions on text data
    Args:
      x_test(list or np.ndarray or str): array of texts on which to make predictions
                                         or a string representing text
    ```
    """
    if return_proba and not hasattr(self.model['clf'], 'predict_proba'):
        raise ValueError('%s does not support predict_proba' % (type(self.model['clf']).__name__))
    if isinstance(x_test, str):
        x_test = [x_test]
    lang = U.detect_lang(x_test)
    if U.is_chinese(lang):
        x_test = U.split_chinese(x_test)
    if self.model is None:
        raise ValueError('model is None - call fit or load to set the model')
    if return_proba:
        predicted = self.model.predict_proba(x_test)
    else:
        predicted = self.model.predict(x_test)
    if len(predicted) == 1:
        predicted = predicted[0]
    return predicted
def predict_proba(self, x_test)
-
predict_proba
Expand source code
def predict_proba(self, x_test):
    """
    predict_proba
    """
    return self.predict(x_test, return_proba=True)
def save(self, filename)
-
save model
Expand source code
def save(self, filename):
    """
    save model
    """
    dump(self.model, filename)
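Saving and loading are a dump/load of the underlying scikit-learn pipeline. Continuing the earlier sketch with an arbitrary path:

clf.save('/tmp/shallownlp.model')
clf2 = snlp.Classifier()
clf2.load('/tmp/shallownlp.model')
clf2.predict('What a great film')   # same predictions as clf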
class NER (lang='en', predictor_path=None)
-
pretrained NER. Only English, Chinese, and Russian are currently supported.

Args:
  lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
Expand source code
class NER:
    def __init__(self, lang='en', predictor_path=None):
        """
        ```
        pretrained NER. Only English, Chinese, and Russian are currently supported.
        Args:
          lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
        ```
        """
        if lang is None:
            raise ValueError('lang is required (e.g., "en" for English, "zh" for Chinese, "ru" for Russian, etc.)')
        if predictor_path is None and lang not in ['en', 'zh', 'ru']:
            raise ValueError("Unsupported language: if predictor_path is None, then lang must be " +
                             "'en' for English, 'zh' for Chinese, or 'ru' for Russian")
        self.lang = lang
        if os.environ.get('DISABLE_V2_BEHAVIOR', None) != '1':
            warnings.warn("Please add os.environ['DISABLE_V2_BEHAVIOR'] = '1' at top of your script or notebook")
            msg = "\nNER in ktrain uses the CRF module from keras_contrib, which is not yet\n" +\
                  "fully compatible with TensorFlow 2. To use NER, you must add the following to the top of your\n" +\
                  "script or notebook BEFORE you import ktrain (after restarting runtime):\n\n" +\
                  "import os\n" +\
                  "os.environ['DISABLE_V2_BEHAVIOR'] = '1'\n"
            print(msg)
            return
        else:
            import tensorflow.compat.v1 as tf
            tf.disable_v2_behavior()
        if predictor_path is None and self.lang == 'zh':
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, 'ner_models/ner_chinese')
        elif predictor_path is None and self.lang == 'ru':
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, 'ner_models/ner_russian')
        elif predictor_path is None and self.lang == 'en':
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, 'ner_models/ner_english')
        elif predictor_path is None:
            raise ValueError("Unsupported language: if predictor_path is None, then lang must be " +
                             "'en' for English, 'zh' for Chinese, or 'ru' for Russian")
        else:
            if not os.path.isfile(predictor_path) or not os.path.isfile(predictor_path + '.preproc'):
                raise ValueError('could not find a valid predictor model ' +
                                 '%s or valid Preprocessor %s at specified path' % (predictor_path, predictor_path + '.preproc'))
            fpath = predictor_path
        try:
            import io
            from contextlib import redirect_stdout
            f = io.StringIO()
            with redirect_stdout(f):
                import ktrain
        except:
            raise ValueError('ktrain could not be imported. Install with: pip install ktrain')
        self.predictor = ktrain.load_predictor(fpath)

    def predict(self, texts, merge_tokens=True, batch_size=32):
        """
        ```
        Extract named entities from supplied text
        Args:
          texts (list of str or str): list of texts to annotate
          merge_tokens(bool): If True, tokens will be merged together by the entity
                              to which they are associated:
                              ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
          batch_size(int): Batch size to use for predictions (default:32)
        ```
        """
        if isinstance(texts, str):
            texts = [texts]
        self.predictor.batch_size = batch_size
        texts = [t.strip() for t in texts]
        results = self.predictor.predict(texts, merge_tokens=merge_tokens)
        if len(results) == 1:
            results = results[0]
        return results
Methods
def predict(self, texts, merge_tokens=True, batch_size=32)
-
Extract named entities from supplied text

Args:
  texts (list of str or str): list of texts to annotate
  merge_tokens(bool): If True, tokens will be merged together by the entity to which they are associated:
                      ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
  batch_size(int): Batch size to use for predictions (default:32)
Expand source code
def predict(self, texts, merge_tokens=True, batch_size=32):
    """
    ```
    Extract named entities from supplied text
    Args:
      texts (list of str or str): list of texts to annotate
      merge_tokens(bool): If True, tokens will be merged together by the entity
                          to which they are associated:
                          ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
      batch_size(int): Batch size to use for predictions (default:32)
    ```
    """
    if isinstance(texts, str):
        texts = [texts]
    self.predictor.batch_size = batch_size
    texts = [t.strip() for t in texts]
    results = self.predictor.predict(texts, merge_tokens=merge_tokens)
    if len(results) == 1:
        results = results[0]
    return results
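A usage sketch (note the DISABLE_V2_BEHAVIOR requirement described in the constructor; the entities shown are illustrative):

import os
os.environ['DISABLE_V2_BEHAVIOR'] = '1'
from ktrain.text import shallownlp as snlp

ner = snlp.NER('en')
ner.predict('Paul Newman is my favorite actor.')
# e.g., [('Paul Newman', 'PER')]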
class Searcher (queries, lang=None)
-
Search for keywords in text documents
Args:
  queries(list of str): list of text queries
  lang(str): language of queries. default:None --> auto-detected
Expand source code
class Searcher:
    """
    Search for keywords in text documents
    """
    def __init__(self, queries, lang=None):
        """
        ```
        Args:
          queries(list of str): list of text queries
          lang(str): language of queries. default:None --> auto-detected
        ```
        """
        self.queries = queries
        if isinstance(self.queries, str):
            self.queries = [self.queries]
        self.lang = lang
        if self.lang is None:
            self.lang = U.detect_lang(queries)
        #print("lang:%s" %(self.lang))

    def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
        """
        ```
        executes self.queries on supplied list of documents
        Args:
          docs(list of str): list of texts
          case_sensitive(bool): If True, case sensitive search
          keys(list): list of keys for supplied docs (e.g., file paths).
                      default: key is index in range(len(docs))
          min_matches(int): results must have at least these many word matches
          progress(bool): whether or not to show progress bar
        Returns:
          list of tuples of results of the form: (key, query, no. of matches)
          For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
        ```
        """
        if isinstance(docs, str):
            docs = [docs]
        if keys and len(keys) != len(docs):
            raise ValueError('lengths of keys and docs must be the same')
        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            for q in self.queries:
                if U.is_chinese(self.lang):
                    r = self._search_chinese(q, [text], min_matches=min_matches, parse=1, progress=False)
                elif self.lang == 'ar':
                    r = self._search(q, [text], case_sensitive=case_sensitive, min_matches=min_matches,
                                     progress=False, substrings_on=True)
                else:
                    r = self._search(q, [text], case_sensitive=case_sensitive, min_matches=min_matches,
                                     progress=False, substrings_on=False)
                if not r:
                    continue
                r = r[0]
                k = idx
                if keys:
                    k = keys[idx]
                num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
                results.append((k, q, num_matches))
            if progress:
                printProgressBar(idx + 1, l, prefix='progress: ', suffix='complete', length=50)
        return results

    def _search(self, query, docs, case_sensitive=False, substrings_on=False, min_matches=1, progress=True):
        """
        ```
        search documents for query string.
        Args:
          query(str or list): the word or phrase to search (or list of them)
                              if list is provided, each element is combined using OR
          docs (list of str): list of text documents
          case_sensitive(bool): If True, case sensitive search
          substrings_on(bool): whether to omit "\b" in regex. default:False
                               If True, will find substrings
        returns:
          list or tuple: Returns list of results if len(docs) > 1.
                         Otherwise, returns tuple of results
        ```
        """
        if not isinstance(query, (list, tuple, str)):
            raise ValueError('query must be str or list of str')
        if isinstance(query, str):
            query = [query]
        if not isinstance(docs, (list, np.ndarray)):
            raise ValueError('docs must be list of str')
        flag = 0
        if not case_sensitive:
            flag = re.I
        qlist = []
        for q in query:
            qlist.append('\s+'.join(q.split()))
        original_query = query
        query = '|'.join(qlist)
        bound = r'\b'
        if substrings_on:
            bound = ''
        pattern_str = r'%s(?:%s)%s' % (bound, query, bound)
        pattern = re.compile(pattern_str, flag)
        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            matches = pattern.findall(text)
            if matches and len(matches) >= min_matches:
                results.append((idx, text, matches))
            if progress:
                printProgressBar(idx + 1, l, prefix='progress: ', suffix='complete', length=50)
        return results

    def _search_chinese(self, query, docs, substrings_on=True, parse=1, min_matches=1, progress=False):
        """
        convenience method to search chinese text
        """
        original_query = query
        if not isinstance(query, str):
            raise ValueError('query must be str')
        if parse > 0:
            q = U.split_chinese(query)[0]
            num_words = len(q.split())
            query = build_ngrams(q, n=parse)
            query = ["".join(q) for q in query]
        return self._search(query, docs, substrings_on=substrings_on, progress=progress)
Methods
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True)
-
executes self.queries on supplied list of documents

Args:
  docs(list of str): list of texts
  case_sensitive(bool): If True, case sensitive search
  keys(list): list of keys for supplied docs (e.g., file paths). default: key is index in range(len(docs))
  min_matches(int): results must have at least these many word matches
  progress(bool): whether or not to show progress bar
Returns:
  list of tuples of results of the form: (key, query, no. of matches)
  For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
Expand source code
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
    """
    ```
    executes self.queries on supplied list of documents
    Args:
      docs(list of str): list of texts
      case_sensitive(bool): If True, case sensitive search
      keys(list): list of keys for supplied docs (e.g., file paths).
                  default: key is index in range(len(docs))
      min_matches(int): results must have at least these many word matches
      progress(bool): whether or not to show progress bar
    Returns:
      list of tuples of results of the form: (key, query, no. of matches)
      For Chinese, no. of matches will be number of unique Jieba-extracted character sequences that match
    ```
    """
    if isinstance(docs, str):
        docs = [docs]
    if keys and len(keys) != len(docs):
        raise ValueError('lengths of keys and docs must be the same')
    results = []
    l = len(docs)
    for idx, text in enumerate(docs):
        for q in self.queries:
            if U.is_chinese(self.lang):
                r = self._search_chinese(q, [text], min_matches=min_matches, parse=1, progress=False)
            elif self.lang == 'ar':
                r = self._search(q, [text], case_sensitive=case_sensitive, min_matches=min_matches,
                                 progress=False, substrings_on=True)
            else:
                r = self._search(q, [text], case_sensitive=case_sensitive, min_matches=min_matches,
                                 progress=False, substrings_on=False)
            if not r:
                continue
            r = r[0]
            k = idx
            if keys:
                k = keys[idx]
            num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
            results.append((k, q, num_matches))
        if progress:
            printProgressBar(idx + 1, l, prefix='progress: ', suffix='complete', length=50)
    return results
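A sketch with two made-up documents and two queries:

from ktrain.text import shallownlp as snlp

docs = ['The United Nations met today in New York.',
        'This document mentions nothing of interest.']
searcher = snlp.Searcher(['United Nations', 'New York'])
searcher.search(docs, keys=['a.txt', 'b.txt'], progress=False)
# e.g., [('a.txt', 'United Nations', 1), ('a.txt', 'New York', 1)]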