Skip to content

pipeline

Pipeline

Bases: Generic[AgentInputT, AgentOutputT]

A pipeline for chaining multiple agents together for sequential processing.

The Pipeline class enables the creation of agent chains where data flows from one agent to the next. Each agent in the pipeline must have compatible input/output schemas, where each agent's output schema matches the next agent's input schema.

Type Parameters

AgentInputT: The type of input the pipeline accepts (e.g., str, dict, List[User])
AgentOutputT: The type of output the pipeline produces (e.g., List[str], Report)

Attributes:

Name Type Description
input_schema Type[AgentIO]

The expected schema for pipeline input data

output_schema Type[AgentIO]

The expected schema for pipeline output data

agents

List of agents in the pipeline, executed in order

output_history List[AgentOutputT]

List of outputs from each agent's execution

current_step Optional[int]

The index of the currently executing agent (0-based)

Example

from typing import List
from agenty.types import User

extractor = UserExtractor()  # output_schema = List[User]
title_agent = TitleAgent()   # input_schema = List[User]
pipeline = extractor | title_agent
result = await pipeline.run("Some text input")

Source code in agenty/pipeline.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class Pipeline(Generic[AgentInputT, AgentOutputT]):
    """A pipeline for chaining multiple agents together for sequential processing.

    The Pipeline class enables the creation of agent chains where data flows from one
    agent to the next. Each agent in the pipeline must have compatible input/output
    schemas, where each agent's output schema matches the next agent's input schema.

    Type Parameters:
        AgentInputT: The type of input the pipeline accepts (e.g., str, dict, List[User])
        AgentOutputT: The type of output the pipeline produces (e.g., List[str], Report)

    Attributes:
        input_schema: The expected schema for pipeline input data (``str`` unless
            overridden in the constructor).
        output_schema: The expected schema for pipeline output data (``str`` unless
            overridden in the constructor).
        agents: List of agents in the pipeline, executed in order.
        output_history: List of outputs from each agent's execution during the
            most recent run.
        current_step: The index of the currently (or most recently) executing
            agent, 0-based; ``None`` until a run starts executing agents, and
            again after ``reset``.

    Example:
        >>> from typing import List
        >>> from agenty.types import User
        >>> extractor = UserExtractor()  # output_schema = List[User]
        >>> title_agent = TitleAgent()   # input_schema = List[User]
        >>> pipeline = extractor | title_agent
        >>> result = await pipeline.run("Some text input")
    """

    input_schema: Type[AgentIO] = str
    output_schema: Type[AgentIO] = str

    def __init__(
        self,
        agents: Optional[List[AgentIOProtocol[Any, Any]]] = None,
        input_schema: Type[AgentIO] | NotGiven = NOT_GIVEN,
        output_schema: Type[AgentIO] | NotGiven = NOT_GIVEN,
    ) -> None:
        """Create a pipeline.

        Args:
            agents: Agents to run in order. Defaults to a new empty list.
            input_schema: Overrides the class-level ``input_schema`` when given.
            output_schema: Overrides the class-level ``output_schema`` when given.
        """
        super().__init__()

        if not isinstance(input_schema, NotGiven):
            self.input_schema = input_schema
        if not isinstance(output_schema, NotGiven):
            self.output_schema = output_schema
        # Build a fresh list per instance. A ``list()`` default is evaluated once
        # at definition time and would be shared by every default-constructed
        # Pipeline.
        self.agents: List[AgentIOProtocol[Any, Any]] = [] if agents is None else agents
        self.output_history: List[AgentOutputT] = []
        self.current_step: Optional[int] = None

    async def run(
        self,
        input_data: Optional[AgentInputT],
        name: Optional[str] = None,
    ) -> AgentOutputT:
        """Run the pipeline by executing each agent in sequence.

        The pipeline processes input data through a chain of agents, where each agent's
        output becomes the input for the next agent. Type validation is performed at
        each step to ensure data compatibility.

        Args:
            input_data: The input data to process through the pipeline. Must match
                the pipeline's input_schema type.
            name: Optional name to identify this pipeline run, passed to each agent.

        Returns:
            The final output after processing through all agents in the pipeline,
            validated against the pipeline's output_schema.

        Raises:
            AgentyValueError: If the pipeline contains no agents.
            AgentyTypeError: If the pipeline input, any agent's input, or the final
                output fails validation against the corresponding schema. The
                underlying ``ValidationError`` is chained as ``__cause__``.

        Example:
            >>> pipeline = extractor | processor | formatter
            >>> result = await pipeline.run(
            ...     input_data="Raw text to process",
            ...     name="document-123"
            ... )
        """
        if not self.agents:
            raise AgentyValueError("Pipeline must contain at least one agent")

        def validate_data(data: Any, schema: Type[AgentIO], context: str) -> Any:
            """Validate ``data`` against ``schema``, raising AgentyTypeError on failure."""
            try:
                return TypeAdapter(schema).validate_python(data)
            except ValidationError as err:
                # Chain the original error so the field-level details survive.
                raise AgentyTypeError(
                    f"{context} data type {type(data)} does not match schema {self_schema_repr(schema)}"
                    if False
                    else f"{context} data type {type(data)} does not match schema {schema}"
                ) from err

        current = validate_data(input_data, self.input_schema, "Pipeline input")

        # Only reset once the input is known to be valid, so a rejected call
        # leaves the previous run's history intact.
        self.reset()
        for i, agent in enumerate(self.agents):
            self.current_step = i
            typed_input = validate_data(current, agent.input_schema, f"Agent {i} input")
            current = await agent.run(typed_input, name=name)
            self.output_history.append(current)

        # ``self.agents`` is non-empty (checked above), so ``current`` now holds
        # the last agent's output.
        final_output = validate_data(current, self.output_schema, "Pipeline output")
        return cast(AgentOutputT, final_output)

    def run_sync(
        self,
        input_data: Optional[AgentInputT],
        name: Optional[str] = None,
    ) -> AgentOutputT:
        """Run the pipeline from synchronous code and return its final output.

        With no event loop running in this thread, a fresh loop is started via
        ``asyncio.run``. If a loop is already running, the coroutine is handed
        to ``run_until_complete``. Stock asyncio refuses that with RuntimeError;
        loops patched to allow re-entry will run it.

        Args:
            input_data: Same as for ``run``.
            name: Same as for ``run``.

        Returns:
            The pipeline's final output, as returned by ``run``.

        Raises:
            RuntimeError: If a non-reentrant event loop is already running in
                this thread. Use ``await pipeline.run(...)`` there instead.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        if loop is None:
            return asyncio.run(self.run(input_data, name))

        # Pass the coroutine itself rather than a ``create_task`` wrapper. A
        # pre-created task stays scheduled on the running loop even when
        # run_until_complete refuses to re-enter it, so the pipeline would later
        # execute as an orphaned background task.
        coro = self.run(input_data, name)
        try:
            return loop.run_until_complete(coro)
        except RuntimeError:
            # Avoid a "coroutine was never awaited" warning if it was rejected;
            # close() is a no-op if the coroutine already finished.
            coro.close()
            raise

    def reset(self) -> None:
        """Reset the pipeline to its initial state.

        Clears ``current_step`` and ``output_history``, then calls ``reset()`` on
        every contained agent.
        """
        self.current_step = None
        self.output_history = []
        for agent in self.agents:
            agent.reset()

    def __or__(
        self, other: AgentIOProtocol[AgentOutputT, PipelineOutputT]
    ) -> AgentIOProtocol[AgentInputT, PipelineOutputT]:
        """Chain this pipeline with another agent using the | operator.

        This enables fluent pipeline construction using the | operator to chain
        multiple agents together. The output type of this pipeline must match
        the input type of the other agent.

        Args:
            other: Another agent to append to this pipeline. Its input schema must
                match this pipeline's output schema.

        Returns:
            A new Pipeline instance containing both this pipeline's agents and
            the new agent, preserving type information for the full chain. This
            pipeline is not modified.

        Example:
            >>> # Chain multiple agents to create a processing pipeline
            >>> pipeline = (
            ...     extractor |      # Extract entities
            ...     classifier |     # Classify entities
            ...     formatter        # Format results
            ... )
        """
        # ``+`` builds a new list, so this pipeline's ``agents`` is not mutated.
        return Pipeline[AgentInputT, PipelineOutputT](
            agents=self.agents + [other],
            input_schema=self.input_schema,
            output_schema=other.output_schema,
        )

reset()

Reset the pipeline to its initial state.

Source code in agenty/pipeline.py
139
140
141
142
143
144
def reset(self) -> None:
    """Return the pipeline to a fresh, not-yet-run state.

    The step pointer goes back to ``None`` and the recorded outputs are
    discarded. Every contained agent is then asked to reset itself too.
    """
    self.current_step, self.output_history = None, []
    for member in self.agents:
        member.reset()

run(input_data, name=None) async

Run the pipeline by executing each agent in sequence.

The pipeline processes input data through a chain of agents, where each agent's output becomes the input for the next agent. Type validation is performed at each step to ensure data compatibility.

Parameters:

Name Type Description Default
input_data Optional[AgentInputT]

The input data to process through the pipeline. Must match the pipeline's input_schema type.

required
name Optional[str]

Optional name to identify this pipeline run, passed to each agent.

None

Returns:

Type Description
AgentOutputT

The final output after processing through all agents in the pipeline.

AgentOutputT

Will match the pipeline's output_schema type.

Raises:

Type Description
AgentyTypeError

If agent's output type doesn't match the next agent's input schema.

AgentyValueError

If the pipeline contains no agents.

Example

pipeline = extractor | processor | formatter
result = await pipeline.run(
    input_data="Raw text to process",
    name="document-123",
)

Source code in agenty/pipeline.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
async def run(
    self,
    input_data: Optional[AgentInputT],
    name: Optional[str] = None,
) -> AgentOutputT:
    """Run the pipeline by executing each agent in sequence.

    The pipeline processes input data through a chain of agents, where each agent's
    output becomes the input for the next agent. Type validation is performed at
    each step to ensure data compatibility.

    Args:
        input_data: The input data to process through the pipeline. Must match
            the pipeline's input_schema type.
        name: Optional name to identify this pipeline run, passed to each agent.

    Returns:
        The final output after processing through all agents in the pipeline,
        validated against the pipeline's output_schema.

    Raises:
        AgentyValueError: If the pipeline contains no agents.
        AgentyTypeError: If the pipeline input, any agent's input, or the final
            output fails validation against the corresponding schema. The
            underlying ``ValidationError`` is chained as ``__cause__``.

    Example:
        >>> pipeline = extractor | processor | formatter
        >>> result = await pipeline.run(
        ...     input_data="Raw text to process",
        ...     name="document-123"
        ... )
    """
    if not self.agents:
        raise AgentyValueError("Pipeline must contain at least one agent")

    def validate_data(data: Any, schema: Type[AgentIO], context: str) -> Any:
        """Validate ``data`` against ``schema``, raising AgentyTypeError on failure."""
        try:
            return TypeAdapter(schema).validate_python(data)
        except ValidationError as err:
            # Chain the original error so the field-level details survive.
            raise AgentyTypeError(
                f"{context} data type {type(data)} does not match schema {schema}"
            ) from err

    current = validate_data(input_data, self.input_schema, "Pipeline input")

    # Only reset once the input is known to be valid, so a rejected call
    # leaves the previous run's history intact.
    self.reset()
    for i, agent in enumerate(self.agents):
        # current_step is 0-based: it is the index into self.agents.
        self.current_step = i
        typed_input = validate_data(current, agent.input_schema, f"Agent {i} input")
        current = await agent.run(typed_input, name=name)
        self.output_history.append(current)

    # self.agents is non-empty (checked above), so ``current`` now holds the
    # last agent's output.
    final_output = validate_data(current, self.output_schema, "Pipeline output")
    return cast(AgentOutputT, final_output)