参考文档
环境配置
pip3 install websocket-client
核心代码
import _thread as thread
import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import websocket
from PIL import Image
import ioappid = "xxx"
api_secret = "xxxx"
api_key ="xxxx"
def resize_image(image_path, size):image = Image.open(image_path)resized_image = image.resize(size)print(resized_image.size)return resized_image
def encode_image(image):image_byte_array = io.BytesIO()image.save(image_byte_array, format='PNG')image_byte_array = image_byte_array.getvalue()return image_byte_array, base64.b64encode(image_byte_array).decode('utf-8')
image_path = "images/PointAClouth.png"
new_size = (1000, 1000)
resized_image = resize_image(image_path, new_size)
imagedata, base64_image = encode_image(resized_image)imageunderstanding_url = "wss://spark-api.cn-huabei-1.xf-yun.com/v2.1/image"
text =[{"role": "user", "content": base64_image, "content_type":"image"}]class Ws_Param(object):def __init__(self, APPID, APIKey, APISecret, imageunderstanding_url):self.APPID = APPIDself.APIKey = APIKeyself.APISecret = APISecretself.host = urlparse(imageunderstanding_url).netlocself.path = urlparse(imageunderstanding_url).pathself.ImageUnderstanding_url = imageunderstanding_urldef create_url(self):now = datetime.now()date = format_date_time(mktime(now.timetuple()))signature_origin = "host: " + self.host + "\n"signature_origin += "date: " + date + "\n"signature_origin += "GET " + self.path + " HTTP/1.1"signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),digestmod=hashlib.sha256).digest()signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')v = {"authorization": authorization,"date": date,"host": self.host}url = self.ImageUnderstanding_url + '?' + urlencode(v)return url
def on_error(ws, error):print("### error:", error)
def on_close(ws,one,two):print(" ")
def on_open(ws):thread.start_new_thread(run, (ws,))def run(ws, *args):data = json.dumps(gen_params(appid=ws.appid, question= ws.question ))ws.send(data)
def on_message(ws, message):data = json.loads(message)code = data['header']['code']if code != 0:print(f'请求错误: {code}, {data}')ws.close()else:choices = data["payload"]["choices"]status = choices["status"]content = choices["text"][0]["content"]print(content,end ="")global answeranswer += contentif status == 2:print("usage:", data["payload"]['usage'])ws.close()def gen_params(appid, question):"""通过appid和用户的提问来生成请参数"""data = {"header": {"app_id": appid},"parameter": {"chat": {"domain": "image","temperature": 0.5,"top_k": 4,"max_tokens": 2028,"auditing": "default"}},"payload": {"message": {"text": question}}
}return datadef main(appid, api_key, api_secret, imageunderstanding_url,imagedata,question):wsParam = Ws_Param(appid, api_key, api_secret, imageunderstanding_url)websocket.enableTrace(False)wsUrl = wsParam.create_url()ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)ws.appid = appidws.imagedata = imagedataws.question = questionws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})def getText(role, content):jsoncon = {}jsoncon["role"] = rolejsoncon["content"] = contenttext.append(jsoncon)return textdef getlength(text):length = 0for content in text:temp = content["content"]leng = len(temp)length += lengreturn lengthdef checklen(text):while (getlength(text[1:])> 8000):del text[1]return textif __name__ == '__main__':text.cleargetText("system","xxxx")Input = "xxxxx"question = checklen(getText("user",Input))answer = ""print("答:",end = "")main(appid, api_key, api_secret, imageunderstanding_url, imagedata,question)getText("assistant", answer)print(text[1:])