|
|
|
from requests import get |
|
from browser_cookie3 import edge, chrome |
|
from ssl import create_default_context |
|
from certifi import where |
|
from uuid import uuid4 |
|
from random import randint |
|
from json import dumps, loads |
|
|
|
import asyncio |
|
import websockets |
|
|
|
|
|
# Shared TLS context for the websocket connection: starts from the stdlib
# default context and additionally loads the CA file whose path certifi's
# ``where()`` returns.
ssl_context = create_default_context()
ssl_context.load_verify_locations(cafile=where())
|
|
|
|
|
def format(msg: dict) -> str:
    """Serialise *msg* to JSON and append the ``\\x1e`` frame delimiter.

    Args:
        msg: JSON-serialisable mapping to send.

    Returns:
        The JSON text immediately followed by a single ``\\x1e`` character.
    """
    delimiter = '\x1e'
    return f'{dumps(msg)}{delimiter}'
|
|
|
|
|
def get_token() -> str:
    """Return the value of the ``_U`` cookie Edge holds for bing.com.

    Returns:
        The ``_U`` cookie value.

    Raises:
        KeyError: if no cookie named ``_U`` is found among Edge's bing.com
            cookies.
    """
    cookies = {c.name: c.value for c in edge(domain_name='bing.com')}
    try:
        return cookies['_U']
    except KeyError:
        # Same exception type as before, but with a message that says what
        # is missing instead of a bare "KeyError: '_U'".
        raise KeyError(
            "no '_U' cookie for bing.com found in Edge; "
            "sign in to Bing in Edge and try again"
        ) from None
|
|
|
|
|
class AsyncCompletion:
    """Streams text from a Bing chat conversation over the ChatHub websocket."""

    # Option sets used when the caller does not supply ``optionSets``.
    # Kept as a tuple so the shared default can never be mutated.
    _DEFAULT_OPTION_SETS = (
        'deepleo',
        'enable_debug_commands',
        'disable_emoji_spoken_text',
        'enablemm',
        'h3relaxedimg',
    )

    @staticmethod
    def _fresh_text(previous: str, current: str) -> str:
        """Return the part of *current* that was not already in *previous*.

        When *current* extends *previous*, only the leading prefix is
        stripped. A blanket ``str.replace`` would also delete later
        repetitions of the earlier text (previous ``'ha'``, current ``'haha'``
        would yield ``''`` instead of ``'ha'``). When *current* does not start
        with *previous*, the original ``replace`` behaviour is kept.
        """
        if current.startswith(previous):
            return current[len(previous):]
        return current.replace(previous, '')

    @staticmethod
    async def create(
            prompt: str = 'hello world',
            optionSets: 'list | None' = None,
            token: 'str | None' = None):
        """Open a Bing conversation and yield response text as it arrives.

        Args:
            prompt: Text of the user message.
            optionSets: Option-set identifiers; ``None`` selects
                ``_DEFAULT_OPTION_SETS``.
            token: Value for the ``_U`` cookie; ``None`` means "call
                ``get_token()`` now".

        Yields:
            For each type-1 frame that carries messages and a text body, the
            text that is new relative to the previously received text.

        Raises:
            KeyError: if the conversation-create response lacks
                ``conversationId``, ``clientId`` or ``conversationSignature``.
        """
        if token is None:
            # Resolved per call. As a default value it would be evaluated
            # once, when the class body executes, reading the browser cookie
            # store at import time and freezing that token.
            token = get_token()
        option_sets = (
            list(AsyncCompletion._DEFAULT_OPTION_SETS)
            if optionSets is None else optionSets
        )

        # NOTE: this is a blocking HTTP request inside a coroutine.
        handshake = get(
            'https://edgeservices.bing.com/edgesvc/turing/conversation/create',
            headers={
                'host': 'edgeservices.bing.com',
                'authority': 'edgeservices.bing.com',
                'cookie': f'_U={token}',
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
            },
        )

        # Parse the body once instead of once per field.
        session = handshake.json()
        conversation_id = session['conversationId']
        client_id = session['clientId']
        conversation_signature = session['conversationSignature']

        # NOTE(review): `prompt`, `option_sets` and the three conversation
        # fields above are not referenced by the empty `extra_headers` /
        # `struct` below. Presumably their contents were elided from this
        # copy of the file - confirm against the intended protocol.

        # `async with` guarantees the socket is closed even when a frame
        # fails to parse or the consumer stops iterating early.
        async with websockets.connect(
                'wss://sydney.bing.com/sydney/ChatHub',
                max_size=None,
                ssl=ssl_context,
                extra_headers={},
        ) as wss:
            # Protocol handshake: announce JSON framing, discard the reply.
            await wss.send(format({'protocol': 'json', 'version': 1}))
            await wss.recv()

            struct = {}
            await wss.send(format(struct))

            base_string = ''
            final = False
            while not final:
                # One websocket message may hold several '\x1e'-delimited
                # JSON objects.
                frames = str(await wss.recv()).split('\x1e')
                for frame in frames:
                    if not frame:
                        continue

                    response = loads(frame)
                    frame_type = response.get('type')

                    if frame_type == 1 and response['arguments'][0].get('messages'):
                        card_body = response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]
                        response_text = card_body.get('text')
                        if response_text is None:
                            # No text in this update; nothing to emit.
                            continue

                        yield AsyncCompletion._fresh_text(base_string, response_text)
                        base_string = response_text

                    elif frame_type == 2:
                        # Type 2 ends the stream; the rest of this batch is
                        # still processed before the outer loop exits.
                        final = True
|
|
|
|
|
async def run():
    """Stream one hard-coded completion to stdout as the chunks arrive."""
    stream = AsyncCompletion.create(
        prompt='summarize cinderella with each word beginning with a consecutive letter of the alphabet, a-z',
        optionSets=[
            "galileo",
        ],
    )
    async for value in stream:
        # flush so each chunk is written out as soon as it is received
        print(value, end='', flush=True)


# Only start the demo when executed as a script, so importing this module
# does not open a network conversation as a side effect.
if __name__ == '__main__':
    asyncio.run(run())
|
|