次の方法で共有


チュートリアル: データフロー パイプラインの作成

DataflowBlock.ReceiveDataflowBlock.ReceiveAsync、およびDataflowBlock.TryReceiveメソッドを使用してソース ブロックからメッセージを受信することもできますが、メッセージ ブロックを接続してデータフロー パイプラインを形成することもできます。 データフロー パイプラインは一連のコンポーネント ( データフロー ブロック) であり、それぞれがより大きな目標に貢献する特定のタスクを実行します。 データフロー パイプライン内のすべてのデータフロー ブロックは、別のデータフロー ブロックからメッセージを受信したときに処理を実行します。 これは、自動車製造の組み立てラインに例えることができます。 各車両が組み立てラインを通過すると、1つのステーションがフレームを組み立て、次のステーションがエンジンを取り付けます。 組み立てラインでは複数の車両を同時に組み立てることができるため、一度に 1 台ずつ完全な車両を組み立てるよりもスループットが向上します。

このドキュメントでは、Web サイトから書籍 The Iliad of Homer をダウンロードし、テキストを検索して、個々の単語と最初の単語の文字を逆にする単語と一致させるデータフロー パイプラインを示します。 このドキュメントでのデータフロー パイプラインの形成は、次の手順で構成されます。

  1. パイプラインに参加するデータフロー ブロックを作成します。

  2. 各データフロー ブロックをパイプライン内の次のブロックに接続します。 各ブロックは、パイプライン内の前のブロックの出力を入力として受け取ります。

  3. データフロー ブロックごとに、前のブロックが完了した後に次のブロックを完了状態に設定する継続タスクを作成します。

  4. パイプラインの先頭にデータをポストします。

  5. パイプラインのヘッドを完了としてマークします。

  6. パイプラインがすべての作業を完了するまで待ちます。

[前提条件]

このチュートリアルを開始する前に 、データフロー を読み取ってください。

コンソール アプリケーションの作成

Visual Studio で、Visual C# または Visual Basic コンソール アプリケーション プロジェクトを作成します。 System.Threading.Tasks.Dataflow NuGet パッケージをインストールします。

TPL データフロー ライブラリ ( System.Threading.Tasks.Dataflow 名前空間) は、.NET 6 以降のバージョンに含まれています。 .NET Framework および .NET Standard プロジェクトの場合は、 📦 System.Threading.Tasks.Dataflow NuGet パッケージをインストールする必要があります。

次のコードをプロジェクトに追加して、基本的なアプリケーションを作成します。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class Program
{
   static void Main()
   {
   }
}
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
    End Sub

End Module

データフロー ブロックの作成

次のコードを Main メソッドに追加して、パイプラインに参加するデータフロー ブロックを作成します。 次の表は、パイプラインの各メンバーのロールをまとめたものです。

//
// Create the members of the pipeline.
//

// Downloads the requested resource as a string.
var downloadString = new TransformBlock<string, string>(async uri =>
{
   Console.WriteLine($"Downloading '{uri}'...");

   return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
});

// Separates the specified text into an array of words.
var createWordList = new TransformBlock<string, string[]>(text =>
{
   Console.WriteLine("Creating word list...");

   // Remove common punctuation by replacing all non-letter characters
   // with a space character.
   char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
   text = new string(tokens);

   // Separate the text into an array of words.
   return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
});

// Removes short words and duplicates.
var filterWordList = new TransformBlock<string[], string[]>(words =>
{
   Console.WriteLine("Filtering word list...");

   return words
      .Where(word => word.Length > 3)
      .Distinct()
      .ToArray();
});

// Finds all words in the specified collection whose reverse also
// exists in the collection.
var findReversedWords = new TransformManyBlock<string[], string>(words =>
{
   Console.WriteLine("Finding reversed words...");

   var wordsSet = new HashSet<string>(words);

   return from word in words.AsParallel()
          let reverse = new string(word.Reverse().ToArray())
          where word != reverse && wordsSet.Contains(reverse)
          select word;
});

// Prints the provided reversed words to the console.
var printReversedWords = new ActionBlock<string>(reversedWord =>
{
   Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
});
'
' Create the members of the pipeline.
' 

' Downloads the requested resource as a string.
Dim downloadString = New TransformBlock(Of String, String)(
    Async Function(uri)
        Console.WriteLine("Downloading '{0}'...", uri)

        Return Await New HttpClient().GetStringAsync(uri)
    End Function)

' Separates the specified text into an array of words.
Dim createWordList = New TransformBlock(Of String, String())(
   Function(text)
       Console.WriteLine("Creating word list...")

     ' Remove common punctuation by replacing all non-letter characters 
     ' with a space character.
     Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
       text = New String(tokens)

     ' Separate the text into an array of words.
     Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
   End Function)

