Elastic Search

Elastic은 텍스트, 숫자, 위치 기반 정보, 정형 및 비정형 데이터 등 모든 유형의 데이터를 위한 무료 검색 및 분석엔진으로 분산과 개방형이 특징입니다.

Elastic은 강력한 검색 기능을 통해 사람들이 차별화된 방식으로 데이터를 탐색하고 분석할 수 있도록 도와줍니다 – Elastic Search

ElasticSearch는 Apache Lucene을 기반으로 구축되었습니다. 무료인데다 처리 할 수 있는 데이터도 다양하고 성능도 뛰어나서 다양한 분야에서 사용되고 있습니다.

Elastic은 2010년에 최초로 출시된 이후로 REST API 형태의 편리한 사용성, 고성능을 필요로 하는 환경에서의 확장과 분산 처리는 Elastic의 빠른 성장을 이끌어 왔습니다. 또한 ELK(Elasticsearch, Logstash, Kibna) Stack라고 불리는 강력한 도구는 데이터의 수집, 검색, 시각화 등을 가능하게 해주었습니다.

https://www.elastic.co/kr/what-is/elk-stack

Elastic의 사용성은 웹사이트 검색, 로깅과 로그 분석, 애플리케이션 성능 모니터링, 위치 기반 정보 분석 및 시각화, 보안 분석, 비지니스 분석 등 다양 분야에서 활용되고 있습니다.

Elasticsearch의 작동 방식

시스템에서 만들어지는 로그 데이터, 웹 애플리케이션에서 만들어지는 다양한 정형(비정형) 데이터 등 다양한 원시 데이터가 Elastic에 들어갑니다. 데이터 수집은 원시 데이터가 Elastic에서 색인되기 전에 수행되는 구문 분석, 정규화를 포함합니다.

Elastic에서 색인되면 사용자는 분석을 위해 복잡한 쿼리를 수행하고 다양한 형태의 집계를 사용해서 데이터를 분석, 요약, 검색을 할 수 있습니다. Kibana 이러한 작업을 수행하는 사용자를 위한 다양한 형태의 시각화 도구를 제공해줍니다. Elastic의 데이터 색인은 기본적으로 시계열 데이터를 포함하여 수집되는 데이터를 분석할 수 있으며 위치 기반의 정보 분석를 이용한 분석이 가능합니다.

Elasticsearch 색인

Elastic의 색인(Index)는 서로 관련되어 있는 문서(Document)들의 모음입니다. Elastic은 json 문서로 데이터를 저장하며 각 문서는 일련의 키와 그에 해당하는 문자열, 숫자, 부울, 배열, 위치 데이터 또는 기타 유형의 데이터를 서로 연결되어 있습니다.

Elastic은 역 인덱스라고 하는 데이터의 구조를 사용하는데 이것은 아주 빠른 풀텍스트 검색을 할 수 있도록 설계된 것입니다. 역 인덱스는 문서에 나타나는 모든 고유한 단어들의 목록을 만들고 각 단어가 발생하는 문서를 식발하게 됩니다.

색인 프로세스 중에 Elastic은 문서를 저장하고 역 인덱스를 구축하여 거의 실시간으로 문서를 검색할 수 있습니다. 인덱스 API를 사용해 색인이 시작되고 이를 통해서 특정한 인덱스에 json 문서를 새롭게 추가하거나 업데이트 할 수 있습니다.

Inverted Indexes and Index Terms

위의 이미지는 역 인덱스를 설명하는 좋은 예제입니다.

왼쪽의 이미지는 3개의 문장이 있고 각 문장은 인덱스 번호(1,2,3)가 부여되어 있습니다. 이것은 마치 관계형데이터베이스(RDBMS)의 한 테이블에 3개의 문장이 있는 것과 같은 의미고 각 문장의 PK는 1,2,3입니다.

이 상태에서 보통의 RDBMS에서는 ‘choice’를 찾기 위해서 1번부터 순차적으로 단어를 검색하며 내려가게 됩니다. 그리고 3번째 문장에서 그 답을 찾을 수 있고 사용자에게 정답을 리턴합니다.

이제 오른쪽은 Elastic의 역 인덱스 방식입니다. 각 문장의 단어를 분리해서 빈도수를 포함한 Dictionary를 만들게 됩니다. 이것은 마치 python에서 dict를 와 비슷한 개념입니다. 또 각 단어가 어떤 문장의 위치에 있는지 그 정보를 저장하게됩니다. 이런 정보는 이전에 ‘choice’라는 단어를 찾을 때에 바로 문서의 위치를 리턴해주기 때문에 아주 빠른 검색이 가능하게됩니다.

Elastic을 사용하는 이유

Elastic을 이용하는 이유는 앞선 글에서와 같은 다양한 장점이 있기 때문입니다. 이를 요약하면 3가지로 이야기 할 수 있습니다.

첫째, Elastic은 빠릅니다. Elastic은 Lucene을 기반으로 구축되어 있기 때문에 풀 텍스트 검색에서 뛰어납니다. 거의 실시간이라고 할 수 있을 정도로 색인 될 때부터 검색이 가능해질 때까지의 대기 시간이 아주 짧다는 의미입니다. 이 대기 시간은 보통 1초입니다. 결과적으로 Elastic은 보안 분석, 인프라 모니터링 같은 시간이 중요한 사례에 가장 이상적인 엔진입니다.

둘째, Elastic은 분산적입니다. Elastic에 저장된 문서는 샤드라고 하는 여러 다른 컨테이너에 걸쳐 분산되며 이 샤드는 복제되어 하드웨어 장애 시에 중복되는 데이터 산본을 제공합니다. Elastic의 분산적인 특징은 수백 개(심지어 수천 개)의 서버까지 확장하고 페타 바이트의 데이터를 처리 할 수 있도록 해줍니다.

셋째, Elastic은 광범위한 기능 세트와 함께 제공됩니다. 속도, 확장성, 복원력 뿐만 아니라 데이터의 롤업, 인덱스 수명 주기 관리 등과 같이 데이터를 훨씬 더 효율적으로 저장하고 검색 할 수 있게 해주는 다수의 기본 기능이 탑재 되어 있습니다.

넷째, Elastic은 데이터 수집, 시각화, 보고를 간소화합니다. Beats와 Logstash의 통합은 Elastic으로 색인 하기 전에 데이터를 훨씬 더 쉽게 처리 할 수 있도록 만들어줍니다. 사용자는 Elastic에 데이터를 입력하기 위해서 별도의 특별한 도구를 개발하지 않아도 파일 형태의 다양한 데이터, RDBMS 데이터 등을 빠르게 입력 할 수 있습니다. 또한 Kibana와 같은 시각화 도구는 실시간으로 입력되는 데이터를 분석, 모니터링 할 수 있도록 훌륭한 UI를 제공합니다.

https://www.elastic.co/guide/kr/logstash/current/introduction.html

Elastic은 국내에도 많은 사용자가 있고 기술 공유와 교육을 위한 활동을 계속 해오고 있습니다.

또한 많은 자료들이 공식 사이트를 통해서 공유되어 있으니 쉽고 빠르게 소식을 접할 수 있습니다.

챗봇 슬롯 채우기(Slot-Filling)

대화는 어떤 목적을 이루기 위한 대화(TOD, Task Oriented Dialog)가 있고 특별한 목적 없이 자신의 생각을 자유롭게 이야기 하는 소통을 목적으로 하는 대화(Chit-Chat Dialog)가 있습니다.

목적이 있는 대화의 경우에는 대화의 특징에 따라서 n개의 대화턴(Turn)으로 구성된 대화의 묶음으로 나눌 수 있는데 그것을 태스크(Task)라고 할 수 있습니다. 그리고 각 태스크는 또 n개의 액티비티(Activity)로 나눌 수 있습니다. 그리고 각각의 액티비티에는 태스크의 원활한 진행을 위해서 반드시 입력되어야 할 필요한 정보들이 있습니다.

이처럼 특정한 태스크의 발화(Utterance)에서 반드시 필요한 의미 있는 정보들을 슬롯(Slot)이라고 하고 이 슬롯을 채우는 것을 슬롯 필링(Slot Filling)이라고 합니다.

https://d2.naver.com/helloworld/2110494

그리고 이 슬롯을 채우기 위해서 거듭해서 질문을 하게되는데 이러한 것을 Follow-Up Question이라고 합니다. 아래의 그림과 같이 좌석을 예약할 때에 기본적으로 필요한 정보들이 있고 이러한 정보들이 입력되지 않으면 시스템은 이용자에게 질문을 통해 해당 정보를 얻게 됩니다.

https://d2.naver.com/helloworld/2110494

이런 과정을 수행하기 위해서는 개체명인식(NEG, Named Entity Recognition) 작업이 필요합니다.
해당 작업은 사용자가 입력한 텍스트를 사전에 정의된 몇가지 단어들… 예를 들어서 메뉴명, 지역명, 호텔명 등에 태깅하는 작업을 의미하고 이것은 비슷한 말로 엔티티 태깅(Entity Tagging)이라고도 합니다.

여기서 중요한 것은 기존에 정의된 태그 목록이라는 것입니다. 태그 목록은 기존에 범용적으로 사용되는 것도 있고 대화가 진행되는 특정한 도메인에서 활용되는 것도 있습니다.

이처럼 사용자가 입력한 단어들이 기존에 정의된 개체명에 포함되어 있는지를 살펴보고 없다면 필요한 정보를 다시 요구하는 방식으로 요청 정보를 채우게됩니다.

다음에 구현한 예제는 이러한 과정을 간단히 작성한 내용입니다. 주문상황을 가정하고 챗봇을 통해서 주문을 받아보는 방식을 생각해보겠습니다.
먼저 아래와 같이 슬롯을 정의합니다.

#Intent Slot 구성
slot_entity = { "주문": {"메뉴":None, "장소":None, "날짜":None},
                "예약": {"장소":None, "날짜":None},
                "날씨": {"장소":None, "시간":None}
              }

사용자가 입력된 텍스트가 “주문 부탁해”입니다. 챗봇 시스템이 먼저 파악해야 할 것이 있다면 텍스트를 입력한 사람이 어떤 의도로 이런 텍스트를 입력했는지를 알아내는 것입니다.

간단히는 ‘주문’이라는 키워드를 확인해서 대화의 의도를 파악할 수 있고 몇가지 ‘주문’ 상황에서 쓸 수 있는 문장들을 입력하고 유사도 검사를 통해서도 이용자의 의도를 파악할 수 있습니다. 최근에는 딥러닝을 활용해서 해당 문장이 어떤 의도인지 알아내기도 합니다. 딥러닝(LSTM)을 활용한 문장 유사도를 찾는 것은 예제로 구현되어 있으니 참고하시기 바랍니다.

입력한 문장 ‘햄버거 주문할께요’라는 단어가 ‘주문’이라는 의도를 가지고 있다는 것을 파악한 다음에 ‘햄버거 주문할께요’라는 입력 문장을 분석해서 예약에 필요한 정보들을 담고 있는지 확인해야합니다. 그러기 위해서 형태소 분석(Morphology Analysis) 과정이 필요합니다.

input_txt = '햄버거 주문할께요'
intent_code = '주문'

from konlpy.tag import Kkma
kkma = Kkma()
morpheme = kkma.pos(input_txt)
print(input_txt,'\n',morpheme)
# [('햄버거', 'NNG'), ('주문', 'NNG'), ('하', 'XSV'), ('ㄹ게요', 'EFN')]

입력된 문장의 형태소 분석을 통해서 ‘햄버거’,’주문’이라는 개체명(명사)을 찾아내었습니다. 이제 다음으로는 이런 개체명이 사전에 정의해둔 개체명과 일치하는 내용을 찾는 과정을 수행합니다. 입력된 단어 중에서 ‘햄버거’란 단어는 이미 메뉴 아이템에 사전으로 등록했기 때문에 ‘햄버거’는 menu_item이라고 인식합니다.

# 개체명
menu_item = ['피자','햄버거','치킨','떡볶이']
loc_item = ['세종','대전','공주']
date_item = ['지금','내일','모래']

# 개체명 태깅
for pos_tag in morpheme:
    if (pos_tag[1] in ['NNG', 'NNP']): #명사, 영어만 사용
        if pos_tag[0] in menu_item: #메뉴 item 검색
            slot_value["메뉴"] = pos_tag[0] 
        elif pos_tag[0] in loc_item: #장소 item 검색
            slot_value["장소"] = pos_tag[0] 
        elif pos_tag[0] in date_item: #날짜 item 검색
            slot_value["날짜"] = pos_tag[0] 
print (slot_entity.get(intent_code))

입력된 문장을 분석해본 결과 주문에 필요한 나머지 정보들 즉, ‘주소’ 정보와 ‘시간’ 정보는 입력되지 않았습니다. 챗봇 시스템은 빠진 두개의 정보를 입력할 것을 요청합니다. 이것을 Follow-Up Question이라고 합니다.

if(None in slot_value.values()): #빈 Slot 출력
    key_values = ""
    for key in slot_value.keys():
        if(slot_value[key] is None):
            key_values = key_values + key + ","
    output_data = key_values[:-1] + '를 입력해주세요.'
else:
    output_data = "주문이 완료 되었습니다."
            
print (output_data)
#메뉴,장소,날짜를 입력해주세요.

이러한 방법으로 주문에 필요한 모든 정보가 입력되면 챗봇 시스템은 주문 정보를 기반으로 실제로 요청한 내용들을 주문하게됩니다.

참고로 본 글을 작성하기 위해서 잘 정리된 아래의 두개 글을 참고했습니다. 관심이 있으신 분은 아래의 글을 검색해보시기 바랍니다.

REF
[Naver] 챗봇을 위한 대화는 어떻게 디자인할까
[KAKAO] 카카오 미니의 슬롯 태깅

챗봇(Chatbot) 이란

