Skip to content

Contrib Modules

The loom.contrib package contains optional integrations that extend Heddle's capabilities. Each module requires its own optional dependency extra.

Module Extra Purpose
contrib.council council Multi-round agent deliberation framework
contrib.chatbridge chatbridge External chat/LLM session adapters
contrib.duckdb duckdb Embedded analytics and vector search
contrib.lancedb lancedb ANN vector search via LanceDB
contrib.redis redis Production checkpoint persistence
contrib.rag rag Social media stream RAG pipeline

See Council How-To for the council and chatbridge guide. See RAG How-To for the RAG pipeline guide.

Council — Multi-Round Deliberation

Run structured team discussions where multiple LLM agents debate iteratively with pluggable protocols and convergence detection.

runner

CouncilRunner — NATS-free council execution.

Runs a multi-round deliberation directly against LLM backends without requiring NATS, actors, or running infrastructure. This is the council equivalent of `heddle.workshop.test_runner.WorkerTestRunner`.

Usage::

from heddle.worker.backends import build_backends_from_env
from heddle.contrib.council.config import load_council_config
from heddle.contrib.council.runner import CouncilRunner

config = load_council_config("configs/councils/example.yaml")
runner = CouncilRunner(build_backends_from_env())
result = await runner.run("Should we adopt microservices?", config=config)

CouncilRunner

CouncilRunner(backends: dict[str, LLMBackend], config: CouncilConfig | None = None)

Execute a council discussion directly against LLM backends.

This replicates the multi-round deliberation loop without NATS. Each agent turn calls backend.complete() directly, builds a transcript entry, and feeds it into the next round.

Parameters:

Name Type Description Default
backends dict[str, LLMBackend]

Dict mapping tier name ("local", "standard", "frontier") to :class:LLMBackend instances.

required
config CouncilConfig | None

Optional default :class:CouncilConfig. Can be overridden per-call in :meth:run.

None
Source code in src/heddle/contrib/council/runner.py
def __init__(self, backends: dict[str, LLMBackend], config: CouncilConfig | None = None) -> None:
    """Set up the runner with its backend tiers and optional default config."""
    # Tier name ("local" / "standard" / "frontier") -> LLMBackend instance.
    self.backends = backends
    # Fallback config used when run() is called without an explicit one.
    self._default_config = config
    # Populated while run() is executing; lets inject() reach the discussion.
    self._active_transcript: TranscriptStore | None = None

inject

inject(agent_name: str, content: str, role: str = 'audience') -> None

Inject a spectator interjection into the active discussion.

Safe to call from another thread or coroutine while :meth:run is executing. The interjection will appear in the next agent's context as an audience reaction.

Raises :class:RuntimeError if no discussion is active.

Source code in src/heddle/contrib/council/runner.py
def inject(self, agent_name: str, content: str, role: str = "audience") -> None:
    """Inject a spectator interjection into the active discussion.

    Safe to call from another thread or coroutine while :meth:`run`
    is executing; the interjection surfaces in the next agent's
    context as an audience reaction.

    Raises:
        RuntimeError: If no discussion is currently active.
    """
    transcript = self._active_transcript
    if transcript is None:
        msg = "No active council discussion — call run() first"
        raise RuntimeError(msg)
    transcript.inject_interjection(agent_name, content, role)

run async

run(topic: str, config: CouncilConfig | None = None, on_turn: Callable | None = None) -> CouncilResult

Run a full council deliberation.

Parameters:

Name Type Description Default
topic str

The discussion topic / question.

required
config CouncilConfig | None

Council config (overrides the constructor default).

None
on_turn Callable | None

Optional callback invoked after each agent's turn with the :class:TranscriptEntry. May be sync or async.

None

Returns:

Type Description
CouncilResult

A `CouncilResult` with the full transcript, synthesis, convergence info, and token usage.

Source code in src/heddle/contrib/council/runner.py
async def run(
    self,
    topic: str,
    config: CouncilConfig | None = None,
    on_turn: Callable | None = None,
) -> CouncilResult:
    """Run a full council deliberation.

    Args:
        topic: The discussion topic / question.
        config: Council config (overrides the constructor default).
        on_turn: Optional callback invoked after each agent's turn
            with the :class:`TranscriptEntry`.  May be sync or async.

    Returns:
        :class:`CouncilResult` with the full transcript, synthesis,
        convergence info, and token usage.

    Raises:
        ValueError: If no config was passed here or at construction.
    """
    cfg = config or self._default_config
    if cfg is None:
        msg = "No council config provided"
        raise ValueError(msg)

    start = time.monotonic()
    log = logger.bind(council=cfg.name, topic=topic[:80])
    log.info("council.start", agents=len(cfg.agents), max_rounds=cfg.max_rounds)

    protocol = get_protocol(cfg.protocol)
    transcript = TranscriptStore()
    self._active_transcript = transcript
    convergence_backend = self.backends.get(cfg.convergence.backend_tier.value)
    detector = ConvergenceDetector(cfg.convergence, backend=convergence_backend)

    total_tokens: dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0}
    converged = False
    convergence_score: float | None = None
    rounds_completed = 0

    # BUGFIX: the deliberation runs inside try/finally so the active
    # transcript is cleared even when a backend call raises — otherwise
    # inject() could keep targeting a dead/failed discussion.
    try:
        for round_num in range(1, cfg.max_rounds + 1):
            round_log = log.bind(round=round_num)
            round_log.info("council.round.start")

            transcript.start_round(round_num)
            turns = protocol.get_turn_order(round_num, cfg.agents, transcript)

            for turn in turns:
                agent = turn.agent
                context = protocol.build_agent_context(agent, transcript, round_num, topic)

                entry = await self._execute_agent_turn(
                    agent=agent,
                    context=context,
                    round_num=round_num,
                    topic=topic,
                    config=cfg,
                )

                transcript.add_entry(entry)

                # Accumulate tokens.  token_count tracks prompt tokens;
                # we don't have a separate completion count from
                # backend.complete() in this simplified path.
                total_tokens["prompt_tokens"] += entry.token_count

                if on_turn is not None:
                    result = on_turn(entry)
                    # The callback may be sync or async; only await
                    # when it actually returned an awaitable.
                    if hasattr(result, "__await__"):
                        await result

                round_log.info(
                    "council.agent_turn.done",
                    agent=agent.name,
                    model=entry.model_used,
                    tokens=entry.token_count,
                )

            # Check convergence.
            conv_result = await detector.check(transcript, round_num, topic)
            transcript.set_convergence_score(round_num, conv_result.score)
            convergence_score = conv_result.score
            rounds_completed = round_num

            round_log.info(
                "council.round.done",
                convergence_score=conv_result.score,
                converged=conv_result.converged,
            )

            if conv_result.converged:
                converged = True
                log.info("council.converged", round=round_num, score=conv_result.score)
                break

        # Facilitator synthesis.
        synthesis = await self._synthesize(cfg, transcript, topic, total_tokens)
    finally:
        self._active_transcript = None

    # Build agent summaries (latest position per agent).
    agent_summaries = transcript.get_latest_positions()

    elapsed_ms = int((time.monotonic() - start) * 1000)
    log.info(
        "council.done",
        rounds=rounds_completed,
        converged=converged,
        elapsed_ms=elapsed_ms,
    )

    return CouncilResult(
        topic=topic,
        rounds_completed=rounds_completed,
        converged=converged,
        convergence_score=convergence_score,
        synthesis=synthesis,
        transcript=transcript.rounds,
        agent_summaries=agent_summaries,
        total_token_usage=total_tokens,
        elapsed_ms=elapsed_ms,
    )

config

Council configuration loading and validation.

