重要
このコネクタは、Microsoft Fabric のリアルタイム インテリジェンスで使用できます。 次の例外を除き、この記事の手順を使用してください。
- 必要に応じて、「KQL データベースを作成する」の手順に従ってデータベースを作成する。
- 必要に応じて、「空のテーブルを作成する」の手順に従ってテーブルを作成する。
- 「URI をコピーする」の手順に従って、クエリまたはインジェスト URI を取得する。
- KQL クエリセットでクエリを実行する。
Azure Functions を使用すると、スケジュールに従って、またはイベントに応答して、クラウドでサーバーレス コードを実行できます。 Azure Functions に対して Azure Data Explorer の入力バインドと出力バインドを使用すると、Azure Data Explorer をワークフローに統合してデータを取り込み、クラスターに対してクエリを実行できます。
前提条件
- Azure サブスクリプション。 無料の Azure アカウントを作成します。
- Azure Data Explorer クラスターとデータベースおよびサンプル データ。 クラスターとデータベースを作成します。
- ストレージ アカウント。
サンプル プロジェクトを使用して統合を試してください。
Azure Functions に Azure Data Explorer のバインドを使用する方法
Azure Functions に Azure Data Explorer バインドを使用する方法については、次の記事を参照してください。
- Azure Functions における Azure Data Explorer バインドの概要
- Azure Functions 用の Azure Data Explorer 入力バインド
- Azure Functions 用の Azure Data Explorer 出力バインド
Azure Functions に Azure Data Explorer のバインドを使用するシナリオ
以下のセクションでは、Azure Functions に Azure Data Explorer のバインドを使用するいくつかの一般的なシナリオについて説明します。
入力バインディング
入力バインドは、必要に応じてパラメーターを使用して Kusto クエリ言語 (KQL) クエリまたは KQL 関数を実行し、出力を関数に返します。
次のセクションでは、一般的なシナリオで入力バインドを使用する方法について説明します。
シナリオ 1: クラスターでデータのクエリを実行する HTTP エンドポイント
REST API を使用して Azure Data Explorer データを公開する必要がある場合は、入力バインドを使用します。 このシナリオでは、Azure Functions HTTP トリガーを使って、クラスター内のデータのクエリを実行します。 このシナリオは、外部アプリケーションまたはサービスの Azure Data Explorer データへのプログラムによるアクセスを提供する必要がある場合に役立ちます。 REST API を使用してデータを公開すると、アプリケーションはクラスターに直接接続しなくても、データを簡単に使用できます。
次のコードでは、HTTP トリガーと Azure Data Explorer の入力バインドを使って関数を定義しています。 入力バインドでは、productsdb データベースの Products テーブルに対して実行するクエリが指定されています。 この関数では、パラメーターとして渡される述語として、productId 列を使います。
{
[FunctionName("GetProduct")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
HttpRequest req,
[Kusto(Database:"productsdb" ,
KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
KqlParameters = "@productId={productId}",
Connection = "KustoConnectionString")]
IAsyncEnumerable<Product> products)
{
IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
var productList = new List<Product>();
while (await enumerator.MoveNextAsync())
{
productList.Add(enumerator.Current);
}
await enumerator.DisposeAsync();
return new OkObjectResult(productList);
}
}
次のように関数を呼び出します。
curl https://myfunctionapp.azurewebsites.net/api/getproducts/1
シナリオ 2: クラスターからデータをエクスポートするスケジュールされたトリガー
次のシナリオは、時間ベースのスケジュールでデータをエクスポートする必要がある状況で適用できます。
このコードでは、productsdb データベースから Azure Blob Storage の CSV ファイルに売上データの集計をエクスポートするタイマー トリガーを含む関数が定義されています。
public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
[Kusto(ConnectionStringSetting = "KustoConnectionString",
DatabaseName = "productsdb",
Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
// Write the query results to a CSV file
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
csv.WriteRecords(queryResults);
writer.Flush();
stream.Position = 0;
await outputBlob.UploadFromStreamAsync(stream);
}
}
出力バインディング
出力バインドは 1 つ以上の行を受け取り、Azure Data Explorer テーブルに挿入します。
次のセクションでは、一般的なシナリオで出力バインドを使用する方法について説明します。
シナリオ 1: クラスターにデータを取り込む HTTP エンドポイント
このシナリオは、受信 HTTP 要求を処理し、クラスターにデータを取り込む必要がある場合に使用します。 出力バインドを使用すると、要求からの受信データを Azure Data Explorer テーブルに書き込むことができます。
次のコードでは、HTTP トリガーと Azure Data Explorer の出力バインドを使って関数を定義しています。 この関数は、HTTP 要求本文の JSON ペイロードを取得して、productsdb データベースの products テーブルにそれを書き込みます。
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
HttpRequest req, ILogger log,
[Kusto(Database:"productsdb" ,
TableName ="products" ,
Connection = "KustoConnectionString")] out Product product)
{
log.LogInformation($"AddProduct function started");
string body = new StreamReader(req.Body).ReadToEnd();
product = JsonConvert.DeserializeObject<Product>(body);
string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
product.Name, product.ProductID, product.Cost);
log.LogInformation("Ingested product {}", productString);
return new CreatedResult($"/api/addproductuni", product);
}
次のように関数を呼び出します。
curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'
シナリオ 2: Azure でサポートされている RabbitMQ または他のメッセージング システムからデータを取り込む
このシナリオは、メッセージング システムからクラスターにデータを取り込む必要がある場合に使用します。 出力バインドを使用すると、メッセージング システムから Azure Data Explorer テーブルに受信データを取り込むことができます。
このコードでは、RabbitMQ トリガーを使用して関数を定義します。 この関数は、メッセージ (JSON 形式のデータ) を productsdb データベース内の products テーブルに取り込みます。
public class QueueTrigger
{
[FunctionName("QueueTriggerBinding")]
[return: Kusto(Database: "productsdb",
TableName = "products",
Connection = "KustoConnectionString")]
public static Product Run(
[RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
ILogger log)
{
log.LogInformation($"Dequeued product {product.ProductID}");
return product;
}
}
関数について詳しくは、Azure Functions のドキュメントをご覧ください。 Azure Data Explorer の拡張機能は次の場所から入手できます。