본문 바로가기
[업무 지식]/Crawling

[youtube] 유튜브에서 동영상 정보 수집하기

by 에디터 윤슬 2025. 1. 1.

 

  • .env 파일에 YOUTUBE_API_KEY 선언

Google API Client for Python 사용하기

import os
from dotenv import load_dotenv
# !pip3 install google-api-python-client
from apiclient.discovery import build

load_dotenv()

# 환경변수에서 API 키 추출하기
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')

# YouTube API 클라이언트를 생성합니다.
# build() 함수의 첫 번째 매개변수에는 API 이름
# 두 번째 매개변수에는 API 버전을 지정합니다.
# 키워드 매개변수 developerKey에는 API 키를 지정합니다.
# 이 함수는 내부적으로 https://www.googleapis.com/discovery/v1/apis/youtube/v3/rest라는
# URL에 접근하고 API 리소스와 메서드 정보를 추출합니다.
youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)

# 키워드 매개변수로 매개변수를 지정하고
# search.list 메서드를 호출합니다.
# list() 메서드를 실행하면 googleapiclient.http.HttpRequest가 반환됩니다. 
# execute() 메서드를 실행하면 실제 HTTP 요청이 보내지며, API 응답이 반환됩니다.
search_response = youtube.search().list(
    part='snippet',
    q='요리',
    type='video',
).execute()

# search_response는 API 응답을 JSON으로 나타낸 dict 객체입니다.
for item in search_response['items']:
    # 동영상 제목을 출력합니다.
    print(item['snippet']['title'])

 

동영상의 상세한 메타 정보 추출하기 & MySQL에 저장하고 검색하기

import os
import sys
import pymysql
from apiclient.discovery import build

load_dotenv()
# 환경변수에서 API 키를 추출합니다.
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
DB_SERVER_HOST=os.getenv('DB_SERVER_HOST')
DB_PORT=os.getenv('DB_PORT')
DB_DATABASE=os.getenv('DB_DATABASE')
DB_USERNAME=os.getenv('DB_USERNAME')
DB_PASSWORD=os.getenv('DB_PASSWORD')

def main():
    """
    메인 처리
    """
    # MySQL 데이터베이스 연결 설정
    db_config = {
        "host": DB_SERVER_HOST,
        "user": DB_USERNAME,  # MySQL 사용자 이름
        "password": DB_PASSWORD,  # MySQL 비밀번호
        "database": DB_DATABASE,  # 사용할 데이터베이스 이름
        "charset": "utf8mb4"
    }
    
    
    
    # MySQL 연결 생성
    conn = pymysql.connect(**db_config)
    cursor = conn.cursor()

    # 테이블 생성 (필요시)
    create_table_query = """
    CREATE TABLE IF NOT EXISTS videos (
        id VARCHAR(255) PRIMARY KEY,
        title VARCHAR(255) NOT NULL,
        description TEXT,
        view_count INT,
        like_count INT,
        comment_count INT
    )
    """
    cursor.execute(create_table_query)
    conn.commit()

    # 기존의 모든 데이터를 삭제합니다.
    cursor.execute("DELETE FROM videos")
    conn.commit()

    # 동영상을 검색하고, 페이지 단위로 아이템 목록을 저장합니다.
    for items_per_page in search_videos('요리'):
        save_to_mysql(cursor, conn, items_per_page)

    # 뷰 수가 높은 동영상을 출력합니다.
    show_top_videos(cursor)

    # 연결 종료
    cursor.close()
    conn.close()

def search_videos(query, max_pages=5):
    """
    동영상을 검색하고, 페이지 단위로 list를 yield합니다.
    """
    # YouTube의 API 클라이언트 생성하기
    youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
    
    # search.list 메서드로 처음 페이지 추출을 위한 요청 전송하기
    search_request = youtube.search().list(
        part='id',
        q=query,
        type='video',
        maxResults=50,
    )
    
    i = 0
    while search_request and i < max_pages:
        # 요청을 전송합니다.
        search_response = search_request.execute()
        
        # 동영상 ID의 리스트를 추출합니다.
        video_ids = [item['id']['videoId'] for item in search_response['items']]
        
        # videos.list 메서드로 동영상의 상세 정보를 추출합니다.
        videos_response = youtube.videos().list(
            part='snippet,statistics',
            id=','.join(video_ids)
        ).execute()
        
        # 현재 페이지 내부의 아이템을 yield합니다.
        yield videos_response['items']
        
        # list_next() 메서드로 다음 페이지를 추출하기 위한 요청을 보냅니다.
        search_request = youtube.search().list_next(search_request, search_response)
        i += 1

def save_to_mysql(cursor, conn, items):
    """
    MySQL에 아이템을 저장합니다.
    """
    for item in items:
        video_id = item['id']
        title = item['snippet']['title']
        description = item['snippet'].get('description', '')
        
        statistics = item.get('statistics', {})
        view_count = int(statistics.get('viewCount', 0))
        like_count = int(statistics.get('likeCount', 0))
        comment_count = int(statistics.get('commentCount', 0))

        # INSERT 쿼리를 실행합니다.
        insert_query = """
            INSERT INTO videos (id, title, description, view_count, like_count, comment_count)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE 
                title=VALUES(title),
                description=VALUES(description),
                view_count=VALUES(view_count),
                like_count=VALUES(like_count),
                comment_count=VALUES(comment_count)
        """
        
        cursor.execute(insert_query, (video_id, title, description, view_count, like_count, comment_count))
    
    conn.commit()
    print(f"Inserted {len(items)} records into MySQL.", file=sys.stderr)

def show_top_videos(cursor):
    """
    MySQL에서 뷰 수를 기준으로 상위 5개 동영상을 출력합니다.
    """
    query = "SELECT view_count, title FROM videos ORDER BY view_count DESC LIMIT 5"
    
    cursor.execute(query)
    
    for row in cursor.fetchall():
        print(f"{row[0]} views - {row[1]}")

if __name__ == '__main__':
    main()