객체지향의 역사와 이해

초기의 컴퓨터가 나타났을 때에 연산은 주로 배선으로 만들어 졌습니다. 이런 방식은 알고리즘의 수정이나 변경이 어려웠기 때문에 당연히 많은 문제점이 나타났고 이에 대한 개선의 노력이 이어졌습니다. 그러다가 폰 노이만(Von Neumann architecture)에 의해서 현대적인 컴퓨터 구조가 제안되면서 하드웨어, 소프트웨어로 구분되기 시작했습니다. 이후 1950년 부터 프로그램 언어가 등장하기 시작했습니다.

wikipedia.org

이후 연구자들은 소프트웨어 공학이라는 소프트웨어의 개발, 운용, 유지보수의 생명 주기 전반을 체계적으로 다르는 학물을 발전시켰습니다.
최초의 소프트웨어 공학용어를 사용한 해는 1968년으로 알려져 있습니다. 이 당시는 소프트웨어의 개발 속도가 하드웨어 개발 속도를 따라가지 못해 사용자들의 요구사항을 처리 할 수 없는 문제들이 발생되는 소프트웨어 위기(Software Crisis) 론이 등장하는 시기였습니다.
이러한 시대적인 요구사항 속에서 연구자들은 자연스럽게 이 위기를 어떻게 극복할 것인가에 대하여 그 해결책을 찾기 위해 고민했습니다.

객체지향언어도 이 당시에 최초로 등장했습니다.
최초의 객체지향 언어는 시뮬라67이었습니다. 비록 이 당시에 큰 주목을 받지 못했지만 향후 객체지향언어의 발전의 큰 영향을 주었습니다. 이후 스몰토크, 에이다 같은 프로그램들이 등장하면서 객체지향에 대한 연구가 활발하게 이뤄졌습니다.

객체지향에 대한 필요는 기존 언어의 한계에서 비롯되었다고 할 수 있습니다. 기존에는 절차식 프로그래밍(Procedural Programming)은 하위 프로그램, 서브 루틴, 메서드, 함수 등의 용어로 활용되는 프로시저를 호출하며 프로그램을 실행하는 것이었습니다.
어려운 말로 들리지만 간단히 말하면 수행되어야 할 코드를 연속적으로 정의하는 것이라고 할 수 있다.

비교적 프로그램의 구조가 간단하기 때문에 많이 사용되었지만 대규모 개발에서 생산성이 좋지 않고 프로그램의 유지보수가 좋지 않았기 때문에 여러가지 많은 문제들이 발생했습니다.

객체지향은 이러한 환경에서 출발했기 때문에 자연스럽게 프로그램 코드의 재사용성을 높여 생산성을 향상 할 수 있을까 또 어떻게 하면 프로그램의 유지보수나 기능 개선에 드는 시간과 비용을 절약할 수 있을까라는 문제 해결에 주안점을 두게 되었습니다.

그 결과 클래스(Class), 객체(Object), 메서드(Method) 를 기본 구성요소로 추상화, 상속, 다형성 등을 특징으로 가진 객체지향 언어가 등장했고 강한 응집력(Strong Cohesion)과 약한 결합력(Weak Coupling)을 통해서 기존 언어의 문제점들을 극복하고 1995년 자바가 등장한 이후 가장 강력한 언어가 되었습니다.

객체지향언어는 1905년 이후 자바가 등장하면서 많은 대중화를 이뤄냈고 자바(JAVA)는 객체지향이라는 등식을 만들어 냈습니다.

그렇다면 객체지향이란 무엇인가?
이 말은 객체와 지향이라는 두 단어의 조합으로 이뤄져있는 단어입니다.

객체(Object)는 이 세상의 존재하는 모든 것들이라고 할 수 있습니다. 그리고 지향한다는 것은 방향성을 의미한고 할 수 있습니다. 즉 객체지향이라는 것은 세상에 존재하는 객체가 어떻게 동작하고 구성되어 있는가의 원리를 프로그램으로 도입한 것이라고 할 수 있겠습니다.
그러나 객체지향은 여전히 절차적 언어의 특징을 가지고 있으며 기존의 다양한 제어문, 반복문 등을 사용합니다. 다만 기존의 개발방식에 객체라는 지향점을 코딩의 패러다임을 변화한 것이라고 생각하시면 됩니다.

예를 들어 인간이라는 캐릭터를 컴퓨터 상에 구현한다고 생각해봅니다.
(* 이것은 어떤 철학이나 의학 등의 개념이 아닌 프로그램의 예로 든것입니다.)

인간 = Attribute + Method 로 정의할 수 있습니다.
Attribute는 인간을 표현하는 여러가지 속성들 즉, 눈, 코, 입, 귀, 다리, 팔, 키, 몸무게 등이 있고 Method는 프로그램 안에서의 서다, 걷다, 눕다 등 인간의 동작이나 행위를 규정한다고 할 수 있습니다.
단, 프로그램 안에 모든 속성을 다 객체에 담을 필요는 없고 만들고자 하는 프로그램에서 목적하는 몇가지 속성과 메서드를 정의하면 됩니다. 본 예제에서는 이름과, 키, 몸무게 정도만 사용해서 human이라는 추상클래스(Attribute + Method)로 만들었습니다.

모든 인간 캐릭터는 추상클래스를 상속하게 됩니다. 그렇기 때문에 추상클래스는 이를 상속하는 객체의 대표성을 띄게 됩니다.

package ch01;

public abstract class Human {

	private String name;
	private int height;
	private int weight;
	
	public Human(String name, int height, int weight) {
		this.name = name;
		this.height = height;
		this.weight = weight;
	}
	
	public abstract void run();
	public abstract void jump();
	public abstract void walk();
	
	public void getFeature() {
		System.out.println(String.format("%s, %d, %d", name, height, weight));
	}
}

사람은 남자와 여자로 나눌 수 있기 때문에 인간의 특징 중에 남자의 특징적 요소를 다음과 같이 정의했습니다.

package ch01;

public interface Man {
	public void work();
	public void fight();
}

만약 어떤 클래스가 Human 클래스와 Man 인터페이스를 상속 받는 다면 해당 클래스는 이름, 키, 몸무게, 달리고, 뛰고, 걷고, 싸우고, 일하는 속성을 공통으로 받게 됩니다. 이것을 상속이라고 합니다.

예제는 Adam이라는 클래스를 만들고 Human, Man 등의 클래스를 상속 받았습니다.

package ch01;

public class Adam extends Human implements Man {

	public Adam(String name, int height, int weight) {
		super(name, height, weight);
	}

	@Override
	public void work() { }

	@Override
	public void fight() { }

	@Override
	public void run() { }

	@Override
	public void jump() { }

	@Override
	public void walk() { }
}

이제 상속 받은 클래스를 통해서 하나의 객체를 만들었습니다. 사실 클래스는 어떤 객체를 만드는 설계도라고 할 수 있고 이 설계도를 통해서 컴퓨러 메모리 공간에 만들어 낸 것을 객체(Object)라고 합니다. (이 이름에는 약간씩 다른 의미로 부르기도 합니다.)

package ch01;

public class HumanDemo {
	public static void main(String[] args) {
		Adam adam = new Adam("아담", 180, 80);
		Eve eve = new Eve("이브", 165, 50);
		
		adam.getFeature();
		eve.getFeature();
	}
}

이런 식으로 코딩하게 되면 인간+남자 클래스를 상속 받아 각기 다른 수많은 남자 객체를 메모리 상에 구현할 수 있습니다. 마찬가지로 인간+여자 클래스를 상속 받으면 수많은 여자 객체를 메모리 상에 구현 할 수 있습니다.

이러한 프로그램 개발 방식이 객체지향입니다. 이와 같은 방법으로 Car 클래스를 정의하고 이를 상속 받아 Bus, Taxi 등의 객체를 만들 수 있고 Animal 클래스를 정의하고 이를 상속 받아서 Cat, Dog 등의 클래스를 만들어 낼 수도 있습니다.
이렇게 개발하면 코드의 재사용성을 높일 수 있고 기존의 기능을 수정하거나 새로운 기능을 추가하거나 하기가 용이합니다.
그러나 그만큼 객체지향적 설계에 많은 시간을 들여야 합니다.

객체지향 언어는 몇가지 중요한 특징을 가지고 있습니다.

추상화(Abstraction), 캡슐화(Encapsulation), 상속(Inheritance), 다형성(Polymorphism)이 그것입니다.

