Condividi tramite


Creare un flusso di lavoro sequenziale semplice

Questa esercitazione illustra come creare un flusso di lavoro sequenziale semplice usando i flussi di lavoro di Agent Framework.

I flussi di lavoro sequenziali sono la base della creazione di sistemi complessi dell'agente di intelligenza artificiale. Questa esercitazione illustra come creare un semplice flusso di lavoro in due passaggi in cui ogni passaggio elabora i dati e lo passa al passaggio successivo.

Informazioni generali

In questa esercitazione si creerà un flusso di lavoro con due executor:

  1. Esecutore maiuscole - Converte il testo di input in lettere maiuscole
  2. Esecutore di testo inverso: inverte il testo e mostra il risultato finale

Il flusso di lavoro illustra i concetti di base, ad esempio:

  • Creazione di un executor personalizzato con un gestore
  • Creazione di un executor personalizzato da una funzione
  • Utilizzando WorkflowBuilder per connettere executor con archi
  • Elaborazione dei dati tramite passaggi sequenziali
  • Osservazione dell'esecuzione del flusso di lavoro tramite eventi

Concetti trattati

Prerequisiti

Implementazione dettagliata

Le sezioni seguenti illustrano come compilare il flusso di lavoro sequenziale passo dopo passo.

Passaggio 1: Installare pacchetti NuGet

Prima di tutto, installare i pacchetti necessari per il progetto .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Passaggio 2: Definire l'executor maiuscolo

Definire un executor che converte il testo in maiuscolo:

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Punti chiave:

  • Creare una funzione che accetta una stringa e restituisce la versione maiuscola
  • Usare BindExecutor() per creare un executor dalla funzione

Passaggio 3: Definire l'esecutore di testo inverso

