次の方法で共有


レイクハウス チュートリアル: レイクハウスでデータを準備して変換する

このチュートリアルでは、Spark ランタイムでノートブックを使用して、Lakehouse 内の生データを変換および準備します。

前提条件

開始する前に、このシリーズの前のチュートリアルを完了する必要があります。

  1. レイクハウスを作成する
  2. レイクハウスにデータを取り込む

データを準備する

前のチュートリアルの手順では、ソースから lakehouse のファイルセクションに生データを取り込んでいます。 これで、そのデータを変換し、デルタ テーブルを作成するための準備を行うことができます。

  1. Lakehouse Tutorial Source Code フォルダーからノートブックをダウンロードします。

  2. ブラウザーで、 Fabric ポータルで Fabric ワークスペースに移動します。

  3. [>>。

    Fabric ポータルの [ノートブックのインポート] オプションを示すスクリーンショット。

  4. 画面の右側に表示される [インポート状態] ウィンドウから [アップロード] を選択します。

  5. このセクションの最初の手順でダウンロードしたすべてのノートブックを選択します。

    ダウンロードしたノートブックの場所と [開く] ボタンを示すスクリーンショット。

  6. [開く] を選択します。 インポートの状態を示す通知がブラウザー ウィンドウの右上隅に表示されます。

  7. インポートが成功したら、ワークスペースのアイテム ビューに移動して、新しくインポートされたノートブックを確認します。

    インポートされたノートブックの一覧と、レイクハウスを選択する場所を示すスクリーンショット。

Delta テーブルを作成する

このセクションでは、 01 - 差分テーブルの作成 ノートブックを開き、各セルを実行して生データから差分テーブルを作成します。 テーブルはスター スキーマに従います。これは、分析データを整理するための一般的なパターンです。

  • ファクト テーブル (fact_sale) には、ビジネスの測定可能なイベント (この場合は、数量、価格、利益を含む個々の販売トランザクション) が含まれています。
  • ディメンション テーブル (dimension_citydimension_customerdimension_datedimension_employeedimension_stock_item) には、販売の発生場所、作成者、タイミングなど、ファクトに関するコンテキストを示す説明的な属性が含まれています。
  1. wwilakehouse のレイクハウスを開くことで、次に開くノートブックとリンクされます。

  2. 上部のナビゲーション メニューで、[ ノートブックを開く>ノートブックの作成] を選択します。

    正常にインポートされたノートブックの一覧を示すスクリーンショット。

  3. [01 - 差分テーブルの作成] ノートブックを選択し、[開く] を選択します。 ノートブックは、レイクハウス エクスプローラーに示すように、開いているレイクハウスに既にリンクされています。

    注意

    次の手順では、ノートブック内の各セルを順番に実行します。 セルを実行するには、マウス ポインターを合わせるとセルの左側に表示される [実行 ] アイコンを選択するか、セルが選択されている間にキーボードで Shift キーを 押しながら Enter キーを押します。 または、上部のリボン ([ホーム] の下) で [すべて実行] を選択して、すべてのセルを順番に実行することもできます。

  4. セル 1 - Spark セッションの構成。 このセルを使用すると、後続のセルでのデータの書き込みと読み取り方法を最適化する 2 つの Fabric 機能が有効になります。 V オーダーでは 、読み取り速度が向上し、圧縮が向上するため、Parquet ファイルレイアウトが最適化されます。 書き込みを最適化 すると、書き込まれるファイルの数が減り、個々のファイル サイズが増加します。

    このセルを実行します。

    spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
    

    コード セルと [実行] アイコンを含む Spark セッション構成画面のスクリーンショット。

    ヒント

    Spark プールまたはクラスターの詳細を指定する必要はありません。 Fabric では、すべてのワークスペースに Live Pool と呼ばれる既定の Spark プールが提供されます。 最初のセルを実行すると、ライブ プールは数秒で開始され、Spark セッションが確立されます。 後続のセルは、セッションがアクティブな間にほぼ瞬時に実行されます。

  5. セル 2 - ファクト - セール。 このセルは、前のチュートリアルで lakehouse に取り込まれた wwi-raw-data フォルダーから生の Parquet データを読み取ります。 日付パーツ列 (YearQuarterMonth) を追加し、データを YearQuarter でパーティション分割された Delta テーブルとして書き込みます。 パーティション分割では、データがサブディレクトリに整理されるため、これらの列でフィルター処理するときのクエリパフォーマンスが向上します。

    このセルを実行します。

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/" + table_name)
    
  6. セル 3 - 寸法。 このセルは、都市、顧客、日付など、ファクト テーブルの説明コンテキストを提供するディメンション テーブルを読み込みます。 特定のテーブル名の wwi-raw-data フォルダーから生の Parquet データを読み取り、未使用の Photo 列を削除し、各テーブルを Delta テーブルとして書き込む関数を定義します。 次に、5 つのディメンション テーブル (dimension_citydimension_customerdimension_datedimension_employeedimension_stock_item) をループして、それぞれにデルタ テーブルを作成します。

    このセルを実行します。

    from pyspark.sql.types import *
    def loadFullDataFromSource(table_name):
        df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
        df = df.drop("Photo")
        df.write.mode("overwrite").format("delta").save("Tables/" + table_name)
    
    full_tables = [
        'dimension_city',
        'dimension_customer',
        'dimension_date',
        'dimension_employee',
        'dimension_stock_item'
        ]
    
    for table in full_tables:
        loadFullDataFromSource(table)
    
  7. 作成したテーブルを検証するには、エクスプローラーで wwilakehouse lakehouse を右クリックし、[最新の 情報に更新] を選択します。 テーブルが表示されます。

    Lakehouse エクスプローラーで作成したテーブルを検索する場所を示すスクリーンショット。

ビジネス集計のデータを変換する

このセクションでは、 02 - データ変換 - ビジネス集計 ノートブックを開き、各セルを実行して、前のセクションで作成した Delta テーブルから集計テーブルを作成します。

  1. wwilakehouse lakehouse を選択して再度開くことで、次に開くノートブックがリンクされます。

  2. 上部のナビゲーション メニューで、[ ノートブックを開く>ノートブックの作成] を選択します。 02 - データ変換 - ビジネス集計ノートブックを選択し、[開く] を選択します。

    このノートブックでは、2 つの異なるコーディング方法を使用して、2 つの集計テーブルを作成します。 すべてのセルを実行します。各方法では、異なるテーブルが作成されます。

    • アプローチ 1 では 、PySpark を使用して aggregate_sale_by_date_city テーブルを作成します。 この方法は、Python または PySpark の背景がある場合に適しています。
    • アプローチ 2 では 、Spark SQL を使用して aggregate_sale_by_date_employee テーブルを作成します。 SQL の背景がある場合は、この方法をお勧めします。

    次の手順では、前のセクションと同様に、ノートブック内の各セルを順番に実行します。

  3. セル 1 - Spark セッションの構成。 前のノートブックと同様に、このセルを使用すると、Spark セッションの V オーダーと書き込みの最適化が有効になります。

    このセルを実行します。

    spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
    
  4. セル 2 - 方法 #1 - sale_by_date_city。 このセルは、 fact_saledimension_date、および dimension_city Delta テーブルを PySpark データフレームに読み込み、次のセルで結合および集計するためのデータを準備します。

    このセルを実行します。

    df_fact_sale = spark.read.table("wwilakehouse.fact_sale") 
    df_dimension_date = spark.read.table("wwilakehouse.dimension_date")
    df_dimension_city = spark.read.table("wwilakehouse.dimension_city")
    

    lakehouse で lakehouse スキーマを有効にした場合は、セルの内容を次のコードに置き換えて実行します。

    df_fact_sale = spark.read.format("delta").load("Tables/fact_sale")
    df_dimension_date = spark.read.format("delta").load("Tables/dimension_date")
    df_dimension_city = spark.read.format("delta").load("Tables/dimension_city")
    
  5. セル 3. このセルは、キー列の 3 つのデータフレームを結合し、日付、市区町村、および販売フィールドを選択し、日付と市区町村別の売上合計と利益をグループ化して合計します。 結果は aggregate_sale_by_date_city Delta テーブルとして書き込まれます。これは、地域別の販売実績をまとめたものです。

    このセルを実行します。

    sale_by_date_city = df_fact_sale.alias("sale") \
    .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") \
    .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") \
    .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")\
    .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")\
    .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")\
    .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")\
    .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")\
    .withColumnRenamed("sum(Profit)", "SumOfProfit")\
    .orderBy("date.Date", "city.StateProvince", "city.City")
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_city")
    
  6. セル 4 - 手法 2 - 日付別販売担当者。 このセルでは、Spark SQL を使用して、 sale_by_date_employeeという一時ビューを作成します。 クエリは、日付と従業員の列ごとにグループ化 fact_saledimension_date、および dimension_employeeを結合し、集計された売上合計と利益を計算し、従業員別の販売実績を集計します。

    このセルを実行します。

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM wwilakehouse.fact_sale FS
    INNER JOIN wwilakehouse.dimension_date DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN wwilakehouse.dimension_Employee DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    

    lakehouse スキーマを有効にした場合は、セルの内容を次の Spark SQL コードに置き換えて実行します。

    %%sql
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
           DD.Date, DD.CalendarMonthLabel
     , DD.Day, DD.ShortMonth Month, CalendarYear Year
          ,DE.PreferredName, DE.Employee
          ,SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
          ,SUM(FS.TaxAmount) SumOfTaxAmount
          ,SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
          ,SUM(Profit) SumOfProfit 
    FROM delta.`Tables/fact_sale` FS
    INNER JOIN delta.`Tables/dimension_date` DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN delta.`Tables/dimension_employee` DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    
  7. セル 5. このセルは、前のセルで作成された sale_by_date_employee 一時ビューから読み取り、結果を aggregate_sale_by_date_employee Delta テーブルとして書き込みます。

    このセルを実行します。

    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/aggregate_sale_by_date_employee")
    
  8. 作成したテーブルを検証するには、エクスプローラーで wwilakehouse lakehouse を右クリックし、[最新の 情報に更新] を選択します。 集計テーブルが表示されます。

    新しいテーブルが表示される場所を示す Lakehouse エクスプローラーのスクリーンショット。

注意

このチュートリアルのデータは、Delta Lake ファイルとして記述されています。 Fabric の自動テーブル検出および登録機能によって、テーブルを検出しメタストアに登録します。 SQL で使用するテーブルを作成するために CREATE TABLE ステートメントを明示的に呼び出す必要はありません。

次のステップ