Loads council YAML configs into typed :class:CouncilConfig models. Follows the same pattern as :func:heddle.core.config.load_config.

CouncilConfig

Bases: BaseModel

Top-level council configuration, loaded from YAML.

load_council_config

load_council_config(path: str | Path) -> CouncilConfig

Load a council YAML config and return a validated model.

Raises:

Type Description
FileNotFoundError

If path does not exist.

ValidationError

If the YAML content is invalid.

Source code in src/heddle/contrib/council/config.py
def load_council_config(path: str | Path) -> CouncilConfig:
    """Load a council YAML config and return a validated model.

    Raises:
        FileNotFoundError: If *path* does not exist.
        pydantic.ValidationError: If the YAML content is invalid.
    """
    cfg_path = Path(path)
    # Parse the whole file, then let pydantic validate the structure.
    raw = yaml.safe_load(cfg_path.read_text())
    return CouncilConfig(**raw)

schemas

Pydantic models for the council deliberation framework.

These models form the typed contract for multi-round agent discussions. They are used by :class:CouncilRunner (NATS-free execution), :class:CouncilOrchestrator (NATS-connected), and the MCP council bridge.

AgentConfig

Bases: BaseModel

Configuration for a single council agent.

Each agent is backed by either an existing Heddle worker (via worker_type) or an external chat bridge (via bridge). Exactly one must be set.

CouncilResult

Bases: BaseModel

Final output of a council deliberation.

ConvergenceResult

Bases: BaseModel

Result of a convergence check after a round.

TranscriptEntry

Bases: BaseModel

A single contribution within a discussion round.

entry_type distinguishes panelist turns from audience interjections. Agents may choose to engage with interjections or ignore them — the protocol presents them separately.

ChatBridge — External Chat Adapters

Session-aware adapters for Claude, OpenAI, Ollama, and human-in-the-loop participation. Each adapter maintains per-session conversation history.

base

ChatBridge ABC and shared data models.

All chat bridges implement :class:ChatBridge, which provides a session-aware interface for multi-turn conversations with external LLM providers or human participants.

ChatBridge

ChatBridge(system_prompt: str = '')

Bases: ABC

Abstract base for external chat session adapters.

Each bridge maintains per-session conversation history. The worker itself remains stateless (per Heddle invariants) — the state lives in the bridge's internal session dict or in the external provider's session.

Subclasses must implement :meth:send_turn, :meth:get_session_info, and :meth:close_session.

Source code in src/heddle/contrib/chatbridge/base.py
def __init__(self, system_prompt: str = "") -> None:
    """Remember the shared system prompt and start with no sessions."""
    self._system_prompt = system_prompt
    # session_id -> per-conversation state; populated lazily.
    self._sessions: dict[str, _Session] = {}

send_turn abstractmethod async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a message and get a response.

Parameters:

Name Type Description Default
message str

The user message for this turn.

required
context dict[str, Any]

Additional context (round metadata, topic, etc.).

required
session_id str

Identifies the persistent conversation session.

required

Returns:

Type Description
ChatResponse

The assistant's response as a :class:ChatResponse.

Source code in src/heddle/contrib/chatbridge/base.py
@abstractmethod
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a message and get a response.

    Concrete bridges append *message* to the session's history, call
    the underlying provider (or human), and record the reply so the
    next turn sees the full conversation.

    Args:
        message: The user message for this turn.
        context: Additional context (round metadata, topic, etc.).
        session_id: Identifies the persistent conversation session.

    Returns:
        The assistant's response as a :class:`ChatResponse`.
    """

get_session_info abstractmethod async

get_session_info(session_id: str) -> SessionInfo

Return metadata about a session.

Source code in src/heddle/contrib/chatbridge/base.py
@abstractmethod
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return metadata about a session.

    Implementations report the bridge type, model, and message count
    for *session_id*; an unknown session yields a zero-message
    :class:`SessionInfo` rather than raising.
    """

close_session async

close_session(session_id: str) -> None

Clean up session state.

Source code in src/heddle/contrib/chatbridge/base.py
async def close_session(self, session_id: str) -> None:
    """Discard stored history for *session_id* (no-op if unknown)."""
    if session_id in self._sessions:
        del self._sessions[session_id]

ChatResponse

Bases: BaseModel

Response from a chat bridge turn.

SessionInfo

Bases: BaseModel

Metadata about an active chat session.

anthropic

Anthropic (Claude) chat bridge — session-aware Claude API adapter.

Unlike :class:heddle.worker.backends.AnthropicBackend which is stateless per-call, this bridge accumulates messages per session, enabling multi-turn conversations with Claude.

AnthropicChatBridge

