데이터로그😎

[Spark] Spark SQL 본문

Data Engineering

[Spark] Spark SQL

지연v'_'v 2023. 9. 14. 23:42

데이터 불러오기 

from pyspark.sql import SparkSession

# spark session 생성
spark = SparkSession.builder.master('local').appName('spark-sql').getOrCreate()

movies = [
    (1, "어벤져스", "마블", 2012, 4, 26),
    (2, "슈퍼맨", "DC", 2013, 6, 13),
    (3, "배트맨", "DC", 2008, 8, 6),
    (4, "겨울왕국", "디즈니", 2014, 1, 16),
    (5, "아이언맨", "마블", 2008, 4, 30)
] # RDD

movie_schema = ["id", "name", "company", "year", "month", "day"]


attendances = [
    (1, 13934592., "KR"),
    (2, 2182227.,"KR"),
    (3, 4226242., "KR"),
    (4, 10303058., "KR"),
    (5, 4300365., "KR")
]

 

데이터 프레임 만들기 

- createDataFrame

- inferShema 옵션을 활용하면 스파크가 데이터 타입 자동 결정

# 데이터 프레임 생성
df = spark.createDataFrame(data=movies, schema = movie_schema)

# 스키마 확인
df.dtypes

# df = RDD

# 전체 데이터 프레임의 내용을 확인
df.show()

 

Spark에서 SQL 사용하기

방법1. createOrReplaceTempView

 

  • 데이터프레임을 만든 후 temporary view에 등록해야 spark SQL을 사용할 수 있다.
  • createOrReplaceTempView() : 임시 테이블 생성/ 만들어놓은 df를 temporary view로 등록 -> temporary view에서 sql 쿼리 실행

 

df.createOrReplaceTempView('movies') # movies라는 임시 테이블 생성

df.printSchema()
######################################
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- company: string (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)
######################################
# movies 테이블에서 영화 이름(name)만 가져오기
query = '''
        SELECT name 
        FROM movies
'''

# query 실행
spark.sql(query).show()

+--------+
|    name|
+--------+
|어벤져스|
|  슈퍼맨|
|  배트맨|
|겨울왕국|
|아이언맨|
+--------+


# 2010년 이후에 개봉한 영화 조회
query = '''
SELECT * 
FROM movies
WHERE year >= 2010'''

spark.sql(query).show()

# 제목이 ~~ 맨으로 끝나는 영화 정보 조회
query = '''
SELECT * 
FROM movies
WHERE name LIKE "%맨"'''

spark.sql(query).show()


# 회사 이름이 '마'로 시작하거나, '나'로 끝나는 영화 중 2010년 이후로 개봉한 영화
query = """
SELECT *
FROM movies
WHERE (company LIKE '마%' OR company LIKE '%니')
AND year >= 2010
"""

spark.sql(query).show()


# 개봉년도 오름차순 정렬
query ="""
SELECT *
FROM movies
ORDER BY year ASC
"""

spark.sql(query).show()


# 회사 별 몇 개의 영화가 있는지 조회. 별칭 alias 적용
query ="""
SELECT company, count(*) as movie_cnt
FROM movies
GROUP BY company
"""

spark.sql(query).show()

 

방법2. 데이터 프레임 API 사용하기

데이터프레임 API를 사용할 때는 그냥 sql쓰듯이 select, groupBy 이런거 쓰면 됨

생성(Create)변환(Transform)액션(Action)

create - createDataFrame: 데이터를 사용하여 DataFrame을 생성합니다.
- read.csv, read.json, read.parquet 등: 다양한 데이터 소스에서 DataFrame을 생성합니다.
- range: 범위 내의 정수로 구성된 DataFrame을 생성합니다.
transform - select, selectExpr: 열 선택 및 표현식을 사용하여 열 변환을 수행합니다.
- filter, where: 특정 조건을 기반으로 행을 필터링합니다.
- groupBy, agg: 그룹화 및 집계 작업을 수행합니다.
- join, union, distinct: DataFrame 간의 조인, 합집합, 중복 제거 작업을 수행합니다.
- withColumn, drop: 열 추가 및 제거 작업을 수행합니다.
action - show, take, head: DataFrame의 일부 또는 전체 내용을 출력합니다.
- count, sum, avg: DataFrame의 행 수 또는 열의 합, 평균 등을 계산합니다.
- collect: DataFrame의 모든 데이터를 로컬 컬렉션으로 수집합니다. write: DataFrame을 다양한 형식으로 저장합니다.

 

# collect를 사용하면 RDD처럼 등장
df.select('*').collect()
df.select('name','company').collect()
df.select(df.name, (df.year-2000).alias("year")).show()

