본문 바로가기
[업무 지식]/Crawling

[MySQL] 연결 및 테이블 조작

by 에디터 윤슬 2025. 1. 1.

MySQL 테이블 구조 설정 및 테이블 생성

engine = create_engine(connection_url)
table_name = 'daily_weather'

# 테이블에 대한 Metadata 설정 
metadata = MetaData()
table = Table(
    table_name, metadata,
    Column('dt', String(8), nullable=False, primary_key=True),  # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
    Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
    Column('measured_at', DateTime, nullable=False),    # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
    Column('id', Integer, primary_key=True),            # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
    Column('city', String(100), nullable=True),         # 'name'은 문자열이며 null이 허용됨
    Column('temperature', Float, nullable=True),        # 'temperature'는 부동소수점 타입이며 null이 허용됨
    Column('humidity', Integer, nullable=True),         # 'humidity'는 정수 타입이며 null이 허용됨
    Column('wind_speed', Float, nullable=True)          # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)

# CREATE 
metadata.create_all(engine)

 

데이터베이스 조작

  • 테이블 삭제: DROP
# 테이블 삭제 (DROP)  
with engine.connect() as connection:
    connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))

 

  • 데이터 삽입(INSERT)
# 데이터 삽입 (INSERT) 
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)

 

  • 데이터 삭제 후 삽입(DELETE + INSERT)
# 데이터 삭제 후 삽입 (DELETE+INSERT)

# 데이터프레임을 레코드의 리스트(딕셔너리로 구성된)로 변환
data = df.to_dict(orient='records')

# 테이블에서 고유 키(Primary Key) 열을 가져옴
key_columns = [pk_column.name for pk_column in table.primary_key.columns.values()]
key_values = [tuple(row[pk] for pk in key_columns) for row in data]
delete_values = ", ".join([f"({', '.join(map(repr, values))})" for values in key_values])

with engine.connect() as connection:
    if key_values:
        delete_sql = f"""
            DELETE FROM {DB_DATABASE}.{table.name}
            WHERE ({', '.join(key_columns)}) IN (
                {delete_values}
            )
        """
        connection.execute(text(delete_sql)) 
        connection.commit() # 생성된 DELETE 문 실행
        
# 변환된 데이터프레임을 테이블에 추가 (INSERT)
df.to_sql(name=table_name, con=engine, if_exists='append', index=False)

 

객체 지향 프로그래밍

from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy import text
from sqlalchemy.engine import URL
import pandas as pd