채팅은 보통 일상의 소소한 대화를 주고 받는 경우와 어떤 목적을 위해 당사자간 정보를 주고 받는 경우 혹은 정보의 이동이 단방향인 경우 세가지가 있습니다. 이중에서 어떤 목적을 위해 당사자간에 정보를 주고 받는 경우는 대화하라고 하고 정보의 요구 주체가 있어서 상대방은 정보를 주기만 하는 것을 QnA라고 할 수 있습니다.

그렇다면 이러한 챗봇은 어떤 방법으로 사람의 말을 이해하고 또 어떻게 구현할 수 있을까? (사실 챗봇과 음성인식은 거의 비슷한 기술을 사용합니다. 단지 앞 단에 음성을 텍스트로 변환해주거나 텍스트를 음성으로 변환해주는 기능을 수행하는 STT/TTS와 같은 기술이 적용될 뿐입니다.)
먼저 어떻게 이해하는지에 대해서 간단히 그림으로 살펴보면 아래와 같습니다.

사람은 머리속에 생각이나 감정 등을 언어의 형태로 상대방에게 전달합니다. 그 언어를 자연어(Natural Language)라고 합니다. 이 자연어는 인간의 언어이기 때문에 당연히 컴퓨터가 이해할 수 없습니다. 그렇기 때문에 인간의 언어를 컴퓨터에게 이해시키기 위해서 NLU(자연어이해, Natural Language Understanding)라는 분석과정이 필요합니다. 컴퓨터는 이러한 특별한 처리 단계를 통해서 인간의 이 말이 어떤 의미가 있는가를 이해하게 됩니다.

NLU의 과정은 축적된 데이터가 큰 역활을 합니다. 마치 아이들이 언어를 배울 때 좋은 환경에서 말을 배우는 것과 때로 거친 환경에서 말을 배우는 것이 사용하는 어휘의 차이가 있듯이 컴퓨터도 인간의 언어를 학습하는데 데이터가 절대적인 영향을 미치게됩니다.

어떤 데이터를 어떻게 축적 했느냐에 따라서 인간의 말을 더 잘 이해할 수 있습니다. 실제로 이러한 일을 수행하기에는 굉장히 큰 사전 데이터를 필요로합니다. 대화처리기는 학습된 데이터를 통해서 인간의 말에 어떤 응답을 해야할지를 선택하게 되고 그것을 NLG(자연어생성, Natural Language Generator)에 전달하게됩니다. NLG는 인간의 언어를 이해한 내용을 바탕으로 다시 인간이 이해할 수 있는 음성과 글의 형태로 출력해줍니다.

사실 이러한 과정은 사람에게서도 비슷하게 일어나고 있습니다. 인간 역시 화자의 관념화된 사상을 말이나 글로 표현하게 되고 상대방은 이러한 글을 다시 해석해서 관념화 하는 과정이 일어나고 있습니다.

위의 그림은 서정연교수님의 <대화 인터페이스, 챗봇, 그리고 자연어처리>라는 강의자료에 첨부되어 있는 그림입니다. 입력 데이터를 문장으로 가정할 때에 문장이 입력되고 그 문장이 어떤 과정에 의해서 어떻게 이해되는지에 대한 그림이 도식화되어 있습니다. 이후에 사용하는 자료도 해당 슬라이드에서 인용했습니다.

가장 기본이 되는 것은 보라색으로 표시되어 있는 부분입니다.

형태소분석기(Morphology) – 구문분석기(Syntax) – 의미분석기(Semantics) – 담화분석기(Discourse)

형태소분석(Morphology) : 명사, 조사 따위로 분리하는 단계, 의미를 가지는 가장 작은 단위로 분리
구문분석(Syntax) : 형태소들이 결합하여 문장이나 구절을 만드는 구문 규칙에 따라서 문장 내에서 각 형태소들이 가지는 역할을 분석
의미분석(Semantics) : 문장의 각 품사들이 어떤 역할을 하는지 보고 분석하는 단계, 각 어휘간 같은 단어라도 문장에서 어떤 의미로 사용되는가를 분석
담화분석기(Discourse) : 담화는 글의 흐름 및 연속체란 의미로 한 문장이 아닌 전체 문장간의 관계를 연구하여 글의 결합력과 통일성을 보는 연구, 언어가 사용되는 상황을 고려한 문맥의 이해

이러한 방식으로 자연어를 이해하고 처리합니다. 이런 과정을 통합하여 자연어처리(NLP, Natural Language Processing)라고 합니다.

챗봇을 만드는 방법은 여러가지가 있습니다. 우선 말꼬리를 이어서 대화를 이어가는 방식이 있습니다. 말을 계속해서 이어가기 때문에 적절한 응답을 할 수는 있지만 문맥의 흐름이 맞지 않을 수 있다는 단점이 있습니다.

또 하나는 미리 만들어진 대화쌍을 DB에 저장하고 사용자의 발화와 가장 유사한 대화쌍을 찾아 대화를 이어가는 방식이 있습니다. 그리고 이러한 방법을 확장을 확장하여 아래와 같이 표현합니다.

어디 사는지 물어봐도 되요? 라는 질문은 아래와 같이 6개의 질문으로 확장할 수 있습니다. 이에 따른 답변도 오른쪽과 같이 7개로 제공합니다. 이러한 대화쌍이 풍성해지면 채팅 이용자의 다양한 질문에 재밌는 방식으로 대응 할 수 있게 됩니다.

대화형 챗봇에서 가장 많이 사용되는 것은 시나리오 기반의 챗봇입니다.

물론 현재 딥러닝 기술의 발전으로 인해서 인공지능이 번역, 대화인식 등 많은 일을 해내고 있으나 아직 입력된 문장을 통해서 자연스러운 대화를 만들어 내는 것은 더 많은 연구와 노력이 필요한 단계입니다.

앞서 이야기한 시나리오 기반의 챗봇은 미리 입력한 시나리오를 통해서 대화가 진행되도록 합니다. 가장 유명한 것은 Google의 DialogFlow입니다. 이 외에도 국내에 많은 회사들이 이러한 시나리오 기반의 챗봇을 활용해서 다양한 서비스를 제공하고 있습니다.

최근 세종학당에서 개발한 <인공지능 기반의 한국어 교육용 서비스>도 이러한 방식으로 구현되었습니다. 사전에 전문가와 함께 다수의 상황별 교육용 시나리오를 제작하고 이를 챗봇 엔진에 탑재해서 이를 통해서 한글 학습을 할 수 있도록 개발되었습니다.

시나리오 기반의 챗봇은 주어진 흐름에 따른 대화만 인식한다는 단점이 있기 때문에 이를 극복하기 위한 자연스러운 예외처리가 필요합니다. 또 주제를 이탈했을 경우 어떻게 다시 주제로 복귀하는지에 대한 기술도 필요합니다.

이외에도 또 중요한 것은 대화의 의도를 파악하는 일입니다. 이것을 의도(Intention)라고 합니다.

https://d2.naver.com/helloworld/2110494

예를 들어서 대화의 순서가 “인사-주문-결제-감사”의 순으로 진행된다면 입력된 사용자의 대화가 어떤 의도로 말한 것인지 정확하게 판단해야 합니다. 의도를 파악하지 못하면 챗봇은 사용자의 질문에 엉뚱한 대답을 하게됩니다.

이런 의도를 파악하는데 다양한 인공지능 기법이 사용됩니다. 인공지능은 사용자의 글을 통해서 이것이 어떤 내용인지 분류하고 해당 분류에 있는 대답중에서 하나를 출력합니다. 예제로 구현한 테스트 코드에서는 BiLSTM을 사용합니다. 해당 알고리즘은 RNN 기법 중 하나로 분류에서 RNN에서 좋은 성능을 나타내는 알고리즘입니다.

이 외에도 슬롯-필링(Slot Filling)이라는 기법이 있습니다. 이것은 말 그대로 빈 칸을 채우는 기법입니다.

예를 들어서 날씨를 묻는 사용자의 질문에 기본적으로 시스템이 알아야 할 정보를 사전에 정의하고 부족한 정보를 다시 사용자에게 요청하는 것입니다. 만약 사용자가 “날씨를 알려줘”라고 질문하게 되면 시스템은 시간, 장소 등을 다시 물어보게 됩니다.

아래의 그림과 같이 예약을 원하는 사용자에게는 메뉴, 가격, 사이드 메뉴, 결제 방법 등을 추가로 물어볼 수 있고 이것을 Follow-Up Questions 이라고 할 수 있습니다.

https://d2.naver.com/helloworld/2110494

시나리오 기반 챗봇(Naive Scenario Chatbot)

이번 예제에서는 간단한 시나리오 기반 챗봇을 구현해보겠습니다.

해당 예제를 실행한 결과는 아래의 영상과 같습니다.

이 대화는 아래와 같이 4개의 턴(Turn)으로 이루어져있습니다. 대화의 흐름은 “인사-간단한 일상 대화-주문-끝인사”로 이뤄져있습니다. 각 턴을 수행하면 자연스럽게 다음 턴으로 연결됩니다. 대화가 예상된 흐름으로 넘어가지 않을 때는 사전에 정의된 간단한 대화를 출력하고 다시 이전 질문을 다시 수행합니다.

Dialog Flow : Greeting – Where – Order – Bye

테스트에 사용할 간단한 시나리오는 아래와 같습니다. 아래에 order – bye가 화면상에는 표시되어 있지 않지만 내용은 위와 다르지 않습니다.

category에 NaN으로 되어 있는 부분은 시스템의 발화 부분입니다. 그 외의 부분은 시스템에 입력되는 기대값들입니다.

예를 들어 greeting 카테고리를 살펴보면 시스템이 “안녕하세요”라고 발화 했을 때에 해당 발화에 답변으로는 기대되는 값들을 greeting 카테고리에 등록합니다. 현재 시스템 발화에는 하나만 등록했지만 만약 시스템 발화 부분을 다양하게 하고자 한다면 여러개의 답변을 넣고 그중에서 하나의 답을 랜덤하게 표시해주는 방법으로 해도 됩니다. 실제로 많은 채팅 시나리오가 같은 방법으로 제작되고 있습니다. 여기서는 간단하게 시스템에서는 하나의 답변만 낼 수 있도록 합니다.

시스템 메세지에 대한 사용자의 기대되는 “안녕하세요”, “안녕”, “헬로”, “네 반갑습니다”, “hi hello” 5가지 중에 하나로 입력된다고 가정합니다.

이와 같은 방법으로 “어디서 오셨나요?”라는 시스템의 질문에도 사용자는 몇가지 대답을 할 수 있습니다. 그에 대한 답변을 미리 등록해봅니다.

동일한 방법으로 나머지 시나리오도 입력해봅니다.

그렇지만 안타깝게도 위와 같이 정의된 답변만 사용자가 입력하지는 않습니다. 사용자는 여러가지 답변을 입력할 수 있습니다. 기본적으로는 챗봇에게 많은 내용을 학습시킬 수 있다면 좋겠지만 실제로 그렇게 하기는 쉽지 않습니다. 또 하나의 문제는 시스템은 사용자가 어떤 순서로 답변을 낼지 알지 못한다는 것입니다.

그렇기 때문에 챗봇 시스템은 사용자의 입력한 답변이 입력한 시나리오에 있는지 그렇다면 어떤 질문인지 만약에 아니라면 어떻게 예외적인 사항을 처리해야 하는지 알아야합니다. 즉, NLU(자연어이해, Natural Language Understanding)가 필요합니다.

이부분에 형태소 분석과 구문분석 등의 과정이 필요하고 더 높은 이해를 위해서 사전 구축등의 작업이 필요합니다. 하지만 비슷한 예제를 이미 구현했기 때문에 여기서는 간단히 해당 문장이 어떤 카테고리에 속하는지 분류하는 분류기 정도로만 구현해 보겠습니다.

딥러닝을 활용하여 텍스트 분류를 수행할 수 있습니다.

위와 같은 텍스트 분류 예제를 참고하시기 바랍니다.

간단히 작성한 시나리오를 통해서 학습을 위해 아래와 같이 각 카테고리(Category 혹은 Intent)에 코드값을 부여해줍니다. 이때 시스템의 발화는 제외하고 사용자의 발화만 코드값을 부여합니다.

index2category = {0:'greeting',1:'where',2:'ask',3:'bye'}
def category_define(x):
    code = ''
    if x=='greeting': code=0
    elif x=='where': code=1
    elif x=='ask': code=2
    elif x=='bye': code=3
    else: code='NaN'
    return code

# category code!
df['code'] = df['category'].apply(category_define)

# only answer
df=df[df['category'].notnull()]

사용자가 입력할 예상 문장들을 아래와 같이 추출할 수 있습니다.

sentence = df['text'].values
print(sentence)
array(['안녕하세요', '안녕', '헬로', '네 반갑습니다', 'hi hello', '세종에서 왔습니다',
       '세종에서 살아요', '대전 살아요', '세종이요', '세종요', '세종시에서 왔지', '서울요', '부산요',
       '빵을 사고 싶어요', '음료수 사고 싶어요', '커피 주세요', '빵 주세요', '케이크 주세요',
       '아이스 아메리카노 주세요', '베이글 주세요', '감사합니다', '고맙습니다', '잘먹을께요', 'Thank you'],
      dtype=object)

각 문장을 형태소분석이나 구분분석 등의 과정을 생략하고 단순히 문장을 공백으로 분리하여 각 단어의 집합을 생성합니다. 집합 생성시에 입력되는 문장에 단어가 없는 경우를 위해서 unk 코드와 자리수를 맞추기 위한 padding 값을 부여합니다.

sentence = df['text'].values

words = list(set([w for word in sentence for w in word.split(' ')]))
words = np.insert(words,0,'!') # padding 1
words = np.insert(words,0,'#') # unk 0

이제 생성한 문장을 단어 단위로 분리하고 각각 Index 값을 부여했기 때문에 각 문장을 숫자로 표현할 수 있습니다. word2index의 경우는 입력되는 단어들을 Index 값으로 바꿔주는 python dictionary이고 index2word는 그 반대의 경우입니다.

