Partitioned Collection of Records RDD는 여러 파티션으로 나뉜 데이터 모음이다. 각 파티션은 클러스터의 서로 다른 노드에 분산 저장되고, 일반 정형 테이블뿐 아니라 (key, value) 형태의 데이터도 자연스럽게 다룬다.
Spread Across the Cluster, Read-only 데이터는 클러스터 전반에 분산되며 읽기 전용으로 동작한다. 한 번 생성된 RDD는 수정하지 않고, Transformation을 통해 새 RDD를 만들어낸다.
Caching Dataset in Memory 반복해서 사용할 데이터는 메모리에 캐싱할 수 있다.
🤖그렇다면 RDD와 DataFrame의 차이점은?
RDD vs DataFrame 비교
RDD
DataFrame
API 수준
저수준(Low-level) API
고수준(High-level) API
데이터 구조
스키마 정보가 없는 객체의 모음
Row 객체로 구성된, 이름과 타입을 가진 컬럼의 테이블 형태
데이터 형태
정형/비정형 데이터 모두 처리 가능
정형 데이터 처리에 최적화
최적화
개발자가 직접 최적화 코드 작성
Catalyst 옵티마이저(최적의 실행계을 세우는 옵티마이저) 를 통한 자동 쿼리 최적화
편의성
세밀한 제어가 가능하지만, 코드가 복잡해질 수 있음
SQL과 유사한 API를 제공하여 사용이 편리함
성능
일반적으로 DataFrame보다 느리고 메모리 사용 비효율적
Tungsten 엔진과 옵티마이저 덕분에 성능 및 메모리 효율성 우수
요약 및 권장 사항
Spark 2.0 이후부터는 대부분의 경우 DataFrame 사용이 권장됩니다. 이유는 다음과 같습니다.
성능: Catalyst 옵티마이저와 Tungsten 실행 엔진을 통해 RDD보다 훨씬 뛰어난 성능을 제공합니다.
편의성: SQL과 유사한 직관적인 API를 제공하여 데이터를 쉽게 조작하고 분석할 수 있습니다.
물론, 다음과 같은 특정 상황에서는 RDD가 여전히 유용합니다.
클러스터의 물리적 데이터 배치를 아주 세밀하게 제어해야 할 때
DataFrame API가 제공하지 않는 저수준의 기능이 필요할 때
🤖 Spark 2.0 이후 DataFrame 사용이 권장된다고 했는데, RDD를 사용해야 하는 특정 상황에 대한 실제 사용 사례를 알려주세요.
1. 클러스터 데이터의 물리적 배치 제어 (Custom Partitioning) RDD는 DataFrame이 제공하지 않는 데이터의 물리적 분포를 직접 제어하는 저수준(low-level) 기능을 제공하여, Shuffle을 최소화하는 고도의 성능 튜닝을 가능하게 합니다.
<시나리오: 대규모 사용자 데이터 조인 최적화> 수십억 건의 사용자 활동 로그(log RDD)와 사용자 프로필 정보(profile RDD)를 userID를 기준으로 조인(join)해야 하는 상황을 가정해 보겠습니다.
<문제점> 기본적으로 Spark는 해시(hash)를 기반으로 데이터를 파티셔닝합니다. 이로 인해 동일한 userID를 가진 데이터가 클러스터의 여러 노드에 흩어져 있을 수 있습니다.조인 연산을 수행할 때, 관련된 데이터들을 네트워크를 통해 특정 노드로 모으는 과정(Shuffle)이 발생하며, 이는 엄청난 성능 저하를 유발합니다.
<RDD 해결책> userID를 키로 하는 (userID, log_data) 형태의 Pair RDD와 (userID, profile_data) 형태의 Pair RDD를 만듭니다.
두 RDD 모두에 대해 사용자 정의 파티셔너(Custom Partitioner)를 적용하여 partitionBy() 연산을 수행합니다. 예를 들어, userID의 특정 범위나 지역 코드에 따라 데이터가 저장될 파티션(노드)을 명시적으로 지정하는 파티셔너를 만듭니다. "한국 사용자는 110번 파티션에, 미국 사용자는 1120번 파티션에 저장하라" 와 같은 규칙을 정할 수 있습니다.
이렇게 하면 조인하려는 데이터들이 사전에 같은 노드에 위치(co-located)하게 됩니다.
이제 join 연산을 수행하면, 네트워크를 통한 데이터 이동(Shuffle)이 거의 발생하지 않고 각 노드 내에서 독립적으로 조인이 처리되므로 성능이 극적으로 향상됩니다.
2. DataFrame API로 표현하기 어려운 복잡한 알고리즘 RDD의 유연한 데이터 모델은 테이블 형태에 국한되지 않아, 그래프, 행렬 연산, 유전 알고리즘 등 복잡하고 반복적인 계산이 필요한 고급 분석 알고리즘을 구현하는 데 필수적입니다. <시나리오: 그래프(Graph) 알고리즘 구현> 소셜 네트워크에서 '친구의 친구'를 추천하거나, 금융 거래의 이상 패턴을 탐지하는 등 데이터 간의 복잡한 관계를 분석해야 할 때가 있습니다.
<문제점> 그래프 데이터(정점(vertices)과 간선(edges)의 관계)는 행과 열로 구성된 테이블 형태의 DataFrame으로 표현하고 다루기가 매우 부자연스럽고 복잡합니다. 페이지랭크(PageRank)나 최단 경로 찾기 같은 그래프 순회 알고리즘은 DataFrame의 연산 모델로는 구현하기 어렵습니다.
<RDD 해결책> Spark의 그래프 처리 라이브러리인 GraphX(또는 GraphFrames)는 내부적으로 RDD를 기반으로 동작합니다. 정점(사용자, 계좌 등)을 담은 VertexRDD와 간선(친구 관계, 거래 내역 등)을 담은 EdgeRDD를 생성합니다.
RDD API를 기반으로 하는 map, flatMap, aggregateMessages 등의 그래프 연산자를 사용하여, 각 정점이 이웃 정점과 메시지를 주고받으며 상태를 업데이트하는 방식으로 복잡한 알고리즘을 효율적으로 구현할 수 있습니다.
DataFrame 연산을 RDD로 동일하게 해보기
Spark DataFrame은 내부적으로 RDD(Resilient Distributed Dataset)를 기반으로 만들어졌기 때문에 모든 DataFrame은.rdd속성을 통해 원시(raw) RDD에 접근할 수 있으며, 이 RDD는Row객체들로 구성됩니다.
해당 섹션에서는 아래 SQL 쿼리를 DataFrame이 아닌, RDD 연산으로 직접 구현하는 과정을 살펴봅니다.
이 과정을 쭉 따라가면서, SQL 과 동일한 연산을 RDD에서 하기 위해선 상당히 복잡한 과정을 거쳐야한다는 것을 체감했습니다..
물론 Spark 에 DataFrame과 SQL API가 도입되면서 RDD 연산방식은 사용하지 않게되었으나, 저수준 레벨에서의 RDD 연산을 통한 최적화가 필요한 경우, spark가 이렇게 돌아가는구나 정도는 알고는 있어야겠구나라는 생각이 들었습니다.
SQL
-- 목표: 시간대(hour)와 위치(zone)별로 매출(amount)과 운행 횟수(number_records) 집계
SELECT
date_trunc('hour', lpep_pickup_datetime) AS hour,
PULocationID AS zone,
SUM(total_amount) AS amount,
COUNT(1) AS number_records
FROM
green
WHERE
lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
1, 2
1. RDD 연산으로 SQL 쿼리 구현하기:map,filter,reduceByKey
RDD를 사용하여 위 SQL과 동일한 작업을 수행하려면, 데이터를 여러 단계에 걸쳐 변환해야 합니다.