コンテンツにスキップ

ストリーミングによる高速化

ストリーミングによる高速化

このガイドでは、ユーザーの体感待ち時間を最小化するための「ストリーミング処理」について解説します。


概念

ストリーミング処理とは

ストリーミング処理とは、データが完全に揃うのを待たずに、部分的なデータが到着次第、順次処理を行う 手法です。

例えば、LLM(大規模言語モデル)が「こんにちは、今日はいい天気ですね。」という文章を生成する場合:

  • 従来の処理(ブロッキング型): 文章全体が生成されるまで待ち、完成後に次の処理へ
  • ストリーミング処理: 「こん」「こんにちは」「こんにちは、今日」...と部分的に生成されるたびに次の処理へ
【従来(ブロッキング型)】
ユーザー入力 → LLM完全出力(2.7秒) → 処理 → TTS完了 → 音声再生(3.3秒)
            ここで待機!ユーザーは無音で待たされる

【ストリーミング処理(並行型)】
ユーザー入力 → LLM部分出力(0.77秒) → 処理 → TTS開始 → 音声再生(1.5秒)
                  続きを生成 → 処理 → TTS継続 → 音声継続

改善効果:
- 音声再生開始: 3.3秒 → 1.5秒(55%以上短縮
- ユーザー体感の大幅な向上


is_last とは

is_last は、「この値の更新が最終的なものであるか」を示すフラグ です。
ストリーミング処理では、このis_lastを待たずに処理を行うことで、高速化を行います。

is_last 意味
False この値は中間的なもので、今後さらに更新される可能性がある
True この値は最終的なもので、これ以上更新されない

ストリーミングの例:

更新1: "こん"           → is_last=False
更新2: "こんにちは"     → is_last=False
更新3: "こんにちは、今日" → is_last=False
...
最終:  "こんにちは、今日はいい天気ですね。" → is_last=True

is_last と処理タイミングの関係

Marionetteの各コンポーネントは、is_last の値に応じて 処理を開始するタイミング を制御できます。

graph LR
    A[LLM<br/>ストリーミング出力] -->|is_last=False<br/>部分データ| B{次のItem}
    A -->|is_last=True<br/>完全データ| B

    B -->|realtime| C[即時処理]
    B -->|gate/once| D[is_last=True<br/>まで待機]

    style A fill:#e8f5e9
    style C fill:#fff4e1
    style D fill:#e1f5ff
処理モード 動作 用途
realtime 値が更新されるたびに即時処理 低レイテンシが必要な場合(LLM→TTS)
gate / once is_last=True になるまで待機 完全なデータが必要な場合(構造化出力、データベース保存)

各コンポーネントの設定

LLM(PublicLLM / GeppettoLLM / SystemLLM)

LLMコンポーネントは、streaming パラメータでストリーミング出力を制御します。

パラメータ 動作
streaming true 生成されたテキストを逐次出力(is_last=False で更新、最後に is_last=True
streaming false 全テキスト生成完了後に一括出力(is_last=True のみ)

TTS3(音声合成)

TTSは内部的にストリーミング処理をサポートしており、text_stream の入力が更新されるたびに音声合成を進めます。

重要

TTSは30文字程度のまとまりで句読点(。!?など)を検出し、自然な区切りで音声を生成します。

EventResponse

EventResponseは、mode パラメータでクライアントへの送信タイミングを制御します。

mode 動作 用途
once is_last=True になったら一度だけ送信 最終結果の送信
realtime(cumulative) 更新のたびに累積データを送信 テキストストリーミング
realtime(incremental) 更新のたびに差分データを送信 音声チャンク送信

StateSet

StateSetは、mode パラメータで更新タイミングを制御します。

mode 動作 用途
realtime 値が更新されるたびに状態を更新 リアルタイムな状態同期
once すべての値が is_last=True になったら一括更新 最終結果の保存

LambdaFunc

LambdaFuncは、process_mode パラメータで処理タイミングを制御します。

process_mode 動作 用途
gate 指定した gates の値がすべて is_last=True になったら実行 完全なデータが必要な処理
realtime 指定した gates の値が更新されるたびに実行 ストリーミング処理
immediate 即時実行(入力を待たない) 初期化処理

buffer辞書(realtimeモード専用)

realtime モードでは、buffer という辞書が自動的に提供されます。
この辞書を使って、関数の複数回の呼び出し間で状態を保持できます。

# bufferの使用例
if 'count' not in buffer:
    buffer['count'] = 0
buffer['count'] += 1

LambdaFunc応用編

Continue() と Break() について

realtime モードの LambdaFunc では、入力が更新されるたびに関数が呼び出されます。
このとき、処理の流れを制御する ために Continue()Break() を使用できます。

制御コマンド 動作 用途
Continue() 現在の更新をスキップし、次の更新を待つ データが不完全な場合(括弧が閉じていない等)
Break() ループを終了し、処理を完了する 特定の条件で早期終了したい場合
ストリーミング入力: "【感情" → "【感情】こんにちは" → "【感情】こんにちは (心情" → "【感情】こんにちは (心情)"

Continue()の動作:
  "【感情"           → Continue() (【】が閉じていない)
  "【感情】こんにちは" → Continue() (()がまだない、または閉じていない)
  "【感情】こんにちは (心情" → Continue() (()が閉じていない)
  "【感情】こんにちは (心情)" → 処理実行! → "こんにちは" を出力

注意

Continue()Break() は **realtime モードでのみ有効**です。
gate モードでは、is_last=True になってから一度だけ関数が呼ばれるため、これらは使用しません。

例: 「【xxx】yyy (zzz)」から yyy を取り出す

LLMが 【感情】発話内容 (心の声) のような形式で出力する場合に、発話内容(yyy)だけを取り出す例です。

非ストリーミング版(process_mode: gate)

is_last=True になってから一度だけ処理します。完全なデータが揃っているため、シンプルに正規表現で抽出できます。

async def extract_speech_gate(input_message, output_message):
    """
    【xxx】yyy (zzz) から yyy を取り出す(非ストリーミング版)

    入力例: "【嬉しい】こんにちは、元気ですか? (本当は眠い)"
    出力例: "こんにちは、元気ですか?"
    """
    # import re <- 独自importは禁止。大体のライブラリはすでにimport済み

    # gate モードなので、この時点で完全なデータが揃っている
    text = input_message.values.response.x

    # 【xxx】を除去
    text = re.sub(r'【[^】]*】', '', text)

    # (zzz) を除去
    text = re.sub(r'\s*[((][^))]*[))]', '', text)

    # 前後の空白を除去
    result = text.strip()

    # 結果を出力(一度だけなので is_last=True)
    await output_message.update('response', result, is_last=True)
ストリーミング版(process_mode: realtime)

ストリーミングでは、データが部分的に到着するため、括弧が閉じるまで待つ か、括弧が開いた時点で早期終了 する必要があります。
ここで Continue()Break() を使用します。

async def extract_speech_realtime(input_message, output_message):
    """
    【xxx】yyy (zzz) から yyy を取り出す(ストリーミング版)

    入力の流れ:
      "【嬉し"                    → Continue()(【】が閉じていない)
      "【嬉しい】"                → Continue()(本文がまだない)
      "【嬉しい】こんにちは"       → "こんにちは" を出力 (is_last=False)
      "【嬉しい】こんにちは (本当" → "こんにちは" を出力 (is_last=True) + Break()
      (以降は処理されない)

    ポイント: (が出現したら、その前までを出力してループ終了
    """
    # import re <- 独自importは禁止。大体のライブラリはすでにimport済み

    text = input_message.values.response.x
    is_last = input_message.is_last('response')

    # --- 【】の処理 ---
    # 【が開いているが】が閉じていない場合は待機
    if '【' in text and '】' not in text:
        return Continue()

    # 【xxx】を除去
    text_without_bracket = re.sub(r'【[^】]*】', '', text)

    # 本文がまだない場合は待機
    if not text_without_bracket.strip():
        return Continue()

    # --- ()の処理 ---
    # (または(が出現したら、その前までを出力して早期終了
    if '(' in text_without_bracket or '(' in text_without_bracket:
        # (以前の部分を抽出
        result = re.split(r'[((]', text_without_bracket)[0]
        await output_message.update('response', result.strip(), is_last=True)
        return Break()  # ループを終了

    # 括弧がない場合は通常出力
    await output_message.update('response', text_without_bracket.strip(), is_last=is_last)

buffer の使い方

realtime モードでは、関数が複数回呼び出されます。呼び出し間で状態を保持するために、buffer 辞書が自動的に提供されます。

基本的な使い方
async def my_func(input_message, output_message):
    # buffer は realtime モードで自動的に提供される辞書
    # 初回呼び出し時に初期化
    if 'count' not in buffer:
        buffer['count'] = 0
        buffer['prev_text'] = ""

    # 状態を更新
    buffer['count'] += 1

    # 処理...
buffer が必要なケース
ケース 説明
差分検出 前回の入力と比較して新しい部分だけを処理 文字列の増分だけを抽出
データ蓄積 複数回の入力を結合して処理 文区切りバッファリング
カウンター 呼び出し回数や処理回数を記録 デバッグ、制限処理
例: 差分だけを出力する
async def output_diff(input_message, output_message):
    """
    前回からの差分(新しく追加された文字)だけを出力する

    入力の流れ:
      "こん"           → "こん" を出力
      "こんにち"       → "にち" を出力(差分)
      "こんにちは"     → "は" を出力(差分)
    """
    text = input_message.values.response.x
    is_last = input_message.is_last('response')

    # 前回のテキストを保持
    if 'prev_text' not in buffer:
        buffer['prev_text'] = ""

    # 差分を計算
    diff = text[len(buffer['prev_text']):]
    buffer['prev_text'] = text

    # 差分がある場合のみ出力
    if diff:
        await output_message.update('response', diff, is_last=is_last)

    # 終了時にリセット
    if is_last:
        buffer['prev_text'] = ""

注意

buffer は **realtime モード専用**です。gate モードでは関数が一度しか呼ばれないため、状態保持の必要がありません。


実践的なドラマ構成例 (ストリーミング化)

例1: シンプルなストリーミング(LLM → TTS → 出力 + 履歴保存)

graph LR
    A[Entry] --> B[DialogHistory]
    B --> C[LLM<br/>streaming=true]
    C -->|streaming| D[TTS3]
    D -->|streaming| E[EventResponse]
    C -->|is_last=Trueで追加| F[DialogAdd]

    style C fill:#e8f5e9
    style D fill:#e1f5ff
    style E fill:#ffe8e8
    style F fill:#f0f0f0

主要なコンポーネント設定:

コンポーネント 設定 説明
LLM streaming: true ストリーミング出力を有効化
TTS3 (デフォルト) ストリーミング入力を自動処理
EventResponse mode: "realtime(incremental)" 増分データを順次送信(デフォルト)
DialogAdd (デフォルト) is_last=True で履歴追加

処理の流れ:

  1. LLMがストリーミング出力 → is_last=False で順次出力
  2. TTSが音声合成を開始 → 音声チャンクを is_last=False で出力
  3. EventResponseがクライアントに送信 → ユーザーは音声を聞き始める
  4. LLMが完了 → is_last=True で出力
  5. DialogAddは is_last=True になったときに会話履歴に追加

DialogAddのストリーミング動作

DialogAddは、role, text, meta のすべてが is_last=True になったときに会話履歴に追加します。
ストリーミング中(is_last=False)は履歴に追加されません。


例2: テキスト整形して出力(発話部分のみ抽出)

LLMが 【感情】発話内容 (心の声) 形式で出力する場合、発話内容のみを抽出してTTSに渡し、履歴にも発話内容のみを保存します。

graph LR
    A[Entry] --> B[DialogHistory]
    B --> C[LLM<br/>streaming=true]
    C -->|streaming| D[LambdaFunc<br/>realtime<br/>bbを抽出]
    D -->|streaming| E[EventResponse]
    D -->|is_last=Trueで追加| F[DialogAdd<br/>text=bb]

    style C fill:#e8f5e9
    style D fill:#fff4e1
    style E fill:#ffe8e8
    style F fill:#f0f0f0

主要なコンポーネント設定:

コンポーネント 設定 説明
LLM streaming: true ストリーミング出力を有効化
LambdaFunc process_mode: realtime 更新のたびに実行
EventResponse mode: "realtime(incremental)" 増分データを順次送信(デフォルト)
DialogAdd (デフォルト) is_last=True で履歴追加

LambdaFunc設定(realtime):

async def extract_speech(input_message, output_message):
    """
    【aa】bb (cc) から bb を取り出す
    """
    import re

    text = input_message.values.response.x
    is_last = input_message.is_last('response')

    # 【】が閉じていない場合は待機
    if '【' in text and '】' not in text:
        return Continue()

    # 【aa】を除去
    text = re.sub(r'【[^】]*】', '', text)

    # (が来たら、その前までを出力してループ終了
    if '(' in text or '(' in text:
        text = re.split(r'[((]', text)[0]
        await output_message.update('response', text.strip(), is_last=True)
        return Break()

    await output_message.update('response', text.strip(), is_last=is_last)

処理の流れ:

入力 出力 説明
"【嬉し" Continue() 【】が閉じていない
"【嬉しい】こんにち" "こんにち" (is_last=False) 発話部分を抽出
"【嬉しい】こんにちは (わく" "こんにちは" (is_last=True) + Break() (を検出、早期終了

DialogAdd: is_last=True"こんにちは" が履歴に追加される


例3: 感情・発話・心の声を分離して出力

LLMが 【感情】発話内容 (心の声) 形式で出力する場合、それぞれを分離して 別々のEventResponse でクライアントに送信します。

graph LR
    A[Entry] --> B[DialogHistory]
    B --> C[LLM<br/>streaming=true]
    C -->|streaming| D[LambdaFunc<br/>realtime]
    D -->|emotion| E1[EventResponse<br/>mode=once]
    D -->|text streaming| E2[EventResponse<br/>mode=realtime]
    D -->|thought| E3[EventResponse<br/>mode=once]
    D -->|is_last=True| F[DialogAdd]

    style C fill:#e8f5e9
    style D fill:#fff4e1
    style E1 fill:#ffe8e8
    style E2 fill:#ffe8e8
    style E3 fill:#ffe8e8
    style F fill:#f0f0f0

主要なコンポーネント設定:

コンポーネント 設定 説明
LLM streaming: true ストリーミング出力を有効化
LambdaFunc process_mode: realtime 更新のたびに実行
EventResponse (emotion) mode: "once" emotionが is_last=True になったら1回送信
EventResponse (text) mode: "realtime(incremental)" textを順次ストリーミング送信
EventResponse (thought) mode: "once" thoughtが is_last=True になったら1回送信
DialogAdd text: $text, meta: $meta text と meta を受け取って履歴追加

各EventResponseの動作:

EventResponse mode 送信タイミング 送信回数
event_emotion once emotionが is_last=True になったとき 1回
event_text realtime(incremental) textが更新されるたびに 複数回
event_thought once thoughtが is_last=True になったとき 1回

LambdaFunc設定(realtime):

async def parse_emotion_text(input_message, output_message):
    """
    【aa】bb (cc) を aa, bb, cc に分離する

    送信タイミング:
      - emotion: 【】が閉じたら確定送信
      - text: (が来たら確定送信(それまではストリーミング)
      - thought: ()が閉じたら確定送信
    """
    import re

    input_text = input_message.values.response.x
    is_last = input_message.is_last('response')

    # --- buffer初期化 ---
    if 'emotion_sent' not in buffer:
        buffer['emotion_sent'] = False
        buffer['text_sent'] = False
        buffer['thought_sent'] = False
        buffer['emotion'] = ""
        buffer['thought'] = ""

    # --- 【】が閉じていない場合は待機 ---
    if '【' in input_text and '】' not in input_text:
        return Continue()

    # --- 感情の抽出と送信 ---
    match = re.search(r'【(.+?)】', input_text)
    if match:
        buffer['emotion'] = match.group(1)

    # emotionは一度だけis_last=Trueで送信
    if not buffer['emotion_sent']:
        await output_message.update('emotion', buffer['emotion'], is_last=True)
        buffer['emotion_sent'] = True

    # --- 【aa】を除去した残り ---
    remaining = re.sub(r'【[^】]*】', '', input_text)

    # --- 括弧の有無を確認 ---
    has_open_paren = '(' in remaining or '(' in remaining
    open_count = remaining.count('(') + remaining.count('(')
    close_count = remaining.count(')') + remaining.count(')')
    paren_closed = open_count > 0 and open_count == close_count

    # --- 発話部分((の前)を抽出 ---
    if has_open_paren:
        text_content = re.split(r'[((]', remaining)[0].strip()
    else:
        text_content = remaining.strip()

    # --- textの送信 ---
    if not buffer['text_sent']:
        if has_open_paren:
            # (が来たらtextを確定送信
            if text_content:
                await output_message.update('text', text_content, is_last=True)
            buffer['text_sent'] = True
        elif text_content:
            # (がまだ来ていない場合はストリーミング送信
            await output_message.update('text', text_content, is_last=False)

    # --- 心の声の抽出と送信 ---
    if paren_closed and not buffer['thought_sent']:
        thought_match = re.search(r'[((](.+?)[))]', remaining)
        buffer['thought'] = thought_match.group(1) if thought_match else ""
        await output_message.update('thought', buffer['thought'], is_last=True)
        buffer['thought_sent'] = True

    # --- ()が閉じていない場合は待機 ---
    if has_open_paren and not paren_closed:
        return Continue()

    # --- DialogAdd用のmeta(is_last=Trueのときのみ意味がある)---
    if is_last:
        await output_message.update('meta', {
            'emotion': buffer['emotion'],
            'thought': buffer['thought']
        }, is_last=True)
        # bufferリセット
        buffer['emotion_sent'] = False
        buffer['text_sent'] = False
        buffer['thought_sent'] = False
        buffer['emotion'] = ""
        buffer['thought'] = ""

処理の流れ:

入力 emotion text thought 説明
"【嬉し" - - - Continue()(【】が閉じていない)
"【嬉しい】" "嬉しい" - - emotion確定送信(textは空なので送信しない)
"【嬉しい】こんにち" - "こんにち" - textストリーミング
"【嬉しい】こんにちは(わく" - "こんにちは" - text確定送信((が来た)+ Continue()
"【嬉しい】こんにちは(わくわく)" - - "わくわく" thought確定送信

※ ✓ は is_last=True で送信

クライアントへの送信順序:

1. emotion: "嬉しい" (is_last=True)   ← 感情が確定
2. text: "こんにち" (is_last=False)   ← テキストストリーミング
3. text: "こんにちは" (is_last=True)  ← (が来たのでテキスト確定
4. thought: "わくわく" (is_last=True) ← 心の声が確定
5. meta: {...} (is_last=True)         ← DialogAdd用メタデータ

DialogAdd:

text: "こんにちは"
meta: {emotion: "嬉しい", thought: "わくわく"}

ストリーミングとDialogAddの組み合わせ

  • emotion: 確定した時点で即座に送信(表情変更などに使用)
  • text: ストリーミングで順次送信(低レイテンシでテキスト表示)
  • thought: 確定した時点で送信(心の声の表示)
  • DialogAdd: すべてが確定した is_last=True 時点で履歴に保存

例4: 感情タグを分離してTTSに渡す

LLMが [emotion:xxx]セリフ 形式で出力する場合、感情タグ(decoration_id)と発話テキストを分離してTTS3Chainに渡します。

TTS3Chainの待機動作:

TTS3Chainは、decoration_id設定にvalues.decoration_idが含まれている場合、その値のis_last=Trueを**自動的に待機**してから音声合成を開始します。これにより、LambdaFuncでdecoration_idを確定させてからTTSが処理を開始することが保証されます。

graph LR
    A[Entry] --> B[DialogHistory]
    B --> C[LLM<br/>streaming=true]
    C -->|streaming| D[LambdaFunc<br/>realtime<br/>タグ分離]
    D -->|decoration_id<br/>is_last=True| E[TTS3<br/>待機後に開始]
    D -->|response<br/>streaming| E
    E -->|audio streaming| F[EventResponse]

    style C fill:#e8f5e9
    style D fill:#fff4e1
    style E fill:#e1f5ff
    style F fill:#ffe8e8

TTS3の設定:

decoration_id: "{{ values.decoration_id if values.decoration_id else 'neutral' }}"
text_stream: "{{values.response}}"

TTS3Chainの自動待機

decoration_id設定にvalues.decoration_idが含まれている場合、TTS3Chainは自動的にその値が確定する(is_last=Trueになる)まで待機します。これにより、正しい感情で音声合成が行われます。

LambdaFunc設定(realtime):

async def extract_decoration_id(input_message, output_message):
    """
    セリフから [emotion:xxx] を抽出し、テキストと decoration_id を分離する

    入力例: "[emotion:laughing_speech]ハッハー!「よろー」だって!?"
    出力:
      - decoration_id: "laughing_speech" (is_last=True)
      - response: "ハッハー!「よろー」だって!?" (streaming)
    """
    text = input_message.values.response.x
    is_last = input_message.is_last('response')

    # --- タグが閉じていない場合は待機 ---
    if '[' in text and ']' not in text:
        return Continue()

    # --- buffer初期化 ---
    if 'decoration_id_sent' not in buffer:
        buffer['decoration_id_sent'] = False
        buffer['decoration_id'] = 'neutral'  # デフォルト値

    # --- decoration_id の抽出(一度だけ送信)---
    match = re.search(r'\[(?:emotion|decoration_id):([^\]]+)\]', text)
    if match and not buffer['decoration_id_sent']:
        buffer['decoration_id'] = match.group(1).strip()
        # ★ decoration_idを先に確定送信(TTS3がこれを待つ)
        await output_message.update('decoration_id', buffer['decoration_id'], is_last=True)
        buffer['decoration_id_sent'] = True

    # タグを除去してテキストを取得
    clean_text = re.sub(r'\[[^\]]*\]', '', text).strip()

    # テキストがまだない場合は待機
    if not clean_text:
        return Continue()

    # ★ responseをストリーミング送信(TTS3はdecoration_id確定後に処理開始)
    await output_message.update('response', clean_text, is_last=is_last)

処理の流れ:

LLM出力 decoration_id response TTS3動作
"[emotion:laugh" - - Continue()(タグ未閉じ)
"[emotion:laughing_speech]" "laughing_speech" - 待機中...
"[emotion:laughing_speech]ハッハー" - "ハッハー" 音声合成開始!
"[emotion:laughing_speech]ハッハー!..." - "ハッハー!..." 音声合成継続

※ ✓ は is_last=True で送信

ポイント:

  1. decoration_idを最初にis_last=Trueで送信(一度だけ)
  2. TTS3Chainはvalues.decoration_idの確定を自動的に待機
  3. decoration_id確定後、responseのストリーミングに応じて音声合成を開始
  4. 正しい感情(laughing_speech)で音声が再生される