class MySqlClient:
    """
    MySQL 데이터베이스와 상호작용하기 위한 클라이언트 클래스입니다.
    """

    def __init__(
        self,
        server_name: str,
        database_name: str,
        username: str,
        password: str,
        port: int = 5432,
    ):
        # 데이터베이스 연결을 위한 초기 설정
        self.host_name = server_name
        self.database_name = database_name
        self.username = username
        self.password = password
        self.port = port

        # MySQL 연결 URL 생성
        connection_url = URL.create(
            drivername="mysql+mysqlconnector",
            username=username,
            password=password,
            host=server_name,
            port=port,
            database=database_name,
        )

        # SQLAlchemy 엔진 생성
        self.engine = create_engine(connection_url)

    def create_table(self, metadata: MetaData) -> None:
        """
        주어진 메타데이터 객체를 기반으로 테이블을 생성합니다.

        Parameters:
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        metadata.create_all(self.engine)

    def drop_table(self, table_name: str) -> None:
        """
        지정된 테이블을 삭제합니다. 테이블이 존재하지 않으면 무시합니다.

        Parameters:
        - table_name (str): 삭제할 테이블의 이름.
        """
        with self.engine.connect() as connection:
            connection.execute(text(f"DROP TABLE IF EXISTS {table_name}"))

    def insert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        데이터를 테이블에 삽입합니다. 테이블이 없으면 생성 후 추가합니다.

        Parameters:
        - df (pd.DataFrame): 삽입할 데이터를 포함하는 Pandas DataFrame.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

    def upsert(self, df: pd.DataFrame, table: Table, metadata: MetaData) -> None:
        """
        데이터를 테이블에 삽입하고, 기존 레코드가 있으면 업데이트합니다. 테이블이 없으면 생성 후 추가합니다.

        Parameters:
        - df (pd.DataFrame): 삽입 또는 갱신할 데이터를 포함하는 Pandas DataFrame.
        - table (Table): 업서트 작업을 수행할 SQLAlchemy 테이블 객체.
        - metadata (MetaData): 테이블 정의를 포함하는 SQLAlchemy MetaData 객체.
        """
        self.create_table(metadata=metadata)

        # 데이터프레임을 레코드(딕셔너리 목록)으로 변환
        data = df.to_dict(orient="records")

        # 테이블의 고유 키(Primary Key) 추출
        key_columns = [
            pk_column.name for pk_column in table.primary_key.columns.values()
        ]
        key_values = [tuple(row[pk] for pk in key_columns) for row in data]
        delete_values = ", ".join(
            [f"({', '.join(map(repr, values))})" for values in key_values]
        )

        with self.engine.connect() as connection:
            if key_values:
                delete_sql = f"""
                    DELETE FROM {self.database_name}.{table.name}
                    WHERE ({', '.join(key_columns)}) IN (
                        {delete_values}
                    )
                """
                connection.execute(text(delete_sql))
                connection.commit()  # DELETE 문 실행

        # 변환된 데이터프레임을 테이블에 추가 (INSERT)
        df.to_sql(name=table.name, con=self.engine, if_exists="append", index=False)

 

from sqlalchemy import Integer, String, Float, DateTime
from sqlalchemy import MetaData, Table, Column
import pandas as pd
import os 
from dotenv import load_dotenv

load_dotenv()

# 2회차에서 쌓아둔 데이터 불러오기 
df = pd.read_csv(
    'weather_api.csv',
    dtype={
        'dt': 'object', 
        'time': 'object', 
        'id': 'int64', 
        'city': 'object', 
        'temperature': 'float64', 
        'humidity': 'int64', 
        'wind_speed': 'float64'
    },
    parse_dates=['measured_at'] 
)

table_name = 'daily_weather'

# 테이블에 대한 Metadata 설정 
metadata = MetaData()
table = Table(
    table_name, metadata,
    Column('dt', String(8), nullable=False, primary_key=True),  # 'dt'는 문자열이며 기본 키로 설정됨 (고유값)
    Column('time', String(6), nullable=False, primary_key=True), # 'time'는 문자열이며 기본 키로 설정됨 (고유값)
    Column('measured_at', DateTime, nullable=False),    # 'measured_at'은 DateTime 타입이고 null이 허용되지 않음
    Column('id', Integer, primary_key=True),            # 'id'는 정수 타입이며 기본 키로 설정됨 (고유값)
    Column('city', String(100), nullable=True),         # 'name'은 문자열이며 null이 허용됨
    Column('temperature', Float, nullable=True),        # 'temperature'는 부동소수점 타입이며 null이 허용됨
    Column('humidity', Integer, nullable=True),         # 'humidity'는 정수 타입이며 null이 허용됨
    Column('wind_speed', Float, nullable=True)          # 'wind_speed'는 부동소수점 타입이며 null이 허용됨
)

DB_SERVER_HOST = os.environ.get('DB_SERVER_HOST') 
DB_USERNAME = os.environ.get('DB_USERNAME')  
DB_PASSWORD = os.environ.get('DB_PASSWORD') 
DB_DATABASE = os.environ.get('DB_DATABASE') 
DB_PORT = os.environ.get('DB_PORT') 

my_sql_client = MySqlClient(
    server_name=DB_SERVER_HOST, 
    database_name=DB_DATABASE, 
    username=DB_USERNAME, 
    password=DB_PASSWORD, 
    port=DB_PORT
)
my_sql_client.create_table(metadata=metadata)
my_sql_client.insert(df=df, table=table, metadata=metadata)
# my_sql_client.upsert(df=df, table=table, metadata=metadata)