Databricks では、Databricks SQL を使用してデータを取り込むために、ストリーミング テーブルを使用することをお勧めします。 ストリーミング テーブルとは、ストリーミングまたは増分データ処理を追加でサポートする Unity Catalog に登録されたテーブルのことです。 ストリーミング テーブルごとにパイプラインが自動的に作成されます。 ストリーミング テーブルを使用して、Kafka とクラウド オブジェクト ストレージから増分データ読み込みを行うことができます。
注
Delta Lake テーブルをストリーミング ソースおよびシンクとして使用する方法については、「Delta テーブルのストリーミング読み取りと書き込み」を参照してください。
Requirements
ストリーミング テーブルを使用するには、次の要件を満たす必要があります。
ワークスペースの要件:
Databricks SQL で作成されたストリーミング テーブルは、サーバーレス パイプラインによってサポートされます。 この機能を使用するには、ワークスペースでサーバーレス パイプラインをサポートする必要があります。
- サーバーレスが有効になっている Azure Databricks アカウント。 詳細については、「サーバーレス SQL ウェアハウスを有効にする」を参照してください。
- Unity Catalog が有効になっているワークスペース。 詳細については、「 Unity カタログの概要」を参照してください。
コンピューティングの要件:
次のいずれかを使用する必要があります。
-
Currentチャネルを使用する SQL ウェアハウス。 - Databricks Runtime 13.3 LTS 以降の標準アクセス モード (以前の共有アクセス モード) を使用したコンピューティング。
Databricks Runtime 15.4 LTS 以降の専用アクセス モード (以前のシングル ユーザー アクセス モード) を使用したコンピューティング。
Databricks Runtime 15.3 以下では、 他のユーザーが所有するストリーミング テーブルに対して専用のコンピューティングを使用してクエリを実行することはできません。 ストリーミング テーブルを所有している場合にのみ、Databricks Runtime 15.3 以下で専用コンピューティングを使用できます。 テーブルの作成者が所有者です。
Databricks Runtime 15.4 LTS 以降では、テーブル所有者でない場合でも、専用コンピューティングでパイプラインによって生成されたテーブルのクエリがサポートされます。 専用コンピューティングを使用してデータ フィルター処理操作を実行すると、サーバーレス コンピューティング リソースに対して課金される可能性があります。 専用コンピューティングでのきめ細かなアクセス制御を参照してください。
アクセス許可の要件:
-
USE CATALOGおよびUSE SCHEMAの特権は、ストリーミング テーブルを作成するカタログとスキーマ上にあります。 - ストリーミング テーブルを作成するスキーマでの
CREATE TABLE特権。 - ストリーミング テーブルのソース データを提供するテーブルまたは場所にアクセスするための特権。
ストリーミング テーブルを作成する
ストリーミング テーブルは、Databricks SQL の SQL クエリによって定義されます。 ストリーミング テーブルを作成すると、現在ソース テーブル内のデータを使用してストリーミング テーブルが作成されます。 その後、通常はスケジュールに従ってテーブルを更新し、ソース テーブルに追加されたデータをプルしてストリーミング テーブルに追加します。
ストリーミング テーブルを作成すると、テーブルの所有者と見なされます。
既存のテーブルからストリーミング テーブルを作成するには、次の例のように、CREATE STREAMING TABLE ステートメントを使用します。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT product, price FROM STREAM raw_data;
この場合、ストリーミング テーブル sales は、 raw_data テーブルの特定の列から作成され、1 時間ごとに更新するスケジュールが設定されます。 使用するクエリは ストリーミング クエリである必要があります。 ストリーミング セマンティクスを使用してソースから読み取る場合は、 STREAM キーワードを使用します。
CREATE OR REFRESH STREAMING TABLE ステートメントを使用してストリーミング テーブルを作成すると、最初のデータ更新と作成が直ちに開始されます。 これらの操作では、DBSQL ウェアハウス コンピューティングは使用されません。 代わりに、ストリーミング テーブルは、作成と更新の両方でサーバーレス パイプラインに依存します。 専用サーバーレス パイプラインは、ストリーミング テーブルごとにシステムによって自動的に作成および管理されます。
自動ローダーを使用してファイルを読み込む
ボリューム内のファイルからストリーミング テーブルを作成するには、自動ローダーを使用します。 クラウド オブジェクト ストレージからのほとんどのデータ インジェスト タスクには、自動ローダーを使用します。 自動ローダーとパイプラインは、クラウドストレージに到着した増加し続けるデータを、段階的かつべき等に読み込むように設計されています。
Databricks SQL で自動ローダーを使用するには、 read_files 関数を使用します。 次の例は、自動ローダーを使用して JSON ファイルのボリュームをストリーミング テーブルに読み取る方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/my_schema/my_volume/path/to/data",
format => "json"
);
クラウド ストレージからデータを読み取るために、自動ローダーを使用することもできます。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
format => "json"
);
自動ローダーの詳細については、「 自動ローダーとは」を参照してください。 SQL での自動ローダーの使用の詳細と例については、「 オブジェクト ストレージからデータを読み込む」を参照してください。
他のソースからのストリーミング取り込み
Kafka などの他のソースからのインジェストの例については、「 パイプラインでのデータの読み込み」を参照してください。
新しいデータのみを取り込む
既定では、read_files 関数は、テーブルの作成時にソース ディレクトリ内のすべての既存のデータを読み取り、更新のたびに新しく到着したレコードを処理します。
テーブル作成時にソース ディレクトリに既に存在するデータが取り込まれるのを回避するには、includeExistingFiles オプションを falseに設定します。 つまり、テーブルの作成後にディレクトリに到着したデータのみが処理されます。 例えば次が挙げられます。
CREATE OR REFRESH STREAMING TABLE sales
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM read_files(
'/path/to/files',
includeExistingFiles => false
);
ランタイム チャネルを設定する
SQL ウェアハウスを使用して作成されたストリーミング テーブルは、パイプラインを使用して自動的に更新されます。 パイプラインでは、既定で current チャネルのランタイムが使用されます。 リリース プロセスについては、 Lakeflow Spark 宣言パイプラインのリリース ノートとリリース アップグレード プロセス を参照してください。
Databricks では、運用ワークロードに current チャネルを使用することをお勧めします。 新機能は、最初に preview チャネルにリリースされます。 テーブル プロパティとして preview を指定することで、パイプラインをプレビュー チャネルに設定して新機能をテストできます。 このプロパティは、テーブルを作成するとき、または ALTER ステートメントを使用してテーブルを作成した後に指定できます。
次のコード例は、CREATE ステートメントでチャネルをプレビューに設定する方法を示しています。
CREATE OR REFRESH STREAMING TABLE sales
TBLPROPERTIES ('pipelines.channel' = 'preview')
SCHEDULE EVERY 1 hour
AS SELECT *
FROM STREAM raw_data;
ストリーミング テーブルの更新をスケジュールする
定義されたスケジュールに基づいて自動的に更新されるように、またはアップストリーム データが変更されたときにトリガーされるように、Databricks SQL ストリーミング テーブルを構成できます。
スケジュールまたはトリガーを設定するには、次のいずれかの操作を行います。
-
SCHEDULEするときに、句を使用してスケジュールを構成します。 - テーブルを作成するときに、
TRIGGER ON UPDATE句を使用してトリガーを構成します。 - ALTER STREAMING TABLE ステートメントを使用して、スケジュールまたはトリガーを追加または変更します。
注
または、 CREATE OR REFRESH STREAMING TABLE または REFRESH ステートメントのいずれかを含むタスクをジョブに作成し、他のジョブと同様に調整します。
Lakeflow ジョブを参照してください。
スケジュールを作成すると、更新を処理する新しいジョブが Azure Databricks によって自動的に作成されます。
スケジュールを表示するには、次のいずれかを実行します。
- Azure Databricks UI で SQL エディターから
DESCRIBE EXTENDEDステートメントを実行します。 DESCRIBE TABLEを参照してください。 - カタログ エクスプローラーを使用して、ストリーミング テーブルを表示します。 スケジュールは [概要] タブの 更新状態 に表示されます。 「カタログ エクスプローラーとは」を参照してください。
更新スケジュールを使用しても、更新されたデータが必要な場合は、いつでも手動更新を実行できます。
機密データを非表示にする
ストリーミング テーブルを使用すると、テーブルにアクセスするユーザーから機密データを非表示にすることができます。 1 つの方法は、機密性の高い列または行を完全に除外するようにクエリを定義することです。 または、クエリを実行するユーザーのアクセス許可に基づいて列マスクまたは行フィルターを適用することもできます。 たとえば、グループ tax_idに含まれていないユーザーのHumanResourcesDept列を非表示にすることができます。 これを行うには、ストリーミング テーブルの作成時に ROW FILTER 構文と MASK 構文を使用します。 詳細については、「 行フィルターと列マスク」を参照してください。
ストリーミング テーブルを更新する
ストリーミング テーブルを作成すると、更新を自動的にスケジュールできます。 ストリーミング テーブルを手動で更新することもできます。 スケジュールされた更新がある場合でも、いつでも手動更新を呼び出すことができます。 更新は、ストリーミング テーブルと共に自動的に作成されたのと同じパイプラインによって処理されます。
ストリーミング テーブルを更新するには:
REFRESH STREAMING TABLE sales;
DESCRIBE TABLE EXTENDED を使用して、最新の更新の状態を確認できます。
注
タイム トラベル クエリを使用する前に、ストリーミング テーブルの更新が必要になる場合があります。
更新のしくみ
ストリーミング テーブルの更新では、前回の更新以降に到着した新しい行のみが評価され、新しいデータのみが追加されます。
各更新では、ストリーミング テーブルの現在の定義を使用して、この新しいデータを処理します。 ストリーミング テーブル定義を変更しても、既存のデータは自動的に再計算されません。 変更が既存のデータと互換性がない場合 (データ型の変更など)、次の更新はエラーで失敗します。
次の例では、ストリーミング テーブル定義の変更が更新動作にどのように影響するかを説明します。
- フィルターを削除すると、以前にフィルター処理された行は再処理されません。
- 列プロジェクションを変更しても、既存のデータの処理方法には影響しません。
- 静的スナップショットとの結合では、初期処理時にスナップショットの状態が使用されます。 更新されたスナップショットと一致する遅延到着データは無視されます。 これにより、ディメンションが遅れた場合にファクトが欠落する可能性があります。
- 既存の列の CAST を変更すると、エラーが発生します。
既存のストリーミング テーブルでサポートできない方法でデータが変更された場合は、完全な更新を実行できます。
ストリーミング テーブルを完全に更新する
完全更新では、最新の定義を使用して、ソースで使用可能なすべてのデータが再処理されます。 完全更新では既存のデータが切り捨てられるため、データの履歴全体を保持しないソースや、Kafka などの短い保持期間を持つソースに対して完全な更新を呼び出すことはできません。 ソースでデータが使用できなくなった場合、古いデータを回復できないことがあります。
例えば次が挙げられます。
REFRESH STREAMING TABLE sales FULL;
ストリーミング テーブルのスケジュールを変更する
定義されたスケジュールに基づいて自動的に更新されるように、またはアップストリーム データが変更されたときにトリガーされるように、Databricks SQL ストリーミング テーブルを構成できます。 次の表は、更新をスケジュールするためのさまざまなオプションを示しています。
| メソッド | Description | 使用例 |
|---|---|---|
| 手動 | SQL REFRESH ステートメントまたはワークスペース UI を使用したオンデマンド更新。 |
開発、テスト、アドホック更新。 |
TRIGGER ON UPDATE |
アップストリーム データが変更されたときに自動的に更新するストリーミング テーブルを定義します。 | データの鮮度 SLA または予測できない更新期間を持つ運用ワークロード。 |
SCHEDULE |
定義された時間間隔で更新するストリーミング テーブルを定義します。 | 予測可能な時間ベースの更新要件。 |
| ジョブ内の SQL タスク | 更新は 、Lakeflow ジョブを使用して調整されます。 | システム間の依存関係を持つ複雑なパイプライン。 |
手動更新
ストリーミング テーブルを手動で更新するには、Databricks SQL から更新を呼び出すか、ワークスペース UI を使用します。
REFRESH ステートメント
Databricks SQL を使用してパイプラインを更新するには:
SQL エディターで、次のステートメントを実行します。
REFRESH MATERIALIZED VIEW <table-name>;
詳細については、 REFRESH (MATERIALIZED VIEW または STREAMING TABLE) を参照してください。
ワークスペース UI
ワークスペース UI でパイプラインを更新するには:
- Azure Databricks ワークスペースで、[ワークフロー]
をクリックします。ジョブとパイプライン。
- 一覧から更新するパイプラインを選択します。
- [スタート] ボタンをクリックします。
パイプラインが更新されると、UI に更新が表示されます。
更新時にトリガーする
TRIGGER ON UPDATE句は、アップストリーム のソース データが変更されると、ストリーミング テーブルを自動的に更新します。 これにより、パイプライン間でスケジュールを調整する必要がなくなります。 ストリーミング テーブルは、アップストリーム ジョブがいつ完了するか、複雑なスケジュール ロジックを維持するかをユーザーに知らなくても、最新の状態を維持します。
これは、特にアップストリームの依存関係が予測可能なスケジュールで実行されない場合に、運用環境のワークロードに推奨されるアプローチです。 構成が完了すると、ストリーミング テーブルはそのソース テーブルを監視し、アップストリーム ソースの変更が検出されると自動的に更新されます。
制限事項
- アップストリームの依存関係の制限: ストリーミング テーブルでは、最大 10 個のアップストリーム テーブルと 30 個のアップストリーム ビューを監視できます。 依存関係を増やすには、ロジックを複数のストリーミング テーブルに分割します。
- ワークスペースの制限: ワークスペースごとに最大 1,000 個のストリーミング テーブルと
TRIGGER ON UPDATEが存在できます。 1,000 を超えるストリーミング テーブルが必要な場合は、Databricks サポートにお問い合わせください。 - 最小間隔: 最小トリガー間隔は 1 分です。
次の例は、ストリーミング テーブルを定義するときに更新時にトリガーを設定する方法を示しています。
更新時にトリガーを使用してストリーミング テーブルを作成する
ソース データの変更時に自動的に更新されるストリーミング テーブルを作成するには、TRIGGER ON UPDATE ステートメントに CREATE STREAMING TABLE 句を含めます。
次の例では、ソース orders テーブルが更新されるたびに顧客の注文を読み取り、更新するストリーミング テーブルを作成します。
CREATE OR REFRESH STREAMING TABLE catalog.schema.customer_orders
TRIGGER ON UPDATE
AS SELECT
o.customer_id,
o.name,
o.order_id
FROM catalog.schema.orders o;
スロットルの更新頻度
アップストリーム データが頻繁に更新される場合は、 AT MOST EVERY を使用してビューの更新頻度を制限し、コンピューティング コストを制限します。 これは、ソース テーブルが頻繁に更新されるのにダウンストリーム コンシューマーがリアルタイム データを必要としない場合に便利です。
INTERVALキーワードは、時刻値の前に必要です。
次の例では、ソース データの変更頻度が高い場合でも、ストリーミング テーブルを最大 5 分ごとに更新するように制限します。
CREATE OR REFRESH STREAMING TABLE catalog.schema.customer_orders
TRIGGER ON UPDATE AT MOST EVERY INTERVAL 5 MINUTES
AS SELECT
o.customer_id,
o.name,
o.order_id
FROM catalog.schema.orders o;
スケジュールされた更新
更新スケジュールは、ストリーミング テーブル定義で直接定義して、一定の時間間隔でビューを更新できます。 この方法は、データ更新の周期がわかっており、予測可能な更新タイミングが必要な場合に便利です。
更新スケジュールがある場合でも、更新されたデータが必要な場合は、いつでも手動更新を実行できます。
Databricks では、単純な間隔の SCHEDULE EVERY と正確なスケジュール設定のための SCHEDULE CRON という 2 つのスケジュール構文がサポートされています。
SCHEDULEキーワードとSCHEDULE REFRESHキーワードは意味的に同等です。
SCHEDULE句の構文と使用方法の詳細については、SCHEDULE 句CREATE STREAMING TABLE参照してください。
スケジュールが作成されると、更新を処理するように新しい Databricks ジョブが自動的に構成されます。
スケジュールを表示するには、次のいずれかを実行します。
- Azure Databricks UI で SQL エディターから
DESCRIBE EXTENDEDステートメントを実行します。 DESCRIBE TABLEを参照してください。 - カタログ エクスプローラーを使用して、ストリーミング テーブルを表示します。 スケジュールは [概要] タブの 更新状態 に表示されます。 「カタログ エクスプローラーとは」を参照してください。
次の例は、スケジュールを使用してストリーミング テーブルを作成する方法を示しています。
時間間隔ごとにスケジュールを設定する
この例では、5 分ごとに更新をスケジュールします。
CREATE OR REFRESH STREAMING TABLE catalog.schema.hourly_metrics
SCHEDULE EVERY 5 MINUTES
AS SELECT
event_id,
event_time,
event_type
FROM catalog.schema.raw_events;
cron を使用してスケジュールを設定する
次の使用例は、UTC タイム ゾーンの四半期に 15 分ごとに更新をスケジュールします。
CREATE OR REFRESH STREAMING TABLE catalog.schema.hourly_metrics
SCHEDULE CRON '0 */15 * * * ?' AT TIME ZONE 'UTC'
AS SELECT
event_id,
event_time,
event_type
FROM catalog.schema.raw_events;
ジョブ内の SQL タスク
ストリーミング テーブルの更新は、 REFRESH STREAMING TABLE コマンドを含む SQL タスクを作成することで、Databricks ジョブを通じて調整できます。 この方法では、ストリーミング テーブルの更新を既存のジョブ オーケストレーション ワークフローに統合します。
ストリーミング テーブルを更新するためのジョブを作成するには、次の 2 つの方法があります。
-
SQL エディターから:
REFRESH STREAMING TABLEコマンドを記述し、[ スケジュール ] ボタンをクリックして、クエリから直接ジョブを作成します。 -
ジョブ UI から: 新しいジョブを作成し、SQL タスクの種類を追加し、
REFRESH STREAMING TABLEコマンドを使用して SQL クエリまたはノートブックをアタッチします。
次の例は、ストリーミング テーブルを更新する SQL タスク内の SQL ステートメントを示しています。
REFRESH STREAMING TABLE catalog.schema.sales;
この方法は、次の場合に適しています。
- 複雑なマルチステップ パイプラインには、システム間の依存関係があります。
- 既存のジョブ オーケストレーションとの統合が必要です。
- ジョブ レベルのアラートと監視が必要です。
SQL タスクでは、ジョブにアタッチされた SQL ウェアハウスと、更新を実行するサーバーレス コンピューティングの両方が使用されます。 ストリーミング テーブル定義ベースのスケジュール設定を使用して要件を満たしている場合は、 TRIGGER ON UPDATE または SCHEDULE に切り替えると、ワークフローが簡略化されます。
既存のストリーミング テーブルにスケジュールを追加する
作成後にスケジュールを設定するには、 ALTER STREAMING TABLE ステートメントを使用します。
-- Alters the schedule to refresh the streaming table when its upstream
-- data gets updated.
ALTER STREAMING TABLE sales
ADD TRIGGER ON UPDATE;
既存のスケジュールまたはトリガーを変更する
ストリーミング テーブルに既にスケジュールまたはトリガーが関連付けられている場合は、 ALTER SCHEDULE または ALTER TRIGGER ON UPDATE を使用して更新構成を変更します。 これは、あるスケジュールから別のスケジュールに変更するか、あるトリガーから別のトリガーに変更するか、スケジュールとトリガーを切り替えるかに関係なく適用されます。
次の例では、5 分ごとに更新するように既存のスケジュールを変更します。
ALTER STREAMING TABLE catalog.schema.my_table
ALTER SCHEDULE EVERY 5 MINUTES;
スケジュールまたはトリガーを削除する
スケジュールを削除するには、 ALTER ... DROPを使用します。
ALTER STREAMING TABLE catalog.schema.my_table
DROP SCHEDULE;
更新の状態を追跡する
ストリーミング テーブルの更新の状態を表示するには、パイプライン UI でストリーミング テーブルを管理するパイプラインを表示するか、ストリーミング テーブルの コマンドによって返されるDESCRIBE EXTENDEDを表示します。
DESCRIBE TABLE EXTENDED <table-name>;
または、カタログ エクスプローラーでストリーミング テーブルを表示し、そこで更新の状態を確認できます。
- [
サイドバーのカタログ。
- 左側のカタログ エクスプローラー ツリーでカタログを開き、ストリーミング テーブルがあるスキーマを選択します。
- 選択したスキーマの下にある [テーブル ] 項目を開き、ストリーミング テーブルをクリックします。
ここから、ストリーミング テーブル名の下のタブを使用して、次のようなストリーミング テーブルに関する情報を表示および編集できます。
- 更新の状態と履歴
- テーブル スキーマ
- サンプル データ (アクティブなコンピューティングが必要)
- Permissions
- 系列 (このストリーミング テーブルが依存するテーブルとパイプラインを含む)
- 使用状況に関する分析情報
- このストリーミング テーブル用に作成したモニター
更新のタイムアウト
ストリーミング テーブルの更新は、実行できる時間を制限するタイムアウトで実行されます。 2025 年 8 月 14 日以降に作成または更新されたストリーミング テーブルの場合、 CREATE OR REFRESHを実行して更新するとタイムアウトがキャプチャされます。
-
STATEMENT_TIMEOUTが設定されている場合は、その値が使用されます。 STATEMENT_TIMEOUTを参照してください。 - それ以外の場合は、コマンドの実行に使用された SQL ウェアハウスからのタイムアウトが使用されます。
- ウェアハウスにタイムアウトが構成されていない場合は、既定値の 2 日が適用されます。
タイムアウトは、最初の作成だけでなく、その後のスケジュールされた更新でも使用されます。
2025 年 8 月 14 日より前に最後に更新されたストリーミング テーブルの場合、タイムアウトは 2 日に設定されます。
例: ストリーミング テーブルの更新のタイムアウトを設定する テーブルの作成時または更新時にステートメント レベルのタイムアウトを設定することで、ストリーミング テーブルの更新の実行を許可する期間を明示的に制御できます。
SET STATEMENT_TIMEOUT = '6h';
CREATE OR REFRESH STREAMING TABLE my_catalog.my_schema.my_st
SCHEDULE EVERY 12 HOURS
AS SELECT * FROM large_source_table;
これにより、ストリーミング テーブルが 12 時間ごとに更新されるように設定され、更新に 6 時間以上かかる場合はタイムアウトになり、次回のスケジュールされた更新が待機されます。
スケジュールされた更新でタイムアウトを処理する方法
タイムアウトは、 CREATE OR REFRESHを明示的に実行した場合にのみ同期されます。
- スケジュールされた更新では、最新の
CREATE OR REFRESH中にキャプチャされたタイムアウトが引き続き使用されます。 - ウェアハウスタイムアウトのみを変更しても、既存のスケジュールされた更新には影響しません。
Important
ウェアハウスのタイムアウトを変更した後、 CREATE OR REFRESH をもう一度実行して、新しいタイムアウトを今後のスケジュールされた更新に適用します。
ストリーミング テーブルへのアクセスを制御する
ストリーミング テーブルでは、プライベート データの公開を回避しながら、データ共有をサポートする豊富なアクセス制御がサポートされています。 ストリーミング テーブルの所有者または MANAGE 権限を持つユーザーは、他のユーザーに SELECT 権限を付与できます。 ストリーミング テーブルへの SELECT アクセス権を持つユーザーは、ストリーミング テーブルによって参照されるテーブルに SELECT アクセスする必要はありません。 このアクセス制御により、基になるデータへのアクセスを制御しながらデータ共有が可能になります。
ストリーミング テーブルの所有者を変更することもできます。
ストリーミング テーブルに権限を付与する
ストリーミング テーブルへのアクセスを許可するには、 GRANT ステートメントを使用します。
GRANT <privilege_type> ON <st_name> TO <principal>;
privilege_typeは次のようになります。
-
SELECT- ユーザーはストリーミング テーブルをSELECTできます。 -
REFRESH- ユーザーはストリーミング テーブルをREFRESHできます。 更新は、所有者のアクセス許可を使用して実行されます。
次の例では、ストリーミング テーブルを作成し、選択権限と更新権限をユーザーに付与します。
CREATE MATERIALIZED VIEW st_name AS SELECT * FROM source_table;
-- Grant read-only access:
GRANT SELECT ON st_name TO read_only_user;
-- Grand read and refresh access:
GRANT SELECT ON st_name TO refresh_user;
GRANT REFRESH ON st_name TO refresh_user;
Unity Catalog のセキュリティ保護可能なオブジェクトの付与に関する詳細については、「Unity カタログの特権とセキュリティ保護可能なオブジェクト」を参照してください。
ストリーミング テーブルから権限を取り消す
ストリーミング テーブルからのアクセスを取り消すには、 REVOKE ステートメントを使用します。
REVOKE privilege_type ON <st_name> FROM principal;
ソース テーブルに対する SELECT 権限がストリーミング テーブルの所有者、またはストリーミング テーブルに対する MANAGE 権限または SELECT 権限を付与されている他のユーザーから取り消された場合、またはソース テーブルが削除された場合でも、ストリーミング テーブルの所有者またはアクセス権を付与されたユーザーは、ストリーミング テーブルに対してクエリを実行できます。 ただし、次の動作が発生します。
- ストリーミング テーブルの所有者またはストリーミング テーブルへのアクセスを失った他のユーザーは、そのストリーミング テーブルを
REFRESHできなくなり、ストリーミング テーブルは古くなります。 - スケジュールが自動化されている場合、次にスケジュールされた
REFRESHは失敗するか、実行されません。
次の例では、SELECT から read_only_user 権限を取り消します。
REVOKE SELECT ON st_name FROM read_only_user;
ストリーミング テーブルの所有者を変更する
Databricks SQL で定義されているストリーミング テーブルに対する MANAGE アクセス許可を持つユーザーは、カタログ エクスプローラーを使用して新しい所有者を設定できます。 新しい所有者は、自身またはサービス プリンシパル ユーザー ロールを持つ サービス プリンシパル にすることができます。
Azure Databricks ワークスペースで、[データ] アイコンをクリック
カタログ エクスプローラーを開きます。
更新するストリーミング テーブルを選択します。
右側のサイドバーの [ このストリーミング テーブルについて] で、[ 所有者] を見つけて、[
をクリックします。編集します。
注
パイプライン設定で [ユーザーとして実行 ] を変更して所有者を更新するように指示するメッセージが表示された場合、ストリーミング テーブルは Databricks SQL ではなく Lakeflow Spark 宣言パイプラインで定義されます。 メッセージには、パイプライン設定へのリンクが含まれています。ここで、[ ユーザーとして実行 ] を変更できます。
ストリーミング テーブルの新しい所有者を選択します。
所有者は、所有するストリーミング テーブルに対する
MANAGEおよびSELECT権限を自動的に持ちます。 自分が所有するストリーミング テーブルの所有者としてサービス プリンシパルを設定していて、ストリーミング テーブルに対するSELECTまたはMANAGE権限を明示的に持っていない場合、この変更により、ストリーミング テーブルへのすべてのアクセスが失われます。 この場合、これらの特権を明示的に指定するように求められます。[管理の権限を付与]と[SELECTの権限を付与]の両方を選択して「保存」にその権限を与えます。
[ 保存] を クリックして所有者を変更します。
ストリーミング テーブルの所有者が更新されます。 今後のすべての更新は、新しい所有者の ID を使用して実行されます。
所有者がソース テーブルに対する権限を失った場合
所有者を変更しても、新しい所有者がソース テーブルにアクセスできない場合 (または、基になるソース テーブルに対する SELECT 権限が取り消された場合)、ユーザーはストリーミング テーブルに対してクエリを実行できます。 しかし:
- ストリーミング テーブルを
REFRESHできません。 - ストリーミング テーブルの次回のスケジュールされた更新は失敗します。
ソース データへのアクセスを失っても更新はできなくなりますが、既存のストリーミング テーブルが読み取られるのをすぐに無効にすることはできません。
ストリーミング テーブルからレコードを完全に削除する
Important
ストリーミング テーブルでの REORG ステートメントのサポートは 、パブリック プレビュー段階にあります。
注
- ストリーミング テーブルで
REORGステートメントを使用するには、Databricks Runtime 15.4 以降が必要です。 -
REORGステートメントは任意のストリーミング テーブルで使用できますが、削除ベクターが有効になっているストリーミング テーブルからレコードを削除する場合にのみ必要です。 このコマンドは、削除ベクトルが有効になっていないストリーミング テーブルで使用しても効果はありません。
GDPR 準拠など、削除ベクトルが有効になっているストリーミング テーブルの基になるストレージからレコードを物理的に削除するには、ストリーミング テーブルのデータに対して VACUUM 操作を確実に実行するための追加の手順を実行する必要があります。
基になるストレージからレコードを物理的に削除するには:
- レコードを更新するか、ストリーミング テーブルからレコードを削除します。
-
REORGパラメーターを指定して、ストリーミング テーブルに対してAPPLY (PURGE)ステートメントを実行します。 たとえば、「REORG TABLE <streaming-table-name> APPLY (PURGE);」のように指定します。 - ストリーミング テーブルのデータ保有期間が経過するまで待ちます。 既定のデータ保持期間は 7 日間ですが、
delta.deletedFileRetentionDurationテーブル プロパティを使用して構成できます。 「タイム トラベル クエリのデータ保持を構成する」を参照してください。 -
REFRESHストリーミング テーブル。 ストリーミングテーブルを更新するを参照してください。REFRESH操作から 24 時間以内に、レコードが完全に削除されるようにするために必要なVACUUM操作を含むパイプライン メンテナンス タスクが自動的に実行されます。
クエリ履歴を使用して実行を監視する
クエリ履歴ページを使用すると、クエリの詳細とクエリ プロファイルにアクセスできます。これは、ストリーミング テーブルの更新の実行に使用されるパイプラインでパフォーマンスの低いクエリやボトルネックを特定するのに役立ちます。 クエリ履歴とクエリ プロファイルで使用できる情報の種類の概要については、「 Query 履歴 および Query プロファイルを参照してください。
Important
この機能は パブリック プレビュー段階です。 ワークスペース管理者は、[ プレビュー] ページからこの機能へのアクセスを制御できます。 Azure Databricks プレビューの管理を参照してください。
ストリーミング テーブルに関連するすべてのステートメントがクエリ履歴に表示されます。
Statement ドロップダウン フィルターを使用して、任意のコマンドを選択し、関連するクエリを調べることができます。 すべての CREATE ステートメントの後に、パイプラインで非同期的に実行される REFRESH ステートメントが続きます。
REFRESHステートメントには、通常、パフォーマンスの最適化に関する分析情報を提供する詳細なクエリ プランが含まれます。
クエリ履歴 UI REFRESH ステートメントにアクセスするには、次の手順に従います。
- [
をクリックします。左側のサイドバーで 、[クエリ履歴 ] UI を開きます。
- [REFRESH] ドロップダウン フィルターから [] チェック ボックスをオンにします。
- クエリ ステートメントの名前をクリックすると、クエリの期間や集計されたメトリックなどの概要の詳細が表示されます。
- クエリ プロファイルを表示をクリックして、クエリ プロファイルを開きます。 クエリ プロファイルの移動の詳細については、 Query プロファイル を参照してください。
- 必要に応じて、 Query Source セクションのリンクを使用して、関連するクエリまたはパイプラインを開くことができます。
SQL エディターのリンクを使用するか、SQL ウェアハウスに接続されているノートブックから、クエリの詳細にアクセスすることもできます。
外部クライアントからストリーミング テーブルにアクセスする
オープン API をサポートしていない外部の Delta Lake または Iceberg クライアントからストリーミング テーブルにアクセスするには、 互換モードを使用できます。 互換モードでは、Delta Lake または Iceberg クライアントからアクセスできる読み取り専用バージョンのストリーミング テーブルが作成されます。