데이터로그😎
[Spark] Spark RDD 본문
1. Resilient Distributed Dataset(RDD)
스파크에서 사용하는 데이터의 최소단위.
판다스에 넘파이가 있다면 spark sql, df에는 rdd가 있다!
# 특징
- RDD = 스파크의 핵심 데이터 모델
- cluster에 있는 worker 의 메모리 안쪽에 RDD가 위치함.
- worker들의 메모리에 RDD가 쪼개져서 위치함. 각 RDD들은 서로 연관이 있음.
- 변경이 불가. Inplace 불가. 작업을 할 때마다 새로운 RDD를 계속 만들게 됨.
- 데이터 추상화
- 여러군데에 쪼개져있는 파일을 마치 하나의 파일처럼 사용할 수 있도록 추상화 함.
- 파일을 하나하나 신경써서 일일이 불러오는게 아니고 tool이 알아서 파악해서 불러옴.
- 100PB파일을 여러개의 HDFS에 분산되어 저장
- HDFS에 분산 저장되어 있는 파일은 아래의 명령어로 마치 하나의 파일인 것처럼 불러올 수 있다.
- sc.textFile(filename)
- sc.paralleize()
- 데이터 불변(Immutable) → 장애 복구 가능~
- RDD는 변환(Transform)을 거치면 기존 RDD가 변경되는 것이 아닌 새로운 RDD가 만들어지게 됨 (immutable)
- RDD ⇒ 데이터 불변성(immutable)
- RDD는 변환되는 과정 기록?? = DAG
- RDD 변환과정은 하나의 비순환 그래프. 만약 RDD3 수행 때 컴에 장애 발생하면 RDD2번으로 다시 돌아갈 수 있다.
- RDD1 ----(변환)-----> RDD2 ----(변환)-----> RDD3
- Type Safe 특징
- 컴파일 중 데이터 타입 판별 가능
- 스파크 언어 ⇒ JAVA, SCALA, PYTHON
- 다양한 데이터 구조 지원
- Unstructured(텍스트), Structured Data(RDBMS,테이블..) 모두 담을 수 있음.
- 게으른 연산(Lazy Evaluation)
- 데이터 연산과정 = Transformation + Action
- 하나의 데이터(대용량)에 대해 많은 연산을 거친다.
- 그러나 계산 결과가 필요할 때까지 연산을 하지 않고 기다린다.
- lazy: 연산 최대한 미뤄뒀다가 함.
- T1, T2 = Transformation 작업등록/ A = Action 한꺼번에 실행게으른 연산(Lazy Evaluation) ↔ Eager Execution(pandas)
2. 데이터 연산방식: 병렬, 분산
https://magpienote.tistory.com/190
2-1. 데이터 연산방식: 병렬 vs 분산병렬
Data-parallel | Distributed Data-parallel | |
작동 방식 |
worker들이 있는 RDD에서 작업이 일어나고, <task>가 worker들에게 넘어감. 모든 task(작업)은 worker들이 있는 RDD에서 일어남. |
|
2-2. RDD가 나온 이유 간접적으로 이해하기
- 데이터 연산 방식에서 병렬처리 보단 분산처리 방식이 빅데이터 처리에서는 빠르다.
- 분산처리가 속도가 빠른 이유는 노드간 작업을 나누기 때문인데, Spark는 이를 쉽게 구현할 수 있도록 도와주는 툴이다. Spark를 이용한 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜서 분산 처리 환경이 가능하기 때문이다.
- Spark에서는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문에 속도가 빠르다고 했는데 그것이 바로 Resilient Distributed Datasets(RDD)때문이다.
- RDD.map(<task>) ⇒ RDD가 <task>를 분산된 환경에서도 병렬처리 할 수 있도록 알아서 추상화해 노드로 작업을 분배하고 합친다.
2-3 분산처리에서 신경써야 할 문제
- 부분실패 & 속도!!!
- 네트워크 통신 사용 → 속도 저하(네트워크 통신 줄이면서 최적화해야)
SQL | Spark |
SELECT | reduceByKey (여러 노드 데이터 불러와서 하나로 합치기) |
FROM | RDD |
WHERE | filter(조건에 맞는 데이터 찾기) |
2-4. Key-Value RDD
- (Key, Value) 쌍을 갖음 = Pairs RDD
- Key 기준으로 고차원 (집계)연산 가능 = GROUP BY
- 집계 = 목적에 맞게 데이터 grouping
- 예: (’Kuwait’,900)
pairs = rdd.map(lambda x : (x,1))
# x = Data 하나
# x를 (x,1)의 튜플형식으로 변형
# RDD | Pairs RDD
# --------------------
# 짜장면 | (짜장면,1),
# 짬뽕 | (짬뽕, 1)
Key-Value RDD의 Reduction 연산
- Key를 기준으로 데이터 줄여주는
pairs = rdd.map(lambda x : (x,1))
count = pairs.reduceByKey(lambda a, b : a + b)
# lambda의 a = 누적값, b = 새로운 값
# 누적값 = 지금까지 연산이 되어온 값
# ex) 1,2,3...-> 1단계 => 1+2 = 3, 2단계 => 3(누적값)+3(새로운값) = 6
3. RDD Transformation & Action
3-1. Transformations
spark는 java기반이기때문에 네이밍도 java 따라감.
함수
map() | Data 변환 후 반환 | 하나하나의 데이터의 배열이 나옴 | Narrow |
flatMap() | Data 변환 후 반환 | 평탄화된 배열이 나옴 | Narrow |
filter() | Data 거르기 | Narrow | |
distinct() | 중복값 제거 | Wide | |
reduceByKey() | Key별 집계 | Wide | |
groupByKey() | Key별로 grouping | Wide | |
mapValues() | value만… | ||
flatMapValues() | value에 대해서만 평탄화 | ||
sortByKey() | Key기준으로 정렬 | Wide |
Transformation = Narrow Transformations + Wide Transformations
최적화 목적 = shuffling이 많이 일어나지 않도록
Narrow Transformation Wide Transformation
Narrow Trransformation | Wide Transformation | |
특징 | • 1:1 변환. 하나의 데이터만 바뀜 • 내가 지금 속해있는 파티션에서만 변환. 다른 파티션의 데이터는 참고하지 않음 |
• Shuffling • 다른 파티션의 데이터 들어갈 수 있음. • Sort도 wide이다.(여러 파티션 넘나듦) • 각 파티션 별로 sort하면 shuffling 많이 발생할 수도. • 그러니까 다른 파티션도 참고해서 해야 함. |
함수 | filter(), map(), flatMap(), sample(), union() | Intersection and join, distinct, cartesian, reduceByKey, groupByKey |
3-2. Action
왜 T-A 단계로 나눴을까?
- Lazy Execution
- 연산이 지연되므로 디스크, 네트워크의 연산을 최소화 함.
- 반복적인 데이터 작업에 의한 비효율그렇다면…디스크에 저장하지 않고 바로 다음 task에 전달한다면? task끼리 메모리 내에서 데이터를 교환한다면? 훨씬 빠르지않을까? like Machine Learning
- —> cash(), persist() 통해 메모리에 저장 가능
- 작업 끝나면 디스크에 저장 → 불러옴 → 또 저장 → 불러옴 ——> 디스크를 거치면 속도 저하
hdfs → hdd유리, spark같이 쓰면 ssd 유리
storage level
MEMORY_ONLY_SER (serialize=직렬화) , 용량 down
그냥 cache쓰면 웬만한 성능 보장.
cache 남발하면 memory 부족 현상 발생.
'Data Engineering' 카테고리의 다른 글
[Spark] Spark Machine-Learning (0) | 2023.09.15 |
---|---|
[Spark] Spark SQL (0) | 2023.09.14 |
분산 병렬 처리 시스템 (MPP, DFS) (0) | 2023.09.14 |
[airflow] airflow(ubuntu)-mysql 연결하기? (0) | 2023.09.13 |
[airflow] airflow 사용하기 (0) | 2023.09.11 |