次の方法で共有


チュートリアル: Lakeflow Pipelines エディターを使用して最初のパイプラインを作成する

データ オーケストレーションと自動ローダーに Lakeflow Spark 宣言パイプライン (SDP) を使用して新しいパイプラインを作成する方法について説明します。 このチュートリアルでは、データをクリーニングし、上位 100 人のユーザーを検索するクエリを作成することで、サンプル パイプラインを拡張します。

このチュートリアルでは、Lakeflow Pipelines エディターを使用して次の操作を行う方法について説明します。

  • 既定のフォルダー構造で新しいパイプラインを作成し、一連のサンプル ファイルから始めます。
  • 期待値を使用してデータ品質制約を定義します。
  • エディター機能を使用して、新しい変換でパイプラインを拡張し、データの分析を実行します。

Requirements

このチュートリアルを開始する前に、次の作業を行う必要があります。

  • Azure Databricks ワークスペースにログインします。
  • ワークスペースに対して Unity カタログを有効にします。
  • ワークスペースに対して Lakeflow パイプライン エディターを有効にし、オプトインする必要があります。 「Lakeflow パイプライン エディターを有効にする」と「更新された監視」を参照してください。
  • コンピューティング リソースを作成したり、コンピューティング リソースにアクセスしたりするためのアクセス許可を持っている。
  • カタログに新しいスキーマを作成するためのアクセス許可を持っている。 必要なアクセス許可は、 ALL PRIVILEGES または USE CATALOGCREATE SCHEMAです。

手順 1: パイプラインを作成する

この手順では、既定のフォルダー構造とコード サンプルを使用してパイプラインを作成します。 コード サンプルは、users サンプル データ ソースのwanderbricks テーブルを参照します。

  1. Azure Databricks ワークスペースで、プラスアイコン新規、そして パイプラインアイコン をクリックして、ETL パイプラインを選択します。 パイプライン エディターが開き、[パイプラインの作成] ページが開きます。

  2. ヘッダーをクリックして、パイプラインに名前を付けます。

  3. 名前のすぐ下で、出力テーブルの既定のカタログとスキーマを選択します。 これらは、パイプライン定義でカタログとスキーマを指定しない場合に使用されます。

  4. パイプラインの [次のステップ] で、いずれかの [スキーマ] アイコンをクリックします。SQL またはSchema icon.のサンプル コードから始めます。言語の設定に基づいて、Python のサンプル コードから始めます。 これにより、サンプル コードの既定の言語が変更されますが、後で他の言語でコードを追加できます。 これにより、作業を開始するためのサンプル コードを含む既定のフォルダー構造が作成されます。

  5. ワークスペースの左側にあるパイプライン資産ブラウザーでサンプル コードを表示できます。 transformationsの下には、それぞれ 1 つのパイプライン データセットを生成する 2 つのファイルがあります。 explorationsの下には、パイプラインの出力を表示するのに役立つコードが含まれているノートブックがあります。 ファイルをクリックすると、エディターでコードを表示および編集できます。

    出力データセットはまだ作成されておらず、画面の右側の パイプライン グラフ は空です。

  6. パイプライン コード ( transformations フォルダー内のコード) を実行するには、画面の右上にある [ パイプラインの実行 ] をクリックします。

    実行が完了すると、ワークスペースの下部に、作成された 2 つの新しいテーブル ( sample_users_<pipeline-name>sample_aggregation_<pipeline-name>) が表示されます。 ワークスペースの右側にあるパイプライン グラフに、sample_usersのソースであるsample_aggregationを含む 2 つのテーブルが表示されるようになりました。

手順 2: データ品質チェックを適用する

この手順では、データ品質チェックを sample_users テーブルに追加します。 パイプラインの 期待値を 使用して、データを制限します。 この場合、有効な電子メール アドレスを持たないユーザー レコードを削除し、クリーニングされたテーブルを users_cleanedとして出力します。

  1. パイプライン資産ブラウザーで、[ プラス] アイコンをクリックし、[変換] を選択 します

  2. [ 新しい変換ファイルの作成 ] ダイアログで、次の項目を選択します。

    • 言語として Python または SQL を選択します。 これは、前の選択内容と一致する必要はありません。
    • ファイルに名前を付けます。 この場合は、[ users_cleaned] を選択します。
    • [宛先パス] では、既定値のままにします。
    • [データセットの種類] では、[なし] を選択したままにするか、[具体化されたビュー] を選択します。 具体化されたビューを選択すると、サンプル コードが生成されます。
  3. 新しいコード ファイルで、次に一致するようにコードを編集します (前の画面の選択内容に基づいて、SQL または Python を使用します)。 <pipeline-name>を、sample_users テーブルの完全な名前に置き換えます。

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. [ パイプラインの実行 ] をクリックしてパイプラインを更新します。 これで 3 つのテーブルが作成されます。

手順 3: 上位のユーザーを分析する

次に、作成した予約数で上位 100 人のユーザーを取得します。 wanderbricks.bookings テーブルをusers_cleanedマテリアライズド ビューに結合します。

  1. パイプライン資産ブラウザーで、[ プラス] アイコンをクリックし、[変換] を選択 します

  2. [ 新しい変換ファイルの作成 ] ダイアログで、次の項目を選択します。

    • 言語として Python または SQL を選択します。 これは、以前の選択内容と一致する必要はありません。
    • ファイルに名前を付けます。 この場合は、[ users_and_bookings] を選択します。
    • [宛先パス] では、既定値のままにします。
    • [データセットの種類] では、[なし] を選択したままにします
  3. 新しいコード ファイルで、次に一致するようにコードを編集します (前の画面の選択内容に基づいて、SQL または Python を使用します)。

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. [ パイプラインの実行 ] をクリックしてデータセットを更新します。 実行が完了すると、 パイプライン グラフ に、新しい users_and_bookings テーブルを含む 4 つのテーブルがあることを確認できます。

    パイプライン内の 4 つのテーブルを示すパイプライン グラフ

次のステップ

これで、Lakeflow パイプライン エディターの機能の一部を使用してパイプラインを作成する方法を学習しました。詳細については、次の他の機能を参照してください。