arabellastrange committed on
Commit 1aca16d · 1 Parent(s): d1ac8cf

returned concurrency

Files changed (1): web_search.py +54 -80
web_search.py CHANGED
@@ -1,3 +1,4 @@
+ import concurrent.futures
  import copy
  import json
  import logging
@@ -9,7 +10,6 @@ import warnings
  from itertools import zip_longest

  import requests
- from unstructured.partition.html import partition_html
  from zenrows import ZenRowsClient

  from llmsearch import site_stats
@@ -58,48 +58,22 @@ def search(msg, query_phrase):


  # Define a function to make a single URL request and process the response
- def process_url(url, timeout):
+ def process_url(url):
      start_time = time.time()
      site = ut.extract_site(url)
      result = ""
      try:
          with warnings.catch_warnings():
              warnings.simplefilter("ignore")
-             # options = Options()
-             # options.page_load_strategy = "eager"
-             # options.add_argument("--headless")
-             # options.add_argument("--no-sandbox")
-             # options.add_argument("--disable-dev-shm-usage")
-             #
-             # options.add_argument("start-maximized")
-             # options.add_argument("disable-infobars")
-             # options.add_argument("--disable-extensions")
-             # options.add_argument("--disable-gpu")
-             # options.add_argument("--disable-dev-shm-usage")
              result = ""
-             # make driver exec
-             # os.chmod('chromedriver-linux64/chromedriver', stat.S_IEXEC)
              try:
-                 # driver = webdriver.Chrome(service=ChromeService(executable_path='chromedriver-linux64/chromedriver'),
-                 #                           options=options)
-                 # logger.info(f"*****setting page load timeout {timeout}")
-                 # driver.set_page_load_timeout(timeout)
-                 # driver.get(url)
-                 # response = driver.page_source
                  client = ZenRowsClient(os.getenv('zenrows_api_key'))
                  response = client.get(url)
                  print(f'got response, status: {response.status_code}')
-                 # result = response_text_extract(url=url, response=response)
                  result = response.text
              except Exception:
                  traceback.print_exc()
                  return "", url
-             # except selenium.common.exceptions.TimeoutException:
-             #     return "", url
-             # except selenium.common.exceptions.WebDriverException:
-             #     traceback.print_exc()
-             #     logger.info(f"webdriver failed to load")
-             #     return "", url
      except Exception:
          traceback.print_exc()
          print(f"{site} err")
@@ -119,59 +93,59 @@ def process_urls(urls):
      urls_tried = ["" for i in range(30)]
      start_time = time.time()
      in_process = []
-     # processed = []
-     # google_futures = []
+     processed = []
+     google_futures = []

-     # with (concurrent.futures.ThreadPoolExecutor(max_workers=11) as executor):
-     # initialize scan of Google urls
-     try:
-         while (len(urls) > 0
-                # no sense starting if not much time left
-                and (len(full_text) < 4800 and len(in_process) < 10 and time.time() - start_time < 8)
-                ):
-             recommendation = site_stats.get_next(urls, sample_unknown=True)
-             # set timeout so we don't wait for a slow site forever
-             timeout = 12 - int(time.time() - start_time)
-             url = recommendation[1]
-             # future = executor.submit(process_url, query_phrase, url, timeout)
-             result, url = process_url(url, timeout)
-             # google_futures.append(future)
-             # in_process.append(future)
-             urls_tried[tried_index] = url
-             tried_index += 1
-             urls.remove(url)
-             print(f"queued {ut.extract_site(url)}, {timeout}")
-             # Process the responses as they arrive
-             # for future in in_process:
-             #     if future.done():
-             #         result, url = future.result()
-             #         processed.append(future)
-             #         in_process.remove(future)
-             if len(result) > 0:
-                 urls_used[used_index] = url
-                 used_index += 1
-                 print(
-                     f"adding {len(result)} chars from {ut.extract_site(url)} to {len(response)} prior responses"
-                 )
-                 if "an error has occurred" not in result.lower() and "permission to view this page" not in result.lower() and "403 ERROR" not in result.lower() and "have been blocked" not in result.lower() and "too many requests" not in result.lower():
-                     response.append(
-                         {
-                             "source": ut.extract_domain(url),
-                             "url": url,
-                             "text": result,
-                         }
-                     )
-
-         if (len(urls) == 0 and len(in_process) == 0) or (time.time() - start_time > 28):
-             # executor.shutdown(wait=False)
-             print(
-                 f"\n****** exiting process urls early {len(response)} {int(time.time() - start_time)} secs\n"
-             )
-             return response, used_index, urls_used, tried_index, urls_tried
-         time.sleep(0.5)
-     except:
-         traceback.print_exc()
-         # executor.shutdown(wait=False)
+     with (concurrent.futures.ThreadPoolExecutor(max_workers=11) as executor):
+         # initialize scan of Google urls
+         while True:
+             try:
+                 while (len(urls) > 0
+                        # no sense starting if not much time left
+                        and (len(full_text) < 4800 and len(in_process) < 10 and time.time() - start_time < 8)
+                        ):
+                     recommendation = site_stats.get_next(urls, sample_unknown=True)
+                     # set timeout so we don't wait for a slow site forever
+                     timeout = 12 - int(time.time() - start_time)
+                     url = recommendation[1]
+                     future = executor.submit(process_url, url)
+                     # result, url = process_url(url)
+                     google_futures.append(future)
+                     in_process.append(future)
+                     urls_tried[tried_index] = url
+                     tried_index += 1
+                     urls.remove(url)
+                     print(f"queued {ut.extract_site(url)}, {timeout}")
+                 # Process the responses as they arrive
+                 for future in in_process:
+                     if future.done():
+                         result, url = future.result()
+                         processed.append(future)
+                         in_process.remove(future)
+                         if len(result) > 0:
+                             urls_used[used_index] = url
+                             used_index += 1
+                             print(
+                                 f"adding {len(result)} chars from {ut.extract_site(url)} to {len(response)} prior responses"
+                             )
+                             if "an error has occurred" not in result.lower() and "permission to view this page" not in result.lower() and "403 ERROR" not in result.lower() and "have been blocked" not in result.lower() and "too many requests" not in result.lower():
+                                 response.append(
+                                     {
+                                         "source": ut.extract_domain(url),
+                                         "url": url,
+                                         "text": result,
+                                     }
+                                 )
+                 if (len(urls) == 0 and len(in_process) == 0) or (time.time() - start_time > 28):
+                     executor.shutdown(wait=False)
+                     print(
+                         f"\n****** exiting process urls early {len(response)} {int(time.time() - start_time)} secs\n"
+                     )
+                     return response, used_index, urls_used, tried_index, urls_tried
+                 time.sleep(0.5)
+             except:
+                 traceback.print_exc()
+                 executor.shutdown(wait=False)
      print(
          f"\n*****processed all urls {len(response)} {int(time.time() - start_time)} secs"
      )
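
A minimal, self-contained sketch of the submit-and-poll pattern this commit restores in process_urls. The fetch and gather names, the example URLs, and the deadline/worker values are illustrative stand-ins only; the real code goes through process_url (a ZenRows request), orders candidates with site_stats.get_next, and filters error pages before appending to response.

import concurrent.futures
import time


def fetch(url):
    """Stand-in for process_url: return (page_text, url), with empty text on failure."""
    # The real implementation issues a ZenRows request; this sketch only simulates work.
    time.sleep(0.1)
    return f"<html>{url}</html>", url


def gather(urls, deadline=28.0, max_workers=11):
    """Submit one future per URL, then poll for completed futures until the deadline."""
    results = []
    start = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pending = [executor.submit(fetch, u) for u in urls]
        while pending and time.time() - start < deadline:
            for future in list(pending):  # iterate a snapshot so removal is safe
                if future.done():
                    text, url = future.result()
                    pending.remove(future)
                    if text:
                        results.append({"url": url, "text": text})
            time.sleep(0.5)  # pause between polling passes, as process_urls does
    return results


if __name__ == "__main__":
    print(gather(["https://example.com/a", "https://example.com/b"]))

The sketch checks Future.done() against a snapshot of the pending list under an overall deadline; polling this way, rather than blocking in concurrent.futures.as_completed, is what lets a loop like process_urls keep queuing new URLs and enforcing a total time budget between passes.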