Del via


Tutorial: Opret en brugerdefineret søgemaskine og et spørgsmålssvarssystem

I denne vejledning lærer du, hvordan du indekserer og forespørger store datamængder, der indlæses fra en Spark-klynge. Du opretter en Jupyter Notebook, der udfører følgende handlinger:

  • Indlæs forskellige formularer (fakturaer) i en dataframe i en Apache Spark-session
  • Analyser dem for at bestemme deres egenskaber
  • Saml det resulterende output i en tabelbaseret datastruktur
  • Skriv outputtet til et søgeindeks, der hostes i Azure Cognitive Search
  • Udforsk og forespørg det indhold, du har skabt.

1 - Opsætning af afhængigheder

Vi starter med at importere pakker og forbinde til de Azure-ressourcer, der bruges i denne arbejdsgang.

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

cognitive_key = find_secret("cognitive-api-key") # replace with your cognitive api key
cognitive_location = "eastus"

translator_key = find_secret("translator-key") # replace with your cognitive api key
translator_location = "eastus"

search_key = find_secret("azure-search-key") # replace with your cognitive api key
search_service = "mmlspark-azure-search"
search_index = "form-demo-index-5"

openai_key = find_secret("openai-api-key") # replace with your open ai api key
openai_service_name = "synapseml-openai"
openai_deployment_name = "gpt-35-turbo"
openai_url = f"https://{openai_service_name}.openai.azure.com/"

2 - Indlæs data i Spark

Denne kode indlæser nogle eksterne filer fra en Azure-lagringskonto, der bruges til demoformål. Filerne er forskellige fakturaer, og de læses ind i en dataframe.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def blob_to_url(blob):
    [prefix, postfix] = blob.split("@")
    container = prefix.split("/")[-1]
    split_postfix = postfix.split("/")
    account = split_postfix[0]
    filepath = "/".join(split_postfix[1:])
    return "https://{}/{}/{}".format(account, container, filepath)


df2 = (
    spark.read.format("binaryFile")
    .load("wasbs://ignite2021@mmlsparkdemo.blob.core.windows.net/form_subset/*")
    .select("path")
    .limit(10)
    .select(udf(blob_to_url, StringType())("path").alias("url"))
    .cache()
)

display(df2)

3 - Ansøg om formulargenkendelse

Denne kode indlæser AnalyzeInvoices-transformeren og sender en reference til datarammen, der indeholder fakturaerne. Den kalder den forudbyggede fakturamodel Azure Forms Analyzer.

from synapse.ml.cognitive import AnalyzeInvoices

analyzed_df = (
    AnalyzeInvoices()
    .setSubscriptionKey(cognitive_key)
    .setLocation(cognitive_location)
    .setImageUrlCol("url")
    .setOutputCol("invoices")
    .setErrorCol("errors")
    .setConcurrency(5)
    .transform(df2)
    .cache()
)

display(analyzed_df)

4 - Forenkle output af formulargenkendelse

Denne kode bruger FormOntologyLearner, en transformer der analyserer output fra Form Recognizer-transformere (til Azure Document Intelligence i Foundry Tools) og udleder en tabulær datastruktur. Outputtet fra AnalyzeInvoices er dynamisk og varierer afhængigt af de funktioner, der opdages i dit indhold.

FormOntologyLearner udvider nytten af AnalyzeInvoices-transformeren ved at lede efter mønstre, der kan bruges til at skabe en tabelbaseret datastruktur. At organisere outputtet i flere kolonner og rækker gør downstream-analysen enklere.

from synapse.ml.cognitive import FormOntologyLearner

organized_df = (
    FormOntologyLearner()
    .setInputCol("invoices")
    .setOutputCol("extracted")
    .fit(analyzed_df)
    .transform(analyzed_df)
    .select("url", "extracted.*")
    .cache()
)

display(organized_df)

Med vores flotte tabelbaserede dataframe kan vi flade de indlejrede tabeller i formularerne ud med lidt SparkSQL

from pyspark.sql.functions import explode, col

itemized_df = (
    organized_df.select("*", explode(col("Items")).alias("Item"))
    .drop("Items")
    .select("Item.*", "*")
    .drop("Item")
)

display(itemized_df)

5 - Tilføj oversættelser

Denne kode indlæser Translate, en transformer der kalder Azure Translator i Foundry Tools-tjenesten. Den oprindelige tekst, som findes på engelsk i kolonnen "Description", er maskinoversat til forskellige sprog. Alt output samles i arrayet "output.translations".

from synapse.ml.cognitive import Translate

translated_df = (
    Translate()
    .setSubscriptionKey(translator_key)
    .setLocation(translator_location)
    .setTextCol("Description")
    .setErrorCol("TranslationError")
    .setOutputCol("output")
    .setToLanguage(["zh-Hans", "fr", "ru", "cy"])
    .setConcurrency(5)
    .transform(itemized_df)
    .withColumn("Translations", col("output.translations")[0])
    .drop("output", "TranslationError")
    .cache()
)

