RNN Time-Series 예측(2)

본 예제는 모두를 위한 딥러닝 시즌2의 데이터(data-02-stock_daily.csv)와 모델을 제외한 소스 코드를 참고했습니다.

# Reference
# 모두를 위한 딥러닝 시즌 2 - PyTorch
# Lab-11-4 RNN timeseries

필요한 라이브러리를 임포트 합니다.

import torch
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

모델에 사용할 파라메터를 셋팅해줍니다. seq_length는 입력 시퀀스 정보, data_dim은 입력 데이터의 차원, hidden_dim은 출력 데이터의 차원, output_dim은 최종 예측 데이터의 차원 입니다.

# hyper parameters
seq_length = 7
data_dim = 5
hidden_dim = 30
output_dim = 1
learning_rate = 0.01
iterations = 501

해당 데이터는 주가 데이터로 개장 포인트, 가장 높은 포인트, 가장 낮은 포인트, 폐장시 포인트와 거래량으로 5개 변수 데이터를 가지고 있습니다. 본 예측은 학습 데이터를 7일로 분리하여 다음 폐장 포인트를 예측하는 모델입니다.

# load data
xy = np.loadtxt("data-02-stock_daily.csv", delimiter=",")
xy = xy[::-1]  # reverse order

학습용 데이터와 테스트용 데이터를 분리하는 내용입니다. 학습 데이터와 검증 데이터는 7:3 비율로 분리합니다.

# split train-test set
train_size = int(len(xy) * 0.7)
train_set = xy[0:train_size]
test_set = xy[train_size - seq_length:]
train_set.shape, test_set.shape

입력한 데이터를 학습에 사용하기 위해서는 정규화 과정이 필요합니다.
정규화를 왜 해야 하는지에 대해서는 아래의 그래프를 참고하시기 바랍니다.

해당 데이터셋은 총 5개로 구성되어 있습니다. 그중 4개의 데이터는 단위가 비슷하기 때문에 그래프를 통해 보면 유사한 형태를 보이고 있습니다. 그러나 Volume 이라는 컬럼을 같이 표현하고자 한다면 입력 단위의 차이가 매우 크기 때문에 아래의 그림과 같이 나머지 데이터는 식별이 불가능하게 됩니다.

  • 단위의 차이로 인해서 Volume 데이터를 시각화 하는데 한계가 있음

그러나 MinMaxScaler를 활용하여 정규화 하게 되면 모든 데이터를 0,1의 범위 안에 표현할 수 있기 때문에 모든 그래프를 한번에 그릴 수 있습니다. 그리고 이렇게 표현한 데이터는 다시 원래 단위의 형태로 복원 할 수 있습니다.

학습에서 MinMaxScaler를 사용하는 이유는 다차원 데이터값을 비교 분석하기 쉽게 만들어주고 자료의 오버플로우나 언더플로우를 방지해주고 최적과 과정에서 안정성 및 수렴 속도를 향상 시키기 위함입니다.

MinMaxScaler 수행후 데이터 시각화
scaler = MinMaxScaler()
scaler.fit(train_set)
print(scaler.n_samples_seen_, scaler.data_min_, scaler.data_max_, scaler.feature_range)
train_set = scaler.transform(train_set)
scaler.fit(test_set)
print(scaler.n_samples_seen_, scaler.data_min_, scaler.data_max_, scaler.feature_range)
test_set = scaler.transform(test_set)

build_dataset 함수는 RNN 학습을 위해서 입력 텐서를 만들어 주는 부분입니다.
time_series[0:7,], time_series[7,[-1]] 형식으로 되어 있습니다.

# make dataset to input
def build_dataset(time_series, seq_length):
    dataX = []
    dataY = []
    for i in range(0, len(time_series) - seq_length):
        _x = time_series[i:i + seq_length, :]
        _y = time_series[i + seq_length, [-1]]  # Next close price
        #print(_x, "->", _y)
        dataX.append(_x)
        dataY.append(_y)
    return np.array(dataX), np.array(dataY)
# make train-test dataset to input
trainX, trainY = build_dataset(train_set, seq_length)
testX, testY = build_dataset(test_set, seq_length)
print(trainX.shape, trainY.shape)

# convert to tensor
trainX_tensor = torch.FloatTensor(trainX)
trainY_tensor = torch.FloatTensor(trainY)

