Important
This feature is in Public Preview.
In Databricks Runtime 15.3 and above, you can use the VARIANT type to ingest semi-structured data. This article describes the behavior and provides example patterns for ingesting data from cloud object storage using Auto Loader and COPY INTO, streaming records from Kafka, and using SQL commands to create new tables with variant data or insert new records using the variant type. The following table summarizes the supported file formats and Databricks Runtime version support:
| File format | Supported Databricks Runtime version |
|---|---|
| JSON | 15.3 and above |
| XML | 16.4 and above |
| CSV | 16.4 and above |
See Query variant data.
Create a table with a variant column
VARIANT is a standard SQL type in Databricks Runtime 15.3 and above and is supported by tables backed by Delta Lake. Managed tables on Azure Databricks use Delta Lake by default, so you can create an empty table with a single VARIANT column using the following syntax.
CREATE TABLE table_name (variant_column VARIANT)
Alternately, you can use a CTAS statement to create a table with a variant column. Use the PARSE_JSON function to parse JSON strings or the FROM_XML function to parse XML strings. The following example creates a table with two columns.
- The id column is extracted from the JSON string as a STRING type.
- The variant_column column contains the entire JSON string encoded as a VARIANT type.
CREATE TABLE table_name AS
SELECT json_string:id AS id,
PARSE_JSON(json_string) variant_column
FROM source_data
Note
Databricks recommends extracting frequently queried fields and storing them as non-variant columns to accelerate queries and optimize storage layout.
VARIANT columns cannot be used as clustering keys, partitions, or Z-order keys. The VARIANT data type cannot be used for comparisons, grouping, ordering, or set operations. For more information, see Limitations.
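The recommendation above to extract frequently queried fields can be sketched as a small helper that generates the CTAS statement. This is only an illustration: build_ctas and the event_time field are hypothetical names, not part of any Databricks API, and identifiers are interpolated without escaping, so the sketch assumes trusted input.

```python
def build_ctas(table_name, source_table, json_column, extracted_fields,
               variant_column="variant_column"):
    """Build a CTAS statement that extracts typed fields and keeps the full record.

    extracted_fields maps a top-level JSON field name to the SQL type it is
    cast to. Hypothetical helper: names are not quoted or validated.
    """
    # One "json_column:field::TYPE AS field" item per extracted field.
    select_items = [
        f"{json_column}:{field}::{sql_type} AS {field}"
        for field, sql_type in extracted_fields.items()
    ]
    # The whole record is still stored as a single VARIANT column.
    select_items.append(f"PARSE_JSON({json_column}) AS {variant_column}")
    select_list = ",\n       ".join(select_items)
    return (
        f"CREATE TABLE {table_name} AS\n"
        f"SELECT {select_list}\n"
        f"FROM {source_table}"
    )


statement = build_ctas(
    "table_name",
    "source_data",
    "json_string",
    {"id": "STRING", "event_time": "TIMESTAMP"},  # event_time is an assumed field
)
print(statement)
```

The generated text can be passed to spark.sql(statement) in a notebook. Keeping the extracted fields as ordinary typed columns means they remain usable for clustering, grouping, and ordering, which the VARIANT column itself does not support.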
Insert data using parse_json
If the target table already contains a column encoded as VARIANT, you can use parse_json to insert JSON string records as VARIANT. For example, parse JSON strings from the json_string column and insert them into table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT PARSE_JSON(json_string)
FROM source_data
Python
from pyspark.sql.functions import col, parse_json
(spark.read
.table("source_data")
.select(parse_json(col("json_string")).alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Insert data using from_xml
If the target table already contains a column encoded as VARIANT, you can use from_xml to insert XML string records as VARIANT. For example, parse XML strings from the xml_string column and insert them into table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_XML(xml_string, 'variant')
FROM source_data
Python
from pyspark.sql.functions import col, from_xml
(spark.read
.table("source_data")
.select(from_xml(col("xml_string"), "variant").alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Insert data using from_csv
If the target table already contains a column encoded as VARIANT, you can use from_csv to insert CSV string records as VARIANT. For example, parse CSV records from the csv_string column and insert them into table_name.
SQL
INSERT INTO table_name (variant_column)
SELECT FROM_CSV(csv_string, 'v variant').v
FROM source_data
Python
from pyspark.sql.functions import col, from_csv
(spark.read
.table("source_data")
.select(from_csv(col("csv_string"), "v variant").v.alias("variant_column"))
.write
.mode("append")
.saveAsTable("table_name")
)
Ingest data from cloud object storage as variant
Auto Loader can be used to load all data from the supported file sources as a single VARIANT column in a target table. Because VARIANT is flexible to schema and type changes and maintains case sensitivity and NULL values present in the data source, this pattern is robust to most ingestion scenarios with the following caveats:
- Malformed records cannot be encoded using VARIANT type.
- VARIANT type can only hold records up to 16 MB in size.
Note
Overly large records are treated the same way as corrupt records. In the default PERMISSIVE processing mode, overly large records are captured in the corruptRecordColumn.
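To make the two caveats concrete, the following plain-Python sketch classifies raw JSON records before ingestion. It is not how Auto Loader performs these checks. It assumes that 16 MB means 16 * 1024 * 1024 bytes and uses the UTF-8 size of the raw text as a rough proxy for the encoded variant size, which can differ. Python's json parser is also not identical to the Spark parser, and classify_record is a hypothetical helper name.

```python
import json

# Assumption: 16 MB interpreted as 16 * 1024 * 1024 bytes, measured on raw text.
MAX_VARIANT_BYTES = 16 * 1024 * 1024


def classify_record(raw, max_bytes=MAX_VARIANT_BYTES):
    """Return 'ok', 'too_large', or 'malformed' for one raw JSON record string."""
    # Size check first, so oversized records are not parsed at all.
    if len(raw.encode("utf-8")) > max_bytes:
        return "too_large"
    try:
        json.loads(raw)
    except json.JSONDecodeError:
        return "malformed"
    return "ok"


samples = [
    '{"id": 1, "tags": ["a", "b"]}',      # valid record
    '{"id": 2, "tags": [',                # truncated, so malformed
    '{"blob": "' + "x" * 100 + '"}',      # valid JSON, but over the demo limit
]

# A tiny limit is used here only so the oversized case is cheap to demonstrate.
results = [classify_record(s, max_bytes=64) for s in samples]
print(results)
```

A pre-check like this can help estimate how many records would end up in the corrupt record column before running a large ingestion job.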
Because the entire record is recorded as a single VARIANT column, no schema evolution occurs during ingestion and rescuedDataColumn is not supported. The following example assumes that the target table already exists with a single VARIANT column.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("singleVariantColumn", "variant_column")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
You can also specify VARIANT when defining a schema or passing schemaHints. The data in the referenced source field must contain a valid record. The following examples demonstrate this syntax.
# Define the schema.
# Writes the columns `name` as a string and `address` as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("name STRING, address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Define the schema.
# A single field `payload` containing JSON data is written as variant.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.schema("payload VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
# Supply schema hints.
# Writes the `address` column as variant.
# Infers the schema for other fields using standard rules.
(spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaHints", "address VARIANT")
.load("/Volumes/catalog_name/schema_name/volume_name/path")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
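When several fields need hints, the schemaHints string can be assembled programmatically. The sketch below is a minimal illustration; build_schema_hints is a hypothetical helper, and the column names are reused from the examples above.

```python
def build_schema_hints(variant_columns, typed_columns=None):
    """Build a comma-separated schema hints string.

    variant_columns: names of columns to ingest as VARIANT.
    typed_columns: optional mapping of column name to SQL type.
    Hypothetical helper: names are not validated or quoted.
    """
    hints = [f"{name} VARIANT" for name in variant_columns]
    hints += [f"{name} {sql_type}" for name, sql_type in (typed_columns or {}).items()]
    return ", ".join(hints)


hints = build_schema_hints(["address"], {"name": "STRING"})

# Options collected in a dict so they can be reused across streams.
autoloader_options = {
    "cloudFiles.format": "json",
    "cloudFiles.schemaHints": hints,
}
print(autoloader_options)
```

The dictionary can then be applied with spark.readStream.format("cloudFiles").options(**autoloader_options) in place of the individual option calls.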
Use COPY INTO with variant
Databricks recommends using Auto Loader over COPY INTO when available.
COPY INTO supports ingesting the entire contents of a supported data source as a single column. The following example creates a new table with a single VARIANT column and then uses COPY INTO to ingest records from a JSON file source.
CREATE TABLE table_name (variant_column VARIANT);
COPY INTO table_name
FROM '/Volumes/catalog_name/schema_name/volume_name/path'
FILEFORMAT = JSON
FILES = ('file-name')
FORMAT_OPTIONS ('singleVariantColumn' = 'variant_column')
Stream Kafka data as variant
Many Kafka streams encode their payloads using JSON. Ingesting Kafka streams using VARIANT makes these workloads robust to schema changes.
The following example demonstrates reading a Kafka streaming source, casting the key as a STRING and the value as VARIANT, and writing out to a target table.
from pyspark.sql.functions import col, parse_json
(spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.load()
.select(
col("key").cast("string"),
parse_json(col("value").cast("string")).alias("value")
).writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("table_name")
)
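The Kafka source delivers key and value as binary, which is why the example casts value to a string before parsing it. The plain-Python sketch below is an analogy rather than Spark code. It shows the same decode-then-parse step and illustrates why storing the parsed payload whole tolerates producers that add fields over time. The payloads and decode_kafka_value are made up for illustration.

```python
import json


def decode_kafka_value(value):
    """Decode a Kafka message value (bytes) as UTF-8 text and parse it as JSON."""
    return json.loads(value.decode("utf-8"))


# Two hypothetical payloads from the same topic. The second adds a nested field.
v1 = b'{"user": "a", "clicks": 3}'
v2 = b'{"user": "b", "clicks": 5, "device": {"os": "linux"}}'

parsed = [decode_kafka_value(v) for v in (v1, v2)]

# Both records parse without any predeclared schema. The new field simply
# appears in the second record and is absent from the first.
for record in parsed:
    print(record.get("user"), record.get("clicks"), record.get("device"))
```

With a fixed struct schema, the device field would need schema evolution or would be dropped. With a VARIANT value column, both records land in the same column unchanged.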
Next steps
- Query variant data.
- Configure Variant type support.
- Learn more about Auto Loader. See What is Auto Loader?