해당 과정을 거치면 문장은 숫자의 형태로 변경됩니다. 이렇게 하는 이유는 컴퓨터가 인간의 문장을 이해하지 못하기 때문입니다. 이제 학습을 위해 각 단어의 입력 Sequence Length를 맞춰주는 작업이 필요합니다. 이때 지나치게 패딩을 많이 입력하면 훈련 데이터에 노이즈가 많이 들어가기 때문에 예측 결과가 좋지 않을 수 있습니다. 그리고 마지막에 각 문장이 어떤 카테고리 혹은 의도(Intent)에 속하는지 Label 데이터를 입력해줍니다.

word2index = {w:i for i,w in enumerate(words)}
index2word = {i:w for i,w in enumerate(words)}

def xgenerator(x):
    return [ word2index['#'] if x_ not in word2index else word2index[x_] for x_ in x]

x_data = [xgenerator(words.split(' ')) for words in sentence]

for ndx,d in enumerate(x_data):
    x_data[ndx] = np.pad(d, (0, config.max_length), 'constant', constant_values=0)[:config.max_length]

y_data = df['code'].values

아래와 같이 학습에 필요한 파이토치 라이브러리를 import하고 DataLoader를 통해서 학습용 데이터셋을 만들어줍니다. 배치 사이즈는 데이터 셋이 크지 않기 때문에 한번에 학습을 하는 것으로 설정하시면 됩니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

config.vocab_size = len(word2index)
config.input_size = 30
config.hidden_size = len(df['code'].unique())

from torch.utils.data import Dataset, DataLoader

class TxtDataSet(Dataset):
    def __init__(self, data, labels):
        super().__init__()
        self.data = data
        self.labels = labels
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

train_loader = DataLoader(dataset=TxtDataSet(x_data, y_data), batch_size=config.batch_size, shuffle=True)

학습용 모델을 생성합니다. 학습은 아래의 3개의 레이어를 통과하고 나온 결과 값을 사용합니다. 제가 작성한 여러 예제에 해당 모델에 대한 설명이 있기 때문에 자세한 설명은 하지 않고 넘어가겠습니다.

Embedding Layer – LSTM Layer – Linear Layer