testX_tensor = torch.FloatTensor(testX)
testY_tensor = torch.FloatTensor(testY)

이제 학습 데이터를 통해서 데이터를 학습하는 모델을 만듭니다. 본 예제에서는 BiLSTM 방식으로 4개의 층을 쌓아 올린 형태입니다.

위의 그림은 해당 데이터에 대한 입력 데이터 형태와 입력와 출력의 형태는 아래 그림과 같습니다. 일단 벡터는 (n,7,5) -> (n,7,30) 형태로 나옵니다. 그러나 BiLSTM 모델을 사용했기 때문에 마지막 output은 30*2의 형태가 됩니다.

class Net(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, layers):
        super(Net, self).__init__()
        self.rnn = torch.nn.LSTM(input_dim, hidden_dim, num_layers=layers, batch_first=True, bidirectional=True)
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim*2, 20),
            torch.nn.Linear(20, 10),
            torch.nn.Linear(10, output_dim)
        )

    def forward(self, x):
        x, (hidden, cell) = self.rnn(x)
        x = self.layers(x[:, -1, ])
        return x

net = Net(data_dim, hidden_dim, output_dim, 4)

이제 학습을 수행합니다.

# loss & optimizer setting
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

# start training
for i in range(iterations):
    outputs = net(trainX_tensor)
    loss = criterion(outputs, trainY_tensor)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if i%50 == 0:
        print(i, loss.item())

학습이 완료된 후 테스트를 수행합니다.

net.eval()
predict_data = net(testX_tensor).data.numpy()
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(testY)
plt.plot(predict_data, color='red')
plt.legend(['original', 'prediction'])
plt.show()

학습된 결과와 원본 데이터를 통해 비교해보면 예측이 비교적 잘됐음을 알 수 있습니다.

RNN Time-Series 예측(1)

해당 예측 모델의 원본 링크는 아래와 같습니다.
본 예제는 아래에 구현된 링크와 동일한 데이터를 사용했고 RNN의 모델과 학습 부분의 로직을 수정했습니다.

https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/

필요한 라이브러리를 임포트 합니다.

import torch
import torch.nn as nn
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

seaborn에 샘플 데이터 중에서 flights 정보를 로드합니다.
seaborn 패키지에는 flights 외에도 다양한 샘플 데이터가 있습니다.

sns.get_dataset_names()
['anscombe', 'attention', 'brain_networks', 'car_crashes','diamonds', 'dots', 'exercise', 'flights','fmri', 'gammas', 'geyser', 'iris', 'mpg','penguins', 'planets', 'tips', 'titanic']

flights 데이터를 DataFrame 형태로 입력 받아서 상위 5개 데이터를 출력해봅니다. 해당 데이터프레임은 year, month, passengers 컬럼이 있습니다. 데이터의 형식은 해당 년도에 월별로 승객의 수가 등록되어 있습니다. 데이터는 1949~1960년까지의 143개 데이터입니다. 참고로 df.head()로는 상위 5개 데이터를 df.tail()로는 하위 5개의 데이터를 출력합니다.

df = sns.load_dataset('flights')
df.head()
idxyearmonthpassengers
01949January112
11949February118
21949March132
31949April129
41949May121
샘플 데이터

데이터셋의 결측치를 다음과 같이 확인해보고 예측에 사용할 컬럼인 passengers가 어떻게 변화하는지 추이 정보를 출력합니다.

학습에 앞서 훈련용 데이터와 검증용 데이터를 분리하겠습니다.
학습용 데이터는 59~60년도 데이터를 제외한 나머지 데이터입니다. 해당 모델을 통해서 2개년도의 승객 추이를 예측해보겠습니다.

data = df['passengers'].values.astype(float)
valid_data_size = 24 
train_data = data[:-valid_data_size]
valid_data = data[-valid_data_size:]

데이터를 분리한 후 MinMaxScaler를 통해서 데이터를 0~1 사이의 값으로 변환합니다.
스케일링 작업을 통해 다차원 데이터의 값들을 비교 분석하기 쉽게 만들어주고 자료의 오버플로우나 언더플로우를 방지해주고 최적화 과정에서 안정성 및 수렴 속도를 향상시켜줍니다.

from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
train_data_norm = scaler.fit_transform(train_data.reshape(-1,1))