Definisci un esecutore che inverte il testo:

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed partial class ReverseTextExecutor() : Executor("ReverseTextExecutor")
{
    [MessageHandler]
    private ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Punti chiave:

  • Creare una partial classe che deriva da Executor
  • Aggiungere l'attributo [MessageHandler] al metodo del gestore

Passaggio 4: Compilare e connettere il flusso di lavoro

Collegare gli esecutori usando WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Punti chiave:

  • WorkflowBuilder il costruttore riceve l'executor iniziale
  • AddEdge() crea una connessione diretta da maiuscolo a inverso
  • WithOutputFrom() specifica gli executor che producono output del flusso di lavoro
  • Build() crea il flusso di lavoro non modificabile

Passaggio 5: Eseguire il flusso di lavoro

Eseguire il flusso di lavoro e osservare i risultati:

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Passaggio 6: Comprendere il risultato del flusso di lavoro

Quando si esegue il flusso di lavoro, verrà visualizzato un output simile al seguente:

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

L'input "Hello, World!" viene prima convertito in maiuscolo ("HELLO, WORLD!"), quindi invertito ("! DLROW , OLLEH").

Concetti chiave illustrati

Interfaccia esecutore

Executor dalle funzioni:

  • Usare BindExecutor() per creare un executor da una funzione

Gli executor derivano da Executor e usano [MessageHandler]:

  • La classe deve essere partial per abilitare la generazione di origine
  • [MessageHandler]: attributo che contrassegna un metodo come gestore di messaggi
  • Il tipo di parametro del metodo del gestore determina i messaggi accettati dall'executor
  • Il tipo restituito del metodo del gestore determina quali messaggi produce l'executor

Modello di generatore di workflow .NET

WorkflowBuilder fornisce un'API fluente per la creazione di flussi di lavoro:

  • Costruttore: richiede l'executor iniziale
  • AddEdge(): crea connessioni dirette tra executor
  • WithOutputFrom(): specifica quali executor producono output del flusso di lavoro
  • Build(): crea il flusso di lavoro non modificabile finale

Tipi di eventi .NET

Durante l'esecuzione, è possibile osservare questi tipi di evento:

  • ExecutorCompletedEvent - Quando un executor termina l'elaborazione

Esempio completo di .NET

Per l'implementazione completa e pronta per l'esecuzione, vedere l'esempio di 01_ExecutorsAndEdges nel repository di Agent Framework.

Questo esempio include:

  • Implementazione completa con tutte le istruzioni using e la struttura di classi
  • Commenti aggiuntivi che illustrano i concetti relativi al flusso di lavoro
  • Completare l'installazione e la configurazione del progetto

Informazioni generali

In questa esercitazione si creerà un flusso di lavoro con due executor:

  1. Convertitore maiuscolo - Converte il testo di input in lettere maiuscole
  2. Esecutore di testo inverso: inverte il testo e mostra il risultato finale

Il flusso di lavoro illustra i concetti di base, ad esempio:

  • Due modi per definire un'unità di lavoro (un nodo executor):
    1. Classe personalizzata che sottoclassa Executor con un metodo asincrono contrassegnato da @handler
    2. Funzione asincrona autonoma decorata con @executor
  • Connessione degli esecutori con WorkflowBuilder
  • Passaggio di dati tra passaggi con ctx.send_message()
  • Produzione dell'output finale con ctx.yield_output()
  • Streaming di eventi per l'osservabilità in tempo reale

Concetti trattati

Prerequisiti

  • Python 3.10 o versione successiva
  • Pacchetto Python di Agent Framework Core installato: pip install agent-framework-core --pre
  • Nessun servizio di intelligenza artificiale esterno richiesto per questo esempio di base

Implementazione dettagliata

Le sezioni seguenti illustrano come compilare il flusso di lavoro sequenziale passo dopo passo.

Passaggio 1: Importare i moduli necessari

Prima di tutto, importare i moduli necessari da Agent Framework:

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Passaggio 2: Creare il Primo Executor

Creare un executor che converte il testo in maiuscolo implementando un executor con un metodo del gestore:

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Punti chiave:

  • La sottoclasse Executor consente di definire un nodo denominato con hook del ciclo di vita, se necessario
  • L'elemento @handler decorator contrassegna il metodo asincrono che esegue il lavoro
  • La firma del gestore segue un contratto:
    • Il primo parametro è l'input tipizzato per questo nodo (qui: text: str)
    • Il secondo parametro è , WorkflowContext[T_Out]dove T_Out è il tipo di dati che questo nodo genererà tramite ctx.send_message() (qui: str)
  • All'interno di un gestore viene in genere calcolato un risultato e inoltrato ai nodi downstream usando ctx.send_message(result)

Passaggio 3: Creare il secondo executor

Per semplici passaggi è possibile ignorare la sottoclasse e definire una funzione asincrona con lo stesso modello di firma (input tipizzato + WorkflowContext) e decorarlo con @executor. In questo modo viene creato un nodo completamente funzionale che può essere cablato in un flusso:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Punti chiave:

  • L'elemento @executor decorator trasforma una funzione asincrona autonoma in un nodo del flusso di lavoro
  • l'oggetto WorkflowContext è parametrizzato con due tipi:
    • T_Out = Never: questo nodo non invia messaggi ai nodi downstream
    • T_W_Out = str: questo nodo restituisce l'output del flusso di lavoro di tipo str
  • I nodi del terminale producono output usando ctx.yield_output() per fornire risultati del flusso di lavoro
  • Il flusso di lavoro viene completato quando diventa inattiva (non è più necessario eseguire altre operazioni)

Passaggio 4: Compilare il flusso di lavoro

Collegare gli esecutori usando WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Punti chiave:

  • add_edge() crea connessioni dirette tra executor
  • set_start_executor() definisce il punto di ingresso
  • build() finalizza il flusso di lavoro

Passaggio 5: Eseguire il flusso di lavoro con streaming

Eseguire il flusso di lavoro e osservare gli eventi in tempo reale:

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Passaggio 6: Informazioni sull'output

Quando si esegue il flusso di lavoro, verranno visualizzati eventi come:

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Concetti chiave illustrati

Due modi per definire gli esecutori

  1. Classe personalizzata (sottoclasse Executor): migliore quando sono necessari hook del ciclo di vita o stato complesso. Definire un metodo asincrono con il decorator @handler.
  2. Basato su funzioni (@executor decoratore): ideale per semplici passaggi. Definire una funzione asincrona autonoma con lo stesso modello di firma.

Entrambi gli approcci usano la stessa firma del gestore:

  • Primo parametro: input tipizzato per questo nodo
  • Secondo parametro: a WorkflowContext[T_Out, T_W_Out]

Tipi di contesto del flusso di lavoro

Il WorkflowContext tipo generico definisce i flussi di dati tra executor:

  • WorkflowContext[T_Out] - Usato per i nodi che inviano messaggi di tipo T_Out ai nodi downstream tramite ctx.send_message()
  • WorkflowContext[T_Out, T_W_Out] - Usato per i nodi che producono anche l'output del flusso di lavoro di tipo T_W_Out tramite ctx.yield_output()
  • WorkflowContext senza parametri di tipo è equivalente a WorkflowContext[Never, Never], ovvero questo nodo non invia messaggi a nodi downstream né restituisce l'output del flusso di lavoro

Tipi di evento

Durante l'esecuzione dello streaming, si osserveranno questi tipi di eventi:

  • ExecutorInvokedEvent - Quando un executor avvia l'elaborazione
  • ExecutorCompletedEvent - Quando un executor termina l'elaborazione
  • WorkflowOutputEvent - Contiene il risultato finale del flusso di lavoro

Modello Python per la costruzione di flussi di lavoro

WorkflowBuilder fornisce un'API fluente per la creazione di flussi di lavoro:

  • add_edge(): crea connessioni dirette tra executor
  • set_start_executor(): definisce il punto di ingresso del flusso di lavoro
  • build(): Finalizza e restituisce un oggetto flusso di lavoro non modificabile

Esempio completo

Per l'implementazione completa e pronta per l'esecuzione, vedere l'esempio nel repository di Agent Framework.

Questo esempio include:

  • Implementazione completa con tutte le importazioni e la documentazione
  • Commenti aggiuntivi che illustrano i concetti relativi al flusso di lavoro
  • Output di esempio che mostra i risultati previsti

Passaggi successivi