본문 바로가기
[업무 지식]/Crawling

[ETL] Weather, MySQL 클래스를 객체화

by 에디터 윤슬 2025. 1. 2.

패키지 불러오기

from etl_module.connectors.weather_api import WeatherApiClient # 별도 생성한 클래스. 하단 etl_module 폴더에 코드 작성
from etl_module.connectors.mysql import MySqlClient # 별도 생성한 클래스. 하단 etl_module 폴더에 코드 작성
import pandas as pd 
import os
from dotenv import load_dotenv

load_dotenv()

 

  • 기존 환경변수(.env) 변경 시 코드
from dotenv import load_dotenv
from dotenv import dotenv_values

# 기존 환경 변수 제거
os.environ.pop('API_KEY', None)
os.environ.pop('DB_SERVER_HOST', None)
os.environ.pop('DB_USERNAME', None)
os.environ.pop('DB_PASSWORD', None)
os.environ.pop('DB_DATABASE', None)
os.environ.pop('DB_PORT', None)

# .env 파일 다시 로드
load_dotenv()

openweathermap API 인증 및 불러오기

 

Current weather and forecast - OpenWeatherMap

OpenWeather Weather forecasts, nowcasts and history in a fast and elegant way

openweathermap.org

# !pip3 install mysql-connector-python
# Client 
API_KEY = os.environ.get("API_KEY")
weather_api_client = WeatherApiClient(api_key = API_KEY)

 

# 테스트
weather_api_client.get_city(city_name='seoul')

 

추출 Extract

# 여러 도시에 날씨 정보 추출
def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
    """
    여러 도시의 날씨 데이터를 추출합니다.

    Parameters:
    - weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.

    Returns:
    - pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
    """
    cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
    weather_data = []
    for city_name in cities:
        weather_data.append(weather_api_client.get_city(city_name=city_name))
    df = pd.json_normalize(weather_data)
    return df

 

# 확인

df = extract_weather(weather_api_client=weather_api_client)
df.head()

 

변환 Transform

# 데이터 전처리 코드 함수

def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
    """
    날씨 데이터를 변환하고 전처리합니다.

    Parameters:
    - df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.

    Returns:
    - pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
    """
    df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9)  # 한국시간
    df["dt"] = df["measured_at"].dt.strftime("%Y%m%d")  # 기준년월일 (YYYYMMDD)
    df["time"] = df["measured_at"].dt.strftime("%H%M%S")  # 기준년월일 (HHHHMMSS)
    df_selected = df[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
    df_selected = df_selected.rename(columns={
        "name": "city", 
        "main.temp": "temperature", 
        "main.humidity": "humidity", 
        "wind.speed": "wind_speed"
    })
    return df_selected
# 확인
clean_df = transform_weather(df)
clean_df.head()

적재 Load

  • 환경변수 설정
  • MySQL 연결
from etl_module.connectors.mysql import MySqlClient # mysql.py 파일 클래스 참고(하단에 작성됨)

# 환경변수 설정 
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST')  # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME')      # 데이터베이스 아이디 
DB_PASSWORD = os.environ.get('DB_PASSWORD')      # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT')              # 데이터베이스 연결을 위한 포트 (Default: 3306)

my_sql_client = MySqlClient(
    server_name=DB_SERVER_HOST, 
    database_name=DB_DATABASE, 
    username=DB_USERNAME, 
    password=DB_PASSWORD, 
    port=DB_PORT
)

 

  • MySQL 데이터베이스 로드
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime

