# leadfetcher / apollo_apis.py
# ShahzainHaider's picture
# Upload folder using huggingface_hub
# 25feb32 verified
import json
import traceback
import requests
def get_mixed_people(payload, api_key, timeout=30):
    """Run an Apollo mixed-people search and return the decoded JSON reply.

    ``payload`` is sent as the JSON body of a POST request to
    ``https://api.apollo.io/api/v1/mixed_people/search``. The request is
    authenticated with the ``x-api-key`` header.

    The HTTP status code is not inspected: whatever JSON body the server
    sends back (including an error body) is returned to the caller.

    Args:
        payload: JSON-serialisable dict of search filters. A sample payload
            that was previously kept alongside this function::

                {
                    "page": 1,
                    "contact_email_status_v2": ["likely_to_engage", "verified"],
                    "prospected_by_current_team": ["no"],
                    "sort_by_field": "[none]",
                    "sort_ascending": False,
                    "included_organization_keyword_fields": ["tags", "name"],
                    "person_titles": ["ceo"],
                    "organization_industry_tag_ids": ["5567e09973696410db020800"],
                    "person_locations": ["Pakistan"],
                    "display_mode": "explorer_mode",
                    "per_page": 25,
                    "open_factor_names": [],
                    "num_fetch_result": 1,
                    "context": "people-index-page",
                    "show_suggestions": False,
                    "include_account_engagement_stats": False,
                    "include_contact_engagement_stats": False,
                    "finder_version": 2,
                    "ui_finder_random_seed": "n6d8ehzv75e",
                    "typed_custom_fields": [],
                    "cacheKey": 1733218374342,
                }

        api_key: Apollo API key, sent in the ``x-api-key`` header. Always
            supplied by the caller; never hard-code a key in this module.
        timeout: Seconds passed to ``requests`` as the request timeout so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        The decoded JSON body of the response, or ``None`` if the request
        failed, timed out, or the body could not be decoded as JSON.
    """
    url = "https://api.apollo.io/api/v1/mixed_people/search"
    headers = {
        "accept": "application/json",
        "Cache-Control": "no-cache",
        "Content-Type": "application/json",
        "x-api-key": api_key,
    }

    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=timeout
        )
        return response.json()
    except Exception as e:
        # Best-effort boundary: report the failure and signal it with None
        # instead of propagating to the caller.
        print(f"Exception | get_mixed_people | {e}")
        traceback.print_exc()
        return None
def _split_contacts(matches):
    """Return parallel ``(emails, phone_numbers)`` lists, one entry per match."""
    emails = []
    phone_numbers = []
    for match in matches:
        if match is None:
            # Keep the output lists aligned with the input order even when a
            # slot in ``matches`` is empty.
            emails.append(None)
            phone_numbers.append(None)
            continue

        print("match > ", len(match))
        # NOTE: a missing "email" key yields the *string* "None" (not the
        # None object); kept as-is because callers may compare against it.
        emails.append(match.get("email", "None"))

        # ``or {}`` covers keys that are present but hold a null value, which
        # a plain ``.get(key, {})`` default does not protect against.
        organization = match.get("organization") or {}
        primary_phone = organization.get("primary_phone") or {}
        phone_numbers.append(primary_phone.get("sanitized_number"))
    return emails, phone_numbers


def get_person_contact(linkedin_url_list, api_key, timeout=30):
    """Bulk-match people through Apollo and return their emails and phones.

    POSTs ``{"details": linkedin_url_list}`` to Apollo's
    ``/api/v1/people/bulk_match`` endpoint with ``reveal_personal_emails``
    and ``reveal_phone_number`` both set to ``false`` in the query string.
    Authentication uses only the ``x-api-key`` header; no session cookie is
    sent.

    Args:
        linkedin_url_list: JSON-serialisable list sent as the ``details``
            field of the request body (presumably one lookup record per
            person; verify the exact record shape against the caller).
        api_key: Apollo API key, sent in the ``x-api-key`` header.
        timeout: Seconds passed to ``requests`` as the request timeout so a
            stalled connection cannot block the caller indefinitely.

    Returns:
        A tuple ``(emails, phone_numbers)`` of two lists with one entry per
        element of the response's ``matches`` list, in the same order:

        * ``emails[i]`` is the match's ``email`` value, the string ``"None"``
          if the match has no ``email`` key, or ``None`` if the match itself
          is null.
        * ``phone_numbers[i]`` is
          ``match["organization"]["primary_phone"]["sanitized_number"]`` when
          that path exists, otherwise ``None``.

        ``([], [])`` is returned if the request fails, times out, or the
        response has no ``matches`` key.
    """
    url = "https://api.apollo.io/api/v1/people/bulk_match?reveal_personal_emails=false&reveal_phone_number=false"
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "application/json",
        "accept": "application/json",
        "x-api-key": api_key,
    }

    try:
        payload = json.dumps({"details": linkedin_url_list})
        response = requests.post(
            url, headers=headers, data=payload, timeout=timeout
        )
        matches: list[dict] = response.json()["matches"]
        print("matches >> ", len(matches))
        return _split_contacts(matches)
    except Exception as e:
        # Best-effort boundary: report the failure and hand back empty
        # results instead of propagating to the caller.
        print(f"Exception | get_person_contact | {e}")
        traceback.print_exc()
        return [], []
# payload = {"page":1,"contact_email_status_v2":["likely_to_engage","verified"],"prospected_by_current_team":["no"],"sort_by_field":"[none]","sort_ascending":false,"included_organization_keyword_fields":["tags","name"],"person_titles":["ceo"],"organization_industry_tag_ids":["5567e09973696410db020800"],"person_locations":["Pakistan"],"display_mode":"explorer_mode","per_page":25,"open_factor_names":[],"num_fetch_result":1,"context":"people-index-page","show_suggestions":false,"include_account_engagement_stats":false,"include_contact_engagement_stats":false,"finder_version":2,"ui_finder_random_seed":"ecsv3i0wog4","typed_custom_fields":[],"cacheKey":1733292002393}
# res = get_mixed_people(payload, api_key)
# print(res['pagination'])