Seq2Seq 문장생성

Sequence2Sequence 모델을 활용해서 문장생성을 수행하는 테스트를 해보겠습니다. 테스트 환경은 Google Colab의 GPU를 활용합니다.

Google Drive에 업로드되어 있는 text 파일을 읽기 위해서 필요한 라이브러리를 임포트합니다. 해당 파일을 실행시키면 아래와 같은 이미지가 표시됩니다.

해당 링크를 클릭하고 들어가면 코드 값이 나오는데 코드값을 복사해서 입력하면 구글 드라이브가 마운트 되고 구글 드라이브에 저장된 파일들을 사용할 수 있게됩니다.

from google.colab import drive
drive.mount('/content/gdrive')

정상적으로 마운트 되면 “Mounted at /content/gdrive”와 같은 텍스트가 표시됩니다.

마운트 작업이 끝나면 필요한 라이브러리 들을 임포트합니다. 파이토치(PyTorch)를 사용하기 때문에 학습에 필요한 라이브러리 들을 임포트하고 기타 numpy, pandas도 함께 임포트합니다.

config 파일에는 학습에 필요한 몇가지 파라메터가 정의되어 있습니다. 학습이 완료된 후 모델을 저장하고 다시 불러올 때에 config 데이터가 저장되어 있으면 학습된 모델의 정보를 확인할 수 있어 편리합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd
import os
from argparse import Namespace

from collections import Counter

config = Namespace(
    train_file='gdrive/***/book_of_genesis.txt', seq_size=7, batch_size=100...
)

이제 학습을 위한 파일을 읽어오겠습니다. 파일은 성경 “창세기 1장”을 학습 데이터로 활용합니다. 테스트 파일은 영문 버전을 활용합니다. 파일을 읽은 후에 공백으로 분리해서 배열에 담으면 아래와 같은 형태의 값을 가지게됩니다.

with open(config.train_file, 'r', encoding='utf-8') as f:
    text = f.read()
text = text.split()
['In', 'the', 'beginning,', 'God', 'created', 'the', 'heavens', 'and', 'the', 'earth.', 'The', 'earth', 'was', 'without', 'form', 'and', 'void,', 'and', 'darkness', 'was'...

이제 학습을 위해 중복 단어를 제거하고 word2index, index2word 형태의 데이터셋을 생성합니다. 이렇게 만들어진 데이텃셋을 통해서 각 문장을 어절 단위로 분리하고 각 배열의 인덱스 값을 맵핑해서 문장을 숫자 형태의 값을 가진 데이터로 변경해줍니다. 이 과정은 자연어를 이해하지 못하는 컴퓨터가 어떠한 작업을 수행할 수 있도록 수치 형태의 데이터로 변경하는 과정입니다.

word_counts = Counter(text)
sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
int_to_vocab = {k: w for k, w in enumerate(sorted_vocab)}
vocab_to_int = {w: k for k, w in int_to_vocab.items()}
n_vocab = len(int_to_vocab)

print('Vocabulary size', n_vocab)

int_text = [vocab_to_int[w] for w in text] # 전체 텍스트를 index로 변경

다음은 학습을 위한 데이터를 만드는 과정입니다. 이 과정이 중요합니다. 데이터는 source_word와 target_word로 분리합니다. source_word는 [‘In’, ‘the’, ‘beginning,’, ‘God’, ‘created’, ‘the’, ‘heavens’], target_word는 [ ‘the’, ‘beginning,’, ‘God’, ‘created’, ‘the’, ‘heavens’,’and’]의 형태입니다.
즉, source_word 문장 배열 다음에 target_word가 순서대로 등장한다는 것을 모델이 학습하도록 하는 과정입니다.

여기서 문장의 크기는 7로 정했습니다. 더 큰 사이즈로 학습을 진행하면 문장을 생성할 때 더 좋은 예측을 할 수 있겠으나 계산량이 많아져서 학습 시간이 많이 필요합니다. 테스트를 통해서 적정 수준에서 값을 정해보시기 바랍니다.

source_words = []
target_words = []
for i in range(len(int_text)):
    ss_idx, se_idx, ts_idx, te_idx = i, (config.seq_size+i), i+1, (config.seq_size+i)+1
    if len(int_text[ts_idx:te_idx]) >= config.seq_size:
        source_words.append(int_text[ss_idx:se_idx])
        target_words.append(int_text[ts_idx:te_idx])

아래와 같이 어떻게 값이 들어가 있는지를 확인해보기 위해서 간단히 10개의 데이터를 출력해보겠습니다.

for s,t in zip(source_words[0:10], target_words[0:10]):
  print('source {} -> target {}'.format(s,t))
source [106, 0, 107, 3, 32, 0, 16] -> target [0, 107, 3, 32, 0, 16, 1]
source [0, 107, 3, 32, 0, 16, 1] -> target [107, 3, 32, 0, 16, 1, 0]
source [107, 3, 32, 0, 16, 1, 0] -> target [3, 32, 0, 16, 1, 0, 26]
source [3, 32, 0, 16, 1, 0, 26] -> target [32, 0, 16, 1, 0, 26, 62]
source [32, 0, 16, 1, 0, 26, 62] -> target [0, 16, 1, 0, 26, 62, 12]
source [0, 16, 1, 0, 26, 62, 12] -> target [16, 1, 0, 26, 62, 12, 4]
source [16, 1, 0, 26, 62, 12, 4] -> target [1, 0, 26, 62, 12, 4, 108]
source [1, 0, 26, 62, 12, 4, 108] -> target [0, 26, 62, 12, 4, 108, 109]
source [0, 26, 62, 12, 4, 108, 109] -> target [26, 62, 12, 4, 108, 109, 1]
source [26, 62, 12, 4, 108, 109, 1] -> target [62, 12, 4, 108, 109, 1, 110]

이제 학습을 위해서 모델을 생성합니다. 모델은 Encoder와 Decoder로 구성됩니다. 이 두 모델을 사용하는 것이 Sequence2Sequece의 전형적인 구조입니다. 해당 모델에 대해서 궁금하신 점은 pytorch 공식 사이트를 참조하시기 바랍니다. 인코더와 디코더에 대한 자세한 설명은 아래의 그림으로 대신하겠습니다. GRU 대신에 LSTM을 사용해도 무방합니다.

https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html

아래는 인코더의 구조입니다. 위의 그림에서와 같이 인코더는 두개의 값이 GRU 셀(Cell)로 들어가게 됩니다. 하나는 입력 값이 임베딩 레이어를 통해서 나오는 값과 또 하나는 이전 단계의 hidden 값입니다. 최종 출력은 입력을 통해서 예측된 값인 output, 다음 단계에 입력으로 들어가는 hidden이 그것입니다.

기본 구조의 seq2seq 모델에서는 output 값은 사용하지 않고 이전 단계의 hidden 값을 사용합니다. 최종 hidden 값은 입력된 문장의 전체 정보를 어떤 고정된 크기의 Context Vector에 축약하고 있기 때문에 이 값을 Decoder의 입력으로 사용합니다.

참고로 이후에 테스트할 Attention 모델은 이러한 구조와는 달리 encoder의 출력 값을 사용하는 모델입니다. 이 값을 통해서 어디에 집중할지를 정하게 됩니다.

class Encoder(nn.Module):

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size) #199->10
        self.gru = nn.GRU(hidden_size, hidden_size) #20-20

    def forward(self, x, hidden):
        x = self.embedding(x).view(1,1,-1)
        #print('Encoder forward embedding size {}'.format(x.size()))
        x, hidden = self.gru(x, hidden)
        return x, hidden

