次の方法で共有


Databricks Connect for Scala のコード例

この記事では、Databricks Runtime 13.3 LTS 以降用の Databricks Connect について説明します。

この記事では、Databricks Connect for Scala を使用するコード例を紹介します。 Databricks Connect を使用すると、一般的な IDE、ノートブック サーバー、カスタム アプリケーションを Azure Databricks クラスターに接続できます。 「Databricks Connect とは」を参照してください。 この記事の Python バージョンについては、「Databricks Connect for Python のコード例」を参照してください。

Databricks Connect の使用を開始する前に、Databricks Connect クライアントを設定する必要があります。

次の例では、Databricks Connect クライアントセットアップに既定の認証を使用していることを前提としています。

例: テーブルの読み取り

この単純なコード例では、指定したテーブルに対してクエリを実行し、指定したテーブルの最初の 5 行を示します。

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()
    val df = spark.read.table("samples.nyctaxi.trips")
    df.limit(5).show()
  }
}

DataFrame の作成

次のコード例を次に示します。

  1. メモリ内 DataFrame を作成します。
  2. zzz_demo_temps_table スキーマ内に default という名前のテーブルを作成します。 この名前のテーブルが既に存在する場合は、まずテーブルが削除されます。 異なるスキーマまたはテーブルを使用するには、spark.sqltemps.write.saveAsTable、または両方への呼び出しを調整します。
  3. DataFrame の内容をテーブルに保存します。
  4. テーブルの内容に対して SELECT クエリを実行します。
  5. クエリの結果を表示します。
  6. テーブルを削除します。
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import java.time.LocalDate

object Main {
  def main(args: Array[String]): Unit = {
    val spark = DatabricksSession.builder().getOrCreate()

    // Create a Spark DataFrame consisting of high and low temperatures
    // by airport code and date.
    val schema = StructType(
      Seq(
        StructField("AirportCode", StringType, false),
        StructField("Date", DateType, false),
        StructField("TempHighF", IntegerType, false),
        StructField("TempLowF", IntegerType, false)
      )
    )

    val data = Seq(
      ( "BLI", LocalDate.of(2021, 4, 3), 52, 43 ),
      ( "BLI", LocalDate.of(2021, 4, 2), 50, 38),
      ( "BLI", LocalDate.of(2021, 4, 1), 52, 41),
      ( "PDX", LocalDate.of(2021, 4, 3), 64, 45),
      ( "PDX", LocalDate.of(2021, 4, 2), 61, 41),
      ( "PDX", LocalDate.of(2021, 4, 1), 66, 39),
      ( "SEA", LocalDate.of(2021, 4, 3), 57, 43),
      ( "SEA", LocalDate.of(2021, 4, 2), 54, 39),
      ( "SEA", LocalDate.of(2021, 4, 1), 56, 41)
    )

    val temps = spark.createDataFrame(data).toDF(schema.fieldNames: _*)

    // Create a table on the Databricks cluster and then fill
    // the table with the DataFrame 's contents.
    // If the table already exists from a previous run,
    // delete it first.
    spark.sql("USE default")
    spark.sql("DROP TABLE IF EXISTS zzz_demo_temps_table")
    temps.write.saveAsTable("zzz_demo_temps_table")

    // Query the table on the Databricks cluster, returning rows
    // where the airport code is not BLI and the date is later
    // than 2021-04-01.Group the results and order by high
    // temperature in descending order.
    val df_temps = spark.sql("SELECT * FROM zzz_demo_temps_table " +
      "WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
      "GROUP BY AirportCode, Date, TempHighF, TempLowF " +
      "ORDER BY TempHighF DESC")
    df_temps.show()

    // Results:
    // +------------+-----------+---------+--------+
    // | AirportCode|       Date|TempHighF|TempLowF|
    // +------------+-----------+---------+--------+
    // |        PDX | 2021-04-03|      64 |     45 |
    // |        PDX | 2021-04-02|      61 |     41 |
    // |        SEA | 2021-04-03|      57 |     43 |
    // |        SEA | 2021-04-02|      54 |     39 |
    // +------------+-----------+---------+--------+

    // Clean up by deleting the table from the Databricks cluster.
    spark.sql("DROP TABLE zzz_demo_temps_table")
  }
}

例: DatabricksSession または SparkSession を使用する

次の例では、Databricks Connect の SparkSession クラスが使用できない場合に、DatabricksSession クラスを使用する方法について説明します。

この例では、指定されたテーブルにクエリを実行し、最初の 5 行を返します。 この例では、認証に SPARK_REMOTE 環境変数を使用します。

import org.apache.spark.sql.{DataFrame, SparkSession}

object Main {
  def main(args: Array[String]): Unit = {
    getTaxis(getSpark()).show(5)
  }

  private def getSpark(): SparkSession = {
    SparkSession.builder().getOrCreate()
  }

  private def getTaxis(spark: SparkSession): DataFrame = {
    spark.read.table("samples.nyctaxi.trips")
  }
}

その他のリソース

Databricks には、Databricks Connect GitHub リポジトリで Databricks Connect を使用する方法を示す、次のような追加のサンプル アプリケーションが用意されています。