ストリーミングによる高速化
ストリーミングによる高速化¶
このガイドでは、ユーザーの体感待ち時間を最小化するための「ストリーミング処理」について解説します。
概念¶
ストリーミング処理とは¶
ストリーミング処理とは、データが完全に揃うのを待たずに、部分的なデータが到着次第、順次処理を行う 手法です。
例えば、LLM(大規模言語モデル)が「こんにちは、今日はいい天気ですね。」という文章を生成する場合:
- 従来の処理(ブロッキング型): 文章全体が生成されるまで待ち、完成後に次の処理へ
- ストリーミング処理: 「こん」「こんにちは」「こんにちは、今日」...と部分的に生成されるたびに次の処理へ
【従来(ブロッキング型)】
ユーザー入力 → LLM完全出力(2.7秒) → 処理 → TTS完了 → 音声再生(3.3秒)
↑
ここで待機!ユーザーは無音で待たされる
【ストリーミング処理(並行型)】
ユーザー入力 → LLM部分出力(0.77秒) → 処理 → TTS開始 → 音声再生(1.5秒)
↓
続きを生成 → 処理 → TTS継続 → 音声継続
改善効果:
- 音声再生開始: 3.3秒 → 1.5秒(55%以上短縮)
- ユーザー体感の大幅な向上
is_last とは¶
is_last は、「この値の更新が最終的なものであるか」を示すフラグ です。
ストリーミング処理では、このis_lastを待たずに処理を行うことで、高速化を行います。
| is_last | 意味 |
|---|---|
False | この値は中間的なもので、今後さらに更新される可能性がある |
True | この値は最終的なもので、これ以上更新されない |
ストリーミングの例:
更新1: "こん" → is_last=False
更新2: "こんにちは" → is_last=False
更新3: "こんにちは、今日" → is_last=False
...
最終: "こんにちは、今日はいい天気ですね。" → is_last=True
is_last と処理タイミングの関係¶
Marionetteの各コンポーネントは、is_last の値に応じて 処理を開始するタイミング を制御できます。
graph LR
A[LLM<br/>ストリーミング出力] -->|is_last=False<br/>部分データ| B{次のItem}
A -->|is_last=True<br/>完全データ| B
B -->|realtime| C[即時処理]
B -->|gate/once| D[is_last=True<br/>まで待機]
style A fill:#e8f5e9
style C fill:#fff4e1
style D fill:#e1f5ff | 処理モード | 動作 | 用途 |
|---|---|---|
realtime | 値が更新されるたびに即時処理 | 低レイテンシが必要な場合(LLM→TTS) |
gate / once | is_last=True になるまで待機 | 完全なデータが必要な場合(構造化出力、データベース保存) |
各コンポーネントの設定¶
LLM(PublicLLM / GeppettoLLM / SystemLLM)¶
LLMコンポーネントは、streaming パラメータでストリーミング出力を制御します。
| パラメータ | 値 | 動作 |
|---|---|---|
streaming | true | 生成されたテキストを逐次出力(is_last=False で更新、最後に is_last=True) |
streaming | false | 全テキスト生成完了後に一括出力(is_last=True のみ) |
TTS3(音声合成)¶
TTSは内部的にストリーミング処理をサポートしており、text_stream の入力が更新されるたびに音声合成を進めます。
重要
TTSは30文字程度のまとまりで句読点(。!?など)を検出し、自然な区切りで音声を生成します。
EventResponse¶
EventResponseは、mode パラメータでクライアントへの送信タイミングを制御します。
| mode | 動作 | 用途 |
|---|---|---|
once | is_last=True になったら一度だけ送信 | 最終結果の送信 |
realtime(cumulative) | 更新のたびに累積データを送信 | テキストストリーミング |
realtime(incremental) | 更新のたびに差分データを送信 | 音声チャンク送信 |
StateSet¶
StateSetは、mode パラメータで更新タイミングを制御します。
| mode | 動作 | 用途 |
|---|---|---|
realtime | 値が更新されるたびに状態を更新 | リアルタイムな状態同期 |
once | すべての値が is_last=True になったら一括更新 | 最終結果の保存 |
LambdaFunc¶
LambdaFuncは、process_mode パラメータで処理タイミングを制御します。
| process_mode | 動作 | 用途 |
|---|---|---|
gate | 指定した gates の値がすべて is_last=True になったら実行 | 完全なデータが必要な処理 |
realtime | 指定した gates の値が更新されるたびに実行 | ストリーミング処理 |
immediate | 即時実行(入力を待たない) | 初期化処理 |
buffer辞書(realtimeモード専用)
realtime モードでは、buffer という辞書が自動的に提供されます。
この辞書を使って、関数の複数回の呼び出し間で状態を保持できます。
LambdaFunc応用編¶
Continue() と Break() について¶
realtime モードの LambdaFunc では、入力が更新されるたびに関数が呼び出されます。
このとき、処理の流れを制御する ために Continue() と Break() を使用できます。
| 制御コマンド | 動作 | 用途 |
|---|---|---|
Continue() | 現在の更新をスキップし、次の更新を待つ | データが不完全な場合(括弧が閉じていない等) |
Break() | ループを終了し、処理を完了する | 特定の条件で早期終了したい場合 |
ストリーミング入力: "【感情" → "【感情】こんにちは" → "【感情】こんにちは (心情" → "【感情】こんにちは (心情)"
Continue()の動作:
"【感情" → Continue() (【】が閉じていない)
"【感情】こんにちは" → Continue() (()がまだない、または閉じていない)
"【感情】こんにちは (心情" → Continue() (()が閉じていない)
"【感情】こんにちは (心情)" → 処理実行! → "こんにちは" を出力
注意
Continue() と Break() は **realtime モードでのみ有効**です。
gate モードでは、is_last=True になってから一度だけ関数が呼ばれるため、これらは使用しません。
例: 「【xxx】yyy (zzz)」から yyy を取り出す¶
LLMが 【感情】発話内容 (心の声) のような形式で出力する場合に、発話内容(yyy)だけを取り出す例です。
非ストリーミング版(process_mode: gate)¶
is_last=True になってから一度だけ処理します。完全なデータが揃っているため、シンプルに正規表現で抽出できます。
async def extract_speech_gate(input_message, output_message):
"""
【xxx】yyy (zzz) から yyy を取り出す(非ストリーミング版)
入力例: "【嬉しい】こんにちは、元気ですか? (本当は眠い)"
出力例: "こんにちは、元気ですか?"
"""
# import re <- 独自importは禁止。大体のライブラリはすでにimport済み
# gate モードなので、この時点で完全なデータが揃っている
text = input_message.values.response.x
# 【xxx】を除去
text = re.sub(r'【[^】]*】', '', text)
# (zzz) を除去
text = re.sub(r'\s*[((][^))]*[))]', '', text)
# 前後の空白を除去
result = text.strip()
# 結果を出力(一度だけなので is_last=True)
await output_message.update('response', result, is_last=True)
ストリーミング版(process_mode: realtime)¶
ストリーミングでは、データが部分的に到着するため、括弧が閉じるまで待つ か、括弧が開いた時点で早期終了 する必要があります。
ここで Continue() と Break() を使用します。
async def extract_speech_realtime(input_message, output_message):
"""
【xxx】yyy (zzz) から yyy を取り出す(ストリーミング版)
入力の流れ:
"【嬉し" → Continue()(【】が閉じていない)
"【嬉しい】" → Continue()(本文がまだない)
"【嬉しい】こんにちは" → "こんにちは" を出力 (is_last=False)
"【嬉しい】こんにちは (本当" → "こんにちは" を出力 (is_last=True) + Break()
(以降は処理されない)
ポイント: (が出現したら、その前までを出力してループ終了
"""
# import re <- 独自importは禁止。大体のライブラリはすでにimport済み
text = input_message.values.response.x
is_last = input_message.is_last('response')
# --- 【】の処理 ---
# 【が開いているが】が閉じていない場合は待機
if '【' in text and '】' not in text:
return Continue()
# 【xxx】を除去
text_without_bracket = re.sub(r'【[^】]*】', '', text)
# 本文がまだない場合は待機
if not text_without_bracket.strip():
return Continue()
# --- ()の処理 ---
# (または(が出現したら、その前までを出力して早期終了
if '(' in text_without_bracket or '(' in text_without_bracket:
# (以前の部分を抽出
result = re.split(r'[((]', text_without_bracket)[0]
await output_message.update('response', result.strip(), is_last=True)
return Break() # ループを終了
# 括弧がない場合は通常出力
await output_message.update('response', text_without_bracket.strip(), is_last=is_last)
buffer の使い方¶
realtime モードでは、関数が複数回呼び出されます。呼び出し間で状態を保持するために、buffer 辞書が自動的に提供されます。
基本的な使い方¶
async def my_func(input_message, output_message):
# buffer は realtime モードで自動的に提供される辞書
# 初回呼び出し時に初期化
if 'count' not in buffer:
buffer['count'] = 0
buffer['prev_text'] = ""
# 状態を更新
buffer['count'] += 1
# 処理...
buffer が必要なケース¶
| ケース | 説明 | 例 |
|---|---|---|
| 差分検出 | 前回の入力と比較して新しい部分だけを処理 | 文字列の増分だけを抽出 |
| データ蓄積 | 複数回の入力を結合して処理 | 文区切りバッファリング |
| カウンター | 呼び出し回数や処理回数を記録 | デバッグ、制限処理 |
例: 差分だけを出力する¶
async def output_diff(input_message, output_message):
"""
前回からの差分(新しく追加された文字)だけを出力する
入力の流れ:
"こん" → "こん" を出力
"こんにち" → "にち" を出力(差分)
"こんにちは" → "は" を出力(差分)
"""
text = input_message.values.response.x
is_last = input_message.is_last('response')
# 前回のテキストを保持
if 'prev_text' not in buffer:
buffer['prev_text'] = ""
# 差分を計算
diff = text[len(buffer['prev_text']):]
buffer['prev_text'] = text
# 差分がある場合のみ出力
if diff:
await output_message.update('response', diff, is_last=is_last)
# 終了時にリセット
if is_last:
buffer['prev_text'] = ""
注意
buffer は **realtime モード専用**です。gate モードでは関数が一度しか呼ばれないため、状態保持の必要がありません。
実践的なドラマ構成例 (ストリーミング化)¶
例1: シンプルなストリーミング(LLM → TTS → 出力 + 履歴保存)¶
graph LR
A[Entry] --> B[DialogHistory]
B --> C[LLM<br/>streaming=true]
C -->|streaming| D[TTS3]
D -->|streaming| E[EventResponse]
C -->|is_last=Trueで追加| F[DialogAdd]
style C fill:#e8f5e9
style D fill:#e1f5ff
style E fill:#ffe8e8
style F fill:#f0f0f0 主要なコンポーネント設定:
| コンポーネント | 設定 | 説明 |
|---|---|---|
| LLM | streaming: true | ストリーミング出力を有効化 |
| TTS3 | (デフォルト) | ストリーミング入力を自動処理 |
| EventResponse | mode: "realtime(incremental)" | 増分データを順次送信(デフォルト) |
| DialogAdd | (デフォルト) | is_last=True で履歴追加 |
処理の流れ:
- LLMがストリーミング出力 →
is_last=Falseで順次出力 - TTSが音声合成を開始 → 音声チャンクを
is_last=Falseで出力 - EventResponseがクライアントに送信 → ユーザーは音声を聞き始める
- LLMが完了 →
is_last=Trueで出力 - DialogAddは
is_last=Trueになったときに会話履歴に追加
DialogAddのストリーミング動作
DialogAddは、role, text, meta のすべてが is_last=True になったときに会話履歴に追加します。
ストリーミング中(is_last=False)は履歴に追加されません。
例2: テキスト整形して出力(発話部分のみ抽出)¶
LLMが 【感情】発話内容 (心の声) 形式で出力する場合、発話内容のみを抽出してTTSに渡し、履歴にも発話内容のみを保存します。
graph LR
A[Entry] --> B[DialogHistory]
B --> C[LLM<br/>streaming=true]
C -->|streaming| D[LambdaFunc<br/>realtime<br/>bbを抽出]
D -->|streaming| E[EventResponse]
D -->|is_last=Trueで追加| F[DialogAdd<br/>text=bb]
style C fill:#e8f5e9
style D fill:#fff4e1
style E fill:#ffe8e8
style F fill:#f0f0f0 主要なコンポーネント設定:
| コンポーネント | 設定 | 説明 |
|---|---|---|
| LLM | streaming: true | ストリーミング出力を有効化 |
| LambdaFunc | process_mode: realtime | 更新のたびに実行 |
| EventResponse | mode: "realtime(incremental)" | 増分データを順次送信(デフォルト) |
| DialogAdd | (デフォルト) | is_last=True で履歴追加 |
LambdaFunc設定(realtime):
async def extract_speech(input_message, output_message):
"""
【aa】bb (cc) から bb を取り出す
"""
import re
text = input_message.values.response.x
is_last = input_message.is_last('response')
# 【】が閉じていない場合は待機
if '【' in text and '】' not in text:
return Continue()
# 【aa】を除去
text = re.sub(r'【[^】]*】', '', text)
# (が来たら、その前までを出力してループ終了
if '(' in text or '(' in text:
text = re.split(r'[((]', text)[0]
await output_message.update('response', text.strip(), is_last=True)
return Break()
await output_message.update('response', text.strip(), is_last=is_last)
処理の流れ:
| 入力 | 出力 | 説明 |
|---|---|---|
"【嬉し" | Continue() | 【】が閉じていない |
"【嬉しい】こんにち" | "こんにち" (is_last=False) | 発話部分を抽出 |
"【嬉しい】こんにちは (わく" | "こんにちは" (is_last=True) + Break() | (を検出、早期終了 |
DialogAdd: is_last=True で "こんにちは" が履歴に追加される
例3: 感情・発話・心の声を分離して出力¶
LLMが 【感情】発話内容 (心の声) 形式で出力する場合、それぞれを分離して 別々のEventResponse でクライアントに送信します。
graph LR
A[Entry] --> B[DialogHistory]
B --> C[LLM<br/>streaming=true]
C -->|streaming| D[LambdaFunc<br/>realtime]
D -->|emotion| E1[EventResponse<br/>mode=once]
D -->|text streaming| E2[EventResponse<br/>mode=realtime]
D -->|thought| E3[EventResponse<br/>mode=once]
D -->|is_last=True| F[DialogAdd]
style C fill:#e8f5e9
style D fill:#fff4e1
style E1 fill:#ffe8e8
style E2 fill:#ffe8e8
style E3 fill:#ffe8e8
style F fill:#f0f0f0 主要なコンポーネント設定:
| コンポーネント | 設定 | 説明 |
|---|---|---|
| LLM | streaming: true | ストリーミング出力を有効化 |
| LambdaFunc | process_mode: realtime | 更新のたびに実行 |
| EventResponse (emotion) | mode: "once" | emotionが is_last=True になったら1回送信 |
| EventResponse (text) | mode: "realtime(incremental)" | textを順次ストリーミング送信 |
| EventResponse (thought) | mode: "once" | thoughtが is_last=True になったら1回送信 |
| DialogAdd | text: $text, meta: $meta | text と meta を受け取って履歴追加 |
各EventResponseの動作:
| EventResponse | mode | 送信タイミング | 送信回数 |
|---|---|---|---|
| event_emotion | once | emotionが is_last=True になったとき | 1回 |
| event_text | realtime(incremental) | textが更新されるたびに | 複数回 |
| event_thought | once | thoughtが is_last=True になったとき | 1回 |
LambdaFunc設定(realtime):
async def parse_emotion_text(input_message, output_message):
"""
【aa】bb (cc) を aa, bb, cc に分離する
送信タイミング:
- emotion: 【】が閉じたら確定送信
- text: (が来たら確定送信(それまではストリーミング)
- thought: ()が閉じたら確定送信
"""
import re
input_text = input_message.values.response.x
is_last = input_message.is_last('response')
# --- buffer初期化 ---
if 'emotion_sent' not in buffer:
buffer['emotion_sent'] = False
buffer['text_sent'] = False
buffer['thought_sent'] = False
buffer['emotion'] = ""
buffer['thought'] = ""
# --- 【】が閉じていない場合は待機 ---
if '【' in input_text and '】' not in input_text:
return Continue()
# --- 感情の抽出と送信 ---
match = re.search(r'【(.+?)】', input_text)
if match:
buffer['emotion'] = match.group(1)
# emotionは一度だけis_last=Trueで送信
if not buffer['emotion_sent']:
await output_message.update('emotion', buffer['emotion'], is_last=True)
buffer['emotion_sent'] = True
# --- 【aa】を除去した残り ---
remaining = re.sub(r'【[^】]*】', '', input_text)
# --- 括弧の有無を確認 ---
has_open_paren = '(' in remaining or '(' in remaining
open_count = remaining.count('(') + remaining.count('(')
close_count = remaining.count(')') + remaining.count(')')
paren_closed = open_count > 0 and open_count == close_count
# --- 発話部分((の前)を抽出 ---
if has_open_paren:
text_content = re.split(r'[((]', remaining)[0].strip()
else:
text_content = remaining.strip()
# --- textの送信 ---
if not buffer['text_sent']:
if has_open_paren:
# (が来たらtextを確定送信
if text_content:
await output_message.update('text', text_content, is_last=True)
buffer['text_sent'] = True
elif text_content:
# (がまだ来ていない場合はストリーミング送信
await output_message.update('text', text_content, is_last=False)
# --- 心の声の抽出と送信 ---
if paren_closed and not buffer['thought_sent']:
thought_match = re.search(r'[((](.+?)[))]', remaining)
buffer['thought'] = thought_match.group(1) if thought_match else ""
await output_message.update('thought', buffer['thought'], is_last=True)
buffer['thought_sent'] = True
# --- ()が閉じていない場合は待機 ---
if has_open_paren and not paren_closed:
return Continue()
# --- DialogAdd用のmeta(is_last=Trueのときのみ意味がある)---
if is_last:
await output_message.update('meta', {
'emotion': buffer['emotion'],
'thought': buffer['thought']
}, is_last=True)
# bufferリセット
buffer['emotion_sent'] = False
buffer['text_sent'] = False
buffer['thought_sent'] = False
buffer['emotion'] = ""
buffer['thought'] = ""
処理の流れ:
| 入力 | emotion | text | thought | 説明 |
|---|---|---|---|---|
"【嬉し" | - | - | - | Continue()(【】が閉じていない) |
"【嬉しい】" | "嬉しい" ✓ | - | - | emotion確定送信(textは空なので送信しない) |
"【嬉しい】こんにち" | - | "こんにち" | - | textストリーミング |
"【嬉しい】こんにちは(わく" | - | "こんにちは" ✓ | - | text確定送信((が来た)+ Continue() |
"【嬉しい】こんにちは(わくわく)" | - | - | "わくわく" ✓ | thought確定送信 |
※ ✓ は is_last=True で送信
クライアントへの送信順序:
1. emotion: "嬉しい" (is_last=True) ← 感情が確定
2. text: "こんにち" (is_last=False) ← テキストストリーミング
3. text: "こんにちは" (is_last=True) ← (が来たのでテキスト確定
4. thought: "わくわく" (is_last=True) ← 心の声が確定
5. meta: {...} (is_last=True) ← DialogAdd用メタデータ
DialogAdd:
ストリーミングとDialogAddの組み合わせ
- emotion: 確定した時点で即座に送信(表情変更などに使用)
- text: ストリーミングで順次送信(低レイテンシでテキスト表示)
- thought: 確定した時点で送信(心の声の表示)
- DialogAdd: すべてが確定した
is_last=True時点で履歴に保存
例4: 感情タグを分離してTTSに渡す¶
LLMが [emotion:xxx]セリフ 形式で出力する場合、感情タグ(decoration_id)と発話テキストを分離してTTS3Chainに渡します。
TTS3Chainの待機動作:
TTS3Chainは、decoration_id設定にvalues.decoration_idが含まれている場合、その値のis_last=Trueを**自動的に待機**してから音声合成を開始します。これにより、LambdaFuncでdecoration_idを確定させてからTTSが処理を開始することが保証されます。
graph LR
A[Entry] --> B[DialogHistory]
B --> C[LLM<br/>streaming=true]
C -->|streaming| D[LambdaFunc<br/>realtime<br/>タグ分離]
D -->|decoration_id<br/>is_last=True| E[TTS3<br/>待機後に開始]
D -->|response<br/>streaming| E
E -->|audio streaming| F[EventResponse]
style C fill:#e8f5e9
style D fill:#fff4e1
style E fill:#e1f5ff
style F fill:#ffe8e8 TTS3の設定:
decoration_id: "{{ values.decoration_id if values.decoration_id else 'neutral' }}"
text_stream: "{{values.response}}"
TTS3Chainの自動待機
decoration_id設定にvalues.decoration_idが含まれている場合、TTS3Chainは自動的にその値が確定する(is_last=Trueになる)まで待機します。これにより、正しい感情で音声合成が行われます。
LambdaFunc設定(realtime):
async def extract_decoration_id(input_message, output_message):
"""
セリフから [emotion:xxx] を抽出し、テキストと decoration_id を分離する
入力例: "[emotion:laughing_speech]ハッハー!「よろー」だって!?"
出力:
- decoration_id: "laughing_speech" (is_last=True)
- response: "ハッハー!「よろー」だって!?" (streaming)
"""
text = input_message.values.response.x
is_last = input_message.is_last('response')
# --- タグが閉じていない場合は待機 ---
if '[' in text and ']' not in text:
return Continue()
# --- buffer初期化 ---
if 'decoration_id_sent' not in buffer:
buffer['decoration_id_sent'] = False
buffer['decoration_id'] = 'neutral' # デフォルト値
# --- decoration_id の抽出(一度だけ送信)---
match = re.search(r'\[(?:emotion|decoration_id):([^\]]+)\]', text)
if match and not buffer['decoration_id_sent']:
buffer['decoration_id'] = match.group(1).strip()
# ★ decoration_idを先に確定送信(TTS3がこれを待つ)
await output_message.update('decoration_id', buffer['decoration_id'], is_last=True)
buffer['decoration_id_sent'] = True
# タグを除去してテキストを取得
clean_text = re.sub(r'\[[^\]]*\]', '', text).strip()
# テキストがまだない場合は待機
if not clean_text:
return Continue()
# ★ responseをストリーミング送信(TTS3はdecoration_id確定後に処理開始)
await output_message.update('response', clean_text, is_last=is_last)
処理の流れ:
| LLM出力 | decoration_id | response | TTS3動作 |
|---|---|---|---|
"[emotion:laugh" | - | - | Continue()(タグ未閉じ) |
"[emotion:laughing_speech]" | "laughing_speech" ✓ | - | 待機中... |
"[emotion:laughing_speech]ハッハー" | - | "ハッハー" | 音声合成開始! |
"[emotion:laughing_speech]ハッハー!..." | - | "ハッハー!..." ✓ | 音声合成継続 |
※ ✓ は is_last=True で送信
ポイント:
decoration_idを最初にis_last=Trueで送信(一度だけ)- TTS3Chainは
values.decoration_idの確定を自動的に待機 decoration_id確定後、responseのストリーミングに応じて音声合成を開始- 正しい感情(
laughing_speech)で音声が再生される