카테고리 없음

Spark RDD 알아보기

빛날희- 2026. 3. 9. 14:45
728x90

(생성 AI를 기반으로 정리된 내용은 🤖 아이콘으로 표기해두었습니다)

 

아래 내용은 DE Zoomcamp 강의 내용을 기반으로 작성되었습니다.


 

Spark RDD란?

RDD는 스파크의 초기 버전에서 분산 컴퓨팅의 기반이었으며,다음과 같은 특징을 가집니다.

  • Partitioned Collection of Records
    RDD는 여러 파티션으로 나뉜 데이터 모음이다.
    각 파티션은 클러스터의 서로 다른 노드에 분산 저장되고,
    일반 정형 테이블뿐 아니라 (key, value) 형태의 데이터도 자연스럽게 다룬다.
  • Spread Across the Cluster, Read-only
    데이터는 클러스터 전반에 분산되며 읽기 전용으로 동작한다.
    한 번 생성된 RDD는 수정하지 않고, Transformation을 통해 새 RDD를 만들어낸다.
  • Caching Dataset in Memory
    반복해서 사용할 데이터는 메모리에 캐싱할 수 있다.

 

 🤖 그렇다면 RDD와 DataFrame의 차이점은?

RDD vs DataFrame 비교

  RDD DataFrame
API 수준 저수준(Low-level) API 고수준(High-level) API
데이터 구조 스키마 정보가 없는 객체의 모음 Row 객체로 구성된, 이름과 타입을 가진 컬럼의 테이블 형태
데이터 형태 정형/비정형 데이터 모두 처리 가능 정형 데이터 처리에 최적화
최적화 개발자가 직접 최적화 코드 작성 Catalyst 옵티마이저(최적의 실행계을 세우는 옵티마이저) 를 통한 자동 쿼리 최적화
편의성 세밀한 제어가 가능하지만, 코드가 복잡해질 수 있음 SQL과 유사한 API를 제공하여 사용이 편리함
성능 일반적으로 DataFrame보다 느리고 메모리 사용 비효율적
Tungsten 엔진과 옵티마이저 덕분에 성능 및 메모리 효율성 우수

요약 및 권장 사항

Spark 2.0 이후부터는 대부분의 경우 DataFrame 사용이 권장됩니다. 이유는 다음과 같습니다.

  • 성능: Catalyst 옵티마이저와 Tungsten 실행 엔진을 통해 RDD보다 훨씬 뛰어난 성능을 제공합니다.
  • 편의성: SQL과 유사한 직관적인 API를 제공하여 데이터를 쉽게 조작하고 분석할 수 있습니다.

물론, 다음과 같은 특정 상황에서는 RDD가 여전히 유용합니다.

  • 클러스터의 물리적 데이터 배치를 아주 세밀하게 제어해야 할 때
  • DataFrame API가 제공하지 않는 저수준의 기능이 필요할 때

🤖 Spark 2.0 이후 DataFrame 사용이 권장된다고 했는데, RDD를 사용해야 하는 특정 상황에 대한 실제 사용 사례를 알려주세요.


1. 클러스터 데이터의 물리적 배치 제어 (Custom Partitioning) 
RDD는 DataFrame이 제공하지 않는 데이터의 물리적 분포를 직접 제어하는 저수준(low-level) 기능을 제공하여, Shuffle을 최소화하는 고도의 성능 튜닝을 가능하게 합니다.

<시나리오: 대규모 사용자 데이터 조인 최적화>
수십억 건의 사용자 활동 로그(log RDD)와 사용자 프로필 정보(profile RDD)를 userID를 기준으로 조인(join)해야 하는 상황을 가정해 보겠습니다.

<문제점>
기본적으로 Spark는 해시(hash)를 기반으로 데이터를 파티셔닝합니다. 이로 인해 동일한 userID를 가진 데이터가 클러스터의 여러 노드에 흩어져 있을 수 있습니다.조인 연산을 수행할 때, 관련된 데이터들을 네트워크를 통해 특정 노드로 모으는 과정(Shuffle)이 발생하며, 이는 엄청난 성능 저하를 유발합니다.

