패키지 불러오기
from etl_module.connectors.weather_api import WeatherApiClient # 별도 생성한 클래스. 하단 etl_module 폴더에 코드 작성
from etl_module.connectors.mysql import MySqlClient # 별도 생성한 클래스. 하단 etl_module 폴더에 코드 작성
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
- 기존 환경변수(.env) 변경 시 코드
from dotenv import load_dotenv
from dotenv import dotenv_values
# 기존 환경 변수 제거
os.environ.pop('API_KEY', None)
os.environ.pop('DB_SERVER_HOST', None)
os.environ.pop('DB_USERNAME', None)
os.environ.pop('DB_PASSWORD', None)
os.environ.pop('DB_DATABASE', None)
os.environ.pop('DB_PORT', None)
# .env 파일 다시 로드
load_dotenv()
openweathermap API 인증 및 불러오기
# !pip3 install mysql-connector-python
# Client
API_KEY = os.environ.get("API_KEY")
weather_api_client = WeatherApiClient(api_key = API_KEY)
# 테스트
weather_api_client.get_city(city_name='seoul')
추출 Extract
# 여러 도시에 날씨 정보 추출
def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
# 확인
df = extract_weather(weather_api_client=weather_api_client)
df.head()
변환 Transform
# 데이터 전처리 코드 함수
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(hours=9) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[["dt", "time", "measured_at", "id", "name", "main.temp", "main.humidity", "wind.speed"]]
df_selected = df_selected.rename(columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed"
})
return df_selected
# 확인
clean_df = transform_weather(df)
clean_df.head()
적재 Load
- 환경변수 설정
- MySQL 연결
from etl_module.connectors.mysql import MySqlClient # mysql.py 파일 클래스 참고(하단에 작성됨)
# 환경변수 설정
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') # 데이터베이스 서버의 호스트 이름 (로컬호스트로 설정)
DB_USERNAME = os.environ.get('DB_USERNAME') # 데이터베이스 아이디
DB_PASSWORD = os.environ.get('DB_PASSWORD') # 데이터베이스 비밀번호
DB_DATABASE = os.environ.get('DB_DATABASE') # 사용할 데이터베이스 이름
DB_PORT = os.environ.get('DB_PORT') # 데이터베이스 연결을 위한 포트 (Default: 3306)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
- MySQL 데이터베이스 로드
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime
def load_weather(df: pd.DataFrame, my_sql_client: MySqlClient, method: str = "upsert",) -> None:
"""
변환된 날씨 데이터를 MySQL 데이터베이스에 로드합니다.
Parameters:
- df (pd.DataFrame): 변환된 날씨 데이터를 포함하는 DataFrame.
- my_sql_client (MySqlClient): MySQL 데이터베이스와 상호작용하는 클라이언트.
- table (Table): 데이터를 로드할 대상 데이터베이스 테이블.
- metadata (MetaData): 테이블 정의에 대한 SQLAlchemy 메타데이터 객체.
- method (str, optional): 데이터 삽입 방법을 지정합니다.
옵션: "insert", "upsert", "overwrite".
기본값은 "upsert"입니다.
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
# 확인
load_weather(df=clean_df, my_sql_client=my_sql_client, method="upsert")
ETL Module 생성
- 폴더명 etl_module 생성
- 하위 폴더 1: assets
- 하위 폴더 2: connectors
- 하위 폴더 3: pipeline
- assets -> __init__.py(빈 파일 생성)
- assets -> weather.py 생성
- extract_weather + transform_weather + load_weather
# weather.py
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
import pandas as pd
from sqlalchemy import MetaData, Table, Column, String, DateTime, Integer, Float
def extract_weather(weather_api_client: WeatherApiClient) -> pd.DataFrame:
"""
여러 도시의 날씨 데이터를 추출합니다.
Parameters:
- weather_api_client (WeatherApiClient): API에서 날씨 데이터를 가져오기 위한 클라이언트.
Returns:
- pd.DataFrame: 지정된 도시들의 날씨 데이터를 포함하는 DataFrame.
"""
cities = ["seoul", "busan", "sejong", "daegu", "incheon", "daejeon", "ulsan"]
weather_data = []
for city_name in cities:
weather_data.append(weather_api_client.get_city(city_name=city_name))
df = pd.json_normalize(weather_data)
return df
def transform_weather(df: pd.DataFrame) -> pd.DataFrame:
"""
날씨 데이터를 변환하고 전처리합니다.
Parameters:
- df (pd.DataFrame): 원본 날씨 데이터를 포함하는 DataFrame.
Returns:
- pd.DataFrame: 선택된 컬럼과 이름이 변경된 데이터로 구성된 변환된 DataFrame.
"""
df["measured_at"] = pd.to_datetime(df["dt"], unit="s") + pd.Timedelta(
hours=9
) # 한국시간
df["dt"] = df["measured_at"].dt.strftime("%Y%m%d") # 기준년월일 (YYYYMMDD)
df["time"] = df["measured_at"].dt.strftime("%H%M%S") # 기준년월일 (HHHHMMSS)
df_selected = df[
[
"dt",
"time",
"measured_at",
"id",
"name",
"main.temp",
"main.humidity",
"wind.speed",
]
]
df_selected = df_selected.rename( # 컬럼명 수정
columns={
"name": "city",
"main.temp": "temperature",
"main.humidity": "humidity",
"wind.speed": "wind_speed",
}
)
return df_selected
def load_weather(
df: pd.DataFrame,
my_sql_client: MySqlClient,
method: str = "upsert",
) -> None:
"""
변환된 날씨 데이터를 MySQL에 로드하는 함수.
Parameters:
- df (pd.DataFrame): 변환된 데이터
- my_sql_client (MySqlClient): 데이터베이스 클라이언트
- method (str, optional): 삽입 방법 ('insert', 'upsert', 'overwrite')
"""
metadata = MetaData()
table = Table(
"daily_weather",
metadata,
Column("dt", String(8), nullable=False, primary_key=True),
Column("time", String(6), nullable=False, primary_key=True),
Column("measured_at", DateTime, nullable=False),
Column("id", Integer, primary_key=True),
Column("city", String(100), nullable=True),
Column("temperature", Float, nullable=True),
Column("humidity", Integer, nullable=True),
Column("wind_speed", Float, nullable=True),
)
if method == "insert":
my_sql_client.insert(df=df, table=table, metadata=metadata)
elif method == "upsert":
my_sql_client.upsert(df=df, table=table, metadata=metadata)
elif method == "overwrite":
my_sql_client.overwrite(df=df, table=table, metadata=metadata)
else:
raise Exception("올바른 method를 설정해주세요: [insert, upsert, overwrite]")
- connectors -> __init__.py 빈 파일 생성
- connectors -> mysql.py 생성
# mysql.py
from sqlalchemy import create_engine, MetaData, Table, MetaData, Column
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
이 클래스는 SQLAlchemy를 사용하여 MySQL 데이터베이스에 연결하고 테이블을 생성, 삭제, 삽입,
업서트(upsert)와 같은 작업을 지원합니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 3306,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+pymysql",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table: Table) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table.name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame을 테이블에 삽입합니다. 테이블이 존재하지 않으면 생성합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터 삽입을 위한 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블에 삽입하고, 기존 레코드가 있으면 갱신합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def overwrite(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
주어진 DataFrame 데이터를 테이블의 기존 데이터를 모두 대체하도록 삽입합니다.
Parameters:
- df (pd.DataFrame): 테이블의 기존 데이터를 대체할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 데이터를 대체할 SQLAlchemy Table 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
# 테이블이 존재하지 않으면 생성
self.create_table(metadata=metadata)
# 기존 테이블 데이터 삭제
self.drop_table(table=table)
# 새로운 데이터를 테이블에 삽입
self.insert(df=df, table=table, metadata=metadata)
- connectors -> weather_api.py 생성
# weather_api.py
import requests
class WeatherApiClient:
"""
OpenWeatherMap API 클라이언트를 사용하여 날씨 데이터를 가져오는 클래스입니다.
"""
def __init__(self, api_key: str):
self.base_url = "http://api.openweathermap.org/data/2.5"
if api_key is None:
raise Exception("API 키는 None으로 설정할 수 없습니다.")
self.api_key = api_key
def get_city(self, city_name: str, temperature_units: str = "metric") -> dict:
"""
지정된 도시의 최신 날씨 데이터를 가져옵니다.
Parameters:
- city_name (str): 날씨 정보를 조회할 도시 이름.
- temperature_units (str): 온도 단위 (기본값은 'metric'으로 섭씨 기준).
'metric'은 섭씨, 'imperial'은 화씨, 'standard'는 켈빈 단위를 의미합니다.
Returns:
- dict: 요청한 도시의 날씨 데이터가 포함된 JSON 응답을 반환합니다.
Raises:
- Exception: API 요청이 실패한 경우 상태 코드와 응답 메시지와 함께 예외가 발생합니다.
"""
params = {"q": city_name, "units": temperature_units, "appid": self.api_key}
response = requests.get(f"{self.base_url}/weather", params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(
f"Open Weather API에서 데이터를 추출하지 못했습니다. 상태 코드: {response.status_code}. 응답: {response.text}"
)
- pipeline -> etl_pipeline.py 생성
import os
from dotenv import load_dotenv
from etl_module.connectors.weather_api import WeatherApiClient
from etl_module.connectors.mysql import MySqlClient
from etl_module.assets.weather import extract_weather, transform_weather, load_weather
def main():
"""
main 함수는 전체 ETL 프로세스를 실행합니다.
1. 환경 변수를 로드하여 Weather API와 MySQL 데이터베이스 연결에 필요한 정보를 가져옵니다.
2. WeatherApiClient와 MySqlClient 객체를 생성합니다.
3. ETL 프로세스를 순차적으로 수행합니다:
- 데이터를 Weather API에서 추출합니다.
- 추출된 데이터를 변환하여 필요한 형태로 가공합니다.
- 가공된 데이터를 MySQL 데이터베이스에 적재합니다.
"""
load_dotenv()
API_KEY = os.environ.get("API_KEY")
DB_SERVER_HOST = os.environ.get("DB_SERVER_HOST")
DB_USERNAME = os.environ.get("DB_USERNAME")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_DATABASE = os.environ.get("DB_DATABASE")
DB_PORT = os.environ.get("DB_PORT")
weather_api_client = WeatherApiClient(api_key=API_KEY)
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT,
)
# ETL 실행
df = extract_weather(weather_api_client=weather_api_client)
clean_df = transform_weather(df)
print(clean_df.shape)
load_weather(df=clean_df, my_sql_client=my_sql_client)
if __name__ == "__main__":
main()
'[업무 지식] > Crawling' 카테고리의 다른 글
[NAVER API] 쇼핑인사이트 (0) | 2025.01.03 |
---|---|
[Selenium] CGV review Crawling (0) | 2025.01.03 |
[geocode] GeoJSON의 내용을 출력하는 HTML (0) | 2025.01.02 |
[geocode] 지오코딩으로 위치 정보 추출하기 (0) | 2025.01.02 |
[selenium] 네이버페이 주문 이력 추출하기 (0) | 2025.01.02 |