-
Notifications
You must be signed in to change notification settings - Fork 6
/
agent.py
180 lines (151 loc) · 4.07 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import json
import logging
import re
import openai
import requests
from bs4 import BeautifulSoup
from utils.json import to_obj
from jsonpath import jsonpath
from env import env
from utils.parse import parseone
import html2text as ht
reg = 'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F]'\
+ '[0-9a-fA-F]))+'
# print(env)
# 设置 OpenAI API 密钥
openai.api_key = env['api_key']
openai.api_base = env['api_base']
def spider(query):
url = query
params = {}
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 \
(KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36",
}
if query.find('::') > 0:
arr = query.split('::')
url = arr[0]
params = parseone(arr[1], '=')
else:
pass
res = requests.get(url, headers=headers)
res.encoding = res.apparent_encoding
# print(query, params, res.text)
if 'select' in params:
soup = BeautifulSoup(res.text, "html.parser")
t = soup.select(params['select'])
str = ''
for item in t:
str += item.get_text()
return str
elif 'jsonpath' in params:
return jsonpath(to_obj(res.text), params['jsonpath'])
elif 'tojson' in params:
return to_obj(res.text)
else:
return res.text
def gpt_agent(content):
# print(content)
# 创建 OpenAI GPT 对象
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "user", "content": content},
]
)
result = ''
for choice in response.choices:
result += choice.message.content
return result
def gpt_agent_stream(content, messages):
if len(messages) < 2:
messages = [
{"role": "user", "content": content},
]
if len(messages) > 1:
messages[len(messages)-1]['content'] = content
# print(content)
# 创建 OpenAI GPT 对象
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=messages,
stream=True
)
return response
def gpt_agent_http_stream(content, messages):
str = _link2text(content)
# print(str)
# return str
return gpt_agent_stream(str, messages)
def _link2text(content):
links = re.findall(reg, content)
for link in links:
text = spider('{}::select=body'.format(link)).strip()
content = content.replace(link, '{}'.format(re.sub('\n+', ' ', text)))
return content
def decrease(env, key):
v = int(env[key])-1
env[key] = v
return v
def increase(env, key):
v = int(env[key])+1
env[key] = v
return v
def len0(env, key):
return len(env[key])
def textlen0(data):
if data == 'None':
return 0
else:
return len(data)
def set(env, exp):
kv = exp.split('=')
k = kv[0].trim()
v = kv[1].trim()
v = env[v] if v in env else v
if k in env:
env[k] = v
return v
else:
return None
def define(env, exp):
kv = exp.split('=')
k = kv[0].trim()
v = kv[1].trim()
v = env[v] if v in env else v
if k in env:
return None
else:
env[k] = v
return v
def html2md(data):
text_maker = ht.HTML2Text()
text_maker.bypass_tables = False
md = text_maker.handle(data)
return md
def agent(content, env, prompt):
index = content.find(':')
agentName = content[:index]
promp = content[index+1:].strip()
# print(promp)
# print(str(index)+agentName+'---' + promt)
if agentName == 'chat':
return gpt_agent(promp)
elif agentName == 'spider':
return spider(promp)
elif agentName == 'html2md':
return html2md(promp)
elif agentName == '-':
return decrease(env, promp)
elif agentName == '+':
return increase(env, promp)
elif agentName == 'set!':
return set(env, promp)
elif agentName == 'define':
return define(env, promp)
elif agentName == 'len':
return len0(env, promp)
elif agentName == 'text-len':
return textlen0(promp)
else:
return promp