-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
233 lines (191 loc) · 8.09 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from flask import Flask, request, jsonify, send_from_directory
import os
import json
import uuid
import sqlite3
from libs.agent_database import AgentDatabase, AgentTable, AgentRunResultsTable
from libs.agent import AgentRunResult
from tools.user_directory import UserDirectory
from tools.quest_manager import Quest, QuestSubmission, QuestReview, QuestManager
app = Flask(__name__)
app.static_folder = 'web'
app.static_url_path = '/static'
# Create the Directory instance pointing to our forum.db
forum_db_path = os.path.join(os.path.dirname(__file__), 'forum.db')
agent_db_path = os.path.join(os.path.dirname(__file__), 'agent_database.db')
user_directory_db_path = os.path.join(os.path.dirname(__file__), 'user_directory.db')
quest_db_path = os.path.join(os.path.dirname(__file__), 'quest_database.db')
agent_database = AgentDatabase(agent_db_path)
user_directory = UserDirectory(user_directory_db_path)
quest_manager = QuestManager(agent_id="admin", db_path=quest_db_path)
llm_url = "http://localhost:5000"
# Create a dummy admin agent for messaging
class AdminAgent:
def __init__(self):
self.id = "admin"
self.name = "Admin"
admin_agent = AdminAgent()
# Ensure admin user exists in the directory
user_directory.add_user(admin_agent)
@app.route('/')
def serve_app(agent_id=None):
return send_from_directory('web', 'index.html')
@app.route('/messaging')
def serve_messaging():
return send_from_directory('web', 'messaging.html')
@app.route('/api/create_quest', methods=['POST'])
def create_quest():
data = request.json
quest_manager._create_quest_for_agent(llm_url, data['agent_id'], data['overall_goal'], data['details'], data['context'])
return jsonify({"success": True})
@app.route('/api/list_agents', methods=['GET'])
def list_agents():
agent_list = agent_database.get_agent_list()
# Convert the list of tuples to list of dicts
formatted_list = [{"agent_id": row[0], "agent_name": row[1], "pass_number": row[2], "last_run_date": row[3]} for row in agent_list]
return jsonify(formatted_list)
@app.route('/api/get_agent_run_results', methods=['GET'])
def get_agent_run_results():
agent_id = request.args.get('agent_id')
limit = int(request.args.get('limit', 10))
offset = int(request.args.get('offset', 0))
if not agent_id:
return jsonify({"error": "agent_id parameter is required"}), 400
agent_run_results = agent_database.get_agent_run_results(agent_id, limit, offset)
# Convert the database rows to AgentRunResultsTable objects
run_results_table_rows = [
AgentRunResultsTable(
pass_id=row[0],
agent_id=row[1],
pass_number=row[2],
agent_run_result=row[3],
run_date=row[4]
) for row in agent_run_results
]
# Return the JSON strings directly
return jsonify([row.agent_run_result for row in run_results_table_rows])
# New messaging endpoints
@app.route('/api/get_users', methods=['GET'])
def get_users():
users_text = user_directory.get_users()
# Parse the text output into a structured format
users = []
for line in users_text.split('\n')[1:]: # Skip the "Users:" header
line = line.strip()
if line:
# Extract name and user_id from the line
parts = line.split('(user_id: ')
if len(parts) == 2:
name = parts[0].strip('- ').strip()
user_id = parts[1].strip(')')
users.append({"user_id": user_id, "name": name})
return jsonify(users)
@app.route('/api/send_message', methods=['POST'])
def send_message():
data = request.json
recipient_id = data.get('recipient_id')
message = data.get('message')
if not recipient_id or not message:
return jsonify({"error": "recipient_id and message are required"}), 400
result = user_directory.send_message(admin_agent, recipient_id, message)
return jsonify({"result": result})
@app.route('/api/get_messages', methods=['GET'])
def get_messages():
sender_id = request.args.get('sender_id')
limit = int(request.args.get('limit', 10))
messages_text = user_directory.get_messages(admin_agent, sender_id, limit)
# Parse the text output into a structured format
messages = []
for line in messages_text.split('\n')[1:]: # Skip the "Messages:" header
line = line.strip()
if line and not line == "[No messages]":
# Extract sender and message from the line
parts = line.split(': ', 1)
if len(parts) == 2:
sender = parts[0].strip('- From ')
message_content = parts[1]
messages.append({"sender": sender, "message": message_content})
return jsonify(messages)
@app.route('/api/get_new_messages', methods=['GET'])
def get_new_messages():
messages_text = user_directory.get_new_messages(admin_agent)
# Parse the text output into a structured format
messages = []
for line in messages_text.split('\n')[1:]: # Skip the "Messages:" header
line = line.strip()
if line and not line == "[No messages]":
# Extract sender and message from the line
parts = line.split(': ', 1)
if len(parts) == 2:
sender = parts[0].strip('- From ')
message_content = parts[1]
messages.append({"sender": sender, "message": message_content})
return jsonify(messages)
# Add a route to serve static files from web directory
@app.route('/static/<path:path>')
def send_static(path):
return send_from_directory('web', path)
# Add new quest-related endpoints
@app.route('/api/get_agent_quests', methods=['GET'])
def get_agent_quests():
agent_id = request.args.get('agent_id')
if not agent_id:
return jsonify({"error": "agent_id parameter is required"}), 400
db = sqlite3.connect(quest_db_path)
cursor = db.execute("SELECT * FROM quests WHERE agent_id = ?", (agent_id,))
quests = []
for row in cursor:
print(row)
quest_data = row[3]
quest = Quest.model_validate_json(quest_data)
result = {
"quest_id": row[0],
"agent_id": row[1],
"quest_title": row[2],
"quest": quest.model_dump_json()
}
quests.append(result)
db.close()
return jsonify(quests)
@app.route('/api/get_quest_submissions', methods=['GET'])
def get_quest_submissions():
quest_id = request.args.get('quest_id')
if not quest_id:
return jsonify({"error": "quest_id parameter is required"}), 400
db = sqlite3.connect(quest_db_path)
cursor = db.execute("SELECT * FROM quest_submissions WHERE quest_id = ?", (quest_id,))
submissions = []
for row in cursor:
submissions.append(row)
db.close()
return jsonify(submissions)
@app.route('/api/get_quest_reviews', methods=['GET'])
def get_quest_reviews():
quest_id = request.args.get('quest_id')
if not quest_id:
return jsonify({"error": "quest_id parameter is required"}), 400
db = sqlite3.connect(quest_db_path)
cursor = db.execute("SELECT * FROM quest_reviews WHERE quest_id = ?", (quest_id,))
reviews = []
for row in cursor:
reviews.append(row)
db.close()
return jsonify(reviews)
@app.route('/api/submit_quest_review', methods=['POST'])
def submit_quest_review():
data = request.json
if not all(k in data for k in ['quest_id', 'submission_id', 'review_notes', 'accepted', 'exp_awarded']):
return jsonify({"error": "Missing required fields"}), 400
import datetime
review_id = str(uuid.uuid4())
review_date = datetime.datetime.now().isoformat()
quest_manager.submit_quest_review(review_id, data['quest_id'], data['submission_id'], 'admin',
data['quest_title'], data['review_notes'], data['accepted'],
data['exp_awarded'], review_date)
return jsonify({"success": True, "review_id": review_id})
@app.route('/quests')
def serve_quests():
return send_from_directory('web', 'quests.html')
if __name__ == '__main__':
# Run Flask in debug mode for local development
app.run(debug=True, port=5001)