Home > EulerAgent > Tutorials > Graph > Graph 07. Parallel Execution Basics — Yo...

Graph 07. Parallel Execution Basics — Your First Fan-out/Fan-in

Learning Objectives

After completing this tutorial, you will be able to:


Prerequisites

mkdir -p examples/graphs/parallel

# 06_state_schema.md 완료 확인
euleragent graph validate examples/graphs/state/schema_demo.yaml

Parallel Fan-out/Fan-in Topology

This is the basic parallel structure we will implement in this tutorial.

source ──┬──→ branch_a ──┬──→ join_node → evaluate → finalize
         └──→ branch_b ──┘

Step-by-Step Walkthrough

Step 1: Graph Design

First, design the overall structure of the graph to be implemented.

Purpose: Collect information from two sources simultaneously, merge them, and evaluate the result.

Node Role Type
source Determine search direction llm (execute)
branch_a Web source search llm (execute)
branch_b Internal DB search llm (execute)
join_node Aggregate results and draft report llm (execute)
evaluate Quality evaluation judge
revise Revision llm (execute)

state_schema design: - results: list + append_list -- list of results collected by the two branches - summary: string + last_write -- final summary written by join_node

Step 2: state_schema Declaration

state_schema:
  results:
    type: list
    merge: append_list   # 두 브랜치 결과를 하나의 리스트로 합산

  summary:
    type: string
    merge: last_write    # join_node만 씀 (단일 브랜치 → 안전)

Step 3: parallel_groups Declaration

parallel_groups:
  - id: search_group           # 그룹 ID
    branches: [branch_a, branch_b]  # 병렬 실행할 브랜치 노드 ID 목록
    join: join_node            # 모든 브랜치 완료 후 실행할 조인 노드 ID

Step 4: Node Declarations (with writes_state and reads_state)

Parallel branch nodes must declare writes_state. Failing to do so will cause a PARALLEL_METADATA_DROPPED error.

nodes:
  - id: source
    kind: llm
    runner:
      mode: execute

  - id: branch_a
    kind: llm
    runner:
      mode: execute
    writes_state: [results]    # ← 반드시 선언 (빈 리스트도 가능)

  - id: branch_b
    kind: llm
    runner:
      mode: execute
    writes_state: [results]    # ← branch_a와 같은 키 → append_list로 안전

  - id: join_node
    kind: llm
    runner:
      mode: execute
    reads_state: [results]     # ← 두 브랜치의 결과를 읽음
    writes_state: [summary]    # ← 최종 요약을 씀
    artifacts:
      primary: combined_report.md

  - id: evaluate
    kind: judge
    judge:
      schema: evaluator_v1
      route_values: [finalize, revise]

  - id: revise
    kind: llm
    runner:
      mode: execute
    artifacts:
      primary: combined_report.md

Step 5: Edge Connections (Fan-out + Fan-in)

edges:
  # 팬아웃: source → 두 브랜치
  - from: source
    to: branch_a
    when: "true"
  - from: source
    to: branch_b
    when: "true"

  # 팬인: 두 브랜치 → join_node
  - from: branch_a
    to: join_node
    when: "true"
  - from: branch_b
    to: join_node
    when: "true"

  # 하류: join_node 이후 순차 실행
  - from: join_node
    to: evaluate
    when: "true"
  - from: evaluate
    to: finalize
    when: "judge.route == finalize"
  - from: evaluate
    to: revise
    when: "judge.route == revise"
  - from: revise
    to: evaluate
    when: "true"

Step 6: Writing the Complete YAML

Now assemble all the pieces into one.

# examples/graphs/parallel/my_first_parallel.yaml
id: graph.my_first_parallel
version: 1
category: research
description: 2개 브랜치 병렬 검색 — source → [branch_a|branch_b] → join → evaluate

# 병렬 실행이 있으면 state_schema 필수
state_schema:
  results:
    type: list
    merge: append_list   # branch_a, branch_b 모두 이 키에 씀 → 합산

  summary:
    type: string
    merge: last_write    # join_node만 씀 → 안전