학습용 데이터는 아래와 같은 방법으로 생성합니다. 학습 데이터셋은 1월-12월 데이터를 통해 다음해 1월의 승객 수를 예측하고 2월-다음해 1월 데이터를 통해 2월의 승객 수를 예측하는 형태로 구성되어 있습니다.

sequence_length = 12 # monthly
def make_batch(input_data, sl):
    train_x = []
    train_y = []
    L = len(input_data)
    for i in range(L-sl):
        train_seq = input_data[i:i+sl]
        train_label = input_data[i+sl:i+sl+1]
        train_x.append(train_seq)
        train_y.append(train_label)
    return train_x, train_y

Array 형태의 데이터를 파이토치 텐서로 변환해줍니다.

train_x, train_y = make_batch(train_data_norm, sequence_length)
tensor_x = torch.Tensor(train_x)
tensor_y = torch.Tensor(train_y)

학습을 위한 데이터의 최종 형태는 아래와 같은 형태가 됩니다.
RNN 입력 자료의 특성상 배치사이즈, 타임 시퀀스, 입력 벡터의 형태를 가지게 됩니다.

tensor_x.size(), tensor_y.size()
output : (torch.Size([108, 12, 1]), torch.Size([108, 1, 1]))

이제 학습을 위한 모델 클래스를 만듭니다. 모델은 LSTM을 사용합니다.
모델의 초기화를 위해서 입력 벡터, 입력 시퀀스 정보를 각각 설정합니다. LSTM의 출력 벡터는 100으로 주었고 단층이 아닌 4개 층으로 구성했습니다.

아래와 같은 모델을 통해 구성하면 입력 시퀀스가 12이기 때문에 최종 LSTM 출력의 벡터는 (N, 12, 100)의 형태로 만들어집니다. 해당 모델에서는 12개의 시퀀스에서 나오는 데이를 사용하지 않고 마지막 스텝에서 나오는 시퀀스 정보만 사용하게 되기 때문에 RNN의 모델 중에서 Many-to-One에 해당한다고 할 수 있습니다.