<RDD 해결책>
userID를 키로 하는 (userID, log_data) 형태의 Pair RDD와 (userID, profile_data) 형태의 Pair RDD를 만듭니다.

두 RDD 모두에 대해 사용자 정의 파티셔너(Custom Partitioner)를 적용하여 partitionBy() 연산을 수행합니다.
예를 들어, userID의 특정 범위나 지역 코드에 따라 데이터가 저장될 파티션(노드)을 명시적으로 지정하는 파티셔너를 만듭니다. "한국 사용자는 110번 파티션에, 미국 사용자는 1120번 파티션에 저장하라" 와 같은 규칙을 정할 수 있습니다.

이렇게 하면 조인하려는 데이터들이 사전에 같은 노드에 위치(co-located)하게 됩니다.

이제 join 연산을 수행하면, 네트워크를 통한 데이터 이동(Shuffle)이 거의 발생하지 않고 각 노드 내에서 독립적으로 조인이 처리되므로 성능이 극적으로 향상됩니다.


2. DataFrame API로 표현하기 어려운 복잡한 알고리즘
RDD의 유연한 데이터 모델은 테이블 형태에 국한되지 않아, 그래프, 행렬 연산, 유전 알고리즘 등 복잡하고 반복적인 계산이 필요한 고급 분석 알고리즘을 구현하는 데 필수적입니다.

<시나리오: 그래프(Graph) 알고리즘 구현>
소셜 네트워크에서 '친구의 친구'를 추천하거나, 금융 거래의 이상 패턴을 탐지하는 등 데이터 간의 복잡한 관계를 분석해야 할 때가 있습니다.

<문제점>
그래프 데이터(정점(vertices)과 간선(edges)의 관계)는 행과 열로 구성된 테이블 형태의 DataFrame으로 표현하고 다루기가 매우 부자연스럽고 복잡합니다.
페이지랭크(PageRank)나 최단 경로 찾기 같은 그래프 순회 알고리즘은 DataFrame의 연산 모델로는 구현하기 어렵습니다.

<RDD 해결책>
Spark의 그래프 처리 라이브러리인 GraphX(또는 GraphFrames)는 내부적으로 RDD를 기반으로 동작합니다.
정점(사용자, 계좌 등)을 담은 VertexRDD와 간선(친구 관계, 거래 내역 등)을 담은 EdgeRDD를 생성합니다.

RDD API를 기반으로 하는 map, flatMap, aggregateMessages 등의 그래프 연산자를 사용하여, 각 정점이 이웃 정점과 메시지를 주고받으며 상태를 업데이트하는 방식으로 복잡한 알고리즘을 효율적으로 구현할 수 있습니다.

 

 


DataFrame 연산을 RDD로 동일하게 해보기

Spark DataFrame은 내부적으로 RDD(Resilient Distributed Dataset)를 기반으로 만들어졌기 때문에 모든 DataFrame은 .rdd 속성을 통해 원시(raw) RDD에 접근할 수 있으며, 이 RDD는 Row 객체들로 구성됩니다.

해당 섹션에서는 아래 SQL 쿼리를 DataFrame이 아닌, RDD 연산으로 직접 구현하는 과정을 살펴봅니다.

 

이 과정을 쭉 따라가면서, SQL 과 동일한 연산을 RDD에서 하기 위해선 상당히 복잡한 과정을 거쳐야한다는 것을 체감했습니다..
물론 Spark 에 DataFrame과 SQL API가 도입되면서 RDD 연산방식은 사용하지 않게되었으나, 저수준 레벨에서의 RDD 연산을 통한 최적화가 필요한 경우, spark가 이렇게 돌아가는구나 정도는 알고는 있어야겠구나라는 생각이 들었습니다.
 
SQL
-- 목표: 시간대(hour)와 위치(zone)별로 매출(amount)과 운행 횟수(number_records) 집계
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2

1. RDD 연산으로 SQL 쿼리 구현하기: map, filter, reduceByKey