다음에는 이러한 요소들에 대해서 살펴보도록 하겠습니다.

시계열 데이터의 이해

시계열 데이터는 간단히 말해서 시간별로 구성된 데이터의 집합입니다.

시계열 데이터는 기록 추세, 실시간 경고 또는 예측 모델링을 위해 분석 할 수 있습니다.

시계열 데이터는 자산 또는 프로세스가 시간이 지남에 따라 어떻게 달라지는 지를 나타냅니다. 데이터에는 타임스탬프 축이 있어 시간의 순서대로 데이터가 축적되기 때문에 이러한 데이터를 활용하면 역방향으로 사건의 케이스를 분석 할 수 있고 이를 바탕으로 앞으로 일어날 일을 예측 할 수 있게 됩니다.

https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/scenarios/images/time-series-insights.png

시계열 데이터의 대표적인 예로는 센서 데이터, 주가 데이터, 클릭 스트림, 애플리케이션 로그 등이 있습니다.

  • 추세를 검색하기 위해 시간에 따라 캡처한 주가 데이터
  • 서버 성능(예, CPU 사용량, I/O 부하, 메모리 사용량 및 네트워크 대역폭 사용량)
  • 보류 중인 장비 오류 및 트리거 경고 알림을 검색하는데 사용할 수 있는 산업 장비 센서
  • 주행에 대한 위험 점수 집계를 계산하기 위한 시간별 속도, 브레이크 사용 및 가속을 포함하는 자동차 원격 분석 데이터

이러한 경우에 데이터의 시간적인 순서는 매우 중요하다고 할 수 있습니다. 이벤트를 발생 순서대로 표시하는 것이 시계열 데이터의 핵심적인 특징입니다.

https://docs.microsoft.com/ko-kr/azure/architecture/data-guide/scenarios/time-series

RNN Time-Series 예측(2)

본 예제는 모두를 위한 딥러닝 시즌2의 데이터(data-02-stock_daily.csv)와 모델을 제외한 소스 코드를 참고했습니다.

# Reference
# 모두를 위한 딥러닝 시즌 2 - PyTorch
# Lab-11-4 RNN timeseries

필요한 라이브러리를 임포트 합니다.

import torch
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

모델에 사용할 파라메터를 셋팅해줍니다. seq_length는 입력 시퀀스 정보, data_dim은 입력 데이터의 차원, hidden_dim은 출력 데이터의 차원, output_dim은 최종 예측 데이터의 차원 입니다.

# hyper parameters
seq_length = 7
data_dim = 5
hidden_dim = 30
output_dim = 1
learning_rate = 0.01
iterations = 501

해당 데이터는 주가 데이터로 개장 포인트, 가장 높은 포인트, 가장 낮은 포인트, 폐장시 포인트와 거래량으로 5개 변수 데이터를 가지고 있습니다. 본 예측은 학습 데이터를 7일로 분리하여 다음 폐장 포인트를 예측하는 모델입니다.

# load data
xy = np.loadtxt("data-02-stock_daily.csv", delimiter=",")
xy = xy[::-1]  # reverse order

학습용 데이터와 테스트용 데이터를 분리하는 내용입니다. 학습 데이터와 검증 데이터는 7:3 비율로 분리합니다.

# split train-test set
train_size = int(len(xy) * 0.7)
train_set = xy[0:train_size]
test_set = xy[train_size - seq_length:]
train_set.shape, test_set.shape

입력한 데이터를 학습에 사용하기 위해서는 정규화 과정이 필요합니다.
정규화를 왜 해야 하는지에 대해서는 아래의 그래프를 참고하시기 바랍니다.

해당 데이터셋은 총 5개로 구성되어 있습니다. 그중 4개의 데이터는 단위가 비슷하기 때문에 그래프를 통해 보면 유사한 형태를 보이고 있습니다. 그러나 Volume 이라는 컬럼을 같이 표현하고자 한다면 입력 단위의 차이가 매우 크기 때문에 아래의 그림과 같이 나머지 데이터는 식별이 불가능하게 됩니다.

  • 단위의 차이로 인해서 Volume 데이터를 시각화 하는데 한계가 있음

그러나 MinMaxScaler를 활용하여 정규화 하게 되면 모든 데이터를 0,1의 범위 안에 표현할 수 있기 때문에 모든 그래프를 한번에 그릴 수 있습니다. 그리고 이렇게 표현한 데이터는 다시 원래 단위의 형태로 복원 할 수 있습니다.

학습에서 MinMaxScaler를 사용하는 이유는 다차원 데이터값을 비교 분석하기 쉽게 만들어주고 자료의 오버플로우나 언더플로우를 방지해주고 최적과 과정에서 안정성 및 수렴 속도를 향상 시키기 위함입니다.

MinMaxScaler 수행후 데이터 시각화
scaler = MinMaxScaler()
scaler.fit(train_set)
print(scaler.n_samples_seen_, scaler.data_min_, scaler.data_max_, scaler.feature_range)
train_set = scaler.transform(train_set)
scaler.fit(test_set)
print(scaler.n_samples_seen_, scaler.data_min_, scaler.data_max_, scaler.feature_range)
test_set = scaler.transform(test_set)

build_dataset 함수는 RNN 학습을 위해서 입력 텐서를 만들어 주는 부분입니다.
time_series[0:7,], time_series[7,[-1]] 형식으로 되어 있습니다.

# make dataset to input
def build_dataset(time_series, seq_length):
    dataX = []
    dataY = []
    for i in range(0, len(time_series) - seq_length):
        _x = time_series[i:i + seq_length, :]
        _y = time_series[i + seq_length, [-1]]  # Next close price
        #print(_x, "->", _y)
        dataX.append(_x)
        dataY.append(_y)
    return np.array(dataX), np.array(dataY)
# make train-test dataset to input
trainX, trainY = build_dataset(train_set, seq_length)
testX, testY = build_dataset(test_set, seq_length)
print(trainX.shape, trainY.shape)

# convert to tensor
trainX_tensor = torch.FloatTensor(trainX)
trainY_tensor = torch.FloatTensor(trainY)

testX_tensor = torch.FloatTensor(testX)
testY_tensor = torch.FloatTensor(testY)

이제 학습 데이터를 통해서 데이터를 학습하는 모델을 만듭니다. 본 예제에서는 BiLSTM 방식으로 4개의 층을 쌓아 올린 형태입니다.

위의 그림은 해당 데이터에 대한 입력 데이터 형태와 입력와 출력의 형태는 아래 그림과 같습니다. 일단 벡터는 (n,7,5) -> (n,7,30) 형태로 나옵니다. 그러나 BiLSTM 모델을 사용했기 때문에 마지막 output은 30*2의 형태가 됩니다.

class Net(torch.nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, layers):
        super(Net, self).__init__()
        self.rnn = torch.nn.LSTM(input_dim, hidden_dim, num_layers=layers, batch_first=True, bidirectional=True)
        self.layers = torch.nn.Sequential(
            torch.nn.Linear(hidden_dim*2, 20),
            torch.nn.Linear(20, 10),
            torch.nn.Linear(10, output_dim)
        )

    def forward(self, x):
        x, (hidden, cell) = self.rnn(x)
        x = self.layers(x[:, -1, ])
        return x

net = Net(data_dim, hidden_dim, output_dim, 4)

이제 학습을 수행합니다.

# loss & optimizer setting
criterion = torch.nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

# start training
for i in range(iterations):
    outputs = net(trainX_tensor)
    loss = criterion(outputs, trainY_tensor)
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if i%50 == 0:
        print(i, loss.item())

학습이 완료된 후 테스트를 수행합니다.

net.eval()
predict_data = net(testX_tensor).data.numpy()
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(testY)
plt.plot(predict_data, color='red')
plt.legend(['original', 'prediction'])
plt.show()

학습된 결과와 원본 데이터를 통해 비교해보면 예측이 비교적 잘됐음을 알 수 있습니다.

RNN Time-Series 예측(1)

해당 예측 모델의 원본 링크는 아래와 같습니다.
본 예제는 아래에 구현된 링크와 동일한 데이터를 사용했고 RNN의 모델과 학습 부분의 로직을 수정했습니다.

https://stackabuse.com/time-series-prediction-using-lstm-with-pytorch-in-python/

필요한 라이브러리를 임포트 합니다.

import torch
import torch.nn as nn
import seaborn as sns
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

