import os |
import sys |
import time |
import random |
import httplib2 |
from termcolor import colored |
from oauth2client.file import Storage |
from apiclient.discovery import build |
from apiclient.errors import HttpError |
from apiclient.http import MediaFileUpload |
from oauth2client.tools import argparser, run_flow |
from oauth2client.client import flow_from_clientsecrets |
httplib2.RETRIES = 1 |
RETRIABLE_EXCEPTIONS = (httplib2.HttpLib2Error, IOError, httplib2.ServerNotFoundError) |
RETRIABLE_STATUS_CODES = [500, 502, 503, 504] |
CLIENT_SECRETS_FILE = "./client_secret.json" |
SCOPES = ['https://www.googleapis.com/auth/youtube.upload', |
'https://www.googleapis.com/auth/youtube', |
'https://www.googleapis.com/auth/youtubepartner'] |
WARNING: Please configure OAuth 2.0 |
To make this sample run you will need to populate the client_secrets.json file |
found at: |
{os.path.abspath(os.path.join(os.path.dirname(__file__), CLIENT_SECRETS_FILE))} |
with information from the API Console |
https://console.cloud.google.com/ |
For more information about the client_secrets.json file format, please visit: |
https://developers.google.com/api-client-library/python/guide/aaa_client_secrets |
""" |
VALID_PRIVACY_STATUSES = ("public", "private", "unlisted") |
def get_authenticated_service(): |
""" |
This method retrieves the YouTube service. |
Returns: |
any: The authenticated YouTube service. |
""" |
flow = flow_from_clientsecrets(CLIENT_SECRETS_FILE, |
scope=SCOPES, |
storage = Storage(f"{sys.argv[0]}-oauth2.json") |
credentials = storage.get() |
if credentials is None or credentials.invalid: |
flags = argparser.parse_args() |
credentials = run_flow(flow, storage, flags) |
http=credentials.authorize(httplib2.Http())) |
def initialize_upload(youtube: any, options: dict): |
""" |
This method uploads a video to YouTube. |
Args: |
youtube (any): The authenticated YouTube service. |
options (dict): The options to upload the video with. |
Returns: |
response: The response from the upload process. |
""" |
tags = None |
if options['keywords']: |
tags = options['keywords'].split(",") |
body = { |
'snippet': { |
'title': options['title'], |
'description': options['description'], |
'tags': tags, |
'categoryId': options['category'] |
}, |
'status': { |
'privacyStatus': options['privacyStatus'], |
'madeForKids': False, |
'selfDeclaredMadeForKids': False |
} |
} |
insert_request = youtube.videos().insert( |
part=",".join(body.keys()), |
body=body, |
media_body=MediaFileUpload(options['file'], chunksize=-1, resumable=True) |
) |
return resumable_upload(insert_request) |
def resumable_upload(insert_request: MediaFileUpload): |
""" |
This method implements an exponential backoff strategy to resume a |
failed upload. |
Args: |
insert_request (MediaFileUpload): The request to insert the video. |
Returns: |
response: The response from the upload process. |
""" |
response = None |
error = None |
retry = 0 |
while response is None: |
try: |
print(colored(" => Uploading file...", "magenta")) |
status, response = insert_request.next_chunk() |
if 'id' in response: |
print(f"Video id '{response['id']}' was successfully uploaded.") |
return response |
except HttpError as e: |
if e.resp.status in RETRIABLE_STATUS_CODES: |
error = f"A retriable HTTP error {e.resp.status} occurred:\n{e.content}" |
else: |
raise |
error = f"A retriable error occurred: {e}" |
if error is not None: |
print(colored(error, "red")) |
retry += 1 |
if retry > MAX_RETRIES: |
raise Exception("No longer attempting to retry.") |
max_sleep = 2 ** retry |
sleep_seconds = random.random() * max_sleep |
print(colored(f" => Sleeping {sleep_seconds} seconds and then retrying...", "blue")) |
time.sleep(sleep_seconds) |
def upload_video(video_path, title, description, category, keywords, privacy_status): |
try: |
youtube = get_authenticated_service() |
channels_response = youtube.channels().list(mine=True, part='id').execute() |
for channel in channels_response['items']: |
print(colored(f" => Channel ID: {channel['id']}", "blue")) |
video_response = initialize_upload(youtube, { |
'file': video_path, |
'title': title, |
'description': description, |
'category': category, |
'keywords': keywords, |
'privacyStatus': privacy_status |
}) |
return video_response |
except HttpError as e: |
print(colored(f"[-] An HTTP error {e.resp.status} occurred:\n{e.content}", "red")) |
if e.resp.status in [401, 403]: |
youtube = get_authenticated_service() |
video_response = initialize_upload(youtube, { |
'file': video_path, |
'title': title, |
'description': description, |
'category': category, |
'keywords': keywords, |
'privacyStatus': privacy_status |
}) |
return video_response |
else: |
raise e |