이제 아래의 그림과 같이 Decoder를 설계합니다. Decoder 역시 GRU 셀(Cell)을 가지고 있습니다.

https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size) #199->10
        self.gru = nn.GRU(hidden_size, hidden_size) #10->10
        self.out = nn.Linear(hidden_size, output_size) #10->199
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, x, hidden):
        x = self.embedding(x).view(1,1,-1)
        x, hidden = self.gru(x, hidden)
        x = self.softmax(self.out(x[0]))
        return x, hidden

이제 GPU를 사용하기 위해서 설정을 수행합니다. Google Colab을 활용하시면 별도의 설정작업 없이 GPU를 사용할 수 있습니다.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

인코더와 디코더 입출력 정보를 셋팅합니다.

enc_hidden_size = 50
dec_hidden_size = enc_hidden_size
encoder = Encoder(n_vocab, enc_hidden_size).to(device) # source(199) -> embedding(10)
decoder = Decoder(dec_hidden_size, n_vocab).to(device) # embedding(199) -> target(199)

encoder_optimizer = optim.SGD(encoder.parameters(), lr=0.01)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=0.01)

criterion = nn.NLLLoss()

해당 모델의 이미지를 아래의 그림과 같이 나타낼 수 있습니다.

그림1 Sequence2Sequence Model
Encoder(
  (embedding): Embedding(199, 50)
  (gru): GRU(50, 50)
)
Decoder(
  (embedding): Embedding(199, 50)
  (gru): GRU(50, 50)
  (out): Linear(in_features=50, out_features=199, bias=True)
  (softmax): LogSoftmax(dim=1)
)

데이터를 100개씩 나눠서 훈련 할 수 있도록 배치 모델을 작성합니다.

pairs = list(zip(source_words, target_words))
def get_batch(pairs, batch_size):
  pairs_length = len(pairs)
  for ndx in range(0, pairs_length, batch_size):
    #print(ndx, min(ndx+batch_size, pairs_length))
    yield pairs[ndx:min(ndx+batch_size, pairs_length)]

해당 모델은 500번 학습을 수행합니다. 각 batch, epoch 마다 loss 정보를 표시합니다. 표1 은 마지막 스텝의 loss와 epoch 정보입니다.