seaborn에 샘플 데이터 중에서 flights 정보를 로드합니다.
seaborn 패키지에는 flights 외에도 다양한 샘플 데이터가 있습니다.

sns.get_dataset_names()
['anscombe', 'attention', 'brain_networks', 'car_crashes','diamonds', 'dots', 'exercise', 'flights','fmri', 'gammas', 'geyser', 'iris', 'mpg','penguins', 'planets', 'tips', 'titanic']

flights 데이터를 DataFrame 형태로 입력 받아서 상위 5개 데이터를 출력해봅니다. 해당 데이터프레임은 year, month, passengers 컬럼이 있습니다. 데이터의 형식은 해당 년도에 월별로 승객의 수가 등록되어 있습니다. 데이터는 1949~1960년까지의 143개 데이터입니다. 참고로 df.head()로는 상위 5개 데이터를 df.tail()로는 하위 5개의 데이터를 출력합니다.

df = sns.load_dataset('flights')
df.head()
idxyearmonthpassengers
01949January112
11949February118
21949March132
31949April129
41949May121
샘플 데이터

데이터셋의 결측치를 다음과 같이 확인해보고 예측에 사용할 컬럼인 passengers가 어떻게 변화하는지 추이 정보를 출력합니다.

학습에 앞서 훈련용 데이터와 검증용 데이터를 분리하겠습니다.
학습용 데이터는 59~60년도 데이터를 제외한 나머지 데이터입니다. 해당 모델을 통해서 2개년도의 승객 추이를 예측해보겠습니다.

data = df['passengers'].values.astype(float)
valid_data_size = 24 
train_data = data[:-valid_data_size]
valid_data = data[-valid_data_size:]

데이터를 분리한 후 MinMaxScaler를 통해서 데이터를 0~1 사이의 값으로 변환합니다.
스케일링 작업을 통해 다차원 데이터의 값들을 비교 분석하기 쉽게 만들어주고 자료의 오버플로우나 언더플로우를 방지해주고 최적화 과정에서 안정성 및 수렴 속도를 향상시켜줍니다.

from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
train_data_norm = scaler.fit_transform(train_data.reshape(-1,1))

학습용 데이터는 아래와 같은 방법으로 생성합니다. 학습 데이터셋은 1월-12월 데이터를 통해 다음해 1월의 승객 수를 예측하고 2월-다음해 1월 데이터를 통해 2월의 승객 수를 예측하는 형태로 구성되어 있습니다.

sequence_length = 12 # monthly
def make_batch(input_data, sl):
    train_x = []
    train_y = []
    L = len(input_data)
    for i in range(L-sl):
        train_seq = input_data[i:i+sl]
        train_label = input_data[i+sl:i+sl+1]
        train_x.append(train_seq)
        train_y.append(train_label)
    return train_x, train_y

Array 형태의 데이터를 파이토치 텐서로 변환해줍니다.

train_x, train_y = make_batch(train_data_norm, sequence_length)
tensor_x = torch.Tensor(train_x)
tensor_y = torch.Tensor(train_y)

학습을 위한 데이터의 최종 형태는 아래와 같은 형태가 됩니다.
RNN 입력 자료의 특성상 배치사이즈, 타임 시퀀스, 입력 벡터의 형태를 가지게 됩니다.

tensor_x.size(), tensor_y.size()
output : (torch.Size([108, 12, 1]), torch.Size([108, 1, 1]))

이제 학습을 위한 모델 클래스를 만듭니다. 모델은 LSTM을 사용합니다.
모델의 초기화를 위해서 입력 벡터, 입력 시퀀스 정보를 각각 설정합니다. LSTM의 출력 벡터는 100으로 주었고 단층이 아닌 4개 층으로 구성했습니다.

아래와 같은 모델을 통해 구성하면 입력 시퀀스가 12이기 때문에 최종 LSTM 출력의 벡터는 (N, 12, 100)의 형태로 만들어집니다. 해당 모델에서는 12개의 시퀀스에서 나오는 데이를 사용하지 않고 마지막 스텝에서 나오는 시퀀스 정보만 사용하게 되기 때문에 RNN의 모델 중에서 Many-to-One에 해당한다고 할 수 있습니다.

class RNN(nn.Module):
    
    def __init__(self):
        super().__init__()
        self.input_vector = 1
        self.sequence_length = 12
        self.output_vector = 100
        self.num_layers = 4
        
        self.lstm = nn.LSTM(input_size=self.input_vector, hidden_size=self.output_vector, num_layers=self.num_layers, batch_first=True)
        self.linear = nn.Sequential(
            nn.Linear(self.output_vector, 50),
            nn.Linear(50, 30),
            nn.Linear(30, 10),
            nn.Linear(10,1)
        )
        
    def forward(self, x):
        output, _ = self.lstm(x) #(hidden, cell) 데이터는 사용하지 않음
        return self.linear(output[:,-1,:])

model = RNN()
RNN(
  (lstm): LSTM(1, 100, num_layers=4, batch_first=True)
  (linear): Sequential(
    (0): Linear(in_features=100, out_features=50, bias=True)
    (1): Linear(in_features=50, out_features=30, bias=True)
    (2): Linear(in_features=30, out_features=10, bias=True)
    (3): Linear(in_features=10, out_features=1, bias=True)
  )
)

해당 모델의 학습을 수행합니다.

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()

epochs = 501

for i in range(epochs):
    model.train()
    
    output = model(tensor_x)
    loss = criterion(output, tensor_y.view(-1,1))
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if i%25 == 0:
        print('Epoch {}, Loss {:.5f}'.format(i, loss.item()))

학습이 완료되면 해당 모델이 원본 데이터와 비교하여 얼마나 추이를 잘 나타내는지 확인하는 과정이 필요합니다. 이 과정을 위해서 사전에 분리한 valid 데이터를 사용합니다.

valid_data_norm = train_data_norm[-valid_data_size:]
valid_x, _ = make_batch(valid_data_norm, sequence_length)

valid 데이터 역시 학습과 동일한 과정을 수행합니다.
다만 학습이 일어나는 것은 아니기 때문에 loss를 계산하거나 역전파와 같은 프로세스는 수행하지 않습니다.
또한 해당 데이터는 0,1 사이 값으로 변환한 데이터이기 때문에 이 값을 다시 scaler를 통해 원래 값의 형태로 변경해줍니다.

model.eval()
with torch.no_grad():
    valid_tensor = torch.Tensor(valid_x)
    predict = model(valid_tensor)
predict = predict.data.numpy()
actual_predictions = scaler.inverse_transform(predict)

이렇게 변경한 데이터를 통해서 원본 데이터와 그래프를 그려봅니다. blue 라인이 원본 데이터이고 red 라인이 예측한 데이터입니다.

x = np.arange(120,132,1)
plt.title('Month vs Passenger')
plt.ylabel('Total Passengers')
plt.grid(True)
plt.autoscale(axis='x', tight=True)
plt.plot(df['passengers'][0:132])
plt.plot(x,actual_predictions)
plt.show()
sequence_length=12

참고로 아래는 Sequence_Length=6으로 예측한 결과 입니다.

Sequence_length=6

CNN Filter 변환 결과

입력 이미지가 CNN(Convolutional Neural Network) 필터를 통과하면서 이미지의 변화가 어떻게 되는지 알아보고자 합니다. CNN 필터는 기존의 머신러닝 기반의 Feature 추출 방식과는 달리 자동으로 Feature를 학습합니다.

https://www.researchgate.net/figure/Architecture-of-our-unsupervised-CNN-Network-contains-three-stages-each-of-which_283433254

CNN 네트워크는 위와 같은 네트워크를 통과 하며 스스로 이미지에 대한 feature 정보를 습득하게 됩니다. 본 코드는 하나의 각 필터를 통과하며 이미지가 어떻게 변화되는지에 대한 정보를 보여주는 코드입니다.

테스트를 위해 필요한 라이브러리를 import합니다.

import cv2
import torch
import numpy as np
import matplotlib.pyplot as plt

위의 이미지는 cv2.imread로 읽으면 (225, 400, 3) 형태의 shape 정보를 가집니다. 우선 데이터의 자료형을 바꾸고 정규화를 수행합니다.

또 하나 작업해야 할 것은 pytorch에서 데이터를 처리하기 편하도록 dimension을 변경하는 작업을 수행합니다. pytorch cnn에서 데이터를 읽기 위한 데이터 타입은 아래와 같습니다.

conv2d input, output data shape

