Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Async streams modellieren eine Streaming-Quelle für Daten. Datenströme rufen häufig Elemente asynchron ab oder generieren sie. Sie stellen ein natürliches Programmiermodell für asynchrone Streamingdatenquellen bereit.
In diesem Tutorial lernen Sie, wie Sie:
- Erstellen Sie eine Datenquelle, die eine Abfolge von Datenelementen asynchron generiert.
- Nutzen Sie diese Datenquelle asynchron.
- Unterstützung für Abbruchvorgänge und erfasste Kontexte für asynchrone Streams
- Erkennen Sie, wann die neue Schnittstelle und Datenquelle den früheren synchronen Datensequenzen vorgezogen werden.
Voraussetzungen
Sie müssen Ihren Computer so einrichten, dass .NET ausgeführt wird, einschließlich des C#-Compilers. Der C#-Compiler ist mit Visual Studio 2022 oder dem .NET SDK verfügbar.
Sie müssen ein GitHub-Zugriffstoken erstellen, damit Sie auf den GitHub GraphQL-Endpunkt zugreifen können. Wählen Sie die folgenden Berechtigungen für Ihr GitHub-Zugriffstoken aus:
- repo:status
- public_repo
Speichern Sie das Zugriffstoken an einem sicheren Ort, damit Sie es verwenden können, um Zugriff auf den GitHub-API-Endpunkt zu erhalten.
Warnung
Schützen Sie Ihr persönliches Zugriffstoken. Jede Software mit Ihrem persönlichen Zugriffstoken kann GitHub-API-Aufrufe mit Ihren Zugriffsrechten durchführen.
In diesem Tutorial wird davon ausgegangen, dass Sie mit C# und .NET vertraut sind (einschließlich Visual Studio oder der .NET-CLI).
Ausführen der Startanwendung
Sie können den Code für die In diesem Lernprogramm verwendete Startanwendung aus dem Dotnet/docs-Repository im Ordner "asynchrone Programmierung/Codeausschnitte " abrufen.
Die Startanwendung ist eine Konsolenanwendung, die die GitHub GraphQL-Schnittstelle verwendet, um aktuelle Probleme abzurufen, die im dotnet/docs-Repository geschrieben wurden. Sehen Sie sich zunächst den folgenden Code für die Start-App-Methode Main an:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Sie können entweder eine GitHubKey Umgebungsvariable auf Ihr persönliches Zugriffstoken festlegen oder das letzte Argument im Aufruf GetEnvVariable durch Ihr persönliches Zugriffstoken ersetzen. Platzieren Sie Ihren Zugriffscode nicht im Quellcode, wenn Sie die Quelle für andere freigeben. Laden Sie niemals Zugriffscodes in ein freigegebenes Quell-Repository hoch.
Nach dem Erstellen des GitHub-Clients erstellt der Code Main ein Statusberichtsobjekt und ein Abbruchtoken. Sobald diese Objekte erstellt wurden, ruft MainRunPagedQueryAsync auf, um die letzten 250 erstellten Anfragen abzurufen. Nachdem dieser Vorgang abgeschlossen ist, werden die Ergebnisse angezeigt.
Wenn Sie die Startanwendung ausführen, können Sie einige wichtige Beobachtungen dazu machen, wie diese Anwendung ausgeführt wird. Für jede von GitHub zurückgegebene Seite wird der Fortschritt angezeigt. Sie können eine spürbare Pause beobachten, bevor GitHub jede neue Seite mit Problemen zurückgibt. Schließlich werden die Probleme erst angezeigt, nachdem alle 10 Seiten von GitHub abgerufen wurden.
Untersuchen der Implementierung
Die Implementierung zeigt, warum Sie das im vorherigen Abschnitt erläuterte Verhalten beobachtet haben. Überprüfen Sie den Code für RunPagedQueryAsync:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Die erste Methode besteht darin, das POST-Objekt mithilfe der GraphQLRequest Klasse zu erstellen:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
dies hilft, den POST-Objekttext zu bilden und ihn korrekt in JSON zu konvertieren, die als einzelne Zeichenfolge mit der ToJsonText Methode dargestellt wird, wodurch alle Zeilenumbruchzeichen aus dem Anforderungstext entfernt werden, der sie mit dem \ Escapezeichen (umgekehrter Schrägstrich) markiert.
Konzentrieren wir uns auf den Pagingalgorithmus und die asynchrone Struktur des vorherigen Codes. (Ausführliche Informationen zur GitHub GraphQL-API finden Sie in der GitHub GraphQL-Dokumentation .) Die RunPagedQueryAsync Methode listet die Probleme von der neuesten bis zur ältesten auf. Es fordert 25 Probleme pro Seite an und untersucht die pageInfo Struktur der Antwort, um mit der vorherigen Seite fortzufahren. Dies folgt der Standardmäßigen Paging-Unterstützung von GraphQL für Mehrseitenantworten. Die Antwort enthält ein pageInfo Objekt, das einen hasPreviousPages Wert und einen startCursor Wert enthält, der zum Anfordern der vorherigen Seite verwendet wird. Die Probleme befinden sich im nodes Array. Die RunPagedQueryAsync Methode fügt diese Knoten an ein Array an, das alle Ergebnisse aller Seiten enthält.
Nach dem Abrufen und Wiederherstellen einer Seite mit Ergebnissen berichtet RunPagedQueryAsync über den Fortschritt und prüft, ob eine Stornierung vorliegt. Wenn ein Abbruch angefordert wurde, löst RunPagedQueryAsync eine OperationCanceledException aus.
In diesem Code gibt es mehrere Elemente, die verbessert werden können. Vor allem muss RunPagedQueryAsync Speicherplatz für alle zurückgegebenen Issues zuordnen. In diesem Beispiel werden nur 250 Probleme angezeigt, da das Abrufen aller offenen Probleme deutlich mehr Arbeitsspeicher erfordert, um diese zu speichern. Der Algorithmus ist durch die Protokolle zur Unterstützung von Fortschrittsberichten und Abbruchvorgängen beim ersten Lesen schwieriger zu verstehen. Weitere Typen und APIs sind beteiligt. Sie müssen die Kommunikation über die CancellationTokenSource und deren Zugehörigen CancellationToken nachverfolgen, um zu verstehen, wo die Kündigung angefordert wird und wo sie gewährt wird.
Async-Streams bieten eine bessere Möglichkeit
Async-Streams und die zugehörige Sprachunterstützung behandeln alle diese Bedenken. Der Code, der die Sequenz generiert, kann jetzt yield return verwenden, um Elemente in einer Methode zurückzugeben, die mit dem async Modifizierer deklariert wurde. Sie können einen asynchronen Datenstrom mit einer await foreach Schleife genauso nutzen, wie Sie jede Sequenz mit einer foreach Schleife nutzen.
Diese neuen Sprachfeatures sind von drei neuen Schnittstellen abhängig, die .NET Standard 2.1 hinzugefügt und in .NET Core 3.0 implementiert wurden:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Diese drei Schnittstellen sollten den meisten C#-Entwicklern vertraut sein. Sie verhalten sich ähnlich wie bei ihren synchronen Gegenstücken:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Ein Typ, der möglicherweise nicht vertraut ist, ist System.Threading.Tasks.ValueTask. Die ValueTask Struktur stellt eine ähnliche API wie die System.Threading.Tasks.Task Klasse bereit.
ValueTask wird aus Leistungsgründen in diesen Schnittstellen verwendet.
Konvertieren in asynchrone Datenströme
Konvertieren Sie als Nächstes die RunPagedQueryAsync Methode, um einen asynchronen Datenstrom zu generieren. Ändern Sie zunächst die Signatur von RunPagedQueryAsync, um ein IAsyncEnumerable<JToken> Objekt zurückzugeben, und entfernen Sie das Abbruchtoken und das Fortschrittsobjekt aus der Parameterliste, wie im folgenden Code gezeigt.
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
Der Startcode verarbeitet jede Seite, während die Seite abgerufen wird, wie im folgenden Code dargestellt:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Ersetzen Sie diese drei Zeilen durch den folgenden Code:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Sie können auch die vorherige Deklaration von finalResults in dieser Methode sowie die Anweisung return entfernen, die der von Ihnen geänderten Schleife folgt.
Sie haben die Änderungen abgeschlossen, um einen asynchronen Datenstrom zu generieren. Die fertige Methode sollte dem folgenden Code ähneln:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Als Nächstes ändern Sie den Code, der die Sammlung verwendet, um den asynchronen Datenstrom zu nutzen. Suchen Sie den folgenden Code im Main, der die Sammlung von Problemen verarbeitet.
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Ersetzen Sie diesen Code durch die folgende await foreach Schleife:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
Die neue Schnittstelle IAsyncEnumerator<T> leitet sich von IAsyncDisposable ab. Das bedeutet, dass die vorhergehende Schleife den Stream asynchron löscht, wenn die Schleife beendet wird. Sie können sich vorstellen, dass die Schleife wie der folgende Code aussieht:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Streamelemente werden standardmäßig im erfassten Kontext verarbeitet. Wenn Sie die Erfassung des Kontexts deaktivieren möchten, verwenden Sie die TaskAsyncEnumerableExtensions.ConfigureAwait Erweiterungsmethode. Weitere Informationen zu Synchronisierungskontexten und zum Erfassen des aktuellen Kontexts finden Sie im Artikel zum Verwenden des aufgabenbasierten asynchronen Musters.
Async-Streams unterstützen den Abbruch mithilfe desselben Protokolls wie andere async Methoden. Sie würden die Signatur für die asynchrone Iteratormethode wie folgt ändern, um den Abbruch zu unterstützen:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Das System.Runtime.CompilerServices.EnumeratorCancellationAttribute Attribut bewirkt, dass der Compiler Code für das IAsyncEnumerator<T> generiert, der das an GetAsyncEnumerator übergebene Element im Textkörper des asynchronen Iterators als Argument sichtbar macht. Innerhalb runQueryAsynckönnen Sie den Status des Tokens untersuchen und bei Bedarf weitere Arbeiten abbrechen.
Sie verwenden eine andere Erweiterungsmethode, WithCancellationum das Abbruchtoken an den asynchronen Datenstrom zu übergeben. Sie würden die Schleife, die die Probleme aufzählt, wie folgt ändern:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Sie können den Code für das fertige Lernprogramm aus dem Dotnet/docs-Repository im Ordner "asynchrone Programmierung/Codeausschnitte " abrufen.
Ausführen der fertigen Anwendung
Erneutes Ausführen der Anwendung Vergleichen Sie sein Verhalten mit dem der Startanwendung. Die erste Seite der Ergebnisse wird aufgezählt, sobald sie verfügbar ist. Es gibt eine feststellbare Pause, während jede neue Seite angefordert und abgerufen wird, dann werden die Ergebnisse der nächsten Seite schnell aufgezählt. Der try / catch-Block ist zur Verarbeitung eines Abbruchs nicht erforderlich: Der Aufrufer kann das Auflisten der Sammlung beenden. Der Fortschritt wird eindeutig gemeldet, da der asynchrone Datenstrom Ergebnisse generiert, sobald jede Seite heruntergeladen wird. Der Status jedes zurückgegebenen Problems ist nahtlos in der await foreach-Schleife enthalten. Sie benötigen kein Rückrufobjekt, um den Fortschritt nachzuverfolgen.
Sie können Verbesserungen der Speichernutzung sehen, indem Sie den Code untersuchen. Sie müssen eine Sammlung nicht mehr zuweisen, um alle Ergebnisse zu speichern, bevor sie aufgezählt werden. Der Aufrufer kann bestimmen, wie die Ergebnisse verwendet werden und ob eine Speicherauflistung erforderlich ist.
Führen Sie sowohl die Start- als auch die fertigen Anwendungen aus, und Sie können die Unterschiede zwischen den Implementierungen selbst beobachten. Sie können das GitHub-Zugriffstoken löschen, das Sie erstellt haben, nachdem Sie dieses Lernprogramm gestartet haben. Wenn ein Angreifer Zugriff auf dieses Token erlangt hat, kann er mithilfe Ihrer Anmeldeinformationen auf GitHub-APIs zugreifen.
In diesem Lernprogramm haben Sie asynchrone Datenströme zum Lesen einzelner Elemente aus einer Netzwerk-API verwendet, die Datenseiten zurückgibt. Asynchrone Streams können auch aus endlosen Streams wie einem Aktienticker oder einem Sensorgerät lesen. Der Aufruf von MoveNextAsync gibt das nächste Element zurück, sobald es verfügbar ist.