defaults:
  max_iterations: 3
  max_total_tool_calls: 30
  max_web_search_calls: 5

# 병렬 그룹 선언
parallel_groups:
  - id: search_group
    branches: [branch_a, branch_b]
    join: join_node

nodes:
  - id: source
    kind: llm
    runner:
      mode: execute

  - id: branch_a
    kind: llm
    runner:
      mode: execute
    writes_state: [results]    # ← 반드시 선언!

  - id: branch_b
    kind: llm
    runner:
      mode: execute
    writes_state: [results]    # ← branch_a와 같은 키

  - id: join_node
    kind: llm
    runner:
      mode: execute
    reads_state: [results]     # ← 두 브랜치의 수집 결과 읽기
    writes_state: [summary]
    artifacts:
      primary: combined_report.md

  - id: evaluate
    kind: judge
    judge:
      schema: evaluator_v1
      route_values: [finalize, revise]

  - id: revise
    kind: llm
    runner:
      mode: execute
    artifacts:
      primary: combined_report.md

edges:
  # 팬아웃
  - from: source
    to: branch_a
    when: "true"
  - from: source
    to: branch_b
    when: "true"

  # 팬인
  - from: branch_a
    to: join_node
    when: "true"
  - from: branch_b
    to: join_node
    when: "true"

  # 하류 순차 실행
  - from: join_node
    to: evaluate
    when: "true"
  - from: evaluate
    to: finalize
    when: "judge.route == finalize"
  - from: evaluate
    to: revise
    when: "judge.route == revise"
  - from: revise
    to: evaluate
    when: "true"

finalize:
  artifact: combined_report.md

Step 7: Running graph validate

euleragent graph validate examples/graphs/parallel/my_first_parallel.yaml

Expected output:

검증 중: examples/graphs/parallel/my_first_parallel.yaml

단계 1/3: YAML 파싱...
  id: graph.my_first_parallel
  노드 수: 6 (source, branch_a, branch_b, join_node, evaluate, revise)
  엣지 수: 8
  parallel_groups: 1개 (search_group)
  완료

단계 2/3: Pattern 기본 검증...
  [✓] 노드 ID 유일성
  [✓] 엣지 소스/타겟 존재
  [✓] finalize 도달 가능성
  [✓] judge route_values 커버리지
  [✓] 순환 감지: evaluate → revise → evaluate (max_iterations=3)
  완료

단계 3/3: Graph 추가 검증...
  [✓] state_schema 존재 (parallel_groups 사용 시 필수)
  [✓] state_schema 타입+merge 호환성:
        results: list + append_list ✓
        summary: string + last_write ✓
  [✓] parallel_groups 검증:
        search_group:
          branches: [branch_a, branch_b] (2개, 최대 8개)
          join: join_node ✓ (노드 존재)
  [✓] 각 브랜치 writes_state 선언:
        branch_a: [results] ✓
        branch_b: [results] ✓
  [✓] 팬아웃 엣지:
        source → branch_a ✓
        source → branch_b ✓
  [✓] 팬인 엣지 (브랜치 → 조인):
        branch_a → join_node ✓
        branch_b → join_node ✓
  [✓] 브랜치가 finalize로 직접 라우팅 안 함 ✓
  [✓] 브랜치 수 제한 (2 ≤ 8) ✓
  [✓] 병렬 브랜치 부작용 도구 미사용 ✓
  완료

결과: 유효 ✓ (오류 없음, 경고 없음)

All parallel checks pass!

Step 8: Verify IR with graph compile

euleragent graph compile examples/graphs/parallel/my_first_parallel.yaml \
  --out /tmp/my_first_parallel_ir.json

# IR의 parallel_groups 섹션 확인
python -m json.tool /tmp/my_first_parallel_ir.json | \
  python -c "
import sys, json
d = json.load(sys.stdin)
print('=== parallel_groups ===')
print(json.dumps(d['parallel_groups'], indent=2))
print()
print('=== langgraph_builder (병렬 엣지) ===')
lb = d['langgraph_builder']
print('add_edges:', json.dumps(lb['add_edges'], indent=2))
"

