Optimizing Spark Streaming App with Data Skew: Reducing File Count

최적화 작업에 대해 얘기하기 전에 이번에 작업한 스파크 스트리밍 앱에 대해 알아보자. 이 스파크 스트리밍 앱은 카프카로부터 데이터를 수신하여 HDFS에 저장한다. 데이터를 가공하고 필터하는 부분이 있지만, 특정 상태를 가지거나 다른 저장소를 참조하는 부분은 없다. 이 앱은 1분을 주기로 갖는 마이크로 배치로 스트리밍 프로세싱을 한다. 그리고 이 앱은 데이터를 시간 데이터뿐만 아니라 특정 필드 기반으로 나누어 저장한다.

예를 들어,

{
  "id": "100",
  "name": "aanakwad",
  "timestamp": "2025-02-23T22:44:45+09:00"
}

이러한 데이터를 timestampid기준으로 나누어 저장한다. 저장하는 디렉토리의 구조를 보면 다음과 같다.

$HOME/month=202502/day=20250223/hour=22/id=100
$HOME/month=202502/day=20250223/hour=22/id=101
...
$HOME/month=202502/day=20250223/hour=22/id=599

이 앱은 각 디렉토리마다 저장하는 파일의 수를 제어하기 위해 카프카로부터 소비한 데이터의 레코드 수를 이용하고 있다. 카프카로부터 데이터를 1:1로 소비하기 위해 보통 스파크 앱의 파티션 수를 카프카 토픽의 파티션 수와 일치시킨다. 이 앱도 마찬가지지만, 카프카로부터 데이터를 가져온 이후 소비한 레코드 수에 따라 스파크 파티션 수를 조정한다. 다시 말하자면, 저장할 데이터의 레코드 수가 많으면 더 많은 수의 파티션(파일)으로 데이터를 저장한다. 반대로 저장할 데이터의 레코드 수가 적으면 더 적은 수의 파티션(파일)으로 데이터를 저장한다. 앱이 파일의 수를 고정하지 않고 동적으로 제어하는 이유는 파일들을 HDFS에 저장하기 때문이다. 이에 대한 이유는 추후 살펴보자. 여기까지 이 스파크 스트리밍 앱에 대해 요약하면,

  • 카프카 to HDFS
  • 1분 주기의 마이크로 배치
  • timestampid기반으로 파티션
  • 1분 동안의 소비한 데이터 양에 따른 파티션(파일) 수 조정

Deep Dive about File Count

Dynamic Partition with Record Count

앞서 살펴본 디렉토리 구조와 파일의 수를 제어하는 로직에 대해서 더 깊게 살펴보자. 예를 들어, 저장할 데이터의 레코드 수가 24,000,000로 가정하자. 그리고 파일 하나당 저장할 레코드 수를 240,000로 생각하고 파티션의 수를 계산해보자.

24000000 / 240000 = 100

스파크 스트리밍 앱은 위의 계산처럼 100개의 파티션으로 데이터를 처리한다. 그리고 디렉토리 구조를 보면 timestampid기준으로 데이터를 나눠서 저장하는데, 100개의 파티션이 각 디렉토리마다 100개의 파일로 데이터를 저장한다. 그 이유는 timestamp 값은 최소 단위가 시(hour) 단위이기 때문에 1분 단위의 배치에서는 동일한 값이라서 하나의 디렉토리다. 즉, timestamp가 같는 cardinality는 1이다(단, 5x분과 0x분 사이의 데이터는 여기서 고려하지 않음). 그러나 id는 1분 단위 배치에서도 서로 다른 값이 존재하기 때문에 다수의 디렉토리를 갖는다. timestamp와 달리 1보다 큰 값의 cardinality를 갖는다. 따라서 1분 단위 배치에서 생성하는 파일의 수는 다음과 같다.

1 x [`id`의 cardinality] x 100

만약 id의 cardinality가 20라고 하면, 이 앱은 1분마다 2,000개의 파일을 생성한다. 이를 한 시간, 하루 단위로 계산하면 각각 120,000개, 2,880,000개의 파일이다.

How to Store Files in HDFS

HDFS는 파일을 그대로 저장하는 것이 아니라 일정한 크기로 나누어 저장한다. 이를 블락(Block)이라고 하는데, 보통 하나의 파일을 일정한 블락 크기(128MB)로 나누어 다수의 블락들로 저장한다. 위의 이 앱은 한 파일당 240,000개의 레코드를 저장하고 있고 한 레코드당 평균 약 512 바이트 크기(실제 데이터 기반의 값)를 갖는다.

240000 x 512 = 122880000

그래서 한 파일당 약 120MB 크기를 갖고 HDFS에서는 이 파일을 하나의 블락으로 저장한다. 또한, HDFS는 저장한 파일과 파일에 해당하는 블락들의 메타데이터를 네임노드(Name node)의 메모리에 저장하고 있다. 보통 메타데이터 하나의 크기를 200 바이트로 가정한다면, 위의 앱에서 저장한 1일 치 파일들의 메타데이터 크기를 계산하면 다음과 같다.