RDD를 사용하여 위 SQL과 동일한 작업을 수행하려면, 데이터를 여러 단계에 걸쳐 변환해야 합니다.

1단계: 데이터 필터링 및 키-값(Key-Value) 형태로 변환 (filter map)

  1. filter: WHERE 절처럼, lpep_pickup_datetime이 '2020-01-01' 이후인 데이터만 남깁니다.
  2. map: GROUP BY 연산을 흉내 내기 위해, 필터링된 각 Row 객체를 (키, 값) 형태의 튜플로 변환하는 함수(prepare_for_grouping)를 적용합니다.
 
python
# 1. 필요한 컬럼만 선택하여 RDD로 변환
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

# 2. WHERE 절에 해당하는 필터링 함수
from datetime import datetime
start = datetime(year=2020, month=1, day=1)

def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

# 3. 각 데이터를 (Key, Value) 쌍으로 변환하는 함수
def prepare_for_grouping(row): 
    # Key 생성
    hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
    zone = row.PULocationID
    key = (hour, zone)
    
    # Value 생성
    amount = row.total_amount
    count = 1
    value = (amount, count)

    return (key, value)

# RDD 변환 파이프라인
transformed_rdd = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping)

2단계: 동일한 키를 가진 데이터 집계 (reduceByKey)

reduceByKey 연산을 사용하여 동일한 키 (hour, zone)를 가진 데이터들을 하나로 합칩니다.

이 연산은 각 키에 속한 모든 값들을 우리가 정의한 규칙(calculate_revenue 함수)에 따라 계산합니다. 

SUM(amount) COUNT(1)를 구현하기 위해 각 값의 amount count를 더해주면 됩니다.

 
python
# 2개의 Value 튜플을 입력받아 하나로 합치는 함수
def calculate_revenue(left_value, right_value):
    left_amount, left_count = left_value
    right_amount, right_count = right_value
    
    output_amount = left_amount + right_amount
    output_count = left_count + right_count
    
    return (output_amount, output_count)

# reduceByKey를 파이프라인에 추가
aggregated_rdd = transformed_rdd.reduceByKey(calculate_revenue)

3단계: 최종 결과 형태 정리 (map)

reduceByKey의 결과는 ( (hour, zone), (total_amount, total_count) ) 형태의 중첩된 튜플입니다.

이를 더 보기 좋은 형태로 풀어주기 위해 마지막으로 map을 한번 더 사용합니다.

 
python
from collections import namedtuple
RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

def unwrap(row):
    return RevenueRow(
        hour=row[0][0], 
        zone=row[0][1],
        revenue=row[1][0],
        count=row[1][1]
    )

final_rdd = aggregated_rdd.map(unwrap)

2. RDD를 다시 DataFrame으로 변환하기

최종적으로 만들어진 RDD는 toDF() 메소드를 사용하여 DataFrame으로 다시 변환할 수 있습니다.

중요한 점은 RDD는 DataFrame과 달리 스키마(데이터의 구조와 타입 정보)가 없다는 것입니다. 따라서 toDF()를 호출할 때 스키마를 직접 지정해주는 것이 좋습니다.

  • 스키마를 지정하지 않으면: Spark가 RDD의 모든 데이터를 스캔하여 스키마를 추론해야 하므로, 데이터 양이 많을 경우 상당한 시간이 소요됩니다.
  • 스키마를 지정하면: Spark는 스키마 추론 단계를 건너뛰고 훨씬 빠르게 DataFrame을 생성할 수 있습니다.
 
python
from pyspark.sql import types

# 1. 결과 DataFrame의 스키마 정의
result_schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True),
    types.StructField('zone', types.IntegerType(), True),
    types.StructField('revenue', types.DoubleType(), True),
    types.StructField('count', types.IntegerType(), True)
])

# 2. 전체 파이프라인을 실행하고 최종 RDD를 스키마와 함께 DataFrame으로 변환
df_result = rdd \
    .filter(filter_outliers) \
    .map(prepare_for_grouping) \
    .reduceByKey(calculate_revenue) \
    .map(unwrap) \
    .toDF(result_schema) 

 

728x90