number_of_epochs = 501
for epoch in range(number_of_epochs):
    total_loss = 0
    #for pair in get_batch(pairs, config.batch_size): # batch_size 100
    for pair in get_batch(pairs, 100): # batch_size 100
      batch_loss = 0
       
      for si, ti in pair:
        x = torch.Tensor(np.array([si])).long().view(-1,1).to(device)
        y = torch.Tensor(np.array([ti])).long().view(-1,1).to(device)
        encoder_hidden = torch.zeros(1,1,enc_hidden_size).to(device)

        for j in range(config.seq_size):
            _, encoder_hidden = encoder(x[j], encoder_hidden)

        decoder_hidden = encoder_hidden
        decoder_input = torch.Tensor([[0]]).long().to(device)

        loss = 0

        for k in range(config.seq_size):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            decoder_input = y[k]
            loss += criterion(decoder_output, y[k])

        batch_loss += loss.item()/config.seq_size
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

      total_loss += batch_loss/config.batch_size
      print('batch_loss {:.5f}'.format(batch_loss/config.batch_size))
    print('epoch {}, loss {:.10f}'.format(epoch, total_loss/(len(pairs)//config.batch_size)))
...
batch_loss 0.00523
batch_loss 0.00766
batch_loss 0.01120
batch_loss 0.00735
batch_loss 0.01218
batch_loss 0.00873
batch_loss 0.00352
batch_loss 0.00377
epoch 500, loss 0.0085196330

표1. 마지막 batch, epoch 학습 정보

학습이 종료된 모델을 저장소에 저장합니다. 저장 할 때에 학습 정보가 저장되어 있는 config 내용도 포함하는 것이 좋습니다.

# Save best model weights.
torch.save({
  'encoder': encoder.state_dict(), 'decoder':decoder.state_dict(),
  'config': config,
}, 'gdrive/***/model.genesis.210122')

학습이 완료된 후에 해당 모델이 잘 학습되었는지 확인해보겠습니다. 학습은 “darkness was”라는 몇가지 단어를 주고 모델이 어떤 문장을 생성하는 지를 알아 보는 방식으로 수행합니다.

decoded_words = []

words = [vocab_to_int['darkness'], vocab_to_int['was']]
x = torch.Tensor(words).long().view(-1,1).to(device)

encoder_hidden = torch.zeros(1,1,enc_hidden_size).to(device)

for j in range(x.size(0)):
    _, encoder_hidden = encoder(x[j], encoder_hidden)

decoder_hidden = encoder_hidden
decoder_input = torch.Tensor([[words[1]]]).long().to(device)  

for di in range(20):
  decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
  _, top_index = decoder_output.data.topk(1)
  decoded_words.append(int_to_vocab[top_index.item()])

  decoder_input = top_index.squeeze().detach()

predict_words = decoded_words    
predict_sentence = ' '.join(predict_words)
print(predict_sentence)

Word2Vec 구현

Word2Vec을 pytorch를 통해서 구현해보겠습니다. 파이토치 공식홈에도 유사한 예제가 있으니 관심있으신 분들은 공식홈에 있는 내용을 읽어보시는 것이 도움이 되시리라 생각됩니다.

먼저 아래와 같이 필요한 라이브러리들을 임포트합니다. 마지막에 임포트한 matplotlib의 경우는 시각화를 위한 것으로 단어들이 어떤 상관성을 가지는지 확인해보기 위함입니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd

아래와 같은 텍스트를 선언합니다. 몇개의 단어로 구성된 문장이고 중복된 문장들을 복사해서 붙여 넣었습니다. Word2Vec을 구현하는데 여러 방식이 있지만 이번 예제에서는 Skip-Gram 방식을 사용합니다.

위 구조에서 핵심은 가중치행렬 WW, W′W′ 두 개입니다. Word2Vec의 학습결과가 이 두 개의 행렬입니다. 그림을 자세히 보시면 입력층-은닉층, 은닉층-출력층을 잇는 가중치 행렬의 모양이 서로 전치(transpose)한 것과 동일한 것을 볼 수 있습니다. 그런데 전치하면 그 모양이 같다고 해서 완벽히 동일한 행렬은 아니라는 점에 주의할 필요가 있습니다. 물론 두 행렬을 하나의 행렬로 취급(tied)하는 방식으로 학습을 진행할 수 있고, 학습이 아주 잘되면 WW와 W′W′ 가운데 어떤 걸 단어벡터로 쓰든 관계가 없다고 합니다.

또 다른 방법은 COBOW(Continuous Bag-of-Words) 방식이 있습니다. 이 방식은 Skip-Gram과 반대의 방식입니다.
CBOW는 주변에 있는 단어들을 가지고, 중간에 있는 단어들을 예측하는 방법입니다. 반대로, Skip-Gram은 중간에 있는 단어로 주변 단어들을 예측하는 방법입니다. 메커니즘 자체는 거의 동일하기 때문에 이해하는데 어렵지는 않습니다.

보통 딥러닝이라함은, 입력층과 출력층 사이의 은닉층의 개수가 충분히 쌓인 신경망을 학습할 때를 말하는데 Word2Vec는 입력층과 출력층 사이에 하나의 은닉층만이 존재합니다. 이렇게 은닉층(hidden Layer)이 1개인 경우에는 일반적으로 심층신경망(Deep Neural Network)이 아니라 얕은신경망(Shallow Neural Network)이라고 부릅니다. 또한 Word2Vec의 은닉층은 일반적인 은닉층과는 달리 활성화 함수가 존재하지 않으며 룩업 테이블이라는 연산을 담당하는 층으로 일반적인 은닉층과 구분하기 위해 투사층(projection layer)이라고 부르기도 합니다.

corpus = [
    'he is a king',
    'she is a queen',
    'he is a man',
    'she is a woman',
    'warsaw is poland capital',
    'berlin is germany capital',
    'paris is france capital',
    'seoul is korea capital', 
    'bejing is china capital',
    'tokyo is japan capital',
]

def tokenize_corpus(corpus):
    tokens = [x.split() for x in corpus]
    return tokens

tokenized_corpus = tokenize_corpus(corpus)

단어들의 중복을 제거하여 vocabulary 리스트를 만들고 word2idx, idx2word dict를 만듭니다.

vocabulary = []
for sentence in tokenized_corpus:
    for token in sentence:
        if token not in vocabulary:
            vocabulary.append(token)

word2idx = {w: idx for (idx, w) in enumerate(vocabulary)}
idx2word = {idx: w for (idx, w) in enumerate(vocabulary)}

vocabulary_size = len(vocabulary)

Skip-Gram이나 CBOW 모두 window_size 가 필요합니다. 해당 파라메터는 주변의 단어를 몇개까지 학습에 이용할 것인가를 결정해주는 파라메터입니다. 이번 예제에서는 2개의 단어만 학습에 활용하도록 하겠습니다.

window_size = 2
idx_pairs = []

for sentence in tokenized_corpus:
    indices = [word2idx[word] for word in sentence]
    for center_word_pos in range(len(indices)):
        for w in range(-window_size, window_size + 1):
            context_word_pos = center_word_pos + w
            if context_word_pos < 0 or context_word_pos >= len(indices) or center_word_pos == context_word_pos:
                continue
            context_word_idx = indices[context_word_pos]
            idx_pairs.append((indices[center_word_pos], context_word_idx))

idx_pairs = np.array(idx_pairs) 

위와 같은 과정을 통해서 idx_pairs를 만들 수 있습니다. array에서 10개만 출력해보면 아래와 같은 배열을 볼 수 있습니다.

이것은 “he is a man”이라는 단어를 학습 할 때에 [he, is],[he,a],[is, he],[is,a],[is,man] … 형태의 학습데이터입니다. COBOW 방식은 주변의 단어들을 통해서 목적단어를 예측하는 형태라면 skip-gram 방식은 목적단어를 통해서 주변에 나올 수 있는 단어 [is, a]를 예측하는 방법으로 학습이 진행됩니다.

print(idx_pairs[0:10])
array([[0, 1],
       [0, 2],
       [1, 0],
       [1, 2],
       [1, 3],
       [2, 0],
       [2, 1],
       [2, 3],
       [3, 1],
       [3, 2]])

입력 데이터를 One-Hot 형태로 변경합니다. 참고로 One-Hot 형태를 사용하지 않고 nn.Embedding()을 통해서 룩업테이블(Look-Up Table)을 만들어 사용해도 무방합니다. nn.Embedding()을 사용하는 법은 이전 글에서 다뤘기 때문에 자세한 내용은 해당 게시물을 참조하시기 바랍니다.

def get_input_layer(word_idx):
    return np.eye(vocabulary_size)[word_idx]

X = []
y = []
for data, target in idx_pairs:
    X.append(get_input_layer(data))
    y.append(target)
    
X = torch.FloatTensor(np.array(X))
y = torch.Tensor(np.array(y)).long()

이제 신경망 모듈을 아래와 같이 생성합니다. 입력과 출력 사이에 2차원의 벡터형태로 정보를 압축하게됩니다.

class Word2VecModel(nn.Module):
    def __init__(self,inout_dim):
        super().__init__()
        self.linear1 = nn.Linear(inout_dim,2)
        self.linear2 = nn.Linear(2,inout_dim)
        
    def forward(self,x):
        return self.linear2(self.linear1(x))
    
model = Word2VecModel(X.size(dim=-1))

아래와 같이 데이터를 훈련합니다. 예측치(prediction)와 실제 값(y)를 통해서 cost를 계산하고 이를 출력해줍니다.

# optimizer 설정
optimizer = optim.Adam(model.parameters())

nb_epochs = 100
for epoch in range(nb_epochs + 1):

    # H(x) 계산
    prediction = model(X)

    # cost 계산
    cost = F.cross_entropy(prediction, y)

    # cost로 H(x) 개선
    optimizer.zero_grad()
    cost.backward()
    optimizer.step()
    
    # 20번마다 로그 출력
    if epoch % 100 == 0:
        print('Epoch {:4d}/{} Cost: {:.6f}'.format(
            epoch, nb_epochs, cost.item()
        ))

훈련이 완료된 후에 생성된 weight 정보를 출력해봅니다.

vector = model.state_dict()['linear2.weight'] + model.state_dict()['linear2.bias'].view(-1,1)
w2v_df = pd.DataFrame(vector.numpy(), columns = ['x1', 'x2'])
w2v_df['word'] = vocab
w2v_df = w2v_df[['word','x1','x2']]
w2v_df
ano = w2v_df['word'].values
x1 = w2v_df['x1'].values
x2 = w2v_df['x2'].values

fig, ax = plt.subplots(figsize=(5,5))
ax.scatter(x1, x2)

for i, txt in enumerate(ano):
    ax.annotate(txt, (x1[i], x2[i]))

2차원 벡터를 통해서 아래와 같이 시각화해봅니다.

Reference

[1]https://ratsgo.github.io/from%20frequency%20to%20semantics/2017/03/30/word2vec/
[2]https://towardsdatascience.com/nlp-101-word2vec-skip-gram-and-cbow-93512ee24314
[3]https://wikidocs.net/22660

Seq2Seq 문장번역

파이토치 Seq2Seq 예제

import random
import torch
import torch.nn as nn
import torch.optim as optim
torch.manual_seed(0)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
raw = ['I called Tom for help.	나는 톰에게 도움을 요청했다.',
'I do not like science.	나는 과학이 싫어.',
'I hate myself as well.	나도 내 자신을 싫어해.',
'I knew Tom would lose.	톰이 질 거라는 것을 난 알고 있었어.',
'I know Tom personally.	난 톰을 개인적으로 알고 있어.',
'I like Korean cuisine.	전 한국 요리가 좋아요.',
'I like Korean cuisine.	전 한국 요리를 좋아해요.',
'I like helping others.	나는 남을 돕는 것을 좋아한다.',
'I really like puppies.	저는 강아지가 정말 좋아요.',
'I run faster than Tom.	나는 톰보다 빠르게 달릴 수 있어.',
'I think Tom is lonely.	톰이 외로워하는 것 같아.',
'I think they like you.	그들이 널 좋아하는 것 같아.',
'I want to go to sleep.	나 자러 가고 싶어.',
'I want to go to sleep.	나 자고 싶어.',
'I want to visit Korea.	나는 한국에 들르고 싶다.']

사용한 데이터는 http://www.manythings.org/anki/ 에서 kor-eng.zip 파일을 다운로드 받아 일부 데이터만 사용했습니다. 해당 사이트에 들어가면 한국어 외에도 다양한 형태의 파일을 다운 받을 수 있습니다.

SOS_token = 0 # 문장의 시작 Start of Sentence
EOS_token = 1 #  문장의 끝 End of Sentence
class Vocab:
    def __init__(self):
        self.vocab2index = {"<SOS>":SOS_token, "<EOS>":EOS_token}
        self.index2vocab = {SOS_token:"<SOS>", EOS_token:"<SOS>"}
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)
    
    def add_vocab(self, sentence):
        for word in sentence.split(' '):
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.vocab_count[word] = 1
                self.index2vocab[self.n_vocab] = word
                self.n_vocab += 1
            else:
                self.vocab_count[word] += 1
# declare simple encoder
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size) # Embedding(17, 16)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        return x, hidden

    
# declare simple decoder
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers=1, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden) # lstm을 사용할 경우 해당 위치 수정
        x = self.softmax(self.out(x[0]))
        return x, hidden
