🍐

Why Skewed Data Causes Problem

이번에는 스파크 스트리밍(Spark Streaming) 애플리케이션에서 데이터 스큐(Skewed Data)와 작은 파일 문제(Small File Problem)를 어떻게 개선했는지 살펴보자.

Spark Streaming Application

스파크 스트리밍 애플리케이션은 카프카(Kafka)로부터 데이터를 수신하여 HDFS에 저장하는 단순한 작업을 처리한다. 이 앱은 1분 단위의 마이크로 배치(Micro Batch)로 동작하며, 데이터를 [year, month, day, hour, column_a]로 나눠 저장한다. 여기서 column_a의 값은 [100, 600) 범위의 데이터를 포함한다. 실제 데이터는 아래와 같다.

  • 1xx: 100 - 110
  • 2xx: 200 - 220
  • 3xx: 300 - 320
  • 4xx: 400 - 420
  • 5xx: 500 - 520

이 애플리케이션에서 때때로 작업 지연이 발생한다. 원인은 HDFS 네임 노드(Name Node)의 부하가 증가했을 때 발생하는 것으로 추정된다. 물론, 1분 단위로 소비되는 데이터의 양이 많아질 때도 작업 지연이 발생하지만, 여기서 다루고자 하는 핵심은 HDFS 네임 노드의 부하 문제다. 실제 작업 지연이 발생했을 때, HDFS 네임 노드에서 대량의 파일 작업(File Ops.) 요청이 발생한 것을 지표를 통해 확인했다. 그렇다면, 왜 이런 대량의 파일 작업 요청이 발생했을까?


Small File Problem

스파크 스트리밍 애플리케이션이 적재한 데이터는 다른 시스템에서 사용된다. 해당 시스템은 하루 단위의 모든 데이터를 읽고 있는데, 이 작업을 실행할 때마다 파일 작업 요청이 대량으로 발생하는 것을 확인했다. 여기서 의문점은 하루 단위의 데이터가 크지 않음에도 불구하고 네임 노드에 과부하가 발생했다는 것이다.

다음 쿼리를 생각해보자:

123456
SELECT COUNT(*)
FROM db.table
WHERE year = 2024
AND month = 202412
AND day = 20241201
AND column_a IN (500, 501, 502, ..., 519);

이 쿼리는 2024년 12월 1일 자 데이터 중, column_a 값이 ‘5xx’인 데이터의 개수를 구하고자 한다. 데이터 크기가 1GiB 이하임에도 불구하고, 다량의 파일 작업 요청이 발생하여 네임 노드에 부하를 주었다. 그 이유는 데이터 크기는 작지만, 작은 파일로 나뉘어 저장되었기 때문이다.

HDFS 네임 노드는 모든 파일과 관련된 메타데이터를 메모리에 저장하며, 파일 읽기와 쓰기 요청을 처리한다. 작은 파일이 많아질수록 네임 노드의 메모리와 CPU 자원을 소모하여 병목 현상을 초래한다.


Skewed Data

데이터 스큐는 특정 컬럼 값의 데이터 비율이 불균형한 상태를 말한다. 여기에서는 column_a를 기준으로 데이터 비율을 보면 다음과 같다.

  • 1xx: 95%
  • 2xx: 1%
  • 3xx: 0.x%
  • 4xx: 3%
  • 5xx: 0.x%

전체 데이터의 95%가 1xx 값에 집중되어 있다. 이런 데이터 스큐로 인해 스파크 애플리케이션은 다수의 작은 파일을 생성했다. 애플리케이션은 column_a 파티션을 고려하지 않고, 단순히 전체 데이터의 양과 파일 크기를 기준으로 파티션을 나눴다.

이로 인해, 작은 데이터 비율을 가진 값들이 각각 많은 작은 파일로 저장되었고, 이는 HDFS 네임 노드 부하의 원인이 되었다.


Improved Spark Streaming Application

스파크 스트리밍 애플리케이션 개선 작업은 두 가지 관점에서 진행되었다:

  1. 저장하는 데이터 줄이기
  2. 저장하는 파일 수 줄이기

