728x90
반응형

(PyTorch) koGPT2 ChatBot

참고 : https://wikidocs.net/158023

 

9-3. koGPT2 ChatBot

프로그래밍에 사용된 패키지들의 버전은 다음과 같다. * huggingface-hub 0.2.1 pypi_0 pypi *…

wikidocs.net

"PyTorch 딥러닝 챗봇" wikidocs의 koGPT2 챗봇 만들기 page를 따라해보며 공부한 내용을 정리해보았다.

 

 

KoGPT2란?

참고 : https://github.com/SKT-AI/KoGPT2

 

GitHub - SKT-AI/KoGPT2: Korean GPT-2 pretrained cased (KoGPT2)

Korean GPT-2 pretrained cased (KoGPT2). Contribute to SKT-AI/KoGPT2 development by creating an account on GitHub.

github.com

KoGPT2는 SKT-AI에서 개발한 한국어 GPT-2 모델이다.

GPT-2는 주어진 텍스트의 다음 단어를 잘 예측할 수 있도록 학습된 언어 모델로, 문장 생성에 최적화되어 있다.

KoGPT2는 부족한 한국어 성능을 극복하기 위해 40GB 이상의 텍스트로 학습된 한국어 디코더 언어 모델이다.

 

 

필요한 library를 설치한다

!pip install transformers

 

Tokenizer를 선언한다

tokenizers 패키지의 Character BPE tokenizer로 학습되었다.

from transformers import PreTrainedTokenizerFast
Q_TKN = "<usr>"
A_TKN = "<sys>"
BOS = '</s>'
EOS = '</s>'
MASK = '<unused0>'
SENT = '<unused1>'
PAD = '<pad>'

# 허깅페이스 transformers 에 등록된 사전 학습된 koGTP2 토크나이저를 가져온다.
# PreTrainedTokenizerFast 클래스의 from_pretrained 메소드를 사용하여 사전 훈련된 토크나이저를 로드
tokenizer = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2", bos_token=BOS, eos_token=EOS, unk_token="<unk>", pad_token=PAD, mask_token=MASK,)
tokenizer.tokenize("안녕하세요. 한국어 GPT-2 입니다.😤:)l^o")

 

사전학습된 koGPT2 언어 모델 객체를 생성한다

import torch
from transformers import GPT2LMHeadModel # GPT2 LM Head Model : LM Head가 추가된 GPT-2 모델, 주로 자연어 생성 작업에 사용

# GPT2LMHeadModel 클래스의 from_pretrained 메소드를 사용하여 사전 훈련된 GPT-2 모델을 로드
model = GPT2LMHeadModel.from_pretrained('skt/kogpt2-base-v2')

text = '근육이 커지기 위해서는'
input_ids = tokenizer.encode(text, return_tensors='pt')
# encode() : token string을 token id 의 리스트로 변환
# return_tensors : 토큰화된 결과를 파이썬 정수 목록 대신 텐서로 반환
# 'tf': TensorFlow tf.constant 객체
# 'pt': PyTorch torch.Tensor 객체
# 'np': Numpy np.ndarray 객체

gen_ids = model.generate(input_ids,
                           max_length=128,
                           repetition_penalty=2.0,
                           pad_token_id=tokenizer.pad_token_id,
                           eos_token_id=tokenizer.eos_token_id,
                           bos_token_id=tokenizer.bos_token_id,
                           use_cache=True)

# decode() : tokenizer 와 vocabulary를 이용해서 token id를 string으로 변환
generated = tokenizer.decode(gen_ids[0])
print(generated)

 

Chatbot Dataset 다운로드하기

chatbot dataset 출처 : https://github.com/songys/Chatbot_data

 

GitHub - songys/Chatbot_data: Chatbot_data_for_Korean

Chatbot_data_for_Korean. Contribute to songys/Chatbot_data development by creating an account on GitHub.

github.com

import pandas as pd
import urllib.request

urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/songys/Chatbot_data/master/ChatbotData.csv",
    filename="ChatBotData.csv",
)
Chatbot_Data = pd.read_csv("ChatBotData.csv")
# print(Chatbot_Data.shape)

