Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Questa esercitazione illustra come creare un flusso di lavoro sequenziale semplice usando i flussi di lavoro di Agent Framework.
I flussi di lavoro sequenziali sono la base della creazione di sistemi complessi dell'agente di intelligenza artificiale. Questa esercitazione illustra come creare un semplice flusso di lavoro in due passaggi in cui ogni passaggio elabora i dati e lo passa al passaggio successivo.
Informazioni generali
In questa esercitazione si creerà un flusso di lavoro con due executor:
- Esecutore maiuscole - Converte il testo di input in lettere maiuscole
- Esecutore di testo inverso: inverte il testo e mostra il risultato finale
Il flusso di lavoro illustra i concetti di base, ad esempio:
- Creazione di un executor personalizzato con un gestore
- Creazione di un executor personalizzato da una funzione
- Utilizzando
WorkflowBuilderper connettere executor con archi - Elaborazione dei dati tramite passaggi sequenziali
- Osservazione dell'esecuzione del flusso di lavoro tramite eventi
Concetti trattati
Prerequisiti
- .NET 8.0 SDK o versione successiva
- Nessun servizio di intelligenza artificiale esterno richiesto per questo esempio di base
- Nuova applicazione console
Implementazione dettagliata
Le sezioni seguenti illustrano come compilare il flusso di lavoro sequenziale passo dopo passo.
Passaggio 1: Installare pacchetti NuGet
Prima di tutto, installare i pacchetti necessari per il progetto .NET:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Passaggio 2: Definire l'executor maiuscolo
Definire un executor che converte il testo in maiuscolo:
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");
Punti chiave:
- Creare una funzione che accetta una stringa e restituisce la versione maiuscola
- Usare
BindExecutor()per creare un executor dalla funzione
Passaggio 3: Definire l'esecutore di testo inverso
Definisci un esecutore che inverte il testo:
/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed partial class ReverseTextExecutor() : Executor("ReverseTextExecutor")
{
[MessageHandler]
private ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Reverse the input text
return ValueTask.FromResult(new string(input.Reverse().ToArray()));
}
}
ReverseTextExecutor reverse = new();
Punti chiave:
- Creare una
partialclasse che deriva daExecutor - Aggiungere l'attributo
[MessageHandler]al metodo del gestore
Passaggio 4: Compilare e connettere il flusso di lavoro
Collegare gli esecutori usando WorkflowBuilder:
// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();
Punti chiave:
-
WorkflowBuilderil costruttore riceve l'executor iniziale -
AddEdge()crea una connessione diretta da maiuscolo a inverso -
WithOutputFrom()specifica gli executor che producono output del flusso di lavoro -
Build()crea il flusso di lavoro non modificabile
Passaggio 5: Eseguire il flusso di lavoro
Eseguire il flusso di lavoro e osservare i risultati:
// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
switch (evt)
{
case ExecutorCompletedEvent executorComplete:
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
break;
}
}
Passaggio 6: Comprendere il risultato del flusso di lavoro
Quando si esegue il flusso di lavoro, verrà visualizzato un output simile al seguente:
UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH
L'input "Hello, World!" viene prima convertito in maiuscolo ("HELLO, WORLD!"), quindi invertito ("! DLROW , OLLEH").
Concetti chiave illustrati
Interfaccia esecutore
Executor dalle funzioni:
- Usare
BindExecutor()per creare un executor da una funzione
Gli executor derivano da Executor e usano [MessageHandler]:
- La classe deve essere
partialper abilitare la generazione di origine -
[MessageHandler]: attributo che contrassegna un metodo come gestore di messaggi - Il tipo di parametro del metodo del gestore determina i messaggi accettati dall'executor
- Il tipo restituito del metodo del gestore determina quali messaggi produce l'executor
Modello di generatore di workflow .NET
WorkflowBuilder fornisce un'API fluente per la creazione di flussi di lavoro:
- Costruttore: richiede l'executor iniziale
- AddEdge(): crea connessioni dirette tra executor
- WithOutputFrom(): specifica quali executor producono output del flusso di lavoro
- Build(): crea il flusso di lavoro non modificabile finale
Tipi di eventi .NET
Durante l'esecuzione, è possibile osservare questi tipi di evento:
-
ExecutorCompletedEvent- Quando un executor termina l'elaborazione
Esempio completo di .NET
Per l'implementazione completa e pronta per l'esecuzione, vedere l'esempio di 01_ExecutorsAndEdges nel repository di Agent Framework.
Questo esempio include:
- Implementazione completa con tutte le istruzioni using e la struttura di classi
- Commenti aggiuntivi che illustrano i concetti relativi al flusso di lavoro
- Completare l'installazione e la configurazione del progetto
Informazioni generali
In questa esercitazione si creerà un flusso di lavoro con due executor:
- Convertitore maiuscolo - Converte il testo di input in lettere maiuscole
- Esecutore di testo inverso: inverte il testo e mostra il risultato finale
Il flusso di lavoro illustra i concetti di base, ad esempio:
- Due modi per definire un'unità di lavoro (un nodo executor):
- Classe personalizzata che sottoclassa
Executorcon un metodo asincrono contrassegnato da@handler - Funzione asincrona autonoma decorata con
@executor
- Classe personalizzata che sottoclassa
- Connessione degli esecutori con
WorkflowBuilder - Passaggio di dati tra passaggi con
ctx.send_message() - Produzione dell'output finale con
ctx.yield_output() - Streaming di eventi per l'osservabilità in tempo reale
Concetti trattati
Prerequisiti
- Python 3.10 o versione successiva
- Pacchetto Python di Agent Framework Core installato:
pip install agent-framework-core --pre - Nessun servizio di intelligenza artificiale esterno richiesto per questo esempio di base
Implementazione dettagliata
Le sezioni seguenti illustrano come compilare il flusso di lavoro sequenziale passo dopo passo.
Passaggio 1: Importare i moduli necessari
Prima di tutto, importare i moduli necessari da Agent Framework:
import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor
Passaggio 2: Creare il Primo Executor
Creare un executor che converte il testo in maiuscolo implementando un executor con un metodo del gestore:
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node.
Note: The WorkflowContext is parameterized with the type this handler will
emit. Here WorkflowContext[str] means downstream nodes should expect str.
"""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
Punti chiave:
- La sottoclasse
Executorconsente di definire un nodo denominato con hook del ciclo di vita, se necessario - L'elemento
@handlerdecorator contrassegna il metodo asincrono che esegue il lavoro - La firma del gestore segue un contratto:
- Il primo parametro è l'input tipizzato per questo nodo (qui:
text: str) - Il secondo parametro è ,
WorkflowContext[T_Out]doveT_Outè il tipo di dati che questo nodo genererà tramitectx.send_message()(qui:str)
- Il primo parametro è l'input tipizzato per questo nodo (qui:
- All'interno di un gestore viene in genere calcolato un risultato e inoltrato ai nodi downstream usando
ctx.send_message(result)
Passaggio 3: Creare il secondo executor
Per semplici passaggi è possibile ignorare la sottoclasse e definire una funzione asincrona con lo stesso modello di firma (input tipizzato + WorkflowContext) e decorarlo con @executor. In questo modo viene creato un nodo completamente funzionale che può essere cablato in un flusso:
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the input and yield the workflow output."""
result = text[::-1]
# Yield the final output for this workflow run
await ctx.yield_output(result)
Punti chiave:
- L'elemento
@executordecorator trasforma una funzione asincrona autonoma in un nodo del flusso di lavoro - l'oggetto
WorkflowContextè parametrizzato con due tipi:-
T_Out = Never: questo nodo non invia messaggi ai nodi downstream -
T_W_Out = str: questo nodo restituisce l'output del flusso di lavoro di tipostr
-
- I nodi del terminale producono output usando
ctx.yield_output()per fornire risultati del flusso di lavoro - Il flusso di lavoro viene completato quando diventa inattiva (non è più necessario eseguire altre operazioni)
Passaggio 4: Compilare il flusso di lavoro
Collegare gli esecutori usando WorkflowBuilder:
upper_case = UpperCase(id="upper_case_executor")
workflow = (
WorkflowBuilder()
.add_edge(upper_case, reverse_text)
.set_start_executor(upper_case)
.build()
)
Punti chiave:
-
add_edge()crea connessioni dirette tra executor -
set_start_executor()definisce il punto di ingresso -
build()finalizza il flusso di lavoro
Passaggio 5: Eseguire il flusso di lavoro con streaming
Eseguire il flusso di lavoro e osservare gli eventi in tempo reale:
async def main():
# Run the workflow and stream events
async for event in workflow.run_stream("hello world"):
print(f"Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed with result: {event.data}")
if __name__ == "__main__":
asyncio.run(main())
Passaggio 6: Informazioni sull'output
Quando si esegue il flusso di lavoro, verranno visualizzati eventi come:
Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
Concetti chiave illustrati
Due modi per definire gli esecutori
-
Classe personalizzata (sottoclasse
Executor): migliore quando sono necessari hook del ciclo di vita o stato complesso. Definire un metodo asincrono con il decorator@handler. -
Basato su funzioni (
@executordecoratore): ideale per semplici passaggi. Definire una funzione asincrona autonoma con lo stesso modello di firma.
Entrambi gli approcci usano la stessa firma del gestore:
- Primo parametro: input tipizzato per questo nodo
- Secondo parametro: a
WorkflowContext[T_Out, T_W_Out]
Tipi di contesto del flusso di lavoro
Il WorkflowContext tipo generico definisce i flussi di dati tra executor:
-
WorkflowContext[T_Out]- Usato per i nodi che inviano messaggi di tipoT_Outai nodi downstream tramitectx.send_message() -
WorkflowContext[T_Out, T_W_Out]- Usato per i nodi che producono anche l'output del flusso di lavoro di tipoT_W_Outtramitectx.yield_output() -
WorkflowContextsenza parametri di tipo è equivalente aWorkflowContext[Never, Never], ovvero questo nodo non invia messaggi a nodi downstream né restituisce l'output del flusso di lavoro
Tipi di evento
Durante l'esecuzione dello streaming, si osserveranno questi tipi di eventi:
-
ExecutorInvokedEvent- Quando un executor avvia l'elaborazione -
ExecutorCompletedEvent- Quando un executor termina l'elaborazione -
WorkflowOutputEvent- Contiene il risultato finale del flusso di lavoro
Modello Python per la costruzione di flussi di lavoro
WorkflowBuilder fornisce un'API fluente per la creazione di flussi di lavoro:
- add_edge(): crea connessioni dirette tra executor
- set_start_executor(): definisce il punto di ingresso del flusso di lavoro
- build(): Finalizza e restituisce un oggetto flusso di lavoro non modificabile
Esempio completo
Per l'implementazione completa e pronta per l'esecuzione, vedere l'esempio nel repository di Agent Framework.
Questo esempio include:
- Implementazione completa con tutte le importazioni e la documentazione
- Commenti aggiuntivi che illustrano i concetti relativi al flusso di lavoro
- Output di esempio che mostra i risultati previsti