conv2d의 입력 형태는 (N, C, H, W) 형태이기 때문에 이에 맞게 shape 정보를 변경하고 plt.imshow() 함수를 통해서 이미지를 표시해봅니다.

_image = cv2.imread('./lena.jpg')
_image = _image.astype(np.float32)
_image = np.multiply(_image, 1.0 / 255.0)

image = torch.from_numpy(_image)
image = image[np.newaxis, :] 
image = image.permute(0, 3, 1, 2)

plt.imshow(image[0,2,:], cmap='gray')

이미지를 출력하기 위해서 간단한 출력용 함수를 정의합니다.

def imagegrid(args, output):
    width=args[0]
    height=args[1]
    rows = args[2]
    cols = args[3]
    axes=[]
    fig=plt.figure()
    fig.set_size_inches((20,14))
    for i in range(rows*cols):
        img = output[0,i,:]
        axes.append( fig.add_subplot(rows, cols, i+1) )
        plt.imshow(img)
    fig.tight_layout()    
    plt.show()

첫번째 conv2d는 입력 데이터를 받아서 12 Channel로 출력값을 나타내는 함수입니다. 이때 kernel 사이즈를 5로 정의했습니다. conv2d를 통해서 출력되는 이미지 정보는 아래와 같습니다. 해당 이미지들은 CNN에서 생성한 필터를 통과한 이미지 들입니다. 이러한 방식을 계속하며 곡선, 직선, 대각선 등 이미지의 특징 정보를 추출하게 됩니다.

conv1 = torch.nn.Conv2d(3,12,5)
output = conv1(image)
conv1_output = output.detach().numpy()
imagegrid([100,100,3,4], conv1_output)

위와 같은 방법으로 conv2d 네트워크를 한번 더 통과해봅니다. 이번에는 24개의 이미지를 출력하게 됩니다. conv2d를 통과하면서 점점 이미지의 크기는 작아지고 원본 이미지 형태는 필터를 지나며 점점 Feature의 특징 정보가 드러나게 됩니다.

conv2 = torch.nn.Conv2d(12,24,5)
output = conv2(output)
conv2_output = output.detach().numpy()
imagegrid([100,100,6,4], conv2_output)

한번 더 conv2d를 수행해서 48개의 이미지 데이터를 만들어 냅니다.

conv3 = torch.nn.Conv2d(24,48,5)
output = conv3(output)
conv3_output = output.detach().numpy()
imagegrid([100,100,6,8], conv3_output)

이런 방식으로 여러 차례 conv2d를 통과 하면서 이미지의 특징들일 추출해 내는 과정을 반복하게 됩니다. CNN 은 마지막에 이렇게 만들어진 정보들을 Linear 모듈을 통과하면서 최종 적으로 Classification 하게 됩니다.

예를 들어 현재의 이미지가 (60,97) 사이즈라면 Linear 모듈의 입력은 48*60*97을 입력으로 받고 임의의 크기의 출력 값을 얻을 수 있습니다. 이러한 Linear 과정을 통과하면서 ReLU, BatchNorm과 같은 함수를 사용하면 학습이 더 효율적일 수 있습니다.

CNN Fashion-MNIST 테스트 (PyTorch)

Fashion-MNIST에 대한 설명은 아래 링크로 대신하겠습니다.

Fashion-MNIST is a dataset of Zalando‘s article images—consisting of a training set of 60,000 examples and a test set of 10,000 examples. Each example is a 28×28 grayscale image, associated with a label from 10 classes. We intend Fashion-MNIST to serve as a direct drop-in replacement for the original MNIST dataset for benchmarking machine learning algorithms. It shares the same image size and structure of training and testing splits.

본 예제 코드는 데이터셋을 학습해서 입력되는 이미지가 어떤 분류에 속하는지를 예측해보는 것입니다. Fashion-MNIST 데이터셋을 벡터 공간에 표시하면 위와 같은 이미지로 분류할 수 있습니다.

이제 학습을 위해 해당 데이터셋을 다운로드합니다.

# Define a transform to normalize the data
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0.5,), (0.5,))])

# Download and load the training data
train_loader = torch.utils.data.DataLoader(datasets.FashionMNIST('../F_MNIST_data/', download=True, train=True, transform=transform), batch_size=128, shuffle=True)

# Download and load the test data
test_loader = torch.utils.data.DataLoader(datasets.FashionMNIST('../F_MNIST_data/', download=True, train=False, transform=transform), batch_size=128, shuffle=True)

다운로드한 데이터가 어떤 이미지가 있는지 살펴보기 위해서 랜덤하게 몇개의 샘플을 추출해서 표시해보겠습니다. 해당 이미지들은 10개 [‘t-shirt’, ‘trouser’, ‘pullover’, ‘press’, ‘coat’, ‘sandal’, ‘shirt’, ‘sneaker’, ‘bag’, ‘ankleboot’]로 분류할 수 있는 패션 아이템들입니다.

x_train, y_train = next(iter(train_loader))
x_valid, y_valid = next(iter(test_loader))

fig, ax = plt.subplots(5,5)
fig.set_size_inches((20,14))
for i in range(5):
    for j in range(5):
        idx = numpy.random.randint(128)
        ax[i][j].imshow(x_train[idx,0,:])
        ax[i][j].set_xlabel(label[y_train[idx].item()])
        ax[i][j].set_xticklabels([])
        ax[i][j].set_yticklabels([])

학습을 위한 모델을 선언합니다. 이전 MNIST 데이터셋을 테스트했을 때와 같은 모델을 재활용했습니다.

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        
        self.convs = nn.Sequential(
            nn.Conv2d(1, 10, kernel_size=3), # input_channel, output_channel, kernel_size
            nn.ReLU(),
            nn.BatchNorm2d(10),
            nn.Conv2d(10, 20, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.BatchNorm2d(20),
            nn.Conv2d(20, 40, kernel_size=3, stride=2)
        )
        
        self.layers = nn.Sequential(
            nn.Linear(40*5*5, 500),
            nn.Dropout(p=0.2),
            nn.ReLU(),
            nn.BatchNorm1d(500),
            nn.Linear(500,250),
            nn.Linear(250,100),
            nn.Dropout(p=0.2),
            nn.ReLU(),
            nn.BatchNorm1d(100),
            nn.Linear(100,50),
            nn.Linear(50, 10),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, 40*5*5)
        return self.layers(x)
    
cnn = Net().to(DEVICE)

이전 MNIST 코드는 하나의 mini batch 데이터만 학습했다면 이번에는 전체 데이터를 대상으로 학습을 진행합니다. 많은 학습을 거친다면 모델의 정확도가 높아지겠지만 성능을 높이는 테스트가 아니기 때문에 최소한의 학습 epcohs만 수행합니다.

optimizer = optim.Adam(cnn.parameters())
criterion = nn.CrossEntropyLoss()

hist_loss = []
hist_accr = []

epochs = 30

for epoch in range(epochs):

    for idx, (data, label) in enumerate(train_loader):
        data, label = data.to(DEVICE), label.to(DEVICE)
        output = cnn(data)
        loss = criterion(output, label)
        
        predict = torch.argmax(output, dim=-1) == label
        accuracy = predict.float().mean().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        hist_loss.append(loss.item())
        hist_accr.append(accuracy)

        if idx % 100 == 0:
            print('Epoch {}, idx {}, Loss : {:.5f}, Accuracy : {:.5f}'.format(epoch, idx, loss.item(), accuracy))

학습이 완료되고 학습의 진행이 어떻게 되었는지 알기 위해서 사전에 정의한 hist_loss와 hist_accr을 사용해서 시각화 해보겠습니다.

fig, ax = plt.subplots(2,1)
fig.set_size_inches((12,8))

ax[0].set_title('Loss')
ax[0].plot(hist_loss, color='red')
ax[0].set_ylabel('Loss')
ax[1].set_title('Accuracy')
ax[1].plot(hist_accr, color='blue')
ax[1].set_ylabel('Accuracy')
ax[1].set_xlabel('Epochs')

학습이 완료된 후에 테스트 데이터를 사용해서 모델의 정확도를 확인해보았고 결과 값으로 Accuracy : 0.93750를 얻었습니다.

cnn.eval()

with torch.no_grad():
    for idx, (data, label) in enumerate(test_loader):
        data, label = data.to(DEVICE), label.to(DEVICE)
        output = cnn(data)
        loss = criterion(output, label)
    
        predict = torch.argmax(output, dim=-1) == label
        accuracy =  predict.float().mean().item()
    
        print('Accuracy : {:.5f}'.format(accuracy))

CNN MNIST 테스트 (PyTorch)

CNN 알고리즘을 MNIST 데이터셋을 활용해서 테스트해봅니다.

CNN 알고리즘에 대한 다양한 많은 설명이 있으니 자세한 내용은 아래의 강의를 참고하시기 바랍니다. 비록 작은 부분의 차이들은 있을 수 있지만 본 예제 역시 인터넷에 많은 소스 코드와 다르지 않습니다.
단, 아래의 영상은 텐서플로우로 설명하는 영상이지만 본 예제는 파이토치로 구현되어 있으며 학습도 전체 데이터를 대상으로 하지 않고 첫번째 미니배치만 학습하는 것으로 작성했습니다.

필요한 라이브러리를 임포트하고 GPU 사용 설정하는 부분과 MNIST 데이터를 로드하는 부분에 대해서는 자세한 설명을 하지 않고 지나가겠습니다. GPU 설정이 필요 없는 CPU 상에서 예제를 구동하는 경우는 device 설정을 하지 않고 넘어가셔도 무방합니다.

GPU 서버가 없는 경우는 무료로 Colab을 이용하시는 것도 추천합니다.

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets

import matplotlib.pyplot as plt

USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")

BATCH_SIZE = 128

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data',
                   train=True,
                   download=True,
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=BATCH_SIZE, shuffle=True)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data',
                   train=False, 
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.1307,), (0.3081,))
                   ])),
    batch_size=BATCH_SIZE, shuffle=True)

