import json
import traceback
from typing import Optional

import requests

# Seconds to wait on Apollo before giving up. Without an explicit timeout,
# `requests` may block indefinitely on a stalled connection.
DEFAULT_TIMEOUT_SECONDS = 30


def get_mixed_people(payload, api_key, timeout=DEFAULT_TIMEOUT_SECONDS) -> Optional[dict]:
    """POST a search payload to Apollo's ``mixed_people/search`` endpoint.

    Args:
        payload: JSON-serialisable search body, sent as-is.
        api_key: Value sent in the ``x-api-key`` header.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Returns:
        The decoded JSON response body, or ``None`` if the request or the
        JSON decoding raised (the exception is printed, not re-raised).
        The HTTP status code is not checked, so error bodies returned by
        the server are passed through to the caller unchanged.

    Example payload (from an earlier manual test of this function)::

        payload = {
            "page": 1,
            "contact_email_status_v2": ["likely_to_engage", "verified"],
            "prospected_by_current_team": ["no"],
            "sort_by_field": "[none]",
            "sort_ascending": False,
            "included_organization_keyword_fields": ["tags", "name"],
            "person_titles": ["ceo"],
            "organization_industry_tag_ids": ["5567e09973696410db020800"],
            "person_locations": ["Pakistan"],
            "display_mode": "explorer_mode",
            "per_page": 25,
            "open_factor_names": [],
            "num_fetch_result": 1,
            "context": "people-index-page",
            "show_suggestions": False,
            "include_account_engagement_stats": False,
            "include_contact_engagement_stats": False,
            "finder_version": 2,
            "ui_finder_random_seed": "n6d8ehzv75e",
            "typed_custom_fields": [],
            "cacheKey": 1733218374342,
        }
        res = get_mixed_people(payload, api_key)
        print(res["pagination"])
    """
    url = "https://api.apollo.io/api/v1/mixed_people/search"
    headers = {
        "accept": "application/json",
        "Cache-Control": "no-cache",
        "Content-Type": "application/json",
        # The key always comes from the caller; never hard-code one here.
        "x-api-key": api_key,
    }
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        return response.json()
    except Exception as e:
        # Best-effort contract: report the failure and signal it with None.
        print(f"Exception | get_mixed_people | {str(e)}")
        traceback.print_exc()
        return None


def _organization_phone(match):
    """Return ``match["organization"]["primary_phone"]["sanitized_number"]`` or None.

    Each level may be missing or explicitly null in the response; both cases
    yield ``None`` instead of raising.
    """
    organization = match.get("organization") or {}
    primary_phone = organization.get("primary_phone") or {}
    return primary_phone.get("sanitized_number")


def get_person_contact(
    linkedin_url_list, api_key, timeout=DEFAULT_TIMEOUT_SECONDS
) -> tuple[list, list]:
    """Look up e-mail and organization phone via Apollo's ``people/bulk_match``.

    Args:
        linkedin_url_list: Sent verbatim as the ``details`` field of the
            request body. NOTE(review): presumably a list of dicts that each
            identify one person by LinkedIn URL - confirm against the caller.
        api_key: Value sent in the ``x-api-key`` header.
        timeout: Seconds passed to ``requests`` as the request timeout.

    Returns:
        ``(emails, phone_numbers)``: two lists with one entry per element of
        the response's ``matches`` array, in response order. A null match
        contributes ``None`` to both lists. On any exception (network error,
        invalid JSON, missing ``matches`` key) the exception is printed and
        ``([], [])`` is returned.
    """
    url = "https://api.apollo.io/api/v1/people/bulk_match?reveal_personal_emails=false&reveal_phone_number=false"
    headers = {
        "Cache-Control": "no-cache",
        "Content-Type": "application/json",
        "accept": "application/json",
        # Authentication is by API key only. A previously hard-coded browser
        # session cookie was removed: it was a leaked credential.
        "x-api-key": api_key,
    }
    body = json.dumps({"details": linkedin_url_list})
    try:
        response = requests.request(
            "POST", url, headers=headers, data=body, timeout=timeout
        )
        matches: list[dict] = response.json()["matches"]
        print("matches >> ", len(matches))

        emails = []
        phone_numbers = []
        for match in matches:
            if match is None:
                # Keep both lists aligned with the `matches` array.
                emails.append(None)
                phone_numbers.append(None)
                continue
            print("match > ", len(match))
            # NOTE(review): a missing "email" key yields the *string* "None",
            # not the None object. Kept as-is because callers may rely on it.
            emails.append(match.get("email", "None"))
            phone_numbers.append(_organization_phone(match))
        return emails, phone_numbers
    except Exception as e:
        # Best-effort contract: report the failure and return empty results.
        print(f"Exception | get_person_contact | {str(e)}")
        traceback.print_exc()
        return [], []