# agg: Aggregation의 약자. 그룹핑 후 데이터를 하나로 합쳐주는 역할
df.agg({'id':'count'}).collect()

# query에서 사용가능한 함수들을 모아놓은 패키지가 존재
from pyspark.sql import functions as F

df.agg(F.min(df.year))

# groupBy
df.groupBy().avg().collect() # 컬럼명이 부여되지 않으면각 컬럼별로 집계를 수행한다.

# 회사별, 월별 영화 개수 정보
df.groupBy([df.company, df.month]).count().collect()

# 회사 별 개봉 월의 평균
df.groupBy('company').agg({'month':'mean'}).collect()


# join
df.join(att_df, 'id').select(df.name, att_df.att).show()

# select, where, orderBy 사용
df.select('name','company','year').where('company=="마블"').orderBy('id').show()

 

 

Pandas vs Spark

data import

  • pandas
import pandas as pd

**titanic_pdf** = pd.read_csv(filepath, header = 'infer')
titanic_pdf.head()
  • spark
from pyspark.sql import SparkSession 
spark = SparkSession.builder.master('local').appName('titanic').getOrCreate() 
filepath = '/home/ubuntu/working/spark-examples/data/titanic_train.csv' 
titanic_sdf = spark.read.csv(filepath, inferSchema=True, header=True)

 

spark dataframe → pandas dataframe으로 바꾸기

titanic_pdf = titanic_sdf.select('*').toPandas()

 

pandas의 info() vs spark의 printSchema()Pandas.info() Spark.printSchema()