MNIST 데이터는 DataLoader로 불러왔습니다. 데이터는 (128,1,28,28) 형태로 149개로 분리되어 있습니다. 테스트 데이터 역시 마찬가지입니다. 128은 배치사이즈, 1은 채널 사이즈, (28*28)은 이미지의 크기입니다.

본 테스트에서는 전체 149개 데이터를 모두 학습하지 않고 1세트만 301회 학습을 수행합니다. 더 높은 정확도 얻고자 하시는 분은 전체 데이터를 통해 더 많은 학습을 해보시기 바랍니다.

dataset = next(iter(train_loader)) # 학습 데이터
x_data = dataset[0] # x 데이터
y_data = dataset[1] # label 데이터

dataset = next(iter(test_loader)) # 검증 데이터
x_test = dataset[0].to(DEVICE) # x 데이터
y_test = dataset[1].to(DEVICE) # label 데이터

각 데이터 셋에 어떤 이미지가 있는지 확인해보기 위해서 아래와 같은 코드를 수행합니다. MNIST 데이터 셋은 동일한 크기의 손글씨 이미지가 들어있기 때문에 각각의 이미지를 표시해보면 0-9까지의 손글씨 이미지가 저장되어 있는 것을 확인 할 수 있습니다.
아래와 같이 0번째 배열에 숫자 5가 있는 것을 확인 할 수 있습니다.

plt.imshow(x_data[0,0,:])

참고로 CNN의 입력 데이터의 Shape은 아래의 그림과 같습니다. 입력 데이터는 4개의 차원으로 구성되어 있습니다. 가장 먼저는 Batch_Size로 해당 이미지의 갯수를 의미합니다. 다음에 나오는 것은 이미지의 Channel입니다. 3인경우는 RGB 값을 가지고 1인 경우는 대부분 단일 색상으로 표현하는 값입니다. 그리고 나오는 값은 Height, Width 값입니다.

이런 상태에서 [0,0,:]의 의미는 0번째 이미지에서 첫번째 채널의 이미지 데이터를 가지고 온다는 의미가 됩니다.

이제 학습을 위한 모델을 구성합니다. 본 모델을 크게 두부분으로 되어 있습니다. self.convs는 convolution을 수행하는 부분으로 원본 이미지에서 특징정보를 추출하는 부분입니다. 이때 중요한 것은 각 Conv2d를 수행하며 어떤 형태의 아웃풋이 나오는지 확인하는 것이 중요합니다.

예를 들어 28*28 이미지를 kernel_size 3으로 계산하면 출력되는 이미지 사이즈는 (26*26)입니다. 어떻게 이렇게 나오는지는 아래 식에서 확인하실 수 있습니다.

https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html

하지만 매번 위의 식을 통해서 계산하는 것은 좀 귀찮고 힘든 입니다. 위의 공식을 간단한 함수로 구현한 내용을 공유해드립니다.

def conv_output_shape(h_w, kernel_size=1, stride=1, pad=0, dilation=1):
    from math import floor
    if type(kernel_size) is not tuple:
        kernel_size = (kernel_size, kernel_size)
    h = floor( ((h_w[0] + (2 * pad) - ( dilation * (kernel_size[0] - 1) ) - 1 )/ stride) + 1)
    w = floor( ((h_w[1] + (2 * pad) - ( dilation * (kernel_size[1] - 1) ) - 1 )/ stride) + 1)
    return h, w

위와 같은 식을 거쳐서 self.convs 레이어의 최종 output_shape은 총 5*5 이미지 사이즈를 가진 40장의 이미지 데이터를 얻을 수 있습니다.

이렇게 얻은 데이터는 self.layers를 거치다 보면 최종 0~9까지의 숫자 정보를 얻을 수 있습니다.

PyTorch의 Conv2d 패키지는 프로그래머가 간단히 Convolution Layer를 구성할 수 있도록 해줍니다. 프로그래머는 간단히 입력 채널의 수와 출력 채널의 수 그리고 커널 사이즈와 스트라이드 정보만 맞춰주면 자동으로 이미지를 구성해줍니다.

아래의 경우는 최초 28×28 이미지를 입력하고 커널을 3으로 맞춰서 앞선 함수를 통해서 출력 shape을 보면 26×26의 이미지를 출력한다는 것을 확인 할 수 있습니다. 또 다음 레이어는 커널을 3, 스트라이드를 2로 정의하고 이전에 입력된 이미지의 크기를 입력하면 출력 이미지는 12×12로 표시되는 것을 확인 할 수 있습니다. 이런 방법으로 마지막 이미지가 출력되는 크기는 5×5의 이미지가 됩니다.

