[AWS EMR] Spark로 CSV 파일을 read 후 Parquet로 write하는 방법
현재 진행중인 프로젝트에서 웹 상에서 가져온 크롤링 데이터를 CSV로 받아왔다. 해당 파일을 다루기 쉬운 Parquet로 변환하는 방법으로 AWS서비스의 EMR(Elastic Map Reduce)를 이용하여 Spark로 코드화하였다.(Python 사용)
How to Convert CSV to Parquet Files?
1. Create AWS S3 bucket
AWS상의 버킷을 하나 만들어주고, 그 안에 폴더를 구분하였다.
- input : 크롤링 데이터인 CSV 샘플파일을 저장
- script : spark script 코드를 저장
- output : EMR 클러스터를 돌렸을 때 결과물이 저장되는 위치
폴더 이름은 본인이 편한걸로 설정해주면 된다. sample.csv 파일을 input 폴더에 업로드 하였으며, pysaprk 코드는 script 폴더에 업로드하였다.
2. EMR 클러스터 생성
AWS 서비스 탭을 클릭하여 Amazon EMR을 찾거나 검색창에 EMR을 검색하여 들어와 클러스터를 생성한다. 내가 지정한 Region 안에서 진행한다.
"고급 옵션으로 이동" 한다.
Spark에 체크표시한 뒤 나머지는 기본 설정으로 남겨두고 클러스터를 생성한다.
참고로 보안옵션의 키페어 없음으로 설정해두어도 실습은 가능하며, 생성한 뒤 약 15분정도가 경과하면 클러스터가 test 가능 상태가 된다.
초기 화면은 시작으로 표시되고, 영어버전으로 변경하면 Starting이라는 초록색 글씨를 확인할 수 있다. 대시보드상에서 생성 후 경과시간도 확인할 수 있으니 잠시 10분간 다른 일 하면서 기다려본다.
대시보드에서 마스터, 코어 부분이 현재 어디까지 구성중인지 중간 현황도 확인할 수 있다. 아직 프로비저닝 중으로 확인된다. 8-9분이 경과하니 아래와 같이 대기상태로 변경된다.
마찬가지로 대시보드 우측 하단의 네트워크 및 하드웨어에 보면 Running or 실행 이라는 초록색 글씨가 나오면 작업 환경이 준비가 된다.
3. 단계(Step) 탭에서 단계 추가
단계(Step) 탭을 클릭하면 단계추가 버튼이 보인다. 여기서 사용자 지정 JAR의 위치, 인수를 지정하여 추가해 줄 것이다. 참고로 클러스터를 생성할 때부터 이 단계추가가 가능하지만, 클러스터 생성부터 오류가 날 수 있으니 코드가 확실하다면 생성할 때 옵션들을 이용해 만들고 테스트 단계라면 단계 탭을 이용하는 것이 편하다.
- JAR위치 : command-runner.jar
- 인수 : spark-submit s3://<bucket name>/script/<pyspark filename> s3://<bucket name>/input/<csv filename> s3://<bucket name>/output/
JAR 위치는 command-runner.jar로 입력해준다.
인수 입력할 때 spark-submit 이라는 명령어는 spark-submit [script위치] [input위치] [output위치] 를 의미하며 띄어쓰기로 구분한다. <>안에 들어있는 내용은 각각 환경에 맞게 지정해주면 된다.
추가 후 기다리면 실패/성공 여부를 확인할 수 있고, 정상적으로 작동 되었다면 지정해놓은 S3 bucket의 output 폴더에 아래와 같이 SUCCESS를 볼 수 있다.
* CODE
참고로 여기서 pyspark 코드를 작성하는 데 아래 코드를 사용하였다.
import sys
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
input_path = "s3://jhsim-emr-bucket/input/sample.csv"
output_path = "s3://jhsim-emr-bucket/output/"
spark = SparkSession \
.builder \
.appName("How to read CSV file to Parquet") \
.getOrCreate()
# read csv
SampleData = spark.read.option("inferSchema", "true").option("header", "true").csv(input_path)
SampleData.show()
# write parquet
SampleData.write.format("parquet").mode("overwrite").save(output_path)
먼저 필요한 모듈들과 SparkSession을 import한 뒤, input, output 경로는 가독성이 좋게 따로 지정해두었다. 이후 SparkSession을 지정하였다. 그리고 CSV파일을 read하는 코드, Parquet 형식으로 write하는 옵션을 지정해주면 성공적으로 작동한다.
Spark read CSV를 하는데 있어서 아래 설명을 참고하였다.
글을 마치며
해당 테스트를 해보기 전에 AWS workshops에 올라와있는 EMR workshop을 한 번 돌려봤다. 그 후 진행하니 개념에 대한 이해가 되고 적용점을 찾을 수 있었다. Jupyter notebook으로 기본 테스트를 해봤지만 프로젝트를 위해 Python 언어로 pyspark 스크립트화 해서 EMR에 돌리니 파일 변형하는 부분에 있어서 쉽게 자동화를 시킬 수 있어 편리했다.
CSV 파일마다 필드값이 달라질 수도 있고 같은 필드값을 가진 파일도 있을 것이다. 이번 프로젝트에서는 크롤링을 할 것이기 때문에 별도로 추가되는 필드값은 없다. 그래서 스키마를 필드별로 지정해서 사용하는 것이 더 안정적인 코드가 될 것 같다. 지금은 header값을 true로 지정하여 기본적인 코드로 작성했지만, 더 나아가서 각 필드값별로 스키마를 지정해 주는 것이 더 안정적일 것이다. 이번 코드 외에도 StructField 값을 지정하는 코드까지 2가지 방법을 테스트해봐야겠다.
<참고자료>
* AWS workshops
* Apache Spark Tutorial— How to Read and Write Data With PySpark
* 필드별 StructField 지정 참고