
# How to build a Haystack pipeline node that sends and receives email
Build a custom Haystack 2.0 component that sends and receives email inside an AI pipeline. Full tutorial with Python code examples and production tips.
You have a Haystack pipeline that answers questions from your documents. It works. Someone types a query, the retriever finds relevant chunks, the LLM generates an answer.
Now you want that same pipeline to handle questions that arrive by email. An agent that reads its inbox, runs each message through your RAG pipeline, and replies with the answer. No human in the loop.
Haystack doesn't ship with email components out of the box. But the 2.0 component interface makes it straightforward to build your own. If you'd rather skip the plumbing, jump straight to the wiring section below.
## What a Haystack pipeline does
A Haystack pipeline is a directed graph of components that process data in sequence. Each component declares typed inputs and outputs, implements a run() method, and connects to other components through explicit wiring. You can chain retrievers, generators, prompt builders, and custom nodes into workflows that handle anything from RAG to document classification to multi-step reasoning.
The part that matters here: Haystack 2.0 treats every node as a pluggable component with a clean interface. A component that calls an email API slots into the pipeline graph just like a built-in retriever or generator. Same decorator, same run() contract, same connection syntax.
## How to add an email node to a Haystack pipeline
A Haystack email node is a custom component that sends or receives email within an AI pipeline. It must implement a run() method and declare typed inputs and outputs.
- Install your email dependencies (`smtplib` for raw SMTP, or an email API SDK like LobsterMail)
- Create a Python class decorated with `@component`
- Declare `@component.output_types()` for the data your node produces
- Implement the `run()` method with your send or receive logic
- Add the node to your pipeline with `pipeline.add_component("email_node", YourNode())`
- Connect it to upstream and downstream nodes with `pipeline.connect()`
That's the skeleton. Let's build both sides.
## Building a receive node
The receive node fetches unread messages and outputs structured fields that downstream components can use directly. Instead of returning raw MIME data, it parses each email into clean typed outputs: body text, subject line, sender address, and a message ID for threading.
```python
from haystack import component
import requests


@component
class EmailReceiveNode:
    def __init__(self, api_key: str, inbox_id: str):
        self.api_key = api_key
        self.inbox_id = inbox_id

    @component.output_types(
        body=str, subject=str, sender=str, message_id=str
    )
    def run(self):
        resp = requests.get(
            f"https://api.lobstermail.ai/v1/inboxes/{self.inbox_id}/messages",
            headers={"Authorization": f"Bearer {self.api_key}"},
            params={"unread": "true", "limit": 1},
        )
        resp.raise_for_status()
        messages = resp.json()["messages"]
        if not messages:
            # Empty inbox: return blank fields instead of raising IndexError
            return {"body": "", "subject": "", "sender": "", "message_id": ""}
        msg = messages[0]
        return {
            "body": msg.get("text", ""),
            "subject": msg["subject"],
            "sender": msg["from"],
            "message_id": msg["id"],
        }
```
Each output is a named, typed field. The body, subject, and sender feed into a prompt template. The message_id passes to the send node so replies thread correctly.
If your pipeline needs to react to emails in real time instead of polling on an interval, webhooks are the better approach. A webhook handler can trigger pipeline.run() the moment a new message arrives, cutting latency from minutes to seconds.
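A minimal handler might look like this, a sketch using only the standard library. The payload shape (a top-level `message` object mirroring the API's message resource) is an assumption; check your provider's webhook docs:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def webhook_to_pipeline_inputs(payload: dict) -> dict:
    # Map the (assumed) webhook payload onto the fields the pipeline expects.
    msg = payload["message"]
    return {
        "body": msg.get("text", ""),
        "subject": msg["subject"],
        "sender": msg["from"],
        "message_id": msg["id"],
    }


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        payload = json.loads(self.rfile.read(length))
        inputs = webhook_to_pipeline_inputs(payload)
        # Here you would hand `inputs` to pipe.run(), ideally on a worker
        # thread so the webhook endpoint can acknowledge quickly.
        self.send_response(204)
        self.end_headers()


def serve(port: int = 8080):
    HTTPServer(("", port), WebhookHandler).serve_forever()
```

In production you'd also verify the webhook signature before trusting the payload.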
**Tip:** You can also output Haystack `Document` objects instead of individual fields. That's useful when feeding emails into a retriever for similarity search. For direct prompt building, individual string outputs are simpler to wire.
## Building a send node
The send node takes generated text from upstream (typically an LLM) and delivers it as a threaded email reply.
```python
@component
class EmailSendNode:
    def __init__(self, api_key: str, inbox_id: str):
        self.api_key = api_key
        self.inbox_id = inbox_id

    @component.output_types(status=str)
    def run(self, text: str, recipient: str, subject: str, reply_to_id: str):
        resp = requests.post(
            f"https://api.lobstermail.ai/v1/inboxes/{self.inbox_id}/messages",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "to": [recipient],
                "subject": f"Re: {subject}",
                "body": {"text": text},
                "inReplyTo": reply_to_id,
            },
        )
        resp.raise_for_status()
        return {"status": resp.json().get("status", "queued")}
```
The inReplyTo field threads the reply into the original conversation. The recipient sees a clean email thread instead of a disconnected new message showing up out of context.
## Wiring the full pipeline
Here's a complete pipeline that receives an email, generates a response with an LLM, and sends the reply:
```python
from haystack import Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.converters import OutputAdapter
from haystack.components.generators import OpenAIGenerator

pipe = Pipeline()
pipe.add_component("email_in", EmailReceiveNode(
    api_key="lm_your_key", inbox_id="inbox_abc"
))
pipe.add_component("prompt", PromptBuilder(template=(
    "You received this email.\n\n"
    "From: {{sender}}\nSubject: {{subject}}\n\n{{body}}\n\n"
    "Write a helpful, professional reply."
)))
pipe.add_component("llm", OpenAIGenerator(model="gpt-4o"))
# OpenAIGenerator outputs `replies: List[str]`, but the send node expects a
# single string, so an OutputAdapter unwraps the first reply in between.
pipe.add_component("first_reply", OutputAdapter(
    template="{{ replies[0] }}", output_type=str
))
pipe.add_component("email_out", EmailSendNode(
    api_key="lm_your_key", inbox_id="inbox_abc"
))

# Wire the graph
pipe.connect("email_in.body", "prompt.body")
pipe.connect("email_in.subject", "prompt.subject")
pipe.connect("email_in.sender", "prompt.sender")
pipe.connect("prompt", "llm")
pipe.connect("llm.replies", "first_reply.replies")
pipe.connect("first_reply.output", "email_out.text")
pipe.connect("email_in.sender", "email_out.recipient")
pipe.connect("email_in.subject", "email_out.subject")
pipe.connect("email_in.message_id", "email_out.reply_to_id")
```

Run it with `pipe.run({})`; no inputs are needed because the receive node fetches its own data. The pipeline fetches the latest unread email, builds a prompt from the message content, generates a reply, and sends it back to the original sender. Five components, nine connections, zero manual intervention.
This is one of the patterns we explored in [7 things your AI agent can do with its own email](/blog/what-agents-do-with-email). An agent that monitors its inbox, understands each message, and responds on its own.
## Why an email API fits better than raw SMTP
You could build the same nodes with `smtplib` and `imaplib`. Plenty of people do. But there are practical reasons an agent-first email API is a better fit inside Haystack components.
Credential management gets simpler. Raw SMTP requires a host, port, username, password, and TLS configuration per environment. An API key is one string. When your Haystack pipeline gets serialized to YAML and redeployed across staging and production, fewer credentials means fewer things that silently break at 2 AM.
Inbox provisioning is automatic. If your agent needs a new inbox per workflow or per customer, it creates one with a single API call. No DNS records to configure, no domain verification to wait on, no admin panel to click through.
Threading and metadata arrive structured. SMTP gives you raw MIME headers. Parsing `In-Reply-To` and `References` yourself is tedious and error-prone, especially across different mail servers that format headers inconsistently. An email API returns thread IDs and parsed bodies as first-class fields, which map directly to Haystack component outputs.
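For comparison, here is roughly what the do-it-yourself version looks like with the standard library's `email` module once you've fetched a raw message over IMAP (the sample message is made up):

```python
from email import message_from_bytes
from email.utils import parseaddr

# A raw RFC 5322 message, as imaplib would hand it to you
raw = (
    b"From: Ada <ada@example.com>\r\n"
    b"Subject: Re: pipeline question\r\n"
    b"In-Reply-To: <msg-123@example.com>\r\n"
    b"References: <msg-100@example.com> <msg-123@example.com>\r\n"
    b"\r\n"
    b"Thanks!\r\n"
)

msg = message_from_bytes(raw)
sender = parseaddr(msg["From"])[1]   # "ada@example.com"
parent_id = msg.get("In-Reply-To")   # "<msg-123@example.com>"
references = (msg.get("References") or "").split()
```

And that's before multipart bodies, character encodings, and servers that fold or omit headers; with an API those fields arrive pre-parsed.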
## Handling errors in email nodes
Email nodes fail differently than most Haystack components. A retriever might return empty results. An email node might hit a rate limit, encounter a temporary server error, or get rejected by the recipient's mail server entirely.
Add a second output type so downstream components can react gracefully:
```python
@component.output_types(status=str, error=str)
def run(self, text: str, recipient: str, subject: str, reply_to_id: str):
    try:
        resp = requests.post(
            f"https://api.lobstermail.ai/v1/inboxes/{self.inbox_id}/messages",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "to": [recipient],
                "subject": f"Re: {subject}",
                "body": {"text": text},
                "inReplyTo": reply_to_id,
            },
        )
        resp.raise_for_status()
        return {"status": "queued", "error": ""}
    except requests.RequestException as e:
        # RequestException covers HTTPError plus connection and timeout errors
        return {"status": "failed", "error": str(e)}
```
Declaring `error` as a typed output lets a downstream router or logger handle failures without crashing your entire pipeline. You can branch on the `status` value: route successful sends to a confirmation log, and route failures to a retry queue or alert system.
## Running email pipelines asynchronously
Haystack 2.0 supports async execution through AsyncPipeline. Since your email node makes network calls on every run, async execution lets the event loop schedule other work while waiting for the API response instead of blocking.
```python
from haystack import AsyncPipeline

pipe = AsyncPipeline()
# ... add and connect components the same way as before ...

result = await pipe.run_async({})
```
This matters most when your pipeline processes batches of emails. Synchronous execution blocks on every send and receive call. Async lets multiple email operations overlap with retrieval and generation steps, which can cut total processing time significantly when you're working through a full inbox.
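As a sketch, batching might look like the following. It assumes a receive-node variant that accepts a `message_id` input (the node above takes none), so each run processes one known message:

```python
import asyncio


async def process_inbox(pipe, message_ids: list[str]):
    # One pipeline execution per message; gather lets the network-bound
    # email and LLM calls overlap instead of running back to back.
    tasks = [
        pipe.run_async({"email_in": {"message_id": mid}})
        for mid in message_ids
    ]
    # return_exceptions=True keeps one failed message from sinking the batch
    return await asyncio.gather(*tasks, return_exceptions=True)
```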
## Test your nodes before deploying
Test each component in isolation before wiring the full pipeline:
```python
node = EmailReceiveNode(api_key="lm_test_key", inbox_id="test_inbox")
result = node.run()
print(result["body"], result["subject"], result["sender"])
```
Send a test email to your inbox, run the receive node, and verify the outputs contain the right content and metadata. Then test the send node with a known recipient and confirm the reply arrives threaded correctly into the original conversation.
Once individual nodes pass, run the full pipeline with `pipe.run({})` and trace data flow from inbox to prompt builder to LLM to outbox. Wiring mistakes (a wrong output name, mismatched types) surface here, and they're much easier to fix before production than after your agent sends a garbled reply to a real customer.
## Frequently asked questions

**What is a Haystack pipeline node and how does it differ from a component in Haystack 2.0?**

In Haystack 1.x, the standard term was "node." Haystack 2.0 replaced it with "component," which uses the `@component` decorator and requires explicit input/output type declarations. The underlying concept is the same: a unit of work inside a pipeline graph.

**What method must a custom Haystack component implement to be usable in a pipeline?**

Every component needs a `run()` method, the `@component` class decorator, and `@component.output_types()` to declare what data it produces. Inputs are defined as typed parameters on the `run()` method itself.

**How do you add an email-sending component to a Haystack pipeline using add_component?**

Create a class with the `@component` decorator, implement `run()` with your send logic, then call `pipeline.add_component("email_out", YourSendNode())`. Connect its inputs to upstream components with `pipeline.connect("source.output", "email_out.input")`.

**Can an inbound email automatically trigger a Haystack pipeline to run?**

Yes. Set up a webhook listener that receives email arrival notifications and calls `pipeline.run()` for each new message. This is more responsive than polling. See our comparison of webhooks vs polling for agent email.

**How do you convert an email's subject, body, and attachments into Haystack Document objects?**

Create a `Document(content=email_body, meta={"from": sender, "subject": subject})` for each message. Attachments can be base64-decoded and stored as separate Documents or added to the metadata dict, depending on your downstream processing needs.

**How do you connect an email receive node to a downstream retriever or generator in Haystack?**

Use `pipeline.connect("email_in.output_name", "next_component.input_name")` for each field. Haystack 2.0 connections are explicit: you wire each named output to a specific named input on the downstream component.

**What is the cleanest way to handle API credentials securely in a Haystack email node?**

Load credentials from environment variables in your component's `__init__` method: `self.api_key = os.environ["LOBSTERMAIL_API_KEY"]`. Never hardcode keys in source files or pipeline YAML. For production deployments, use a secrets manager.

**How do you prevent a failing email node from crashing the rest of a Haystack pipeline?**

Wrap your `run()` logic in a try/except block and declare an `error` output type alongside your normal outputs. Downstream components check the `error` field and decide whether to retry, log, or skip the failed message.

**When should you use async pipeline execution for email workflows in Haystack?**

Use `AsyncPipeline` when processing batches of emails or when your email API calls create noticeable latency. Async execution keeps the event loop free while waiting on network responses, improving throughput for multi-message workloads.

**Can a single Haystack pipeline both receive an email and send a reply within the same run?**

Yes. Add both a receive node and a send node to the same pipeline graph. Wire the receive node's outputs through your processing chain (prompt builder, LLM) and into the send node's inputs. A single `pipeline.run()` handles the full receive-process-reply cycle.

**How does an agent-first email API differ from raw SMTP when used as a Haystack pipeline node?**

An email API gives you one credential (an API key), structured JSON responses, built-in threading metadata, and automatic inbox provisioning. Raw SMTP requires managing host, port, TLS, MIME parsing, and DNS records. For pipeline components that get serialized and redeployed across environments, the API approach has far fewer moving parts.

**How do you serialize and redeploy a Haystack pipeline that includes a custom email node?**

Haystack 2.0 serializes pipelines to YAML. Your custom component must be importable from a known Python module path in the target environment. Store credentials as environment variable references rather than plaintext values in the serialized pipeline file.

**How do you test an email send/receive node locally before deploying a Haystack pipeline?**

Instantiate the node directly and call `node.run()` with test inputs. For receive nodes, send a test email to your inbox first. For send nodes, use a test recipient and verify delivery, threading, and content. Test individual nodes before running the full pipeline.

**Can Haystack pipelines send emails conditionally based on retrieval confidence or LLM output?**

Yes. Insert a router component between the LLM and the send node that evaluates the generated response. Route high-confidence replies to the email send node and low-confidence or fallback responses to a logging node or human review queue instead.

**What Haystack 2.0 component interface should an email node follow?**

Use the `@component` class decorator, declare outputs with `@component.output_types(name=Type)`, and define inputs as typed `run()` parameters. Follow the same conventions as built-in components like `PromptBuilder` or `OpenAIGenerator` so your node works with standard pipeline tooling.