display(translated_df)

6 - Oversæt produkter til emojis med OpenAI 🤯

from synapse.ml.cognitive.openai import OpenAIPrompt
from pyspark.sql.functions import trim, split

emoji_template = """ 
  Your job is to translate item names into emoji. Do not add anything but the emoji and end the translation with a comma
  
  Two Ducks: 🦆🦆,
  Light Bulb: 💡,
  Three Peaches: 🍑🍑🍑,
  Two kitchen stoves: ♨️♨️,
  A red car: 🚗,
  A person and a cat: 🧍🐈,
  A {Description}: """

prompter = (
    OpenAIPrompt()
    .setSubscriptionKey(openai_key)
    .setDeploymentName(openai_deployment_name)
    .setUrl(openai_url)
    .setMaxTokens(5)
    .setPromptTemplate(emoji_template)
    .setErrorCol("error")
    .setOutputCol("Emoji")
)

emoji_df = (
    prompter.transform(translated_df)
    .withColumn("Emoji", trim(split(col("Emoji"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(emoji_df.select("Description", "Emoji"))

7 - Udleder leverandøradressekontinentet med OpenAI

continent_template = """
Which continent does the following address belong to? 

Pick one value from Europe, Australia, North America, South America, Asia, Africa, Antarctica. 

Dont respond with anything but one of the above. If you don't know the answer or cannot figure it out from the text, return None. End your answer with a comma.

Address: "6693 Ryan Rd, North Whales",
Continent: Europe,
Address: "6693 Ryan Rd",
Continent: None,
Address: "{VendorAddress}",
Continent:"""

continent_df = (
    prompter.setOutputCol("Continent")
    .setPromptTemplate(continent_template)
    .transform(emoji_df)
    .withColumn("Continent", trim(split(col("Continent"), ",").getItem(0)))
    .drop("error", "prompt")
    .cache()
)
display(continent_df.select("VendorAddress", "Continent"))

8 - Opret et Azure søgeindeks for formularerne

from synapse.ml.cognitive import *
from pyspark.sql.functions import monotonically_increasing_id, lit

(
    continent_df.withColumn("DocID", monotonically_increasing_id().cast("string"))
    .withColumn("SearchAction", lit("upload"))
    .writeToAzureSearch(
        subscriptionKey=search_key,
        actionCol="SearchAction",
        serviceName=search_service,
        indexName=search_index,
        keyCol="DocID",
    )
)

9 - Prøv en søgeforespørgsel

import requests

search_url = "https://{}.search.windows.net/indexes/{}/docs/search?api-version=2019-05-06".format(
    search_service, search_index
)
requests.post(
    search_url, json={"search": "door"}, headers={"api-key": search_key}
).json()

10 - Byg en chatbot, der kan bruge Azure Search som et værktøj 🧠🔧

import json
import openai

openai.api_type = "azure"
openai.api_base = openai_url
openai.api_key = openai_key
openai.api_version = "2023-03-15-preview"

chat_context_prompt = f"""
You are a chatbot designed to answer questions with the help of a search engine that has the following information:

{continent_df.columns}

If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be brief. If you need to use the search engine to solve the please output a json in the form of {{"query": "example_query"}}
"""


def search_query_prompt(question):
    return f"""
Given the search engine above, what would you search for to answer the following question?

Question: "{question}"

Please output a json in the form of {{"query": "example_query"}}
"""


def search_result_prompt(query):
    search_results = requests.post(
        search_url, json={"search": query}, headers={"api-key": search_key}
    ).json()
    return f"""

You previously ran a search for "{query}" which returned the following results:

{search_results}

You should use the results to help you answer questions. If you dont know the answer to a question say "I dont know". Do not lie or hallucinate information. Be Brief and mention which query you used to solve the problem. 
"""


def prompt_gpt(messages):
    response = openai.ChatCompletion.create(
        engine=openai_deployment_name, messages=messages, max_tokens=None, top_p=0.95
    )
    return response["choices"][0]["message"]["content"]


def custom_chatbot(question):
    while True:
        try:
            query = json.loads(
                prompt_gpt(
                    [
                        {"role": "system", "content": chat_context_prompt},
                        {"role": "user", "content": search_query_prompt(question)},
                    ]
                )
            )["query"]

            return prompt_gpt(
                [
                    {"role": "system", "content": chat_context_prompt},
                    {"role": "system", "content": search_result_prompt(query)},
                    {"role": "user", "content": question},
                ]
            )
        except Exception as e:
            raise e

11 - At stille vores chatbot et spørgsmål

custom_chatbot("What did Luke Diaz buy?")

12 - Et hurtigt dobbelttjek

display(
    continent_df.where(col("CustomerName") == "Luke Diaz")
    .select("Description")
    .distinct()
)