' Removes short words and duplicates.
Dim filterWordList = New TransformBlock(Of String(), String())(
   Function(words)
       Console.WriteLine("Filtering word list...")

       Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
   End Function)

' Finds all words in the specified collection whose reverse also 
' exists in the collection.
Dim findReversedWords = New TransformManyBlock(Of String(), String)(
   Function(words)

       Dim wordsSet = New HashSet(Of String)(words)

       Return From word In words.AsParallel()
              Let reverse = New String(word.Reverse().ToArray())
              Where word <> reverse AndAlso wordsSet.Contains(reverse)
              Select word
   End Function)

' Prints the provided reversed words to the console.    
Dim printReversedWords = New ActionBlock(Of String)(
   Sub(reversedWord)
       Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
   End Sub)
メンバー タイプ Description
downloadString TransformBlock<TInput,TOutput> Web から書籍のテキストをダウンロードします。
createWordList TransformBlock<TInput,TOutput> 書籍のテキストを単語の配列に分割します。
filterWordList TransformBlock<TInput,TOutput> 単語配列から短い単語と重複を削除します。
findReversedWords TransformManyBlock<TInput,TOutput> フィルター処理された単語配列コレクション内のすべての単語を検索します。その逆は、単語配列でも発生します。
printReversedWords ActionBlock<TInput> 単語と対応する逆の単語をコンソールに表示します。

この例では、データフロー パイプラインの複数のステップを 1 つのステップに組み合わせることができますが、この例では、複数の独立したデータフロー タスクを作成して大きなタスクを実行するという概念を示しています。 この例では、 TransformBlock<TInput,TOutput> を使用して、パイプラインの各メンバーが入力データに対して操作を実行し、パイプラインの次のステップに結果を送信できるようにします。 パイプラインの findReversedWords メンバーは、入力ごとに複数の独立した出力を生成するため、 TransformManyBlock<TInput,TOutput> オブジェクトです。 パイプラインの末尾 ( printReversedWords) は、入力に対してアクションを実行し、結果を生成しないため、 ActionBlock<TInput> オブジェクトです。

パイプラインの形成

次のコードを追加して、各ブロックをパイプライン内の次のブロックに接続します。

LinkTo メソッドを呼び出してソース データフロー ブロックをターゲット データフロー ブロックに接続すると、データが使用可能になると、ソース データフロー ブロックによってターゲット ブロックにデータが伝達されます。 DataflowLinkOptions を true に設定して PropagateCompletion を指定すると、パイプライン内のブロックが正常に完了した場合でも失敗した場合でも、その結果として次のブロックが完了します。

//
// Connect the dataflow blocks to form a pipeline.
//

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

downloadString.LinkTo(createWordList, linkOptions);
createWordList.LinkTo(filterWordList, linkOptions);
filterWordList.LinkTo(findReversedWords, linkOptions);
findReversedWords.LinkTo(printReversedWords, linkOptions);
'
' Connect the dataflow blocks to form a pipeline.
'

Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

downloadString.LinkTo(createWordList, linkOptions)
createWordList.LinkTo(filterWordList, linkOptions)
filterWordList.LinkTo(findReversedWords, linkOptions)
findReversedWords.LinkTo(printReversedWords, linkOptions)

パイプラインへのデータの投稿

次のコードを追加して、書籍 The Iliad of Homer の URL をデータフロー パイプラインの先頭に投稿します。

// Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");
' Process "The Iliad of Homer" by Homer.
downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

この例では、 DataflowBlock.Post を使用して、パイプラインの先頭にデータを同期的に送信します。 データフロー ノードに非同期的にデータを送信する必要がある場合は、 DataflowBlock.SendAsync メソッドを使用します。

パイプライン アクティビティの完了

次のコードを追加して、パイプラインの先頭を完了済みとしてマークします。 パイプラインのヘッドは、バッファー内のすべてのメッセージを処理した後、その完了を伝達します。

// Mark the head of the pipeline as complete.
downloadString.Complete();
' Mark the head of the pipeline as complete.
downloadString.Complete()

この例では、処理するデータフロー パイプラインを介して 1 つの URL を送信します。 パイプラインを介して複数の入力を送信する場合は、すべての入力を送信した後、 IDataflowBlock.Complete メソッドを呼び出します。 アプリケーションにデータが使用できなくなったり、アプリケーションがパイプラインの完了を待つ必要がない明確に定義されたポイントがない場合は、この手順を省略できます。

パイプラインの完了を待機しています

パイプラインが完了するのを待つ次のコードを追加します。 パイプラインの末尾が終了すると、全体的な操作が完了します。

// Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait();
' Wait for the last block in the pipeline to process all messages.
printReversedWords.Completion.Wait()

任意のスレッドまたは複数のスレッドからのデータフローの完了を同時に待機できます。

完全な例

次の例は、このチュートリアルの完全なコードを示しています。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to create a basic dataflow pipeline.
// This program downloads the book "The Iliad of Homer" by Homer from the Web
// and finds all reversed words that appear in that book.
static class DataflowReversedWords
{
   static void Main()
   {
      //
      // Create the members of the pipeline.
      //

      // Downloads the requested resource as a string.
      var downloadString = new TransformBlock<string, string>(async uri =>
      {
         Console.WriteLine($"Downloading '{uri}'...");

         return await new HttpClient(new HttpClientHandler{ AutomaticDecompression = System.Net.DecompressionMethods.GZip }).GetStringAsync(uri);
      });

      // Separates the specified text into an array of words.
      var createWordList = new TransformBlock<string, string[]>(text =>
      {
         Console.WriteLine("Creating word list...");

         // Remove common punctuation by replacing all non-letter characters
         // with a space character.
         char[] tokens = text.Select(c => char.IsLetter(c) ? c : ' ').ToArray();
         text = new string(tokens);

         // Separate the text into an array of words.
         return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
      });

      // Removes short words and duplicates.
      var filterWordList = new TransformBlock<string[], string[]>(words =>
      {
         Console.WriteLine("Filtering word list...");

         return words
            .Where(word => word.Length > 3)
            .Distinct()
            .ToArray();
      });

      // Finds all words in the specified collection whose reverse also
      // exists in the collection.
      var findReversedWords = new TransformManyBlock<string[], string>(words =>
      {
         Console.WriteLine("Finding reversed words...");

         var wordsSet = new HashSet<string>(words);

         return from word in words.AsParallel()
                let reverse = new string(word.Reverse().ToArray())
                where word != reverse && wordsSet.Contains(reverse)
                select word;
      });

      // Prints the provided reversed words to the console.
      var printReversedWords = new ActionBlock<string>(reversedWord =>
      {
         Console.WriteLine($"Found reversed words {reversedWord}/{new string(reversedWord.Reverse().ToArray())}");
      });

      //
      // Connect the dataflow blocks to form a pipeline.
      //

      var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

      downloadString.LinkTo(createWordList, linkOptions);
      createWordList.LinkTo(filterWordList, linkOptions);
      filterWordList.LinkTo(findReversedWords, linkOptions);
      findReversedWords.LinkTo(printReversedWords, linkOptions);

      // Process "The Iliad of Homer" by Homer.
      downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt");

      // Mark the head of the pipeline as complete.
      downloadString.Complete();

      // Wait for the last block in the pipeline to process all messages.
      printReversedWords.Completion.Wait();
   }
}
/* Sample output:
   Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
   Creating word list...
   Filtering word list...
   Finding reversed words...
   Found reversed words doom/mood
   Found reversed words draw/ward
   Found reversed words aera/area
   Found reversed words seat/taes
   Found reversed words live/evil
   Found reversed words port/trop
   Found reversed words sleek/keels
   Found reversed words area/aera
   Found reversed words tops/spot
   Found reversed words evil/live
   Found reversed words mood/doom
   Found reversed words speed/deeps
   Found reversed words moor/room
   Found reversed words trop/port
   Found reversed words spot/tops
   Found reversed words spots/stops
   Found reversed words stops/spots
   Found reversed words reed/deer
   Found reversed words keels/sleek
   Found reversed words deeps/speed
   Found reversed words deer/reed
   Found reversed words taes/seat
   Found reversed words room/moor
   Found reversed words ward/draw
*/
Imports System.Net.Http
Imports System.Threading.Tasks.Dataflow

