데이터로그😎

[airflow] airflow(ubuntu)-mysql 연결하기? 본문

Data Engineering

[airflow] airflow(ubuntu)-mysql 연결하기?

지연v'_'v 2023. 9. 13. 17:16

mysql 계정 생성하기

id: air/ pw: 1234  생성해볼것

먼저 ubuntu에서 mysql 계정을 사용하여 아래 명령어를 입력하고 들어간다. 

mysql -u jeeyeon -p  (jeeyeon 이라는 계정으로 패스워드 입력해서 접속할게요)

jeeyeon 자리에 각자의 mysql 계정을 입력하면 된다. (보통 root를 사용한다. 혹은 root에 버금가는 권한을 가진 계정)

(airflow-env) ubuntu@JeeYeon:~$ mysql -u jeeyeon -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 19
Server version: 8.0.34-0ubuntu0.22.04.1 (Ubuntu)

Copyright (c) 2000, 2023, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

 

 

mysql 계정으로 접속 후 local뿐만 아니라 외부에서도 접속 가능한 mysql 계정을 하나 더 만들 것이다.

create user 'air'@'%' identified by '1234'; (%는 외부에서도 접속 가능하단 의미이다. 여기서 당신들이 바꾸어야할 것은 air부분에 새로 생성할 계정명을, by 뒤에는 그 계정의 비밀번호를 입력하라. air와 1234옆에 있는 작은 따옴표('')는 그대로 둘 것.

mysql> create user 'air'@'%' identified by '1234';
Query OK, 0 rows affected (0.08 sec)

mysql> grant all privileges on *.* to 'air'@'%';
Query OK, 0 rows affected (0.01 sec)

mysql> flush privileges
    -> ;
Query OK, 0 rows affected (0.01 sec)
(airflow-env) ubuntu@JeeYeon:~$ mysql -u air -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 21
Server version: 8.0.34-0ubuntu0.22.04.1 (Ubuntu)

Copyright (c) 2000, 2023, Oracle and/or its affiliates.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

 

계정을 다 만들었으니, 아래의 명령어를 통해 새로운 database를 만들어보자.

database명:airflow

create table airflow;

 

airflow-mysql 연결하기

air라는 계정을 1234라는 패스워드와 함께 만들었다. ubuntu에서!!

그럼 이제 airflow로 연결하는 법?

airflow를 가상환경에 설치하면 airflow.cfg라는 파일이 생긴다.

그 파일에서 sql_alchemy_conn 에 위의 가상환경에서 만든 id(air)와 비번(1234)를 적고, host와 port도 알맞게 적어야한다.

 

airflow webserver -p 8080을 터미널에 치면 아래와 같은 브라우저를 열 수 있다.

Admin -> connection 에 들어가 새로 생성하기 버튼을 누르고 mysql과의 연결을 생성한다.

schema: airflow (아까 만든 데이터베이스)

login, password에는 ubuntu 상에서 만든 mysql 계정,비번(air, 1234)를 넣는다.

 

 

DAG 올리기

이제 DAG을 만들어서 올려볼 차례.

/home/ubuntu/airflow/dags/infra_pipeline.py 라는 파일에 아래의 코드를 기재했다.

from datetime import datetime
from airflow import DAG
import json
from pandas import json_normalize  # JSON to Pandas
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from airflow.providers.http.sensors.http import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from infra_populationinfra_data import *
import sys
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator




default_args = {'start_date':datetime(2023,1,1)}

with DAG(
    dag_id='clustering_pipeline',
    schedule='@monthly',
    default_args = default_args,
    tags =['kakao','api','pipeline'],
    catchup=False
    ) as dag:
        
    creating_table = SQLExecuteQueryOperator(
        task_id = 'creating_table',
        conn_id = 'mysql_conn',
        params={'schema':'airflow'},
        # sqlite_conn_id='sqlite_db',
        sql = """
            CREATE TABLE IF NOT EXISTS tb_kakao_infra(
                NODE_ID TEXT,
                ARS_ID TEXT,
                정류소명 TEXT,
                X좌표 TEXT,
                Y좌표 TEXT,
                법정동코드 TEXT,
                법정동_구 TEXT,
                법정동 TEXT,
                행정동코드 TEXT,
                행정동 TEXT,
                academy_cnt INT,
                kindergarten_cnt INT,
                mart_cnt INT,
                restaurant_cnt INT,
                school_cnt INT,
                subway_cnt INT,
                tour_cnt INT,
                cafe_cnt INT,
                hospital_cnt INT,
                culture_cnt INT,
                public_office_cnt  INT,
                'employee_cnt' INT, 
                'alone_ratio' FLOAT, 
                'emp_corp_ratio' FLOAT, 
                'population_15to64' INT,
               'RIDE_SUM_6_10' INT, 
               'ALIGHT_SUM_6_10' INT
            )

            """
    )




    # kakao_preprocess = PythonOperator(
    #     task_id="kakao_preprocess", python_callable=fetch_and_process_data # 실행 시킬 함수
    # )

    # # csv를 분해 후 table에 넣어주기
    # store_result = BashOperator(
    #     task_id="store_kakao",
    #     bash_command='echo -e ".separator ","\n.import /home/ubuntu/airflow/dags/tmp/bus_station_XY.csv tb_kakao_infra" | sqlite3 /home/ubuntu/airflow/airflow.db',
    # )


    # # # 파이프라인 구성하기
    # # creating_table >> kakao_preprocess >> store_result

 

그 후 아래를 실행해보면 test를 할 수 있다.

airflow tasks test clustering_pipeline creating_table 2023-01-01

 

그 후 다시 airflow webserver -p 8080을 통해 다시 webserver를 켜보면 아래 사진처럼 dag에 'clustering_pipeline'이 추가된 것을 볼 수 있다. 

 

해당 dag를 클릭하여 graph를 보면 dag 아래에 만들었던 creating_table이라는 task가 등록된 것을 볼 수 있다.