
文字が流れるように表示される仕組み
このシリーズ: 全10回
ChatGPTのようなリアルタイム表示を実現
はじめに
ChatGPTを使っていると、AIの回答が文字単位で流れてくるのを見たことがあるでしょう。
あの体験は、ただカッコいいだけではありません。待ち時間のストレスを大幅に軽減する効果があります。
- 全文が表示されるまで待つ:30秒が「長い」と感じる
- 文字が流れてくる:30秒でも「AIが考えている」と感じる
本記事では、このストリーミング表示を実現する技術「NDJSON」と、接続を維持する「Keepalive」の仕組みを解説します。
ストリーミングの基本
従来の方式:全部できてから
一般的なWeb APIは、処理が完了してからレスポンスを返します。
[リクエスト] → [10秒処理] → [レスポンス]
ユーザーの体験:
「...」(10秒待つ)
「ドン!」(一気に表示)これは「バッファリング」と呼ばれる方式で、短い処理なら問題ありませんが、AI分析のような長い処理では待ち時間が苦痛になります。
ストリーミング方式:少しずつ
ストリーミングでは、生成された部分からすぐに送信します。
[リクエスト] → [処理しながら送信] → [処理しながら送信] → ...
ユーザーの体験:
「こ」「の」「グ」「ラ」「フ」「を」「分」「析」「す」「る」「と」...
(文字が流れてくる)NDJSONという形式
JSONを1行ずつ送る
NDJSON(Newline Delimited JSON)は、JSONを1行ずつ送る形式です。
{"type":"text","content":"こんにちは"}
{"type":"text","content":"、画像を"}
{"type":"text","content":"分析します"}
{"type":"done"}各行が独立したJSONオブジェクトになっています。
なぜこの形式か:
- パースが簡単 - 改行で分割すればOK
- 部分読み取り可能 - 途中でも処理できる
- エラーに強い - 1行が壊れても他は読める
実際のデータ例
AIからのストリーミングレスポンスは、こんな形で流れてきます。
{"type":"ping"}
{"type":"text","content":"このグラフを分析すると、"}
{"type":"text","content":"以下の特徴が見られます。\n\n"}
{"type":"text","content":"1. **計画値との乖離**\n"}
{"type":"text","content":" 午前10時頃から実績値が計画を下回っています。"}
{"type":"ping"}
{"type":"text","content":"\n\n2. **異常な落ち込み**\n"}
{"type":"text","content":" 11時30分に急激な落ち込みがあります。"}
{"type":"done","usage":{"input_tokens":1500,"output_tokens":200}}Keepaliveの重要性
接続が切れる問題
ストリーミング中に、データが長時間流れないと接続が切れることがあります。
原因:
- ロードバランサーのタイムアウト
- プロキシサーバーの設定
- ブラウザの挙動例えば、AIが複雑な分析をしていて15秒間何も出力しないと、間にあるサーバーが「この接続は死んだ」と判断して切ってしまうことがあります。
解決策:定期的に信号を送る
10秒ごとに「ping」を送ることで、接続が生きていることを示します。
{"type":"text","content":"分析中..."}
{"type":"ping"} ← 10秒後
{"type":"ping"} ← 20秒後
{"type":"text","content":"結果が出ました!"}
{"type":"done"}pingは内容を含まない「空の信号」なので、ユーザーには表示されません。
バックエンド実装
FastAPIでストリーミング
# backend/streaming_app/main.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
import asyncio
import threading
import queue
app = FastAPI()
async def stream_response(
prompt: str,
image_url: str | None = None,
system_prompt: str = "",
):
"""
Bedrockからの応答をストリーミングで返す
"""
result_queue = queue.Queue()
error_holder = [None]
# 別スレッドでBedrock呼び出し(ブロッキング処理)
def invoke_in_thread():
try:
for chunk in invoke_bedrock_streaming(
prompt=prompt,
image_url=image_url,
system_prompt=system_prompt,
):
result_queue.put(chunk)
except Exception as e:
error_holder[0] = e
finally:
result_queue.put(None) # 終了シグナル
thread = threading.Thread(target=invoke_in_thread)
thread.start()
# Keepalive用のタイマー
last_send_time = asyncio.get_event_loop().time()
KEEPALIVE_INTERVAL = 10 # 10秒
while True:
current_time = asyncio.get_event_loop().time()
# キューからデータを取得(非ブロッキング)
try:
chunk = result_queue.get_nowait()
if chunk is None:
# 終了
if error_holder[0]:
yield json.dumps({
"type": "error",
"message": str(error_holder[0])
}) + "\n"
else:
yield json.dumps({"type": "done"}) + "\n"
break
# テキストチャンクを送信
yield json.dumps({
"type": "text",
"content": chunk
}) + "\n"
last_send_time = current_time
except queue.Empty:
# キューが空の場合
# Keepaliveが必要か確認
if current_time - last_send_time >= KEEPALIVE_INTERVAL:
yield json.dumps({"type": "ping"}) + "\n"
last_send_time = current_time
# 少し待つ
await asyncio.sleep(0.1)
@app.post("/analyze")
async def analyze(request: AnalyzeRequest):
"""
画像分析エンドポイント
"""
return StreamingResponse(
stream_response(
prompt=request.prompt,
image_url=request.image_url,
system_prompt=request.system_prompt,
),
media_type="application/x-ndjson",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx対策
}
)ポイント解説
1. 別スレッドでBedrock呼び出し
thread = threading.Thread(target=invoke_in_thread)Bedrockの呼び出しはブロッキング処理です。メインスレッドで実行すると、Keepaliveが送れなくなります。別スレッドで実行することで、メインスレッドはKeepaliveを送り続けられます。
2. Queueで結果を受け渡し
result_queue.put(chunk) # スレッドから
chunk = result_queue.get_nowait() # メインからスレッド間でデータを安全にやり取りするため、Queueを使います。
3. 非ブロッキングで待つ
await asyncio.sleep(0.1)ビジーループ(100%CPU使用)を避けるため、少し待ちます。
フロントエンド実装
ReadableStreamで受け取る
// frontend/src/services/streamingClient.ts
type StreamChunk = {
type: 'text' | 'ping' | 'done' | 'error';
content?: string;
message?: string;
};
export async function fetchStreaming(
url: string,
body: object,
onChunk: (chunk: StreamChunk) => void,
): Promise<void> {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${await getToken()}`,
},
body: JSON.stringify(body),
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body?.getReader();
if (!reader) {
throw new Error('ReadableStream not supported');
}
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 受信データをバッファに追加
buffer += decoder.decode(value, { stream: true });
// 改行で分割してパース
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // 最後の不完全な行は保持
for (const line of lines) {
if (!line.trim()) continue;
try {
const chunk: StreamChunk = JSON.parse(line);
onChunk(chunk);
} catch (e) {
console.warn('Failed to parse:', line);
}
}
}
}UIへの反映
// frontend/src/components/ChatInterface.tsx
import { useState } from 'react';
import { fetchStreaming } from '../services/streamingClient';
export function ChatInterface() {
const [messages, setMessages] = useState<Message[]>([]);
const [currentResponse, setCurrentResponse] = useState('');
const [isLoading, setIsLoading] = useState(false);
const handleSend = async (prompt: string, imageUrl?: string) => {
setIsLoading(true);
setCurrentResponse('');
// ユーザーメッセージを追加
setMessages((prev) => [
...prev,
{ role: 'user', content: prompt, imageUrl }
]);
try {
await fetchStreaming(
'/api/analyze',
{ prompt, imageUrl },
(chunk) => {
switch (chunk.type) {
case 'text':
// 文字を追加していく
setCurrentResponse((prev) => prev + (chunk.content || ''));
break;
case 'ping':
// Keepaliveは無視
break;
case 'done':
// 完了:確定したメッセージとして追加
setMessages((prev) => [
...prev,
{ role: 'assistant', content: currentResponse }
]);
setCurrentResponse('');
break;
case 'error':
// エラー表示
alert(chunk.message || 'エラーが発生しました');
break;
}
}
);
} finally {
setIsLoading(false);
}
};
return (
<div>
{/* メッセージ一覧 */}
{messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
{/* ストリーミング中の表示 */}
{currentResponse && (
<MessageBubble
message={{ role: 'assistant', content: currentResponse }}
isStreaming
/>
)}
{/* 入力欄 */}
<MessageInput onSend={handleSend} disabled={isLoading} />
</div>
);
}スクロール位置の調整
新しい文字が追加されたら、自動でスクロールします。
// frontend/src/components/MessageList.tsx
import { useRef, useEffect } from 'react';
export function MessageList({ messages, streamingContent }) {
const bottomRef = useRef<HTMLDivElement>(null);
// メッセージ追加時にスクロール
useEffect(() => {
bottomRef.current?.scrollIntoView({ behavior: 'smooth' });
}, [messages, streamingContent]);
return (
<div className="overflow-y-auto">
{messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
{streamingContent && (
<MessageBubble
message={{ role: 'assistant', content: streamingContent }}
isStreaming
/>
)}
{/* スクロール位置のアンカー */}
<div ref={bottomRef} />
</div>
);
}トラブルシューティング
文字が一気に来る
原因: バッファリングが有効になっている
解決策:
# バックエンド側
headers={
"X-Accel-Buffering": "no", # Nginx
"Cache-Control": "no-cache",
}// フロントエンド側:fetchオプション確認
fetch(url, {
// cache: 'no-store' を追加
cache: 'no-store',
// ...
})途中で止まる
原因: Keepaliveが送られていない
解決策: Keepalive間隔を確認(10秒推奨)
KEEPALIVE_INTERVAL = 10 # 10秒以下に設定文字化けする
原因: マルチバイト文字が途中で切れている
解決策: TextDecoderでstream: trueを指定
const decoder = new TextDecoder();
buffer += decoder.decode(value, { stream: true }); // ← これが重要まとめ
ストリーミングの効果
| 観点 | 従来方式 | ストリーミング |
|---|---|---|
| 待ち時間の体感 | 長い | 短い |
| 処理の可視化 | 不可 | 可能 |
| 中断の容易さ | 難しい | 簡単 |
| 実装の複雑さ | 低 | 中 |
実装チェックリスト
バックエンド:
- NDJSON形式でのレスポンス
- Keepalive(10秒間隔)
- 別スレッドでのBedrock呼び出し
- バッファリング無効化ヘッダー
フロントエンド:
- ReadableStreamでの受信
- 改行での分割とパース
- ping(Keepalive)の無視
- UIへのリアルタイム反映
- 自動スクロール