MySQL 테이블 구조 설정 및 테이블 생성
engine = create_engine(connection_url)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
# CREATE
metadata.create_all(engine)
데이터베이스 조작
- 테이블 삭제: DROP
# 테이블 삭제 (DROP)
with engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
- 데이터 삽입(INSERT)
# 데이터 삽입 (INSERT)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
- 데이터 삭제 후 삽입(DELETE + INSERT)
# 데이터 삭제 후 삽입 (DELETE+INSERT)
# 데이터프레임을 레코드의 리스트(딕셔너리로 구성된)로 변환
data = df.to_dict(orient='records')
# 테이블에서 고유 키(Primary Key) 열을 가져옴
key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join([f"({', '.join(map(repr, values))})" for values in key_values])
with engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {DB_DATABASE}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # 생성된 DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)
객체 지향 프로그래밍
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd
class MySqlClient:
"""
MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
"""
def __init__(
self,
server_name: str,
database_name: str,
username: str,
password: str,
port: int = 5432,
):
# 데이터베이스 연결을 위한 초기 설정
self.host_name = server_name
self.database_name = database_name
self.username = username
self.password = password
self.port = port
# MySQL 연결 URL 생성
connection_url = URL.create(
drivername="mysql+mysqlconnector",
username=username,
password=password,
host=server_name,
port=port,
database=database_name,
)
# SQLAlchemy 엔진 생성
self.engine = create_engine(connection_url)
def create_table(self, metadata: MetaData) -> None:
"""
주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.
Parameters:
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
metadata.create_all(self.engine)
def drop_table(self, table_name: str) -> None:
"""
지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.
Parameters:
- table_name (str): 삭제할 테이블의 이름.
"""
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))
def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
데이터를 테이블에 삽입합니다. 테이블이 없으면 생성 후 추가합니다.
Parameters:
- df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
"""
데이터를 테이블에 삽입하고, 기존 레코드가 있으면 업데이트합니다. 테이블이 없으면 생성 후 추가합니다.
Parameters:
- df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
- table (Table): 업서트 작업을 수행할 SQLAlchemy 테이블 객체.
- metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
"""
self.create_table(metadata=metadata)
# 데이터프레임을 레코드(딕셔너리 목록)으로 변환
data = df.to_dict(orient="records")
# 테이블의 고유 키(Primary Key) 추출
key_columns = [
pk_column.name for pk_column in table.primary_key.columns.values()
]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join(
[f"({', '.join(map(repr, values))})" for values in key_values]
)
with self.engine.connect() as connection:
if key_values:
delete_sql = f"""
DELETE FROM {self.database_name}.{table.name}
WHERE ({', '.join(key_columns)}) IN (
{delete_values}
)
"""
connection.execute(text(delete_sql))
connection.commit() # DELETE 문 실행
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)
from sqlalchemy import Integer, String, Float, DateTime
from sqlalchemy import MetaData, Table, Column
import pandas as pd
import os
from dotenv import load_dotenv
load_dotenv()
# 2회차에서 쌓아둔 데이터 불러오기
df = pd.read_csv(
'weather_api.csv',
dtype={
'dt': 'object',
'time': 'object',
'id': 'int64',
'city': 'object',
'temperature': 'float64',
'humidity': 'int64',
'wind_speed': 'float64'
},
parse_dates=['measured_at']
)
table_name = 'daily_weather'
# 테이블에 대한 Metadata 설정
metadata = MetaData()
table = Table(
table_name, metadata,
Column('dt', String(8), nullable=False, primary_key=True), # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
Column('measured_at', DateTime, nullable=False), # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
Column('id', Integer, primary_key=True), # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
Column('city', String(100), nullable=True), # 'name'은 문자열이며 null이 허용됨
Column('temperature', Float, nullable=True), # 'temperature'는 부동소수점 타입이며 null이 허용됨
Column('humidity', Integer, nullable=True), # 'humidity'는 정수 타입이며 null이 허용됨
Column('wind_speed', Float, nullable=True) # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)
DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST')
DB_USERNAME = os.environ.get('DB_USERNAME')
DB_PASSWORD = os.environ.get('DB_PASSWORD')
DB_DATABASE = os.environ.get('DB_DATABASE')
DB_PORT = os.environ.get('DB_PORT')
my_sql_client = MySqlClient(
server_name=DB_SERVER_HOST,
database_name=DB_DATABASE,
username=DB_USERNAME,
password=DB_PASSWORD,
port=DB_PORT
)
my_sql_client.create_table(metadata=metadata)
my_sql_client.insert(df=df, table=table, metadata=metadata)
# my_sql_client.upsert(df=df, table=table, metadata=metadata)
'[업무 지식] > Crawling' 카테고리의 다른 글
[selenium] 네이버페이 주문 이력 추출하기 (0) | 2025.01.02 |
---|---|
[Selenuim] 구글 검색하기 (0) | 2025.01.02 |
[youtube] 유튜브에서 동영상 정보 수집하기 (0) | 2025.01.01 |
[API 데이터 수집, 활용] 트위터(X)에서 데이터 수집하기 (0) | 2025.01.01 |
[변화 감지하기] 크롤링 대상의 변화에 대응하기 (0) | 2025.01.01 |