
Graph 09. Parallel Research + Judge Quality Loop — Comprehensive Example

Learning Objectives

After completing this tutorial, you will be able to:

- Combine parallel Fan-out/Fan-in with a HITL branch (mode: plan) using the approvals_resolved condition
- Design a state_schema whose merge policies (append_list, sum_int, last_write) remain deterministic across branches
- Attach a Judge quality loop (evaluate ⇄ revise) downstream of a parallel join
- Verify the complete graph with graph validate and inspect the compiled IR with graph compile

Prerequisites

# Confirm completion of tutorials 07 and 08
euleragent graph validate examples/graphs/parallel/my_first_parallel.yaml
euleragent graph validate examples/graphs/parallel/three_branch_research.yaml

mkdir -p examples/graphs/capstone

Capstone Graph Full Topology

plan
  │
  ├──→ web_search   (plan mode + HITL) ──┐
  ├──→ local_search (execute mode)     ──┤──→ merge_findings → evaluate ⇄ revise
  └──→ doc_search   (execute mode)     ──┘                          ↓ finalize

Characteristics of each branch:

- web_search: mode: plan + force_tool: web.search -- HITL approval required, internet search
- local_search: mode: execute -- autonomous execution, local document/vector DB search
- doc_search: mode: execute -- autonomous execution, specific document retrieval

Key challenges of this topology:

- web_search must wait for HITL approval, so the approvals_resolved condition is needed
- Fan-in must transition to the join node only after all branches have completed
- The combination of approvals_resolved and parallel Fan-in must be handled correctly


Step-by-Step Walkthrough

Step 1: Designing the state_schema

state_schema:
  # All 3 branches append their collected results
  findings:
    type: list
    merge: append_list   # web_search, local_search, doc_search all write → deterministic

  # Branches add up the number of sources they found
  source_count:
    type: integer
    merge: sum_int       # written by web_search and local_search → deterministic

  # Final summary written by merge_findings (a single node)
  final_summary:
    type: string
    merge: last_write    # only merge_findings writes → safe

Design rationale:

- findings: each of the 3 branches contributes collected results to a single list -> append_list
- source_count: web_search and local_search each contribute "I found N sources" -> sum_int (doc_search does not write this key)
- final_summary: written by a single node after the parallel phase -> last_write is safe
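The three merge policies can be sketched in plain Python. This is an illustrative model of the reducer semantics only, not EulerAgent's implementation; `merge_states` and the schema/branch data below are hypothetical.

```python
# Sketch: fold each branch's state writes into one state, per merge policy.

def merge_states(schema, branch_writes):
    state = {}
    for writes in branch_writes:          # branch order is deterministic
        for key, value in writes.items():
            policy = schema[key]["merge"]
            if policy == "append_list":
                state.setdefault(key, []).extend(value)
            elif policy == "sum_int":
                state[key] = state.get(key, 0) + value
            elif policy == "last_write":
                state[key] = value        # safe only with a single writer
    return state

schema = {
    "findings": {"type": "list", "merge": "append_list"},
    "source_count": {"type": "integer", "merge": "sum_int"},
}
merged = merge_states(schema, [
    {"findings": ["web result"], "source_count": 3},   # web_search
    {"findings": ["local doc"], "source_count": 2},    # local_search
    {"findings": ["spec excerpt"]},                    # doc_search
])
print(merged["source_count"])  # 5
```

Note that append_list and sum_int are order-insensitive in their result, which is what makes concurrent branch writes deterministic.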

Step 2: Declaring parallel_groups

parallel_groups:
  - id: research_group
    branches: [web_search, local_search, doc_search]
    join: merge_findings

Step 3: Designing the plan Node

- id: plan
  kind: llm
  runner:
    mode: execute      # decide the search direction (autonomous execution)
  artifacts:
    primary: search_plan.md

The plan node determines which search strategy to use. Since it runs before the parallel branch execution, it operates in execute mode for autonomous execution.

Step 4: Designing the web_search Branch (HITL)

- id: web_search
  kind: llm
  runner:
    mode: plan             # HITL: propose search plan → human approves → execute
    force_tool: web.search # must use the web.search tool
    min_proposals: 2       # propose at least 2 search queries
  guardrails:
    tool_call_budget:
      web.search: 5        # at most 5 web searches
  writes_state: [findings, source_count]

Important: Since web_search uses mode: plan, HITL approval is required before execution. The Fan-in edge from this branch to merge_findings uses the approvals_resolved condition.
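To make this concrete, here is a hypothetical sketch of how an edge guard like approvals_resolved could be evaluated. The function name and the node-state shape are assumptions for illustration; the actual EulerAgent condition API may differ.

```python
# Hypothetical edge-guard evaluation: "approvals_resolved" holds only when
# every HITL proposal from the source node has been approved or rejected.

def edge_ready(when, node_state):
    if when == "true":
        return True  # autonomous branches may join as soon as they finish
    if when == "approvals_resolved":
        proposals = node_state.get("proposals", [])
        return all(p["status"] in ("approved", "rejected") for p in proposals)
    raise ValueError(f"unknown edge condition: {when}")

# web_search still has a pending proposal → the Fan-in edge must wait
waiting = edge_ready("approvals_resolved",
                     {"proposals": [{"status": "pending"}]})
print(waiting)  # False
```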

Step 5: Designing the local_search Branch

- id: local_search
  kind: llm
  runner:
    mode: execute          # autonomous execution (no approval needed)
    exclude_tools: [shell.exec, file.write]  # safety setting
  writes_state: [findings, source_count]

Step 6: Designing the doc_search Branch

- id: doc_search
  kind: llm
  runner:
    mode: execute
    exclude_tools: [shell.exec, file.write, web.search]
  writes_state: [findings]   # does not write source_count (document count is tallied separately)

doc_search does not write to source_count. This branch queries a specific document collection, where "source count" is not a meaningful metric. writes_state can therefore differ from branch to branch.

Step 7: merge_findings Join Node

- id: merge_findings
  kind: llm
  runner:
    mode: execute
  reads_state: [findings, source_count]
  writes_state: [final_summary]
  artifacts:
    primary: research_report.md

Step 8: evaluate Judge and revise Node

- id: evaluate
  kind: judge
  judge:
    schema: evaluator_v1
    route_values: [finalize, revise]

- id: revise
  kind: llm
  runner:
    mode: execute
    max_loops: 2
  artifacts:
    primary: research_report.md

Step 9: Edge Connections (Key: approvals_resolved Handling)

edges:
  # plan → Fan-out (3 branches start simultaneously)
  - from: plan
    to: web_search
    when: "true"
  - from: plan
    to: local_search
    when: "true"
  - from: plan
    to: doc_search
    when: "true"

  # Fan-in: web_search → merge_findings
  # web_search is HITL, so use approvals_resolved
  - from: web_search
    to: merge_findings
    when: "approvals_resolved"   # ← join after HITL approval completes

  # Fan-in: local_search, doc_search → merge_findings
  # autonomous branches use "true"
  - from: local_search
    to: merge_findings
    when: "true"
  - from: doc_search
    to: merge_findings
    when: "true"

  # downstream sequential execution
  - from: merge_findings
    to: evaluate
    when: "true"
  - from: evaluate
    to: finalize
    when: "judge.route == finalize"
  - from: evaluate
    to: revise
    when: "judge.route == revise"
  - from: revise
    to: evaluate
    when: "true"

Key explanation:

web_search (HITL) ──[approvals_resolved]──┐
local_search      ──[true]────────────────┤──→ merge_findings
doc_search        ──[true]────────────────┘

LangGraph waits for all three Fan-in edges before executing merge_findings.
Even if web_search is delayed by HITL, LangGraph waits until every branch has completed.
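This wait-for-all behavior can be pictured as a simple completion barrier. The sketch below is a conceptual model of the Fan-in semantics, not LangGraph's internal mechanism; `FanInBarrier` is a hypothetical name.

```python
# Conceptual sketch: the join node may run only once every source branch
# of the Fan-in edges has reported completion.

class FanInBarrier:
    def __init__(self, sources):
        self.pending = set(sources)

    def branch_done(self, branch):
        """Mark a branch complete; return True when the join may fire."""
        self.pending.discard(branch)
        return not self.pending

barrier = FanInBarrier(["web_search", "local_search", "doc_search"])
barrier.branch_done("local_search")           # join still waiting
barrier.branch_done("doc_search")             # web_search pending (HITL)
ready = barrier.branch_done("web_search")     # all done → run merge_findings
print(ready)  # True
```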

Step 10: Writing the Complete YAML

# examples/graphs/capstone/parallel_research_with_quality.yaml
id: graph.parallel_research_with_quality
version: 1
category: research
description: |
  Parallel research across 3 sources + Judge quality loop
  plan → [web_search(HITL) | local_search | doc_search] → merge_findings → evaluate ⇄ revise

state_schema:
  findings:
    type: list
    merge: append_list

  source_count:
    type: integer
    merge: sum_int

  final_summary:
    type: string
    merge: last_write

defaults:
  max_iterations: 4
  max_total_tool_calls: 60
  max_web_search_calls: 10

parallel_groups:
  - id: research_group
    branches: [web_search, local_search, doc_search]
    join: merge_findings

nodes:
  - id: plan
    kind: llm
    runner:
      mode: execute
    artifacts:
      primary: search_plan.md

  - id: web_search
    kind: llm
    runner:
      mode: plan
      force_tool: web.search
      min_proposals: 2
    guardrails:
      tool_call_budget:
        web.search: 5
    writes_state: [findings, source_count]

  - id: local_search
    kind: llm
    runner:
      mode: execute
      exclude_tools: [shell.exec, file.write]
    writes_state: [findings, source_count]

  - id: doc_search
    kind: llm
    runner:
      mode: execute
      exclude_tools: [shell.exec, file.write, web.search]
    writes_state: [findings]

  - id: merge_findings
    kind: llm
    runner:
      mode: execute
    reads_state: [findings, source_count]
    writes_state: [final_summary]
    artifacts:
      primary: research_report.md

  - id: evaluate
    kind: judge
    judge:
      schema: evaluator_v1
      route_values: [finalize, revise]

  - id: revise
    kind: llm
    runner:
      mode: execute
      max_loops: 2
    artifacts:
      primary: research_report.md

edges:
  # plan → Fan-out
  - from: plan
    to: web_search
    when: "true"
  - from: plan
    to: local_search
    when: "true"
  - from: plan
    to: doc_search
    when: "true"

  # Fan-in
  - from: web_search
    to: merge_findings
    when: "approvals_resolved"   # after HITL approval
  - from: local_search
    to: merge_findings
    when: "true"
  - from: doc_search
    to: merge_findings
    when: "true"

  # downstream sequential
  - from: merge_findings
    to: evaluate
    when: "true"
  - from: evaluate
    to: finalize
    when: "judge.route == finalize"
  - from: evaluate
    to: revise
    when: "judge.route == revise"
  - from: revise
    to: evaluate
    when: "true"

finalize:
  artifact: research_report.md

Step 11: Full graph validate Pass Confirmation

euleragent graph validate examples/graphs/capstone/parallel_research_with_quality.yaml

Expected output:

Validating: examples/graphs/capstone/parallel_research_with_quality.yaml

Step 1/3: Parsing YAML...
  id: graph.parallel_research_with_quality
  nodes: 7 (plan, web_search, local_search, doc_search, merge_findings, evaluate, revise)
  edges: 10
  parallel_groups: 1 (research_group, 3 branches)
  done

Step 2/3: Basic Pattern validation...
  [✓] node ID uniqueness
  [✓] edge source/target existence
  [✓] finalize reachability
  [✓] judge route_values coverage
        evaluate: [finalize, revise] ← matches edges ✓
  [✓] cycle detected: evaluate → revise → evaluate
      max_iterations: 4 ✓
  done

Step 3/3: Additional Graph validation...
  [✓] state_schema present
  [✓] state_schema type + merge compatibility:
        findings: list + append_list ✓
        source_count: integer + sum_int ✓
        final_summary: string + last_write ✓
  [✓] parallel_groups validation:
        research_group:
          branches: [web_search, local_search, doc_search] (3)
          join: merge_findings ✓
  [✓] writes_state per branch:
        web_search: [findings, source_count] ✓
        local_search: [findings, source_count] ✓
        doc_search: [findings] ✓  (source_count not declared -- allowed)
  [✓] findings (append_list): written by 3 branches → deterministic ✓
  [✓] source_count (sum_int): written by 2 branches → deterministic ✓
  [✓] Fan-out edges: plan → web_search, local_search, doc_search ✓
  [✓] Fan-in edges:
        web_search → merge_findings (approvals_resolved) ✓
        local_search → merge_findings (true) ✓
        doc_search → merge_findings (true) ✓
  [✓] no branch routes directly to finalize ✓
  [✓] side-effect tools in parallel branches:
        web_search: force_tool=web.search (OK, web.search is allowed)
        local_search: exclude_tools=[shell.exec, file.write] ✓
        doc_search: exclude_tools=[shell.exec, file.write, web.search] ✓
  done

Result: valid ✓ (0 errors, 0 warnings)

Step 12: Full IR Verification with graph compile

euleragent graph compile \
  examples/graphs/capstone/parallel_research_with_quality.yaml \
  --out examples/graphs/capstone/parallel_research_compiled.json

# Print a summary of the full IR structure
python -m json.tool examples/graphs/capstone/parallel_research_compiled.json | \
  python -c "
import sys, json
d = json.load(sys.stdin)
print(f'graph_type: {d[\"graph_type\"]}')
print(f'id: {d[\"id\"]}')
print(f'compiled_at: {d[\"compiled_at\"]}')
print()
print('=== state_schema ===')
for key, val in d['state_schema'].items():
    print(f'  {key}: {val[\"type\"]} + {val[\"merge\"]}')
print()
print('=== parallel_groups ===')
for g in d['parallel_groups']:
    print(f'  {g[\"id\"]}: branches={g[\"branches\"]}, join={g[\"join\"]}')
print()
print('=== nodes ===')
for n in d['nodes']:
    ib = n.get('interrupt_before', False)
    ia = n.get('interrupt_after', False)
    ws = n.get('writes_state', [])
    rs = n.get('reads_state', [])
    print(f'  [{n[\"kind\"]}] {n[\"id\"]} writes={ws} reads={rs} ib={ib} ia={ia}')
print()
print('=== langgraph_builder ===')
lb = d['langgraph_builder']
print(f'  interrupt_before: {lb[\"interrupt_before\"]}')
print(f'  interrupt_after: {lb[\"interrupt_after\"]}')
print(f'  conditional_edges: {len(lb[\"add_conditional_edges\"])}')
"

Expected output:

graph_type: graph
id: graph.parallel_research_with_quality
compiled_at: 2026-02-23T11:00:00Z

=== state_schema ===
  findings: list + append_list
  source_count: integer + sum_int
  final_summary: string + last_write

=== parallel_groups ===
  research_group: branches=['web_search', 'local_search', 'doc_search'], join=merge_findings

=== nodes ===
  [llm] plan writes=[] reads=[] ib=False ia=False
  [llm] web_search writes=['findings', 'source_count'] reads=[] ib=False ia=False
  [llm] local_search writes=['findings', 'source_count'] reads=[] ib=False ia=False
  [llm] doc_search writes=['findings'] reads=[] ib=False ia=False
  [llm] merge_findings writes=['final_summary'] reads=['findings', 'source_count'] ib=False ia=False
  [judge] evaluate writes=[] reads=[] ib=False ia=False
  [llm] revise writes=[] reads=[] ib=False ia=False

=== langgraph_builder ===
  interrupt_before: []
  interrupt_after: []
  conditional_edges: 1

Hands-On Exercise: Competitor Analysis + Market Positioning Dual Parallel Graph

Design a more complex graph using all the concepts learned so far.

Topology

strategy_plan
    │
    ├──→ [competitor_a | competitor_b | competitor_c] → merge_competitors
    │    (parallel_group_1: competitor_research)
    │
    └──→ market_analysis (sequential, not parallel)
         ↓
    gap_analysis (reads results from both merge_competitors and market_analysis)
         ↓
    positioning_draft
         ↓
    quality_judge ⇄ positioning_revise
         ↓ finalize

Requirements

  1. state_schema:
     - competitor_data: list + append_list (written by the competitor branches)
     - market_size: integer + sum_int (sum of market sizes discovered by the competitor branches)
     - positioning_report: string + last_write (written by positioning_draft)

  2. parallel_groups:
     - competitor_research: branches=[competitor_a, competitor_b, competitor_c], join=merge_competitors

  3. Node details:
     - strategy_plan: execute mode
     - competitor_a/b/c: execute mode, writes_state: [competitor_data, market_size]
     - merge_competitors: execute mode, reads_state: [competitor_data, market_size]
     - market_analysis: execute mode (not parallel, sequential after strategy_plan)
     - gap_analysis: execute mode, reads_state: [competitor_data] (depends on merge_competitors results)
     - positioning_draft: execute mode, writes_state: [positioning_report]
     - quality_judge: judge, route_values: [finalize, revise]
     - positioning_revise: execute mode, max_loops: 2

  4. Edges:
     - strategy_plan -> competitor_a/b/c (Fan-out) + strategy_plan -> market_analysis (sequential)
     - competitor branches -> merge_competitors (Fan-in)
     - merge_competitors -> gap_analysis + market_analysis -> gap_analysis (gap_analysis runs after both complete)
     - gap_analysis -> positioning_draft -> quality_judge -> [finalize/revise]

  5. defaults: max_iterations: 3, max_total_tool_calls: 80

# Validate everything after writing
euleragent graph validate examples/graphs/capstone/competitor_positioning.yaml
euleragent graph compile examples/graphs/capstone/competitor_positioning.yaml \
  --out examples/graphs/capstone/competitor_positioning_compiled.json

Summary of All Concepts Used in This Tutorial

| Tutorial | Concepts Used in This Graph |
| --- | --- |
| 01_concepts | Graph is a superset of Pattern, LangGraph StateGraph |
| 02_linear_graph | Basic node/edge structure, finalize |
| 03_judge_route | evaluate node, route_values [finalize, revise] |
| 04_bounded_loop | max_iterations: 4, evaluate <-> revise loop |
| 05_interrupt_hooks | (Not used in this graph -- can be added to web_search as an extension exercise) |
| 06_state_schema | findings (append_list), source_count (sum_int), final_summary (last_write) |
| 07_parallel_basics | parallel_groups, writes_state, reads_state, Fan-out/Fan-in edges |
| 08_parallel_advanced | 3-branch, different writes_state per branch, exclude_tools |

Expected Output Summary

| Command | Expected Result |
| --- | --- |
| graph validate parallel_research_with_quality.yaml | Valid, 0 errors, 0 warnings |
| graph compile parallel_research_with_quality.yaml | IR generated, graph_type: "graph" |
| IR parallel_groups | research_group: 3 branches |
| IR state_schema | findings: append_list, source_count: sum_int |
| IR langgraph_builder.add_conditional_edges | 1 (evaluate routing) |

Key Concepts Summary

| Concept | Role in This Graph |
| --- | --- |
| Parallel Fan-out | plan -> [web_search, local_search, doc_search] |
| Parallel Fan-in | Three branches -> merge_findings |
| approvals_resolved + Fan-in | web_search (HITL) -> merge_findings |
| append_list | findings: combines results from three branches into one list |
| sum_int | source_count: sums numeric contributions from two branches |
| last_write | final_summary: written by a single join node |
| Judge loop | evaluate <-> revise, max_iterations: 4 |
| exclude_tools | Explicitly excludes dangerous tools from parallel branches |

Common Errors

Error 1: Using approvals_resolved on all Fan-in edges

# Wrong: local_search is not HITL, yet it uses approvals_resolved
- from: local_search
  to: merge_findings
  when: "approvals_resolved"  # ← local_search runs in execute mode! This should be "true"

Use approvals_resolved only for branches that actually have HITL approval. If you use approvals_resolved on a branch without HITL, it may pass immediately because the approval queue is empty, but it is semantically confusing.

Error 2: doc_search doesn't write source_count -- is that an error with state_schema?

An error occurs if a node tries to write to a key not in its writes_state, but declaring only some keys in writes_state is allowed.

# OK: doc_search writes only findings, not source_count
- id: doc_search
  writes_state: [findings]   # no source_count -- allowed

Even though source_count exists in the state_schema, if doc_search doesn't write to it, this branch simply does not contribute to source_count. The sum_int Reducer treats a missing contribution as 0.
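Assuming the reducer semantics described above, the "missing contribution counts as 0" behavior amounts to the following (branch names are from this tutorial; the dict itself is illustrative):

```python
# doc_search never writes source_count, so it contributes nothing to the sum.
contributions = {"web_search": 3, "local_search": 2}   # doc_search absent
branches = ["web_search", "local_search", "doc_search"]
source_count = sum(contributions.get(b, 0) for b in branches)
print(source_count)  # 5
```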

Error 3: merge_findings attempting to execute before the join

LangGraph does not execute the join node until all source branches of the Fan-in edges have completed. This is handled automatically, and no additional conditions need to be configured.

Automatic behavior:
  web_search (waiting for HITL)
  local_search done → merge_findings waiting
  doc_search done → merge_findings waiting
  web_search approved → merge_findings runs! (all three branches complete)

Previous: 08_parallel_advanced.md | Next: 10_reference.md