2880000 x 2 x 200 = 1152000000

HDFS의 네임노드는 하루마다 약 1.2GB 크기의 데이터를 메모리에 저장한다. 이에 대해 정리하자면

  • 1일마다 HDFS에 2,880,000개의 파일 생성(1분마다 2,000개의 파일)
  • 1일마다 약 1.2GB 크기의 HDFS 네임노드의 메모리 사용

여기까지 앱에서 어떤 로직으로 동적 파티션을 가지고 얼만큼의 파일들을 저장하는지 알아보았다. 그리고 이 파일들이 HDFS에서 어떻게 저장되는지도 간단하게 살펴보았다. 그렇다면 이 앱과 HDFS가 건강한(healthy) 상태일까. HDFS의 네임노드의 많은 메모리 사용, 1분마다의 2K의 파일 생성 요청, 그리고 다수의 다른 스트리밍 앱도 같이 동작하면서 HDFS의 부하를 주는 환경이라면 어떨까. HDFS의 네임노드의 부하가 클 때마다 스트리밍 앱 프로세싱이 원활하지 않고 이에 따른 지연이 발생할 수 있다. 이 상황이라면 이 앱과 HDFS는 최적의 상황이 아니라고 할 수 있다. 이 앱과 HDFS를 최적화해보자.

Problem Statement

많은 파일의 수가 문제라면 파일 하나당 크기를 증가시키고 전체의 파일을 수를 감소하면 간단하지 않을까. 파일 수를 줄이기 위해 파일 하나당 저장하는 레코드 수를 늘이는 방법을 고려해보자. 레코드 수가 증가하면 파티션의 수는 작아지고 하나의 파티션 마다 처리하는 데이터 양은 많아진다. 파티션은 각각의 executor에서 처리하고 데이터 양에 따라 메모리와 디스크를 이용해서 처리할 것이다. 이전과 같은 앱의 성능을 보장하기 위해 executor의 메모리를 증가시켜 디스크 I/O도 어느정도 제어할 수 있다. 그러나 이 방법을 적용하기 전, 앱에서 적재한 파일들을 살펴보았다. 적재한 파일들 중 몇몇 파일은 앞서 설명했듯이 적정한 크기를 가지고 있었지만, 대부분은 120MB에 한참 못미치는 몇 KB 수준의 파일들도 있었다. 왜 그럴까. 그 이유는 id기준으로 데이터 양이 다르기 때문이다. 특정 id 몇 개가 데이터의 다수를 차지하고 있었고, 나머지 id들은 데이터 양이 적었다. 큰 비중을 가진 id들은 의도한 대로 데이터를 적정하게 저장하지만, 나머지들은 상대적으로 파일 하나당 한참 적은 데이터를 저장하고 있었다. 그래서 앞서 언급했던 단순히 파일당 레코드 수를 증가하는 것은 근본적인 해결책이 아닐것으로 판단했다. 여기서 해결 방법은 다수의 데이터를 차지하는 부분은 더 많은 수의 파일로, 소수의 데이터를 차지하는 부분은 더 적은 수의 파일로 적재하는 것이 필요했다.

Optimization

Dynamic Repartition

동적으로 파티션 수를 계산하는 부분을 고도화해보자. 단순히 전체 레코드 수로 파티션 수를 계산하는 것이 아니라 각 id가 가진 레코드 수로 파티션 수를 계산해보자. 파티션 컬럼(여기서는 id컬럼)마다 가진 레코드 수를 계산하고 이를 바탕으로 repartition을 하는 방법을 참고해보자. 해당 로직을 보면 레코드 수를 계산하기 위해 count()를 사용하고 그리고 이 결과를 join하여 임의의 seed 값을 만들고 repartition을 진행한다. 그러나 이 로직을 적용한 결과, 레코드 수를 계산하는 부분에서 작업이 오래 걸렸다. 결과적으로 스트리밍 처리 지연이 발생하여 이 로직을 적용할 수 없었다. 이 groupBy, count 또는 row_number를 계산하는 과정에서 특정 id기준으로 데이터를 모으는 추가 작업(셔플)이 생겨 지연이 발생하는 것으로 추정하고 있다. 그러면 추가적인 집계 연산 없이 진행할수는 없을까.

Repartition with Fixed Ratio

추가적인 집계 연산없이 미리 집계 연산을 한 데이터를 활용해서 repartition을 해보자. 저장한 데이터를 기반으로 데이터 분포를 집계해보면 아래와 같다.

