Bemærk
Adgang til denne side kræver godkendelse. Du kan prøve at logge på eller ændre mapper.
Adgang til denne side kræver godkendelse. Du kan prøve at ændre mapper.
This page provides an overview of how Request and Response handling works in the Microsoft Agent Framework Workflow system.
Overview
Executors in a workflow can send requests to outside of the workflow and wait for responses. This is useful for scenarios where an executor needs to interact with external systems, such as human-in-the-loop interactions, or any other asynchronous operations.
Enable Request and Response Handling in a Workflow
Requests and responses are handled via a special type called InputPort.
// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");
Add the input port to a workflow.
var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
.AddEdge(inputPort, executorA)
.AddEdge(executorA, inputPort)
.Build<CustomRequestType>();
Now, because in the workflow executorA is connected to the inputPort in both directions, executorA needs to be able to send requests and receive responses via the inputPort. Here's what you need to do in SomeExecutor to send a request and receive a response.
internal sealed partial class SomeExecutor(): Executor("SomeExecutor")
{
[MessageHandler]
private async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Alternatively, SomeExecutor can separate the request sending and response handling into two handlers.
internal sealed partial class SomeExecutor() : Executor("SomeExecutor")
{
[MessageHandler]
private async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
{
// Process the response...
...
}
[MessageHandler]
private async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
{
// Process the message...
...
// Send a request
await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
}
}
Executors can send requests using ctx.request_info() and handle responses with @response_handler.
from agent_framework import response_handler, WorkflowBuilder
executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder(start_executor=executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()
executor_a can send requests and receive responses directly using built-in capabilities.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler
async def handle_data(
self,
data: OtherDataType,
context: WorkflowContext,
):
# Process the message...
...
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
@response_handler
async def handle_response(
self,
original_request: CustomRequestType,
response: CustomResponseType,
context: WorkflowContext,
):
# Process the response...
...
The @response_handler decorator automatically registers the method to handle responses for the specified request and response types.
Explicit Type Parameters for Response Handlers
As an alternative to type annotations, you can specify types explicitly via decorator parameters on @response_handler. This follows the same pattern as the @handler decorator.
Important
When using explicit type parameters, you must specify all types via the decorator - you cannot mix explicit parameters with type annotations. If you provide any explicit type parameter (request, response, output, or workflow_output), the framework will not introspect the function signature for types. The request and response parameters are required when using explicit mode; output and workflow_output are optional.
from agent_framework import (
Executor,
WorkflowContext,
handler,
response_handler,
)
class SomeExecutor(Executor):
@handler(input=OtherDataType)
async def handle_data(self, data, context):
# Send a request using the API
await context.request_info(
request_data=CustomRequestType(...),
response_type=CustomResponseType
)
# Explicit types - no type annotations needed on parameters
@response_handler(request=CustomRequestType, response=CustomResponseType)
async def handle_response(self, original_request, response, context):
# Process the response...
...
# With output types specified
@response_handler(request=CustomRequestType, response=CustomResponseType, output=int)
async def handle_response_with_output(self, original_request, response, context):
await context.send_message(42) # int - matches output type
# Union types and string forward references are supported
@response_handler(request="MyRequest | OtherRequest", response="MyResponse")
async def handle_custom(self, original_request, response, context):
...
Handling Requests and Responses
An InputPort emits a RequestInfoEvent when it receives a request. You can subscribe to these events to handle incoming requests from the workflow. When you receive a response from an external system, send it back to the workflow using the response mechanism. The framework automatically routes the response to the executor that sent the original request.
StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent workflowOutputEvt:
// The workflow has completed successfully
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Executors can send requests directly without needing a separate component. When an executor calls ctx.request_info(), the workflow emits a WorkflowEvent with type="request_info". You can subscribe to these events to handle incoming requests from the workflow. When you receive a response from an external system, send it back to the workflow using the response mechanism. The framework automatically routes the response to the executor's @response_handler method.
from agent_framework import WorkflowEvent
while True:
request_info_events : list[WorkflowEvent] = []
pending_responses : dict[str, CustomResponseType] = {}
stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)
async for event in stream:
if event.type == "request_info":
# Handle request_info event from the workflow
request_info_events.append(event)
if not request_info_events:
break
for request_info_event in request_info_events:
# Handle request_info event from the workflow
response = CustomResponseType(...)
pending_responses[request_info_event.request_id] = response
Checkpoints and Requests
To learn more about checkpoints, see Checkpoints.
When a checkpoint is created, pending requests are also saved as part of the checkpoint state. When you restore from a checkpoint, any pending requests will be re-emitted as WorkflowEvent objects with type="request_info", allowing you to capture and respond to them. You cannot provide responses directly during the resume operation - instead, you must listen for the re-emitted events and respond using the standard response mechanism.