저장하는 데이터 줄이기

저장 데이터를 줄이기 위해 데이터를 사용하는 다른 시스템을 조사하여 실제로 사용되지 않는 데이터를 필터링했다. 약 20%의 데이터를 제외함으로써 데이터 저장량을 줄일 수 있었다.

저장하는 파일 수 줄이기

파일 수를 줄이기 위해 처음에는 마이크로 배치 시간을 2분 또는 N분 단위로 늘려보았다. 그러나 이 방법만으로는 큰 효과를 보지 못했다. 파티션 수를 고정하거나 데이터를 더 큰 단위로 묶어 저장하는 방식으로도 여전히 문제가 남았다.

데이터 스큐 문제를 해결하기 위해, 데이터 비율을 기준으로 동적으로 파티션을 조정하는 방식을 도입했다.

Count with GroupBy

처음 시도한 방법은 집계 연산을 통해 column_a의 각 값이 가진 레코드 수를 계산하고 이를 활용하는 방법이었다. 다음 코드는 그 예를 보여준다:

12345678
val df = ...
val keyToCount = df.groupBy(col("column_a")).count()
.withColumnRenamed("column_a", "_column_a")
val repartitionedDf: DataFrame = df.join(keyToCount, df("column_a") === keyToCount("_column_a"))
.withColumn("_repartition_seed", ((rand() * keyToCount("count")) / 250000).cast(IntegerType))
.repartition(df("column_a"), col("repartition_seed"))
.select(df("*"))

하지만 이 방법은 집계 연산으로 인해 1분 단위 마이크로 배치가 약 6분 가까이 소요되는 비효율성을 보였다. 따라서 적용이 불가능했다.

Row Number

두 번째로 시도한 방법은 각 레코드에 고유 번호를 할당하는 방식이었다. 이 접근법은 집계를 하지 않고도 데이터를 나누는 데 활용할 수 있었다.

12
val repartitionedDf: DataFrame = df.withColumn("_row_number", row_number().over(Window.partitionBy("column_a").orderBy("column_a")) - 1)
.withColumn("_repartition_seed", (col("_row_number") / 250000).cast(IntegerType))

그러나 이 방법 또한 연산 비용이 높아 적용이 어려웠다.

Fixed Calculated Volume Ratio

최종적으로 선택한 방법은 데이터를 사전에 분석하여, 각 column_a 값의 비율을 기반으로 파티션을 동적으로 조정하는 방식이었다. 비율이 높은 값은 더 많은 파티션을 할당받고, 낮은 값은 고정된 두 개의 파티션에 저장되도록 했다.

12345678
val dfWithRepartitionSeed = df
.withColumn("_repartition_seed",
when(col("column_a").isin(Percentages.filter(_._2 >= ThresholdPercentage).keys.toSeq: _*),
concat(col("column_a"), lit("_"), (rand() * ceil(getPercentage(col("column_a")) * (numPartition - 2) / 100)).cast("int"))
).otherwise(concat(lit("999_"), (rand() * 2).cast("int"))))
val repartitionedDf = dfWithRepartitionSeed
.repartition(col("_repartition_seed"))

이 방식은 추가적인 연산 비용 없이 사전 계산된 비율을 활용하여 데이터 스큐 문제를 효과적으로 완화했다.


After Improvement

하루 단위의 데이터를 비교했을 때 전체 파일 수가 19,328,672개에서 132,196개로 줄었다. 이는 HDFS 파일 관리 측면에서 엄청난 개선이다.

다만, 카프카로부터 데이터 유입량이 크게 증가하거나, 기존에 작은 데이터 비율을 가진 값들이 갑자기 많아지는 경우를 대비한 추가 개선이 필요하다.

이번 작업을 통해 문제의 원인을 단순히 "그럴 것이다"라고 추정하기보다, 구체적인 데이터를 통해 접근하는 것이 중요함을 다시 한번 깨달았다.