次の方法で共有


入力と出力のバインドを使用して Azure Functions と Azure Data Explorer を統合する (プレビュー)

重要

このコネクタは、Microsoft Fabric のリアルタイム インテリジェンスで使用できます。 次の例外を除き、この記事の手順を使用してください。

Azure Functions を使用すると、スケジュールに従って、またはイベントに応答して、クラウドでサーバーレス コードを実行できます。 Azure Functions に対して Azure Data Explorer の入力バインドと出力バインドを使用すると、Azure Data Explorer をワークフローに統合してデータを取り込み、クラスターに対してクエリを実行できます。

前提条件

サンプル プロジェクトを使用して統合を試してください。

Azure Functions に Azure Data Explorer のバインドを使用する方法

Azure Functions に Azure Data Explorer バインドを使用する方法については、次の記事を参照してください。

Azure Functions に Azure Data Explorer のバインドを使用するシナリオ

以下のセクションでは、Azure Functions に Azure Data Explorer のバインドを使用するいくつかの一般的なシナリオについて説明します。

入力バインディング

入力バインドは、必要に応じてパラメーターを使用して Kusto クエリ言語 (KQL) クエリまたは KQL 関数を実行し、出力を関数に返します。

次のセクションでは、一般的なシナリオで入力バインドを使用する方法について説明します。

シナリオ 1: クラスターでデータのクエリを実行する HTTP エンドポイント

REST API を使用して Azure Data Explorer データを公開する必要がある場合は、入力バインドを使用します。 このシナリオでは、Azure Functions HTTP トリガーを使って、クラスター内のデータのクエリを実行します。 このシナリオは、外部アプリケーションまたはサービスの Azure Data Explorer データへのプログラムによるアクセスを提供する必要がある場合に役立ちます。 REST API を使用してデータを公開すると、アプリケーションはクラスターに直接接続しなくても、データを簡単に使用できます。

次のコードでは、HTTP トリガーと Azure Data Explorer の入力バインドを使って関数を定義しています。 入力バインドでは、productsdb データベースの Products テーブルに対して実行するクエリが指定されています。 この関数では、パラメーターとして渡される述語として、productId 列を使います。

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

次のように関数を呼び出します。

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

シナリオ 2: クラスターからデータをエクスポートするスケジュールされたトリガー

次のシナリオは、時間ベースのスケジュールでデータをエクスポートする必要がある状況で適用できます。

このコードでは、productsdb データベースから Azure Blob Storage の CSV ファイルに売上データの集計をエクスポートするタイマー トリガーを含む関数が定義されています。

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

出力バインディング

出力バインドは 1 つ以上の行を受け取り、Azure Data Explorer テーブルに挿入します。

次のセクションでは、一般的なシナリオで出力バインドを使用する方法について説明します。

シナリオ 1: クラスターにデータを取り込む HTTP エンドポイント

このシナリオは、受信 HTTP 要求を処理し、クラスターにデータを取り込む必要がある場合に使用します。 出力バインドを使用すると、要求からの受信データを Azure Data Explorer テーブルに書き込むことができます。

次のコードでは、HTTP トリガーと Azure Data Explorer の出力バインドを使って関数を定義しています。 この関数は、HTTP 要求本文の JSON ペイロードを取得して、productsdb データベースの products テーブルにそれを書き込みます。

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

次のように関数を呼び出します。

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

シナリオ 2: Azure でサポートされている RabbitMQ または他のメッセージング システムからデータを取り込む

このシナリオは、メッセージング システムからクラスターにデータを取り込む必要がある場合に使用します。 出力バインドを使用すると、メッセージング システムから Azure Data Explorer テーブルに受信データを取り込むことができます。

このコードでは、RabbitMQ トリガーを使用して関数を定義します。 この関数は、メッセージ (JSON 形式のデータ) を productsdb データベース内の products テーブルに取り込みます。

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

関数について詳しくは、Azure Functions のドキュメントをご覧ください。 Azure Data Explorer の拡張機能は次の場所から入手できます。