# read and preprocess the corpus data
def preprocess(corpus):
    print("reading corpus...")
    pairs = []
    for line in corpus:
        pairs.append([s for s in line.strip().lower().split("\t")])
    print("Read {} sentence pairs".format(len(pairs)))

    pairs = [pair for pair in pairs]
    print("Trimmed to {} sentence pairs".format(len(pairs)))

    source_vocab = Vocab()
    target_vocab = Vocab()

    print("Counting words...")
    for pair in pairs:
        source_vocab.add_vocab(pair[0])
        target_vocab.add_vocab(pair[1])
    print("source vocab size =", source_vocab.n_vocab)
    print("target vocab size =", target_vocab.n_vocab)

    return pairs, source_vocab, target_vocab

# 데이터셋, 입력단어정보, 출력단어정보
pairs, source_vocab, target_vocab = preprocess(raw)

훈련용 입출력 데이터셋을 위와 같이 만든후 이제 인코더, 디코더 모델을 만들어야 합니다. 먼저 만들기 전에 인코더-디코더의 입출력 정보에 대하여 직접 그림으로 그려보시기를 추천합니다. 가장 좋은 것은 노트에 펜으로 그려보시는 것이 좋겠지만 그렇지 않다면 머리속으로 어떤 입력이 들어오고 어떤 출력이 나가는지에 대한 정보를 설계하는 과정이 필요합니다.

