|
from __future__ import annotations |
|
|
|
import json |
|
import random |
|
import re |
|
|
|
from aiohttp import ClientSession |
|
|
|
from .base_provider import AsyncProvider, format_prompt, get_cookies |
|
|
|
|
|
class Bard(AsyncProvider): |
|
url = "https://bard.google.com" |
|
needs_auth = True |
|
working = True |
|
_snlm0e = None |
|
|
|
@classmethod |
|
async def create_async( |
|
cls, |
|
model: str, |
|
messages: list[dict[str, str]], |
|
proxy: str = None, |
|
cookies: dict = None, |
|
**kwargs |
|
) -> str: |
|
prompt = format_prompt(messages) |
|
if proxy and "://" not in proxy: |
|
proxy = f"http://{proxy}" |
|
if not cookies: |
|
cookies = get_cookies(".google.com") |
|
|
|
headers = { |
|
'authority': 'bard.google.com', |
|
'origin': 'https://bard.google.com', |
|
'referer': 'https://bard.google.com/', |
|
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36', |
|
'x-same-domain': '1', |
|
} |
|
|
|
async with ClientSession( |
|
cookies=cookies, |
|
headers=headers |
|
) as session: |
|
if not cls._snlm0e: |
|
async with session.get(cls.url, proxy=proxy) as response: |
|
text = await response.text() |
|
|
|
match = re.search(r'SNlM0e\":\"(.*?)\"', text) |
|
if not match: |
|
raise RuntimeError("No snlm0e value.") |
|
cls._snlm0e = match.group(1) |
|
|
|
params = { |
|
'bl': 'boq_assistant-bard-web-server_20230326.21_p0', |
|
'_reqid': random.randint(1111, 9999), |
|
'rt': 'c' |
|
} |
|
|
|
data = { |
|
'at': cls._snlm0e, |
|
'f.req': json.dumps([None, json.dumps([[prompt]])]) |
|
} |
|
|
|
intents = '.'.join([ |
|
'assistant', |
|
'lamda', |
|
'BardFrontendService' |
|
]) |
|
|
|
async with session.post( |
|
f'{cls.url}/_/BardChatUi/data/{intents}/StreamGenerate', |
|
data=data, |
|
params=params, |
|
proxy=proxy |
|
) as response: |
|
response = await response.text() |
|
response = json.loads(response.splitlines()[3])[0][2] |
|
response = json.loads(response)[4][0][1][0] |
|
return response |
|
|
|
@classmethod |
|
@property |
|
def params(cls): |
|
params = [ |
|
("model", "str"), |
|
("messages", "list[dict[str, str]]"), |
|
("stream", "bool"), |
|
("proxy", "str"), |
|
] |
|
param = ", ".join([": ".join(p) for p in params]) |
|
return f"g4f.provider.{cls.__name__} supports: ({param})" |
|
|