File size: 3,561 Bytes
b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 0fed305 b203730 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# Import necessary libraries
import asyncio
from json import dumps, loads
from random import randint
from ssl import create_default_context
from typing import AsyncIterator, Optional
from uuid import uuid4

import websockets
from browser_cookie3 import edge, chrome
from certifi import where
from requests import get
# TLS context shared by the WebSocket connection: Python's default client
# settings, with the certifi CA bundle loaded on top as extra trust anchors.
ssl_context = create_default_context()
ssl_context.load_verify_locations(cafile=where())
def format(msg: dict) -> str:
    """Serialize *msg* to JSON and append the 0x1E record-separator character."""
    serialized = dumps(msg)
    return f'{serialized}\x1e'
def get_token() -> str:
    """Return the value of the ``_U`` cookie that Edge stores for bing.com.

    Returns:
        The ``_U`` cookie value.

    Raises:
        KeyError: If Edge holds no ``_U`` cookie for bing.com.
    """
    cookies = {c.name: c.value for c in edge(domain_name='bing.com')}
    try:
        return cookies['_U']
    except KeyError:
        # Same exception type as a plain lookup, but with a message that says
        # what is missing instead of just "'_U'".
        raise KeyError(
            "no '_U' cookie for bing.com found in the Edge cookie store; "
            "sign in to Bing in Edge first or pass a token explicitly"
        ) from None
class AsyncCompletion:
    """Streaming client for a Bing chat conversation over the ChatHub WebSocket."""

    # Option sets applied when the caller does not pass ``optionSets``.
    DEFAULT_OPTION_SETS = (
        'deepleo',
        'enable_debug_commands',
        'disable_emoji_spoken_text',
        'enablemm',
        'h3relaxedimg',
    )

    @staticmethod
    async def create(
            prompt: str = 'hello world',
            optionSets: Optional[list] = None,
            token: Optional[str] = None) -> AsyncIterator[str]:
        """Open a conversation and yield the reply text as it grows.

        Requests a new conversation over HTTPS, connects to the ChatHub
        WebSocket, performs the JSON protocol handshake, sends the chat
        message, then yields each newly received piece of reply text until
        the server sends a type-2 frame. The socket is closed when the
        generator finishes, fails, or is closed early by the consumer.

        Args:
            prompt: Text intended for the chat message (see the note below).
            optionSets: Option-set names for the request. ``None`` selects a
                fresh copy of ``DEFAULT_OPTION_SETS``.
            token: Value of the ``_U`` cookie. ``None`` looks it up with
                ``get_token()`` on first iteration rather than at import time.

        Yields:
            The text appended to the first adaptive card since the previous
            update, or the whole text when it is not an extension of the
            previous one.

        NOTE(review): the WebSocket headers and the message payload are
        empty placeholders in this file, so ``prompt``, ``optionSets`` and
        the conversation identifiers are not transmitted yet.
        """
        if optionSets is None:
            optionSets = list(AsyncCompletion.DEFAULT_OPTION_SETS)
        if token is None:
            token = get_token()

        # requests is blocking; run it in the default executor so the event
        # loop stays responsive while the HTTP round-trip is in flight.
        loop = asyncio.get_running_loop()
        conversation = await loop.run_in_executor(
            None, AsyncCompletion._request_conversation, token)

        # Extract conversation data (inputs for the message payload below).
        conversationId = conversation['conversationId']
        clientId = conversation['clientId']
        conversationSignature = conversation['conversationSignature']

        # Connect to WebSocket
        wss = await websockets.connect(
            'wss://sydney.bing.com/sydney/ChatHub',
            max_size=None,
            ssl=ssl_context,
            extra_headers={
                # Add necessary headers
            },
        )
        try:
            # Send JSON protocol version and discard the handshake reply.
            await wss.send(format({'protocol': 'json', 'version': 1}))
            await wss.recv()

            # Define message structure
            struct = {
                # Add necessary message structure
            }
            await wss.send(format(struct))

            # Process responses
            base_string = ''
            final = False
            while not final:
                # One WebSocket message may carry several JSON frames
                # separated by the 0x1E character.
                frames = str(await wss.recv()).split('\x1e')
                for frame in frames:
                    if not frame:
                        continue
                    response = loads(frame)
                    frame_type = response.get('type')
                    if frame_type == 1:
                        response_text = AsyncCompletion._extract_text(response)
                        if response_text is None:
                            # Update frame without card text: nothing to emit.
                            continue
                        yield AsyncCompletion._text_delta(base_string, response_text)
                        base_string = response_text
                    elif frame_type == 2:
                        final = True
        finally:
            # Also runs when the consumer stops iterating early or an error
            # is raised mid-stream, so the socket is never left open.
            await wss.close()

    @staticmethod
    def _request_conversation(token: str) -> dict:
        """Blocking helper: request a new conversation and return its parsed JSON."""
        response = get(
            'https://edgeservices.bing.com/edgesvc/turing/conversation/create',
            headers={
                'host': 'edgeservices.bing.com',
                'authority': 'edgeservices.bing.com',
                'cookie': f'_U={token}',
                'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
            },
        )
        # Parse the body once instead of once per extracted field.
        return response.json()

    @staticmethod
    def _extract_text(response: dict) -> Optional[str]:
        """Return the first adaptive card's text from an update frame, or None if absent."""
        try:
            text = response['arguments'][0]['messages'][0]['adaptiveCards'][0]['body'][0]['text']
        except (KeyError, IndexError, TypeError):
            return None
        return text if isinstance(text, str) else None

    @staticmethod
    def _text_delta(previous: str, current: str) -> str:
        """Return what *current* adds after *previous*, or all of *current* if it is not an extension."""
        # Slicing off the prefix, unlike str.replace, leaves later
        # repetitions of the earlier text intact.
        if current.startswith(previous):
            return current[len(previous):]
        return current
async def run():
    """Stream one demo completion to stdout, printing each chunk as it arrives."""
    stream = AsyncCompletion.create(
        prompt='summarize cinderella with each word beginning with a consecutive letter of the alphabet, a-z',
        optionSets=['galileo'],
    )
    async for chunk in stream:
        # No newline and an explicit flush so the reply appears incrementally.
        print(chunk, end='', flush=True)
if __name__ == '__main__':
    # Start the demo only when executed as a script, so importing this module
    # for AsyncCompletion does not open a conversation as a side effect.
    asyncio.run(run())
|