모듈 호출
https://yoonsll.tistory.com/280
- 위 링크 이어서 진행
from dotenv import load_dotenv
import os
from datetime import datetime
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
from loguru import logger
import schedule
import yaml
import time
•`dotenv`: `.env` 파일에서 환경 변수를 로드합니다.
•`os`: 환경 변수 접근 및 파일 경로 처리에 사용됩니다.
•`datetime`: 현재 날짜와 시간을 가져와 로그 파일 이름을 생성합니다.
•`loguru`: 로그 기록을 관리하는 라이브러리입니다.
•`schedule`: 주기적으로 ETL 작업을 실행하기 위한 스케줄링 라이브러리입니다.
•`yaml`: 설정 파일(.yaml)을 읽어옵니다.
•사용자 정의 모듈:
•`WeatherApiClient`: Weather API와 통신하는 객체.
•`MySqlClient`: MySQL 데이터베이스와 통신하는 객체.
•`extract_weather`, `transform_weather`, `load_weather`: 각각 ETL의 추출, 변환, 적재 단계를 담당합니다.
ETL_module: Connectors 폴더
- mysql.py
from sqlalchemy import create_engine, MetaData, Table, MetaData, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
업서트(upsert)와 같은 작업을 지원합니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 3306,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+pymysql",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table: Table) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 갱신합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.
Parameters:
- df (pd.DataFrame): 테이블의 기존 데이터를 대체할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
# 테이블이 존재하지 않으면 생성
self.create_table(metadata=metadata)
# 기존 테이블 데이터 삭제
self.drop_table(table=table)
# 새로운 데이터를 테이블에 삽입
self.insert(df=df, table=table, metadata=metadata)
- 생성자('__init__')
# MySQL 데이터베이스 연결을 초기화합니다.
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 3306,
):
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+pymysql",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
1. 입력 인자:
•`server_name`: MySQL 서버 주소 (예: `localhost`).
•`database_name`: 연결할 데이터베이스 이름.
•`username`: MySQL 사용자 이름.
•`password`: MySQL 비밀번호.
•`port`: MySQL 포트 번호 (기본값은 3306).
2. 연결 URL 생성:
•SQLAlchemy의 `URL.create()`를 사용하여 MySQL 연결 문자열을 생성합니다.
•드라이버는 `mysql+pymysql`을 사용합니다.
3. SQLAlchemy 엔진 생성:
•`create_engine()`를 통해 데이터베이스 작업에 사용할 엔진 객체를 생성합니다.
- 메서드: create_table()
# 주어진 테이블 정의(`MetaData`)를 기반으로 테이블을 생성합니다.
def create_table(self, metadata: MetaData) -> None:
metadata.create_all(self.engine)
•SQLAlchemy의 `MetaData` 객체에 정의된 테이블 스키마를 기반으로 실제 데이터베이스에 테이블을 생성합니다.
•이미 존재하는 테이블은 무시됩니다.
- 메서드: drop_tabel()
# 지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시됩니다.
def drop_table(self, table: Table) -> None:
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))
1. SQL 쿼리 실행:
•`DROP TABLE IF EXISTS {table.name}`: 테이블이 존재하면 삭제하고, 없으면 무시합니다.
2. SQLAlchemy의 `text()`를 사용하여 쿼리를 실행합니다.
- 메서드: insert()
# Pandas DataFrame 데이터를 지정된 테이블에 삽입합니다.
# 테이블이 존재하지 않으면 생성 후 삽입합니다.
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
1. 테이블 생성:
•`self.create_table(metadata)`로 테이블이 존재하지 않을 경우 생성합니다.
2. DataFrame 삽입:
•Pandas의 `to_sql()` 메서드를 사용하여 DataFrame 데이터를 MySQL 테이블에 삽입합니다.
•`if_exists="append"` 옵션으로 기존 데이터에 추가됩니다.
- 메서드: upsert()
# DataFrame 데이터를 삽입하거나, 기존 레코드가 있을 경우 업데이트합니다.
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
self.create_table(metadata=metadata)
data = df.to_dict(orient="records")
key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit()
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
1. 테이블 생성:
• 테이블이 없으면 생성합니다.
2. 기존 레코드 삭제:
• DataFrame에서 Primary Key 값을 추출하여 삭제 쿼리를 실행합니다.
3. 새 데이터 삽입:
• Pandas의 `to_sql()` 메서드를 사용하여 데이터를 추가로 삽입합니다.
- 메서드: overwrite()
# 기존 데이터를 모두 삭제하고 새로운 데이터를 삽입합니다.
def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
self.create_table(metadata=metadata)
self.drop_table(table=table)
self.insert(df=df, table=table, metadata=metadata)
1. 테이블 삭제 및 재생성:
• 기존 데이터를 모두 삭제하기 위해 테이블을 드롭(drop)한 뒤 다시 생성합니다.
2. 새 데이터 삽입:
• Pandas DataFrame 데이터를 새로 삽입합니다.
- weather_api.py
import requests
class WeatherApiClient:
"""
OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스입니다.
"""
def __init__(self, api_key: str):
self.base_url = "http://api.openweathermap.org/data/2.5"
if api_key is None:
raise Exception("API 키는 None으로 설정할 수 없습니다.")
self.api_key = api_key
def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
"""
지정된 도시의 최신 날씨 데이터를 가져옵니다.
Parameters:
- city_name (str): 날씨 정보를 조회할 도시 이름.
- temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.
Returns:
- dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.
Raises:
- Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
"""
params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
response = requests.get(f"{self.base_url}/weather", params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
)
- 생성자('__init__')
# 클래스를 초기화하고, API 키와 기본 URL을 설정합니다.
def __init__(self, api_key: str):
self.base_url = "http://api.openweathermap.org/data/2.5"
if api_key is None:
raise Exception("API 키는 None으로 설정할 수 없습니다.")
self.api_key = api_key
1. 기본 URL 설정:
• OpenWeatherMap API의 기본 URL은 `http://api.openweathermap.org/data/2.5`입니다.
• 이 URL은 모든 요청의 기본 경로로 사용됩니다.
2. API 키 검증:
• `api_key`가 `None`이면 예외를 발생시킵니다.
• API 키는 OpenWeatherMap API에 인증을 위해 필수적으로 필요합니다.
3. API 키 저장:
• 유효한 API 키를 `self.api_key`에 저장합니다.
- 메서드: get_city()
# 지정된 도시의 최신 날씨 데이터를 가져옵니다.
def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
response = requests.get(f"{self.base_url}/weather", params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
)
1. 매개변수
•`city_name (str)`:
•날씨 정보를 조회할 도시 이름입니다.
•예: `"Seoul"`, `"New York"`.
•`temperature_units (str)`:
•온도 단위를 지정합니다.
•기본값은 `"metric"`(섭씨)이며, 다음 중 하나를 선택할 수 있습니다:
•`"metric"`: 섭씨
•`"imperial"`: 화씨
•`"standard"`: 켈빈
2. API 요청 파라미터 설정
•OpenWeatherMap API에 전달할 쿼리 파라미터를 딕셔너리로 정의합니다:
•`q`: 도시 이름 (예: `"Seoul"`).
•`units`: 온도 단위 (예: `"metric"`).
•`appid`: OpenWeatherMap API 키.
3. HTTP GET 요청
• `requests.get()` 메서드를 사용하여 OpenWeatherMap API에 HTTP GET 요청을 보냅니다.
• 예시: http://api.openweathermap.org/data/2.5/weather?q=Seoul&units=metric&appid=YOUR_API_KEY
ETL_module: Assets 폴더
- __init__.py는 빈 폴더
- weather.py
- 코드의 주요 구성
1. `extract_weather()`: 데이터를 외부 API에서 추출.
2. `transform_weather()`: 데이터를 변환 및 전처리.
3. `load_weather()`: 데이터를 MySQL 데이터베이스에 적재.
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float
def extract_weather(
weather_api_client: WeatherApiClient, cities: list = ["Seoul", "Busan"]
) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(
hours=9
) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[
[
"dt",
"time",
"measured_at",
"id",
"name",
"main.temp",
"main.humidity",
"wind.speed",
]
]
df_selected = df_selected.rename( # 컬럼명 수정
columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed",
}
)
return df_selected
def load_weather(
df: pd.DataFrame,
my_sql_client: MySqlClient,
method: str = "upsert",
) -> None:
"""
변환된 날씨 데이터를 MySQL에 로드하는 함수.
Parameters:
- df (pd.DataFrame): 변환된 데이터
- my_sql_client (MySqlClient): 데이터베이스 클라이언트
- method (str, optional): 삽입 방법 ('insert', 'upsert', 'overwrite')
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
- extract_weather()
# 외부 Weather API에서 여러 도시의 날씨 데이터를 추출하여 Pandas DataFrame으로 반환합니다.
def extract_weather(
weather_api_client: WeatherApiClient, cities: list = ["Seoul", "Busan"]
) -> pd.DataFrame:
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
1. `cities` 리스트:
•기본적으로 `"Seoul", "Busan"`을 사용하며, 필요할 경우 다른 도시 리스트를 전달할 수 있습니다.
•코드 내에서는 `cities = "Busan", "Seoul"`로 고정되어 있음.
2. 데이터 추출:
•`weather_api_client.get_city(city_name=city_name)` 메서드를 호출하여 각 도시의 날씨 데이터를 가져옵니다.
•결과는 JSON 형식으로 반환됩니다.
3. Pandas DataFrame 변환:
•`pd.json_normalize(weather_data)`를 사용하여 JSON 데이터를 DataFrame으로 변환합니다.
•반환된 DataFrame은 모든 날씨 데이터를 포함합니다.
- transform_weather()
# 추출된 날씨 데이터를 변환 및 전처리하여 필요한 컬럼만 선택하고, 데이터 형식을 조정합니다.
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9)
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d")
df["time"] = df["measured_at"].dt.strftime("%H%M%S")
df_selected = df[
[
"dt",
"time",
"measured_at",
"id",
"name",
"main.temp",
"main.humidity",
"wind.speed",
]
]
df_selected = df_selected.rename(
columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed",
}
)
return df_selected
- load_weather()
# 변환된 날씨 데이터를 MySQL 데이터베이스에 저장합니다.
# 저장 방식은 `insert`, `upsert`, 또는 `overwrite` 중 하나를 선택할 수 있습니다.
def load_weather(
df: pd.DataFrame,
my_sql_client: MySqlClient,
method: str = "upsert",
) -> None:
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
1. 테이블 정의:
•SQLAlchemy의 `Table` 객체를 사용하여 데이터베이스 테이블 스키마를 정의합니다.
•테이블 이름: `"daily_weather"`
•주요 컬럼:
dt (String): 기준 날짜 (YYYYMMDD), Primary Key
time (String): 기준 시간 (HHMMSS), Primary Key
measured_at (DateTime): 측정 시각
id (Integer): 도시 ID, Primary Key
city (String): 도시 이름
temperature (Float): 온도
humidity (Integer): 습도
wind_speed (Float): 풍속
2. 저장 방식 선택 (`method`):
•`"insert"`: 새 데이터를 추가.
•`"upsert"`: 기존 데이터가 있으면 업데이트하고 없으면 삽입.
•`"overwrite"`: 기존 데이터를 삭제하고 새 데이터로 덮어씀.
3. MySQL 클라이언트 호출:
•`my_sql_client.insert()`, `my_sql_client.upsert()`, 또는 `my_sql_client.overwrite()` 메서드를 호출하여 데이터를 저장.
4. 예외 처리:
•지정되지 않은 방법이 입력되면 예외를 발생시킵니다.
- 코드 실행 흐름
- 1. Weather API 클라이언트(`WeatherApiClient`)와 MySQL 클라이언트(`MySqlClient`)를 초기화합니다.
- 2. ETL 프로세스를 순차적으로 실행합니다:
- 추출 (`extract_weather`):
- API에서 날씨 데이터를 가져옵니다.
- 결과는 Pandas DataFrame으로 반환됩니다.
- 변환 (`transform_weather`):
- 데이터를 전처리하고 필요한 컬럼만 선택합니다.
- 결과는 변환된 DataFrame입니다.
- 적재 (`load_weather`):
- 변환된 데이터를 MySQL 데이터베이스에 저장합니다.
- 추출 (`extract_weather`):
ETL_module: Pipeline 폴더
- etl_pipeline.yaml
# yaml 파일에 변수 내용 저장
# 추후 변경 및 관리 용이
log_folder_path: "./etl_module/logs"
cities:
- "Seoul"
- "Busan"
- "sejong"
- "daegu"
- "incheon"
- "daejeon"
- "ulsan"
run_minutes: 30
- etl_pipeline.py
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
import sys
import yaml
import schedule
import time
from datetime import datetime
from loguru import logger
def main(config):
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
"""
- ETL 시작
- 환경변수 (.env) 읽는다
- 객체 생성 (weather, mysql) 사용
- extract log 시작, 끝
- transform log 시작, 끝
- load log 시작, 끝
- ETL 파이프라인 끝
"""
# 현재 날짜와 시간을 기반으로 Log 파일명 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"{config.get('log_folder_path')}/etl_process_{current_datetime}.log"
logger.add(log_filename)
logger.info("ETL 프로세스를 시작합니다.")
print('.env 파일 읽기 시작')
load_dotenv()
try:
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
print('.env 파일 읽기 끝')
if not all(
[API_KEY, DB_SERVER_HOST, DB_USERNAME, DB_PASSWORD, DB_DATABASE, DB_PORT]
):
# 누락된 변수들 확인
missing_vars = [
var
for var, value in [
("API_KEY", API_KEY),
("DB_SERVER_HOST", DB_SERVER_HOST),
("DB_USERNAME", DB_USERNAME),
("DB_PASSWORD", DB_PASSWORD),
("DB_DATABASE", DB_DATABASE),
("DB_PORT", DB_PORT),
]
if value is None
]
error_message = f"누락된 환경 변수: {', '.join(missing_vars)}"
logger.error(error_message)
raise ValueError(error_message) # 누락된 환경 변수가 있으면 예외를 발생시킴
logger.info("환경 변수를 성공적으로 로드했습니다.")
weather_api_client = WeatherApiClient(api_key=API_KEY)
logger.info("WeatherApiClient가 초기화되었습니다.")
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
logger.info("MySqlClient가 초기화되었습니다.")
# ETL 실행
logger.info("Weather API에서 데이터 추출을 시작합니다.")
df = extract_weather(weather_api_client=weather_api_client, cities = config.get('cities'))
logger.info(
f"데이터 추출이 완료되었습니다. 총 {len(df)}개의 레코드가 있습니다."
)
logger.info("데이터 변환을 시작합니다.")
clean_df = transform_weather(df)
logger.info(
f"데이터 변환이 완료되었습니다. 변환된 데이터프레임의 크기: {clean_df.shape}"
)
logger.info("MySQL 데이터베이스로 데이터 적재를 시작합니다.")
load_weather(df=clean_df, my_sql_client=my_sql_client)
logger.info("데이터 적재가 성공적으로 완료되었습니다.")
except Exception as e:
logger.error(f"ETL 프로세스 중 오류가 발생했습니다. 오류: {e}")
if __name__ == "__main__":
# get config variables
yaml_file_path = __file__.replace(".py", ".yaml")
print(f"YAML 파일 위치: {yaml_file_path}")
with open(yaml_file_path) as yaml_file:
config = yaml.safe_load(yaml_file)
log_folder_path = config.get("log_folder_path")
os.makedirs(log_folder_path, exist_ok=True)
# 스케줄러 생성
schedule.every(config.get("run_minutes")).minutes.do(main, config=config)
while True:
schedule.run_pending()
time.sleep(5)
- 코드의 주요 흐름
- 1. `.env` 파일과 YAML 설정 파일에서 환경 변수 및 설정 정보를 로드.
- 2. Weather API와 MySQL 데이터베이스 클라이언트를 초기화.
- 3. ETL 프로세스 실행:
- 추출 (Extract): Weather API에서 데이터 가져오기.
- 변환 (Transform): 데이터를 전처리 및 변환.
- 적재 (Load): 데이터를 MySQL 데이터베이스에 저장.
- 4. 로그 파일에 모든 작업 기록(Loguru 사용).
- 5. 스케줄링 기능으로 일정 주기로 ETL 프로세스 실행.
- 로그 파일 생성
current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_filename = f"{config.get('log_folder_path')}/etl_process_{current_datetime}.log"
logger.add(log_filename)
logger.info("ETL 프로세스를 시작합니다.")
• 현재 시간(`datetime.now()`)을 기반으로 고유한 로그 파일 이름을 생성합니다.
• Loguru를 사용하여 ETL 작업의 모든 이벤트를 기록합니다.
- 환경 변수 로드
print('.env 파일 읽기 시작')
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
print('.env 파일 읽기 끝')
• `.env` 파일에서 OpenWeatherMap API 키와 MySQL 연결 정보를 로드합니다.
• 환경 변수가 누락된 경우를 확인하고, 누락된 변수 이름을 로그에 기록합니다:
- weather API 및 MySQL 클라이언트 초기화
weather_api_client = WeatherApiClient(api_key=API_KEY)
logger.info("WeatherApiClient가 초기화되었습니다.")
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
logger.info("MySqlClient가 초기화되었습니다.")
• `WeatherApiClient`: OpenWeatherMap API와 통신하는 객체를 생성합니다.
• `MySqlClient`: MySQL 데이터베이스와 통신하는 객체를 생성합니다.
- ETL 실행
# 데이터 추출(Extract)
logger.info("Weather API에서 데이터 추출을 시작합니다.")
df = extract_weather(weather_api_client=weather_api_client, cities=config.get('cities'))
logger.info(f"데이터 추출이 완료되었습니다. 총 {len(df)}개의 레코드가 있습니다.")
• `extract_weather()` 함수는 Weather API에서 지정된 도시들의 날씨 데이터를 가져옵니다.
• 결과는 Pandas DataFrame으로 반환됩니다.
# 데이터 변환 (Transform)
logger.info("데이터 변환을 시작합니다.")
clean_df = transform_weather(df)
logger.info(f"데이터 변환이 완료되었습니다. 변환된 데이터프레임의 크기: {clean_df.shape}")
• `transform_weather()` 함수는 데이터를 전처리하고 필요한 컬럼만 선택하여 반환합니다.
# 데이터 적재(Load)
logger.info("MySQL 데이터베이스로 데이터 적재를 시작합니다.")
load_weather(df=clean_df, my_sql_client=my_sql_client)
logger.info("데이터 적재가 성공적으로 완료되었습니다.")
• `load_weather()` 함수는 변환된 데이터를 MySQL 데이터베이스에 저장합니다.
- yaml 설정 파일 로드
# yaml 파일 경로 생성
yaml_file_path = __file__.replace(".py", ".yaml")
print(f"YAML 파일 위치: {yaml_file_path}")
• 현재 스크립트 경로에서 `.py` 확장자를 `.yaml`로 변경하여 YAML 설정 파일 경로를 생성합니다.
# yaml 파일 읽기
with open(yaml_file_path) as yaml_file:
config = yaml.safe_load(yaml_file)
• YAML 설정 파일을 읽어 Python 딕셔너리(`config`)로 변환합니다.
- 스케줄링 및 실행
schedule.every(config.get("run_minutes")).minutes.do(main, config=config)
• `schedule.every(...).minutes.do(...)`: 지정된 시간 간격마다 `main()` 함수를 실행하도록 설정합니다.
• `config` 딕셔너리를 `main()` 함수에 전달합니다.
# 다양한 주기 실행
schedule.every(1).seconds.do(job)
# 10초마다 실행
schedule.every(10).seconds.do(job)
# 1분마다 실행
schedule.every(60).seconds.do(job)
schedule.every(1).minute.do(job)
# 매일 자정에 실행
schedule.every().day.at("00:00").do(job)
# 매주 월요일 9시에 실행
schedule.every().monday.at("09:00").do(job)
- 무한 루프 실행
while True:
schedule.run_pending()
time.sleep(5)
• 스케줄러가 실행 대기 중인 작업들을 확인(`run_pending()`)하고, 5초 간격으로 반복 실행됩니다.
터미널에서 실행
- 폴더 위치 실행(프로덕트 위치가 아니라면)
cd /Users/~~~~/python/crawling/python_challange/5_challenge_module
- 실행
python3 -m etl_module.pipeline.etl_pipeline
'[업무 지식] > Crawling' 카테고리의 다른 글
[네이버 뉴스] '이커머스' 검색 타이틀 크롤링 (0) | 2025.01.06 |
---|---|
[서울시 열린데이터] 서울시 상권분석서비스(소득소비-상권) (0) | 2025.01.03 |
[NAVER API] 쇼핑인사이트 (0) | 2025.01.03 |
[Selenium] CGV review Crawling (0) | 2025.01.03 |
[ETL] Weather, MySQL 클래스를 객체화 (0) | 2025.01.02 |