이런 과정이 없으면 나중에 인코더와 디코더를 설계할 때에 혼동하기 쉽기 때문에 반드시 모델의 입출력 흐름을 구상해보시기 바랍니다.

본 예제의 인코더-디코더 정보는 다음과 같습니다.
인코더 : input_vector(41) -> Embedding(41,30) -> LSTM(30,30)
디코더 : Embedding(52,30) -> LSTM(30, 52) – hidden_vector(52)

enc_hidden_size = 30
dec_hidden_size = enc_hidden_size
enc = Encoder(source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, target_vocab.n_vocab).to(device)
def tensorize(vocab, sentence):
    idx = [vocab.vocab2index[word] for word in sentence.lower().split(' ')]
    idx.append(vocab.vocab2index['<EOS>'])
    return torch.Tensor(idx).long().to(device).view(-1,1)
tensorize(source_vocab, 'I called Tom for help.')
output : tensor([[2], [3], [4], [5], [6], [1]])
training_source = [tensorize(source_vocab, pair[0]) for pair in pairs]
training_target = [tensorize(target_vocab, pair[1]) for pair in pairs]

Train

loss_total = 0
number_epoch = 5001

encoder_optimizer = optim.SGD(enc.parameters(), lr=0.01)
decoder_optimizer = optim.SGD(dec.parameters(), lr=0.01)