class RNN(nn.Module):
    def __init__(self, vocab_size, input_size, hidden_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.vocab_size = vocab_size
        
        self.embedding = nn.Embedding(self.vocab_size, self.input_size)
        self.rnn = nn.LSTM(
            input_size = self.input_size, 
            hidden_size = self.hidden_size, 
            num_layers=4, 
            batch_first=True, 
            bidirectional=True
        )
        
        self.layers = nn.Sequential(
            nn.ReLU(), 
            nn.Linear(hidden_size*2, hidden_size),
            
        )
        
    def forward(self,x):
        x = self.embedding(x)
        y, _ = self.rnn(x)
        y = self.layers(y[:,-1]) # last output dim...

        return F.softmax(y, dim=-1)
    
model = RNN(config.vocab_size, config.input_size, config.hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
print(model)

생성한 모델을 출력하면 아래와 같이 표시됩니다.

RNN(
  (embedding): Embedding(35, 30)
  (rnn): LSTM(30, 4, num_layers=4, batch_first=True, bidirectional=True)
  (layers): Sequential(
    (0): ReLU()
    (1): Linear(in_features=8, out_features=4, bias=True)
  )
)
model.train()

hist_loss = []
hist_accr = []

for epoch in range(config.number_of_epochs):
    epoch_loss = 0
    for x_i, y_i in train_loader:
        y_hat = model(x_i) 
        loss = criterion(y_hat, y_i)
        
        accr = torch.argmax(y_hat, axis=1)== y_i
        accr = accr.data.numpy()
        accr = accr.sum()/len(y_i)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        epoch_loss += float(loss)
    
    if epoch % 100 == 0:
        print('epoch:{}, loss:{:.5f}, accr:{:.5f}'.format(epoch, epoch_loss/config.number_of_epochs, accr))
epoch:0, loss:0.00070, accr:0.20833
epoch:100, loss:0.00068, accr:0.29167
epoch:200, loss:0.00065, accr:0.50000
epoch:300, loss:0.00062, accr:0.62500
epoch:400, loss:0.00056, accr:0.62500
epoch:500, loss:0.00054, accr:0.62500
epoch:600, loss:0.00052, accr:0.62500
epoch:700, loss:0.00051, accr:0.83333
epoch:800, loss:0.00050, accr:0.83333
epoch:900, loss:0.00049, accr:0.83333
epoch:1000, loss:0.00048, accr:0.83333
epoch:1100, loss:0.00048, accr:0.83333
epoch:1200, loss:0.00047, accr:1.00000
epoch:1300, loss:0.00046, accr:1.00000
epoch:1400, loss:0.00046, accr:1.00000
epoch:1500, loss:0.00045, accr:1.00000
epoch:1600, loss:0.00044, accr:1.00000
epoch:1700, loss:0.00044, accr:1.00000
epoch:1800, loss:0.00043, accr:1.00000
epoch:1900, loss:0.00043, accr:1.00000

학습한 모델을 통해서 입력한 어떤 내용으로 발화한 것인지를 예측해봅니다.

test_sentence = ['잘먹을께요']
x_test = [xgenerator(words.split(' ')) for words in test_sentence]
for ndx,d in enumerate(x_test):
    x_test[ndx] = np.pad(d, (0, config.max_length), 'constant', constant_values=0)[:config.max_length]
    
with torch.no_grad():
    x_test = torch.tensor(x_test, dtype=torch.long)
    predict = model(x_test)
    print(predict)
    result = torch.argmax(predict,dim=-1).data.numpy()
    print([index2category[p] for p in result])

학습을 완료한 후 모델을 아래와 같이 저장합니다.

torch.save({
  'model': model.state_dict(), 'config':config
}, './model/model.scenario')

import pickle
def save_obj(obj, name):
    with open('./pkl/'+ name + '.pkl', 'wb') as f:
        pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
    
save_obj(index2category,'index2category')
save_obj(word2index,'word2index')
save_obj({'vocab_size':config.vocab_size,'input_size':config.input_size,'hidden_size':config.hidden_size,'max_length':config.max_length},'config')

django를 통해서 간단한 웹서버를 만들고 해당 모델을 활용해서 간단한 챗봇을 만들어봅니다.

웹서버의 사용자의 입력을 받아서 시나리오에서 어떤 흐름에 속하는 대화인지를 찾아내고 그 흐름에 맞으면 다음 대화를 진행하고 맞지 않을 경우 다시 발화를 할 수 있도록 유도합니다.

CNN을 활용한 비음성 구간 검출

합성곱신경망(CNN, Convolutional Neural Network)은 처음에는 이미지 분류에 많이 사용되었지만 현재는 이미지 분류 외에도 아주 다양한 분류 문제 해결에 사용되고 있습니다.

이번에는 CNN을 활용하여 음성신호를 분류하는 문제를 다뤄보겠습니다.

일단 음성신호를 몇개 만들어봅니다. 사용하는 음성신호는 wav(waveform audio file format)입니다.

위키피디아 정의에 의하면 WAV 또는 WAVE는 개인용 컴퓨터에서 오디오를 재생하는 마이크로소프트와 IBM 오디오 파일 표준 포맷이다. 가공되지 않은 오디오를 위한 윈도우 시스템의 기본 포맷이다. WAV는 비압축 오디오 포맷으로 프로그램 구동음이나 일반 수준의 녹으로 사용되지만 전문 녹음용으로는 사용되지 않는다.

구조적으로 보면 wav 파일은 PCM 파일에 Header를 붙인 것이라고 할 수 있습니다. 다시 말하면 PCM 정보를 이용하면 wav 파일을 만들수 있다는 의미가 됩니다. Header에는 Sampling Rate와 같은 정보, 채널 같은 정보가 필요합니다. wav 파일을 읽을 때에 별도의 조치가 필요하지 않은 이유는 바로 Header에 이런 정보가 포함되어 있기 때문입니다.

Sampling Rate는 아나로그인 음성 파일을 초당 몇번의 샘플링을 하는가에 대한 숫자입니다. 높으면 높을 수록 손실되는 정보가 적어지니 음질이 좋아지겠지만 저장을 위해서 더 많은 공간을 필요로 하기 때문에 목적에 따른 적절한 수준의 정의가 필요합니다. 이번 예제에서는 22,050 Sampling Rate를 사용합니다.

이 외에도 Mono, Stereo 를 표시하는 Channel과 8-bit, 16-bit, 32-bit를 정의하는 Resolution 정보가 헤더에 포함되어 있습니다. 이 헤더 정보를 자세히 살펴보면 아래와 같은 구조로 되어 있습니다.

보면 파일 구조 중에서 0-44 byte까지가 헤더 부분이고 실제 데이터는 그 이후에 추가됩니다.

wav 파일에 대한 자세한 정보는 인터넷에 공개된 다른 정보를 참고해 보시기 바랍니다.

테스트를 위한 개략적인 개요는 아래의 그림과 같습니다.

먼저 음성신호를 입력 받아서 각 신호를 1초 단위로 slicing 합니다. 그렇게 되면 60초의 음성 wav 파일이 있다면 각 파일은 1초 단위의 wav 파일로 나눠지게됩니다. 이제 각 분리한 음성파일을 들어보고 음성이면 0, 비음성이면 1로 레이블링합니다.

이 과정이 시간이 오래걸립니다. 두말할 것도 없이 이러한 데이터가 많으면 많을 수록 비음성을 찾을 확률이 커집니다. 비음성은 녹음 환경이 어디야에 따라서 많은 차이가 있습니다. 차안, 방안, 야외, 놀이터, 사무실 등에서 발생하는 비음성 데이터를 샘플링하면 더 좋은 예측 모델을 만들 수 있습니다.

1초 단위로 샘플링하면 이 데이터는 [0-22049]의 벡터로 변환할 수 있습니다. 만약 음성 길이가 60초라면 [60×22050]의 데이터가 됩니다. 이제 이 데이터를 CNN을 통해서 학습할 수 있도록 데이터의 형태를 바꿔주면 해당 데이터는 [60×1×n×m] 형태의 데이터가 됩니다. n과 m은 각각 원하시는 사이즈로 만드실 수 있습니다.

이제 해당 음성 데이터를 Google Drive에 업로드하고 Google Colab(Pro)를 사용해서 모델을 학습해보겠습니다. 아래의 그림은 테스트에 사용되는 Colab GPU 정보입니다.

gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Select the Runtime > "Change runtime type" menu to enable a GPU accelerator, ')
  print('and then re-execute this cell.')
else:
  print(gpu_info)
def load_wav2():
    file_list = sorted(glob.glob("gdrive/.../*sound/*.wav"))
    file_class = []

    for f in file_list:
      if f[40:47]=='av/nois':
          file_class.append([1,0])
      else:
          file_class.append([0,1])
  
    return file_list, file_class

import time
def time_per_epoch(st, et):
  elt = et - st
  elaps_min = int(elt/60)
  elaps_sec = int(elt - (elaps_min*60))
  
  return elaps_min, elaps_sec

# noise 0, norm 1
wav_list, wav_class = load_wav2()

음성 파일이 있는 위치에서 학습 파일 경로를 읽어옵니다. 학습 파일을 읽은 후에 노이즈(비음성)일 경우 0, 음성일 경우 1로 표시합니다. 즉 class는 2가됩니다. 이 말은 예측의 결과 값도 결국 0 or 1의 형태를 가진다는 말이됩니다. 그런 다음 1 epoch에 걸리는 시간을 기록하기 위해서 time_per_epoch() 이라는 함수를 선언해줍니다.

파일 리스트와 레이블 정보를 출력해보면 아래의 정보와 같습니다.

for w,c in zip(wav_list[0:10], wav_class[0:10]):
  print(w,c)

#gdrive/.../noise_sound/basic_10_0.wav [1, 0]
#gdrive/.../noise_sound/basic_10_0_cp.wav [1, 0]
#gdrive/.../noise_sound/basic_10_1.wav [1, 0]
#gdrive/.../noise_sound/basic_10_1_cp.wav [1, 0]
#gdrive/.../noise_sound/basic_10_21.wav [1, 0]
...

이제 음성 파일을 읽어서 벡터로 변환합니다.
각 음성 파일은 librosa 패키지 안에 있는 load() 함수를 사용해서 아나로그 신호를 벡터 정보로 변환합니다. Sampling Rate가 22050로 설정했기 때문에 벡터의 길이는 [0-22049]가 됩니다. 전체 데이터를 읽어서 메모리에 저장하는데 많은 시간이 필요합니다.

예제에서는 전체 파일이 아닌 일부 데이터만 읽어 오기 때문에 한번에 읽을 수 있지만 파일이 큰 경우는 메모리가 제한되어 있기 때문에 한번에 데이터를 읽을 수 없습니다. 그럴 경우는 batch_size를 사전에 정의하고 파이토치의 데이터로더(Dataloader)와 같은 도구를 사용해서 파일을 부분적으로 읽어서 학습을 수행할 수 있습니다. 데이터로더 예제는 블로그의 다른 글에 있으니 참고하시기 바랍니다.

wav_data = []
start_time = time.time()

for idx, wav in enumerate(wav_list):
  y, sr = librosa.load(wav)
  wav_data.append(y)

end_time = time.time()
lap_mins, lap_secs = time_per_epoch(start_time, end_time)
print(f"Lap Time: {lap_mins} mins, {lap_secs} secs")

데이터를 읽은 후에 학습용 데이터셋과 테스트용 데이터셋으로 분리[8:2]로 분리합니다.

xx = torch.tensor(x_data, dtype=torch.float, device=device)
yy = torch.tensor(y_data, dtype=torch.long, device=device)

train_cnt = int(xx.size(0) * config.train_rate)
valid_cnt = xx.size(0) - train_cnt

# Shuffle dataset to split into train/valid set.
indices = torch.randperm(xx.size(0)).to(device) # random 순열 리턴
x = torch.index_select(xx, dim=0, index=indices).to(device).split([train_cnt, valid_cnt], dim=0)
y = torch.index_select(yy, dim=0, index=indices).to(device).split([train_cnt, valid_cnt], dim=0)

이제 분석을 위한 CNN 학습 모듈을 생성합니다. 학습 모듈은 Conv2d() → ReLU() → BatchNorm2d() → Linear() → Softmax()로 구성되어 있습니다. 최종 출력값은 0 or 1의 형태를 가지게 됩니다.

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        
        self.convs = nn.Sequential(
            nn.Conv2d(1, 10, kernel_size=3), # input_channel, output_channel, kernel_size
            nn.ReLU(),
            nn.BatchNorm2d(10),
            nn.Conv2d(10, 20, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.BatchNorm2d(20),
            nn.Conv2d(20, 40, kernel_size=3, stride=2),
            nn.ReLU(),
            nn.BatchNorm2d(40),
            nn.Conv2d(40, 80, kernel_size=3, stride=2)
        )
        
        self.layers = nn.Sequential(
            nn.Linear(80*17*17, 500),
            nn.ReLU(),
            nn.BatchNorm1d(500),
            nn.Linear(500,250),
            nn.Linear(250,100),
            nn.ReLU(),
            nn.BatchNorm1d(100),
            nn.Linear(100,50),
            nn.Linear(50, 10),
            nn.ReLU(),
            nn.BatchNorm1d(10),
            nn.Linear(10, 2),
            nn.Softmax(dim=-1)
        )

    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, 80*17*17)
        return self.layers(x)
    
model = Net()
model.to(device)

이제 모델을 아래와 같이 학습합니다.

optimizer = optim.Adam(model.parameters(), lr=config.lr)
criterion = nn.CrossEntropyLoss()

hist_loss = []
hist_accr = []

model.train()
for epoch in range(config.number_of_epochs):
    output = model(x[0]) # Train 
    loss = criterion(output, torch.argmax(y[0], dim=1)) # Train
    
    predict = torch.argmax(output, dim=-1) == torch.argmax(y[0], dim=-1)
    accuracy = predict.sum().item()/x[0].size(0)
    
    
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    hist_loss.append(loss)
    hist_accr.append(accuracy)
    
    if epoch % 5 == 0:
        print('epoch{}, loss : {:.5f}, accuracy : {:.5f}'.format(epoch, loss.item(), accuracy))

plt.plot(hist_loss)
plt.plot(hist_accr)
plt.legend(['Loss','Accuracy'])
plt.title('Loss/Legend')
plt.xlabel('Epoch')
plt.show()

학습이 완료된 후 아래와 같이 Loss/Accuracy를 표시해봅니다.

테스트 데이터를 통해서 해당 모델의 정확률을 테스트해보니 0.94 값을 얻었습니다.

with torch.no_grad():
  output = model(x[1])

  predict = torch.argmax(output, dim=1)
  accur = (predict == torch.argmax(y[1], dim=1)).to('cpu').numpy()
  print(accur.sum()/len(accur)) # 0.94

CNN을 활용한 비속어 필터링

CNN을 활용해서 텍스트를 분류하는 예제는 이전에서 살펴봤습니다.
해당 예제는 영화평점 텍스트를 학습하고 그 평가가 긍정인지 부정인지를 판단하는 문제를 CNN을 통해서 예측하는 내용이었습니다.
내용이 궁금하신 분은 아래의 예제를 살펴보시기 바랍니다.

이번에도 CNN을 활용한 예제로 비속어를 필터링하는 내용입니다. 본 예제에서는 학습을 통해서 생성된 모델을 python에서 제공하는 웹 어플리케이션 제작 프레임워크인 django를 통해서 간단한 웹서버를 구현해보겠습니다.

먼저 아래의 그림은 이미 많은 연구 문서나 블로그에서 보셨을듯한 이미지로 CNN을 활용한 텍스트 분류의 대표적인 이미지입니다. 해당 이미지를 간단히 설면하면 아래의 문장 “wait for the video and don’t rent it” 문장을 어절단위로 분리하여 Embedding(n, 5) 레이어를 통과시키면 아래의 문장은 “문장어절 × 5” 형태의 값을 가지게 됩니다. 이것이 첫번째 이미지입니다. 이것을 CNN에서 처리하는 벡터 shape으로 만들기 위해서는 앞에 Channel 값을 입력하게 됩니다.

이렇게 되면 예시 문장은 하나의 이미지 데이터의 모양(Shape)을 가지게 됩니다. 그러나 이런 문장이 하나만 존재하지는 않고 여러개 존재합니다. 그렇게 되면 맨 앞에는 n_batch 정보를 입력 할 수 있습니다.

예를 들어서 100개의 문장이라면 [100 × 1 × 9 × 5] 형태의 shape이 되는 것이죠. 이는 CNN을 통해서 이미지를 분류한다고 생각할 때에 9×5의 1채널 이미지 100장에 해당됩니다.

이제 이것을 Conv2d 레이어를 통과시키고 나온 output 데이터를 Fully Connected 한 후에 Linear 레이어를 통과시키고 이 값을 Softmax로 시켜 최종 출력값을 얻습니다.

이 과정은 이미지를 분류하는 과정과 굉장히 유사합니다. 다만 앞부분에서 “어떻게 텍스트를 처리해서 매트릭스를 만드는가?” 하는 과정만 차이가 있습니다. 위의 논문에서는 어절 단위로 분리해서 처리했고 또 다른 논문에서는 한글 자소단위로 분리하여 필터를 적용하는 연구도 있습니다.

본 예제에서는 문장을 단어 단위로 분리하되 각 단어를 n-gram 하여 2글자씩 분리해서 워드 벡터를 생성하였습니다. 학습에 사용할 비속어 리스트는 아래와 같습니다.

txt 컬럼은 비속어 정보를 label 정보는 1의 경우는 비속어, 0은 일반 단어로 표시합니다. 각 텍스트는 2글자씩 분리하여 워드 벡터를 만든다고 했는데 해당 과정을 수행하면 “야해요야동” 문장의 경우”야해, 해요, 요야, 야동, 야해요야동” 이런 방법으로 구성됩니다. 비속어가 1글자 인경우도 상당히 많기 때문에 1글자의 경우는 한글자만 사용하는 것으로 했습니다. 2글자씩 분리한 다음 마지막에는 원문도 포함해서 훈련용 데이터셋을 생성합니다.

# n-gram
def textgram(text):
    tmp = []
    if len(text) > 1:
        for i in range(len(text)-1):
            tmp.append(text[i]+text[i+1])
        tmp.append(text)
    return tmp

textgram('야해요야동')
# output ['야해', '해요', '요야', '야동', '야해요야동']

아래 데이터는 테스트에 사용했던 도메인의 일반 텍스트입니다. 당연한 말이지만 도메인이 넓은 경우 보다는 한정된 범위로 축소하는 것이 더 좋은 예측 결과를 보입니다. 본 예제의 경우는 챗봇에 사용하는 일상 대화들로부터 데이터를 수집하였습니다. 아래의 데이터셋 역시 2글자로 분리합니다. label이 0인 것은 정상 단어라는 의미입니다.

생성된 단어의 리스트들을 통해서 vocab을 만듭니다. 이때 중복 제거는 필수입니다.
만들어진 vocab에 padding, unk 값을 추가합니다. 그 이유는 각 단어가 기준 크기 보다 작은 경우 빈 값을 패딩값으로 채우기 위함입니다. unk의 경우는 vocab에 존재하지 않는 단어가 나올 경우 해당 위치를 채워주는 코드입니다.

vocab = list(set([w for word in words for w in word]))
vocab = np.insert(vocab,0,'!') # padding
vocab = np.insert(vocab,0,'#') # unk

x_data = [[word2index[w] for w in word] for word in words]

이제 각 워드를 인덱스로 바꾸는 과정을 완료하면 아래와 같은 데이터셋을 얻을 수 있습니다. 아래의 데이터셋은 보시는 것처럼 그 크기가 각각 달라서 일정한 값으로 Shape을 맞출 필요가 있습니다.

print(x_data[0:10])

[[2179, 2693, 4402, 2776, 3215],
 [964, 1927, 2027, 1767, 6721, 3171],
 [964, 1927, 1525, 5679, 3310],
 [964, 1927, 4133, 257, 2462, 1061, 554, 1941, 1753, 1666],
 [964, 1927, 5247, 1177],
 [964, 1927, 3795, 6693, 191, 5585, 3299, 5066],
 [964, 1927, 601, 4397, 2938],
 [1298, 5558, 2423, 5374, 877, 4260],
 [3911, 229, 5374, 877, 4103],
 [3241, 5603, 3173]]

고정 크기를 정해주고 해당 길이보다 작은 데이터들은 아래와 같이 사전에 정의한 padding 값으로 채워줍니다. 그렇게 되면 아래와 같은 형태의 데이터 데이터 값을 얻을 수 있습니다. 여기서 vocab에 존재하지 않는 새로운 단어가 입력되면 해당 단어는 0으로 채우게 됩니다.

[array([2179, 2693, 4402, 2776, 3215,    1,    1,    1,    1,    1]),
 array([ 964, 1927, 2027, 1767, 6721, 3171,    1,    1,    1,    1]),
 array([ 964, 1927, 1525, 5679, 3310,    1,    1,    1,    1,    1]),
 array([ 964, 1927, 4133,  257, 2462, 1061,  554, 1941, 1753, 1666]),
 array([ 964, 1927, 5247, 1177,    1,    1,    1,    1,    1,    1]),
 array([ 964, 1927, 3795, 6693,  191, 5585, 3299, 5066,    1,    1]),
 array([ 964, 1927,  601, 4397, 2938,    1,    1,    1,    1,    1]),
 array([1298, 5558, 2423, 5374,  877, 4260,    1,    1,    1,    1]),
 array([3911,  229, 5374,  877, 4103,    1,    1,    1,    1,    1]),
 array([3241, 5603, 3173,    1,    1,    1,    1,    1,    1,    1])]

입력 데이터에 대한 준비가 마무리되면 파이토치의 nn.Module 모듈을 상속 받아서 훈련용 모듈을 생성합니다. 이전에 활용한 CNN을 활용한 텍스트 분류 글에서 사용했던 CNN 모듈을 그대로 사용하기 때문에 해당 부분은 생략합니다. 다만 생성된 모듈을 출력해보면 아래와 같은 정보를 얻을 수 있습니다.

CNN(
  (embedding): Embedding(7037, 100)
  (convs): ModuleList(
    (0): Conv2d(1, 100, kernel_size=(2, 100), stride=(1, 1))
    (1): Conv2d(1, 100, kernel_size=(3, 100), stride=(1, 1))
    (2): Conv2d(1, 100, kernel_size=(4, 100), stride=(1, 1))
    (3): Conv2d(1, 100, kernel_size=(5, 100), stride=(1, 1))
  )
  (fc): Linear(in_features=400, out_features=2, bias=True)
)

해당 모듈은 Embedding 레이어, 4개의 Conv2d 레이어, Linear 레이어로 구성되어 있습니다. Linear 레이어의 최종 값은 2이고 이는 (0,1) 둘 중에 하나의 값을 출력하게됩니다.

훈련용 데이터는 DataLoader를 통해서 데이터셋을 만들고 batch_size = 1000, epoch = 1000으로 학습을 수행합니다. 테스트 환경은 구글 코랩 프로(Colab Pro) 버전을 사용합니다. 일반 Colab 버전을 사용해서 테스트 하셔도 무방합니다. 속도의 차이가 있지만 그리 큰 차이는 아닌듯 합니다.

최종 학습이 수행하고 나온 model과 word2index 파일을 저장합니다.

이제 저장된 모델을 django를 통해서 간단한 웹서버를 만들어봅니다. 참고로 해당 부분에 대한 설명은 이번 글에서는 하지 않고 Django를 통해서 웹서버를 개발하는 예제는 이후에 다른 글에서 다뤄보겠습니다.

위와 같은 형태로 간단한 입력과 출력 결과를 표시합니다. 결과에 보면 “영화관”은 비속어가 아닌데 비속어로 처리된 부분이 있습니다. 이 부분은 영화관이라는 단어가 비속어 데이터에 추가 되어 있기 때문에 표시된 부분입니다. 학습용 데이터의 중요성이 다시 한번 확인되네요.

CNN을 통해서 필터링 하면 철자에 오타가 있거나 단어의 조합인 경우 앞뒤 순서가 바뀌는 경우에도 비교적 잘 탐지 하는 것을 확인했습니다.

CNN을 활용한 텍스트 분류

CNN(Convolutional Neural Networks)은 이미지 분류에 높은 성능을 발휘하는 알고리즘이나 이 외에도 여러 분야에서도 활용되고 있습니다. 그중에 하나가 텍스트를 분류하는 문제입니다.

본 예제는 아래의 논문을 참조하고 있습니다.

Convolutional Neural Networks for Sentence Classification

We report on a series of experiments with convolutional neural networks (CNN) trained on top of pre-trained word vectors for sentence-level classification tasks. We show that a simple CNN with little hyperparameter tuning and static vectors achieves excellent results on multiple benchmarks. Learning task-specific vectors through fine-tuning offers further gains in performance. We additionally propose a simple modification to the architecture to allow for the use of both task-specific and static vectors. The CNN models discussed herein improve upon the state of the art on 4 out of 7 tasks, which include sentiment analysis and question classification.

https://arxiv.org/abs/1408.5882

합성곱신경망이라고도 불리는 CNN 알고리즘은 여러 좋은 강의가 있으니 참고하시기 바랍니다. 또 관련해서 좋은 예제들도 많이 있으니 아래 예제를 수행하시기 전에 살펴보시면 도움이 되시리라 생각합니다.
아래의 예제는 가장 유명한 예제 중에 하나인 MNIST 분류 예제입니다.

먼저 config를 정의합니다. config에는 학습에 필요한 여러가지 변수들을 미리 정의하는 부분입니다. model을 저장할 때에 함께 저장하면 학습 모델을 이해하는데 도움이 됩니다.

학습을 완료하고 저장된 모델 파일을 업로드해서 사용할 때에 해당 모델이 어떻게 학습됐는지에 대한 정보가 없을 경우나 모델을 재학습 한다거나 할 때에 config 정보가 유용하게 사용됩니다. 본 예제는 해당 알고리즘을 이해하는 정도로 활용할 예정이기 때문에 학습은 100번 정도로 제한합니다.

나머지 정의된 변수들은 예제에서 사용할 때에 설명하도록 하겠습니다.

from argparse import Namespace
config = Namespace(
    number_of_epochs=100, lr=0.001, batch_size=50, sentence_lg=30, train_ratio=0.2, embedding_dim=100, n_filters=100, n_filter_size=[2,3,4], output_dim=2
)

본 예제는 영화의 평점 데이터를 활용합니다. 해당 데이터는 네이버 영화 평점과 이에 대한 긍정,부정의 반응이 저장된 데이터입니다. 컬럼은 [id, document, label]의 구조로 되어 있습니다. 영화 평이 부정적인 경우는 label=0, 그렇지 않은 경우는 label=1으로 되어 있어 비교적 간단하게 활용할 수 있는 데이터입니다.

아래의 코드를 실행하면 데이터를 읽어 올 수 있습니다. 해당 데이터에 검색해보면 쉽게 찾을 수 있습니다. 파일인 train 데이터와 test 데이터로 되어 있습니다. 본 예제에서는 train 데이터만 사용합니다. 많은 데이터를 통해서 결과를 확인하고자 하시는 분은 train, test 모두 사용해보시길 추천합니다.

def read_data(filename):
    with open(filename, 'r',encoding='utf-8') as f:
        data = [line.split('\t') for line in f.read().splitlines()]
        data = data[1:]
    return data  
train_data = read_data("../Movie_rating_data/ratings_train.txt")

읽어온 데이터를 몇개 살펴보면 아래와 같습니다. 아래 샘플에는 Label 데이터를 표시하지 않았습니다. 하지만 읽어 보면 대충 이 리뷰를 작성한 사람이 영화를 추천하고 싶은지 그렇지 않은지를 이해할 수 있습니다. 사람의 경우에는 이러한 글을 읽고 판단 할 수 있지만 컴퓨터의 경우에는 이런 텍스트(자연어)를 바로 읽어서 긍정이나 부정을 파악하는 것은 어렵습니다. 그렇기 때문에 각 단어들을 숫자 형태의 벡터로 변환하는 작업을 수행합니다.

['많은 사람들이 이 다큐를 보고 우리나라 슬픈 현대사의 한 단면에 대해 깊이 생각하고 사죄하고 바로 잡기 위해 노력했으면 합니다. 말로만 듣던 보도연맹, 그 민간인 학살이 이정도 일 줄이야. 이건 명백한 살인입니다. 살인자들은 다 어디있나요?',
 '이틀만에 다 봤어요 재밌어요 근데 차 안에 물건 넣어 조작하려고 하면 차 안이 열려있다던지 집 안이 활짝 열려서 아무나 들어간다던가 문자를 조작하려고하면 비번이 안 걸려있고 ㅋㅋㅋ 그런 건 억지스러웠는데 그래도 내용 자체는 좋았어요',
 '이 영화를 이제서야 보다니.. 감히 내 인생 최고의 영화중 하나로 꼽을 수 있을만한 작품. 어떻게 살아야할지 나를 위한 고민을 한번 더 하게 되는 시간. 그리고 모건 프리먼은 나이가 들어도 여전히 섹시하다.',
 '아~ 진짜 조금만 더 손 좀 보면 왠만한 상업 영화 못지 않게 퀄리티 쩔게 만들어 질 수 있었는데 아쉽네요 그래도 충분히 재미있었습니다 개인적으로 조금만 더 잔인하게 더 자극적으로 노출씬도 화끈하게 했더라면 어땠을까 하는 국산영화라 많이 아낀 듯 보임',
 '평점이 너무 높다. 전혀 재미있지 않았다. 쓸데없이 말만 많음. 이런 류의 영화는 조연들의 뒷받침이 중요한데 조연들의 내용자체가 전혀 없음. 또한 여배우도 별로 매력 없었다. 이틀전에 저스트고위드잇의 애니스톤을 보고 이 영화를 봐서 그런가. 실망했음',
 '왜 극을 끌어가는 중심있는 캐릭터가 있어야 하는지 알게 된영화 살인마와 대적하는 그리고 사건을 해결하는 인물이 없고 그리고 왜 마지막에 다 탈출 해놓고 나서 잡히고 죽임을 당하는지 이해할수가 없다. 대체 조달환 정유미는 왜 나옴?',
 '초딩 때 친척형이 비디오로 빌려와서 봤던 기억이 난다...너무 재미 없었다 근데 나중에 우연히 다시보니 재밌더라 그 땐 왜 그렇게 재미가 없었을까?? 98년이면 내가 초등학교 2학년 때니까...사촌형이 당시 나름 최신 비디오를 빌려온거 같다',
 '창업을 꿈꾸는가! 좋은 아이템이 있어 사업을 하려하는가!! 그렇다면 기를 쓰고 이 영활 보기바란다!! 그 멀고 험한 여정에 스승이 될것이요 지침서가 될것이다... 혹은 단념에 도움이 될지도... 참 오랜만에 박장대소하며 본 독립영활세~~~ ★',
 "영화'산업'이라고 하잖는가? 이딴식으로 홍보 해놓고 속여서 팔았다는 게 소비자 입장에서는 짜증난다. 그나마 다행은 아주 싸구려를 상급품으로 속여판 게 아니라는 점. 그래서 1점. 차라리 연상호 감독 작품 처럼 홍보가 됐다면, 그 비슷하게 만이라도 하지",
 '도입부를 제외하고는 따분.헬기에서 민간인을 마구 쏴 죽이는 미군, 베트공 여성 스나이퍼 등,현실감 없는 극단적인 설정.라이언 일병에서의 업햄 그리고 이 영화 주인공인 조커, 두 넘 모두 내가 싫어하는 캐릭터, 착한척 하면서 주위에 피해를 주는 넘들.']

각 리뷰를 읽은 후에 문장을 어절 단위로 분리합니다. 분리한 어절을 형태소까지 분리해서 활용하면 좋겠지만 본 예제에서는 간단히 어절 단위로만 분리합니다. 형태소로 분리하는 예제는 본 블로그에 다른 예제에서도 내용이 있으니 참고하시기 바랍니다. 어절 단위로 분리한 텍스트에서 중복을 제거해보면 32,435개 어절을 얻을 수 있습니다.

이렇게 얻은 32,435개의 어절을 어떻게 벡터로 나타내는가에 대해서는 pytorch의 Embedding을 사용하여 표한합니다. 해당 내용도 본 블로그의 다른 예제에서 많이 다뤘기 때문에 여기서는 생략하고 넘어가도록 하겠습니다.

words = []
for s in sentences:
    words.append(s.split(' '))
    
words = [j for i in words for j in i]
words = set(list(words))

print('vocab size:{}'.format(len(words)))
vocab_size = len(words) #vocab size:32435

리뷰의 길이를 보면 길은 것은 70 어절이 넘고 짧은 것은 1 어절도 있기 때문에 어절의 편차가 크다는 것을 확인 할 수 있습니다. 그렇기 때문에 본 예제에서는 30 어절 이상 되는 리뷰들만 사용하겠습니다. 이를 위해서 config 파일에 sentence_lg=30와 같은 값을 설정했습니다.

x_data = [[word2index[i] for i in sentence.split(' ')] for sentence in sentences]
sentence_length = np.array([len(x) for x in x_data])
max_length = np.array([len(x) for x in x_data]).max()

위의 그래프는 샘플 어절의 분포를 나타냅니다. 본 예제에서는 약 30~40 사이의 어절 정도만 사용하도록 하겠습니다. 만약 어절의 편차가 너무 크면 상당 부분을 의미 없는 데이터로 채워야 합니다. 아래의 예제는 빈 어절을 패딩값(0)으로 채우는 부분입니다.

for ndx,d in enumerate(x_data):
    x_data[ndx] = np.pad(d, (0, max_length), 'constant', constant_values=0)[:max_length]

아래와 같이 각 어절을 숫자 형태의 값으로 변환하면 리뷰의 내용은 숫자로 구성된 리스트 형태가 됩니다. 이때 0은 패딩 값으로 max_length 보다 작을 경우 남은 값을 0으로 채우게 됩니다. 0 데이터가 많을 수록 예측의 정확도가 떨어지게 됩니다.

[array([18196, 16747,  1952, 27879,  4206, 29579,  3641, 14582,  8661,
        16754,   964, 10240,  6070, 25011,  3902, 16410, 30182, 22634,
         5531, 24456,  6360,  6482, 26016,  9239, 25466, 31032,  6505,
        30782, 30861, 30494,  6876, 12237, 27035, 14997,     0,     0,
            0,     0,     0,     0,     0]),
 array([30000, 27035, 15316,  4633, 26703,  7875,  5042, 16695, 25520,
        14681, 20133,  7875,    71,  8983,   363,    71,  5149,  2391,
        27910, 28746, 23902, 32136, 12475, 24439, 15973, 20236,  4726,
         6190, 17515, 20610, 29270, 13967, 28490,     0,     0,     0,
            0,     0,     0,     0,     0]),
 array([ 1952, 12632, 10665, 27623, 25106,  1978,   184,  1537, 29451,
         4705, 22537, 21866, 14473, 26012,  6744, 15690, 27119, 15822,
        12491, 31747, 11202, 14268, 31494,  3202, 10936, 21619, 29214,
        15185,  5496, 12854, 27679,     0,     0,     0,     0,     0,
            0,     0,     0,     0,     0])]

학습을 위한 데이터를 train, test 형태로 분리하게 됩니다. 분리하면서 학습용 데이터와 테스트용 데이터의 비율을 8:2로 설정합니다. test 데이터는 학습에 사용하지 않는 데이터로 모델의 정확도 평가에만 사용됩니다.

from sklearn.model_selection import train_test_split

y_data = np.array(label).astype(np.long)
x_train, x_test, y_train, y_test = train_test_split(np.array(x_data), y_data, test_size = config.train_ratio, random_state=0) # 8:2
print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

학습용 데이터는 데이터로더에 입력하여 일정 크기(config.batch_size)로 묶어 줍니다. 예를 들어 100건의 데이터를 20개로 묶는다면 5개의 묶음으로 나타낼 수 있습니다. 지금 수행하는 예제는 비교적 적은 양의 데이터이기 때문에 이런 과정이 불필요할 수도 있지만 많은 데이터를 통해서 학습하시는 분을 위해서 해당 로직을 구현했습니다. 그리고 학습 데이터를 shuffle 해줍니다. 이 과정도 훈련의 정확도를 높이기 위해서 필요한 부분이니 True로 설정하시기 바랍니다.

from torch.utils.data import Dataset, DataLoader
class TxtDataSet(Dataset):

    def __init__(self, data, labels):
        super().__init__()
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]
train_loader = DataLoader(dataset=TxtDataSet(x_train, y_train), batch_size=config.batch_size, shuffle=True)

파이토치를 활용해서 수행하기 때문에 필요한 모듈을 임포트합니다.

import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

CNN 모델을 아래와 같이 생성합니다. 이미 설명한 내용도 있기 때문에 자세한 내용은 넘어가겠습니다. 가장 중요한 부분은 텍스트 데이터를 [number_of_batch, channel, n, m] 형태의 데이터로 만드는 과정이 중요합니다. 이렇게 데이터가 만들어지면 해당 데이터를 통해서 학습을 수행합니다.

https://halfundecided.medium.com/%EB%94%A5%EB%9F%AC%EB%8B%9D-%EB%A8%B8%EC%8B%A0%EB%9F%AC%EB%8B%9D-cnn-convolutional-neural-networks-%EC%89%BD%EA%B2%8C-%EC%9D%B4%ED%95%B4%ED%95%98%EA%B8%B0-836869f88375

CNN 알고리즘을 잘 설명하고 있는 블로그의 링크를 올립니다. 자세한 내용은 이곳 블로그도 참고해 보시기 바랍니다.

class CNN(nn.Module):

    def __init__(self, vocab_size, embedding_dim, n_filters, filter_size, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.convs = nn.ModuleList([
            nn.Conv2d(in_channels=1, out_channels=n_filters, kernel_size=(fs, embedding_dim)) for fs in filter_size
            ])
        self.fc = nn.Linear(len(filter_size)*n_filters, output_dim)

    def forward(self, text):
        embedded = self.embedding(text)
        embedded = embedded.unsqueeze(1)
        conved = [F.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        pooled = [F.max_pool1d(conv, conv.shape[2]).squeeze(2) for conv in conved]
        
        return self.fc(torch.cat(pooled, dim=1)) # make fully-connected
    
embedding_dim = config.embedding_dim
n_filters = config.n_filters
n_filter_size = config.n_filter_size
output_dim = config.output_dim # 0 or 1

model = CNN(vocab_size, embedding_dim, n_filters, n_filter_size, output_dim)
print(model)
CNN(
  (embedding): Embedding(32435, 100)
  (convs): ModuleList(
    (0): Conv2d(1, 100, kernel_size=(2, 100), stride=(1, 1))
    (1): Conv2d(1, 100, kernel_size=(3, 100), stride=(1, 1))
    (2): Conv2d(1, 100, kernel_size=(4, 100), stride=(1, 1))
  )
  (fc): Linear(in_features=300, out_features=2, bias=True)
)

아래와 같이 학습을 수행합니다. 간단히 100번 정도만 반복했습니다.

optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

model.train()
for epoch in range(config.number_of_epochs):
    train_loss, valid_loss = 0, 0
    
    # train_batch start
    for x_i, y_i in train_loader:
        optimizer.zero_grad()
        
        output = model(x_i)
        loss = criterion(output, y_i)
        
        loss.backward()
        optimizer.step()
        
        train_loss += float(loss)
    if epoch % 5 == 0:
        print('Epoch : {}, Loss : {:.5f}'.format(epoch, train_loss/len(train_loader)))
Epoch : 0, Loss : 0.69145
Epoch : 5, Loss : 0.04431
Epoch : 10, Loss : 0.00848
Epoch : 15, Loss : 0.00352
Epoch : 20, Loss : 0.00192
Epoch : 25, Loss : 0.00120
Epoch : 30, Loss : 0.00082
Epoch : 35, Loss : 0.00059
Epoch : 40, Loss : 0.00044
Epoch : 45, Loss : 0.00034
Epoch : 50, Loss : 0.00027
Epoch : 55, Loss : 0.00022
Epoch : 60, Loss : 0.00018
Epoch : 65, Loss : 0.00015
Epoch : 70, Loss : 0.00012
Epoch : 75, Loss : 0.00010
Epoch : 80, Loss : 0.00009
Epoch : 85, Loss : 0.00008
Epoch : 90, Loss : 0.00007
Epoch : 95, Loss : 0.00006

학습을 완료하고 테스트 데이터를 통해서 모델을 평가해본 결과 68.39%의 정확도를 얻었습니다.
높은 정확도는 아니지만 많은 부분 간소화한 학습이었음을 감안하면 나름대로 유의미한 결과를 얻었다고 생각됩니다.

with torch.no_grad():
    output = model(torch.tensor(x_test, dtype=torch.long))
    predict = torch.argmax(output, dim=-1)
    predict = (predict==torch.tensor(y_test, dtype=torch.long))
    print('Accuracy!',predict.sum().item()/len(x_test)*100)
    #Accuracy! 68.39622641509435

Seq2Seq 어텐션 형태소 분석

본 블로그에 Seq2Seq 모델을 활용해서 간단한 문장을 생성한다던가 번역을 해보는 예제를 수행했습니다. 또 Seq2Seq에 Attention을 적용해서 문장생성을 테스트해 보기도 했습니다.

이번에는 Seq2Seq 어텐션을 활용해서 형태소 분석을 수행하는 예제를 문들어보겠습니다. 수행하는 방법은 이전에 수행했던 예제들과 아주 유사해서 이전 예제에서 활용했던 Word Embedding, Encoder, Decoder, RNN 모델을 그대로 사용하겠습니다.

해당 방법은 여러 연구자들에 의해서 연구되고 있습니다.
ETRI(한국전자통신연구원)에서도 해당 모델을 활용한 연구(Seq2Seq 주의집중 모델을 이용한 형태소 분석 및 품사 태깅, 2016년)를 수행했습니다.
이 외에도 포항공대에서도 “Sequence-to-sequence 기반 한국어 형태소 분석 및 품사 태깅”이라는 연구가 있었습니다.

먼저 형태소에 대한 정의는 아래와 같습니다.

형태소(形態素, 영어: morpheme)는 언어학에서 (일반적인 정의를 따르면) 일정한 의미가 있는 가장 작은 말의 단위로 발화체 내에서 따로 떼어낼 수 있는 것을 말한다. 즉, 더 분석하면 뜻이 없어지는 말의 단위이다. 음소와 마찬가지로 형태소는 추상적인 실체이며 발화에서 다양한 형태로 실현될 수 있다. [위키백과 : 형태소]

간단히 말하면 분석의 대상이 되는 문장이 입력 됐을 경우에 “일정한 의미가 있는 가장 작은 말의 단위”로 분할 하는 것이라고 할 수 있습니다.

해당 예제는 다음과 같은 방법으로 수행합니다. 먼저 보통의 짧은 문장 50개를 생성합니다. 생성한 문장을 KoNLPy 중 Okt() 태깅 클래스를 활용하여 형태소 분석을 수행합니다. 예를 들어서 [‘요즘도 많이 바쁘세요?’,’구두를 신고 싶어요.’,’운동화를 신고 싶어요.’,’엄마가 좋아요?’,’아빠가 좋아요?’]와 같은 문장 리스트가 주어졌다고 할 때에 이를 형태소 분석을 하게 되면 아래와 같은 형태로 데이터가 출력된다.

[요즘/Noun, 도/Josa, 많이/Adverb, 바쁘세요/Adjective, ?/Punctuation]
[구두/Noun, 를/Josa, 신고/Noun, 싶어요/Verb, ./Punctuation]
[운동화/Noun, 를/Josa, 신고/Noun, 싶어요/Verb, ./Punctuation]
[엄마/Noun, 가/Josa, 좋아요/Adjective, ?/Punctuation]
[아빠/Noun, 가/Josa, 좋아요/Adjective, ?/Punctuation]
https://konlpy-ko.readthedocs.io/ko/v0.4.3/

KoNLPy에 대해서 더 자세히 알아보고자 하시는 분은 위의 홈페이지에서 자료를 검색해보시기 바랍니다.

이제 입력된 원문을 Source에 입력하고 형태소 분석한 결과를 Target에 입력하는 것으로 학습 데이터를 생성하겠습니다. 이렇게 되면 Source 데이터를 Encoder에 입력하고 분석 결과를 Decoder에 입력해서 학습합니다.

먼저는 인코더에 넣을 텍스트 데이터를 숫자형태로 바꿔 주기 위한 클래스를 선언합니다. 해당 클래스는 문장의 시작<SOS, Start of Sentence>과 끝<EOS, End of Sentence>을 나태는 변수를 선언하는 것으로 시작합니다. 먼저 문장이 입력되면 음절 단위로 분리하고 음절이 존재 할 경우는 해당 어절의 카운트를 1 증가 시키고 없을 경우 dict에 음절을 추가합니다.

source_vocab은 인코딩 문장 즉, 원어절이 들어갑니다. 반면 target_vocab은 형태소 정보가 들어간 어절이 입력됩니다.

SOS_token = 0
EOS_token = 1

class Vocab:
  def __init__(self):
    self.vocab2index = {'<SOS>':SOS_token, '<EOS>':EOS_token}
    self.index2vocab = {SOS_token:'<SOS>', EOS_token:'<EOS>'}
    self.vocab_count = {}
    self.n_vocab = len(self.vocab2index)
    
  def add_vocab(self, sentence):
    for word in sentence.split(' '):
      if word not in self.vocab2index:
        self.vocab2index[word] = self.n_vocab
        self.vocab_count[word] = 1
        self.index2vocab[self.n_vocab] = word
        self.n_vocab += 1
      else:
        self.vocab_count[word] += 1

source_vocab = Vocab()
target_vocab = Vocab()

전체적인 흐름은 이전에 테스트했던 내용과 비슷하기 때문에 자세한 설명은 생략하고 변경된 내용만 정리합니다. 인코더는 131×5의 lookup 테이블에 맵핑됩니다. 즉, GRU에 131개의 input_size를 보내지 않고 5개의 값만을 사용한다는 의미입니다. GRU 셀(Cell)을 보면 설명드린대로 입력과 출력이 동일하게 정의했고 4개의 multi-layer로 구성했습니다. batch_first를 True로 설정했습니다.

Encoder(
  (embedding): Embedding(131, 5)
  (gru): GRU(5, 5, num_layers=4, batch_first=True)
)

디코더는 Attention 모델을 사용하여 모델을 설계합니다. 입력값 135를 받아서 5개의 입력으로 내보냅니다. 135는 target_vocab의 크기입니다. 5로 입력하는 것은 decoder가 이전 단계 encoder의 hidden_state를 입력으로 받기 때문에 encode와 동일한 사이즈로 정의해줍니다. attn Linear에서는 decoder에 입력되는 값과 이전 단계의 hidden 값을 합하여서 target의 max_length 값인 7로 정의합니다.

이것은 attention 모델에서 중요한 과정이라고 할 수 있는 attention weight(어떤 값에 집중할 것인가?)에 대한 부분을 정의하는 부분입니다. 이제 이 attention weight 값과 encoder의 output 데이터들을 곱하여 하나의 matrix를 생성합니다. 이 값을 decoder에 입력되는 값과 함께 GRU 셀에 입력 데이터로 사용합니다.

이렇게 나온 출력 값을 Linear 모델을 거쳐 target_vocab 사이즈와 동일하게 맞춰주고 출력값의 index 값을 찾아 일치되는 값을 출력합니다.

AttentionDecoder(
  (embedding): Embedding(135, 5)
  (attn): Linear(in_features=10, out_features=7, bias=True)
  (attn_combine): Linear(in_features=10, out_features=5, bias=True)
  (dropout): Dropout(p=0.1, inplace=False)
  (gru): GRU(5, 5, num_layers=4, batch_first=True)
  (out): Linear(in_features=5, out_features=135, bias=True)
)

글로 표현하는 것이 길뿐 코드로 표현하면 이전의 attention 모델과 동일합니다. attention 모델은 기본 seq2seq 모델에서 사용했던 context vector를 사용하지 않고 encoder의 각 output 결과를 사용하는 것이 차이가 있습니다.

class Test():

  def __init__(self, sentences, source_vocab, target_vocab, encoder, decoder):
    self.sentences = sentences
    self.vocab = source_vocab
    self.target_vocab = target_vocab
    self.encoder = encoder
    self.decoder = decoder

  def tensorize(self, sentence):
    idx = [self.vocab.vocab2index[word] for word in sentence.split(' ')]
    return idx

  def run(self):
    x_train = [self.tensorize(sentence) for sentence in self.sentences]
    text = []
    
    for x in x_train:
      decoded_word=[]      
      _x = torch.tensor(x, dtype=torch.long).view(-1,1)      
      encoder_hidden = self.encoder.initHidden()
      encoder_outputs = torch.zeros(config.max_length, self.encoder.hidden_size)
      
      for ei in range(_x.size(0)):
          encoder_output, encoder_hidden = self.encoder(_x[ei], encoder_hidden)
          encoder_outputs[ei] = encoder_output[0,0]

      decoder_input = torch.tensor([SOS_token], dtype=torch.long)
      decoder_hidden = encoder_hidden

      for di in range(config.max_length):
        decoder_output, decoder_hidden, decoder_attention = self.decoder(decoder_input, decoder_hidden, encoder_outputs)
        i_val,i_ndx = decoder_output.data.topk(1)
        t_word = self.target_vocab.index2vocab[i_ndx.item()]
        decoded_word.append(t_word)

        if _x.size(0) > di: 
          if _x[di] < self.target_vocab.n_vocab:
            decoder_input = _x[di] 
        else: 
          decoder_input = i_ndx.squeeze().detach()

        # 문장 마침표 break
        if t_word == './Punctuation': break
        # <EOS> break
        if i_ndx == 1: break
        
      text.append(decoded_word)
    return text

  def predict(self):
    return self.run()

이제 구축된 모델을 Test Class를 활용해서 테스트해보겠습니다.

sentences = ['많이 추워요.','김밥이 좋아요.','앞으로 오세요.','시험 공부를 해요.','책을 읽어보자.','처음 뵙겠습니다.']
test = Test(sentences, source_vocab, target_vocab, encoder, decoder)
predict = test.predict()
for ndx,(i,j) in enumerate(zip(sentences, predict)):
  print(ndx, i,j)

Seq2Seq 어텐션 문장생성

해당 모델은 이전에 테스트했던 Sequence2Sequence 모델에 Attention을 적용해본 것입니다. 이전 내용이 궁금하신 분은 아래의 게시물을 확인해보시기 바랍니다.

이전 모델에서는 Sequence2Sequence만 사용했고 영어문장을 활용했습니다. 이번에는 에턴션(Attention)을 적용하고 한글문서를 통해서 테스트해보겠습니다. 이번에도 구글 Colab의 GPU를 통해서 테스트해보겠습니다.

먼저 텍스트 데이터를 준비해보겠습니다. 텍스트 데이터는 요한복음 1-2장의 한글 텍스트를 활용했습니다. 동일한 데이터로 테스트를 해보시기 원하시면 아래의 링크에서 텍스트 데이터를 다운받으신 후에 *.txt 파일로 저장하시고 테스트해보시기 바랍니다.

http://www.holybible.or.kr/B_RHV/cgi/bibleftxt.php?VR=RHV&VL=43&CN=1&CV=99

태초에 말씀이 계시니라 이 말씀이 하나님과 함께 계셨으니 이 말씀은 곧 하나님이시니라
그가 태초에 하나님과 함께 계셨고
만물이 그로 말미암아 지은바 되었으니 지은 것이 하나도 그가 없이는 된 것이 없느니라
그 안에 생명이 있었으니 이 생명은 사람들의 빛이라
빛이 어두움에 비취되 어두움이 깨닫지 못하더라…
[테스트 데이터 일부]

학습을 위한 기본 설정은 아래와 같습니다. 구글 Colab에서 파일을 로딩하는 부분은 이전 게시물을 참조하시기 바랍니다. 아래의 config에 파일의 위치, 크기, 임베딩 사이즈 등을 정의했습니다. 학습은 배치 사이즈를 100으로 해서 epochs 1,000번 수행했습니다.

from argparse import Namespace
config = Namespace(
    train_file='gdrive/MyDrive/*/gospel_john.txt', 
    seq_size=14, batch_size=100, sample=30, dropout=0.1, max_length=14,
    enc_hidden_size=10, number_of_epochs=1000
)

생성한 텍스트 파일을 읽어서 train_data에 저장합니다. 저장된 데이터는 john_note에 배열 형태로 저장되게 되고 생성된 데이터는 note라는 배열에 어절 단위로 분리되어 저장됩니다. 형태소 분석과정은 생략하였고 음절 분리만 수행했습니다. 해당 모델을 통해서 더 많은 테스트를 해보고자 하시는 분은 음절분리 외에도 형태소 작업까지 같이 해서 테스트해보시길 추천합니다. 최종 생성된 note 데이터는 [‘태초에’, ‘말씀이’, ‘계시니라’, ‘이’, ‘말씀이’, ‘하나님과’, ‘함께’, ‘계셨으니’, ‘이’, ‘말씀은’,’하나님이니라’,…] 의 형태가 됩니다.

def read_data(filename):
    with io.open(filename, 'r',encoding='utf-8') as f:
        data = [line for line in f.read().splitlines()]
    return data 

train_data = read_data(config.train_file)

john_note = np.array(df['john'])
note = [n for note in john_note for n in note.split()]

note에 저장된 형태는 자연어로 이를 숫자로 변환할 필요가 있습니다. 이는 자연어 자체를 컴퓨터가 인식할 수 없기 때문입니다. 그렇기 때문에 각 단어들을 숫자화 할 필요가 있습니다. 일예로 ‘태초에’ -> 0, ‘말씀이’->1 이런 방법으로 만드는 과정이 필요합니다.

그리고 그에 앞서서 중복된 단어들은 삭제할 필요가 있습니다. ‘이’라는 단어가 여러번 나오지만 나올 때마다 벡터화 한다면 벡터의 사이즈가 증가하게 되고 이로 인한 계산량이 증가하기 때문입니다. 단, 형태소 분석을 통해 보면 ‘이’라는 단어가 각기 다른 의미를 가질 수는 있지만 이번 테스트에서는 동일한 데이터로 인식해서 초기화 겹치지 않도록 하겠습니다.

최종 생성할 데이터는 단어-숫자, 숫자-단어 형태를 가지는 python dict 입니다. 해당 dict를 생성하는 방법은 아래와 같습니다.

word_count = Counter(note)
sorted_vocab = sorted(word_count, key=word_count.get, reverse=True)
int_to_vocab = {k:w for k,w in enumerate(sorted_vocab)}
vocab_to_int = {w:k for k,w in int_to_vocab.items()}
n_vocab = len(int_to_vocab)

최종적으로 생성되는 단어는 셋은 Vocabulary size = 598 입니다. 생성되는 데이터 샘플(단어-숫자)은 아래와 같습니다.

{0: '이', 1: '곧', 2: '그', 3: '가로되', 4: '나는', 5: '그가', 6: '말미암아', 7: '것이', 8: '사람이', 9: '대하여', 10: '요한이' ... }

학습에 사용되는 문장은 각각 단어의 인덱스 값으로 치환된 데이터(int_text)를 사용하게 됩니다. 이를 생성하는 과정은 아래와 같습니다.

int_text = [vocab_to_int[w] for w in note]

생성된 전체 문장에서 입력 데이터와 정답 데이터를 나눕니다. 이 과정은 이전에 업로드 했던 게시물에 설명했으니 넘어가도록 하겠습니다.

source_words = []
target_words = []
for i in range(len(int_text)):
    ss_idx, se_idx, ts_idx, te_idx = i, (config.seq_size+i), i+1, (config.seq_size+i)+1
    #print('{}:{}-{}:{}'.format(ss_idx,se_idx,ts_idx,te_idx))
    if len(int_text[ts_idx:te_idx]) >= config.seq_size:
        source_words.append(int_text[ss_idx:se_idx])
        target_words.append(int_text[ts_idx:te_idx])

생성된 입력 데이터와 정답 데이터를 10개 출력해보면 아래와 같은 행태가 됩니다. 입력/정답 데이터의 길이를 늘려주면 이전의 Sequence2Sequence 모델에서는 학습이 제대로 일어나지 않았습니다. 그 이유는 Encoding 모델에서 최종 생성되는 Context Vector가 짧은 문장의 경우에는 지장이 없겠지만 긴 문장의 정보를 축약해서 담기에는 다소 무리가 있기 때문입니다. 이러한 문제를 해결하기 위해서 나온 모델이 바로 Attention 모델입니다.

for s,t in zip(source_words[0:10], target_words[0:10]):
    print('source {} -> target {}'.format(s,t))

source [21, 14, 57, 0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21] -> target [14, 57, 0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22] source [14, 57, 0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22] -> target [57, 0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23] source [57, 0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23] -> target [0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61] source [0, 14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61] -> target [14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62] source [14, 22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62] -> target [22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24] source [22, 23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24] -> target [23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6] source [23, 58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6] -> target [58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25] source [58, 0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25] -> target [0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25, 63] source [0, 59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25, 63] -> target [59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25, 63, 64] source [59, 1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25, 63, 64] -> target [1, 60, 5, 21, 22, 23, 61, 62, 24, 6, 25, 63, 64, 7]

파이토치 라이브러리를 아래와 같이 임포트합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

학습 모델을 아래와 같이 생성합니다. Encoder 부분은 이전에 생성했던 모델과 다르지 않습니다.

class Encoder(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.gru = nn.GRU(hidden_size, hidden_size)
        
    def forward(self, x, hidden):
        x = self.embedding(x).view(1,1,-1)
        x, hidden = self.gru(x, hidden)
        return x, hidden
    
    def initHidden(self):
        return torch.zeros(1,1,self.hidden_size, device=device )

가장 중요한 AttndDecoder 모델 부분입니다. 핵심은 이전 단계의 Hidden 값을 이용하는 것에 추가로 Encoder에서 생성된 모든 Output 데이터를 Decoder의 입력 데이터로 활용한다는 것입니다. 인코더에서 셀이 10개라면 10개의 히든 데이터가 나온다는 의미이고 이 히든 값 모두를 어텐션 모델에서 활용한다는 것입니다.

아래 그림은 파이토치 공식 홈페이지에 있는 Attention Decoder에 대한 Diagram입니다. 이 그림에서와 같이 AttentionDecoder에 들어가는 입력은 prev_hidden, input, encoder_outputs 3가지입니다.

https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html

이 모델은 복잡해 보이지만 크게 3가지 부분으로 나눠볼 수 있습니다. 첫번째는 이전 단계의 히든 값과 현재 단계의 입력 값을 통해서 attention_weight를 구하는 부분입니다. 이 부분이 가장 중요합니다. 두번째는 인코더의 각 셀에서 나온 출력값과 attention_wieght를 곱해줍니다. 세번째는 이렇게 나온 값과 신규 입력값을 곱해줍니다. 이때 나온 값이 이전 단계의 히든 값과 함께 입력되기 GRU(RNN의 한 종류)에 입력되기 때문에 최종 Shape은 [[[…]]] 형태의 값이 됩니다.

class AttnDecoder(nn.Module):
    def __init__(self, hidden_size, output_size, dropout=config.dropout, max_length=config.max_length):
        super().__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout = dropout
        self.max_length = max_length
        
        self.embedding = nn.Embedding(self.output_size, self.hidden_size)
        self.attn = nn.Linear(self.hidden_size*2, self.max_length)
        self.attn_combine = nn.Linear(self.hidden_size*2, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)
        
    def forward(self, input, hidden, encoder_outputs):
        embedded = self.embedding(input).view(1,1,-1)
        embedded = self.dropout(embedded)
        # Step1  Attention Weight 생성
        attn_weights = F.softmax(self.attn(torch.cat((embedded[0], hidden[0]), 1)), dim=1)
       # Step2 생성된 Attention Weight와 인코더에서 생성한 모든 Output 데이터를 합친 후 RNN에 맞도록 [[[...]]] 형태로 shape 변경
        attn_applied = torch.bmm(attn_weights.unsqueeze(0), encoder_outputs.unsqueeze(0))
        #Step3 입력값과 attn_applied를 dim=1로 합침
        output = torch.cat((embedded[0], attn_applied[0]),1)
        output = self.attn_combine(output).unsqueeze(0)
        #Step4 output => [[[...]]] 형태의 값으로 reshape된 output과 이전단계 입력값을 gru cell에 입력
        output = F.relu(output)
        output, hidden = self.gru(output, hidden)
        
        output = F.log_softmax(self.out(output[0]), dim=1)
        return output, hidden, attn_weights
    
    def initHidden(self):
        return torch.zeros(1,1,self.hidden_size, device=device)        
        

Colab의 GPU를 사용하기 위해서 device 정보를 설정합니다.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

학습을 위해 인코더, 디코더를 정의해줍니다. 최적화를 위해서 Adam Gradient Descent 알고리즘을 사용합니다. Gradient Descent 알고리즘은 여러 종류가 있습니다. 이에 대한 정보를 알고 싶으신 분을 위해서 잘 정리된 링크를 첨부하겠습니다.

http://shuuki4.github.io/deep%20learning/2016/05/20/Gradient-Descent-Algorithm-Overview.html

enc_hidden_size = config.enc_hidden_size
dec_hidden_size = enc_hidden_size

encoder = Encoder(n_vocab, enc_hidden_size).to(device)
decoder = AttnDecoder(dec_hidden_size,n_vocab).to(device)

encoder_optimizer = optim.Adam(encoder.parameters(), lr=0.001)
decoder_optimizer = optim.Adam(decoder.parameters(), lr=0.001)

criterion = nn.NLLLoss()

print(encoder)
print(decoder)

인코더와 디코더 정보를 출력해봅니다.

Encoder(
  (embedding): Embedding(258, 10)
  (gru): GRU(10, 10)
)
AttnDecoder(
  (embedding): Embedding(258, 10)
  (attn): Linear(in_features=20, out_features=14, bias=True)
  (attn_combine): Linear(in_features=20, out_features=10, bias=True)
  (dropout): Dropout(p=0.1, inplace=False)
  (gru): GRU(10, 10)
  (out): Linear(in_features=10, out_features=258, bias=True)
)

입력 데이터는 100개씩 batch 형태로 학습합니다. 학습에 Batch를 적용하는 이유는 이전 블로그에서 설명한 바가 있지만 다시 간략히 설명하겠습니다.

학습 데이터 전체를 한번에 학습하지 않고 일정 갯수의 묶음으로 수행하는 이유는 첫번째는 적은 양의 메모리를 사용하기 위함이며 또 하나는 모델의 학습효과를 높이기 위함입니다. 첫번째 이유는 쉽게 이해할 수 있지만 두번째 이유는 이와 같습니다.

예를 들어서 한 학생이 시험문제를 100개를 풀어 보는데… 100개의 문제를 한번에 모두 풀고 한번에 채점하는 것보다는 100개의 문제를 20개를 먼저 풀어보고 채점하고 틀린 문제를 확인한 후에 20개를 풀면 처음에 틀렸던 문제를 다시 틀리지 않을 수 있을 겁니다. 이런 방법으로 남은 문제를 풀어 본다면 처음 보다는 틀릴 확률이 줄어든다고 할 수 있습니다. 이와 같은 이유로 배치 작업을 수행합니다.

비슷한 개념이지만 Epoch의 경우는 20개씩 100문제를 풀어 본 후에 다시 100문제를 풀어보는 횟수입니다. 100문제를 1번 푸는 것보다는 2,3번 풀어보면 좀 더 학습 효과가 높아지겠죠~

pairs = list(zip(source_words, target_words))
def getBatch(pairs, batch_size):
    pairs_length = len(pairs)
    for ndx in range(0, pairs_length, batch_size):
        #print(ndx, min(ndx+batch_size, pairs_length))
        yield pairs[ndx:min(ndx+batch_size, pairs_length)]

이제 해당 학습을 위에 설명한대로 Batch와 Epoch을 사용해서 학습을 수행합니다. 본 예제에서는 100개씩 묶음으로 1,000번 학습을 수행합니다.
(좋은 개발환경을 가지신 분은 더 많은 학습을 해보시길 추천합니다.)

epochs = config.number_of_epochs
print(epochs)

encoder.train()
decoder.train()

for epoch in range(epochs):
    total_loss = 0
    
    for pair in getBatch(pairs,config.batch_size):
        batch_loss = 0
        
        for si, ti in pair:
            x = torch.tensor(si, dtype=torch.long).to(device)
            y = torch.tensor(ti, dtype=torch.long).to(device)
            #print(x.size(), y.size())
            encoder_hidden = encoder.initHidden()
            encoder_outputs = torch.zeros(config.max_length, encoder.hidden_size, device=device)
            
            for ei in range(config.seq_size):
                #print(x[ei].size())
                encoder_output, encoder_hidden = encoder(x[ei], encoder_hidden)
                encoder_outputs[ei] = encoder_output[0,0] # 마지막 input_length
                
            decoder_input = torch.tensor([0], device=device)
            decoder_hidden = encoder_hidden
            loss = 0
            
            for di in range(config.seq_size):
                #print(y[di])
                decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
                loss += criterion(decoder_output, y[di].view(1))
                #print(decoder_output.size(), y[di].view(1).size())
                decoder_input = y[di] # Force Teaching
            
            batch_loss += loss.item()/config.seq_size
            encoder_optimizer.zero_grad()
            decoder_optimizer.zero_grad()
            loss.backward()
            encoder_optimizer.step()
            decoder_optimizer.step()
            
        total_loss += batch_loss/config.batch_size
        #print('batch_loss {:.5f}'.format(batch_loss/config.batch_size))
    print('epoch {}, loss {:.10f}'.format(epoch, total_loss/(len(pairs)//config.batch_size)))
    

학습이 종료되고 아래와 같이 2개의 단어를 주고 14개의 단어로 구성된 문장을 생성해봅니다.

decode_word = []
words = [vocab_to_int['태초에'], vocab_to_int['말씀이']]
x = torch.tensor(words, dtype=torch.long).view(-1,1).to(device)

encoder_hidden = encoder.initHidden()
encoder_outputs = torch.zeros(config.max_length, encoder.hidden_size, device=device)

for ei in range(x.size(0)):
  encoder_output, encoder_hidden = encoder(x[ei], encoder_hidden)
  encoder_outputs[ei] = encoder_output[0,0]

decoder_input = torch.tensor([0], device=device)
decoder_hidden = encoder_hidden

for di in range(config.seq_size):
    decoder_output, decoder_hidden, decoder_attention = decoder(decoder_input, decoder_hidden, encoder_outputs)
    _, ndx = decoder_output.data.topk(1)
    decode_word.append(int_to_vocab[ndx.item()])

print(decode_word)

학습이 완료되면 아래와 같이 모델, 환경설정 정보, 텍스트 데이터 등을 저장해서 다음 예측 모델에서 활용합니다.

torch.save({
  'encoder': encoder.state_dict(), 'decoder':decoder.state_dict(), 'config':config
}, 'gdrive/***/model.john.210202')
import pickle

def save_obj(obj, name):
  with open('gdrive/***/'+ name + '.pkl', 'wb') as f:
    pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)

save_obj(int_text,'int_text')
save_obj(int_to_vocab,'int_to_vocab')
save_obj(vocab_to_int,'vocab_to_int')

Seq2Seq 문장생성

Sequence2Sequence 모델을 활용해서 문장생성을 수행하는 테스트를 해보겠습니다. 테스트 환경은 Google Colab의 GPU를 활용합니다.

Google Drive에 업로드되어 있는 text 파일을 읽기 위해서 필요한 라이브러리를 임포트합니다. 해당 파일을 실행시키면 아래와 같은 이미지가 표시됩니다.

해당 링크를 클릭하고 들어가면 코드 값이 나오는데 코드값을 복사해서 입력하면 구글 드라이브가 마운트 되고 구글 드라이브에 저장된 파일들을 사용할 수 있게됩니다.

from google.colab import drive
drive.mount('/content/gdrive')

정상적으로 마운트 되면 “Mounted at /content/gdrive”와 같은 텍스트가 표시됩니다.

마운트 작업이 끝나면 필요한 라이브러리 들을 임포트합니다. 파이토치(PyTorch)를 사용하기 때문에 학습에 필요한 라이브러리 들을 임포트하고 기타 numpy, pandas도 함께 임포트합니다.

config 파일에는 학습에 필요한 몇가지 파라메터가 정의되어 있습니다. 학습이 완료된 후 모델을 저장하고 다시 불러올 때에 config 데이터가 저장되어 있으면 학습된 모델의 정보를 확인할 수 있어 편리합니다.

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import numpy as np
import pandas as pd
import os
from argparse import Namespace

from collections import Counter

config = Namespace(
    train_file='gdrive/***/book_of_genesis.txt', seq_size=7, batch_size=100...
)

이제 학습을 위한 파일을 읽어오겠습니다. 파일은 성경 “창세기 1장”을 학습 데이터로 활용합니다. 테스트 파일은 영문 버전을 활용합니다. 파일을 읽은 후에 공백으로 분리해서 배열에 담으면 아래와 같은 형태의 값을 가지게됩니다.

with open(config.train_file, 'r', encoding='utf-8') as f:
    text = f.read()
text = text.split()
['In', 'the', 'beginning,', 'God', 'created', 'the', 'heavens', 'and', 'the', 'earth.', 'The', 'earth', 'was', 'without', 'form', 'and', 'void,', 'and', 'darkness', 'was'...

이제 학습을 위해 중복 단어를 제거하고 word2index, index2word 형태의 데이터셋을 생성합니다. 이렇게 만들어진 데이텃셋을 통해서 각 문장을 어절 단위로 분리하고 각 배열의 인덱스 값을 맵핑해서 문장을 숫자 형태의 값을 가진 데이터로 변경해줍니다. 이 과정은 자연어를 이해하지 못하는 컴퓨터가 어떠한 작업을 수행할 수 있도록 수치 형태의 데이터로 변경하는 과정입니다.

word_counts = Counter(text)
sorted_vocab = sorted(word_counts, key=word_counts.get, reverse=True)
int_to_vocab = {k: w for k, w in enumerate(sorted_vocab)}
vocab_to_int = {w: k for k, w in int_to_vocab.items()}
n_vocab = len(int_to_vocab)

print('Vocabulary size', n_vocab)

int_text = [vocab_to_int[w] for w in text] # 전체 텍스트를 index로 변경

다음은 학습을 위한 데이터를 만드는 과정입니다. 이 과정이 중요합니다. 데이터는 source_word와 target_word로 분리합니다. source_word는 [‘In’, ‘the’, ‘beginning,’, ‘God’, ‘created’, ‘the’, ‘heavens’], target_word는 [ ‘the’, ‘beginning,’, ‘God’, ‘created’, ‘the’, ‘heavens’,’and’]의 형태입니다.
즉, source_word 문장 배열 다음에 target_word가 순서대로 등장한다는 것을 모델이 학습하도록 하는 과정입니다.

여기서 문장의 크기는 7로 정했습니다. 더 큰 사이즈로 학습을 진행하면 문장을 생성할 때 더 좋은 예측을 할 수 있겠으나 계산량이 많아져서 학습 시간이 많이 필요합니다. 테스트를 통해서 적정 수준에서 값을 정해보시기 바랍니다.

source_words = []
target_words = []
for i in range(len(int_text)):
    ss_idx, se_idx, ts_idx, te_idx = i, (config.seq_size+i), i+1, (config.seq_size+i)+1
    if len(int_text[ts_idx:te_idx]) >= config.seq_size:
        source_words.append(int_text[ss_idx:se_idx])
        target_words.append(int_text[ts_idx:te_idx])

아래와 같이 어떻게 값이 들어가 있는지를 확인해보기 위해서 간단히 10개의 데이터를 출력해보겠습니다.

for s,t in zip(source_words[0:10], target_words[0:10]):
  print('source {} -> target {}'.format(s,t))
source [106, 0, 107, 3, 32, 0, 16] -> target [0, 107, 3, 32, 0, 16, 1]
source [0, 107, 3, 32, 0, 16, 1] -> target [107, 3, 32, 0, 16, 1, 0]
source [107, 3, 32, 0, 16, 1, 0] -> target [3, 32, 0, 16, 1, 0, 26]
source [3, 32, 0, 16, 1, 0, 26] -> target [32, 0, 16, 1, 0, 26, 62]
source [32, 0, 16, 1, 0, 26, 62] -> target [0, 16, 1, 0, 26, 62, 12]
source [0, 16, 1, 0, 26, 62, 12] -> target [16, 1, 0, 26, 62, 12, 4]
source [16, 1, 0, 26, 62, 12, 4] -> target [1, 0, 26, 62, 12, 4, 108]
source [1, 0, 26, 62, 12, 4, 108] -> target [0, 26, 62, 12, 4, 108, 109]
source [0, 26, 62, 12, 4, 108, 109] -> target [26, 62, 12, 4, 108, 109, 1]
source [26, 62, 12, 4, 108, 109, 1] -> target [62, 12, 4, 108, 109, 1, 110]

이제 학습을 위해서 모델을 생성합니다. 모델은 Encoder와 Decoder로 구성됩니다. 이 두 모델을 사용하는 것이 Sequence2Sequece의 전형적인 구조입니다. 해당 모델에 대해서 궁금하신 점은 pytorch 공식 사이트를 참조하시기 바랍니다. 인코더와 디코더에 대한 자세한 설명은 아래의 그림으로 대신하겠습니다. GRU 대신에 LSTM을 사용해도 무방합니다.

https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html

아래는 인코더의 구조입니다. 위의 그림에서와 같이 인코더는 두개의 값이 GRU 셀(Cell)로 들어가게 됩니다. 하나는 입력 값이 임베딩 레이어를 통해서 나오는 값과 또 하나는 이전 단계의 hidden 값입니다. 최종 출력은 입력을 통해서 예측된 값인 output, 다음 단계에 입력으로 들어가는 hidden이 그것입니다.

기본 구조의 seq2seq 모델에서는 output 값은 사용하지 않고 이전 단계의 hidden 값을 사용합니다. 최종 hidden 값은 입력된 문장의 전체 정보를 어떤 고정된 크기의 Context Vector에 축약하고 있기 때문에 이 값을 Decoder의 입력으로 사용합니다.

참고로 이후에 테스트할 Attention 모델은 이러한 구조와는 달리 encoder의 출력 값을 사용하는 모델입니다. 이 값을 통해서 어디에 집중할지를 정하게 됩니다.

class Encoder(nn.Module):

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size) #199->10
        self.gru = nn.GRU(hidden_size, hidden_size) #20-20

    def forward(self, x, hidden):
        x = self.embedding(x).view(1,1,-1)
        #print('Encoder forward embedding size {}'.format(x.size()))
        x, hidden = self.gru(x, hidden)
        return x, hidden

이제 아래의 그림과 같이 Decoder를 설계합니다. Decoder 역시 GRU 셀(Cell)을 가지고 있습니다.

https://tutorials.pytorch.kr/intermediate/seq2seq_translation_tutorial.html
class Decoder(nn.Module):
    def __init__(self, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, hidden_size) #199->10
        self.gru = nn.GRU(hidden_size, hidden_size) #10->10
        self.out = nn.Linear(hidden_size, output_size) #10->199
        self.softmax = nn.LogSoftmax(dim=1)
        
    def forward(self, x, hidden):
        x = self.embedding(x).view(1,1,-1)
        x, hidden = self.gru(x, hidden)
        x = self.softmax(self.out(x[0]))
        return x, hidden

이제 GPU를 사용하기 위해서 설정을 수행합니다. Google Colab을 활용하시면 별도의 설정작업 없이 GPU를 사용할 수 있습니다.

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

인코더와 디코더 입출력 정보를 셋팅합니다.

enc_hidden_size = 50
dec_hidden_size = enc_hidden_size
encoder = Encoder(n_vocab, enc_hidden_size).to(device) # source(199) -> embedding(10)
decoder = Decoder(dec_hidden_size, n_vocab).to(device) # embedding(199) -> target(199)

encoder_optimizer = optim.SGD(encoder.parameters(), lr=0.01)
decoder_optimizer = optim.SGD(decoder.parameters(), lr=0.01)

criterion = nn.NLLLoss()

해당 모델의 이미지를 아래의 그림과 같이 나타낼 수 있습니다.

그림1 Sequence2Sequence Model
Encoder(
  (embedding): Embedding(199, 50)
  (gru): GRU(50, 50)
)
Decoder(
  (embedding): Embedding(199, 50)
  (gru): GRU(50, 50)
  (out): Linear(in_features=50, out_features=199, bias=True)
  (softmax): LogSoftmax(dim=1)
)

데이터를 100개씩 나눠서 훈련 할 수 있도록 배치 모델을 작성합니다.

pairs = list(zip(source_words, target_words))
def get_batch(pairs, batch_size):
  pairs_length = len(pairs)
  for ndx in range(0, pairs_length, batch_size):
    #print(ndx, min(ndx+batch_size, pairs_length))
    yield pairs[ndx:min(ndx+batch_size, pairs_length)]

해당 모델은 500번 학습을 수행합니다. 각 batch, epoch 마다 loss 정보를 표시합니다. 표1 은 마지막 스텝의 loss와 epoch 정보입니다.

number_of_epochs = 501
for epoch in range(number_of_epochs):
    total_loss = 0
    #for pair in get_batch(pairs, config.batch_size): # batch_size 100
    for pair in get_batch(pairs, 100): # batch_size 100
      batch_loss = 0
       
      for si, ti in pair:
        x = torch.Tensor(np.array([si])).long().view(-1,1).to(device)
        y = torch.Tensor(np.array([ti])).long().view(-1,1).to(device)
        encoder_hidden = torch.zeros(1,1,enc_hidden_size).to(device)

        for j in range(config.seq_size):
            _, encoder_hidden = encoder(x[j], encoder_hidden)

        decoder_hidden = encoder_hidden
        decoder_input = torch.Tensor([[0]]).long().to(device)

        loss = 0

        for k in range(config.seq_size):
            decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
            decoder_input = y[k]
            loss += criterion(decoder_output, y[k])

        batch_loss += loss.item()/config.seq_size
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()

        loss.backward()

        encoder_optimizer.step()
        decoder_optimizer.step()

      total_loss += batch_loss/config.batch_size
      print('batch_loss {:.5f}'.format(batch_loss/config.batch_size))
    print('epoch {}, loss {:.10f}'.format(epoch, total_loss/(len(pairs)//config.batch_size)))
...
batch_loss 0.00523
batch_loss 0.00766
batch_loss 0.01120
batch_loss 0.00735
batch_loss 0.01218
batch_loss 0.00873
batch_loss 0.00352
batch_loss 0.00377
epoch 500, loss 0.0085196330

표1. 마지막 batch, epoch 학습 정보

학습이 종료된 모델을 저장소에 저장합니다. 저장 할 때에 학습 정보가 저장되어 있는 config 내용도 포함하는 것이 좋습니다.

# Save best model weights.
torch.save({
  'encoder': encoder.state_dict(), 'decoder':decoder.state_dict(),
  'config': config,
}, 'gdrive/***/model.genesis.210122')

학습이 완료된 후에 해당 모델이 잘 학습되었는지 확인해보겠습니다. 학습은 “darkness was”라는 몇가지 단어를 주고 모델이 어떤 문장을 생성하는 지를 알아 보는 방식으로 수행합니다.

decoded_words = []

words = [vocab_to_int['darkness'], vocab_to_int['was']]
x = torch.Tensor(words).long().view(-1,1).to(device)

encoder_hidden = torch.zeros(1,1,enc_hidden_size).to(device)

for j in range(x.size(0)):
    _, encoder_hidden = encoder(x[j], encoder_hidden)

decoder_hidden = encoder_hidden
decoder_input = torch.Tensor([[words[1]]]).long().to(device)  

for di in range(20):
  decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
  _, top_index = decoder_output.data.topk(1)
  decoded_words.append(int_to_vocab[top_index.item()])

  decoder_input = top_index.squeeze().detach()

predict_words = decoded_words    
predict_sentence = ' '.join(predict_words)
print(predict_sentence)