+-----+-----------+----------------+-------------------+
| id  | count     | total_count    | percentage        |
+-----+-----------+----------------+-------------------+
| 100 | 280000000 | 1000000000     | 28.00             |
| 101 | 270000000 | 1000000000     | 27.00             |
| 102 | 278040000 | 1000000000     | 27.80             |
| 103 | 50000000  | 1000000000     | 5.00              |
| 104 | 40000000  | 1000000000     | 4.00              |
| 105 | 30000000  | 1000000000     | 3.00              |
| 106 | 20000000  | 1000000000     | 2.00              |
| 107 | 15000000  | 1000000000     | 1.50              |
| 108 | 10000000  | 1000000000     | 1.00              |
| 109 | 5000000   | 1000000000     | 0.50              |
| 200 | 1000000   | 1000000000     | 0.10              |
| 201 | 600000    | 1000000000     | 0.06              |
| 202 | 80000     | 1000000000     | 0.008             |
| 203 | 70000     | 1000000000     | 0.007             |
| 204 | 60000     | 1000000000     | 0.006             |
| 205 | 50000     | 1000000000     | 0.005             |
| 206 | 40000     | 1000000000     | 0.004             |
| 207 | 30000     | 1000000000     | 0.003             |
| 208 | 20000     | 1000000000     | 0.002             |
| 209 | 10000     | 1000000000     | 0.001             |
+-----+-----------+----------------+-------------------+

전체 파티션 수는 전체 레코드 수에 따라 결정하고, 전체 파티션 수를 각 id값 마다의 percentage 값을 활용하여 파티션 수를 계산하는 방법이다. 비중이 작은 id값들은 파티션 수를 계산하는 것이 의미가 없다고 판단하여 별도의 2개의 파티션으로 처리하도록 했다. 앞 예제처럼 전체 파티션 수가 100이라고 하면 id가 100인 데이터는 28개의 파티션 수로 데이터를 처리한다.

Optimization Impact

Files & Blocks Reduction

최적화 이후, 1일 동안의 파일과 블락 수가 많이 감소했다. 아래는 실제 앱 최적화 작업의 결과다.

|                              | 최적화 작업 전 | 최적화 작업 이후 |   변화량   |
|:-----------------------------|--------------:|-----------------:|-----------:|
| 1일 동안의 HDFS 파일 개수    |     1,929,938 |           76,927 | -1,853,011 |
| 1일 동안의 HDFS 블락 개수    |     1,929,938 |           93,814 | -1,836,124 |

아래 차트 이미지는 애플리케이션이 HDFS에 저장하는 블락 수에 대한 지표다. 블락 수가 1천 3백만개 정도 유지하고 있었지만, 개선 이후 1백만개 정도로 줄었든 것을 볼 수 있다.

Usage of Memory in HDFS NN Reduction

파일과 블락 수가 감소함에 따른 HDFS 네임노드의 메모리 사용량도 감소하는 것을 볼 수 있었다. 아래는 최적화 이전에 HDFS에서 약 6일 동안의 데이터를 저장하고 있을 때, 어느정도의 메모리를 사용하는지 계산한 결과다.

6일 동안의 메모리 사용량
1929938 * 6 = 11579628 (files)
1929938 * 6 = 11579628 (blocks)
11579628 * 2 = 23159256
23159256 * 200(bytes) = 4631851200 (약 4.31GiB)  

아래는 최적화 이후, 6일 동안의 데이터를 저장하고 있을 때, 어느정도의 메모리를 사용하는지 계산한 결과다.

76927 * 6 = 461562 (files)  
93814 * 6 = 562884 (blocks)  
461562 + 562884 = 1024446  
1024446 * 200(byte) = 204889200 (약 195.40MiB)

최적화 이후, 약 4.31GiB에서 195.4MiB로 메모리 사용량이 줄어들었다.

HDFS File Ops. 감소

HDFS 네임노드의 메모리 사용량 감소와 더불어 HDFS file operation의 수도 많이 줄어들었다. 1분마다 2,000개의 파일을 생성했다면, 최적화 이후에는 100개의 파일이 생성한다. HDFS 네임노드의 부하가 줄어들면서 최적화를 진행했던 이 스파크 스트리밍 앱뿐만 아니라 HDFS를 사용하는 다른 스트리밍 앱들에서 더 이상 지연이 발생하지 않았다. 그리고 이 데이터를 사용하는 하루 단위의 데이터를 읽는 배치 작업에서도 200만개의 파일이 아닌 18만개의 파일을 읽게 되면서 처리속도가 빨라졌다. 위의 차트 이미지는 1분 단위로 HDFS file creation operation을 나타낸다. Current와 1D Ago, 7D Ago 라인을 비교해보면 기존과 달리 많은 수의 요청이 줄어듬을 확인할 수 있다.

Limitation

Fixed Data Volume Ratio

이벤트의 규모에 따른 파일의 수(파티션의 수)를 제어하기 위해 이미 저장되어 있는 데이터의 비율을 사용했다. 현재 이 스트리밍 앱은 데이터를 처리할 때, 고정적인 데이터 비율에 따라 동작한다. 이 상황에서 어떤 id가 사전에 집계한 데이터의 비율이 아닌 값으로 갑자기 급증하거나 급감하면 이에 대응할 수 없다.

  • 급증하면 급증한 만큼 파일(파티션)의 수를 증가시켜줘야한다.
  • 급감하면 급감한 만큼 파일(파티션)의 수를 감소시키거나 고정된 2개의 파티션으로 처리하도록 만들어야 한다. 다시 말하면, id마다의 데이터 비율이 달라지는 것에 대해 앱이 동적으로 대응하지 못하기 때문에 풀어나가야 할 문제다.