Pandas.info() Spark.printSchema()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 891 entries, 0 to 890
Data columns (total 12 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 PassengerId 891 non-null int64
1 Survived 891 non-null int64
2 Pclass 891 non-null int64
3 Name 891 non-null object
4 Sex 891 non-null object 5 Age 714 non-null float.........
dtypes: float64(2), int64(5), object(5) memory usage: 83.7+ KB
root
|-- PassengerId: integer (nullable = true)
|-- Survived: integer (nullable = true)
|-- Pclass: integer (nullable = true)
|-- Name: string (nullable = true)
|-- Sex: string (nullable = true)
|-- Age: double (nullable = true)
|-- SibSp: integer (nullable = true)
|-- Parch: integer (nullable = true)
|-- Ticket: string (nullable = true)
|-- Fare: double (nullable = true)

|-- Cabin: string (nullable = true) |-- Embarked: string (nullable = true)
컬럼명, data type, not null 건수 컬럼명, data type
*not null 건수를 위해서는 별도의 SQL성 쿼리 작성 필요./ isnull() 이런거 없음
  • spark에서 null값 확인하기?
from pyspark.sql.functions import count, isnan, when, col

titanic_sdf.select([count(when(col(c).isNull()|isnan(c),c)).alias(c) for c in titanic_sdf.columns])

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

 

pandas의 describe() vs spark의 describe()

# pandas
titanic_pdf.describe()

# spark
titanic_sdf.describe().show()

spark dataframe은 pandas dataframe과 비슷하게 describe()를 통해 모든 컬럼값들의 건수/평균/표준편차/최소값/최대값 등의 정보를 확인할 수 있음. 다만 percentile값(IQR)을 만들지는 않음.

*문자열 형태 데이터도 숫자형으로 만들어 집계

 

  • 숫자형태 데이터의 describe()만 보기?
num_col = [col_name for col_name, dtype in titanic_sdf.dtypes if dtype != 'string' ]
titanic_sdf.select(num_col).describe().show()

shape

titanic_pdf.shape => (891, 12)

titanic_sdf.count() => 891 (행의 개수, 데이터 수)
len(titanic_sdf.columns) => 12 (열의 수)

(titanic_sdf.count(), len(titanic_sdf.columns)) => (891,12)

 

Spark의 select()

  • select() 메소드는 SQL의 SELECT 절과 유사하게 한개 이상의 컬럼들의 값을 DataFrame형태로 반환
  • 한개의 컬럼명, 또는 여러개의 컬럼명을 인자로 입력할 수 있음.
  • 개별 컬럼명을 문자열 형태로 또는 DataFrame의 컬럼 속성으로 지정.
  • DataFrame의 컬럼 속성으로 지정시에는 DataFrame, 컬럼명, DataFrame[컬럼명], col(컬럼명)으로 지정 가능.
# column 내용 조회

# pandas
titanic_pdf['Name']
titanic_pdf[['Name','Fare']]

# spark
titanic_sdf.select('Name').show() 
titanic_sdf.select('Name','Fare').show()

from pyspark.sql.functions import col
titanic_sdf.select(col('Name'),col('Fare')).show()


# Name만 빼고 가져오고 싶을 때

select_columns = titanic_sdf.columns
select_columns.remove('Name')

 

  • list 사용하거나 list를 그대로 사용해서 데이터를 조회
    • list : list 내의 모든 원소를 unpack 한다. 스파크 데이터프레임에서는 리스트가 아닌, 컬럼의 목록이 그대로 들어가는 것이 원칙!
    • select ('A','B','C') 처럼 선택하는 것이 원칙! 그런데 버전 업이 되면서 select(['A','B','C']) 처럼 리스트가 그대로 들어갈 수 있게 됨.
titanic_sdf.select(*select_columns).show()

titanic_sdf.select(titanic_sdf.Fare, titanic_sdf['Fare']*100).show()
**from pyspark.sql.functions import upper, lower,col**

# 1. 브라켓을 활용해서 함수 연산 적용
titanic_sdf.select('Name',upper(titanic_sdf.Name)).show()

# 2. .을 활용해서 함수 연산 적용
titanic_sdf.select('Name', upper(titanic_sdf['Name'])).show()

# 3. col을 활용해서 함수 연산 적용
titanic_sdf.select('Name', upper(col('Name')).alias('Capped Name')).show()

 

 

Spark의 filter() 메소드

  • filter()는 SQL의 where와 유사하게 DataFrame 내의 특정 조건을 만족하는 레코드를 DataFrame으로 반환.
  • filter()내의 조건 컬럼은 컬럼 속성으로 지정 가능. 조건문 자체는 SQL과 유사한 문자열로 지정할 수 있음(조건 컬럼은 문자열 지정이 안됨.)
  • filter() = where()
  • where() 메소드는 filter()의 alias이며 SQL where와 직관적인 동일성을 갖추기 위해 생성.
  • 복합 조건 and는 & 로, or를 |로 사용. 개별조건은 ()로 감싸야 함.
titanic_sdf.filter(col('Embarked') == 'Q').show()
titanic_sdf.where(col('Embarked') == 'Q').show()
titanic_sdf.filter("Embarked == 'Q'").show()

titanic_sdf.filter((col('Embarked')=='S')&(col('Pclass')==1)).show()
titanic_sdf.filter((col('Embarked')=='S')|(col('Pclass')==1)).show()

# 이름에 Miss가 들어가는 사람의 정보 가져오기
titanic_sdf.filter( col('Name').like('%Miss%')).show()
# 이름이 H로 시작하는 사람
titanic_sdf.filter( upper(col('Name')).like('H%')).show()

titanic_sdf.filter("Name like '%Mr%'").show()
titanic_sdf.filter("lower(Name) like '%mr%'").show() # 별도의 pyspark.sql.functions 호출이 없이 사용이 가능.
titanic_sdf.filter(upper(col('Name')).like('%MISS%')).select('Pclass','Embarked','Name').show()

 

Spark의 orderBy()

  • 한 개 이상의 컬럼을 기준으로 정렬하는 기능
  • Transformation 작업
  • pandas
# pandas 정렬

# name 컬럼 기준으로 오름차순 정렬
titanic_pdf_sorted_1 = titanic_pdf.**sort_values**(by='Name',ascending=True)

# name 컬럼 오름차순, age 내림차순
titanic_pdf_sorted_2 = titanic_pdf.sort_values(by=['Name','Age'], ascending=[True,False])

  • spark
from pyspark.sql.functions import col

# orderBy
titanic_sdf.**orderBy**(col('Pclass'), ascending=False).show()

titanic_sdf.orderBy([col('Pclass'),col('Age')],ascending=False).show(5)
titanic_sdf.orderBy([col('Pclass'), col('Age')],ascending=[True,False]).show(5)

# sort
titanic_sdf.sort(col('Pclass').asc(), col('Age').desc()).show()

 

Agg , groupBy()

  • Spark DataFrame은 DataFrame 객체에서 aggregation 메소드가 별로 없다.
  • Aggregation 메소드 적용 시에는 pyspark.sql.functions 에 있는 min, max, sum 등의 함수를 사용하자
  • agg는 조금 더 간단한 연산을 할 때 ( count, minmax, )
  • groupby 는 조금 더 고차원적인 집계

 

  • Agg
# pandas 의 aggregation
titanic_pdf.count()

titanic_pdf[['Pclass','Age']].max()
# spark의 agg
titanic_sdf.count() # spark의 count() = data 행의 개수

**from pyspark.sql.functions import max, min, sum**

titanic_sdf_max = titanic_sdf.select(max('Age'))
titanic_sdf_max.show()

+--------+
|max(Age)|
+--------+
|    80.0|
+--------+
  • groupBy()
# pandas의 groupBy
titanic_pdf_groupby = titanic_pdf.groupby(by='Pclass').count()

# pandas의 groupby 후 agg
agg_format = {'Age':'max',
				'SibSp':'sum',
				'Fare':'mean'}

titanic_pdf_groupby.agg(agg_format)
--------------------------------------------
Age      355.0
SibSp    891.0
Fare     297.0
dtype: float64
# pandas 의 value_counts()
titanic_pdf['Pclass'].value_counts()

# Spark Dataframe의 groupBy = pandas의 value_counts와 동일
titanic_sdf.groupBy('Pclass').count().show() # select pclass, count(*) from titanic_sdf group by pclass

titanic_sdf.groupBy('Pclass').count().orderBy('count',ascending=False).show()

titanic_sdf.groupBy('Pclass').max().show()
titanic_sdf.groupBy('Pclass').max('Age').show()
titanic_sdf.groupBy(['Pclass','Embarked']).max('Age').show()



********집계함수 안에 col쓰면 오류남.**************
titanic_sdf.groupby('Pclass').max(col('Age')).show()
from pyspark.sql.functions import avg,sum,max,min

# select max(age), min(age), sum(age), avg(age) from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').agg(
    max('Age').alias('age_max'),
    min('Age'),
    sum('Age'),
    avg('Age')
).show()


