Module ktrain.text.shallownlp
Expand source code
from .classifier import Classifier
from .ner import NER
from .searcher import *
from .utils import extract_filenames, read_text, sent_tokenize
__all__ = [
    "Classifier",
    "Searcher",
    "search",
    "find_chinese",
    "find_arabic",
    "find_russian",
    "read_text",
    "NER",
    "sent_tokenize",
    "extract_filenames",
]
Sub-modules
ktrain.text.shallownlp.classifier
ktrain.text.shallownlp.imports
ktrain.text.shallownlp.ner
ktrain.text.shallownlp.searcher
ktrain.text.shallownlp.utils
Functions
def extract_filenames(corpus_path, follow_links=False)
-
Expand source code
def extract_filenames(corpus_path, follow_links=False):
    if os.listdir(corpus_path) == []:
        raise ValueError("%s: path is empty" % corpus_path)
    for root, _, fnames in os.walk(corpus_path, followlinks=follow_links):
        for filename in fnames:
            try:
                yield os.path.join(root, filename)
            except Exception:
                continue
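For example, the generator pairs naturally with read_text for iterating over a corpus; a minimal sketch (the directory path below is hypothetical):

from ktrain.text.shallownlp import extract_filenames, read_text

for fpath in extract_filenames("/tmp/my_corpus"):  # hypothetical folder of text files
    text = read_text(fpath)                        # decoded file contents
    print(fpath, len(text))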
def find_arabic(s)
-
Expand source code
def find_arabic(s):
    return re.findall(r"[\u0600-\u06FF]+", s)
def find_chinese(s)
-
Expand source code
def find_chinese(s):
    return re.findall(r"[\u4e00-\u9fff]+", s)
def find_russian(s)
-
Expand source code
def find_russian(s):
    return find_cyrillic(s)
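A quick sketch of the three helpers on a mixed-script string; each simply returns the re.findall matches for its script's Unicode range, so the outputs shown in comments are approximate:

from ktrain.text.shallownlp import find_chinese, find_arabic, find_russian

s = "hello 你好 مرحبا привет"
find_chinese(s)   # -> ['你好']
find_arabic(s)    # -> ['مرحبا']
find_russian(s)   # -> ['привет']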
def read_text(filename)
-
Expand source code
def read_text(filename):
    with open(filename, "rb") as f:
        text = f.read()
    encoding = detect_encoding([text])
    try:
        decoded_text = text.decode(encoding)
    except:
        U.vprint(
            "Decoding with %s failed 1st attempt - using %s with skips"
            % (encoding, encoding),
            verbose=verbose,
        )
        decoded_text = decode_by_line(text, encoding=encoding)
    return decoded_text.strip()
def search(query, doc, case_sensitive=False, keys=[], progress=False)
-
Expand source code
def search(query, doc, case_sensitive=False, keys=[], progress=False):
    searcher = Searcher(query)
    return searcher.search(
        doc, case_sensitive=case_sensitive, keys=keys, progress=progress
    )
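A usage sketch with made-up documents and keys; search builds a Searcher internally and returns (key, query, number-of-matches) tuples for documents that match:

from ktrain.text.shallownlp import search

docs = [
    "TensorFlow and PyTorch are deep learning libraries.",  # hypothetical sample texts
    "This sentence is about something else entirely.",
]
search("TensorFlow", docs, keys=["doc1.txt", "doc2.txt"])
# -> something like [('doc1.txt', 'TensorFlow', 1)]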
def sent_tokenize(text)
-
segment text into sentences
Expand source code
def sent_tokenize(text):
    """
    segment text into sentences
    """
    lang = detect_lang(text)
    sents = []
    if is_chinese(lang):
        for sent in re.findall("[^!?。\.\!\?]+[!?。\.\!\?]?", text, flags=re.U):
            sents.append(sent)
    else:
        for paragraph in segmenter.process(text):
            for sentence in paragraph:
                sents.append(" ".join([t.value for t in sentence]))
    return sents
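For instance (a sketch; for non-Chinese text the sentences are rebuilt by joining tokens with spaces, so punctuation may come back space-separated):

from ktrain.text.shallownlp import sent_tokenize

sent_tokenize("This is the first sentence. Here is the second.")
# -> roughly ['This is the first sentence .', 'Here is the second .']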
Classes
class Classifier (model=None)
-
instantiate a classifier with an optional previously-saved model
Expand source code
class Classifier:
    def __init__(self, model=None):
        """
        instantiate a classifier with an optional previously-saved model
        """
        self.model = None

    def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
        """
        ```
        create a model
        Args:
          ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
          texts(list): list of texts
          hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                         hp_dict can also be used to supply arguments to CountVectorizer
          ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
          binary(bool): default value for binary argument to CountVectorizer.
                        overridden if 'binary' key in hp_dict
        ```
        """
        lang = U.detect_lang(texts)
        if U.is_chinese(lang):
            token_pattern = r"(?u)\b\w+\b"
        else:
            token_pattern = r"\w+|[%s]" % string.punctuation
        if ctype == "nbsvm":
            clf = NBSVM(
                C=hp_dict.get("C", 0.01),
                alpha=hp_dict.get("alpha", 0.75),
                beta=hp_dict.get("beta", 0.25),
                fit_intercept=hp_dict.get("fit_intercept", False),
            )
        elif ctype == "logreg":
            clf = LogisticRegression(
                C=hp_dict.get("C", 0.1),
                dual=hp_dict.get("dual", True),
                penalty=hp_dict.get("penalty", "l2"),
                tol=hp_dict.get("tol", 1e-4),
                intercept_scaling=hp_dict.get("intercept_scaling", 1),
                solver=hp_dict.get("solver", "liblinear"),
                max_iter=hp_dict.get("max_iter", 100),
                multi_class=hp_dict.get("multi_class", "auto"),
                warm_start=hp_dict.get("warm_start", False),
                n_jobs=hp_dict.get("n_jobs", None),
                l1_ratio=hp_dict.get("l1_ratio", None),
                random_state=hp_dict.get("random_state", 42),
                class_weight=hp_dict.get("class_weight", None),
            )
        elif ctype == "sgdclassifier":
            clf = SGDClassifier(
                loss=hp_dict.get("loss", "hinge"),
                penalty=hp_dict.get("penalty", "l2"),
                alpha=hp_dict.get("alpha", 1e-3),
                random_state=hp_dict.get("random_state", 42),
                max_iter=hp_dict.get("max_iter", 5),  # scikit-learn default is 1000
                tol=hp_dict.get("tol", None),
                l1_ratio=hp_dict.get("l1_ratio", 0.15),
                fit_intercept=hp_dict.get("fit_intercept", True),
                epsilon=hp_dict.get("epsilon", 0.1),
                n_jobs=hp_dict.get("n_jobs", None),
                learning_rate=hp_dict.get("learning_rate", "optimal"),
                eta0=hp_dict.get("eta0", 0.0),
                power_t=hp_dict.get("power_t", 0.5),
                early_stopping=hp_dict.get("early_stopping", False),
                validation_fraction=hp_dict.get("validation_fraction", 0.1),
                n_iter_no_change=hp_dict.get("n_iter_no_change", 5),
                warm_start=hp_dict.get("warm_start", False),
                average=hp_dict.get("average", False),
                class_weight=hp_dict.get("class_weight", None),
            )
        else:
            raise ValueError("Unknown ctype: %s" % (ctype))

        self.model = Pipeline(
            [
                (
                    "vect",
                    CountVectorizer(
                        ngram_range=hp_dict.get("ngram_range", ngram_range),
                        binary=hp_dict.get("binary", binary),
                        token_pattern=token_pattern,
                        max_features=hp_dict.get("max_features", None),
                        max_df=hp_dict.get("max_df", 1.0),
                        min_df=hp_dict.get("min_df", 1),
                        stop_words=hp_dict.get("stop_words", None),
                        lowercase=hp_dict.get("lowercase", True),
                        strip_accents=hp_dict.get("strip_accents", None),
                        encoding=hp_dict.get("encoding", "utf-8"),
                    ),
                ),
                ("clf", clf),
            ]
        )
        return

    @classmethod
    def load_texts_from_folder(
        cls, folder_path, subfolders=None, shuffle=True, encoding=None
    ):
        """
        ```
        load text files from folder
        Args:
          folder_path(str): path to folder containing documents
                            The supplied folder should contain a subfolder for each category,
                            which will be used as the class label
          subfolders(list): list of subfolders under folder_path to consider
                            Example: If folder_path contains subfolders pos, neg, and unlabeled,
                            then the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
          shuffle(bool): If True, list of texts will be shuffled
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
        texts = bunch.data
        labels = bunch.target
        label_names = bunch.target_names
        # print('target names:')
        # for idx, label_name in enumerate(bunch.target_names):
        #     print('\t%s:%s' % (idx, label_name))

        # decode based on supplied encoding
        if encoding is None:
            encoding = U.detect_encoding(texts)
            if encoding != "utf-8":
                print("detected encoding: %s" % (encoding))
        try:
            texts = [text.decode(encoding) for text in texts]
        except:
            print(
                "Decoding with %s failed 1st attempt - using %s with skips"
                % (encoding, encoding)
            )
            texts = U.decode_by_line(texts, encoding=encoding)
        return (texts, labels, label_names)

    @classmethod
    def load_texts_from_csv(
        cls,
        csv_filepath,
        text_column="text",
        label_column="label",
        sep=",",
        encoding=None,
    ):
        """
        ```
        load text files from csv file
        CSV should have at least two columns.
        Example:
        Text               | Label
        I love this movie. | positive
        I hated this movie.| negative
        Args:
          csv_filepath(str): path to CSV file
          text_column(str): name of column containing the texts. default:'text'
          label_column(str): name of column containing the labels in string format.
                             default:'label'
          sep(str): character that separates columns in CSV. default:','
          encoding(str): encoding to use. default:None (auto-detected)
        Returns:
          tuple: (texts, labels, label_names)
        ```
        """
        if encoding is None:
            with open(csv_filepath, "rb") as f:
                encoding = U.detect_encoding([f.read()])
                if encoding != "utf-8":
                    print("detected encoding: %s (if wrong, set manually)" % (encoding))
        import pandas as pd

        df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
        texts = df[text_column].fillna("fillna").values
        labels = df[label_column].values
        le = LabelEncoder()
        le.fit(labels)
        labels = le.transform(labels)
        return (texts, labels, le.classes_)

    def fit(self, x_train, y_train, ctype="logreg"):
        """
        ```
        train a classifier
        Args:
          x_train(list or np.ndarray): training texts
          y_train(np.ndarray): training labels
          ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
        ```
        """
        lang = U.detect_lang(x_train)
        if U.is_chinese(lang):
            x_train = U.split_chinese(x_train)
        if self.model is None:
            self.create_model(ctype, x_train)
        self.model.fit(x_train, y_train)
        return self

    def predict(self, x_test, return_proba=False):
        """
        ```
        make predictions on text data
        Args:
          x_test(list or np.ndarray or str): array of texts on which to make predictions,
                                             or a string representing text
        ```
        """
        if return_proba and not hasattr(self.model["clf"], "predict_proba"):
            raise ValueError(
                "%s does not support predict_proba"
                % (type(self.model["clf"]).__name__)
            )
        if isinstance(x_test, str):
            x_test = [x_test]
        lang = U.detect_lang(x_test)
        if U.is_chinese(lang):
            x_test = U.split_chinese(x_test)
        if self.model is None:
            raise ValueError("model is None - call fit or load to set the model")
        if return_proba:
            predicted = self.model.predict_proba(x_test)
        else:
            predicted = self.model.predict(x_test)
        if len(predicted) == 1:
            predicted = predicted[0]
        return predicted

    def predict_proba(self, x_test):
        """
        predict_proba
        """
        return self.predict(x_test, return_proba=True)

    def evaluate(self, x_test, y_test):
        """
        ```
        evaluate
        Args:
          x_test(list or np.ndarray): test texts
          y_test(np.ndarray): test labels
        ```
        """
        predicted = self.predict(x_test)
        return np.mean(predicted == y_test)

    def save(self, filename):
        """
        save model
        """
        dump(self.model, filename)

    def load(self, filename):
        """
        load model
        """
        self.model = load(filename)

    def grid_search(self, params, x_train, y_train, n_jobs=-1):
        """
        ```
        Performs grid search to find optimal set of hyperparameters
        Args:
          params(dict): A dictionary defining the space of the search.
                        Example for finding optimal value of alpha in NBSVM:
                          parameters = {
                              #'clf__C': (1e0, 1e-1, 1e-2),
                              'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                              #'clf__fit_intercept': (True, False),
                              #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                          }
          n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
        ```
        """
        gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
        gs_clf = gs_clf.fit(x_train, y_train)
        # gs_clf.best_score_
        for param_name in sorted(params.keys()):
            print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
        return
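As a rough end-to-end sketch of the workflow documented below (the folder path is hypothetical and should contain one subfolder per class, e.g. pos and neg):

from ktrain.text.shallownlp import Classifier

# hypothetical folder with one subfolder per class label (e.g., 'pos' and 'neg')
texts, labels, label_names = Classifier.load_texts_from_folder("/tmp/reviews")

clf = Classifier()
clf.fit(texts, labels, ctype="nbsvm")        # builds the vectorizer+model pipeline and trains it
print(clf.evaluate(texts, labels))           # accuracy (here, on the training texts themselves)
print(clf.predict("I loved this movie."))    # integer label; map back via label_names
clf.save("/tmp/my_classifier")               # persist; restore later with Classifier().load(...)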
Static methods
def load_texts_from_csv(csv_filepath, text_column='text', label_column='label', sep=',', encoding=None)
-
load text files from csv file. The CSV should have at least two columns. Example:

Text               | Label
I love this movie. | positive
I hated this movie.| negative

Args:
  csv_filepath(str): path to CSV file
  text_column(str): name of column containing the texts. default:'text'
  label_column(str): name of column containing the labels in string format. default:'label'
  sep(str): character that separates columns in CSV. default:','
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_csv(
    cls,
    csv_filepath,
    text_column="text",
    label_column="label",
    sep=",",
    encoding=None,
):
    """
    ```
    load text files from csv file
    CSV should have at least two columns.
    Example:
    Text               | Label
    I love this movie. | positive
    I hated this movie.| negative
    Args:
      csv_filepath(str): path to CSV file
      text_column(str): name of column containing the texts. default:'text'
      label_column(str): name of column containing the labels in string format.
                         default:'label'
      sep(str): character that separates columns in CSV. default:','
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    if encoding is None:
        with open(csv_filepath, "rb") as f:
            encoding = U.detect_encoding([f.read()])
            if encoding != "utf-8":
                print("detected encoding: %s (if wrong, set manually)" % (encoding))
    import pandas as pd

    df = pd.read_csv(csv_filepath, encoding=encoding, sep=sep)
    texts = df[text_column].fillna("fillna").values
    labels = df[label_column].values
    le = LabelEncoder()
    le.fit(labels)
    labels = le.transform(labels)
    return (texts, labels, le.classes_)
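For example (hypothetical CSV path; the column names shown are the documented defaults):

from ktrain.text.shallownlp import Classifier

texts, labels, label_names = Classifier.load_texts_from_csv(
    "/tmp/reviews.csv",          # hypothetical file with 'text' and 'label' columns
    text_column="text",
    label_column="label",
)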
def load_texts_from_folder(folder_path, subfolders=None, shuffle=True, encoding=None)
-
load text files from folder

Args:
  folder_path(str): path to folder containing documents.
                    The supplied folder should contain a subfolder for each category,
                    which will be used as the class label
  subfolders(list): list of subfolders under folder_path to consider.
                    Example: If folder_path contains subfolders pos, neg, and unlabeled,
                    the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
  shuffle(bool): If True, list of texts will be shuffled
  encoding(str): encoding to use. default:None (auto-detected)
Returns:
  tuple: (texts, labels, label_names)
Expand source code
@classmethod
def load_texts_from_folder(
    cls, folder_path, subfolders=None, shuffle=True, encoding=None
):
    """
    ```
    load text files from folder
    Args:
      folder_path(str): path to folder containing documents
                        The supplied folder should contain a subfolder for each category,
                        which will be used as the class label
      subfolders(list): list of subfolders under folder_path to consider
                        Example: If folder_path contains subfolders pos, neg, and unlabeled,
                        then the unlabeled folder can be ignored by setting subfolders=['pos', 'neg']
      shuffle(bool): If True, list of texts will be shuffled
      encoding(str): encoding to use. default:None (auto-detected)
    Returns:
      tuple: (texts, labels, label_names)
    ```
    """
    bunch = load_files(folder_path, categories=subfolders, shuffle=shuffle)
    texts = bunch.data
    labels = bunch.target
    label_names = bunch.target_names
    # print('target names:')
    # for idx, label_name in enumerate(bunch.target_names):
    #     print('\t%s:%s' % (idx, label_name))

    # decode based on supplied encoding
    if encoding is None:
        encoding = U.detect_encoding(texts)
        if encoding != "utf-8":
            print("detected encoding: %s" % (encoding))
    try:
        texts = [text.decode(encoding) for text in texts]
    except:
        print(
            "Decoding with %s failed 1st attempt - using %s with skips"
            % (encoding, encoding)
        )
        texts = U.decode_by_line(texts, encoding=encoding)
    return (texts, labels, label_names)
Methods
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True)
-
create a model

Args:
  ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
  texts(list): list of texts
  hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                 hp_dict can also be used to supply arguments to CountVectorizer
  ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
  binary(bool): default value for binary argument to CountVectorizer.
                overridden if 'binary' key in hp_dict
Expand source code
def create_model(self, ctype, texts, hp_dict={}, ngram_range=(1, 3), binary=True):
    """
    ```
    create a model
    Args:
      ctype(str): one of {'nbsvm', 'logreg', 'sgdclassifier'}
      texts(list): list of texts
      hp_dict(dict): dictionary of hyperparameters to use for the ctype selected.
                     hp_dict can also be used to supply arguments to CountVectorizer
      ngram_range(tuple): default ngram_range. overridden if 'ngram_range' in hp_dict
      binary(bool): default value for binary argument to CountVectorizer.
                    overridden if 'binary' key in hp_dict
    ```
    """
    lang = U.detect_lang(texts)
    if U.is_chinese(lang):
        token_pattern = r"(?u)\b\w+\b"
    else:
        token_pattern = r"\w+|[%s]" % string.punctuation
    if ctype == "nbsvm":
        clf = NBSVM(
            C=hp_dict.get("C", 0.01),
            alpha=hp_dict.get("alpha", 0.75),
            beta=hp_dict.get("beta", 0.25),
            fit_intercept=hp_dict.get("fit_intercept", False),
        )
    elif ctype == "logreg":
        clf = LogisticRegression(
            C=hp_dict.get("C", 0.1),
            dual=hp_dict.get("dual", True),
            penalty=hp_dict.get("penalty", "l2"),
            tol=hp_dict.get("tol", 1e-4),
            intercept_scaling=hp_dict.get("intercept_scaling", 1),
            solver=hp_dict.get("solver", "liblinear"),
            max_iter=hp_dict.get("max_iter", 100),
            multi_class=hp_dict.get("multi_class", "auto"),
            warm_start=hp_dict.get("warm_start", False),
            n_jobs=hp_dict.get("n_jobs", None),
            l1_ratio=hp_dict.get("l1_ratio", None),
            random_state=hp_dict.get("random_state", 42),
            class_weight=hp_dict.get("class_weight", None),
        )
    elif ctype == "sgdclassifier":
        clf = SGDClassifier(
            loss=hp_dict.get("loss", "hinge"),
            penalty=hp_dict.get("penalty", "l2"),
            alpha=hp_dict.get("alpha", 1e-3),
            random_state=hp_dict.get("random_state", 42),
            max_iter=hp_dict.get("max_iter", 5),  # scikit-learn default is 1000
            tol=hp_dict.get("tol", None),
            l1_ratio=hp_dict.get("l1_ratio", 0.15),
            fit_intercept=hp_dict.get("fit_intercept", True),
            epsilon=hp_dict.get("epsilon", 0.1),
            n_jobs=hp_dict.get("n_jobs", None),
            learning_rate=hp_dict.get("learning_rate", "optimal"),
            eta0=hp_dict.get("eta0", 0.0),
            power_t=hp_dict.get("power_t", 0.5),
            early_stopping=hp_dict.get("early_stopping", False),
            validation_fraction=hp_dict.get("validation_fraction", 0.1),
            n_iter_no_change=hp_dict.get("n_iter_no_change", 5),
            warm_start=hp_dict.get("warm_start", False),
            average=hp_dict.get("average", False),
            class_weight=hp_dict.get("class_weight", None),
        )
    else:
        raise ValueError("Unknown ctype: %s" % (ctype))

    self.model = Pipeline(
        [
            (
                "vect",
                CountVectorizer(
                    ngram_range=hp_dict.get("ngram_range", ngram_range),
                    binary=hp_dict.get("binary", binary),
                    token_pattern=token_pattern,
                    max_features=hp_dict.get("max_features", None),
                    max_df=hp_dict.get("max_df", 1.0),
                    min_df=hp_dict.get("min_df", 1),
                    stop_words=hp_dict.get("stop_words", None),
                    lowercase=hp_dict.get("lowercase", True),
                    strip_accents=hp_dict.get("strip_accents", None),
                    encoding=hp_dict.get("encoding", "utf-8"),
                ),
            ),
            ("clf", clf),
        ]
    )
    return
def evaluate(self, x_test, y_test)
-
evaluate

Args:
  x_test(list or np.ndarray): test texts
  y_test(np.ndarray): test labels
Expand source code
def evaluate(self, x_test, y_test):
    """
    ```
    evaluate
    Args:
      x_test(list or np.ndarray): test texts
      y_test(np.ndarray): test labels
    ```
    """
    predicted = self.predict(x_test)
    return np.mean(predicted == y_test)
def fit(self, x_train, y_train, ctype='logreg')
-
train a classifier

Args:
  x_train(list or np.ndarray): training texts
  y_train(np.ndarray): training labels
  ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
Expand source code
def fit(self, x_train, y_train, ctype="logreg"):
    """
    ```
    train a classifier
    Args:
      x_train(list or np.ndarray): training texts
      y_train(np.ndarray): training labels
      ctype(str): One of {'logreg', 'nbsvm', 'sgdclassifier'}. default:'logreg'
    ```
    """
    lang = U.detect_lang(x_train)
    if U.is_chinese(lang):
        x_train = U.split_chinese(x_train)
    if self.model is None:
        self.create_model(ctype, x_train)
    self.model.fit(x_train, y_train)
    return self
def grid_search(self, params, x_train, y_train, n_jobs=-1)
-
Performs grid search to find optimal set of hyperparameters

Args:
  params(dict): A dictionary defining the space of the search.
                Example for finding optimal value of alpha in NBSVM:
                  parameters = {
                      #'clf__C': (1e0, 1e-1, 1e-2),
                      'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                      #'clf__fit_intercept': (True, False),
                      #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                  }
  n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
Expand source code
def grid_search(self, params, x_train, y_train, n_jobs=-1):
    """
    ```
    Performs grid search to find optimal set of hyperparameters
    Args:
      params(dict): A dictionary defining the space of the search.
                    Example for finding optimal value of alpha in NBSVM:
                      parameters = {
                          #'clf__C': (1e0, 1e-1, 1e-2),
                          'clf__alpha': (0.1, 0.2, 0.4, 0.5, 0.75, 0.9, 1.0),
                          #'clf__fit_intercept': (True, False),
                          #'clf__beta' : (0.1, 0.25, 0.5, 0.9)
                      }
      n_jobs(int): number of jobs to run in parallel. default:-1 (use all processors)
    ```
    """
    gs_clf = GridSearchCV(self.model, params, n_jobs=n_jobs)
    gs_clf = gs_clf.fit(x_train, y_train)
    # gs_clf.best_score_
    for param_name in sorted(params.keys()):
        print("%s: %r" % (param_name, gs_clf.best_params_[param_name]))
    return
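A brief sketch, assuming texts and labels obtained from one of the loaders above; note that grid_search operates on self.model, so create_model or fit must be called first:

clf = Classifier()
clf.create_model("nbsvm", texts)          # texts/labels assumed to come from a loader above
params = {"clf__alpha": (0.1, 0.25, 0.5, 0.75, 1.0)}
clf.grid_search(params, texts, labels)    # prints the best value found for each searched parameter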
def load(self, filename)
-
load model
Expand source code
def load(self, filename):
    """
    load model
    """
    self.model = load(filename)
def predict(self, x_test, return_proba=False)
-
make predictions on text data

Args:
  x_test(list or np.ndarray or str): array of texts on which to make predictions,
                                     or a string representing text
Expand source code
def predict(self, x_test, return_proba=False):
    """
    ```
    make predictions on text data
    Args:
      x_test(list or np.ndarray or str): array of texts on which to make predictions,
                                         or a string representing text
    ```
    """
    if return_proba and not hasattr(self.model["clf"], "predict_proba"):
        raise ValueError(
            "%s does not support predict_proba"
            % (type(self.model["clf"]).__name__)
        )
    if isinstance(x_test, str):
        x_test = [x_test]
    lang = U.detect_lang(x_test)
    if U.is_chinese(lang):
        x_test = U.split_chinese(x_test)
    if self.model is None:
        raise ValueError("model is None - call fit or load to set the model")
    if return_proba:
        predicted = self.model.predict_proba(x_test)
    else:
        predicted = self.model.predict(x_test)
    if len(predicted) == 1:
        predicted = predicted[0]
    return predicted
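For example, assuming clf is a fitted Classifier from the earlier sketch (probabilities are only available when the underlying scikit-learn estimator implements predict_proba):

clf.predict("I loved this movie.")                     # single string -> single label
clf.predict(["great film", "terrible film"])           # list of texts -> array of labels
clf.predict("I loved this movie.", return_proba=True)  # class probabilities, if supported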
def predict_proba(self, x_test)
-
predict_proba
Expand source code
def predict_proba(self, x_test):
    """
    predict_proba
    """
    return self.predict(x_test, return_proba=True)
def save(self, filename)
-
save model
Expand source code
def save(self, filename):
    """
    save model
    """
    dump(self.model, filename)
class NER (lang='en', predictor_path=None)
-
pretrained NER. Only English, Chinese, and Russian are currently supported.

Args:
  lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
Expand source code
class NER:
    def __init__(self, lang="en", predictor_path=None):
        """
        ```
        pretrained NER. Only English, Chinese, and Russian are currently supported.
        Args:
          lang(str): Currently, one of {'en', 'zh', 'ru'}: en=English, zh=Chinese, or ru=Russian
        ```
        """
        if lang is None:
            raise ValueError(
                'lang is required (e.g., "en" for English, "zh" for Chinese, "ru" for Russian, etc.'
            )
        if predictor_path is None and lang not in ["en", "zh", "ru"]:
            raise ValueError(
                "Unsupported language: if predictor_path is None, then lang must be "
                + "'en' for English, 'zh' for Chinese, or 'ru' for Russian"
            )
        self.lang = lang
        if os.environ.get("DISABLE_V2_BEHAVIOR", None) != "1":
            warnings.warn(
                "Please add os.environ['DISABLE_V2_BEHAVIOR'] = '1' at top of your script or notebook"
            )
            msg = (
                "\nNER in ktrain uses the CRF module from keras_contrib, which is not yet\n"
                + "fully compatible with TensorFlow 2. To use NER, you must add the following to the top of your\n"
                + "script or notebook BEFORE you import ktrain (after restarting runtime):\n\n"
                + "import os\n"
                + "os.environ['DISABLE_V2_BEHAVIOR'] = '1'\n"
            )
            print(msg)
            return
        else:
            import tensorflow.compat.v1 as tf

            tf.disable_v2_behavior()
        if predictor_path is None and self.lang == "zh":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_chinese")
        elif predictor_path is None and self.lang == "ru":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_russian")
        elif predictor_path is None and self.lang == "en":
            dirpath = os.path.dirname(os.path.abspath(__file__))
            fpath = os.path.join(dirpath, "ner_models/ner_english")
        elif predictor_path is None:
            raise ValueError(
                "Unsupported language: if predictor_path is None, then lang must be "
                + "'en' for English, 'zh' for Chinese, or 'ru' for Russian"
            )
        else:
            if not os.path.isfile(predictor_path) or not os.path.isfile(
                predictor_path + ".preproc"
            ):
                raise ValueError(
                    "could not find a valid predictor model "
                    + "%s or valid Preprocessor %s at specified path"
                    % (predictor_path, predictor_path + ".preproc")
                )
            fpath = predictor_path
        try:
            import io
            from contextlib import redirect_stdout

            f = io.StringIO()
            with redirect_stdout(f):
                import ktrain
        except:
            raise ValueError(
                "ktrain could not be imported. Install with: pip install ktrain"
            )
        self.predictor = ktrain.load_predictor(fpath)

    def predict(self, texts, merge_tokens=True, batch_size=32):
        """
        ```
        Extract named entities from supplied text
        Args:
          texts(list of str or str): list of texts to annotate
          merge_tokens(bool): If True, tokens will be merged together by the entity
                              to which they are associated:
                              ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
          batch_size(int): Batch size to use for predictions (default:32)
        ```
        """
        if isinstance(texts, str):
            texts = [texts]
        self.predictor.batch_size = batch_size
        texts = [t.strip() for t in texts]
        results = self.predictor.predict(texts, merge_tokens=merge_tokens)
        if len(results) == 1:
            results = results[0]
        return results
Methods
def predict(self, texts, merge_tokens=True, batch_size=32)
-
Extract named entities from supplied text

Args:
  texts(list of str or str): list of texts to annotate
  merge_tokens(bool): If True, tokens will be merged together by the entity to which
                      they are associated: ('Paul', 'B-PER'), ('Newman', 'I-PER')
                      becomes ('Paul Newman', 'PER')
  batch_size(int): Batch size to use for predictions (default:32)
Expand source code
def predict(self, texts, merge_tokens=True, batch_size=32):
    """
    ```
    Extract named entities from supplied text
    Args:
      texts(list of str or str): list of texts to annotate
      merge_tokens(bool): If True, tokens will be merged together by the entity
                          to which they are associated:
                          ('Paul', 'B-PER'), ('Newman', 'I-PER') becomes ('Paul Newman', 'PER')
      batch_size(int): Batch size to use for predictions (default:32)
    ```
    """
    if isinstance(texts, str):
        texts = [texts]
    self.predictor.batch_size = batch_size
    texts = [t.strip() for t in texts]
    results = self.predictor.predict(texts, merge_tokens=merge_tokens)
    if len(results) == 1:
        results = results[0]
    return results
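A usage sketch; as the constructor warns, the environment variable must be set before ktrain is imported, and the merged-entity output shown is illustrative:

import os
os.environ["DISABLE_V2_BEHAVIOR"] = "1"   # required before importing ktrain

from ktrain.text.shallownlp import NER

ner = NER(lang="en")
ner.predict("Paul Newman is a great actor.")
# with merge_tokens=True, entities come back merged, e.g. ('Paul Newman', 'PER')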
class Searcher (queries, lang=None)
-
Search for keywords in text documents
Args:
  queries(list of str): list of text queries
  lang(str): language of queries. default:None --> auto-detected
Expand source code
class Searcher:
    """
    Search for keywords in text documents
    """

    def __init__(self, queries, lang=None):
        """
        ```
        Args:
          queries(list of str): list of text queries
          lang(str): language of queries. default:None --> auto-detected
        ```
        """
        self.queries = queries
        if isinstance(self.queries, str):
            self.queries = [self.queries]
        self.lang = lang
        if self.lang is None:
            self.lang = U.detect_lang(queries)
        # print("lang:%s" %(self.lang))

    def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
        """
        ```
        executes self.queries on supplied list of documents
        Args:
          docs(list of str): list of texts
          case_sensitive(bool): If True, case sensitive search
          keys(list): list of keys for supplied docs (e.g., file paths).
                      default: key is index in range(len(docs))
          min_matches(int): results must have at least these many word matches
          progress(bool): whether or not to show progress bar
        Returns:
          list of tuples of results of the form: (key, query, no. of matches)
          For Chinese, no. of matches will be the number of unique Jieba-extracted
          character sequences that match
        ```
        """
        if isinstance(docs, str):
            docs = [docs]
        if keys and len(keys) != len(docs):
            raise ValueError("lengths of keys and docs must be the same")
        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            for q in self.queries:
                if U.is_chinese(self.lang):
                    r = self._search_chinese(
                        q, [text], min_matches=min_matches, parse=1, progress=False
                    )
                elif self.lang == "ar":
                    r = self._search(
                        q,
                        [text],
                        case_sensitive=case_sensitive,
                        min_matches=min_matches,
                        progress=False,
                        substrings_on=True,
                    )
                else:
                    r = self._search(
                        q,
                        [text],
                        case_sensitive=case_sensitive,
                        min_matches=min_matches,
                        progress=False,
                        substrings_on=False,
                    )
                if not r:
                    continue
                r = r[0]
                k = idx
                if keys:
                    k = keys[idx]
                num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
                results.append((k, q, num_matches))
            if progress:
                printProgressBar(
                    idx + 1, l, prefix="progress: ", suffix="complete", length=50
                )
        return results

    def _search(
        self,
        query,
        docs,
        case_sensitive=False,
        substrings_on=False,
        min_matches=1,
        progress=True,
    ):
        """
        ```
        search documents for query string.
        Args:
          query(str or list): the word or phrase to search (or list of them)
                              if list is provided, each element is combined using OR
          docs(list of str): list of text documents
          case_sensitive(bool): If True, case sensitive search
          substrings_on(bool): whether to use "\b" in regex. default:True
                               If True, will find substrings
        returns:
          list or tuple: Returns list of results if len(docs) > 1.
                         Otherwise, returns tuple of results
        ```
        """
        if not isinstance(query, (list, tuple, str)):
            raise ValueError("query must be str or list of str")
        if isinstance(query, str):
            query = [query]
        if not isinstance(docs, (list, np.ndarray)):
            raise ValueError("docs must be list of str")
        flag = 0
        if not case_sensitive:
            flag = re.I
        qlist = []
        for q in query:
            qlist.append("\s+".join(q.split()))
        original_query = query
        query = "|".join(qlist)
        bound = r"\b"
        if substrings_on:
            bound = ""
        pattern_str = r"%s(?:%s)%s" % (bound, query, bound)
        pattern = re.compile(pattern_str, flag)
        results = []
        l = len(docs)
        for idx, text in enumerate(docs):
            matches = pattern.findall(text)
            if matches and len(matches) >= min_matches:
                results.append((idx, text, matches))
            if progress:
                printProgressBar(
                    idx + 1, l, prefix="progress: ", suffix="complete", length=50
                )
        return results

    def _search_chinese(
        self, query, docs, substrings_on=True, parse=1, min_matches=1, progress=False
    ):
        """
        convenience method to search chinese text
        """
        original_query = query
        if not isinstance(query, str):
            raise ValueError("query must be str")
        if parse > 0:
            q = U.split_chinese(query)[0]
            num_words = len(q.split())
            query = build_ngrams(q, n=parse)
            query = ["".join(q) for q in query]
        return self._search(query, docs, substrings_on=substrings_on, progress=progress)
Methods
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True)
-
executes self.queries on supplied list of documents

Args:
  docs(list of str): list of texts
  case_sensitive(bool): If True, case sensitive search
  keys(list): list of keys for supplied docs (e.g., file paths).
              default: key is index in range(len(docs))
  min_matches(int): results must have at least these many word matches
  progress(bool): whether or not to show progress bar
Returns:
  list of tuples of results of the form: (key, query, no. of matches)
  For Chinese, no. of matches will be the number of unique Jieba-extracted character
  sequences that match
Expand source code
def search(self, docs, case_sensitive=False, keys=[], min_matches=1, progress=True):
    """
    ```
    executes self.queries on supplied list of documents
    Args:
      docs(list of str): list of texts
      case_sensitive(bool): If True, case sensitive search
      keys(list): list of keys for supplied docs (e.g., file paths).
                  default: key is index in range(len(docs))
      min_matches(int): results must have at least these many word matches
      progress(bool): whether or not to show progress bar
    Returns:
      list of tuples of results of the form: (key, query, no. of matches)
      For Chinese, no. of matches will be the number of unique Jieba-extracted
      character sequences that match
    ```
    """
    if isinstance(docs, str):
        docs = [docs]
    if keys and len(keys) != len(docs):
        raise ValueError("lengths of keys and docs must be the same")
    results = []
    l = len(docs)
    for idx, text in enumerate(docs):
        for q in self.queries:
            if U.is_chinese(self.lang):
                r = self._search_chinese(
                    q, [text], min_matches=min_matches, parse=1, progress=False
                )
            elif self.lang == "ar":
                r = self._search(
                    q,
                    [text],
                    case_sensitive=case_sensitive,
                    min_matches=min_matches,
                    progress=False,
                    substrings_on=True,
                )
            else:
                r = self._search(
                    q,
                    [text],
                    case_sensitive=case_sensitive,
                    min_matches=min_matches,
                    progress=False,
                    substrings_on=False,
                )
            if not r:
                continue
            r = r[0]
            k = idx
            if keys:
                k = keys[idx]
            num_matches = len(set(r[2])) if U.is_chinese(self.lang) else len(r[2])
            results.append((k, q, num_matches))
        if progress:
            printProgressBar(
                idx + 1, l, prefix="progress: ", suffix="complete", length=50
            )
    return results
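Finally, a Searcher sketch with made-up documents and keys; results are (key, query, number-of-matches) tuples, and the output shown is illustrative:

from ktrain.text.shallownlp import Searcher

docs = [
    "TensorFlow is mentioned here twice: TensorFlow.",   # hypothetical documents
    "This document mentions neither library.",
]
searcher = Searcher(["TensorFlow", "PyTorch"])           # each query is run against each document
searcher.search(docs, keys=["a.txt", "b.txt"], progress=False)
# -> something like [('a.txt', 'TensorFlow', 2)]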