criterion = nn.NLLLoss()

for epoch in range(number_epoch):
    epoch_loss = 0
    
    for i in range(len(training_source)):
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        
        source_tensor = training_source[i]
        target_tensor = training_target[i]

        encoder_hidden = torch.zeros([1, 1, enc.hidden_size]).to(device)

        source_length = source_tensor.size(0)
        target_length = target_tensor.size(0)
        
        loss = 0

        for enc_input in range(source_length):
            _, encoder_hidden = enc(source_tensor[enc_input], encoder_hidden)

        decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
        decoder_hidden = encoder_hidden # connect encoder output to decoder input

        for di in range(target_length):
            decoder_output, decoder_hidden = dec(decoder_input, decoder_hidden)
            #print(decoder_output, target_tensor[di], criterion(decoder_output, target_tensor[di]))
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  # teacher forcing
        
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()
        
        #print(loss.item(),target_length)
        epoch_loss += loss.item()/target_length
        #loss_total += loss_epoch
    if epoch % 100 == 0:
        print('--- epoch {}, total loss {} '.format(epoch,float(epoch_loss/15)))

Evaluate

for pair in pairs:
    print(">", pair[0])
    print("=", pair[1])
    source_tensor = tensorize(source_vocab, pair[0])
    source_length = source_tensor.size()[0]
    encoder_hidden = torch.zeros([1, 1, enc.hidden_size]).to(device)

    for ei in range(source_length):
        _, encoder_hidden = enc(source_tensor[ei], encoder_hidden)
        #print(encoder_hidden.size()) # 1,1,16

    decoder_input = torch.Tensor([[SOS_token]], device=device).long()
    decoder_hidden = encoder_hidden
    decoded_words = []

    for di in range(20):
        decoder_output, decoder_hidden = dec(decoder_input, decoder_hidden)
        #print('decoder_iput',decoder_input, 'decoder_output',decoder_output)
        _, top_index = decoder_output.data.topk(1)
        if top_index.item() == EOS_token:
            decoded_words.append("<EOS>")
            break
        else:
            decoded_words.append(target_vocab.index2vocab[top_index.item()])

        decoder_input = top_index.squeeze().detach()

    predict_words = decoded_words
    predict_sentence = " ".join(predict_words)
    print("<", predict_sentence)
    print("")
> i called tom for help.
= 나는 톰에게 도움을 요청했다.
< 나는 톰에게 도움을 요청했다. <EOS>

> i do not like science.
= 나는 과학이 싫어.
< 나는 과학이 싫어. <EOS>

> i hate myself as well.
= 나도 내 자신을 싫어해.
< 나도 내 자신을 싫어해. <EOS>

> i knew tom would lose.
= 톰이 질 거라는 것을 난 알고 있었어.
< 톰이 질 거라는 것을 난 알고 있었어. <EOS>

> i know tom personally.
= 난 톰을 개인적으로 알고 있어.
< 난 톰을 개인적으로 알고 있어. <EOS>

> i like korean cuisine.
= 전 한국 요리가 좋아요.
< 전 한국 요리를 좋아해요. <EOS>

> i like korean cuisine.
= 전 한국 요리를 좋아해요.
< 전 한국 요리를 좋아해요. <EOS>

> i like helping others.
= 나는 남을 돕는 것을 좋아한다.
< 나는 남을 돕는 것을 좋아한다. <EOS>

> i really like puppies.
= 저는 강아지가 정말 좋아요.
< 저는 강아지가 정말 좋아요. <EOS>

> i run faster than tom.
= 나는 톰보다 빠르게 달릴 수 있어.
< 나는 톰보다 빠르게 달릴 수 있어. <EOS>

> i think tom is lonely.
= 톰이 외로워하는 것 같아.
< 톰이 외로워하는 것 같아. <EOS>