' Demonstrates how to create a basic dataflow pipeline.
' This program downloads the book "The Iliad of Homer" by Homer from the Web 
' and finds all reversed words that appear in that book.
Module DataflowReversedWords

    Sub Main()
        '
        ' Create the members of the pipeline.
        ' 

        ' Downloads the requested resource as a string.
        Dim downloadString = New TransformBlock(Of String, String)(
            Async Function(uri)
                Console.WriteLine("Downloading '{0}'...", uri)

                Return Await New HttpClient().GetStringAsync(uri)
            End Function)

        ' Separates the specified text into an array of words.
        Dim createWordList = New TransformBlock(Of String, String())(
           Function(text)
               Console.WriteLine("Creating word list...")

             ' Remove common punctuation by replacing all non-letter characters 
             ' with a space character.
             Dim tokens() As Char = text.Select(Function(c) If(Char.IsLetter(c), c, " "c)).ToArray()
               text = New String(tokens)

             ' Separate the text into an array of words.
             Return text.Split(New Char() {" "c}, StringSplitOptions.RemoveEmptyEntries)
           End Function)

        ' Removes short words and duplicates.
        Dim filterWordList = New TransformBlock(Of String(), String())(
           Function(words)
               Console.WriteLine("Filtering word list...")

               Return words.Where(Function(word) word.Length > 3).Distinct().ToArray()
           End Function)

        ' Finds all words in the specified collection whose reverse also 
        ' exists in the collection.
        Dim findReversedWords = New TransformManyBlock(Of String(), String)(
           Function(words)

               Dim wordsSet = New HashSet(Of String)(words)

               Return From word In words.AsParallel()
                      Let reverse = New String(word.Reverse().ToArray())
                      Where word <> reverse AndAlso wordsSet.Contains(reverse)
                      Select word
           End Function)

        ' Prints the provided reversed words to the console.    
        Dim printReversedWords = New ActionBlock(Of String)(
           Sub(reversedWord)
               Console.WriteLine("Found reversed words {0}/{1}", reversedWord, New String(reversedWord.Reverse().ToArray()))
           End Sub)

        '
        ' Connect the dataflow blocks to form a pipeline.
        '

        Dim linkOptions = New DataflowLinkOptions With {.PropagateCompletion = True}

        downloadString.LinkTo(createWordList, linkOptions)
        createWordList.LinkTo(filterWordList, linkOptions)
        filterWordList.LinkTo(findReversedWords, linkOptions)
        findReversedWords.LinkTo(printReversedWords, linkOptions)

        ' Process "The Iliad of Homer" by Homer.
        downloadString.Post("http://www.gutenberg.org/cache/epub/16452/pg16452.txt")

        ' Mark the head of the pipeline as complete.
        downloadString.Complete()

        ' Wait for the last block in the pipeline to process all messages.
        printReversedWords.Completion.Wait()
    End Sub

End Module

' Sample output:
'Downloading 'http://www.gutenberg.org/cache/epub/16452/pg16452.txt'...
'Creating word list...
'Filtering word list...
'Finding reversed words...
'Found reversed words aera/area
'Found reversed words doom/mood
'Found reversed words draw/ward
'Found reversed words live/evil
'Found reversed words seat/taes
'Found reversed words area/aera
'Found reversed words port/trop
'Found reversed words sleek/keels
'Found reversed words tops/spot
'Found reversed words evil/live
'Found reversed words speed/deeps
'Found reversed words mood/doom
'Found reversed words moor/room
'Found reversed words spot/tops
'Found reversed words spots/stops
'Found reversed words trop/port
'Found reversed words stops/spots
'Found reversed words reed/deer
'Found reversed words deeps/speed
'Found reversed words deer/reed
'Found reversed words taes/seat
'Found reversed words keels/sleek
'Found reversed words room/moor
'Found reversed words ward/draw

次のステップ

この例では、データフロー パイプラインを介して処理する 1 つの URL を送信します。 パイプラインを介して複数の入力値を送信する場合は、自動車工場内での部品の移動方法に似た並列処理の形式をアプリケーションに導入できます。 パイプラインの最初のメンバーが結果を 2 番目のメンバーに送信すると、2 番目のメンバーが最初の結果を処理するときに、別の項目を並列で処理できます。

データフロー パイプラインを使用して実現される並列処理は、通常、より少ない大きなタスクで構成されるため、 粒度の粗い並列処理 と呼ばれます。 データフロー パイプラインでは、実行時間の短い小さいタスクのより 細かい並列処理 を使用することもできます。 この例では、パイプラインの findReversedWords メンバーは PLINQ を使用して、作業リスト内の複数の項目を並列処理します。 粒度の粗いパイプラインできめ細かい並列処理を使用すると、全体的なスループットを向上させることができます。

また、ソース データフロー ブロックを複数のターゲット ブロックに接続して 、データフロー ネットワークを作成することもできます。 LinkTo メソッドのオーバーロードされたバージョンは、ターゲット ブロックがその値に基づいて各メッセージを受け入れるかどうかを定義するPredicate<T> オブジェクトを受け取ります。 ソースとして機能するほとんどのデータフロー ブロックの種類は、いずれかのブロックがそのメッセージを受け入れるまで、接続されているすべてのターゲット ブロックに、接続された順序でメッセージを提供します。 このフィルター処理メカニズムを使用すると、接続されたデータフロー ブロックのシステムを作成し、特定のデータを 1 つのパスを通り、他のデータを別のパスに転送できます。 フィルター処理を使用してデータフロー ネットワークを作成する例については、「 チュートリアル: Windows フォーム アプリケーションでのデータフローの使用」を参照してください。

こちらも参照ください