-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
122 lines (100 loc) · 3.82 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import base64
import json
import os
import sys
import threading
from typing import Any
import numpy as np
import sounddevice as sd
import websocket
from dotenv import load_dotenv
load_dotenv()
REALTIME_API_URL = (
"wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17"
)
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# グローバル変数としてWebSocketを保持
global_ws: websocket.WebSocketApp | None = None
SYSTEM_PROMPT = f"""
あなたは日本語でのみ会話してください。
"""
def audio_callback(
indata: np.ndarray[Any, np.dtype[np.int16]],
frames: int,
time: Any,
status: sd.CallbackFlags,
) -> None:
# 無音なら送信しない(必要に応じてしきい値を調整)
amplitude = np.abs(indata).mean()
if amplitude < 10:
return
# 音声データをBase64エンコードし、サーバーに送信
audio_chunk = base64.b64encode(indata.tobytes()).decode("utf-8")
if global_ws and global_ws.sock and global_ws.sock.connected:
payload = json.dumps(
{"type": "input_audio_buffer.append", "audio": audio_chunk}
)
global_ws.send(payload)
def on_open(ws: websocket.WebSocket) -> None:
# WSを介してクライアントからサーバーへイベントをsession.updateのイベントを送信する
# サーバーのデフォルト設定からこちらの希望する設定に変更することが可能
init_payload = json.dumps(
{
"type": "session.update",
"session": {
"instructions": SYSTEM_PROMPT,
},
}
)
ws.send(init_payload)
def on_message(ws: websocket.WebSocket, message: str) -> None:
try:
response = json.loads(message)
if "type" in response and response["type"] == "session.created":
# WSを介してセッションを開始した時にサーバー側で送信されるイベント
# デフォルトの設定等が入っている
print("✅ セッションが作成されました")
print(f"✅ セッション開始ログ: {response}")
if "type" in response and response["type"] == "session.updated":
print("✅ セッションが更新されました")
print(f"✅ セッション更新ログ: {response}")
if "type" in response and response["type"] == "response.audio_transcript.delta":
sys.stdout.write(response["delta"])
sys.stdout.flush()
if "type" in response and response["type"] == "response.audio_transcript.done":
# transcriptの中に最終のテキスト全体が入っている
# print(f"最終結果: {response['transcript']}")
print("\n✅ リアルタイム録音中... Ctrl+C で停止")
except json.JSONDecodeError:
print("JSON decode error, message:", message)
def on_error(ws: websocket.WebSocket, error: str) -> None:
print(f"❌ WebSocket エラー: {error}")
def run_ws() -> None:
global global_ws
headers = [f"Authorization: Bearer {OPENAI_API_KEY}", "OpenAI-Beta: realtime=v1"]
global_ws = websocket.WebSocketApp(
REALTIME_API_URL,
header=headers,
on_open=on_open,
on_message=on_message,
on_error=on_error,
)
global_ws.run_forever()
async def main() -> None:
# WebSocketを別スレッドで実行
ws_thread = threading.Thread(target=run_ws, daemon=True)
ws_thread.start()
# マイクからの音声入力を開始
with sd.InputStream(
samplerate=16000,
channels=1,
dtype=np.int16,
blocksize=1024,
callback=audio_callback,
):
print("✅ リアルタイム録音中... Ctrl+C で停止")
while True:
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(main())