コンテンツにスキップ

Gate + Fan-in 注意事項

Gate + Fan-in トポロジー: 開発者がハマりやすい注意事項

GraphExecutorChain で Gate と fan-in を組み合わせる際の落とし穴と、設計上の制約をまとめます。


推奨対策一覧

項目 理論的に起きうるか 推奨対策
エッジの to: null △ 実装済み(スキップされる) dependency_graph の next_elem_ids ループで None をスキップ。終端ノードは to を省略するか実在要素を指定
Gate 閉鎖時の fan-in ハング △ 実装済みで理論上起きない 実装を維持。改修時は test_gate_fanin_is_last.py を必ず実行してリグレッション確認
Gate 開放時の event 順序 △ 実装済みで理論上起きない 同上
fan-in 前段の「完了を待たない」 − 設計の説明 ドキュメント化済み。対策不要
executed_elements の前倒し △ 実装済みで理論上起きない 実装を維持。該当箇所に「前段 await の前であること必須」のコメントを残す
Value に exists() はない ○ 起きる(テストや呼び出し側で values.xxx.exists() を使うと AttributeError) テスト/呼び出し修正: Values.exists("xxx") に変更。または Value_has_received_value 等を公開する exists 相当の API を追加
message_id 重複 ○ 起きる(YAML の誤記) 既に _normalize_edges で検証。エラーメッセージに従い YAML を修正。対策不要
ChainMessage の values 上書き ○ 起きる(YAML で values を期待してハマる) chain-graph-construction-guide を参照。YAML で chain_messages の values を指定しない
fan-in ノードのブロック判定 − 設計の説明 _collect_gate_downstream 改修時に注意。対策不要

1. エッジの to: null(実装済み: スキップ)

現象

to: null を指定したエッジは、dependency_graphNone が含まれる。以前は KeyError が発生していた。

対処(実装済み)

execute_element 内の next_elem_ids ループで next_id is None または next_id not in chain_elements_dict の場合をスキップする。to: null のエッジは事実上無視され、KeyError は発生しない。

ただし、**終端ノードは to を省略するか、実在する要素を指定する**ことを推奨する。

# ❌ 避ける(スキップされるが意図が不明確)
edges:
  - from: last_element
    to: null
    message_id: m_out

# ✅ 推奨: 終端の場合は to を省略するか、実在要素を指定

2. Gate 閉鎖時の fan-in ハング

現象

Gate が閉じている経路と開いている経路が fan-in で合流するとき、チェーンがハングする(タイムアウトまで待ち続ける)。

原因

fan-in ノードは「全入力から is_last が来るまで」待つ。Gate 閉鎖経路は下流を実行しないため、その経路の ChainMessage に is_last が来ず、fan-in が永遠に待つ。

対処(実装済み)

_collect_gate_downstream で Gate 閉鎖時にブロック対象の ChainMessage に trigger_event(force_is_last=True) を呼び、fan-in が待ちを解除できるようにしている。この仕組みを壊さないこと。


3. Gate 開放時の event 通知順序

現象

Gate を開放したときに、下流の fan-in がデッドロックする。

原因

Gate 開放時、下流タスクを起動してから gate_decision_events[elem_id].set() すると、fan-in 側が「Gate の判定完了」を待っているのに event が来ないためデッドロックになる。

対処(実装済み)

必ず event を先に set してから 下流タスクを起動する。

# ✅ 正しい順序
if elem_id in gate_decision_events:
    gate_decision_events[elem_id].set()
if elem_id in dependency_graph:
    # 下流タスク起動

4. fan-in の前段は「完了を待たない」

現象

fan-in の前段を await で順次待つと、ストリーミングや並列実行のメリットが失われる。

設計

fan-in の前段は asyncio.create_task() で起動するだけで、完了は待たない。これにより、例: PublicLLM のストリーム完了を待たずに Lambda→TTS3 が早めに合成を開始できる。

ただし、**Gate の下流にある前段**は gate_decision_events を待ってからプルする。Gate 閉鎖時は is_last 伝搬で解決し、Gate 開放時は順方向で既に起動済みなので no-op になる。


5. executed_elements の前倒し

現象

二重実行(例: 音声の二重再生)が発生する。

原因

executed_elements.add(elem_id) を「前段の await の後」に置くと、await で他タスクに切り替わった隙に、別経路から同じ要素が実行される。

対処(実装済み)

executed_elements.add(elem_id) は**前段 await の前**に実行する。


6. fan-in ノードのブロック判定

設計

_collect_gate_downstream では、fan-in ノード(Gate 外からも到達可能な要素)はブロックしない。全 predecessor が「既に blocked 確定」または Gate 自身である要素のみブロックする。

共有 ChainMessage(ブロック対象外からも出力される)は is_last 伝搬対象から除外する。


7. エッジの message_id 重複

現象

ValueError: Duplicate message_id 'xxx' detected

原因

**異なる from 要素**から同じ message_id を使っている。

対処

from 要素ごとに別の message_id を使う。同じ from からの fan-out では同じ message_id でよい。


8. ChainMessage の values は YAML で上書きされる

現象

YAML の chain_messages で定義した values が反映されない。

原因

_rebuild_chain_messages_values() で、エッジの from 要素の get_output_values() から自動決定され、YAML 定義は上書きされる。

対処

values は YAML で直接指定せず、エッジと get_output_values() の整合性を保つ。


9. Value に exists() はない

現象

AttributeError: 'Value' object has no attribute 'exists'

原因

Value クラスには exists メソッドがない。Values.exists(name) はあるが、values.status.exists() のような呼び出しは無効。

対処

output_msg.values.exists("status") のように、Values に対して名前を渡して呼ぶ。


関連ドキュメント

  • グラフ構築の全体像: リポジトリ内 docs/developper-guides/chain-graph-construction-guide.md
  • トポロジー別テスト例: リポジトリ内 marionette/tests/test_gate_fanin_is_last.py