Expected output:

=== parallel_groups ===
[
  {
    "id": "search_group",
    "branches": ["branch_a", "branch_b"],
    "join": "join_node",
    "fanout_source": "source",
    "fanin_target": "join_node"
  }
]

=== langgraph_builder (병렬 엣지) ===
add_edges: [
  ["source", "branch_a"],
  ["source", "branch_b"],
  ["branch_a", "join_node"],
  ["branch_b", "join_node"],
  ["join_node", "evaluate"],
  ["revise", "evaluate"]
]

The fanout_source and fanin_target are automatically added to the IR's parallel_groups. This information is used by LangGraph to build the actual parallel execution structure.


Step 9: Intentional Error Demonstrations

Error Demo 1: PARALLEL_METADATA_DROPPED

Remove writes_state from a branch node.

# branch_a에서 writes_state 제거
- id: branch_a
  kind: llm
  runner:
    mode: execute
  # writes_state: [results] ← 제거!
# 임시 파일로 테스트 (원본 수정 대신)
cp examples/graphs/parallel/my_first_parallel.yaml /tmp/parallel_no_writes.yaml
# branch_a의 writes_state 제거 후 검증

euleragent graph validate /tmp/parallel_no_writes.yaml

Expected output:

오류: PARALLEL_METADATA_DROPPED
  parallel_groups 'search_group'의 브랜치 노드 'branch_a'에
  writes_state가 선언되지 않았습니다.

  writes_state는 병렬 브랜치에서 필수입니다. 상태를 쓰지 않는
  브랜치라도 빈 리스트를 명시적으로 선언해야 합니다:
    writes_state: []

  이 요구사항은 브랜치의 상태 기여를 명시적으로 문서화하기
  위한 것입니다.

Fix:

# 상태를 쓰지 않는 브랜치라도 명시적으로 선언
- id: branch_a
  kind: llm
  runner:
    mode: execute
  writes_state: []  # 빈 리스트로 명시적 선언

Error Demo 2: PARALLEL_FINALIZE_BEFORE_JOIN

Add a direct edge from a branch to finalize.

# 잘못된 엣지: branch_a → finalize (조인 노드를 건너뜀)
edges:
  - from: branch_a
    to: finalize   # ← 직접 finalize? 금지!
    when: "true"

Expected output:

오류: PARALLEL_FINALIZE_BEFORE_JOIN
  병렬 브랜치 'branch_a'가 finalize 노드로 직접 라우팅합니다.
  이는 다른 브랜치(branch_b)가 완료되기 전에 그래프가 종료될 수 있어
  데이터 손실이 발생합니다.

  해결: 모든 브랜치는 반드시 조인 노드('join_node')를 거쳐야 합니다.
    branch_a → join_node (올바름)
    join_node → ... → finalize (올바름)

Error Demo 3: PARALLEL_STATE_SCHEMA_MISSING

Remove the top-level state_schema.

# state_schema: (완전히 제거)

parallel_groups:
  - id: search_group
    branches: [branch_a, branch_b]
    join: join_node

Expected output:

오류: PARALLEL_STATE_SCHEMA_MISSING
  parallel_groups가 선언되어 있지만 최상위에 state_schema가 없습니다.

  병렬 실행에서 브랜치들이 공유 상태를 쓸 때 LangGraph는 리듀서가
  필요합니다. state_schema 없이 실행하면 런타임에
  INVALID_CONCURRENT_GRAPH_UPDATE 예외가 발생합니다.

  해결:
  state_schema:
    results:
      type: list
      merge: append_list
    # ... 각 브랜치의 writes_state 키 정의

Expected Output Summary

Command Expected Result
graph validate my_first_parallel.yaml Valid, all parallel checks pass
graph compile my_first_parallel.yaml IR parallel_groups, fanout_source, fanin_target
validate after removing writes_state PARALLEL_METADATA_DROPPED
validate with direct branch-to-finalize edge PARALLEL_FINALIZE_BEFORE_JOIN
validate after removing state_schema PARALLEL_STATE_SCHEMA_MISSING