> i think they like you.
= 그들이 널 좋아하는 것 같아.
< 그들이 널 좋아하는 것 같아. <EOS>

> i want to go to sleep.
= 나 자러 가고 싶어.
< 나 자고 싶어. <EOS>

> i want to go to sleep.
= 나 자고 싶어.
< 나 자고 싶어. <EOS>

> i want to visit korea.
= 나는 한국에 들르고 싶다.
< 나는 한국에 들르고 싶다. <EOS>

PyTorch DataLoader Example

sklearn의 붓꽃 데이터를 활용하여 pytorch와 dataloader를 활용하여 분류 문제를 풀어 보겠습니다.

iris 데이터셋을 받아서 pandas로 데이터를 변환합니다. 변환 과정이 반드시 필요한 것은 아니지만 데이터셋을 변경하거나 학습용 컬럼 정보를 수정할 때에 도움이 됩니다.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.datasets import load_iris
iris = load_iris()

df = pd.DataFrame(iris.data)
df.columns = iris.feature_names
df['class'] = iris.target

다음으로 PyTorch로 데이터를 import하여 학습용 데이터를 생성합니다. 학습용 데이터는 train_data와 valid_data로 분리하되 8:2 비율로 분리합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

## Prepare Dataset
data = torch.from_numpy(df.values).float()
#data.shape = torch.Size([150, 5])

# 데이터셋에서 feature 정보와 label 데이터를 분리하여 x,y 데이터를 생성
x = data[:,:4]
y = data[:,[-1]]

# train, valid 데이터셋 분리, 데이터는 8:2 or 7:3 생성
ratio = [.8, .2]

train_cnt = int(data.size(0) * ratio[0])
valid_cnt = data.size(0) - train_cnt
print(train_cnt, valid_cnt) #120, 30

# torch.randperm을 사용해서 랜덤한 int 순열을 생성, train/valid 데이터로 분리
indices = torch.randperm(data.size(0))
x = torch.index_select(x, dim=0, index=indice).split([train_cnt, valid_cnt], dim=0)
y = torch.index_select(y, dim=0, index=indice).split([train_cnt, valid_cnt], dim=0)

pytorch에서 제공하는 Dataset과 DataLoader를 import합니다.

Dataset 클래스를 상속하여 IrisDataset 클래스를 생성하고 data, label을 입력합니다.
IrisDataset을 DataLoader에 입력하여 데이터를 batch_size 만큼 데이터를 분리하여 train_loader에 넣어줍니다.

iris 데이터셋은 총 150개 데이터입니다. 이것을 train/valid 형태로 8:2로 분리했기 때문에 train 120, valid 30개의 데이터로 각각 생성됐습니다. 이렇게 생성된 데이터를 한번에 훈련하지 않고 일정 갯수로 데이터를 묶어 줍니다. 사실 소규모의 데이터 셋에서는 이러한 batch 작업이 불필요합니다. 그러나 많은 수의 데이터를 훈련하기 위해서는 이러한 작업이 필수입니다. 이번 예제에서는 30개 단위로 묶음을 만들어보겠습니다.

파이토치에서는 이러한 묶음 작업을 할 수 있는 DataLoader라는 편리한 패키지를 제공합니다. 이러한 과정을 통해서 120개의 데이터가 30개식 4묶음으로 train_loader에 저장되게 됩니다.

from torch.utils.data import Dataset, DataLoader

# Dataset 상속
class IrisDataset(Dataset):
    
    def __init__(self, data, labels):
        super().__init__()
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# DataLoader
train_loader = DataLoader(dataset=IrisDataset(x[0],y[0]), batch_size=config['batch_size'], shuffle=True)
valid_loader = DataLoader(dataset=IrisDataset(x[1],y[1]), batch_size=config['batch_size'], shuffle=False)

참고로 data, train, validate, bacth_size, epoch을 이해하기 위해 예를 들어보면…
선생님이 학생들의 학력 수준을 알아보기 위해서 100문제를 만들었습니다. 선생님은 학생들에게 100문제 중에서 80 문제를 풀어보면서 수학적 원리를 설명합니다. 그러나 한번에 80문제를 풀기 어려우니 20문제씩 1~4교시 동안 풀어보게 합니다. 한번만 문제를 풀어보는 것보다는 같은 문제를 반복해서 풀어보는 것이 효과적이기 때문에 5~8교시 다시 문제를 풀어봅니다.

이제 학생들은 80문제를 20문제씩 나눠서 2번에 걸쳐 풀어본것이 됩니다. 만약 시간적 여유가 있다면 2번이 아니라 3번, 4번 풀어본다면 아마도 더 학습이 잘되겠죠.

이제 학생들이 수학원리를 잘 이해했는지 테스트해보기 위해서 남겨둔 20문제를 풀어보게 합니다. 그리고 20개의 문제를 얼마나 많은 학생이 맞췄는지를 계산해봅니다.

