- .env 파일에 YOUTUBE_API_KEY 선언
Google API Client for Python 사용하기
import os
from dotenv import load_dotenv
# !pip3 install google-api-python-client
from apiclient.discovery import build
load_dotenv()
# 환경변수에서 API 키 추출하기
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
# YouTube API 클라이언트를 생성합니다.
# build() 함수의 첫 번째 매개변수에는 API 이름
# 두 번째 매개변수에는 API 버전을 지정합니다.
# 키워드 매개변수 developerKey에는 API 키를 지정합니다.
# 이 함수는 내부적으로 https://www.googleapis.com/discovery/v1/apis/youtube/v3/rest라는
# URL에 접근하고 API 리소스와 메서드 정보를 추출합니다.
youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
# 키워드 매개변수로 매개변수를 지정하고
# search.list 메서드를 호출합니다.
# list() 메서드를 실행하면 googleapiclient.http.HttpRequest가 반환됩니다.
# execute() 메서드를 실행하면 실제 HTTP 요청이 보내지며, API 응답이 반환됩니다.
search_response = youtube.search().list(
part='snippet',
q='요리',
type='video',
).execute()
# search_response는 API 응답을 JSON으로 나타낸 dict 객체입니다.
for item in search_response['items']:
# 동영상 제목을 출력합니다.
print(item['snippet']['title'])
동영상의 상세한 메타 정보 추출하기 & MySQL에 저장하고 검색하기
import os
import sys
import pymysql
from apiclient.discovery import build
load_dotenv()
# 환경변수에서 API 키를 추출합니다.
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY')
DB_SERVER_HOST=os.getenv('DB_SERVER_HOST')
DB_PORT=os.getenv('DB_PORT')
DB_DATABASE=os.getenv('DB_DATABASE')
DB_USERNAME=os.getenv('DB_USERNAME')
DB_PASSWORD=os.getenv('DB_PASSWORD')
def main():
"""
메인 처리
"""
# MySQL 데이터베이스 연결 설정
db_config = {
"host": DB_SERVER_HOST,
"user": DB_USERNAME, # MySQL 사용자 이름
"password": DB_PASSWORD, # MySQL 비밀번호
"database": DB_DATABASE, # 사용할 데이터베이스 이름
"charset": "utf8mb4"
}
# MySQL 연결 생성
conn = pymysql.connect(**db_config)
cursor = conn.cursor()
# 테이블 생성 (필요시)
create_table_query = """
CREATE TABLE IF NOT EXISTS videos (
id VARCHAR(255) PRIMARY KEY,
title VARCHAR(255) NOT NULL,
description TEXT,
view_count INT,
like_count INT,
comment_count INT
)
"""
cursor.execute(create_table_query)
conn.commit()
# 기존의 모든 데이터를 삭제합니다.
cursor.execute("DELETE FROM videos")
conn.commit()
# 동영상을 검색하고, 페이지 단위로 아이템 목록을 저장합니다.
for items_per_page in search_videos('요리'):
save_to_mysql(cursor, conn, items_per_page)
# 뷰 수가 높은 동영상을 출력합니다.
show_top_videos(cursor)
# 연결 종료
cursor.close()
conn.close()
def search_videos(query, max_pages=5):
"""
동영상을 검색하고, 페이지 단위로 list를 yield합니다.
"""
# YouTube의 API 클라이언트 생성하기
youtube = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY)
# search.list 메서드로 처음 페이지 추출을 위한 요청 전송하기
search_request = youtube.search().list(
part='id',
q=query,
type='video',
maxResults=50,
)
i = 0
while search_request and i < max_pages:
# 요청을 전송합니다.
search_response = search_request.execute()
# 동영상 ID의 리스트를 추출합니다.
video_ids = [item['id']['videoId'] for item in search_response['items']]
# videos.list 메서드로 동영상의 상세 정보를 추출합니다.
videos_response = youtube.videos().list(
part='snippet,statistics',
id=','.join(video_ids)
).execute()
# 현재 페이지 내부의 아이템을 yield합니다.
yield videos_response['items']
# list_next() 메서드로 다음 페이지를 추출하기 위한 요청을 보냅니다.
search_request = youtube.search().list_next(search_request, search_response)
i += 1
def save_to_mysql(cursor, conn, items):
"""
MySQL에 아이템을 저장합니다.
"""
for item in items:
video_id = item['id']
title = item['snippet']['title']
description = item['snippet'].get('description', '')
statistics = item.get('statistics', {})
view_count = int(statistics.get('viewCount', 0))
like_count = int(statistics.get('likeCount', 0))
comment_count = int(statistics.get('commentCount', 0))
# INSERT 쿼리를 실행합니다.
insert_query = """
INSERT INTO videos (id, title, description, view_count, like_count, comment_count)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
title=VALUES(title),
description=VALUES(description),
view_count=VALUES(view_count),
like_count=VALUES(like_count),
comment_count=VALUES(comment_count)
"""
cursor.execute(insert_query, (video_id, title, description, view_count, like_count, comment_count))
conn.commit()
print(f"Inserted {len(items)} records into MySQL.", file=sys.stderr)
def show_top_videos(cursor):
"""
MySQL에서 뷰 수를 기준으로 상위 5개 동영상을 출력합니다.
"""
query = "SELECT view_count, title FROM videos ORDER BY view_count DESC LIMIT 5"
cursor.execute(query)
for row in cursor.fetchall():
print(f"{row[0]} views - {row[1]}")
if __name__ == '__main__':
main()
'[업무 지식] > Crawling' 카테고리의 다른 글
[Selenuim] 구글 검색하기 (0) | 2025.01.02 |
---|---|
[MySQL] 연결 및 테이블 조작 (0) | 2025.01.01 |
[API 데이터 수집, 활용] 트위터(X)에서 데이터 수집하기 (0) | 2025.01.01 |
[변화 감지하기] 크롤링 대상의 변화에 대응하기 (0) | 2025.01.01 |
[CacheControl] 변경된 데이터만 추출하기 (0) | 2025.01.01 |