AnthropicChatBridge(api_key: str | None = None, model: str = 'claude-sonnet-4-20250514', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

Claude API with per-session conversation history.

Parameters:

Name Type Description Default
api_key str | None

Anthropic API key. Falls back to ANTHROPIC_API_KEY env.

None
model str

Model identifier (default: claude-sonnet-4-20250514).

'claude-sonnet-4-20250514'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn.

2000
Source code in src/heddle/contrib/chatbridge/anthropic.py
def __init__(
    self,
    api_key: str | None = None,
    model: str = "claude-sonnet-4-20250514",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    """Create the bridge and its long-lived async HTTP client."""
    super().__init__(system_prompt=system_prompt)
    self._api_key = api_key or os.environ.get("ANTHROPIC_API_KEY", "")
    self._model = model
    self._max_tokens = max_tokens
    # One shared client for all sessions; credentials live in the headers.
    default_headers = {
        "x-api-key": self._api_key,
        "anthropic-version": "2024-10-22",
        "content-type": "application/json",
    }
    self._client = httpx.AsyncClient(
        base_url="https://api.anthropic.com",
        headers=default_headers,
        timeout=120.0,
    )

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn to Claude, accumulating session messages.

Source code in src/heddle/contrib/chatbridge/anthropic.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn to Claude, accumulating session messages."""
    session = self._get_or_create_session(session_id)
    session.messages.append({"role": "user", "content": message})

    payload: dict[str, Any] = {
        "model": self._model,
        "max_tokens": self._max_tokens,
        "system": session.system_prompt or self._system_prompt,
        "messages": session.messages,
    }

    response = await self._client.post("/v1/messages", json=payload)
    response.raise_for_status()
    data = response.json()

    # Claude replies with a list of content blocks; keep only the text ones.
    text_parts = [
        block.get("text", "")
        for block in data.get("content", [])
        if block.get("type") == "text"
    ]
    content = "".join(text_parts)

    # Record the assistant turn so the next call sees the full history.
    session.messages.append({"role": "assistant", "content": content})

    usage = data.get("usage", {})
    token_usage = {
        "prompt_tokens": usage.get("input_tokens", 0),
        "completion_tokens": usage.get("output_tokens", 0),
    }
    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage=token_usage,
        stop_reason=data.get("stop_reason"),
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/anthropic.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata.

    An unknown *session_id* yields a zero-message record with no
    creation time rather than raising.
    """
    session = self._sessions.get(session_id)
    # Build the record in a single constructor call — consistent with
    # ManualChatBridge.get_session_info — instead of mutating after.
    return SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
        created_at=session.created_at if session else None,
    )

openai

OpenAI chat bridge — session-aware OpenAI/ChatGPT adapter.

Supports any OpenAI-compatible API (OpenAI, Azure OpenAI, etc.).

OpenAIChatBridge

OpenAIChatBridge(api_key: str | None = None, model: str = 'gpt-4o', base_url: str = 'https://api.openai.com', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

OpenAI Chat Completions API with per-session conversation history.

Parameters:

Name Type Description Default
api_key str | None

OpenAI API key. Falls back to OPENAI_API_KEY env.

None
model str

Model identifier (default: gpt-4o).

'gpt-4o'
base_url str

API base URL (default: OpenAI).

'https://api.openai.com'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn.

2000
Source code in src/heddle/contrib/chatbridge/openai.py
def __init__(
    self,
    api_key: str | None = None,
    model: str = "gpt-4o",
    base_url: str = "https://api.openai.com",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    """Create the bridge and its long-lived async HTTP client."""
    super().__init__(system_prompt=system_prompt)
    self._api_key = api_key or os.environ.get("OPENAI_API_KEY", "")
    self._model = model
    self._max_tokens = max_tokens
    # Shared client for all sessions; bearer auth travels in the headers.
    auth_headers = {
        "Authorization": f"Bearer {self._api_key}",
        "Content-Type": "application/json",
    }
    self._client = httpx.AsyncClient(
        base_url=base_url,
        headers=auth_headers,
        timeout=120.0,
    )

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn via OpenAI Chat Completions, accumulating history.

Source code in src/heddle/contrib/chatbridge/openai.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn via OpenAI Chat Completions, accumulating history."""
    session = self._get_or_create_session(session_id)
    session.messages.append({"role": "user", "content": message})

    # The system prompt is not stored in session history, so prepend
    # it to the outgoing messages on every call.
    sys_prompt = session.system_prompt or self._system_prompt
    outgoing: list[dict[str, str]] = (
        [{"role": "system", "content": sys_prompt}] if sys_prompt else []
    )
    outgoing += session.messages

    resp = await self._client.post(
        "/v1/chat/completions",
        json={
            "model": self._model,
            "messages": outgoing,
            "max_tokens": self._max_tokens,
        },
    )
    resp.raise_for_status()
    data = resp.json()

    choice = data.get("choices", [{}])[0]
    content = choice.get("message", {}).get("content", "")

    # Persist the assistant turn for subsequent calls.
    session.messages.append({"role": "assistant", "content": content})

    usage = data.get("usage", {})
    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage={
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
        },
        stop_reason=choice.get("finish_reason"),
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/openai.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata.

    An unknown *session_id* yields a zero-message record with no
    creation time rather than raising.
    """
    session = self._sessions.get(session_id)
    # Build the record in a single constructor call — consistent with
    # ManualChatBridge.get_session_info — instead of mutating after.
    return SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
        created_at=session.created_at if session else None,
    )

ollama

Ollama chat bridge — session-aware local model adapter.

Wraps the Ollama /api/chat endpoint with per-session conversation history for multi-turn local model interactions.

OllamaChatBridge

OllamaChatBridge(model: str = 'llama3.2:3b', base_url: str = 'http://localhost:11434', system_prompt: str = '', max_tokens: int = 2000)

Bases: ChatBridge

Ollama chat API with per-session conversation history.

Parameters:

Name Type Description Default
model str

Ollama model name (default: llama3.2:3b).

'llama3.2:3b'
base_url str

Ollama server URL (default: http://localhost:11434).

'http://localhost:11434'
system_prompt str

System instructions applied to all sessions.

''
max_tokens int

Default max tokens per turn (num_predict).

2000
Source code in src/heddle/contrib/chatbridge/ollama.py
def __init__(
    self,
    model: str = "llama3.2:3b",
    base_url: str = "http://localhost:11434",
    system_prompt: str = "",
    max_tokens: int = 2000,
) -> None:
    """Create the bridge and its long-lived async HTTP client."""
    super().__init__(system_prompt=system_prompt)
    self._model = model
    self._max_tokens = max_tokens
    # The local Ollama server needs no credentials — just a base URL.
    self._client = httpx.AsyncClient(base_url=base_url, timeout=120.0)

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Send a turn via Ollama /api/chat, accumulating history.

Source code in src/heddle/contrib/chatbridge/ollama.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Send a turn via Ollama /api/chat, accumulating history."""
    session = self._get_or_create_session(session_id)
    session.messages.append({"role": "user", "content": message})

    # The system prompt is not stored in session history, so prepend
    # it to the outgoing messages on every call.
    sys_prompt = session.system_prompt or self._system_prompt
    outgoing: list[dict[str, str]] = (
        [{"role": "system", "content": sys_prompt}] if sys_prompt else []
    )
    outgoing += session.messages

    request_body: dict[str, Any] = {
        "model": self._model,
        "messages": outgoing,
        "stream": False,
        "options": {"num_predict": self._max_tokens},
    }
    resp = await self._client.post("/api/chat", json=request_body)
    resp.raise_for_status()
    data = resp.json()

    content = data.get("message", {}).get("content", "")

    # Persist the assistant turn for subsequent calls.
    session.messages.append({"role": "assistant", "content": content})

    # Ollama reports tokens as prompt_eval_count / eval_count.
    return ChatResponse(
        content=content,
        model=data.get("model", self._model),
        token_usage={
            "prompt_tokens": data.get("prompt_eval_count", 0),
            "completion_tokens": data.get("eval_count", 0),
        },
        stop_reason="stop" if data.get("done") else None,
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/ollama.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata.

    An unknown *session_id* yields a zero-message record with no
    creation time rather than raising.
    """
    session = self._sessions.get(session_id)
    # Build the record in a single constructor call — consistent with
    # ManualChatBridge.get_session_info — instead of mutating after.
    return SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model=self._model,
        message_count=len(session.messages) if session else 0,
        created_at=session.created_at if session else None,
    )

manual

Manual chat bridge — human-in-the-loop adapter.

Allows a human participant to join a council discussion or other multi-agent flow. Two modes:

1. **Callback mode** — provide an ``on_prompt`` async callable that
   receives the context and returns a response string.
2. **Queue mode** — prompts are put onto an ``asyncio.Queue``, and
   responses are awaited from a separate response queue.

Both modes enforce a timeout to prevent indefinite blocking.

ManualChatBridge

ManualChatBridge(on_prompt: Callable[[str, dict, str], Awaitable[str]] | None = None, prompt_queue: Queue | None = None, response_queue: Queue | None = None, timeout_seconds: float = 300.0, system_prompt: str = '')

Bases: ChatBridge

Human-in-the-loop chat bridge.

Parameters:

Name Type Description Default
on_prompt Callable[[str, dict, str], Awaitable[str]] | None

Async callback (message, context, session_id) -> str. If provided, this is called for each turn.

None
prompt_queue Queue | None

Queue where prompts are put for external consumption.

None
response_queue Queue | None

Queue where responses are expected.

None
timeout_seconds float

Max time to wait for a human response.

300.0
system_prompt str

System instructions (informational for the human).

''
Source code in src/heddle/contrib/chatbridge/manual.py
def __init__(
    self,
    on_prompt: Callable[[str, dict, str], Awaitable[str]] | None = None,
    prompt_queue: asyncio.Queue | None = None,
    response_queue: asyncio.Queue | None = None,
    timeout_seconds: float = 300.0,
    system_prompt: str = "",
) -> None:
    """Store the human-response mechanism (callback or queue pair)."""
    super().__init__(system_prompt=system_prompt)
    self._timeout = timeout_seconds
    # Callback mode is checked first by send_turn; queue mode is the fallback.
    self._on_prompt = on_prompt
    self._prompt_queue = prompt_queue
    self._response_queue = response_queue

send_turn async

send_turn(message: str, context: dict[str, Any], session_id: str) -> ChatResponse

Request a human response for this turn.

Source code in src/heddle/contrib/chatbridge/manual.py
async def send_turn(
    self,
    message: str,
    context: dict[str, Any],
    session_id: str,
) -> ChatResponse:
    """Request a human response for this turn."""
    session = self._get_or_create_session(session_id)
    session.messages.append({"role": "system", "content": message})

    if self._on_prompt is not None:
        content = await asyncio.wait_for(
            self._on_prompt(message, context, session_id),
            timeout=self._timeout,
        )
    elif self._prompt_queue is not None and self._response_queue is not None:
        await self._prompt_queue.put(
            {
                "message": message,
                "context": context,
                "session_id": session_id,
            }
        )
        content = await asyncio.wait_for(
            self._response_queue.get(),
            timeout=self._timeout,
        )
    else:
        msg = "ManualChatBridge needs either on_prompt or both prompt_queue and response_queue"
        raise ValueError(msg)

    session.messages.append({"role": "human", "content": content})

    return ChatResponse(
        content=content,
        model="human",
        token_usage={},
        stop_reason="human_input",
        session_id=session_id,
    )

get_session_info async

get_session_info(session_id: str) -> SessionInfo

Return session metadata.

Source code in src/heddle/contrib/chatbridge/manual.py
async def get_session_info(self, session_id: str) -> SessionInfo:
    """Return session metadata (zero-message record when unknown)."""
    session = self._sessions.get(session_id)
    if session is None:
        # Unknown session: report an empty record instead of raising.
        return SessionInfo(
            session_id=session_id,
            bridge_type=self.bridge_type,
            model="human",
            message_count=0,
            created_at=None,
        )
    return SessionInfo(
        session_id=session_id,
        bridge_type=self.bridge_type,
        model="human",
        message_count=len(session.messages),
        created_at=session.created_at,
    )

worker

ChatBridgeBackend — wraps a ChatBridge as a Heddle ProcessingBackend.

This enables any ChatBridge adapter to be used as a standard Heddle worker via YAML config alone, without writing Python code::

name: "external_gpt4"
processing_backend: "heddle.contrib.chatbridge.worker.ChatBridgeBackend"
processing_config:
  bridge_class: "heddle.contrib.chatbridge.openai.OpenAIChatBridge"
  model: "gpt-4o"
  api_key_env: "OPENAI_API_KEY"

ChatBridgeBackend

ChatBridgeBackend(**config: Any)

Bases: SyncProcessingBackend

ProcessingBackend that delegates to a ChatBridge adapter.

The bridge class is dynamically imported from config, enabling YAML-only configuration of external chat agents.

Config keys

bridge_class: Dotted import path to a ChatBridge subclass. api_key_env: Optional env var name for API key. system_prompt: Optional system prompt for the bridge. **kwargs: Passed to the bridge constructor.

Source code in src/heddle/contrib/chatbridge/worker.py
def __init__(self, **config: Any) -> None:
    """Build the backend and instantiate its configured bridge."""
    super().__init__(serialize_writes=False)
    # Keep the raw config around; the bridge class itself is resolved
    # dynamically by _create_bridge (presumably from config["bridge_class"]).
    self._config = config
    self._bridge = self._create_bridge(config)

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Not used — we override process() for async bridge calls.

Source code in src/heddle/contrib/chatbridge/worker.py
def process_sync(
    self,
    payload: dict[str, Any],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Not used — we override process() for async bridge calls."""
    raise NotImplementedError(
        "ChatBridgeBackend.process_sync should not be called directly"
    )

process async

process(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Delegate to the bridge's send_turn method.

Expects payload to contain
  • message or the entire payload as the message
  • _session_id (optional, defaults to "default")
  • _context (optional, additional context dict)
Source code in src/heddle/contrib/chatbridge/worker.py
async def process(
    self,
    payload: dict[str, Any],
    config: dict[str, Any],
) -> dict[str, Any]:
    """Delegate to the bridge's send_turn method.

    Expects payload to contain:
        - ``message`` or the entire payload as the message
        - ``_session_id`` (optional, defaults to "default")
        - ``_context`` (optional, additional context dict)
    """
    import json

    session_id = payload.get("_session_id", "default")
    context = payload.get("_context", {})

    # Prefer an explicit 'message'; otherwise serialize the payload
    # (minus internal underscore-prefixed fields) as the message.
    message = payload.get("message")
    if message is None:
        visible = {k: v for k, v in payload.items() if not k.startswith("_")}
        message = json.dumps(visible, ensure_ascii=False, indent=2)

    try:
        response = await self._bridge.send_turn(message, context, session_id)
    except Exception as e:
        msg = f"ChatBridge send_turn failed: {e}"
        raise ChatBridgeBackendError(msg) from e

    return {
        "output": {
            "content": response.content,
            "model": response.model,
            "session_id": response.session_id,
        },
        "model_used": response.model,
        "token_usage": response.token_usage,
    }

Valkey/Redis Store

Production checkpoint store using Redis/Valkey. Replaces the default in-memory store for persistent orchestrator checkpoints.

store

Valkey-backed checkpoint store.

Production implementation of CheckpointStore using redis.asyncio (redis-py). The redis-py client library works unchanged with Valkey. Install with: pip install heddle-ai[redis]

Connection defaults

redis://redis:6379 — matches the Docker Compose / k8s service name. For local dev: redis://localhost:6379

RedisCheckpointStore

RedisCheckpointStore(redis_url: str = 'redis://redis:6379')

Bases: CheckpointStore

Valkey-backed checkpoint store (via redis-py client).

Thin wrapper around redis.asyncio that implements the CheckpointStore interface. Handles connection lifecycle and TTL-based expiry natively. The redis-py client works unchanged with Valkey.

Source code in src/heddle/contrib/redis/store.py
def __init__(self, redis_url: str = "redis://redis:6379") -> None:
    """Open an async Redis/Valkey client for *redis_url*.

    The default URL matches the Docker Compose / k8s service name;
    use ``redis://localhost:6379`` for local development.
    """
    self._redis = redis.from_url(redis_url)

set async

set(key: str, value: str, ttl_seconds: int | None = None) -> None

Store a value with optional TTL.

Source code in src/heddle/contrib/redis/store.py
async def set(self, key: str, value: str, ttl_seconds: int | None = None) -> None:
    """Store a value with optional TTL."""
    if ttl_seconds:
        await self._redis.set(key, value, ex=ttl_seconds)
    else:
        await self._redis.set(key, value)

get async

get(key: str) -> str | None

Retrieve a value by key.

Source code in src/heddle/contrib/redis/store.py
async def get(self, key: str) -> str | None:
    """Retrieve a value by key."""
    result = await self._redis.get(key)
    if result is None:
        return None
    # redis.asyncio returns bytes by default
    if isinstance(result, bytes):
        return result.decode()
    return result

DuckDB Query Backend

Action-dispatch query backend for DuckDB. Supports full-text search, filtering, statistics, single-row get, and vector similarity search.

query_backend

Generic DuckDB query and analytics backend for Heddle workflows.

Provides a configurable action-dispatch query backend against any DuckDB table. Supports full-text search (via DuckDB FTS), attribute filtering, aggregate statistics, single-record retrieval, and vector similarity search.

Subclasses configure domain-specific behavior by passing constructor parameters (table name, columns, filter definitions, etc.) rather than overriding methods. For advanced customisation, override _get_handlers to add or replace action handlers.

Example worker config::

processing_backend: "myapp.backends.MyQueryBackend"
backend_config:
  db_path: "/tmp/workspace/data.duckdb"
See Also

heddle.worker.processor.SyncProcessingBackend -- base class for sync backends
heddle.contrib.duckdb.DuckDBViewTool -- LLM-callable view tool
heddle.contrib.duckdb.DuckDBVectorTool -- LLM-callable vector search tool

DuckDBQueryError

Bases: BackendError

Raised when a DuckDB query operation fails.

Wraps underlying DuckDB exceptions with a descriptive message and the original cause attached via __cause__.

DuckDBQueryBackend

DuckDBQueryBackend(db_path: str = '/tmp/workspace/data.duckdb', *, table_name: str = 'documents', result_columns: list[str] | None = None, json_columns: set[str] | None = None, id_column: str = 'id', full_text_column: str | None = 'full_text', fts_fields: str = 'full_text,summary', filter_fields: dict[str, str] | None = None, stats_groups: set[str] | None = None, stats_aggregates: list[str] | None = None, default_order_by: str = 'rowid', embedding_column: str = 'embedding')

Bases: SyncProcessingBackend

Generic action-dispatch query backend for DuckDB tables.

Opens a read-only connection to the DuckDB database and dispatches to the appropriate query handler based on the action field in the payload.

All queries use parameterized statements to prevent SQL injection. Results from search/filter actions exclude large content columns (configurable via full_text_column) to keep messages small.

Parameters:

Name Type Description Default
db_path str

Path to the DuckDB database file.

'/tmp/workspace/data.duckdb'
table_name str

Table to query.

'documents'
result_columns list[str] | None

Columns returned in search/filter results.

None
json_columns set[str] | None

Set of column names containing JSON strings that should be parsed back into Python objects on read.

None
id_column str

Primary key column name for the get action.

'id'
full_text_column str | None

Large content column included only in get results. Set to None if no such column exists.

'full_text'
fts_fields str

Comma-separated field names for DuckDB FTS match_bm25 calls (e.g. "content,summary").

'full_text,summary'
filter_fields dict[str, str] | None

Mapping of payload field names to SQL condition templates. Example: {"min_pages": "page_count >= ?"}. Each key is checked in the payload; if present, its SQL template is added to the WHERE clause.

None
stats_groups set[str] | None

Set of column names allowed as group_by values for the stats action.

None
stats_aggregates list[str] | None

SQL aggregate expressions for the stats query. Defaults to ["COUNT(*) AS record_count"].

None
default_order_by str

ORDER BY clause for filter results.

'rowid'
embedding_column str

Column name for vector embeddings used in the vector_search action.

'embedding'
Source code in src/heddle/contrib/duckdb/query_backend.py
def __init__(
    self,
    db_path: str = "/tmp/workspace/data.duckdb",
    *,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    json_columns: set[str] | None = None,
    id_column: str = "id",
    full_text_column: str | None = "full_text",
    fts_fields: str = "full_text,summary",
    filter_fields: dict[str, str] | None = None,
    stats_groups: set[str] | None = None,
    stats_aggregates: list[str] | None = None,
    default_order_by: str = "rowid",
    embedding_column: str = "embedding",
) -> None:
    """Record query configuration; the database is opened per request."""
    # Plain pass-through settings.
    self.db_path = Path(db_path)
    self.table_name = table_name
    self.id_column = id_column
    self.full_text_column = full_text_column
    self.fts_fields = fts_fields
    self.default_order_by = default_order_by
    self.embedding_column = embedding_column
    # Optional collections: any falsy value falls back to a safe default.
    self.result_columns = result_columns if result_columns else ["id"]
    self.json_columns = json_columns if json_columns else set()
    self.filter_fields = filter_fields if filter_fields else {}
    self.stats_groups = stats_groups if stats_groups else set()
    self.stats_aggregates = (
        stats_aggregates if stats_aggregates else ["COUNT(*) AS record_count"]
    )

process_sync

process_sync(payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]

Dispatch a query action against the DuckDB database.

Parameters:

Name Type Description Default
payload dict[str, Any]

Must contain action (str). Additional fields depend on the action type.

required
config dict[str, Any]

Worker config dict. May include db_path to override the constructor default.

required

Returns:

Type Description
dict[str, Any]

A dict with "output" (query results) and

dict[str, Any]

"model_used" (always "duckdb").

Raises:

Type Description
ValueError

If the action is unknown.

DuckDBQueryError

If the database query fails.

Source code in src/heddle/contrib/duckdb/query_backend.py
def process_sync(self, payload: dict[str, Any], config: dict[str, Any]) -> dict[str, Any]:
    """Route *payload* to the query handler named by its ``action`` field.

    Args:
        payload: Query payload; ``action`` (str) selects the handler and
            the remaining fields are handler-specific.
        config: Worker config dict; an optional ``db_path`` entry
            overrides the path given at construction time.

    Returns:
        ``{"output": <handler result>, "model_used": "duckdb"}``.

    Raises:
        ValueError: For an unrecognized ``action``.
        DuckDBQueryError: When the underlying database query fails.
    """
    action = payload.get("action", "")
    db_path = config.get("db_path", str(self.db_path))

    dispatch = self._get_handlers()
    handler = dispatch.get(action)
    if not handler:
        supported = ", ".join(dispatch.keys())
        raise ValueError(f"Unknown action '{action}'. Supported: {supported}")

    try:
        conn = duckdb.connect(db_path, read_only=True)
        try:
            # Search handlers rely on the FTS extension being loaded.
            conn.execute("LOAD fts")
            output = handler(conn, payload)
        finally:
            conn.close()
    except (ValueError, DuckDBQueryError):
        raise  # Already meaningful -- propagate untouched.
    except Exception as exc:
        raise DuckDBQueryError(f"Query failed (action={action}): {exc}") from exc

    return {"output": output, "model_used": "duckdb"}

DuckDB View Tool

Read-only DuckDB view exposed as an LLM-callable tool. Workers can query structured data during processing.

view_tool

DuckDB view tool — exposes a DuckDB view as an LLM-callable tool.

When configured in a worker's knowledge_silos, this tool lets the LLM query a read-only DuckDB view during reasoning. The LLM can search (full-text) or list records from the view.

Example knowledge_silos config::

knowledge_silos:
  - name: "catalog"
    type: "tool"
    provider: "heddle.contrib.duckdb.DuckDBViewTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      view_name: "summaries"
      description: "Search and browse record summaries"
      max_results: 20

The tool auto-introspects the view's columns via DESCRIBE to build its JSON Schema definition. Queries use parameterized SQL to prevent injection.

DuckDBViewTool

DuckDBViewTool(db_path: str, view_name: str, description: str = 'Query a database view', max_results: int = 20)

Bases: SyncToolProvider

Expose a DuckDB view as an LLM-callable search/list tool.

The tool dynamically introspects the view's column schema at instantiation time and builds a JSON Schema tool definition that the LLM can call.

Supports two operations
  • search: Full-text ILIKE search across all text columns
  • list: List recent records with optional column filters

All queries are parameterized and results are capped at max_results.

Source code in src/heddle/contrib/duckdb/view_tool.py
def __init__(
    self,
    db_path: str,
    view_name: str,
    description: str = "Query a database view",
    max_results: int = 20,
) -> None:
    """Store configuration and introspect the view's columns up front."""
    self.db_path = db_path
    self.view_name = view_name
    self.description = description
    self.max_results = max_results
    # Filled by _introspect(); consumed by get_definition() as
    # {"name": ..., "type": ...} entries.
    self._columns: list[dict[str, str]] = []
    self._introspect()

get_definition

get_definition() -> dict[str, Any]

Build JSON Schema tool definition from view columns.

Source code in src/heddle/contrib/duckdb/view_tool.py
def get_definition(self) -> dict[str, Any]:
    """Build the JSON Schema tool definition from the view's columns."""
    # Map DuckDB column types onto JSON Schema scalar types; columns
    # whose type is not listed here are simply not filterable.
    jsonschema_type = {
        "VARCHAR": "string",
        "TEXT": "string",
        "INTEGER": "integer",
        "BIGINT": "integer",
        "SMALLINT": "integer",
        "TINYINT": "integer",
        "DOUBLE": "number",
        "FLOAT": "number",
        "DECIMAL": "number",
        "BOOLEAN": "boolean",
    }
    filter_props: dict[str, Any] = {
        col["name"]: {"type": jsonschema_type[col["type"].upper()]}
        for col in self._columns
        if col["type"].upper() in jsonschema_type
    }

    return {
        "name": f"query_{self.view_name}",
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["search", "list"],
                    "description": "search: full-text search; list: browse records",
                },
                "query": {
                    "type": "string",
                    "description": "Search query (for search operation)",
                },
                "limit": {
                    "type": "integer",
                    "description": (
                        f"Max results to return (default: 10, max: {self.max_results})"
                    ),
                },
                "filters": {
                    "type": "object",
                    "description": "Column filters (for list operation)",
                    "properties": filter_props,
                },
            },
            "required": ["operation"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Execute a query against the DuckDB view.

Source code in src/heddle/contrib/duckdb/view_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Run a search or list query against the view and return JSON text."""
    op = arguments.get("operation", "list")
    capped_limit = min(arguments.get("limit", 10), self.max_results)

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            run = self._search if op == "search" else self._list
            result = run(conn, arguments, capped_limit)
        finally:
            conn.close()
    except Exception as e:
        # Surface failures to the LLM as a structured error payload.
        return json.dumps({"error": str(e)})

    return json.dumps(result, default=str)

DuckDB Vector Tool

Semantic similarity search via DuckDB embeddings, exposed as an LLM tool.

vector_tool

DuckDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in DuckDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using DuckDB's list_cosine_similarity.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "heddle.contrib.duckdb.DuckDBVectorTool"
    config:
      db_path: "/tmp/workspace/data.duckdb"
      table_name: "documents"
      result_columns: ["id", "title", "summary", "created_at"]
      embedding_column: "embedding"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"
See Also

heddle.worker.embeddings -- OllamaEmbeddingProvider
heddle.worker.tools -- SyncToolProvider base class

DuckDBVectorTool

DuckDBVectorTool(db_path: str, table_name: str = 'documents', result_columns: list[str] | None = None, embedding_column: str = 'embedding', tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over DuckDB vector embeddings.

Generates a query embedding via Ollama, then uses DuckDB's list_cosine_similarity function to find the most similar records by their stored embedding vectors.

Only records with non-null embeddings are searched.

Parameters:

Name Type Description Default
db_path str

Path to the DuckDB database file.

required
table_name str

Table containing the records and embeddings.

'documents'
result_columns list[str] | None

Columns to include in results. If None, introspects the table schema at first use, excluding the embedding column and any column named full_text.

None
embedding_column str

Name of the column storing embedding vectors.

'embedding'
tool_name str

Name exposed in the LLM tool definition.

'find_similar'
description str

Description exposed in the LLM tool definition.

'Find semantically similar records'
embedding_model str

Ollama model name for embedding generation.

'nomic-embed-text'
ollama_url str | None

Optional custom Ollama server URL.

None
max_results int

Hard cap on returned results.

10
Source code in src/heddle/contrib/duckdb/vector_tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "documents",
    result_columns: list[str] | None = None,
    embedding_column: str = "embedding",
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    """Capture configuration; no database connection is opened here."""
    # Data source.
    self.db_path = db_path
    self.table_name = table_name
    self.embedding_column = embedding_column
    # None means "introspect the table on first access" (see the
    # result_columns property).
    self._result_columns = result_columns
    # LLM-facing identity.
    self.tool_name = tool_name
    self.description = description
    self.max_results = max_results
    # Embedding backend settings.
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url

result_columns property

result_columns: list[str]

Return result columns, introspecting on first access if needed.

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/heddle/contrib/duckdb/vector_tool.py
def get_definition(self) -> dict[str, Any]:
    """Describe this tool to the LLM as a JSON-Schema function."""
    query_schema = {
        "type": "string",
        "description": "Natural language query to find similar records",
    }
    limit_schema = {
        "type": "integer",
        "description": f"Max results (default: 5, max: {self.max_results})",
    }
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": {"query": query_schema, "limit": limit_schema},
            "required": ["query"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/heddle/contrib/duckdb/vector_tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:
    """Embed the query via Ollama and return the nearest records as JSON."""
    query = arguments.get("query", "")
    limit = min(arguments.get("limit", 5), self.max_results)

    # A blank query cannot be embedded meaningfully.
    if not query.strip():
        return json.dumps({"results": [], "total": 0})

    query_embedding = self._embed_query(query)
    if query_embedding is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        conn = duckdb.connect(self.db_path, read_only=True)
        try:
            found = self._similarity_search(conn, query_embedding, limit)
        finally:
            conn.close()
    except Exception as e:
        return json.dumps({"error": str(e)})

    return json.dumps(found, default=str)

LanceDB Vector Store

ANN vector storage and search via LanceDB. Faster than DuckDB for large datasets. Implements the VectorStore ABC.

store

LanceDB-backed vector store for embedded text chunks.

Stores EmbeddedChunk records in a LanceDB table with native vector columns. Supports:

- Batch insertion of TextChunk objects (with embedding generation)
- Pre-embedded chunk insertion
- Approximate Nearest Neighbor (ANN) similarity search
- Metadata filtering (e.g. by channel_id)
- Basic CRUD (get, delete by chunk_id)

Uses Heddle's OllamaEmbeddingProvider for query embedding generation.

LanceDB provides ANN indexing for faster search over large datasets compared to exact cosine similarity in DuckDB.

LanceDBVectorStore

LanceDBVectorStore(db_path: str = '/tmp/rag-vectors.lance', embedding_model: str = 'nomic-embed-text', ollama_url: str = 'http://localhost:11434')

Bases: VectorStore

Embedded vector store backed by LanceDB.

Usage::

store = LanceDBVectorStore("/tmp/rag-vectors.lance")
store.initialize()

# Embed and store chunks
store.add_chunks(chunks)

# Search
results = store.search("earthquake damage", limit=5)

store.close()
Source code in src/heddle/contrib/lancedb/store.py
def __init__(
    self,
    db_path: str = "/tmp/rag-vectors.lance",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str = "http://localhost:11434",
) -> None:
    """Record configuration only; call :meth:`initialize` to open the DB."""
    self.db_path = Path(db_path)
    # Embedding settings used when chunks/queries are embedded.
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url
    # Connection state -- populated by initialize() / first insert.
    self._db: Any = None
    self._table: Any = None
    self._embedding_dim: int | None = None

initialize

initialize() -> LanceDBVectorStore

Open or create the LanceDB database and table.

Source code in src/heddle/contrib/lancedb/store.py
def initialize(self) -> LanceDBVectorStore:
    """Open or create the LanceDB database and table.

    Returns:
        ``self``, so construction and initialization can be chained.
    """
    import lancedb

    self.db_path.parent.mkdir(parents=True, exist_ok=True)
    self._db = lancedb.connect(str(self.db_path))

    # Open the table when it already exists.  Use table_names(), the
    # lancedb connection API used elsewhere in this package (see the
    # LanceDB tool module); the previous list_tables() call is not the
    # documented LanceDBConnection method for enumerating tables.
    if self.TABLE_NAME in self._db.table_names():
        self._table = self._db.open_table(self.TABLE_NAME)
    else:
        # Created lazily on first insert -- the schema comes from the data.
        self._table = None

    logger.info("Initialized LanceDB vector store at %s", self.db_path)
    return self

close

close() -> None

Close the database connection.

Source code in src/heddle/contrib/lancedb/store.py
def close(self) -> None:
    """Drop references to the table and connection; no server to disconnect."""
    self._db = None
    self._table = None

add_chunks

add_chunks(chunks: list[TextChunk], batch_size: int = 64) -> int

Embed and insert TextChunk objects. Returns count of inserted rows.

Source code in src/heddle/contrib/lancedb/store.py
def add_chunks(  # pragma: no cover
    self,
    chunks: list[TextChunk],
    batch_size: int = 64,
) -> int:
    """Embed and insert TextChunk objects. Returns count of inserted rows.

    Chunks are embedded in batches of *batch_size*. A batch whose
    embedding call fails is logged and skipped, so the returned count
    can be smaller than ``len(chunks)``.
    """
    if not chunks:
        return 0

    total_inserted = 0
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i : i + batch_size]
        texts = [c.text for c in batch]

        try:
            embeddings = self._embed_texts(texts)
        except Exception as exc:
            # Best-effort: drop this batch and continue with the rest.
            logger.error("Embedding batch %d failed: %s", i // batch_size, exc)
            continue

        # One LanceDB record per (chunk, embedding) pair; strict=False
        # tolerates a short embeddings list by dropping unmatched chunks.
        records = []
        for chunk, emb in zip(batch, embeddings, strict=False):
            records.append(
                {
                    "chunk_id": chunk.chunk_id,
                    "source_global_id": chunk.source_global_id,
                    "source_channel_id": chunk.source_channel_id,
                    "source_channel_name": chunk.source_channel_name,
                    "text": chunk.text,
                    "char_start": chunk.char_start,
                    "char_end": chunk.char_end,
                    "chunk_index": chunk.chunk_index,
                    "total_chunks": chunk.total_chunks,
                    # strategy may be an enum or a plain string; persist its value.
                    "strategy": chunk.strategy.value
                    if hasattr(chunk.strategy, "value")
                    else str(chunk.strategy),
                    "timestamp_unix": chunk.timestamp_unix,
                    "vector": emb,
                    "embedding_model": self.embedding_model,
                    "embedding_dim": len(emb),
                }
            )

        try:
            created = self._ensure_table(records)
            # When _ensure_table just created the table, the records are
            # presumably inserted as part of table creation -- TODO confirm --
            # so add() is only called for an already-existing table.
            if not created and self._table is not None and records:
                self._table.add(records)
            total_inserted += len(records)
        except Exception as exc:
            logger.warning("Insert batch %d failed: %s", i // batch_size, exc)

    logger.info("Inserted %d / %d chunks into %s", total_inserted, len(chunks), self.TABLE_NAME)
    return total_inserted

add_embedded_chunks

add_embedded_chunks(chunks: list[EmbeddedChunk]) -> int

Insert pre-embedded chunks (no embedding generation needed).

Source code in src/heddle/contrib/lancedb/store.py
def add_embedded_chunks(self, chunks: list[EmbeddedChunk]) -> int:
    """Insert chunks whose embeddings were already computed elsewhere."""
    if not chunks:
        return 0

    # EmbeddedChunk carries only a subset of the table columns; fill the
    # remaining ones with neutral defaults.
    records = []
    for ec in chunks:
        records.append(
            {
                "chunk_id": ec.chunk_id,
                "source_global_id": ec.source_global_id,
                "source_channel_id": ec.source_channel_id,
                "text": ec.text,
                "vector": ec.embedding,
                "embedding_model": ec.model,
                "embedding_dim": ec.dimensions,
                "source_channel_name": "",
                "char_start": 0,
                "char_end": 0,
                "chunk_index": 0,
                "total_chunks": 1,
                "strategy": "sentence",
                "timestamp_unix": 0,
            }
        )

    try:
        created = self._ensure_table(records)
        if not created and self._table is not None:
            self._table.add(records)
        return len(records)
    except Exception as exc:
        logger.warning("Insert pre-embedded chunks failed: %s", exc)
        return 0

search

search(query: str, limit: int = 10, min_score: float = 0.0, channel_ids: list[int] | None = None) -> list[SimilarityResult]

Semantic similarity search using LanceDB ANN.

Parameters:

Name Type Description Default
query str

Natural language query (embedded via Ollama)

required
limit int

Maximum results to return

10
min_score float

Minimum cosine similarity threshold

0.0
channel_ids list[int] | None

Optional filter by source channel

None

Returns:

Type Description
list[SimilarityResult]

List of SimilarityResult sorted by descending similarity

Source code in src/heddle/contrib/lancedb/store.py
def search(  # pragma: no cover
    self,
    query: str,
    limit: int = 10,
    min_score: float = 0.0,
    channel_ids: list[int] | None = None,
) -> list[SimilarityResult]:
    """
    ANN similarity search over the stored chunk embeddings.

    Args:
        query:       Natural language query (embedded via Ollama)
        limit:       Maximum results to return
        min_score:   Minimum cosine similarity threshold
        channel_ids: Optional filter by source channel

    Returns:
        List of SimilarityResult sorted by descending similarity
    """
    if self._table is None:
        return []

    vectors = self._embed_texts([query])
    if not vectors:
        return []

    request = self._table.search(vectors[0], vector_column_name="vector").limit(limit)
    if channel_ids:
        clauses = " OR ".join(f"source_channel_id = {cid}" for cid in channel_ids)
        request = request.where(f"({clauses})")

    try:
        rows = request.to_list()
    except Exception as exc:
        logger.error("LanceDB search failed: %s", exc)
        return []

    hits: list[SimilarityResult] = []
    for row in rows:
        # With the cosine metric LanceDB reports _distance = 1 - similarity,
        # so recover the score and clamp it at zero.
        similarity = max(0.0, 1.0 - row.get("_distance", 1.0))
        if similarity < min_score:
            continue
        hits.append(
            SimilarityResult(
                chunk_id=row["chunk_id"],
                text=row["text"],
                score=similarity,
                source_channel_id=row["source_channel_id"],
                source_global_id=row["source_global_id"],
                metadata={
                    "source_channel_name": row.get("source_channel_name", ""),
                    "timestamp_unix": row.get("timestamp_unix", 0),
                    "strategy": row.get("strategy", ""),
                },
            )
        )

    return hits

count

count() -> int

Return total number of stored chunks.

Source code in src/heddle/contrib/lancedb/store.py
def count(self) -> int:
    """Number of stored chunk rows (0 when the table is not open)."""
    return 0 if self._table is None else self._table.count_rows()

get

get(chunk_id: str) -> EmbeddedChunk | None

Retrieve a single embedded chunk by ID.

Source code in src/heddle/contrib/lancedb/store.py
def get(self, chunk_id: str) -> EmbeddedChunk | None:
    """Look up one chunk by ID; None when missing or on query failure."""
    if self._table is None:
        return None

    # NOTE(review): chunk_id is interpolated into the filter string;
    # assumed to be an internally generated ID containing no quotes.
    try:
        rows = self._table.search().where(f"chunk_id = '{chunk_id}'").limit(1).to_list()
    except Exception:
        return None
    if not rows:
        return None

    hit = rows[0]
    return EmbeddedChunk(
        chunk_id=hit["chunk_id"],
        source_global_id=hit["source_global_id"],
        source_channel_id=hit["source_channel_id"],
        text=hit["text"],
        embedding=list(hit.get("vector", [])),
        model=hit.get("embedding_model", ""),
        dimensions=hit.get("embedding_dim", 0),
    )

delete

delete(chunk_id: str) -> bool

Delete a chunk by ID. Returns True if a row was deleted.

Source code in src/heddle/contrib/lancedb/store.py
def delete(self, chunk_id: str) -> bool:
    """Delete the chunk with *chunk_id*; True when a row actually went away."""
    if self._table is None:
        return False

    rows_before = self._table.count_rows()
    try:
        self._table.delete(f"chunk_id = '{chunk_id}'")
    except Exception as exc:
        logger.warning("Delete failed for chunk %s: %s", chunk_id, exc)
        return False
    # Compare row counts to report whether anything matched the predicate.
    return self._table.count_rows() < rows_before

delete_by_source

delete_by_source(source_global_id: str) -> int

Delete all chunks for a given source post. Returns count.

Source code in src/heddle/contrib/lancedb/store.py
def delete_by_source(self, source_global_id: str) -> int:
    """Delete every chunk belonging to *source_global_id*; return the count."""
    if self._table is None:
        return 0

    rows_before = self._table.count_rows()
    try:
        self._table.delete(f"source_global_id = '{source_global_id}'")
    except Exception as exc:
        logger.warning("Delete by source failed for %s: %s", source_global_id, exc)
        return 0
    # Row-count delta tells how many chunks the predicate removed.
    return rows_before - self._table.count_rows()

stats

stats() -> dict[str, Any]

Return summary statistics about the store.

Source code in src/heddle/contrib/lancedb/store.py
def stats(self) -> dict[str, Any]:
    """Summarize the store: row count, post/channel cardinality, time range."""
    if self._table is None:
        return {"total_chunks": 0}

    try:
        total = self._table.count_rows()
        if total == 0:
            return {"total_chunks": 0}

        # Materialize once via pandas to compute the aggregate columns.
        frame = self._table.to_pandas()
        return {
            "total_chunks": total,
            "unique_posts": frame["source_global_id"].nunique(),
            "unique_channels": frame["source_channel_id"].nunique(),
            "earliest_timestamp": int(frame["timestamp_unix"].min()),
            "latest_timestamp": int(frame["timestamp_unix"].max()),
            "db_path": str(self.db_path),
        }
    except Exception as exc:
        logger.warning("Stats query failed: %s", exc)
        # Degrade to the cheap counters when the full scan fails.
        return {"total_chunks": self.count(), "db_path": str(self.db_path)}

LanceDB Vector Tool

Semantic similarity search via LanceDB, exposed as an LLM tool.

tool

LanceDB vector similarity search tool for LLM function-calling.

Uses embedding vectors stored in LanceDB to find semantically similar records. Query text is embedded via Ollama at search time, then compared against stored vectors using LanceDB's ANN search.

Example knowledge_silos config::

knowledge_silos:
  - name: "similar_items"
    type: "tool"
    provider: "heddle.contrib.lancedb.LanceDBVectorTool"
    config:
      db_path: "/tmp/workspace/rag-vectors.lance"
      table_name: "rag_chunks"
      tool_name: "find_similar"
      description: "Find records semantically similar to a query"
      embedding_model: "nomic-embed-text"
See Also

heddle.worker.embeddings -- OllamaEmbeddingProvider
heddle.worker.tools -- SyncToolProvider base class

LanceDBVectorTool

LanceDBVectorTool(db_path: str, table_name: str = 'rag_chunks', vector_column: str = 'vector', result_columns: list[str] | None = None, tool_name: str = 'find_similar', description: str = 'Find semantically similar records', embedding_model: str = 'nomic-embed-text', ollama_url: str | None = None, max_results: int = 10)

Bases: SyncToolProvider

Semantic similarity search over LanceDB vector embeddings.

Generates a query embedding via Ollama, then uses LanceDB's ANN search to find the most similar records by their stored vectors.

Parameters:

Name Type Description Default
db_path str

Path to the LanceDB database directory.

required
table_name str

Table containing the records and embeddings.

'rag_chunks'
vector_column str

Name of the column storing embedding vectors.

'vector'
result_columns list[str] | None

Columns to include in results. If None, returns chunk_id, text, source_channel_id, source_global_id.

None
tool_name str

Name exposed in the LLM tool definition.

'find_similar'
description str

Description exposed in the LLM tool definition.

'Find semantically similar records'
embedding_model str

Ollama model name for embedding generation.

'nomic-embed-text'
ollama_url str | None

Optional custom Ollama server URL.

None
max_results int

Hard cap on returned results.

10
Source code in src/heddle/contrib/lancedb/tool.py
def __init__(
    self,
    db_path: str,
    table_name: str = "rag_chunks",
    vector_column: str = "vector",
    result_columns: list[str] | None = None,
    tool_name: str = "find_similar",
    description: str = "Find semantically similar records",
    embedding_model: str = "nomic-embed-text",
    ollama_url: str | None = None,
    max_results: int = 10,
) -> None:
    """Capture search configuration; the database is opened per call."""
    # Where and what to search.
    self.db_path = db_path
    self.table_name = table_name
    self.vector_column = vector_column
    if result_columns:
        self._result_columns = result_columns
    else:
        # Default projection for RAG chunk tables.
        self._result_columns = [
            "chunk_id",
            "text",
            "source_channel_id",
            "source_global_id",
        ]
    # LLM-facing identity.
    self.tool_name = tool_name
    self.description = description
    self.max_results = max_results
    # Query-embedding settings.
    self.embedding_model = embedding_model
    self.ollama_url = ollama_url

get_definition

get_definition() -> dict[str, Any]

Return tool definition for LLM function-calling.

Source code in src/heddle/contrib/lancedb/tool.py
def get_definition(self) -> dict[str, Any]:
    """Return tool definition for LLM function-calling."""
    # JSON-schema description of the two accepted arguments.
    properties: dict[str, Any] = {
        "query": {
            "type": "string",
            "description": "Natural language query to find similar records",
        },
        "limit": {
            "type": "integer",
            "description": f"Max results (default: 5, max: {self.max_results})",
        },
    }
    return {
        "name": self.tool_name,
        "description": self.description,
        "parameters": {
            "type": "object",
            "properties": properties,
            # Only the query is mandatory; limit has a server-side default.
            "required": ["query"],
        },
    }

execute_sync

execute_sync(arguments: dict[str, Any]) -> str

Embed the query and search for similar records.

Source code in src/heddle/contrib/lancedb/tool.py
def execute_sync(self, arguments: dict[str, Any]) -> str:  # pragma: no cover
    """Embed the query and search for similar records.

    Args:
        arguments: Tool-call arguments; reads "query" (str) and an
            optional integer "limit" (default 5).

    Returns:
        A JSON string: {"results": [...], "total": N} on success or when
        there is nothing to search, {"error": "..."} on failure.
    """
    text = arguments.get("query", "")
    # Caller-supplied limit is clamped by the configured hard cap.
    capped = min(arguments.get("limit", 5), self.max_results)

    # Blank queries short-circuit to an empty result set.
    if not text.strip():
        return json.dumps({"results": [], "total": 0})

    vector = self._embed_query(text)
    if vector is None:
        return json.dumps({"error": "Failed to generate query embedding"})

    try:
        import lancedb

        db = lancedb.connect(self.db_path)
        # A missing table means "no matches", not an error.
        if self.table_name not in db.table_names():
            return json.dumps({"results": [], "total": 0})

        hits = (
            db.open_table(self.table_name)
            .search(vector, vector_column_name=self.vector_column)
            .limit(capped)
            .to_list()
        )

        records = []
        for hit in hits:
            record = {col: hit[col] for col in self._result_columns if col in hit}
            # Convert the reported distance to a similarity score (floored at 0).
            record["similarity"] = max(0.0, 1.0 - hit.get("_distance", 1.0))
            records.append(record)

        return json.dumps({"results": records, "total": len(records)}, default=str)

    except Exception as e:
        # Surface any backend failure as a structured error payload.
        return json.dumps({"error": str(e)})