이러한 과정은 보통의 학습에서 매우 일반적인 방법입니다. 이제 생각해보면 100문제가 data, 80문제가 train_data, 20문제가 valid_data, 80문제를 20문제씩 나눠서 4묶음을 만드는 과정 batch, 같은 문제를 총 2회 풀어봄 epoch 이것이 지금까지의 과정에서 사용했던 용어를 정의한 것입니다.

즉, train_loader는 120개의 데이터가 30개씩 4묶음으로 되어 있는 것이 됩니다. valid_loader는 30개의 데이터가 30개씩 1묶음이 되겠네요.

자, 이제 모델을 간단히 구성합니다. 학습을 위한 모델이라기 보다는 간단히 테스트하기 위한 것임으로 간단한 모델을 만들어보겠습니다.

예측 데이터는 붓꽃의 꽃받침의 길이와 너비, 꽃잎의 길이와 너비에 따라 3종류 중 하나로 예측하는 것임으로 최종 아웃풋의 형태는 3입니다. 그리고 해당 데이터를 확률 값으로 나타내기 위하여 softmax_classification을 활용합니다.

# model 생성
model = nn.Sequential(
    nn.Linear(4,3)
)

optimizer = optim.Adam(model.parameters())

from copy import deepcopy
lowest_loss = np.inf
best_model = None
lowest_epoch = np.inf

copy 패키지로부터 deepcopy를 import합니다. 이것은 이번에 데이터를 만드는 과정과 직접적인 관련이 없기 때문에 간단히만 설명하면 객체의 모든 내용을 복사해서 새로운 하나의 객체를 만드는 것을 deep copy라고 합니다. 반대의 개념은 shallow copy 입니다.

이제 학습을 시작합니다. 이 모델은 2개의 for loop으로 되어 있습니다. 가장 먼저 나오는 for loop은 epoch에 대한 정의로 train data를 총 몇번 학습하는가에 대한 정의입니다. 다음에 나오는 또 하나의 for loop은 학습 데이터를 몇개로 나눠서 학습할 것인가 즉, batch에 대한 문제입니다.

1번 학습이 끝나면 학습의 loss를 계산해봅니다. loss는 정답과의 차이를 의미하는 것으로 작으면 작을 수록 학습이 잘됐다는 의미입니다. 한번 학습이 끝나면 valid data를 실행해봅니다. 그리고 valid에서 나온 loss와 train에서 나온 loss를 비교해보고 valid의 loss가 더 좋을 때에 해당 학습에 사용한 모델을 deepcopy해서 저장합니다.

그 이유는 무조건 학습을 오래 한다고 해서 좋은 결과가 나오는 것이 아니고 어느 순간에 학습이 정체되거나 과적합 되는 일이 있기 때문에 가장 좋은 모델을 저장하는 것입니다.

train_history, valid_history = [], []

for i in range(config['n_epochs']+1):
    model.train()
    
    train_loss, valid_loss = 0, 0
    y_hat = []
    
    # train_batch start
    for x_i, y_i in train_loader:
        y_hat_i = model(x_i)
        loss = F.cross_entropy(y_hat_i, y_i.long().squeeze())
        
        optimizer.zero_grad()
        loss.backward()

        optimizer.step()        
        train_loss += float(loss) # This is very important to prevent memory leak.

    train_loss = train_loss / len(train_loader)
    
    model.eval()
    with torch.no_grad():
        valid_loss = 0
        
        for x_i, y_i in valid_loader:
            y_hat_i = model(x_i)
            loss = F.cross_entropy(y_hat_i, y_i.long().squeeze())
            
            valid_loss += float(loss)
            
            y_hat += [y_hat_i]
            
    valid_loss = valid_loss / len(valid_loader)
    
    train_history.append(train_loss)
    valid_history.append(valid_loss)
    
    if i % config['print_interval'] == 0:
        print('Epoch %d: train loss=%.4e  valid_loss=%.4e  lowest_loss=%.4e' % (i, train_loss, valid_loss, lowest_loss))
        
    if valid_loss <= lowest_loss:
        lowest_loss = valid_loss
        lowest_epoch = i
        best_model = deepcopy(model.state_dict())
        
    model.load_state_dict(best_model)

이제 학습이 잘됐는지 아래와 같은 방법으로 train_loss와 valid_loss를 표시해봅니다.

import matplotlib.pyplot as plt

fig, loss_ax = plt.subplots()

loss_ax.plot(train_history, 'y', label='train loss')
loss_ax.plot(valid_history, 'r', label='val loss')

loss_ax.set_xlabel('epoch')
loss_ax.set_ylabel('loss')

loss_ax.legend(loc='upper left')

plt.show()

사실 이 예제는 torch의 Dataset과 DataLoader를 사용하는 방법에 대한 예제였는데 이것저것 설명하다 보니 글이 길어졌습니다.

여기서 중요한 것은 Dataset을 만들고 DataLoader를 통해서 학습에 사용하는 방법에 대한 내용이 중요하니 예제 코드를 활용해서 직접 테스트해보시기 바랍니다.