"""Twitter bot that posts a "real or generated?" poll once a day.

Each cycle posts a tweet (either a real one from the ``dawood/elon-tweets``
dataset or one from ``elon_generated_tweets.csv``) with an "Elon"/"Not" poll
attached, waits 24 hours, then posts the answer.
"""
import os
import string
import random
import time
import json

import requests
from requests_oauthlib import OAuth1
import pandas as pd
from datasets import load_dataset

# Credentials are read from the environment; any of them may be None if unset.
BEARER_TOKEN = os.environ.get("BEARER_TOKEN")
CONSUMER_KEY = os.environ.get("CONSUMER_KEY")
CONSUMER_SECRET = os.environ.get("CONSUMER_SECRET")
ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN")
ACCESS_TOKEN_SECRET = os.environ.get("ACCESS_TOKEN_SECRET")

TWEETS_URL = "https://api.twitter.com/2/tweets"
# Seconds to wait for the API before giving up, so a stalled connection
# cannot block the bot forever.
REQUEST_TIMEOUT = 30
POLL_DURATION_MINUTES = 24 * 60
SECONDS_PER_DAY = 24 * 60 * 60

# Real tweets
real_dataset = load_dataset("dawood/elon-tweets", split="train")
real_df = real_dataset.to_pandas()
real_tweets = real_df["Tweets"][:500].values.tolist()


def load_real_tweet():
    """Return one entry of ``real_tweets`` chosen uniformly at random."""
    # random.choice never indexes past the end, unlike
    # random.randint(0, len(xs)) whose upper bound is inclusive.
    return random.choice(real_tweets)


# Fake tweets
fake_df = pd.read_csv("elon_generated_tweets.csv")
# NOTE(review): DataFrame.values.tolist() yields one list per CSV row, so each
# entry here is a list of cell values rather than a plain string. Confirm the
# CSV layout and select the text column if a string is what should be posted.
fake_tweets = fake_df.values.tolist()


def load_fake_tweet():
    """Return one entry of ``fake_tweets`` chosen uniformly at random."""
    return random.choice(fake_tweets)


def bearer_oauth(request):
    """Add the bearer-token ``Authorization`` header to an outgoing request.

    Passed as the ``auth`` callable for GET requests; ``requests`` calls it
    with the prepared request and uses the returned object.
    """
    request.headers["Authorization"] = f"Bearer {BEARER_TOKEN}"
    return request


def _user_oauth():
    """Build the OAuth 1.0a signer from the module-level credentials."""
    return OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)


def connect_to_endpoint(url, params=None, data=None, type='GET'):
    """Send a request to ``url`` and return the decoded JSON response.

    Args:
        url: Endpoint to call.
        params: Optional query-string parameters.
        data: Optional payload, sent as the JSON body.
        type: HTTP method: ``'GET'`` (bearer-token auth), ``'POST'`` or
            ``'DELETE'`` (both OAuth 1.0a). The name shadows the builtin but
            is kept because callers pass it by keyword.

    Raises:
        ValueError: If ``type`` is not one of the supported methods.
        Exception: With ``(status_code, response_text)`` when the response
            status is neither 200 nor 201.
    """
    if type == 'GET':
        send, auth = requests.get, bearer_oauth
    elif type == 'POST':
        send, auth = requests.post, _user_oauth()
    elif type == 'DELETE':
        send, auth = requests.delete, _user_oauth()
    else:
        # Previously fell through with resp = None and crashed on
        # resp.status_code with an unhelpful AttributeError.
        raise ValueError(f"Unsupported request type: {type!r}")

    resp = send(url, json=data, params=params, auth=auth, timeout=REQUEST_TIMEOUT)
    if resp.status_code not in (200, 201):
        raise Exception(resp.status_code, resp.text)
    return resp.json()


def tweet(content):
    """Post ``content`` as a new tweet."""
    connect_to_endpoint(TWEETS_URL, data={"text": content}, type='POST')


def create_poll():
    """Post a randomly chosen real or generated tweet with an Elon/Not poll.

    Returns:
        ``"Elon"`` if the posted text came from ``load_real_tweet``,
        ``"Not"`` if it came from ``load_fake_tweet``.
    """
    # Fair coin flip. randint is inclusive on both ends, so the former
    # randint(0, 2) picked a real tweet two times out of three.
    is_real = random.randint(0, 1) == 1
    text = load_real_tweet() if is_real else load_fake_tweet()
    data = {
        "text": text,
        "poll": {
            "options": [
                "Elon",
                "Not",
            ],
            "duration_minutes": POLL_DURATION_MINUTES,
        },
    }
    connect_to_endpoint(TWEETS_URL, data=data, type='POST')
    return "Elon" if is_real else "Not"


if __name__ == "__main__":
    while True:
        answer = create_poll()
        # Wait for the 24-hour poll to close before revealing the answer.
        time.sleep(SECONDS_PER_DAY)
        tweet(f"The answer is: {answer}, congratulations if you guessed it right!")