그렇게 되면 마지막의 fully-connected layer에 들어가는 값은 40×5×5의 입력 값이 됩니다. 그리고 맨 마지막까지 Linear Layer를 거치게 되면 10개의 값으로 출력되고 이에 Softmax를 취하면 0-9 중에 하나의 값을 예측하게 됩니다.

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        
        self.convs = nn.Sequential(
            nn.Conv2d(1, 10, kernel_size=3), # input_channel, output_channel, kernel_size
            nn.ReLU(),
            nn.BatchNorm2d(10),
            nn.Conv2d(10, 20, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.BatchNorm2d(20),
            nn.Conv2d(20, 40, kernel_size=3, stride=2)
        )
        
        self.layers = nn.Sequential(
            nn.Linear(40*5*5, 500),
            nn.ReLU(),
            nn.BatchNorm1d(500),
            nn.Linear(500,250),
            nn.Linear(250,100),
            nn.ReLU(),
            nn.BatchNorm1d(100),
            nn.Linear(100,50),
            nn.Linear(50, 10),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, 40*5*5)
        return self.layers(x)
    
cnn = Net().to(DEVICE)

이제 학습을 위한 모든 준비가 완료되었고 아래와 같이 학습을 수행합니다. 예제에서는 간단히 301회 학습을 수행했습니다. 학습이 진행되면서 loss와 accuracy 정보의 변화를 기록하기 위해서 list 변수를 각각 선언해줍니다.

또 100회 학습이 완료될 때마다 변화되는 loss와 accuracy 값을 화면에 출력해줍니다.

optimizer = optim.Adam(cnn.parameters())
criterion = nn.CrossEntropyLoss()

hist_loss = []
hist_accr = []

epochs = 301
for epoch in range(epochs):
    cnn.train()
    output = cnn(x_data)
    loss = criterion(output, y_data)
    
    predict = torch.argmax(output, dim=-1) == y_data
    accuracy = predict.float().mean().item()
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    hist_loss.append(loss)
    hist_accr.append(accuracy)
    
    if epoch % 100 == 0:
        print('epoch{}, {:.5f}, {:.5f}'.format(epoch, loss.item(), accuracy))

학습이 완료되고 loss와 accuracy 값을 그래프로 그려줍니다.
그래프를 보니 학습의 곡선이 완만하게 내려가고 정확도는 1에 가까운 값을 나타내어 학습이 잘이뤄지는 것을 확인할 수 있습니다.

그러나 training 데이터를 통한 학습정확도이기 때문에 검증용 데이터를 통해서 정확도 계산을 다시 할 필요가 있습니다.

fig, ax = plt.subplots(2,1)
fig.set_size_inches((12,8))

ax[0].set_title('Loss')
ax[0].plot(hist_loss, color='red')
ax[0].set_ylabel('Loss')
ax[1].set_title('Accuracy')
ax[1].plot(hist_accr, color='blue')
ax[1].set_ylabel('Accuracy')
ax[1].set_xlabel('Epochs')

검증용 데이터셋(test_dataloader)를 통해서 한 배치 정보를 얻어서 방금 수행한 모델의 정확도를 테스트해봅니다. 테스트 결과 0.7109375 값을 얻을 수 있었습니다. 높은 값은 아니지만 전체 469개 미니배치 중에서 1개 데이터셋만 테스트 했기 때문에 전체 데이터를 대상으로 테스트하면 보다 높은 정확도를 얻을 수 있을 것입니다.

cnn.eval()

with torch.no_grad():
    output = cnn(x_test)
    loss = criterion(output, y_test)
    
    predict = torch.argmax(output, dim=-1) == y_test
    accuracy =  predict.float().mean().item()
    
    print(accuracy)

Google Colab GPU Text-classification

Colaboratory(혹은 Colab)를 사용하면 브라우저에서 Python을 작성하고 실행할 수 있습니다. 장점이라면 별도의 구성이 필요 없고 무료로 GPU를 사용할 수 있다는 장점이 있습니다. 또 만든 코드를 간단하게 공유할 수도 있습니다.

감성분석(Text Classification)에 사용한 데이터는 네이버에서 공개한 영화 평점 정보입니다. 해당 데이터는 아래 링크에서 받을 수 있습니다.
https://github.com/e9t/nsmc

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import io

본 예제에서 사용할 konlpy를 Colab에 설치합니다.

!pip install konlpy
from konlpy.tag import Okt
okt = Okt()

Colab에서 사용할 파일을 사용자 계정의 구글 드라이브에 업로드합니다. 그리고 업로드한 파일 정보를 Colab에서 읽을 수 있도록 필요한 python 라이브러리를 등록해야합니다. 아래 코드를 실행하면 구글 계정에 인증할 수 있는 정보가 나오고 키값을 입력하면 드라이브에 접근할 수 있습니다.

from google.colab import drive
drive.mount('/content/gdrive')

구글 드라이브에 접근이 완료되면 파일이 있는 디렉토리 위치로 이동합니다.

from google.colab import drive
drive.mount('/content/gdrive')
%cd gdrive/My\ Drive/Colab\ Notebooks

해당 위치로 이동한 후에 %ls 명령을 실행시켜보면 해당 폴더에 있는 파일 리스트를 표시해줍니다. 파일 중에서 학습에 사용할 파일을 open하면 됩니다.

%ls
def read_data(filename):
    with io.open(filename, 'r',encoding='utf-8') as f:
        data = [line for line in f.read().splitlines()]
        data = data[1:]
    return data 

sentences = []
# 테스트를 위해 길이가 30 이하인 문장을 읽음
for sentence in read_data('./ratings_test.txt'):
  if len(sentence) <= 30:
    sentences.append(sentence)

해당 파일을 읽어보면 아래와 같은 형태로 데이터가 구성되어 있습니다. 이전 예제에서 설명했던 것처럼 1은 긍정적인 답변을 0은 부정적인 답변을 의미합니다.

['6270596|굳 ㅋ|1',
 '7898805|음악이 주가 된, 최고의 음악영화|1',
 '6315043|진정한 쓰레기|0',
 '7462111|괜찮네요오랜만포켓몬스터잼밌어요|1',
 '10268521|소위 ㅈ문가라는 평점은 뭐냐?|1' ...
class Vocab():
    def __init__(self):
        self.vocab2index = {'<pad>':0,'<unk>':1} # padding 0, unkown 1
        self.index2vocab = {0:'<pad>',1:'<unk>'} # 0 padding, 1 unkown
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)
        
    def add_vocab(self, sentence):
        for word in sentence:
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.index2vocab[self.n_vocab] = word
                self.vocab_count[word] = 1
                self.n_vocab += 1
            else:
                self.vocab_count[word] += 1

vo = Vocab()

def charStrip(s):
    s = s.replace('"','').replace('「','').replace('」','').replace('“','').replace('?','').replace('”','')
    s = s.replace('(',' ').replace(')',' ').replace('‘','').replace('’','').replace('□','').replace('◆','').replace('◇','')
    s = s.replace('[',' ').replace(']',' ').replace('○','').replace('△','').replace('◎','').replace('▣','').replace('◇','')
    s = s.replace('.',' ').replace('*',' ').replace('.',' ').replace('~',' ')
    
    return s

x = [] # text sentence
y = [] # label
for sentence in sentences:
    arr = sentence.split('|')
    if(len(arr) == 3):
      sentence = okt.morphs(charStrip(arr[1]))
      
      vo.add_vocab(sentence)
      x.append(sentence) 
      y.append(float(arr[2])) 
MAX_SEQUENCE_LENGTH = 0

for sentence in x:
    if MAX_SEQUENCE_LENGTH < len(sentence): MAX_SEQUENCE_LENGTH = len(sentence)

MAX_SEQUENCE_LENGTH

데이터 중에서 가장 긴 문장을 확인해봅니다. 이 문장의 크기가 Sequence Length가 됩니다. 이 문장의 길이보다 작은 문장의 경우 빈칸은 <unk> 값으로 채워줍니다.

def tensorize(vocab, sentence):
    idx = [vocab.vocab2index[word] for word in sentence]
    #return torch.Tensor(idx).long().item()
    return idx

tmp_tensor = []
for sentence in x:
    tmp = tensorize(vo, sentence)
    tmp_zero = np.zeros(MAX_SEQUENCE_LENGTH)
    
    for i,val in enumerate(tmp):
        tmp_zero[i] = val
        
    tmp_tensor.append(tmp_zero)
    
x_data = torch.Tensor(tmp_tensor).long()
y_data = torch.Tensor([float(t) for t in y])
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
output : device(type='cuda')

구글 Colab에서는 GPU를 사용할 수 있기 때문에 필요한 설정을 해줍니다. CPU로 연산할 때보다 훨씬 빠른 계산속도를 보여줍니다.

학습용 데이터는 8/2로 학습용 Train Data / Valid Data 데이터로 나눠줍니다.

DATA_LENGTH = x_data.size(0)

train_cnt = int(DATA_LENGTH*.8)
valid_cnt = DATA_LENGTH - train_cnt
print('train_cnt, valid_cnt = ',train_cnt,valid_cnt)

idx = torch.randperm(DATA_LENGTH)
x_train = torch.index_select(x_data, dim=0, index=idx).to(device).split([train_cnt, valid_cnt], dim=0)
y_train = torch.index_select(y_data, dim=0, index=idx).to(device).split([train_cnt, valid_cnt], dim=0)

각 데이터셋은 pytorch의 Dataset, DataLoader를 사용해서 배치사이즈로 나눠줍니다. 학습할 데이터가 많은 경우에 많은 데이터를 한번에 읽으면 메모리 부족현상이 발생하는데 Dataset을 배치사이즈로 분리해서 로딩하면 메모리를 적게 사용하게 되어 큰 데이터도 학습할 수 있습니다.

from torch.utils.data import Dataset, DataLoader

class SimpanDataset(Dataset):
    
    def __init__(self, data, label):
        super().__init__()
        self.data = data
        self.labels = label
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
    
# train_loader
train_loader = DataLoader(dataset=SimpanDataset(x_train[0], y_train[0]), batch_size=250, shuffle=True)
valid_loader = DataLoader(dataset=SimpanDataset(x_train[1], y_train[1]), batch_size=250, shuffle=False)

학습용 모델을 아래와 같이 설정합니다. 학습 모델은 Embedding -> BiLSTM -> Softmax 레이어를 통과하면서 최종 output을 만들어냅니다.
단, output은 모든 Sequence의 데이터를 사용하지 않고 마지막 시퀀스의 값만 사용하며 LSTM의 모델에서 bidirectional을 True로 설정했기 때문에 output*2의 값이 리턴됩니다. 학습에 최종 결과물은 0과 1이기 때문에 hidden_size는 2입니다.

class SimpanClassificationModel(nn.Module):
    
    def __init__(self, input_size, hidden_size):
        super().__init__()
        
        self.embedding = nn.Embedding(input_size, 300)
        
        self.rnn = nn.LSTM(input_size=300, hidden_size=100, num_layers=4, batch_first=True, bidirectional=True)
        
        self.layers = nn.Sequential(
            nn.ReLU(),
            nn.Linear(100*2,100),
            nn.Linear(100,30),
            
            nn.Linear(30, hidden_size),
        )
        
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self, x):
        y = self.embedding(x)
        y,_ = self.rnn(y)
        y = self.layers(y)
        
        return self.softmax(y[:,-1,:])
    