titanic_sdf.groupBy(['Pclass']).agg(
                max('Age').alias('max_age'),
                min('Age').alias('min_age'),
                sum('Age').alias('sum_age'),
                avg('Age').alias('avg_age')).filter(col('max_age')>70).show()

 

 

 

컬럼 삽입

  • withColumn(): 기존 컬럼 값을 수정, 타입 변경, 신규 컬럼 추가
    • withColumn('신규 또는 업데이트 되는 **컬럼명**','신규 또는 업데이트되는 값')
    • 주의사항 : 신규 또는 업데이트 되는 값을 생성 시에 기존 컬럼 값을 기반으로 한다면, 신규 컬럼은 문자열, 기존 컬럼은 반드시 컬럼객체(col('컬럼명'))을 사용
    • 신규 컬럼을 추가하는 것은 select 로도 가능하다
  •  withColumnRename() : 컬럼명 변경은 

pandas

# pandas
titanic_pdf_copy = titanic_pdf.copy()
titanic_pdf_copy['Extra_Fare'] = titanic_pdf_copy['Fare']*10
titanic_pdf_copy['Extra_Fare'] = 10

 

spark: withColumn()

from pyspark.sql.functions import col

titanic_sdf_copy = titanic_sdf.select('*')
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', col('Fare')*10)
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare',10) 
# --> 이렇겐 안됨!!!!!
# --> 업데이트 할 값은 반드시 컬럼형이 되어야 한다. (ex. col('Fare')*10)
                          
# 상숫값으로 업데이트 하려면 반드시 lit 함수를 적용할 것
from pyspark.sql.functions import lit
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', lit(10))

# Name 컬럼을 ','로 분리하여 Name1, Name2 생성
titanic_sdf_copy = titanic_sdf_copy.withColumn('first_name',split(col('Name'),',').getItem(0))
titanic_sdf_copy = titanic_sdf_copy.withColumn('last_name',split(col('Name'),',').getItem(1))

spark: select()

# select() 메소드를 이용해서 컬럼을 추가. 
# 이 때 SQL의 substring 메소드 이용해 문자열의 일부를 추출하여 신규 컬럼생성.
from pyspark.sql.functions import substring

titanic_sdf_copy = titanic_sdf_copy.select('*',col('Embarked').alias('E'))
titanic_sdf_copy = titanic_sdf_copy.select('*',substring('Cabin',0,1).alias('Cabin_section'))

컬럼 데이터 수정

# pandas
titanic_pdf_copy['Fare'] = titanic_pdf_copy['Fare']+20
titanic_pdf_copy['Fare'] = titanic_pdf_copy['Fare'].astype(np.int64)

# spark
titanic_sdf_copy = titanic_sdf_copy.withColumn('Fare',col('Fare')+20)
titanic_sdf_copy = titanic_sdf_copy.withColumn('Fare', col('Fare').cast('integer'))

컬럼명 변경

titanic_sdf_copy = titanic_sdf_copy.withColumnRenamed('탑승지','탑탑승지')
titanic_sdf_copy.show(5)

titanic_sdf_copy = titanic_sdf_copy.withColumnRenamed('없는컬럼 넣어도','오류 안나는게 문제')
titanic_sdf_copy.show(5)