본문 바로가기
[업무 지식]/Crawling

[ETL] ETL(Extract, Transform, Load) 프로세스 스케줄링

by 에디터 윤슬 2025. 1. 3.

모듈 호출

https://yoonsll.tistory.com/280

 

[ETL] Weather, MySQL 클래스를 객체화

패키지 불러오기from etl_module.connectors.weather_api import WeatherApiClient # 별도 생성한 클래스. 하단 etl_module 폴더에 코드 작성from etl_module.connectors.mysql import MySqlClient # 별도 생성한 클래스. 하단 etl_module

yoonsll.tistory.com

  • 위 링크 이어서 진행
from dotenv import load_dotenv
import os
from datetime import datetime
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
from loguru import logger
import schedule
import yaml
import time

	•`dotenv`: `.env` 파일에서 환경 변수를 로드합니다.
	•`os`: 환경 변수 접근 및 파일 경로 처리에 사용됩니다.
	•`datetime`: 현재 날짜와 시간을 가져와 로그 파일 이름을 생성합니다.
	•`loguru`: 로그 기록을 관리하는 라이브러리입니다.
	•`schedule`: 주기적으로 ETL 작업을 실행하기 위한 스케줄링 라이브러리입니다.
	•`yaml`: 설정 파일(.yaml)을 읽어옵니다.
	•사용자 정의 모듈:
		•`WeatherApiClient`: Weather API와 통신하는 객체.
		•`MySqlClient`: MySQL 데이터베이스와 통신하는 객체.
		•`extract_weather`, `transform_weather`, `load_weather`: 각각 ETL의 추출, 변환, 적재 단계를 담당합니다.

 

 

ETL_module: Connectors 폴더

 

  • mysql.py
from sqlalchemy import create_engine, MetaData, Table, MetaData, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd


class MySqlClient:
    """
    MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.

    이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
    업서트(upsert)와 같은 작업을 지원합니다.
    """

    def __init__(
        self,
        server_name: str,
        database_name: str,
        username: str,
        password: str,
        port: int = 3306,
    ):
        # 데이터베이스 연결을 위한 초기 설정
        self.host_name = server_name
        self.database_name = database_name
        self.username = username
        self.password = password
        self.port = port

        # MySQL 연결 URL 생성
        connection_url = URL.create(
            drivername="mysql+pymysql",
            username=username,
            password=password,
            host=server_name,
            port=port,
            database=database_name,
        )

        # SQLAlchemy 엔진 생성
        self.engine = create_engine(connection_url)

    def create_table(self, metadata: MetaData) -> None:
        """
        주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.

        Parameters:
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        metadata.create_all(self.engine)

    def drop_table(self, table: Table) -> None:
        """
        지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.

        Parameters:
        - table_name (str): 삭제할 테이블의 이름.
        """
        with self.engine.connect() as connection:
            connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))

    def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.

        Parameters:
        - df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

    def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 갱신합니다.

        Parameters:
        - df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)

        # 데이터프레임을 레코드(딕셔너리 목록)으로 변환
        data = df.to_dict(orient="records")

        # 테이블의 고유 키(Primary Key) 추출
        key_columns = [
            pk_column.name for pk_column in table.primary_key.columns.values()
        ]
        key_values = [tuple(row[pk] for pk in key_columns) for row in data]
        delete_values = ", ".join(
            [f"({', '.join(map(repr, values))})" for values in key_values]
        )

        with self.engine.connect() as connection:
            if key_values:
                delete_sql = f"""
                    DELETE FROM {self.database_name}.{table.name}
                    WHERE ({', '.join(key_columns)}) IN (
                        {delete_values}
                    )
                """
                connection.execute(text(delete_sql))
                connection.commit()  # DELETE 문 실행

        # 변환된 데이터프레임을 테이블에 추가 (INSERT)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

    def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.

        Parameters:
        - df (pd.DataFrame): 테이블의 기존 데이터를 대체할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        # 테이블이 존재하지 않으면 생성
        self.create_table(metadata=metadata)

        # 기존 테이블 데이터 삭제
        self.drop_table(table=table)

        # 새로운 데이터를 테이블에 삽입
        self.insert(df=df, table=table, metadata=metadata)

 

  • 생성자('__init__')
# MySQL 데이터베이스 연결을 초기화합니다.

def __init__(
    self,
    server_name: str,
    database_name: str,
    username: str,
    password: str,
    port: int = 3306,
):
    self.host_name = server_name
    self.database_name = database_name
    self.username = username
    self.password = password
    self.port = port

    # MySQL 연결 URL 생성
    connection_url = URL.create(
        drivername="mysql+pymysql",
        username=username,
        password=password,
        host=server_name,
        port=port,
        database=database_name,
    )

    # SQLAlchemy 엔진 생성
    self.engine = create_engine(connection_url)


1. 입력 인자:
	•`server_name`: MySQL 서버 주소 (예: `localhost`).
	•`database_name`: 연결할 데이터베이스 이름.
	•`username`: MySQL 사용자 이름.
	•`password`: MySQL 비밀번호.
	•`port`: MySQL 포트 번호 (기본값은 3306).

2. 연결 URL 생성:
	•SQLAlchemy의 `URL.create()`를 사용하여 MySQL 연결 문자열을 생성합니다.
	•드라이버는 `mysql+pymysql`을 사용합니다.

3. SQLAlchemy 엔진 생성:
	•`create_engine()`를 통해 데이터베이스 작업에 사용할 엔진 객체를 생성합니다.

 

  • 메서드: create_table()
# 주어진 테이블 정의(`MetaData`)를 기반으로 테이블을 생성합니다.

def create_table(self, metadata: MetaData) -> None:
    metadata.create_all(self.engine)

	•SQLAlchemy의 `MetaData` 객체에 정의된 테이블 스키마를 기반으로 실제 데이터베이스에 테이블을 생성합니다.
	•이미 존재하는 테이블은 무시됩니다.

 

  • 메서드: drop_tabel()
# 지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시됩니다.

def drop_table(self, table: Table) -> None:
    with self.engine.connect() as connection:
        connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))

	1. SQL 쿼리 실행:
		•`DROP TABLE IF EXISTS {table.name}`: 테이블이 존재하면 삭제하고, 없으면 무시합니다.
	2. SQLAlchemy의 `text()`를 사용하여 쿼리를 실행합니다.

 

  • 메서드: insert()
# Pandas DataFrame 데이터를 지정된 테이블에 삽입합니다.
# 테이블이 존재하지 않으면 생성 후 삽입합니다.

def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
    self.create_table(metadata=metadata)
    df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)


1. 테이블 생성:
	•`self.create_table(metadata)`로 테이블이 존재하지 않을 경우 생성합니다.

2. DataFrame 삽입:
	•Pandas의 `to_sql()` 메서드를 사용하여 DataFrame 데이터를 MySQL 테이블에 삽입합니다.
	•`if_exists="append"` 옵션으로 기존 데이터에 추가됩니다.

 

  • 메서드: upsert()
# DataFrame 데이터를 삽입하거나, 기존 레코드가 있을 경우 업데이트합니다.

def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
    self.create_table(metadata=metadata)

    data = df.to_dict(orient="records")
    key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
    key_values = [tuple(row[pk] for pk in key_columns) for row in data]
    delete_values = ", ".join(
        [f"({', '.join(map(repr, values))})" for values in key_values]
    )

    with self.engine.connect() as connection:
        if key_values:
            delete_sql = f"""
                DELETE FROM {self.database_name}.{table.name}
                WHERE ({', '.join(key_columns)}) IN (
                    {delete_values}
                )
            """
            connection.execute(text(delete_sql))
            connection.commit()

    df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

1. 테이블 생성:
	• 테이블이 없으면 생성합니다.

2. 기존 레코드 삭제:
	• DataFrame에서 Primary Key 값을 추출하여 삭제 쿼리를 실행합니다.

3. 새 데이터 삽입:
	• Pandas의 `to_sql()` 메서드를 사용하여 데이터를 추가로 삽입합니다.

 

  • 메서드: overwrite()
# 기존 데이터를 모두 삭제하고 새로운 데이터를 삽입합니다.

def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
    self.create_table(metadata=metadata)
    self.drop_table(table=table)
    self.insert(df=df, table=table, metadata=metadata)

1. 테이블 삭제 및 재생성:
	• 기존 데이터를 모두 삭제하기 위해 테이블을 드롭(drop)한 뒤 다시 생성합니다.

2. 새 데이터 삽입:
	• Pandas DataFrame 데이터를 새로 삽입합니다.

 

  • weather_api.py
import requests


class WeatherApiClient:
    """
    OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스입니다.
    """

    def __init__(self, api_key: str):
        self.base_url = "http://api.openweathermap.org/data/2.5"
        if api_key is None:
            raise Exception("API 키는 None으로 설정할 수 없습니다.")
        self.api_key = api_key

    def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
        """
        지정된 도시의 최신 날씨 데이터를 가져옵니다.

        Parameters:
        - city_name (str): 날씨 정보를 조회할 도시 이름.
        - temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
                                   'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.

        Returns:
        - dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.

        Raises:
        - Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
        """
        params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
        response = requests.get(f"{self.base_url}/weather", params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(
                f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
            )

 

  • 생성자('__init__')
# 클래스를 초기화하고, API 키와 기본 URL을 설정합니다.

def __init__(self, api_key: str):
    self.base_url = "http://api.openweathermap.org/data/2.5"
    if api_key is None:
        raise Exception("API 키는 None으로 설정할 수 없습니다.")
    self.api_key = api_key

1. 기본 URL 설정:
	• OpenWeatherMap API의 기본 URL은 `http://api.openweathermap.org/data/2.5`입니다.
	• 이 URL은 모든 요청의 기본 경로로 사용됩니다.

2. API 키 검증:
	• `api_key`가 `None`이면 예외를 발생시킵니다.
	• API 키는 OpenWeatherMap API에 인증을 위해 필수적으로 필요합니다.

3. API 키 저장:
	• 유효한 API 키를 `self.api_key`에 저장합니다.

 

  • 메서드: get_city()
# 지정된 도시의 최신 날씨 데이터를 가져옵니다.

def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
    params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
    response = requests.get(f"{self.base_url}/weather", params=params)
    if response.status_code == 200:
        return response.json()
    else:
        raise Exception(
            f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
        )

1. 매개변수
	•`city_name (str)`:
		•날씨 정보를 조회할 도시 이름입니다.
		•예: `"Seoul"`, `"New York"`.
	•`temperature_units (str)`:
		•온도 단위를 지정합니다.
		•기본값은 `"metric"`(섭씨)이며, 다음 중 하나를 선택할 수 있습니다:
		•`"metric"`: 섭씨
		•`"imperial"`: 화씨
		•`"standard"`: 켈빈
        
2. API 요청 파라미터 설정
	•OpenWeatherMap API에 전달할 쿼리 파라미터를 딕셔너리로 정의합니다:
		•`q`: 도시 이름 (예: `"Seoul"`).
		•`units`: 온도 단위 (예: `"metric"`).
		•`appid`: OpenWeatherMap API 키.

3. HTTP GET 요청
	•	`requests.get()` 메서드를 사용하여 OpenWeatherMap API에 HTTP GET 요청을 보냅니다.
    •	예시: http://api.openweathermap.org/data/2.5/weather?q=Seoul&units=metric&appid=YOUR_API_KEY

 

ETL_module: Assets 폴더

 

  • __init__.py는 빈 폴더
  • weather.py
  • 코드의 주요 구성
    1. `extract_weather()`: 데이터를 외부 API에서 추출.
    2. `transform_weather()`: 데이터를 변환 및 전처리.
    3. `load_weather()`: 데이터를 MySQL 데이터베이스에 적재.
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float


def extract_weather(
    weather_api_client: WeatherApiClient, cities: list = ["Seoul", "Busan"]
) -> pd.DataFrame:
    """
    여러 도시의 날씨 데이터를 추출합니다.

    Parameters:
    - weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.

    Returns:
    - pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
    """

    weather_data = []
    for city_name in cities:
        weather_data.append(weather_api_client.get_city(city_name=city_name))
    df = pd.json_normalize(weather_data)
    return df


def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
    """
    날씨 데이터를 변환하고 전처리합니다.

    Parameters:
    - df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.

    Returns:
    - pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
    """
    df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(
        hours=9
    )  # 한국시간
    df["dt"] = df["measured_at"].dt.strftime("%Y%m%d")  # 기준년월일 (YYYYMMDD)
    df["time"] = df["measured_at"].dt.strftime("%H%M%S")  # 기준년월일 (HHHHMMSS)
    df_selected = df[
        [
            "dt",
            "time",
            "measured_at",
            "id",
            "name",
            "main.temp",
            "main.humidity",
            "wind.speed",
        ]
    ]
    df_selected = df_selected.rename(  # 컬럼명 수정
        columns={
            "name": "city",
            "main.temp": "temperature",
            "main.humidity": "humidity",
            "wind.speed": "wind_speed",
        }
    )
    return df_selected


def load_weather(
    df: pd.DataFrame,
    my_sql_client: MySqlClient,
    method: str = "upsert",
) -> None:
    """
    변환된 날씨 데이터를 MySQL에 로드하는 함수.

    Parameters:
    - df (pd.DataFrame): 변환된 데이터
    - my_sql_client (MySqlClient): 데이터베이스 클라이언트
    - method (str, optional): 삽입 방법 ('insert', 'upsert', 'overwrite')
    """
    metadata = MetaData()
    table = Table(
        "daily_weather",
        metadata,
        Column("dt", String(8), nullable=False, primary_key=True),
        Column("time", String(6), nullable=False, primary_key=True),
        Column("measured_at", DateTime, nullable=False),
        Column("id", Integer, primary_key=True),
        Column("city", String(100), nullable=True),
        Column("temperature", Float, nullable=True),
        Column("humidity", Integer, nullable=True),
        Column("wind_speed", Float, nullable=True),
    )
    if method == "insert":
        my_sql_client.insert(df=df, table=table, metadata=metadata)
    elif method == "upsert":
        my_sql_client.upsert(df=df, table=table, metadata=metadata)
    elif method == "overwrite":
        my_sql_client.overwrite(df=df, table=table, metadata=metadata)
    else:
        raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")

 

  • extract_weather()
# 외부 Weather API에서 여러 도시의 날씨 데이터를 추출하여 Pandas DataFrame으로 반환합니다.

def extract_weather(
    weather_api_client: WeatherApiClient, cities: list = ["Seoul", "Busan"]
) -> pd.DataFrame:

    weather_data = []
    for city_name in cities:
        weather_data.append(weather_api_client.get_city(city_name=city_name))
    df = pd.json_normalize(weather_data)
    return df
    
    
	1.	`cities` 리스트:
		•기본적으로 `"Seoul", "Busan"`을 사용하며, 필요할 경우 다른 도시 리스트를 전달할 수 있습니다.
		•코드 내에서는 `cities = "Busan", "Seoul"`로 고정되어 있음.
	2.	데이터 추출:
		•`weather_api_client.get_city(city_name=city_name)` 메서드를 호출하여 각 도시의 날씨 데이터를 가져옵니다.
		•결과는 JSON 형식으로 반환됩니다.
	3.	Pandas DataFrame 변환:
		•`pd.json_normalize(weather_data)`를 사용하여 JSON 데이터를 DataFrame으로 변환합니다.
		•반환된 DataFrame은 모든 날씨 데이터를 포함합니다.

 

  • transform_weather()
# 추출된 날씨 데이터를 변환 및 전처리하여 필요한 컬럼만 선택하고, 데이터 형식을 조정합니다.

def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
    df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9)
    df["dt"] = df["measured_at"].dt.strftime("%Y%m%d")
    df["time"] = df["measured_at"].dt.strftime("%H%M%S")
    df_selected = df[
        [
            "dt",
            "time",
            "measured_at",
            "id",
            "name",
            "main.temp",
            "main.humidity",
            "wind.speed",
        ]
    ]
    df_selected = df_selected.rename(
        columns={
            "name": "city",
            "main.temp": "temperature",
            "main.humidity": "humidity",
            "wind.speed": "wind_speed",
        }
    )
    return df_selected

 

  • load_weather()
# 변환된 날씨 데이터를 MySQL 데이터베이스에 저장합니다.
# 저장 방식은 `insert`, `upsert`, 또는 `overwrite` 중 하나를 선택할 수 있습니다.

def load_weather(
	df: pd.DataFrame, 
    my_sql_client: MySqlClient, 
    method: str = "upsert",
) -> None:
    metadata = MetaData()
    table = Table(
        "daily_weather",
        metadata,
        Column("dt", String(8), nullable=False, primary_key=True),
        Column("time", String(6), nullable=False, primary_key=True),
        Column("measured_at", DateTime, nullable=False),
        Column("id", Integer, primary_key=True),
        Column("city", String(100), nullable=True),
        Column("temperature", Float, nullable=True),
        Column("humidity", Integer, nullable=True),
        Column("wind_speed", Float, nullable=True),
    )
    if method == "insert":
        my_sql_client.insert(df=df, table=table, metadata=metadata)
    elif method == "upsert":
        my_sql_client.upsert(df=df, table=table, metadata=metadata)
    elif method == "overwrite":
        my_sql_client.overwrite(df=df, table=table, metadata=metadata)
    else:
        raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
        
        
        
        
1. 테이블 정의:
	•SQLAlchemy의 `Table` 객체를 사용하여 데이터베이스 테이블 스키마를 정의합니다.
	•테이블 이름: `"daily_weather"`
	•주요 컬럼:
    dt (String): 기준 날짜 (YYYYMMDD), Primary Key
    time (String): 기준 시간 (HHMMSS), Primary Key
    measured_at (DateTime): 측정 시각
    id (Integer): 도시 ID, Primary Key
    city (String): 도시 이름
    temperature (Float): 온도
    humidity (Integer): 습도
    wind_speed (Float): 풍속

2. 저장 방식 선택 (`method`):
	•`"insert"`: 새 데이터를 추가.
	•`"upsert"`: 기존 데이터가 있으면 업데이트하고 없으면 삽입.
	•`"overwrite"`: 기존 데이터를 삭제하고 새 데이터로 덮어씀.

3. MySQL 클라이언트 호출:
	•`my_sql_client.insert()`, `my_sql_client.upsert()`, 또는 `my_sql_client.overwrite()` 메서드를 호출하여 데이터를 저장.

4. 예외 처리:
	•지정되지 않은 방법이 입력되면 예외를 발생시킵니다.

 

  • 코드 실행 흐름
    • 1. Weather API 클라이언트(`WeatherApiClient`)와 MySQL 클라이언트(`MySqlClient`)를 초기화합니다.
    • 2. ETL 프로세스를 순차적으로 실행합니다:
      •  추출 (`extract_weather`):
        • API에서 날씨 데이터를 가져옵니다.
        • 결과는 Pandas DataFrame으로 반환됩니다.
      • 변환 (`transform_weather`):
        • 데이터를 전처리하고 필요한 컬럼만 선택합니다.
        • 결과는 변환된 DataFrame입니다.
      • 적재 (`load_weather`):
        • 변환된 데이터를 MySQL 데이터베이스에 저장합니다.

 

ETL_module: Pipeline 폴더

 

  • etl_pipeline.yaml
# yaml 파일에 변수 내용 저장
# 추후 변경 및 관리 용이

log_folder_path: "./etl_module/logs"
cities:
  - "Seoul"
  - "Busan"
  - "sejong"
  - "daegu"
  - "incheon"
  - "daejeon"
  - "ulsan"
run_minutes: 30

 

  • etl_pipeline.py
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
import sys
import yaml
import schedule
import time
from datetime import datetime
from loguru import logger

def main(config):
    """
    main 함수는 전체 ETL 프로세스를 실행합니다.

    1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
    2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
    3. ETL 프로세스를 순차적으로 수행합니다:
       - 데이터를 Weather API에서 추출합니다.
       - 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
       - 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
    """
    
    """
    - ETL 시작
    - 환경변수 (.env) 읽는다
    - 객체 생성 (weather, mysql) 사용
    - extract log 시작, 끝
    - transform log 시작, 끝
    - load log 시작, 끝
    - ETL 파이프라인 끝
    """
    
    # 현재 날짜와 시간을 기반으로 Log 파일명 생성
    current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    log_filename = f"{config.get('log_folder_path')}/etl_process_{current_datetime}.log"
    logger.add(log_filename)

    logger.info("ETL 프로세스를 시작합니다.")
         
    print('.env 파일 읽기 시작')
    load_dotenv()
    
    try:
        API_KEY = os.environ.get("API_KEY")
        DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
        DB_USERNAME = os.environ.get("DB_USERNAME")
        DB_PASSWORD = os.environ.get("DB_PASSWORD")
        DB_DATABASE = os.environ.get("DB_DATABASE")
        DB_PORT = os.environ.get("DB_PORT")
        print('.env 파일 읽기 끝')
        
        if not all(
            [API_KEY, DB_SERVER_HOST, DB_USERNAME, DB_PASSWORD, DB_DATABASE, DB_PORT]
        ):
            # 누락된 변수들 확인
            missing_vars = [
                var
                for var, value in [
                    ("API_KEY", API_KEY),
                    ("DB_SERVER_HOST", DB_SERVER_HOST),
                    ("DB_USERNAME", DB_USERNAME),
                    ("DB_PASSWORD", DB_PASSWORD),
                    ("DB_DATABASE", DB_DATABASE),
                    ("DB_PORT", DB_PORT),
                ]
                if value is None
            ]
            error_message = f"누락된 환경 변수: {', '.join(missing_vars)}"
            logger.error(error_message)
            raise ValueError(error_message)  # 누락된 환경 변수가 있으면 예외를 발생시킴
        
        logger.info("환경 변수를 성공적으로 로드했습니다.")
        
        weather_api_client = WeatherApiClient(api_key=API_KEY)
        logger.info("WeatherApiClient가 초기화되었습니다.")
        
        my_sql_client = MySqlClient(
            server_name=DB_SERVER_HOST,
            database_name=DB_DATABASE,
            username=DB_USERNAME,
            password=DB_PASSWORD,
            port=DB_PORT,
        )
        logger.info("MySqlClient가 초기화되었습니다.")

        # ETL 실행
        logger.info("Weather API에서 데이터 추출을 시작합니다.")
        df = extract_weather(weather_api_client=weather_api_client, cities = config.get('cities'))
        logger.info(
            f"데이터 추출이 완료되었습니다. 총 {len(df)}개의 레코드가 있습니다."
        )
        
        logger.info("데이터 변환을 시작합니다.")
        clean_df = transform_weather(df)
        logger.info(
            f"데이터 변환이 완료되었습니다. 변환된 데이터프레임의 크기: {clean_df.shape}"
        )
                
        logger.info("MySQL 데이터베이스로 데이터 적재를 시작합니다.")
        load_weather(df=clean_df, my_sql_client=my_sql_client)
        logger.info("데이터 적재가 성공적으로 완료되었습니다.")
        
    except Exception as e:
        logger.error(f"ETL 프로세스 중 오류가 발생했습니다. 오류: {e}")

if __name__ == "__main__":
    
    # get config variables
    yaml_file_path = __file__.replace(".py", ".yaml")
    print(f"YAML 파일 위치: {yaml_file_path}")

    with open(yaml_file_path) as yaml_file:
        config = yaml.safe_load(yaml_file)

    log_folder_path = config.get("log_folder_path")
    os.makedirs(log_folder_path, exist_ok=True)

    # 스케줄러 생성
    schedule.every(config.get("run_minutes")).minutes.do(main, config=config)

    while True:
        schedule.run_pending()
        time.sleep(5)
  • 코드의 주요 흐름
    • 1. `.env` 파일과 YAML 설정 파일에서 환경 변수 및 설정 정보를 로드.
    • 2. Weather API와 MySQL 데이터베이스 클라이언트를 초기화.
    • 3. ETL 프로세스 실행:
      • 추출 (Extract): Weather API에서 데이터 가져오기.
      • 변환 (Transform): 데이터를 전처리 및 변환.
      • 적재 (Load): 데이터를 MySQL 데이터베이스에 저장.
    • 4. 로그 파일에 모든 작업 기록(Loguru 사용).
    • 5. 스케줄링 기능으로 일정 주기로 ETL 프로세스 실행.

 

  • 로그 파일 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"{config.get('log_folder_path')}/etl_process_{current_datetime}.log"
logger.add(log_filename)
logger.info("ETL 프로세스를 시작합니다.")

	•	현재 시간(`datetime.now()`)을 기반으로 고유한 로그 파일 이름을 생성합니다.
	•	Loguru를 사용하여 ETL 작업의 모든 이벤트를 기록합니다.

 

  • 환경 변수 로드
print('.env 파일 읽기 시작')
load_dotenv()

API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
print('.env 파일 읽기 끝')

	•	`.env` 파일에서 OpenWeatherMap API 키와 MySQL 연결 정보를 로드합니다.
	•	환경 변수가 누락된 경우를 확인하고, 누락된 변수 이름을 로그에 기록합니다:

 

  • weather API 및 MySQL 클라이언트 초기화
weather_api_client = WeatherApiClient(api_key=API_KEY)
logger.info("WeatherApiClient가 초기화되었습니다.")

my_sql_client = MySqlClient(
    server_name=DB_SERVER_HOST,
    database_name=DB_DATABASE,
    username=DB_USERNAME,
    password=DB_PASSWORD,
    port=DB_PORT,
)
logger.info("MySqlClient가 초기화되었습니다.")

	•	`WeatherApiClient`: OpenWeatherMap API와 통신하는 객체를 생성합니다.
	•	`MySqlClient`: MySQL 데이터베이스와 통신하는 객체를 생성합니다.

 

  • ETL 실행
# 데이터 추출(Extract)

logger.info("Weather API에서 데이터 추출을 시작합니다.")
df = extract_weather(weather_api_client=weather_api_client, cities=config.get('cities'))
logger.info(f"데이터 추출이 완료되었습니다. 총 {len(df)}개의 레코드가 있습니다.")

	•	`extract_weather()` 함수는 Weather API에서 지정된 도시들의 날씨 데이터를 가져옵니다.
	•	결과는 Pandas DataFrame으로 반환됩니다.

 

# 데이터 변환 (Transform)

logger.info("데이터 변환을 시작합니다.")
clean_df = transform_weather(df)
logger.info(f"데이터 변환이 완료되었습니다. 변환된 데이터프레임의 크기: {clean_df.shape}")

	•	`transform_weather()` 함수는 데이터를 전처리하고 필요한 컬럼만 선택하여 반환합니다.

 

# 데이터 적재(Load)

logger.info("MySQL 데이터베이스로 데이터 적재를 시작합니다.")
load_weather(df=clean_df, my_sql_client=my_sql_client)
logger.info("데이터 적재가 성공적으로 완료되었습니다.")

	•	`load_weather()` 함수는 변환된 데이터를 MySQL 데이터베이스에 저장합니다.

 

  • yaml 설정 파일 로드
# yaml 파일 경로 생성
yaml_file_path = __file__.replace(".py", ".yaml")
print(f"YAML 파일 위치: {yaml_file_path}")

	•	현재 스크립트 경로에서 `.py` 확장자를 `.yaml`로 변경하여 YAML 설정 파일 경로를 생성합니다.
    

# yaml 파일 읽기
with open(yaml_file_path) as yaml_file:
    config = yaml.safe_load(yaml_file)

	•	YAML 설정 파일을 읽어 Python 딕셔너리(`config`)로 변환합니다.

 

  • 스케줄링 및 실행
schedule.every(config.get("run_minutes")).minutes.do(main, config=config)

	•	`schedule.every(...).minutes.do(...)`: 지정된 시간 간격마다 `main()` 함수를 실행하도록 설정합니다.
	•	`config` 딕셔너리를 `main()` 함수에 전달합니다.
    
# 다양한 주기 실행
schedule.every(1).seconds.do(job)

# 10초마다 실행
schedule.every(10).seconds.do(job)

# 1분마다 실행
schedule.every(60).seconds.do(job)
schedule.every(1).minute.do(job)

# 매일 자정에 실행
schedule.every().day.at("00:00").do(job)

# 매주 월요일 9시에 실행
schedule.every().monday.at("09:00").do(job)

 

  • 무한 루프 실행
while True:
    schedule.run_pending()
    time.sleep(5)

	•	스케줄러가 실행 대기 중인 작업들을 확인(`run_pending()`)하고, 5초 간격으로 반복 실행됩니다.

 

터미널에서 실행

  • 폴더 위치 실행(프로덕트 위치가 아니라면)
cd /Users/~~~~/python/crawling/python_challange/5_challenge_module
  • 실행
python3 -m etl_module.pipeline.etl_pipeline