input_size = vo.n_vocab
hidden_size = torch.unique(y_train[1]).size(dim=-1)

model = SimpanClassificationModel(input_size, hidden_size)
model = model.cuda()
# loss & optimizer setting
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

hist_loss = []
hist_accr = []

epochs = 501
# start training
model.train()

for epoch in range(epochs):
    epoch_loss = 0
    for x,y in train_loader:
      x, y = x.to(device), y.to(device)
      output = model(x)
      loss = criterion(output, y.long())
        
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
        
      epoch_loss += loss.item()
      accuracy = (torch.argmax(output, dim=-1) == y).float().mean().item()
    
      
    hist_loss.append(epoch_loss)
    hist_accr.append(accuracy)
        
    print('Cost: {:.6f}, Accuracy : {:.6f}'.format(loss.item(),accuracy))
    print('--{}--'.format(epoch))

학습의 진행상황을 기록하기 위해서 2개의 배열(hist_loss, hist_accr)을 사용합니다.

hist_loss는 loss값의 변화를 기록하는 배열이며 hist_accr은 해당 모델의 정확도 정보를 얻기 위해서 만든 배열입니다. 학습이 진행되며 해당 배열에 데이터가 기록되고 matplotlib.pyplot을 사용해서 그래프를 그려봅니다.

import matplotlib.pyplot as plt

fig, ax = plt.subplots(2,1)
fig.set_size_inches((12, 8)) 

ax[0].set_title('Loss')
ax[0].plot(hist_loss, color='red')
ax[0].set_ylabel('Loss')
ax[1].set_title('Accuracy')
ax[1].plot(hist_accr, color='blue')
ax[1].set_ylabel('Accuracy')
ax[1].set_xlabel('Epochs')

plt.show()
model.eval()
    
for xv, yv in valid_loader:
    output = model(x)
    
    accuracy = (torch.argmax(output, dim=-1) == y).float().mean().item()

    hist_loss.append(loss.item())
    hist_accr.append(accuracy)

    print('Accuracy : {:.6f}'.format(accuracy))

모델의 학습이 완료된 후 valid data를 통해서 학습 모델의 정확도를 알아봅니다.

Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667
Accuracy : 0.916667

인공신경망을 이용한 분류

인공신경망을 이용하여 데이터를 분류하는 문제를 테스트해보고자합니다.

필요한 라이브러리들을 다음과 같이 로드합니다.

import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

분류 문제 해결을 위해 사용한 데이터는 zoo 데이터셋입니다. 해당 데이터는 동물의 16가지 특징 정보들 예를 들어 깃털이 있는지 알을 낳는지 비행이 가능한지 등을 체크해서 이 동물이 어느 분류에 들어가는지를 0-7까지의 중에 하나로 분류한 데이터 셋입니다.

xy = np.loadtxt('./zoo.csv', delimiter=',', dtype=np.float32)

해당 데이터의 shape을 보면 (101,17)로 되어 있습니다. 총 16개의 특징(1은 분류)을 가진 101개의 데이터라는 의미입니다.

x_data = xy[:,0:-1]
y_data = xy[:,-1]

x_data = torch.Tensor(x_data)
y_data = torch.Tensor(y_data).long()
x_data.size(), y_data.size() # (torch.Size([101, 16]), torch.Size([101]))

data_length = len(xy)

101개의 데이터를 모두 학습하지 않고 약 8/2정도로 나눠서 훈련용 데이터와 검증용 데이터를 만듭니다. 훈련용 데이터를 만들기 전에 데이터를 랜덤하게 섞어줍니다.

batch_size = .8

train_cnt = int(data_length * batch_size)
valid_cnt = data_length - train_cnt

idx = torch.randperm(data_length)
x = torch.index_select(x_data, dim=0, index=idx).split([train_cnt, valid_cnt], dim=0)
y = torch.index_select(y_data, dim=0, index=idx).split([train_cnt, valid_cnt], dim=0)

학습을 위한 모델을 만들어줍니다. 모델은 16개 데이터를 입력 받아서 최종적으로 7개의 데이터셋을 출력하는 형태입니다.

class ANN(nn.Module):
    
    def __init__(self, D_in, H, D_out):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(D_in, H),
            nn.ReLU(),
            nn.Linear(H, D_out),
        )
    
    def forward(self, x):
        return self.layers(x)

D_in, H, D_out = x_data.size(dim=-1), 100, torch.unique(y_data).size(dim=0)
model = ANN(D_in, H, D_out)

loss와 최적화를 위해 함수를 선언합니다. 또 어떻게 loss가 변화하고 그에따라 정확도가 올라가는지를 표시하기 위해서 hist_loss, hist_accr을 선언하고 학습이 완료될 때마다 데이터를 입력해줍니다.

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

hist_loss = []
hist_accr = []

epochs = 501

for epoch in range(epochs):
    model.train()
    # loss
    y_pred = model(x[0])
    loss = criterion(y_pred, y[0])
    
    # accuracy
    predict = torch.argmax(y_pred, dim=-1).data == y[0]
    accr = predict.float().mean().item()
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    hist_loss.append(loss.item())
    hist_accr.append(accr)
    
    if epoch%10==0:
        #print(y_pred)
        print('Epoch {:4d}/{} Cost: {:.6f} Accuracy:{}'.format(
            epoch, epochs, loss.item(), accr
        ))

학습이 반복될때마다 loss가 낮아지고 정확도가 올라가는 모습을 볼 수 있습니다. loss도 완만하게 내려가는 것을 보니 learning_rate가 적절히 선언된것 같습니다.

import matplotlib.pyplot as plt

plt.plot(hist_loss)
plt.plot(hist_accr)
plt.legend(['Loss','Accuracy'])
plt.title('Loss/Legend')
plt.xlabel('Epoch')
plt.show()

이제 학습이 완료되었고 테스트용 데이터셋을 활용해서 모델의 정확도를 검증해본결과 Accuracy: 0.95238 데이터를 얻었습니다. 학습을 더 많이 진행한다면 더 높은 정확도를 얻을 수 있을 거라고 생각합니다.

model.eval()

with torch.no_grad():
    predict = model(x[1])
    
    # accuracy
    predict = torch.argmax(predict, dim=-1).data == y[1]
    accr = predict.float().mean().item()
    
    print("Accuracy: %.5f" % accr)

Seq2Seq 문장번역

파이토치 Seq2Seq 예제

import random
import torch
import torch.nn as nn
import torch.optim as optim
torch.manual_seed(0)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
raw = ['I called Tom for help.	나는 톰에게 도움을 요청했다.',
'I do not like science.	나는 과학이 싫어.',
'I hate myself as well.	나도 내 자신을 싫어해.',
'I knew Tom would lose.	톰이 질 거라는 것을 난 알고 있었어.',
'I know Tom personally.	난 톰을 개인적으로 알고 있어.',
'I like Korean cuisine.	전 한국 요리가 좋아요.',
'I like Korean cuisine.	전 한국 요리를 좋아해요.',
'I like helping others.	나는 남을 돕는 것을 좋아한다.',
'I really like puppies.	저는 강아지가 정말 좋아요.',
'I run faster than Tom.	나는 톰보다 빠르게 달릴 수 있어.',
'I think Tom is lonely.	톰이 외로워하는 것 같아.',
'I think they like you.	그들이 널 좋아하는 것 같아.',
'I want to go to sleep.	나 자러 가고 싶어.',
'I want to go to sleep.	나 자고 싶어.',
'I want to visit Korea.	나는 한국에 들르고 싶다.']

사용한 데이터는 http://www.manythings.org/anki/ 에서 kor-eng.zip 파일을 다운로드 받아 일부 데이터만 사용했습니다. 해당 사이트에 들어가면 한국어 외에도 다양한 형태의 파일을 다운 받을 수 있습니다.

