데이터로그😎

[Spark] Spark RDD 본문

Data Engineering

[Spark] Spark RDD

지연v'_'v 2023. 9. 14. 22:40

1. Resilient Distributed Dataset(RDD)

스파크에서 사용하는 데이터의 최소단위.

판다스에 넘파이가 있다면 spark sql, df에는 rdd가 있다!

# 특징

  • RDD = 스파크의 핵심 데이터 모델
  • cluster에 있는 worker 의 메모리 안쪽에 RDD가 위치함.
  • worker들의 메모리에 RDD가 쪼개져서 위치함. 각 RDD들은 서로 연관이 있음.
  • 변경이 불가. Inplace 불가. 작업을 할 때마다 새로운 RDD를 계속 만들게 됨.
  • 데이터 추상화
    • 여러군데에 쪼개져있는 파일을 마치 하나의 파일처럼 사용할 수 있도록 추상화 함.
    • 파일을 하나하나 신경써서 일일이 불러오는게 아니고 tool이 알아서 파악해서 불러옴.
    • 100PB파일을 여러개의 HDFS에 분산되어 저장
    • HDFS에 분산 저장되어 있는 파일은 아래의 명령어로 마치 하나의 파일인 것처럼 불러올 수 있다.
      • sc.textFile(filename)
      • sc.paralleize()
  • 데이터 불변(Immutable) → 장애 복구 가능~
    • RDD는 변환(Transform)을 거치면 기존 RDD가 변경되는 것이 아닌 새로운 RDD가 만들어지게 됨 (immutable)
    • RDD ⇒ 데이터 불변성(immutable)
    • RDD는 변환되는 과정 기록?? = DAG
    • RDD 변환과정은 하나의 비순환 그래프. 만약 RDD3 수행 때 컴에 장애 발생하면 RDD2번으로 다시 돌아갈 수 있다.
      • RDD1 ----(변환)-----> RDD2 ----(변환)-----> RDD3
  • Type Safe 특징
    • 컴파일 중 데이터 타입 판별 가능
    • 스파크 언어 ⇒ JAVA, SCALA, PYTHON
  • 다양한 데이터 구조 지원
    • Unstructured(텍스트), Structured Data(RDBMS,테이블..) 모두 담을 수 있음.
  • 게으른 연산(Lazy Evaluation)
    • 데이터 연산과정 = Transformation + Action
    • 하나의 데이터(대용량)에 대해 많은 연산을 거친다.
    • 그러나 계산 결과가 필요할 때까지 연산을 하지 않고 기다린다.
      • lazy: 연산 최대한 미뤄뒀다가 함.
      • T1, T2 = Transformation 작업등록/ A = Action 한꺼번에 실행게으른 연산(Lazy Evaluation) ↔ Eager Execution(pandas)

2. 데이터 연산방식: 병렬, 분산

https://magpienote.tistory.com/190

 

[Spark]RDD 이해하기

데이터 연산 방식 이해하기 RDD를 들어가기 전에 먼저 데이터 연산 방식을 이해 해야한다. 이를 이해하면 RDD가 왜 나왔는지 Spark에서는 RDD가 왜 사용돼는지 간접적으로 이해 할 수 있다. 데이터

magpienote.tistory.com

 

2-1. 데이터 연산방식:  병렬 vs 분산병렬

  Data-parallel Distributed Data-parallel
작동 방식
  1. 데이터를 여러개로 쪼갬.노드 1개.
  2. 여러 쓰레드에서 각자 task를 수행
  3. 여러 작업물들 합침
스칼라, 파이썬 ⇒ 함수형 프로그래밍이 가능.
worker들이 있는 RDD에서 작업이 일어나고, <task>가 worker들에게 넘어감. 모든 task(작업)은 worker들이 있는 RDD에서 일어남.
  1. 데이터를 여러개로 쪼개서 여러 노드로 보낸다. -> 하나의 컴퓨터가 하나의 조각을 처리
  2. 여러 노드에서 각자 독립적으로 task수행
    1. 프로그램이 worker에게 전달
  3. 각 노드의 task 결과물 합친다.

 

2-2. RDD가 나온 이유 간접적으로 이해하기

  • 데이터 연산 방식에서 병렬처리 보단 분산처리 방식이 빅데이터 처리에서는 빠르다.
  • 분산처리가 속도가 빠른 이유는 노드간 작업을 나누기 때문인데, Spark는 이를 쉽게 구현할 수 있도록 도와주는 툴이다. Spark를 이용한 분산된 환경에서도 일반적인 병렬처리를 하듯 코드를 짜서 분산 처리 환경이 가능하기 때문이다.
  • Spark에서는 분산된 환경에서 데이터 병렬 모델을 구현해 추상화 시켜주기 때문에 속도가 빠르다고 했는데 그것이 바로 Resilient Distributed Datasets(RDD)때문이다.
  • RDD.map(<task>) ⇒ RDD가 <task>를 분산된 환경에서도 병렬처리 할 수 있도록 알아서 추상화해 노드로 작업을 분배하고 합친다.

 