Parallel Checklist (7 Required Items)

□ 1. state_schema 선언 (parallel_groups 사용 시 필수)
□ 2. 모든 브랜치 writes_state 선언 (빈 리스트도 가능)
□ 3. 조인 노드 존재 (parallel_groups.join에 지정)
□ 4. 팬아웃 엣지: source → 모든 브랜치 (하나라도 누락 시 에러)
□ 5. 팬인 엣지: 모든 브랜치 → 조인 노드 (하나라도 누락 시 에러)
□ 6. 브랜치가 finalize로 직접 연결 안 함
□ 7. 브랜치 수 2 이상, 8 이하

Key Concepts Summary

Concept Description
Fan-out Branching from source node to multiple branch nodes
Fan-in Converging from multiple branch nodes to a join node
parallel_groups.branches List of node IDs to execute simultaneously (2-8)
parallel_groups.join Join node ID to execute after all branches complete
writes_state List of state_schema keys a branch node writes to
reads_state List of state_schema keys the join node reads from
PARALLEL_METADATA_DROPPED Branch node missing writes_state declaration
PARALLEL_FINALIZE_BEFORE_JOIN Branch routes directly to finalize without going through join
PARALLEL_STATE_SCHEMA_MISSING parallel_groups present but state_schema missing

Common Errors

Error 1: Missing Fan-out Edge (PARALLEL_FANOUT_MISSING)

# 오류: source에서 branch_b로 가는 팬아웃 엣지 없음
edges:
  - from: source
    to: branch_a
    when: "true"
  # branch_b 팬아웃 엣지 없음!
오류: PARALLEL_FANOUT_MISSING
  parallel_groups 'search_group'의 브랜치 'branch_b'로 향하는
  팬아웃 엣지가 없습니다.
  해결: source에서 branch_b로 향하는 엣지를 추가하세요.

Error 2: Missing Fan-in Edge (PARALLEL_BRANCH_NOT_CONVERGING)

# 오류: branch_b에서 join_node로 가는 팬인 엣지 없음
edges:
  - from: branch_a
    to: join_node
    when: "true"
  # branch_b → join_node 엣지 없음!
오류: PARALLEL_BRANCH_NOT_CONVERGING
  parallel_groups 'search_group'의 브랜치 'branch_b'가
  조인 노드 'join_node'로 수렴하지 않습니다.
  해결: branch_b에서 join_node로 향하는 엣지를 추가하세요.

Error 3: Duplicate Join Node (PARALLEL_JOIN_MULTIPLE)

# 오류: parallel_groups에 join이 2개?
parallel_groups:
  - id: grp
    branches: [branch_a, branch_b]
    join: join_node_1   # ← 하나만 가능!
    # join: join_node_2 — YAML에서 중복 키는 마지막 값으로 덮어쓰임
경고: PARALLEL_JOIN_MULTIPLE
  parallel_groups 'grp'에 join이 중복 선언되어 있습니다.
  YAML에서 중복 키는 마지막 값(join_node_2)으로 덮어쓰입니다.
  의도한 조인 노드를 하나만 명시하세요.

Practice Exercise

Exercise: News Analysis Parallel Graph

Write a parallel news analysis graph with the following structure.

plan → [domestic_news | international_news] → synthesize → finalize

Requirements: - plan node: determine analysis direction (execute mode) - domestic_news branch: domestic news analysis (execute mode, writes_state: [articles]) - international_news branch: international news analysis (execute mode, writes_state: [articles]) - synthesize join node: synthesize both analyses (execute mode, reads_state: [articles]) - state_schema: articles (list + append_list) - Final artifact: daily_briefing.md

euleragent graph validate examples/graphs/parallel/news_analysis.yaml
euleragent graph compile examples/graphs/parallel/news_analysis.yaml

Previous: 06_state_schema.md | Next: 08_parallel_advanced.md

← Prev Back to List Next →