def load_weather(df: pd.DataFrame, my_sql_client: MySqlClient, method: str = "upsert",) -> None:
    """
    변환된 날씨 데이터를 MySQL 데이터베이스에 로드합니다.

    Parameters:
    - df (pd.DataFrame): 변환된 날씨 데이터를 포함하는 DataFrame.
    - my_sql_client (MySqlClient): MySQL 데이터베이스와 상호작용하는 클라이언트.
    - table (Table): 데이터를 로드할 대상 데이터베이스 테이블.
    - metadata (MetaData): 테이블 정의에 대한 SQLAlchemy 메타데이터 객체.
    - method (str, optional): 데이터 삽입 방법을 지정합니다.
                              옵션: "insert", "upsert", "overwrite".
                              기본값은 "upsert"입니다.
    """
    metadata = MetaData()
    table = Table(
        "daily_weather",
        metadata,
        Column("dt", String(8), nullable=False, primary_key=True),
        Column("time", String(6), nullable=False, primary_key=True),
        Column("measured_at", DateTime, nullable=False),
        Column("id", Integer, primary_key=True),
        Column("city", String(100), nullable=True),
        Column("temperature", Float, nullable=True),
        Column("humidity", Integer, nullable=True),
        Column("wind_speed", Float, nullable=True),
    )
    if method == "insert":
        my_sql_client.insert(df=df, table=table, metadata=metadata)
    elif method == "upsert":
        my_sql_client.upsert(df=df, table=table, metadata=metadata)
    elif method == "overwrite":
        my_sql_client.overwrite(df=df, table=table, metadata=metadata)
    else:
        raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")

 

# 확인

load_weather(df=clean_df, my_sql_client=my_sql_client, method="upsert")

ETL Module 생성

  • 폴더명 etl_module 생성
    • 하위 폴더 1: assets
    • 하위 폴더 2: connectors
    • 하위 폴더 3: pipeline

  • assets -> __init__.py(빈 파일 생성)
  • assets -> weather.py 생성
  • extract_weather + transform_weather + load_weather
# weather.py

from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float


def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
    """
    여러 도시의 날씨 데이터를 추출합니다.

    Parameters:
    - weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.

    Returns:
    - pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
    """
    cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
    weather_data = []
    for city_name in cities:
        weather_data.append(weather_api_client.get_city(city_name=city_name))
    df = pd.json_normalize(weather_data)
    return df


def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
    """
    날씨 데이터를 변환하고 전처리합니다.

    Parameters:
    - df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.

    Returns:
    - pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
    """
    df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(
        hours=9
    )  # 한국시간
    df["dt"] = df["measured_at"].dt.strftime("%Y%m%d")  # 기준년월일 (YYYYMMDD)
    df["time"] = df["measured_at"].dt.strftime("%H%M%S")  # 기준년월일 (HHHHMMSS)
    df_selected = df[
        [
            "dt",
            "time",
            "measured_at",
            "id",
            "name",
            "main.temp",
            "main.humidity",
            "wind.speed",
        ]
    ]
    df_selected = df_selected.rename(  # 컬럼명 수정
        columns={
            "name": "city",
            "main.temp": "temperature",
            "main.humidity": "humidity",
            "wind.speed": "wind_speed",
        }
    )
    return df_selected


def load_weather(
    df: pd.DataFrame,
    my_sql_client: MySqlClient,
    method: str = "upsert",
) -> None:
    """
    변환된 날씨 데이터를 MySQL에 로드하는 함수.

    Parameters:
    - df (pd.DataFrame): 변환된 데이터
    - my_sql_client (MySqlClient): 데이터베이스 클라이언트
    - method (str, optional): 삽입 방법 ('insert', 'upsert', 'overwrite')
    """
    metadata = MetaData()
    table = Table(
        "daily_weather",
        metadata,
        Column("dt", String(8), nullable=False, primary_key=True),
        Column("time", String(6), nullable=False, primary_key=True),
        Column("measured_at", DateTime, nullable=False),
        Column("id", Integer, primary_key=True),
        Column("city", String(100), nullable=True),
        Column("temperature", Float, nullable=True),
        Column("humidity", Integer, nullable=True),
        Column("wind_speed", Float, nullable=True),
    )
    if method == "insert":
        my_sql_client.insert(df=df, table=table, metadata=metadata)
    elif method == "upsert":
        my_sql_client.upsert(df=df, table=table, metadata=metadata)
    elif method == "overwrite":
        my_sql_client.overwrite(df=df, table=table, metadata=metadata)
    else:
        raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")

 

  • connectors -> __init__.py 빈 파일 생성
  • connectors -> mysql.py 생성
# mysql.py

from sqlalchemy import create_engine, MetaData, Table, MetaData, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd


class MySqlClient:
    """
    MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.

    이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
    업서트(upsert)와 같은 작업을 지원합니다.
    """

    def __init__(
        self,
        server_name: str,
        database_name: str,
        username: str,
        password: str,
        port: int = 3306,
    ):
        # 데이터베이스 연결을 위한 초기 설정
        self.host_name = server_name
        self.database_name = database_name
        self.username = username
        self.password = password
        self.port = port

        # MySQL 연결 URL 생성
        connection_url = URL.create(
            drivername="mysql+pymysql",
            username=username,
            password=password,
            host=server_name,
            port=port,
            database=database_name,
        )

        # SQLAlchemy 엔진 생성
        self.engine = create_engine(connection_url)

    def create_table(self, metadata: MetaData) -> None:
        """
        주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.

        Parameters:
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        metadata.create_all(self.engine)

    def drop_table(self, table: Table) -> None:
        """
        지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.

        Parameters:
        - table_name (str): 삭제할 테이블의 이름.
        """
        with self.engine.connect() as connection:
            connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))

    def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.

        Parameters:
        - df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

    def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 갱신합니다.

        Parameters:
        - df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)

        # 데이터프레임을 레코드(딕셔너리 목록)으로 변환
        data = df.to_dict(orient="records")

        # 테이블의 고유 키(Primary Key) 추출
        key_columns = [
            pk_column.name for pk_column in table.primary_key.columns.values()
        ]
        key_values = [tuple(row[pk] for pk in key_columns) for row in data]
        delete_values = ", ".join(
            [f"({', '.join(map(repr, values))})" for values in key_values]
        )

        with self.engine.connect() as connection:
            if key_values:
                delete_sql = f"""
                    DELETE FROM {self.database_name}.{table.name}
                    WHERE ({', '.join(key_columns)}) IN (
                        {delete_values}
                    )
                """
                connection.execute(text(delete_sql))
                connection.commit()  # DELETE 문 실행

        # 변환된 데이터프레임을 테이블에 추가 (INSERT)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

    def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.

        Parameters:
        - df (pd.DataFrame): 테이블의 기존 데이터를 대체할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        # 테이블이 존재하지 않으면 생성
        self.create_table(metadata=metadata)

        # 기존 테이블 데이터 삭제
        self.drop_table(table=table)

        # 새로운 데이터를 테이블에 삽입
        self.insert(df=df, table=table, metadata=metadata)

 

  • connectors -> weather_api.py 생성
# weather_api.py

import requests


class WeatherApiClient:
    """
    OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스입니다.
    """

    def __init__(self, api_key: str):
        self.base_url = "http://api.openweathermap.org/data/2.5"
        if api_key is None:
            raise Exception("API 키는 None으로 설정할 수 없습니다.")
        self.api_key = api_key

    def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
        """
        지정된 도시의 최신 날씨 데이터를 가져옵니다.

        Parameters:
        - city_name (str): 날씨 정보를 조회할 도시 이름.
        - temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
                                   'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.

        Returns:
        - dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.

        Raises:
        - Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
        """
        params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
        response = requests.get(f"{self.base_url}/weather", params=params)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(
                f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
            )

 

  • pipeline -> etl_pipeline.py 생성
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather


def main():
    """
    main 함수는 전체 ETL 프로세스를 실행합니다.

    1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
    2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
    3. ETL 프로세스를 순차적으로 수행합니다:
       - 데이터를 Weather API에서 추출합니다.
       - 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
       - 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
    """
    load_dotenv()
    API_KEY = os.environ.get("API_KEY")
    DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
    DB_USERNAME = os.environ.get("DB_USERNAME")
    DB_PASSWORD = os.environ.get("DB_PASSWORD")
    DB_DATABASE = os.environ.get("DB_DATABASE")
    DB_PORT = os.environ.get("DB_PORT")

    weather_api_client = WeatherApiClient(api_key=API_KEY)
    my_sql_client = MySqlClient(
        server_name=DB_SERVER_HOST,
        database_name=DB_DATABASE,
        username=DB_USERNAME,
        password=DB_PASSWORD,
        port=DB_PORT,
    )

    # ETL 실행
    df = extract_weather(weather_api_client=weather_api_client)
    clean_df = transform_weather(df)
    print(clean_df.shape)
    load_weather(df=clean_df, my_sql_client=my_sql_client)


if __name__ == "__main__":
    main()