데이터로그😎
[airflow] airflow(ubuntu)-mysql 연결하기? 본문
mysql 계정 생성하기
id: air/ pw: 1234 생성해볼것
먼저 ubuntu에서 mysql 계정을 사용하여 아래 명령어를 입력하고 들어간다.
mysql -u jeeyeon -p (jeeyeon 이라는 계정으로 패스워드 입력해서 접속할게요)
jeeyeon 자리에 각자의 mysql 계정을 입력하면 된다. (보통 root를 사용한다. 혹은 root에 버금가는 권한을 가진 계정)
(airflow-env) ubuntu@JeeYeon:~$ mysql -u jeeyeon -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 19
Server version: 8.0.34-0ubuntu0.22.04.1 (Ubuntu)
Copyright (c) 2000, 2023, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql 계정으로 접속 후 local뿐만 아니라 외부에서도 접속 가능한 mysql 계정을 하나 더 만들 것이다.
create user 'air'@'%' identified by '1234'; (%는 외부에서도 접속 가능하단 의미이다. 여기서 당신들이 바꾸어야할 것은 air부분에 새로 생성할 계정명을, by 뒤에는 그 계정의 비밀번호를 입력하라. air와 1234옆에 있는 작은 따옴표('')는 그대로 둘 것.
mysql> create user 'air'@'%' identified by '1234';
Query OK, 0 rows affected (0.08 sec)
mysql> grant all privileges on *.* to 'air'@'%';
Query OK, 0 rows affected (0.01 sec)
mysql> flush privileges
-> ;
Query OK, 0 rows affected (0.01 sec)
(airflow-env) ubuntu@JeeYeon:~$ mysql -u air -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 21
Server version: 8.0.34-0ubuntu0.22.04.1 (Ubuntu)
Copyright (c) 2000, 2023, Oracle and/or its affiliates.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
계정을 다 만들었으니, 아래의 명령어를 통해 새로운 database를 만들어보자.
database명:airflow
create table airflow;
airflow-mysql 연결하기
air라는 계정을 1234라는 패스워드와 함께 만들었다. ubuntu에서!!
그럼 이제 airflow로 연결하는 법?
airflow를 가상환경에 설치하면 airflow.cfg라는 파일이 생긴다.
그 파일에서 sql_alchemy_conn 에 위의 가상환경에서 만든 id(air)와 비번(1234)를 적고, host와 port도 알맞게 적어야한다.
airflow webserver -p 8080을 터미널에 치면 아래와 같은 브라우저를 열 수 있다.
Admin -> connection 에 들어가 새로 생성하기 버튼을 누르고 mysql과의 연결을 생성한다.
schema: airflow (아까 만든 데이터베이스)
login, password에는 ubuntu 상에서 만든 mysql 계정,비번(air, 1234)를 넣는다.
DAG 올리기
이제 DAG을 만들어서 올려볼 차례.
/home/ubuntu/airflow/dags/infra_pipeline.py 라는 파일에 아래의 코드를 기재했다.
from datetime import datetime
from airflow import DAG
import json
from pandas import json_normalize # JSON to Pandas
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from infra_populationinfra_data import *
import sys
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
default_args = {'start_date':datetime(2023,1,1)}
with DAG(
dag_id='clustering_pipeline',
schedule='@monthly',
default_args = default_args,
tags =['kakao','api','pipeline'],
catchup=False
) as dag:
creating_table = SQLExecuteQueryOperator(
task_id = 'creating_table',
conn_id = 'mysql_conn',
params={'schema':'airflow'},
# sqlite_conn_id='sqlite_db',
sql = """
CREATE TABLE IF NOT EXISTS tb_kakao_infra(
NODE_ID TEXT,
ARS_ID TEXT,
정류소명 TEXT,
X좌표 TEXT,
Y좌표 TEXT,
법정동코드 TEXT,
법정동_구 TEXT,
법정동 TEXT,
행정동코드 TEXT,
행정동 TEXT,
academy_cnt INT,
kindergarten_cnt INT,
mart_cnt INT,
restaurant_cnt INT,
school_cnt INT,
subway_cnt INT,
tour_cnt INT,
cafe_cnt INT,
hospital_cnt INT,
culture_cnt INT,
public_office_cnt INT,
'employee_cnt' INT,
'alone_ratio' FLOAT,
'emp_corp_ratio' FLOAT,
'population_15to64' INT,
'RIDE_SUM_6_10' INT,
'ALIGHT_SUM_6_10' INT
)
"""
)
# kakao_preprocess = PythonOperator(
# task_id="kakao_preprocess", python_callable=fetch_and_process_data # 실행 시킬 함수
# )
# # csv를 분해 후 table에 넣어주기
# store_result = BashOperator(
# task_id="store_kakao",
# bash_command='echo -e ".separator ","\n.import /home/ubuntu/airflow/dags/tmp/bus_station_XY.csv tb_kakao_infra" | sqlite3 /home/ubuntu/airflow/airflow.db',
# )
# # # 파이프라인 구성하기
# # creating_table >> kakao_preprocess >> store_result
그 후 아래를 실행해보면 test를 할 수 있다.
airflow tasks test clustering_pipeline creating_table 2023-01-01
그 후 다시 airflow webserver -p 8080을 통해 다시 webserver를 켜보면 아래 사진처럼 dag에 'clustering_pipeline'이 추가된 것을 볼 수 있다.
해당 dag를 클릭하여 graph를 보면 dag 아래에 만들었던 creating_table이라는 task가 등록된 것을 볼 수 있다.
'Data Engineering' 카테고리의 다른 글
[Spark] Spark RDD (0) | 2023.09.14 |
---|---|
분산 병렬 처리 시스템 (MPP, DFS) (0) | 2023.09.14 |
[airflow] airflow 사용하기 (0) | 2023.09.11 |
[ubuntu] 로컬 WSL(ubuntu) - vscode 연결하기 (0) | 2023.09.11 |
[ubuntu] 로컬에 ubuntu(linux)를 설치하는 건에 관하여.... (0) | 2023.09.09 |