데이터로그😎
[Spark] Spark SQL 본문
데이터 불러오기
from pyspark.sql import SparkSession
# spark session 생성
spark = SparkSession.builder.master('local').appName('spark-sql').getOrCreate()
movies = [
(1, "어벤져스", "마블", 2012, 4, 26),
(2, "슈퍼맨", "DC", 2013, 6, 13),
(3, "배트맨", "DC", 2008, 8, 6),
(4, "겨울왕국", "디즈니", 2014, 1, 16),
(5, "아이언맨", "마블", 2008, 4, 30)
] # RDD
movie_schema = ["id", "name", "company", "year", "month", "day"]
attendances = [
(1, 13934592., "KR"),
(2, 2182227.,"KR"),
(3, 4226242., "KR"),
(4, 10303058., "KR"),
(5, 4300365., "KR")
]
데이터 프레임 만들기
- createDataFrame
- inferShema 옵션을 활용하면 스파크가 데이터 타입 자동 결정
# 데이터 프레임 생성
df = spark.createDataFrame(data=movies, schema = movie_schema)
# 스키마 확인
df.dtypes
# df = RDD
# 전체 데이터 프레임의 내용을 확인
df.show()
Spark에서 SQL 사용하기
방법1. createOrReplaceTempView
- 데이터프레임을 만든 후 temporary view에 등록해야 spark SQL을 사용할 수 있다.
- createOrReplaceTempView() : 임시 테이블 생성/ 만들어놓은 df를 temporary view로 등록 -> temporary view에서 sql 쿼리 실행
df.createOrReplaceTempView('movies') # movies라는 임시 테이블 생성
df.printSchema()
######################################
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- company: string (nullable = true)
|-- year: long (nullable = true)
|-- month: long (nullable = true)
|-- day: long (nullable = true)
######################################
# movies 테이블에서 영화 이름(name)만 가져오기
query = '''
SELECT name
FROM movies
'''
# query 실행
spark.sql(query).show()
+--------+
| name|
+--------+
|어벤져스|
| 슈퍼맨|
| 배트맨|
|겨울왕국|
|아이언맨|
+--------+
# 2010년 이후에 개봉한 영화 조회
query = '''
SELECT *
FROM movies
WHERE year >= 2010'''
spark.sql(query).show()
# 제목이 ~~ 맨으로 끝나는 영화 정보 조회
query = '''
SELECT *
FROM movies
WHERE name LIKE "%맨"'''
spark.sql(query).show()
# 회사 이름이 '마'로 시작하거나, '나'로 끝나는 영화 중 2010년 이후로 개봉한 영화
query = """
SELECT *
FROM movies
WHERE (company LIKE '마%' OR company LIKE '%니')
AND year >= 2010
"""
spark.sql(query).show()
# 개봉년도 오름차순 정렬
query ="""
SELECT *
FROM movies
ORDER BY year ASC
"""
spark.sql(query).show()
# 회사 별 몇 개의 영화가 있는지 조회. 별칭 alias 적용
query ="""
SELECT company, count(*) as movie_cnt
FROM movies
GROUP BY company
"""
spark.sql(query).show()
방법2. 데이터 프레임 API 사용하기
데이터프레임 API를 사용할 때는 그냥 sql쓰듯이 select, groupBy 이런거 쓰면 됨
생성(Create)변환(Transform)액션(Action)
create | - createDataFrame: 데이터를 사용하여 DataFrame을 생성합니다. - read.csv, read.json, read.parquet 등: 다양한 데이터 소스에서 DataFrame을 생성합니다. - range: 범위 내의 정수로 구성된 DataFrame을 생성합니다. |
transform | - select, selectExpr: 열 선택 및 표현식을 사용하여 열 변환을 수행합니다. - filter, where: 특정 조건을 기반으로 행을 필터링합니다. - groupBy, agg: 그룹화 및 집계 작업을 수행합니다. - join, union, distinct: DataFrame 간의 조인, 합집합, 중복 제거 작업을 수행합니다. - withColumn, drop: 열 추가 및 제거 작업을 수행합니다. |
action | - show, take, head: DataFrame의 일부 또는 전체 내용을 출력합니다. - count, sum, avg: DataFrame의 행 수 또는 열의 합, 평균 등을 계산합니다. - collect: DataFrame의 모든 데이터를 로컬 컬렉션으로 수집합니다. write: DataFrame을 다양한 형식으로 저장합니다. |
# collect를 사용하면 RDD처럼 등장
df.select('*').collect()
df.select('name','company').collect()
df.select(df.name, (df.year-2000).alias("year")).show()
# agg: Aggregation의 약자. 그룹핑 후 데이터를 하나로 합쳐주는 역할
df.agg({'id':'count'}).collect()
# query에서 사용가능한 함수들을 모아놓은 패키지가 존재
from pyspark.sql import functions as F
df.agg(F.min(df.year))
# groupBy
df.groupBy().avg().collect() # 컬럼명이 부여되지 않으면각 컬럼별로 집계를 수행한다.
# 회사별, 월별 영화 개수 정보
df.groupBy([df.company, df.month]).count().collect()
# 회사 별 개봉 월의 평균
df.groupBy('company').agg({'month':'mean'}).collect()
# join
df.join(att_df, 'id').select(df.name, att_df.att).show()
# select, where, orderBy 사용
df.select('name','company','year').where('company=="마블"').orderBy('id').show()
Pandas vs Spark
data import
- pandas
import pandas as pd
**titanic_pdf** = pd.read_csv(filepath, header = 'infer')
titanic_pdf.head()
- spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('titanic').getOrCreate()
filepath = '/home/ubuntu/working/spark-examples/data/titanic_train.csv'
titanic_sdf = spark.read.csv(filepath, inferSchema=True, header=True)
spark dataframe → pandas dataframe으로 바꾸기
titanic_pdf = titanic_sdf.select('*').toPandas()
pandas의 info() vs spark의 printSchema()Pandas.info() Spark.printSchema()
Pandas.info() | Spark.printSchema() |
<class 'pandas.core.frame.DataFrame'> RangeIndex: 891 entries, 0 to 890 Data columns (total 12 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 PassengerId 891 non-null int64 1 Survived 891 non-null int64 2 Pclass 891 non-null int64 3 Name 891 non-null object 4 Sex 891 non-null object 5 Age 714 non-null float......... dtypes: float64(2), int64(5), object(5) memory usage: 83.7+ KB |
root |-- PassengerId: integer (nullable = true) |-- Survived: integer (nullable = true) |-- Pclass: integer (nullable = true) |-- Name: string (nullable = true) |-- Sex: string (nullable = true) |-- Age: double (nullable = true) |-- SibSp: integer (nullable = true) |-- Parch: integer (nullable = true) |-- Ticket: string (nullable = true) |-- Fare: double (nullable = true) |-- Cabin: string (nullable = true) |-- Embarked: string (nullable = true) |
컬럼명, data type, not null 건수 | 컬럼명, data type *not null 건수를 위해서는 별도의 SQL성 쿼리 작성 필요./ isnull() 이런거 없음 |
- spark에서 null값 확인하기?
from pyspark.sql.functions import count, isnan, when, col
titanic_sdf.select([count(when(col(c).isNull()|isnan(c),c)).alias(c) for c in titanic_sdf.columns])
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
| 0| 0| 0| 0| 0|177| 0| 0| 0| 0| 687| 2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
pandas의 describe() vs spark의 describe()
# pandas
titanic_pdf.describe()
# spark
titanic_sdf.describe().show()
spark dataframe은 pandas dataframe과 비슷하게 describe()를 통해 모든 컬럼값들의 건수/평균/표준편차/최소값/최대값 등의 정보를 확인할 수 있음. 다만 percentile값(IQR)을 만들지는 않음.
*문자열 형태 데이터도 숫자형으로 만들어 집계
- 숫자형태 데이터의 describe()만 보기?
num_col = [col_name for col_name, dtype in titanic_sdf.dtypes if dtype != 'string' ]
titanic_sdf.select(num_col).describe().show()
shape
titanic_pdf.shape => (891, 12)
titanic_sdf.count() => 891 (행의 개수, 데이터 수)
len(titanic_sdf.columns) => 12 (열의 수)
(titanic_sdf.count(), len(titanic_sdf.columns)) => (891,12)
Spark의 select()
- select() 메소드는 SQL의 SELECT 절과 유사하게 한개 이상의 컬럼들의 값을 DataFrame형태로 반환
- 한개의 컬럼명, 또는 여러개의 컬럼명을 인자로 입력할 수 있음.
- 개별 컬럼명을 문자열 형태로 또는 DataFrame의 컬럼 속성으로 지정.
- DataFrame의 컬럼 속성으로 지정시에는 DataFrame, 컬럼명, DataFrame[컬럼명], col(컬럼명)으로 지정 가능.
# column 내용 조회
# pandas
titanic_pdf['Name']
titanic_pdf[['Name','Fare']]
# spark
titanic_sdf.select('Name').show()
titanic_sdf.select('Name','Fare').show()
from pyspark.sql.functions import col
titanic_sdf.select(col('Name'),col('Fare')).show()
# Name만 빼고 가져오고 싶을 때
select_columns = titanic_sdf.columns
select_columns.remove('Name')
- list 사용하거나 list를 그대로 사용해서 데이터를 조회
- list : list 내의 모든 원소를 unpack 한다. 스파크 데이터프레임에서는 리스트가 아닌, 컬럼의 목록이 그대로 들어가는 것이 원칙!
- select ('A','B','C') 처럼 선택하는 것이 원칙! 그런데 버전 업이 되면서 select(['A','B','C']) 처럼 리스트가 그대로 들어갈 수 있게 됨.
titanic_sdf.select(*select_columns).show()
titanic_sdf.select(titanic_sdf.Fare, titanic_sdf['Fare']*100).show()
**from pyspark.sql.functions import upper, lower,col**
# 1. 브라켓을 활용해서 함수 연산 적용
titanic_sdf.select('Name',upper(titanic_sdf.Name)).show()
# 2. .을 활용해서 함수 연산 적용
titanic_sdf.select('Name', upper(titanic_sdf['Name'])).show()
# 3. col을 활용해서 함수 연산 적용
titanic_sdf.select('Name', upper(col('Name')).alias('Capped Name')).show()
Spark의 filter() 메소드
- filter()는 SQL의 where와 유사하게 DataFrame 내의 특정 조건을 만족하는 레코드를 DataFrame으로 반환.
- filter()내의 조건 컬럼은 컬럼 속성으로 지정 가능. 조건문 자체는 SQL과 유사한 문자열로 지정할 수 있음(조건 컬럼은 문자열 지정이 안됨.)
- filter() = where()
- where() 메소드는 filter()의 alias이며 SQL where와 직관적인 동일성을 갖추기 위해 생성.
- 복합 조건 and는 & 로, or를 |로 사용. 개별조건은 ()로 감싸야 함.
titanic_sdf.filter(col('Embarked') == 'Q').show()
titanic_sdf.where(col('Embarked') == 'Q').show()
titanic_sdf.filter("Embarked == 'Q'").show()
titanic_sdf.filter((col('Embarked')=='S')&(col('Pclass')==1)).show()
titanic_sdf.filter((col('Embarked')=='S')|(col('Pclass')==1)).show()
# 이름에 Miss가 들어가는 사람의 정보 가져오기
titanic_sdf.filter( col('Name').like('%Miss%')).show()
# 이름이 H로 시작하는 사람
titanic_sdf.filter( upper(col('Name')).like('H%')).show()
titanic_sdf.filter("Name like '%Mr%'").show()
titanic_sdf.filter("lower(Name) like '%mr%'").show() # 별도의 pyspark.sql.functions 호출이 없이 사용이 가능.
titanic_sdf.filter(upper(col('Name')).like('%MISS%')).select('Pclass','Embarked','Name').show()
Spark의 orderBy()
- 한 개 이상의 컬럼을 기준으로 정렬하는 기능
- Transformation 작업
- pandas
# pandas 정렬
# name 컬럼 기준으로 오름차순 정렬
titanic_pdf_sorted_1 = titanic_pdf.**sort_values**(by='Name',ascending=True)
# name 컬럼 오름차순, age 내림차순
titanic_pdf_sorted_2 = titanic_pdf.sort_values(by=['Name','Age'], ascending=[True,False])
- spark
from pyspark.sql.functions import col
# orderBy
titanic_sdf.**orderBy**(col('Pclass'), ascending=False).show()
titanic_sdf.orderBy([col('Pclass'),col('Age')],ascending=False).show(5)
titanic_sdf.orderBy([col('Pclass'), col('Age')],ascending=[True,False]).show(5)
# sort
titanic_sdf.sort(col('Pclass').asc(), col('Age').desc()).show()
Agg , groupBy()
- Spark DataFrame은 DataFrame 객체에서 aggregation 메소드가 별로 없다.
- Aggregation 메소드 적용 시에는 pyspark.sql.functions 에 있는 min, max, sum 등의 함수를 사용하자
- agg는 조금 더 간단한 연산을 할 때 ( count, minmax, )
- groupby 는 조금 더 고차원적인 집계
- Agg
# pandas 의 aggregation
titanic_pdf.count()
titanic_pdf[['Pclass','Age']].max()
# spark의 agg
titanic_sdf.count() # spark의 count() = data 행의 개수
**from pyspark.sql.functions import max, min, sum**
titanic_sdf_max = titanic_sdf.select(max('Age'))
titanic_sdf_max.show()
+--------+
|max(Age)|
+--------+
| 80.0|
+--------+
- groupBy()
# pandas의 groupBy
titanic_pdf_groupby = titanic_pdf.groupby(by='Pclass').count()
# pandas의 groupby 후 agg
agg_format = {'Age':'max',
'SibSp':'sum',
'Fare':'mean'}
titanic_pdf_groupby.agg(agg_format)
--------------------------------------------
Age 355.0
SibSp 891.0
Fare 297.0
dtype: float64
# pandas 의 value_counts()
titanic_pdf['Pclass'].value_counts()
# Spark Dataframe의 groupBy = pandas의 value_counts와 동일
titanic_sdf.groupBy('Pclass').count().show() # select pclass, count(*) from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').count().orderBy('count',ascending=False).show()
titanic_sdf.groupBy('Pclass').max().show()
titanic_sdf.groupBy('Pclass').max('Age').show()
titanic_sdf.groupBy(['Pclass','Embarked']).max('Age').show()
********집계함수 안에 col쓰면 오류남.**************
titanic_sdf.groupby('Pclass').max(col('Age')).show()
from pyspark.sql.functions import avg,sum,max,min
# select max(age), min(age), sum(age), avg(age) from titanic_sdf group by pclass
titanic_sdf.groupBy('Pclass').agg(
max('Age').alias('age_max'),
min('Age'),
sum('Age'),
avg('Age')
).show()
titanic_sdf.groupBy(['Pclass']).agg(
max('Age').alias('max_age'),
min('Age').alias('min_age'),
sum('Age').alias('sum_age'),
avg('Age').alias('avg_age')).filter(col('max_age')>70).show()
컬럼 삽입
- withColumn(): 기존 컬럼 값을 수정, 타입 변경, 신규 컬럼 추가
- withColumn('신규 또는 업데이트 되는 **컬럼명**','신규 또는 업데이트되는 값')
- 주의사항 : 신규 또는 업데이트 되는 값을 생성 시에 기존 컬럼 값을 기반으로 한다면, 신규 컬럼은 문자열, 기존 컬럼은 반드시 컬럼객체(col('컬럼명'))을 사용
- 신규 컬럼을 추가하는 것은 select 로도 가능하다
- withColumnRename() : 컬럼명 변경은
pandas
# pandas
titanic_pdf_copy = titanic_pdf.copy()
titanic_pdf_copy['Extra_Fare'] = titanic_pdf_copy['Fare']*10
titanic_pdf_copy['Extra_Fare'] = 10
spark: withColumn()
from pyspark.sql.functions import col
titanic_sdf_copy = titanic_sdf.select('*')
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', col('Fare')*10)
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare',10)
# --> 이렇겐 안됨!!!!!
# --> 업데이트 할 값은 반드시 컬럼형이 되어야 한다. (ex. col('Fare')*10)
# 상숫값으로 업데이트 하려면 반드시 lit 함수를 적용할 것
from pyspark.sql.functions import lit
titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', lit(10))
# Name 컬럼을 ','로 분리하여 Name1, Name2 생성
titanic_sdf_copy = titanic_sdf_copy.withColumn('first_name',split(col('Name'),',').getItem(0))
titanic_sdf_copy = titanic_sdf_copy.withColumn('last_name',split(col('Name'),',').getItem(1))
spark: select()
# select() 메소드를 이용해서 컬럼을 추가.
# 이 때 SQL의 substring 메소드 이용해 문자열의 일부를 추출하여 신규 컬럼생성.
from pyspark.sql.functions import substring
titanic_sdf_copy = titanic_sdf_copy.select('*',col('Embarked').alias('E'))
titanic_sdf_copy = titanic_sdf_copy.select('*',substring('Cabin',0,1).alias('Cabin_section'))
컬럼 데이터 수정
# pandas
titanic_pdf_copy['Fare'] = titanic_pdf_copy['Fare']+20
titanic_pdf_copy['Fare'] = titanic_pdf_copy['Fare'].astype(np.int64)
# spark
titanic_sdf_copy = titanic_sdf_copy.withColumn('Fare',col('Fare')+20)
titanic_sdf_copy = titanic_sdf_copy.withColumn('Fare', col('Fare').cast('integer'))
컬럼명 변경
titanic_sdf_copy = titanic_sdf_copy.withColumnRenamed('탑승지','탑탑승지')
titanic_sdf_copy.show(5)
titanic_sdf_copy = titanic_sdf_copy.withColumnRenamed('없는컬럼 넣어도','오류 안나는게 문제')
titanic_sdf_copy.show(5)
'Data Engineering' 카테고리의 다른 글
[Spark] Spark SQL에서 날짜 형식 중 년, 월, 일 따로 추출하는 법! (0) | 2023.09.15 |
---|---|
[Spark] Spark Machine-Learning (0) | 2023.09.15 |
[Spark] Spark RDD (0) | 2023.09.14 |
분산 병렬 처리 시스템 (MPP, DFS) (0) | 2023.09.14 |
[airflow] airflow(ubuntu)-mysql 연결하기? (0) | 2023.09.13 |