Chatbot_Data = Chatbot_Data[:1000]
Chatbot_Data.head()

# Q : 발화
# A : 발화
# label : 일상다반사 0, 이별(부정) 1, 사랑(긍정) 2

 

Dataset 생성하기

from torch.utils.data import DataLoader, Dataset
import numpy as np
import re
# 챗봇 데이터를 처리하는 클래스를 만든다.
class ChatbotDataset(Dataset):
    def __init__(self, chats, max_len=40):  # 데이터셋의 전처리를 해주는 부분
        self._data = chats
        self.max_len = max_len
        self.q_token = Q_TKN
        self.a_token = A_TKN
        self.sent_token = SENT
        self.eos = EOS
        self.mask = MASK
        self.tokenizer = tokenizer

    def __len__(self):  # chatbotdata 의 길이를 리턴한다.
        return len(self._data)

    def __getitem__(self, idx):  # 로드한 챗봇 데이터를 차례차례 DataLoader로 넘겨주는 메서드
        turn = self._data.iloc[idx]
        q = turn["Q"]  # 질문을 가져온다.
        q = re.sub(r"([?.!,])", r" ", q)  # 구둣점들을 제거한다.

        a = turn["A"]  # 답변을 가져온다.
        a = re.sub(r"([?.!,])", r" ", a)  # 구둣점들을 제거한다.

        q_toked = self.tokenizer.tokenize(self.q_token + q + self.sent_token)
        q_len = len(q_toked)

        a_toked = self.tokenizer.tokenize(self.a_token + a + self.eos)
        a_len = len(a_toked)

        # #질문의 길이가 최대길이보다 크면
        # if q_len > self.max_len:
        #     a_len = self.max_len - q_len        #답변의 길이를 최대길이 - 질문길이
        #     if a_len <= 0:       #질문의 길이가 너무 길어 질문만으로 최대 길이를 초과 한다면
        #         q_toked = q_toked[-(int(self.max_len / 2)) :]   #질문길이를 최대길이의 반으로 
        #         q_len = len(q_toked)
        #         a_len = self.max_len - q_len              #답변의 길이를 최대길이 - 질문길이
        #     a_toked = a_toked[:a_len]
        #     a_len = len(a_toked)

        # #질문의 길이 + 답변의 길이가 최대길이보다 크면
        # if q_len + a_len > self.max_len:
        #     a_len = self.max_len - q_len        #답변의 길이를 최대길이 - 질문길이
        #     if a_len <= 0:       #질문의 길이가 너무 길어 질문만으로 최대 길이를 초과 한다면
        #         q_toked = q_toked[-(int(self.max_len / 2)) :]   #질문길이를 최대길이의 반으로 
        #         q_len = len(q_toked)
        #         a_len = self.max_len - q_len              #답변의 길이를 최대길이 - 질문길이
        #     a_toked = a_toked[:a_len]
        #     a_len = len(a_toked)

        # mask = 질문길이 0 + 답변길이 1 + 나머지 0
        mask = [0] * q_len + [1] * a_len + [0] * (self.max_len - q_len - a_len)

        # 답변 labels = [mask, mask, ...., mask, ..., <bos>,..답변.. <eos>, <pad>....]
        labels = [self.mask,] * q_len + a_toked[1:]    
        # 답변 labels을 index 로 만든다.
        labels_ids = self.tokenizer.convert_tokens_to_ids(labels)        
        # 최대길이만큼 PADDING
        while len(labels_ids) < self.max_len:
            labels_ids += [self.tokenizer.pad_token_id]

        # 질문 + 답변을 index 로 만든다.    
        token_ids = self.tokenizer.convert_tokens_to_ids(q_toked + a_toked)
        # 최대길이만큼 PADDING
        while len(token_ids) < self.max_len:
            token_ids += [self.tokenizer.pad_token_id]

        #질문+답변, 마스크, 답변
        return (token_ids, np.array(mask), labels_ids)
def collate_batch(batch):
    data = [item[0] for item in batch]
    mask = [item[1] for item in batch]
    label = [item[2] for item in batch]
    return torch.LongTensor(data), torch.LongTensor(mask), torch.LongTensor(label)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

 

