Source code for nlp_architect.models.transformers.base_model
# ******************************************************************************
# Copyright 2017-2019 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ******************************************************************************
import io
import logging
import os
from typing import List, Union
import torch
from torch.utils.data import DataLoader
from tqdm import tqdm, trange
from transformers import (AdamW, BertConfig, BertTokenizer, RobertaConfig,
RobertaTokenizer, XLMConfig, XLMTokenizer,
XLNetConfig, XLNetTokenizer,
get_linear_schedule_with_warmup)
from nlp_architect.models import TrainableModel
from nlp_architect.models.transformers.quantized_bert import \
QuantizedBertConfig
logger = logging.getLogger(__name__)
ALL_MODELS = sum((tuple(conf.pretrained_config_archive_map.keys())
for conf in (BertConfig, XLNetConfig, XLMConfig)), ())
def get_models(models: List[str]):
if models is not None:
return [m for m in ALL_MODELS if m.split('-')[0] in models]
return ALL_MODELS
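# Illustrative note (not part of the original module): get_models() keeps only the
# pretrained identifiers whose prefix before the first '-' matches one of the requested
# architecture names. For example, assuming 'bert-base-uncased' appears in ALL_MODELS:
#
#     get_models(['bert'])  # -> ['bert-base-uncased', 'bert-large-uncased', ...]
#     get_models(None)      # -> ALL_MODELS (no filtering)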
class TransformerBase(TrainableModel):
"""
    Transformers base model (for working with HuggingFace transformers models)
"""
MODEL_CONFIGURATIONS = {
'bert': (BertConfig, BertTokenizer),
'quant_bert': (QuantizedBertConfig, BertTokenizer),
'xlnet': (XLNetConfig, XLNetTokenizer),
'xlm': (XLMConfig, XLMTokenizer),
'roberta': (RobertaConfig, RobertaTokenizer)
}
def __init__(self, model_type: str, model_name_or_path: str, labels: List[str] = None,
num_labels: int = None, config_name=None,
tokenizer_name=None, do_lower_case=False, output_path=None,
device='cpu', n_gpus=0):
"""
Transformers base model (for working with pytorch-transformers models)
Args:
model_type (str): transformer model type
model_name_or_path (str): model name or path to model
labels (List[str], optional): list of labels. Defaults to None.
num_labels (int, optional): number of labels. Defaults to None.
config_name ([type], optional): configuration name. Defaults to None.
tokenizer_name ([type], optional): tokenizer name. Defaults to None.
do_lower_case (bool, optional): lower case input words. Defaults to False.
output_path ([type], optional): model output path. Defaults to None.
device (str, optional): backend device. Defaults to 'cpu'.
n_gpus (int, optional): num of gpus. Defaults to 0.
Raises:
FileNotFoundError: [description]
"""
assert model_type in self.MODEL_CONFIGURATIONS.keys(), "unsupported model_type"
self.model_type = model_type
self.model_name_or_path = model_name_or_path
self.labels = labels
self.num_labels = num_labels
self.do_lower_case = do_lower_case
if output_path is not None and not os.path.exists(output_path):
            raise FileNotFoundError('output_path does not exist')
self.output_path = output_path
self.model_class = None
config_class, tokenizer_class = self.MODEL_CONFIGURATIONS[model_type]
self.config_class = config_class
self.tokenizer_class = tokenizer_class
self.tokenizer_name = tokenizer_name
self.tokenizer = self._load_tokenizer(self.tokenizer_name)
self.config_name = config_name
self.config = self._load_config(config_name)
self.model = None
self.device = device
self.n_gpus = n_gpus
self._optimizer = None
self._scheduler = None
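    # Illustrative sketch (not part of the original module): TransformerBase is an abstract
    # base class; task models subclass it and are responsible for creating self.model.
    # Assuming a hypothetical subclass MyTaskModel, construction might look like:
    #
    #     model = MyTaskModel(model_type='bert',
    #                         model_name_or_path='bert-base-uncased',
    #                         labels=['O', 'B-ENT', 'I-ENT'], num_labels=3,
    #                         device='cuda', n_gpus=1)
    #
    # __init__ itself only resolves the config/tokenizer classes for the chosen model_type
    # and loads them with from_pretrained().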
    def to(self, device='cpu', n_gpus=0):
if self.model is not None:
self.model.to(device)
if n_gpus > 1:
self.model = torch.nn.DataParallel(self.model)
self.device = device
self.n_gpus = n_gpus
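    # Note (added for clarity, not in the original source): when n_gpus > 1 the model is
    # wrapped in torch.nn.DataParallel, which is why save_model() below unwraps
    # self.model.module before calling save_pretrained().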
@property
def optimizer(self):
return self._optimizer
@optimizer.setter
def optimizer(self, opt):
self._optimizer = opt
@property
def scheduler(self):
return self._scheduler
@scheduler.setter
def scheduler(self, sch):
self._scheduler = sch
    def setup_default_optimizer(self,
weight_decay: float = 0.0,
learning_rate: float = 5e-5,
adam_epsilon: float = 1e-8,
warmup_steps: int = 0,
total_steps: int = 0):
# Prepare optimizer and schedule (linear warmup and decay)
no_decay = ['bias', 'LayerNorm.weight']
optimizer_grouped_parameters = [
{'params': [p for n, p in self.model.named_parameters() if not any(
nd in n for nd in no_decay)], 'weight_decay': weight_decay},
{'params': [p for n, p in self.model.named_parameters() if any(
nd in n for nd in no_decay)], 'weight_decay': 0.0}
]
self.optimizer = AdamW(optimizer_grouped_parameters, lr=learning_rate, eps=adam_epsilon)
self.scheduler = get_linear_schedule_with_warmup(self.optimizer,
num_warmup_steps=warmup_steps,
num_training_steps=total_steps)
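    # Illustrative sketch (not part of the original module): the parameters are split into
    # two groups so that biases and LayerNorm weights are excluded from weight decay.
    # Assuming 1500 total optimization steps, a typical call could be:
    #
    #     model.setup_default_optimizer(weight_decay=0.01, learning_rate=5e-5,
    #                                   warmup_steps=100, total_steps=1500)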
def _load_config(self, config_name=None):
config = self.config_class.from_pretrained(config_name if config_name
else self.model_name_or_path,
num_labels=self.num_labels)
return config
def _load_tokenizer(self, tokenizer_name=None):
tokenizer = self.tokenizer_class.from_pretrained(
tokenizer_name if tokenizer_name
else self.model_name_or_path, do_lower_case=self.do_lower_case)
return tokenizer
    def save_model(self, output_dir: str, save_checkpoint: bool = False, args=None):
"""
Save model/tokenizer/arguments to given output directory
Args:
output_dir (str): path to output directory
save_checkpoint (bool, optional): save as checkpoint. Defaults to False.
args ([type], optional): arguments object to save. Defaults to None.
"""
# Create output directory if needed
if not os.path.exists(output_dir):
os.makedirs(output_dir)
logger.info("Saving model checkpoint to %s", output_dir)
model_to_save = self.model.module if hasattr(self.model, 'module') else self.model
model_to_save.save_pretrained(output_dir)
if not save_checkpoint:
if self.tokenizer is not None:
self.tokenizer.save_pretrained(output_dir)
with io.open(output_dir + os.sep + 'labels.txt', 'w', encoding='utf-8') as fw:
for l in self.labels:
fw.write('{}\n'.format(l))
if args is not None:
torch.save(args, os.path.join(output_dir, 'training_args.bin'))
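    # Note (added for clarity, not in the original source): after save_model(output_dir)
    # the directory typically holds the weights and config written by save_pretrained()
    # (pytorch_model.bin and config.json in this transformers version), the tokenizer
    # files, labels.txt with one label per line, and optionally training_args.bin.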
    @classmethod
def load_model(cls, model_path: str, model_type: str, *args, **kwargs):
"""
        Create a TransformerBase model from a given path
Args:
model_path (str): path to model
model_type (str): model type
Returns:
TransformerBase: model
"""
# Load a trained model and vocabulary from given path
if not os.path.exists(model_path):
raise FileNotFoundError
with io.open(model_path + os.sep + 'labels.txt') as fp:
labels = [l.strip() for l in fp.readlines()]
return cls(
model_type=model_type, model_name_or_path=model_path, labels=labels, *args, **kwargs)
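    # Illustrative sketch (not part of the original module): load_model() expects the
    # directory layout produced by save_model(), including labels.txt. Assuming a
    # hypothetical subclass MyTaskModel that was saved to 'trained_model/':
    #
    #     model = MyTaskModel.load_model(model_path='trained_model/', model_type='bert')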
    @staticmethod
def get_train_steps_epochs(max_steps: int,
num_train_epochs: int,
gradient_accumulation_steps: int,
num_samples: int):
"""
get train steps and epochs
Args:
max_steps (int): max steps
num_train_epochs (int): num epochs
gradient_accumulation_steps (int): gradient accumulation steps
num_samples (int): number of samples
Returns:
Tuple: total steps, number of epochs
"""
if max_steps > 0:
t_total = max_steps
num_train_epochs = max_steps // (num_samples // gradient_accumulation_steps) + 1
else:
t_total = num_samples // gradient_accumulation_steps * num_train_epochs
return t_total, num_train_epochs
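    # Worked example (added for clarity, not in the original source), assuming
    # num_samples=1000 batches, gradient_accumulation_steps=2, num_train_epochs=3:
    #
    #     max_steps=-1  -> t_total = 1000 // 2 * 3 = 1500, epochs stay at 3
    #     max_steps=200 -> t_total = 200, epochs = 200 // (1000 // 2) + 1 = 1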
    def get_logits(self, batch):
self.model.eval()
inputs = self._batch_mapper(batch)
outputs = self.model(**inputs)
return outputs[-1]
def _train(self,
data_set: DataLoader,
dev_data_set: Union[DataLoader, List[DataLoader]] = None,
test_data_set: Union[DataLoader, List[DataLoader]] = None,
gradient_accumulation_steps: int = 1,
per_gpu_train_batch_size: int = 8,
max_steps: int = -1,
num_train_epochs: int = 3,
max_grad_norm: float = 1.0,
logging_steps: int = 50,
save_steps: int = 100):
"""Run model training
batch_mapper: a function that maps a batch into parameters that the model
expects in the forward method (for use with custom heads and models).
If None it will default to the basic models input structure.
logging_callback_fn: a function that is called in each evaluation step
with the model as a parameter.
"""
t_total, num_train_epochs = self.get_train_steps_epochs(max_steps,
num_train_epochs,
gradient_accumulation_steps,
len(data_set))
if self.optimizer is None and self.scheduler is None:
logger.info("Loading default optimizer and scheduler")
self.setup_default_optimizer(total_steps=t_total)
train_batch_size = per_gpu_train_batch_size * max(1, self.n_gpus)
logger.info("***** Running training *****")
logger.info(" Num examples = %d", len(data_set.dataset))
logger.info(" Num Epochs = %d", num_train_epochs)
logger.info(" Instantaneous batch size per GPU/CPU = %d", per_gpu_train_batch_size)
logger.info(" Total train batch size (w. parallel, distributed & accumulation) = %d",
train_batch_size * gradient_accumulation_steps)
logger.info(" Gradient Accumulation steps = %d", gradient_accumulation_steps)
logger.info(" Total optimization steps = %d", t_total)
global_step = 0
tr_loss, logging_loss = 0.0, 0.0
self.model.zero_grad()
train_iterator = trange(num_train_epochs, desc="Epoch")
for _ in train_iterator:
epoch_iterator = tqdm(data_set, desc="Train iteration")
for step, batch in enumerate(epoch_iterator):
self.model.train()
batch = tuple(t.to(self.device) for t in batch)
inputs = self._batch_mapper(batch)
outputs = self.model(**inputs)
loss = outputs[0] # get loss
if self.n_gpus > 1:
loss = loss.mean() # mean() to average on multi-gpu parallel training
if gradient_accumulation_steps > 1:
loss = loss / gradient_accumulation_steps
loss.backward()
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_grad_norm)
tr_loss += loss.item()
if (step + 1) % gradient_accumulation_steps == 0:
self.optimizer.step()
self.scheduler.step()
self.model.zero_grad()
global_step += 1
if logging_steps > 0 and global_step % logging_steps == 0:
# Log metrics and run evaluation on dev/test
for ds in [dev_data_set, test_data_set]:
if ds is None: # got no data loader
continue
if isinstance(ds, DataLoader):
ds = [ds]
for d in ds:
logits, label_ids = self._evaluate(d)
self.evaluate_predictions(logits, label_ids)
logger.info('lr = {}'.format(self.scheduler.get_lr()[0]))
logger.info('loss = {}'.format((tr_loss - logging_loss) / logging_steps))
logging_loss = tr_loss
if save_steps > 0 and global_step % save_steps == 0:
# Save model checkpoint
self.save_model_checkpoint(output_path=self.output_path,
name='checkpoint-{}'.format(global_step))
if 0 < max_steps < global_step:
epoch_iterator.close()
break
if 0 < max_steps < global_step:
train_iterator.close()
break
logger.info(" global_step = %s, average loss = %s", global_step, tr_loss)
def _evaluate(self, data_set: DataLoader):
logger.info("***** Running inference *****")
logger.info(" Batch size: {}".format(data_set.batch_size))
eval_loss = 0.0
nb_eval_steps = 0
preds = None
out_label_ids = None
for batch in tqdm(data_set, desc="Inference iteration"):
self.model.eval()
batch = tuple(t.to(self.device) for t in batch)
with torch.no_grad():
inputs = self._batch_mapper(batch)
outputs = self.model(**inputs)
if 'labels' in inputs:
tmp_eval_loss, logits = outputs[:2]
eval_loss += tmp_eval_loss.mean().item()
else:
logits = outputs[0]
nb_eval_steps += 1
model_output = logits.detach().cpu()
model_out_label_ids = inputs['labels'].detach().cpu(
) if 'labels' in inputs else None
if preds is None:
preds = model_output
out_label_ids = model_out_label_ids
else:
preds = torch.cat((preds, model_output), dim=0)
out_label_ids = torch.cat((out_label_ids, model_out_label_ids),
dim=0) if out_label_ids is not None else None
if out_label_ids is None:
return preds
return preds, out_label_ids
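    # Note (added for clarity, not in the original source): _evaluate() concatenates the
    # per-batch logits (and label ids, when the batches contain labels) on the CPU and
    # returns either preds alone or a (preds, out_label_ids) tuple, which is what
    # evaluate_predictions() consumes during training.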
def _batch_mapper(self, batch):
mapping = {'input_ids': batch[0],
'attention_mask': batch[1],
                   # XLM doesn't use segment_ids
'token_type_ids': batch[2] if self.model_type in ['bert', 'quant_bert', 'xlnet']
else None}
if len(batch) == 4:
mapping.update({'labels': batch[3]})
return mapping
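    # Illustrative example (not part of the original module): for a BERT-style batch of
    # (input_ids, attention_mask, segment_ids, labels) tensors, _batch_mapper() builds the
    # keyword arguments for the model's forward() call, roughly:
    #
    #     {'input_ids': batch[0], 'attention_mask': batch[1],
    #      'token_type_ids': batch[2], 'labels': batch[3]}
    #
    # whereas for model types that ignore segment ids ('xlm', 'roberta') token_type_ids
    # is set to None.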
    def evaluate_predictions(self, logits, label_ids):
        raise NotImplementedError('evaluate_predictions method must be implemented in order '
                                  'to be used for dev/test set evaluation')
    def save_model_checkpoint(self, output_path: str, name: str):
"""
save model checkpoint
Args:
output_path (str): output path
name (str): name of checkpoint
"""
output_dir_path = os.path.join(output_path, name)
self.save_model(output_dir_path, save_checkpoint=True)
class InputFeatures(object):
"""A single set of features of data."""
def __init__(self, input_ids, input_mask, segment_ids, label_id=None, valid_ids=None):
self.input_ids = input_ids
self.input_mask = input_mask
self.segment_ids = segment_ids
self.label_id = label_id
self.valid_ids = valid_ids
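# Illustrative sketch (not part of the original module): InputFeatures is a plain container
# filled by the task-specific data processors. For a token-classification example padded to
# length 8 (ids shown are hypothetical), an instance might look like:
#
#     InputFeatures(input_ids=[101, 7592, 2088, 102, 0, 0, 0, 0],
#                   input_mask=[1, 1, 1, 1, 0, 0, 0, 0],
#                   segment_ids=[0, 0, 0, 0, 0, 0, 0, 0],
#                   label_id=[1, 2, 2, 1, 0, 0, 0, 0])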