Extracting schema information from Databricks seems like it should be simple, doesn't it? After all, there is INFORMATION_SCHEMA to query. Unfortunately, that is only available with Unity Catalog, i.e. a Databricks metastore that can share data across multiple Azure Databricks workspaces, and no such catalog exists in our project. The best recourse is to use the Python Spark SQL libraries.
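For reference, if a Unity Catalog metastore were available, the lookup would be roughly the following (a sketch only; my_catalog and the 'default' schema are placeholder names):

# with Unity Catalog, column metadata is exposed through INFORMATION_SCHEMA views
display(spark.sql("""
    SELECT table_schema, table_name, column_name, data_type
    FROM my_catalog.information_schema.columns
    WHERE table_schema = 'default'
"""))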
Problem
In one of my recent tasks, I had to update a large SQL query spanning 10+ Databricks tables: add some attributes and replace a few existing ones. The only problem was that most of the attributes had no aliases, so there was no way to tell which attribute came from which table. It's one of those irritating, annoying things that bothers me to no end and makes me feel like…
Solution
PySpark to the rescue! This is easily accomplished with a small bit of Python. Let's say we need to extract column information for the table default.volunteers. Here is the command you would run in a Databricks notebook to get a specific table's schema details using DESCRIBE.
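A minimal version of that single-table lookup, run from a Python cell (display is the standard Databricks notebook helper):

# describe a single table and show its columns and data types
display(spark.sql("DESCRIBE default.volunteers"))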
Now imagine doing that 20 or more times. Nah, didn't think so. Here is how the solution is structured:
- Obtain the list of tables to extract – first, load all the tables whose schema you want to extract into a file, say schema_extract_table_names_list.csv, then read that list into a variable.
- Loop through the list of tables, run the DESCRIBE <table_name> command for each, and store the results in an output variable.
- Create a Pandas DataFrame to hold the output for further processing.
If you want to iterate through the entire Databricks workspace instead, you can use the SHOW DATABASES command together with SHOW TABLES IN <database>, followed by DESCRIBE <database>.<table_name> for each entry in the resulting list. In my project this took an enormous amount of time, hence I took the approach of putting only the tables I need in a file and extracting their schema.
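A rough sketch of that workspace-wide variant (the tableName column matches what SHOW TABLES returns, but the first column of SHOW DATABASES is named differently across runtimes, so it is accessed by position here):

# enumerate every database, then every table in it, and build <database>.<table> names
all_tables = []
for db_row in spark.sql("SHOW DATABASES").collect():
    db_name = db_row[0]  # 'namespace' on recent runtimes, 'databaseName' on older ones
    for tbl_row in spark.sql("SHOW TABLES IN {0}".format(db_name)).collect():
        all_tables.append("{0}.{1}".format(db_name, tbl_row.tableName))
# all_tables can then be fed into the same DESCRIBE loop shown below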
Here is the whole code:
import pyspark.pandas as ps
import pandas as pd

# file containing the list of tables for which schema information is to be collected
path = "dbfs:/FileStore/practise/schema_extract_table_names_list.csv"

# read the file and store it in a Spark dataframe
tables_df = spark.read.csv(path, header=True)

# collect the table names into a Python list
tables = tables_df.selectExpr("collect_list(table_names) as tables").first().tables

# variable to store the results
output = []

for table in tables:
    dataset_name, table_name = table.split('.')
    # query to get the schema information
    sql_get_table_info = "DESCRIBE {0}.{1}".format(dataset_name, table_name)
    try:
        columns_df = ps.sql(sql_get_table_info).to_pandas()
    except Exception:
        # skip tables that do not exist or cannot be described
        continue
    # loop through all the column rows and pick up the name and data type
    for _, row in columns_df.iterrows():
        column_name = row.col_name
        data_type = row.data_type
        # DESCRIBE may also emit partition/metadata rows (blank or starting with '#'); skip them
        if not column_name or column_name.startswith('#'):
            continue
        output.append((dataset_name, table_name, column_name, data_type))

df = pd.DataFrame(output, columns=['dataset_name', 'table_name', 'column_name', 'data_type'])
##scenario 1 - if you need to search for a particular column in the result set
# column name to search for
col_name_to_search = 'col_name'
# check whether the column exists anywhere in the collected schema
if col_name_to_search in df['column_name'].values:
    print("Column name {0} is present in the following tables:".format(col_name_to_search))
    display(df[df['column_name'] == col_name_to_search])
else:
    print("Column name {0} is not present in the given datasets.".format(col_name_to_search))
##scenario 2 - if you need to export the data
# dbfs path to store the output (the /dbfs/ mount lets pandas write to it directly)
path = "/dbfs/path-to-output.csv"
df.to_csv(path, index=False)
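If the result set grows large, an alternative is to hand the pandas output back to Spark and write it from there (a sketch; the dbfs:/ form of the same placeholder path is assumed):

# convert the pandas result to a Spark dataframe and write it out as CSV via Spark
spark_df = spark.createDataFrame(df)
spark_df.write.mode("overwrite").csv("dbfs:/path-to-output", header=True)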