Spark2.0 New Features(1) DataSet

Spark 2.0 New Feature 중심으로 살펴보기

Spark2.0으로 업그레이드 되면서 이전 버전보다 좀 더 단순해지고 성능 개선을 위해 노력한 부분이 많이 있다. 이번 시리즈에서는 DataFrame API의 확장형인 type-safe한 Dataset에 대해 살펴 보려고 한다.  이미 Spark1.6에서 alpha버전이 소개된바 있다.

Spark 데이터 구조 파악하기

  • RDD
  • DataFrame(Spark 1.X version)
  • DataSet(Spark 2.X version)

RDD

우선 Spark의 기본 데이터 타입인 RDD부터 살펴보기로 하자.  RDD는 Resilient Distributed Datasets 의 약자로 다음과 같이 설명하고 있다. (http://www-bcf.usc.edu/~minlanyu/teach/csci599-fall12/papers/nsdi_spark.pdf)

RDD - Fault- Tolerant를 보장하기 위해 클러스터내에서 인메모리 연산이 가능하도록 하는 탄력적인 분산 데이터 셋이다.

다음 예제에서 HDFS에 저장된 로그로 부터 ERROR를 추출한후 HDFS를 추출하여 time field를 추출하는 예제이다.

 Lineage graph for the third query in our example. Boxes represent RDDs and arrows represent transformations.

Lineage graph for the third query in our example.
Boxes represent RDDs and arrows represent transformations.

각각의 box는 RDD를 뜻하며 filter,map등의 coarse-grained transformations를 수행하였다.

코드로 표현하면 다음과 같다.

1
2
3
4
5
6
7
lines = spark.textFile("hdfs://...") //HDFS path로 부터 로그 파일을 읽어 lines란 RDD를 생성한다.
errors = lines.filter(_.startWith("ERROR"))  //lines RDD로 부터 ERROR시작되는 데이터만 필터링 하여 errors RDD를 생성한다. 
errors.persist() //in memory에 errors RDD를 persist한다. 
errors.count() //errors의 count를 세려고 action을 수행한다. spark은 lazy execution 방식으로 동작한다. 
errors.filter(_.contains("HDFS")) //errors RDD에서 HDFS문자열이 포함되어 있는 경우 tab으로 분리하여 3번째 필드를 가져오려고 한다. 
      .map(_.split('\t')(3)) //여기서는 3번째 field가 time이라고 가정한다. 
      .collect()  //time field에 대한 결과 값을 리턴한다.

위의 그림처럼 각 RDD는 연결되어 있으며 해당 box가 실패하는 경우 이전 단계로 re-processing을 수행하는  lineage graph를 갖고 있다. In-memory computing에서의 Fault Tolerance를 보장하기 위함이다.

DataFrame

Spark 1.3.X부터 릴리즈된 DataFrame은 named column으로 구성된 데이터의 분산 집합이다. named column에서 의미하듯이 스키마를 가진 RDD이다. R/Python에서의 data frame/pandas모듈과 유사하고 Spark내부에서 최적화 할 수 있도록 하는 기능들이 추가되었다. 또한, 기존 RDD에 스키마를 부여하고 질의나 API통해 데이터를 쉽게 처리 할 수 있다. 대규모의 데이터 셋을 더욱 쉽게 처리 할 수 있도록 High-level API 를 지원하여 RDD programming 보다 더 직관적으로 구현이 가능하도록 추상적인 인터페이스를 지원한다. Spark2.0에서는 DataFrame과 Datasets가 Datasets으로 병합되어 데이터 처리를 통합하고 있다. 내부적인 동작 방식에는 Catalyst Optimizer를 통해 실행 시점에 최적화된 코드를 제공하고 있어 RDD 프로그래밍과 달리 언어(Scala, Java, Pytho, R)에 상관없이 동일한 성능을 보장한다.

Datasets

Spark2.0부터는 Dataset은 강력한 형식의 API와 형식이 지정되지 않는 API두가지를 사용한다. 개념적으로 DataFrame은 Dataset[Row]로 간주되며 Dataset의 subset으로 볼 수 있다. 여기서 Row는 유형이 지정되지 않는 JVM객체이다. Dataset은 Scala 나  Java클래스에서 정의하는 case class를 통해 타입을 선언한 강력한 형식의 JVM객체의 모음이다.  Datasets은 Scala와 Java만 지원하는데 Python과 R의 경우 컴파일 타임형 안정성이 없기 때문이다.

Datasets의 이점

정적 타이핑 및 런타임 유형 안전

아래 도식에서 보듯이 SQL은 데이터 집합에 대해 구문에러 분석 에러에 대해 제한적이다. 즉 Spark SQL문자열 쿼리에서 런타임까지 구문 오류를 알 수 없다. 반면에 Dataframe과 Dataset의 경우 컴파일 타임에 오류를 잡을 수 있다. Dataframe에서 API의 일부가 아닌 함수를 호출하면 컴파일 시점에 에러가 나지만 런타임에서는 존재 하지 않는 열 이름을 감지 할 수 없다. Dataset의 경우 람다 함수 및 JVM유형 객체로 표현되기 때문에 유형이 지정된 매개변수의 불일치가 컴파일 시점에 감지되며 분석오류까지 알 수 있다. Dataset의 경우 개발자에게는 제약이 많이 따르지만 생산적인 측면도 있다.

sql vs dataframe vs dataset

높은 추상화와 구조/반 구제에 따른 사용자 정의 보기

Dataset의 집합인 DataFrame[Row]는 구조화 된 사용자 정의 뷰를 반 구조화된 데이터 형태로 보여준다. 예를 들어 JSON데이터 포맷의 경우 스키마를 포함하고 있어 DataFrame으로 스키마 정보가 바로 바인딩된다.  다음과 같이 case class를 생성하여 필요로 하는 데이터를 정의한 후 JSON데이터를 읽어 Dataset[T]의 Scala JVM객체로 변환해 보자.

1
{"device_id": 198164, "device_name": "sensor-pad-198164owomcJZ", "ip": "80.55.20.25", "cca2": "PL", "cca3": "POL", "cn": "Poland", "latitude": 53.080000, "longitude": 18.620000, "scale": "Celsius", "temp": 21, "humidity": 65, "battery_level": 8, "c02_level": 1408, "lcd": "red", "timestamp" :1458081226051}
1
case class DeviceIoTData (battery_level: Long, c02_level: Long, cca2: String, cca3: String, cn: String, device_id: Long, device_name: String, humidity: Long, ip: String, latitude: Double, lcd: String, longitude: Double, scale:String, temp: Long, timestamp: Long)
1
2
3
4
// read the json file and create the dataset from the 
// case class DeviceIoTData
// ds is now a collection of JVM Scala objects DeviceIoTData
val ds = spark.read.json(“/databricks-public-datasets/data/iot/iot_devices.json”).as[DeviceIoTData]

위와 같이 Dataset[ElementType]을 통해 강력하게 형식화된 객체에 대해 컴파일 타임 안정성 및 사용자 정의 보기가 가능하다.

사용 편의성

Dataset은 높은 수준의 API로 수행할 수 있어서 Dataset의 유형 객체에 대해 직접 엑세스 하여 agg, select, sum, avg, map, filter, groupBy등의 작업을 수행하는 것이 더욱 간결해 졌다.

1
2
3
4
5
6
7
// Use filter(), map(), groupBy() country, and compute avg() 
// for temperatures and humidity. This operation results in 
// another immutable Dataset. The query is simpler to read, 
// and expressive
val dsAvgTmp = ds.filter(d => {d.temp > 25}).map(d => (d.temp, d.humidity, d.cca3)).groupBy($"_3").avg()
//display the resulting dataset
display(dsAvgTmp)

성능 및 최적화

Dataset도 DataFrame과 마찬가지로 Catalyst Optimizer를 통해 실행 시점에 코드 최적화를 통해 성능을 향상하고 있다. Datasets은 JVM객체에 대한 바인딩으로 Encoder를 사용하여 유형별 JVM객체를 Tungsten의 내부 메모리 표현으로 매핑하게 된다. 결과적으로 Encoder를 통해 JVM객체를 효율적으로 직렬화/비직렬화 할 수 있으며 소형의 바이트 코드를 생성하게 됨으로 이는 cache size 축소 및 처리시간 관련 성능상 이점을 가질 수 있다.

RDD vs DataFrame vs Datasets언제 쓸까?

나중에 나온 기술이 이전 기술을 보완하고 있기 때문에 더 좋을 것이다. 개인적인 의견은 RDD의 경우 데이터를 직접적으로 핸들링 해야 하는 경우라면 낮은 수준의 API를 제공하므로 RDD를 고수할 수 있겠지만 추상화된 API를 사용하여 간결하게 코드를 작성하고 Catalyst Optimizer를 통해 성능 향상을 꾀하고자 한다면 DataFrame, Datasets을 고려해 볼 수 있다. 데이터 엔지니어냐 데이터 분석가냐에 따라 사용하기 편한 언어와 환경은 다를수 있다. Scala/Java개발자라면 Encoder의 장점을 활용하는 Datasets을 권하고 싶다.

Spark2.0 시리즈는 TO BE CONTINUED

[참고]

  • https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html
  • https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
  • https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.