File size: 2,352 Bytes
ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 1aa1aec ed1e314 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
import ray
from ray import serve
import time
import asyncio
# Caps how many send_text_request coroutines may be inside their
# `async with semaphore:` section at the same time (10 permits).
# NOTE(review): this is created at import time, before asyncio.run() starts
# its event loop. On Python < 3.10 asyncio primitives bind to the loop that
# is current at construction, which can raise "attached to a different loop"
# once more than 10 requests contend -- confirm the target Python version.
semaphore = asyncio.Semaphore(10)
# Sample image URL; not referenced by any code visible in this part of the file.
test_image_url = "https://static.wixstatic.com/media/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg/v1/fill/w_454,h_333,fp_0.50_0.50,q_90/4d6b49_42b9435ce1104008b1b5f7a3c9bfcd69~mv2.jpg"
# Fixed English passage sent as the payload of every text-embedding request.
english_text = (
"It was the best of times, it was the worst of times, it was the age "
"of wisdom, it was the age of foolishness, it was the epoch of belief"
)
async def send_text_request(serve_client, number):
    """Request text embeddings for ``english_text`` from ``CLIPTransform``.

    At most as many requests as the module-level ``semaphore`` allows are in
    flight at once; further callers wait for a permit.

    Args:
        serve_client: Serve client passed in by the caller. It is not used
            here; the handle is looked up via ``serve.get_deployment``.
        number: Caller-supplied identifier returned alongside the result so
            completions arriving out of order can be matched to requests.

    Returns:
        A ``(number, embeddings)`` tuple, where ``embeddings`` is the value
        produced by the deployment's ``text_to_embeddings`` method.
    """
    async with semaphore:
        async_handle = serve.get_deployment("CLIPTransform").get_handle(sync=False)
        # Awaiting the async-handle call yields a Ray ObjectRef, not the value.
        object_ref = await async_handle.text_to_embeddings.remote(english_text)
        # Await the ObjectRef instead of calling ray.get(): ray.get() blocks
        # the thread running the event loop, so no other request could make
        # progress while this one waits and the calls would run one by one.
        embeddings = await object_ref
    return number, embeddings
# def process_text(server_client, numbers, max_workers=10):
# with ThreadPoolExecutor(max_workers=max_workers) as executor:
# futures = [executor.submit(send_text_request, server_client, number) for number in numbers]
# for future in as_completed(futures):
# n_result, result = future.result()
# print (f"{n_result} : {len(result[0])}")
async def process_text(server_client, numbers):
    """Issue one text request per entry of *numbers* and print each result.

    Results are reported in completion order, not submission order. Each
    printed line shows the request's number and the length of the first
    element of the returned embeddings.

    Args:
        server_client: Serve client forwarded unchanged to
            ``send_text_request``.
        numbers: Iterable of identifiers, one per request to send.
    """
    pending = [send_text_request(server_client, number) for number in numbers]
    for finished in asyncio.as_completed(pending):
        n_result, result = await finished
        print(f"{n_result} : {len(result[0])}")
if __name__ == "__main__":
    # Benchmark driver: sends n_calls text-embedding requests through
    # process_text and reports average latency and throughput.
    # n_calls = 100000
    n_calls = 1
    numbers = list(range(n_calls))

    ray.init()
    try:
        server_client = serve.start(detached=True)

        start_time = time.monotonic()
        # Run the async function
        asyncio.run(process_text(server_client, numbers))
        end_time = time.monotonic()

        total_time = end_time - start_time
        avg_time_ms = total_time / n_calls * 1000
        calls_per_sec = n_calls / total_time
        print(f"Average time taken: {avg_time_ms:.2f} ms")
        print(f"Number of calls per second: {calls_per_sec:.2f}")
    finally:
        # Disconnect from Ray even when a request raises, so a failed run
        # does not skip the shutdown step.
        ray.shutdown()
|