모델 학습하기

model.to(device)
model.train()
learning_rate = 3e-5
criterion = torch.nn.CrossEntropyLoss(reduction="none")
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

epoch = 10
Sneg = -1e18
losses  = 0

train_set = ChatbotDataset(Chatbot_Data, max_len=40)

#윈도우 환경에서 num_workers 는 무조건 0으로 지정, 리눅스에서는 2
train_dataloader = DataLoader(train_set, batch_size=32, num_workers=0, shuffle=True, collate_fn=collate_batch,)

print ("start")
for epoch in range(epoch):
    losses  = 0
    # train_dataloader에서 배치 단위로 샘플을 가져와 학습
    for batch_idx, samples in enumerate(train_dataloader):
        # Gradient 0으로 초기화
        optimizer.zero_grad()

        # 샘플에서 token_ids, mask, label을 가져옴
        token_ids, mask, label = samples
        token_ids = token_ids.to(device)
        mask = mask.to(device)
        label = label.to(device)

        out = model(token_ids)
        out = out.logits      #Returns a new tensor with the logit of the elements of input
        # print(out.shape)
        # print(mask.shape)
        # print(mask.unsqueeze(dim=2).shape)

        # repeat_interleave : out의 2번째 차원의 크기만큼 반복(repeats=out.shape[2]), 2번째(dim=2) 차원을 따라 반복이 수행
        mask_3d = mask.unsqueeze(dim=2).repeat_interleave(repeats=out.shape[2], dim=2)
        # print(mask_3d.shape)

        # mask_3d가 1인 위치에는 출력 값을 그대로 사용,
        # mask_3d가 0인 위치에는 out 배열과 동일한 크기의 배열을 생성하고, 모든 원소의 값을 Sneg 값으로 설정
        mask_out = torch.where(mask_3d == 1, out, Sneg * torch.ones_like(out))

        # mask_out 배열과 label 사용하여 손실 함수를 계산
        loss = criterion(mask_out.transpose(2, 1), label)

        # 평균 loss 만들기 avg_loss[0] / avg_loss[1] <- loss 정규화
        avg_loss = loss.sum() / mask.sum()

        # Backward pass (gradient 계산)
        avg_loss.backward()

        # Parameter update
        optimizer.step()

         # 손실 누적
        losses += avg_loss.item()
    print(f'epoch : %5d | loss : %.5f ' %(epoch+1, losses / len(list(train_dataloader))))
print ("end")

 

모델 테스트하기

with torch.no_grad():
    while 1:
        q = input("user > ").strip()
        if q == "quit":
            break
        a = ""
        while 1:
            input_ids = torch.LongTensor(tokenizer.encode(Q_TKN + q + SENT + A_TKN + a)).unsqueeze(dim=0).to(device)
            pred = model(input_ids)
            pred = pred.logits.to(device)
            gen = tokenizer.convert_ids_to_tokens(torch.argmax(pred, dim=-1).squeeze().cpu().numpy().tolist())[-1]
            # PyTorch 텐서는 GPU 메모리에 저장될 수 있지만, NumPy 배열은 항상 호스트(CPU) 메모리에 저장된다.
            # 따라서, GPU 메모리에 저장된 PyTorch 텐서를 직접 NumPy 배열로 변환할 수 없다
            # numpy 메소드를 사용하려면 다음과 같이 처리해줘야한다
            # tensor = tensor.cpu().numpy()
            if gen == EOS:
                break
            a += gen.replace("▁", " ")
        print("Chatbot > {}".format(a.strip()))

결과 : 

user > 오늘 날씨가 너무 흐려요
Chatbot > 집밖에 나가기가 힘들것 같아요
user > 비가 올 것 같아요
Chatbot > 집밖에 나가기가 힘들것 같아요
user > 커피를 너무 많이 마셨어요
Chatbot > 소화제 드세요
user > 내일 출근하기 싫어요
Chatbot > 오늘은 휴가가 간절하겠네요
user > quit

728x90
반응형