SOS_token = 0 # 문장의 시작 Start of Sentence
EOS_token = 1 #  문장의 끝 End of Sentence
class Vocab:
    def __init__(self):
        self.vocab2index = {"<SOS>":SOS_token, "<EOS>":EOS_token}
        self.index2vocab = {SOS_token:"<SOS>", EOS_token:"<SOS>"}
        self.vocab_count = {}
        self.n_vocab = len(self.vocab2index)
    
    def add_vocab(self, sentence):
        for word in sentence.split(' '):
            if word not in self.vocab2index:
                self.vocab2index[word] = self.n_vocab
                self.vocab_count[word] = 1
                self.index2vocab[self.n_vocab] = word
                self.n_vocab += 1
            else:
                self.vocab_count[word] += 1
# declare simple encoder
class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(Encoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size) # Embedding(17, 16)
        self.gru = nn.GRU(hidden_size, hidden_size)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden)
        return x, hidden

    
# declare simple decoder
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(Decoder, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size, num_layers=1, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x, hidden):
        x = self.embedding(x).view(1, 1, -1)
        x, hidden = self.gru(x, hidden) # lstm을 사용할 경우 해당 위치 수정
        x = self.softmax(self.out(x[0]))
        return x, hidden
# read and preprocess the corpus data
def preprocess(corpus):
    print("reading corpus...")
    pairs = []
    for line in corpus:
        pairs.append([s for s in line.strip().lower().split("\t")])
    print("Read {} sentence pairs".format(len(pairs)))

    pairs = [pair for pair in pairs]
    print("Trimmed to {} sentence pairs".format(len(pairs)))

    source_vocab = Vocab()
    target_vocab = Vocab()

    print("Counting words...")
    for pair in pairs:
        source_vocab.add_vocab(pair[0])
        target_vocab.add_vocab(pair[1])
    print("source vocab size =", source_vocab.n_vocab)
    print("target vocab size =", target_vocab.n_vocab)

    return pairs, source_vocab, target_vocab

# 데이터셋, 입력단어정보, 출력단어정보
pairs, source_vocab, target_vocab = preprocess(raw)

훈련용 입출력 데이터셋을 위와 같이 만든후 이제 인코더, 디코더 모델을 만들어야 합니다. 먼저 만들기 전에 인코더-디코더의 입출력 정보에 대하여 직접 그림으로 그려보시기를 추천합니다. 가장 좋은 것은 노트에 펜으로 그려보시는 것이 좋겠지만 그렇지 않다면 머리속으로 어떤 입력이 들어오고 어떤 출력이 나가는지에 대한 정보를 설계하는 과정이 필요합니다.

이런 과정이 없으면 나중에 인코더와 디코더를 설계할 때에 혼동하기 쉽기 때문에 반드시 모델의 입출력 흐름을 구상해보시기 바랍니다.

본 예제의 인코더-디코더 정보는 다음과 같습니다.
인코더 : input_vector(41) -> Embedding(41,30) -> LSTM(30,30)
디코더 : Embedding(52,30) -> LSTM(30, 52) – hidden_vector(52)

enc_hidden_size = 30
dec_hidden_size = enc_hidden_size
enc = Encoder(source_vocab.n_vocab, enc_hidden_size).to(device)
dec = Decoder(dec_hidden_size, target_vocab.n_vocab).to(device)
def tensorize(vocab, sentence):
    idx = [vocab.vocab2index[word] for word in sentence.lower().split(' ')]
    idx.append(vocab.vocab2index['<EOS>'])
    return torch.Tensor(idx).long().to(device).view(-1,1)
tensorize(source_vocab, 'I called Tom for help.')
output : tensor([[2], [3], [4], [5], [6], [1]])
training_source = [tensorize(source_vocab, pair[0]) for pair in pairs]
training_target = [tensorize(target_vocab, pair[1]) for pair in pairs]

Train

loss_total = 0
number_epoch = 5001

encoder_optimizer = optim.SGD(enc.parameters(), lr=0.01)
decoder_optimizer = optim.SGD(dec.parameters(), lr=0.01)

criterion = nn.NLLLoss()

for epoch in range(number_epoch):
    epoch_loss = 0
    
    for i in range(len(training_source)):
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        
        source_tensor = training_source[i]
        target_tensor = training_target[i]

        encoder_hidden = torch.zeros([1, 1, enc.hidden_size]).to(device)

        source_length = source_tensor.size(0)
        target_length = target_tensor.size(0)
        
        loss = 0

        for enc_input in range(source_length):
            _, encoder_hidden = enc(source_tensor[enc_input], encoder_hidden)

        decoder_input = torch.Tensor([[SOS_token]]).long().to(device)
        decoder_hidden = encoder_hidden # connect encoder output to decoder input

        for di in range(target_length):
            decoder_output, decoder_hidden = dec(decoder_input, decoder_hidden)
            #print(decoder_output, target_tensor[di], criterion(decoder_output, target_tensor[di]))
            loss += criterion(decoder_output, target_tensor[di])
            decoder_input = target_tensor[di]  # teacher forcing
        
        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()
        
        #print(loss.item(),target_length)
        epoch_loss += loss.item()/target_length
        #loss_total += loss_epoch
    if epoch % 100 == 0:
        print('--- epoch {}, total loss {} '.format(epoch,float(epoch_loss/15)))

Evaluate

for pair in pairs:
    print(">", pair[0])
    print("=", pair[1])
    source_tensor = tensorize(source_vocab, pair[0])
    source_length = source_tensor.size()[0]
    encoder_hidden = torch.zeros([1, 1, enc.hidden_size]).to(device)

    for ei in range(source_length):
        _, encoder_hidden = enc(source_tensor[ei], encoder_hidden)
        #print(encoder_hidden.size()) # 1,1,16

    decoder_input = torch.Tensor([[SOS_token]], device=device).long()
    decoder_hidden = encoder_hidden
    decoded_words = []

    for di in range(20):
        decoder_output, decoder_hidden = dec(decoder_input, decoder_hidden)
        #print('decoder_iput',decoder_input, 'decoder_output',decoder_output)
        _, top_index = decoder_output.data.topk(1)
        if top_index.item() == EOS_token:
            decoded_words.append("<EOS>")
            break
        else:
            decoded_words.append(target_vocab.index2vocab[top_index.item()])

        decoder_input = top_index.squeeze().detach()

    predict_words = decoded_words
    predict_sentence = " ".join(predict_words)
    print("<", predict_sentence)
    print("")
> i called tom for help.
= 나는 톰에게 도움을 요청했다.
< 나는 톰에게 도움을 요청했다. <EOS>

> i do not like science.
= 나는 과학이 싫어.
< 나는 과학이 싫어. <EOS>

> i hate myself as well.
= 나도 내 자신을 싫어해.
< 나도 내 자신을 싫어해. <EOS>

> i knew tom would lose.
= 톰이 질 거라는 것을 난 알고 있었어.
< 톰이 질 거라는 것을 난 알고 있었어. <EOS>

> i know tom personally.
= 난 톰을 개인적으로 알고 있어.
< 난 톰을 개인적으로 알고 있어. <EOS>

> i like korean cuisine.
= 전 한국 요리가 좋아요.
< 전 한국 요리를 좋아해요. <EOS>

> i like korean cuisine.
= 전 한국 요리를 좋아해요.
< 전 한국 요리를 좋아해요. <EOS>

> i like helping others.
= 나는 남을 돕는 것을 좋아한다.
< 나는 남을 돕는 것을 좋아한다. <EOS>

> i really like puppies.
= 저는 강아지가 정말 좋아요.
< 저는 강아지가 정말 좋아요. <EOS>

> i run faster than tom.
= 나는 톰보다 빠르게 달릴 수 있어.
< 나는 톰보다 빠르게 달릴 수 있어. <EOS>

> i think tom is lonely.
= 톰이 외로워하는 것 같아.
< 톰이 외로워하는 것 같아. <EOS>

> i think they like you.
= 그들이 널 좋아하는 것 같아.
< 그들이 널 좋아하는 것 같아. <EOS>

> i want to go to sleep.
= 나 자러 가고 싶어.
< 나 자고 싶어. <EOS>

> i want to go to sleep.
= 나 자고 싶어.
< 나 자고 싶어. <EOS>

> i want to visit korea.
= 나는 한국에 들르고 싶다.
< 나는 한국에 들르고 싶다. <EOS>