2-3 분산처리에서 신경써야 할 문제

  • 부분실패 & 속도!!!
  • 네트워크 통신 사용 → 속도 저하(네트워크 통신 줄이면서 최적화해야)
SQL Spark
SELECT reduceByKey
(여러 노드 데이터 불러와서 하나로 합치기)
FROM RDD
WHERE filter(조건에 맞는 데이터 찾기)

 

2-4. Key-Value RDD

  • (Key, Value) 쌍을 갖음 = Pairs RDD
  • Key 기준으로 고차원 (집계)연산 가능 = GROUP BY
    • 집계 = 목적에 맞게 데이터 grouping
  • 예: (’Kuwait’,900)
pairs = rdd.map(lambda x : (x,1))

# x = Data 하나
# x를 (x,1)의 튜플형식으로 변형

# RDD     |  Pairs RDD
# --------------------
# 짜장면  | (짜장면,1), 
# 짬뽕	  |	(짬뽕, 1)

 

Key-Value RDD의 Reduction 연산

  • Key를 기준으로 데이터 줄여주는
pairs = rdd.map(lambda x : (x,1))
count = pairs.reduceByKey(lambda a, b : a + b)

# lambda의 a = 누적값, b = 새로운 값
# 누적값 = 지금까지 연산이 되어온 값
# ex) 1,2,3...-> 1단계 => 1+2 = 3, 2단계 => 3(누적값)+3(새로운값) = 6

3. RDD Transformation & Action

3-1. Transformations

spark는 java기반이기때문에 네이밍도 java 따라감.

 

함수

map() Data 변환 후 반환 하나하나의 데이터의 배열이 나옴 Narrow
flatMap() Data 변환 후 반환 평탄화된 배열이 나옴 Narrow
filter() Data 거르기   Narrow
distinct() 중복값 제거   Wide
reduceByKey() Key별 집계   Wide
groupByKey() Key별로 grouping   Wide
mapValues() value만…    
flatMapValues() value에 대해서만 평탄화    
sortByKey() Key기준으로 정렬   Wide

Transformation = Narrow Transformations + Wide Transformations

최적화 목적 = shuffling이 많이 일어나지 않도록

Narrow Transformation Wide Transformation

 

  Narrow Trransformation Wide Transformation
특징 • 1:1 변환. 하나의 데이터만 바뀜
• 내가 지금 속해있는 파티션에서만 변환. 다른 파티션의 데이터는 참고하지 않음
• Shuffling
• 다른 파티션의 데이터 들어갈 수 있음.
• Sort도 wide이다.(여러 파티션 넘나듦)
• 각 파티션 별로 sort하면 shuffling 많이 발생할 수도.
• 그러니까 다른 파티션도 참고해서 해야 함.
함수 filter(), map(), flatMap(), sample(), union() Intersection and join, distinct, cartesian, reduceByKey, groupByKey

 

3-2. Action

왜 T-A 단계로 나눴을까?

  1. Lazy Execution
    • 연산이 지연되므로 디스크, 네트워크의 연산을 최소화 함.
  2. 반복적인 데이터 작업에 의한 비효율그렇다면…디스크에 저장하지 않고 바로 다음 task에 전달한다면? task끼리 메모리 내에서 데이터를 교환한다면? 훨씬 빠르지않을까? like Machine Learning
  3. —> cash(), persist() 통해 메모리에 저장 가능
  4. 작업 끝나면 디스크에 저장 → 불러옴 → 또 저장 → 불러옴 ——> 디스크를 거치면 속도 저하

hdfs → hdd유리, spark같이 쓰면 ssd 유리

storage level

MEMORY_ONLY_SER (serialize=직렬화) , 용량 down

그냥 cache쓰면 웬만한 성능 보장.

cache 남발하면 memory 부족 현상 발생.

'Data Engineering' 카테고리의 다른 글

[Spark] Spark Machine-Learning  (0) 2023.09.15
[Spark] Spark SQL  (0) 2023.09.14
분산 병렬 처리 시스템 (MPP, DFS)  (0) 2023.09.14
[airflow] airflow(ubuntu)-mysql 연결하기?  (0) 2023.09.13
[airflow] airflow 사용하기  (0) 2023.09.11