Condividi tramite


Concetti di base relativi ai flussi di lavoro di Microsoft Agent Framework - Executor

Questo documento fornisce un'analisi approfondita del componente Executors del sistema flusso di lavoro di Microsoft Agent Framework.

Informazioni generali

Gli executor sono i blocchi predefiniti fondamentali che elaborano i messaggi in un flusso di lavoro. Sono unità di elaborazione autonome che ricevono messaggi tipizzato, eseguono operazioni e possono produrre messaggi o eventi di output.

Importante

A partire da questa versione, il modo consigliato per definire i gestori dei messaggi dell'executor in C# consiste nell'usare l'attributo [MessageHandler] nei metodi all'interno di una partial classe che deriva da Executor. Questo sostituisce i modelli precedenti in cui si ereditava da Executor<TInput>, Executor<TInput, TOutput> o ReflectingExecutor<T> con le interfacce IMessageHandler<T>. Il nuovo approccio usa la generazione del codice sorgente in fase di compilazione per la registrazione del gestore, offrendo prestazioni migliori, convalida in fase di compilazione e compatibilità con Native AOT. I modelli precedenti sono deprecati e verranno rimossi in una versione futura.

Gli executor derivano dalla Executor classe base e usano l'attributo per dichiarare i [MessageHandler] metodi del gestore. La classe deve essere contrassegnata partial per abilitare la generazione di origine. Ogni executor ha un identificatore univoco e può gestire tipi di messaggio specifici.

Struttura di Base dell'Executor

using Microsoft.Agents.AI.Workflows;

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        return ValueTask.FromResult(result); // Return value is automatically sent to connected executors
    }
}

È possibile inviare messaggi manualmente senza restituire un valore:

internal sealed partial class UppercaseExecutor() : Executor("UppercaseExecutor")
{
    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        string result = message.ToUpperInvariant();
        await context.SendMessageAsync(result); // Manually send messages to connected executors
    }
}

È anche possibile gestire più tipi di input definendo più [MessageHandler] metodi:

internal sealed partial class SampleExecutor() : Executor("SampleExecutor")
{
    /// <summary>
    /// Converts input string to uppercase
    /// </summary>
    [MessageHandler]
    private ValueTask<string> HandleStringAsync(string message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message.ToUpperInvariant());
    }

    /// <summary>
    /// Doubles the input integer
    /// </summary>
    [MessageHandler]
    private ValueTask<int> HandleIntAsync(int message, IWorkflowContext context)
    {
        return ValueTask.FromResult(message * 2);
    }
}

È anche possibile creare un executor da una funzione usando il BindExecutor metodo di estensione:

Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Gli executor ereditano dalla Executor classe base. Ogni executor ha un identificatore univoco e può gestire tipi di messaggio specifici usando metodi decorati con il decoratore @handler. I gestori devono avere l'annotazione corretta per specificare il tipo di messaggi che possono elaborare.

Struttura di Base dell'Executor

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        await ctx.send_message(text.upper())

È possibile creare un executor da una funzione usando il decoratore @executor:

from agent_framework import (
    WorkflowContext,
    executor,
)

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    """Convert the input to uppercase and forward it to the next node.

    Note: The WorkflowContext is parameterized with the type this handler will
    emit. Here WorkflowContext[str] means downstream nodes should expect str.
    """
    await ctx.send_message(text.upper())

È anche possibile gestire più tipi di input definendo più gestori:

class SampleExecutor(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        await ctx.send_message(text.upper())

    @handler
    async def double_integer(self, number: int, ctx: WorkflowContext[int]) -> None:
        """Double the input integer and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[int] means downstream nodes should expect int.
        """
        await ctx.send_message(number * 2)

Oggetto WorkflowContext

L'oggetto WorkflowContext fornisce metodi per consentire al gestore di interagire con il flusso di lavoro durante l'esecuzione. Viene WorkflowContext parametrizzato con il tipo di messaggi che il gestore emetterà e il tipo di output che può fornire.

Il metodo più comunemente usato è send_message, che consente al gestore di inviare messaggi agli executor connessi.

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message("Hello, World!")

Un gestore può usare yield_output per produrre output che verranno considerati come output del flusso di lavoro e restituiti/trasmessi al chiamante come evento di output:

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext[Never, str]) -> None:
        await ctx.yield_output("Hello, World!")

Se un gestore non invia né messaggi né restituisce output, non è necessario alcun parametro di tipo per WorkflowContext:

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext) -> None:
        print("Doing some work...")

Passaggio successivo