class RNN(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.input_vector = 1
        self.sequence_length = 12
        self.output_vector = 100
        self.num_layers = 4
        
        self.lstm = nn.LSTM(input_size=self.input_vector, hidden_size=self.output_vector, num_layers=self.num_layers, batch_first=True)
        self.linear = nn.Sequential(
            nn.Linear(self.output_vector, 50),
            nn.Linear(50, 30),
            nn.Linear(30, 10),
            nn.Linear(10,1)
        )
        
    def forward(self, x):
        output, _ = self.lstm(x) #(hidden, cell) 데이터는 사용하지 않음
        return self.linear(output[:,-1,:])

model = RNN()
RNN(
  (lstm): LSTM(1, 100, num_layers=4, batch_first=True)
  (linear): Sequential(
    (0): Linear(in_features=100, out_features=50, bias=True)
    (1): Linear(in_features=50, out_features=30, bias=True)
    (2): Linear(in_features=30, out_features=10, bias=True)
    (3): Linear(in_features=10, out_features=1, bias=True)
  )
)

해당 모델의 학습을 수행합니다.

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

epochs = 501

for i in range(epochs):
    model.train()
    
    output = model(tensor_x)
    loss = criterion(output, tensor_y.view(-1,1))
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if i%25 == 0:
        print('Epoch {}, Loss {:.5f}'.format(i, loss.item()))

학습이 완료되면 해당 모델이 원본 데이터와 비교하여 얼마나 추이를 잘 나타내는지 확인하는 과정이 필요합니다. 이 과정을 위해서 사전에 분리한 valid 데이터를 사용합니다.

valid_data_norm = train_data_norm[-valid_data_size:]
valid_x, _ = make_batch(valid_data_norm, sequence_length)

valid 데이터 역시 학습과 동일한 과정을 수행합니다.
다만 학습이 일어나는 것은 아니기 때문에 loss를 계산하거나 역전파와 같은 프로세스는 수행하지 않습니다.
또한 해당 데이터는 0,1 사이 값으로 변환한 데이터이기 때문에 이 값을 다시 scaler를 통해 원래 값의 형태로 변경해줍니다.

model.eval()
with torch.no_grad():
    valid_tensor = torch.Tensor(valid_x)
    predict = model(valid_tensor)
predict = predict.data.numpy()
actual_predictions = scaler.inverse_transform(predict)

이렇게 변경한 데이터를 통해서 원본 데이터와 그래프를 그려봅니다. blue 라인이 원본 데이터이고 red 라인이 예측한 데이터입니다.

x = np.arange(120,132,1)
plt.title('Month vs Passenger')
plt.ylabel('Total Passengers')
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(df['passengers'][0:132])
plt.plot(x,actual_predictions)
plt.show()
sequence_length=12

참고로 아래는 Sequence_Length=6으로 예측한 결과 입니다.

Sequence_length=6

PyTorch LSTM 예제

이 예제는 파이토치를 활용해서 LSTM을 구현하는 예제입니다.
LSTM은 RNN(Recurrent Neural Network)의 하나로 좋은 성능을 발휘하는 모델입니다. 파이토치는 LSTM를 직관적으로 구현할 수 있도록 하는 좋은 인공지능 프레임워크입니다.

본 예제는 현재 문장을 주고 다음 문장을 예측하는 알고리즘입니다. 이러한 예제들을 활용하면 다양한 시계열 데이터를 다룰 수 있습니다. 시계열 데이터라 함은 데이터가 어떤 시간의 순서를 가진다는 것을 의미합니다.

예를 들어 한 문장의 다양한 단어들은 비록 같은 단어일지라도 앞에 오느냐 뒤에 오느냐에 따라서 그 의미가 달라지는 경우가 있습니다. 그렇기 때문에 현재 문장을 유추하기 위해서는 앞에 어떤 단어가 있는지를 알아내는 것이 중요합니다.
RNN은 이러한 예측을 가능하게 해줍니다.

RNN은 두개의 Linear 모델이 합쳐진 하나의 Activation Function입니다. 아래 그림이 이를 잘 설명하고 있습니다.

https://pytorch.org/tutorials/intermediate/char_rnn_classification_tutorial.html
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

입력 문장은 “In the beginning God created the heavens and the earth”라는 문장입니다. x 데이터는 입력 문장이고 x 데이터를 통해 예측한 결과를 비교하기 위해서 정답 데이터셋 y를 만듭니다. y 데이터는 x 데이터의 첫번째 입력 ‘I’의 경우 ‘n’를 예측하고 ‘n’가 입력된 경우 ‘ ‘(공백) 를 시스템이 예측할 수 있도록 하기 위함입니다.

sentence = 'In the beginning God created the heavens and the earth'
x = sentence[:-1]
y = sentence[1:]

char_set = list(set(sentence))
input_size = len(char_set)
hidden_size = len(char_set)

index2char = {i:c for i, c in enumerate(char_set)}
char2index = {c:i for i, c in enumerate(char_set)}

index2char과 char2index는 각각 문자를 문자 자체로 입력하지 않고 one-hot의 형태로 입력하기 위해서 만들어준 python dict입니다.
char2index를 출력하면 아래와 같은 형태가 됩니다.

{‘s’: 0, ‘ ‘: 1, ‘t’: 2, ‘I’: 3, ‘o’: 4, ‘h’: 5, ‘e’: 6, ‘g’: 7, ‘d’: 8, ‘c’: 9, ‘b’: 10, ‘i’: 11, ‘n’: 12, ‘G’: 13, ‘v’: 14, ‘a’: 15, ‘r’: 16}

one_hot = []
for i, tkn in enumerate(x):
    one_hot.append(np.eye(len(char_set), dtype='int')[char2index[tkn]])

x_train = torch.Tensor(one_hot)
x_train = x_train.view(1,len(x),-1)

입력된 sentence는 그대로 입력값으로 사용하지 않고 one-hot 형태로 변경해서 최종 x_train 형태의 데이터를 만듭니다. 문장을 one-hot 형태로 만들기 위해서 numpy의 eye함수를 사용합니다.

print(x_train)

tensor([[[0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.] ...

x_train 데이터를 출력하면 위와 같은 형태가 만들어집니다. 위의 데이터셋은 [1, 10, 8] 형태의 3차원 데이터셋입니다. NLP 데이터를 3차원 형태의 입력 데이터셋을 가집니다. 첫번째 차원은 문장의 갯수, 두번째는 단어의 갯수, 세번째는 단어의 입력 차원입니다.

참고로 CNN의 경우에는 4차원 형태의 데이터를 가집니다.
다음으로 아래와 같이 y_data를 만들어줍니다.

# y label
y_data = [char2index[c] for c in y]
y_data = torch.Tensor(y_data)

이제 모델을 만들 차례입니다.
파이토치는 nn.Module을 사용해서 Module을 만들 수 있습니다.

class RNN(nn.Module):
    
    # (batch_size, n, ) torch already know, you don't need to let torch know
    def __init__(self,input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        
        self.rnn = nn.LSTM(
            input_size = input_size, 
            hidden_size = hidden_size, 
            num_layers = 4, 
            batch_first = True,
            bidirectional = True
        )
        
        self.layers = nn.Sequential(
            nn.ReLU(),
            nn.Linear(input_size*2, hidden_size),
        )
        
    def forward(self, x):
        y,_ = self.rnn(x)
        y = self.layers(y)
        return y
    
model = RNN(input_size, hidden_size)
model

RNN 클래스는 init, forward 함수로 구성됩니다. init함수는 LSTM 모델을 선언하는 부분과 softamx 함수를 선언하는 두부분이 있습니다. LSTM 함수는 두개의 인자값을 기본으로 받습니다. input_size, hidden_size입니다. input_size는 입력 벡터의 크기이며 hidden_size는 출력 벡터의 크기입니다. 본 예제에서는 입력 벡터와 출력 벡터가 크기가 같습니다. 배치 사이즈나 시퀀스 사이즈는 파이토치에서 자동으로 계산하기 때문에 입력할 필요가 없습니다.
num_layers는 RNN의 층을 의미합니다. 본 예제는 4개의 층으로 구성했기 때문에 num_layers를 4로 설정했습니다. 그리고 bidirectional을 True로 했기 때문에 마지막 output의 형태는 input_size*2의 형태가 됩니다.
Linear 레이어는 input_size의 차원을 줄이기 위해서 선언합니다.

또 모델을 만들면서 중요한 것은 batch_first를 True로 해줘야 한다는 것입니다. 그렇지 않으면 time-step(=sequence_length), batch_size, input_vector 의 형태가 됩니다.

선언한 모델의 정보를 출력해보면 다음과 같습니다.
RNN(
(rnn): LSTM(8, 8, num_layers=4, bidirectional=True)
(layers): Sequential(
(0): ReLU()
(1): Linear(in_features=16, out_features=8, bias=True)
)
)

# loss & optimizer setting
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

# start training
for i in range(5000):
    model.train()
    outputs = model(x_train)
    loss = criterion(outputs.view(-1, input_size), y_data.view(-1).long())
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if i%500 == 0:
        result = outputs.data.numpy().argmax(axis=2)
        result_str = ''.join([char_set[c] for c in np.squeeze(result)])
        print(i, "loss: ", loss.item(), "\nprediction: ", result, "\ntrue Y: ", y_data, "\nprediction str: ", result_str,"\n")

위의 코드와 같이 학습을 수행합니다.
x_train 데이터를 입력 받아 나온 결과를 y_data와 비교해서 loss를 계산하고 이 loss 값을 Back-propagation을 수행하고 Gradient를 초기화하는 과정을 반복합니다.

5000 회 학습할 경우 다음과 같이 loss가 내려가는 것을 확인 할 수 있습니다.

실제로 데이터를 돌려보면 약 1500번 정도 학습을 완료하면 입력 단어를 통해서 정확히 다음 단어를 예측하는 것을 확인할 수 있습니다.

1500 loss: 0.31987816095352173
prediction: [[12 1 2 5 6 1 10 6 7 11 12 12 11 12 7 1 13 4 8 1 9 16 6 15
2 6 8 1 2 5 6 1 5 6 15 14 6 12 0 1 15 12 8 1 2 5 6 1
6 15 16 2 5]]
true Y: tensor([12., 1., 2., 5., 6., 1., 10., 6., 7., 11., 12., 12., 11., 12.,
7., 1., 13., 4., 8., 1., 9., 16., 6., 15., 2., 6., 8., 1.,
2., 5., 6., 1., 5., 6., 15., 14., 6., 12., 0., 1., 15., 12.,
8., 1., 2., 5., 6., 1., 6., 15., 16., 2., 5.])
prediction str: n the beginning God created the heavens and the earth