Azure: Tags

This post shows how to work with tags on Azure resources.

Install Graph Extension

az extension add --name resource-graph

Tag List

az tag list --subscription <NAME>

Query for Specific Tag Value

az graph query -q "project name, resourceGroup, type, tags | where tags.<TAGNAME>=~'<VALUE>'"

Query for Multiple Tags

az graph query -q "project name, resourceGroup, type, tags | where tags.<TAGNAME>=~'<VALUE>' | where tags.<TAGNAME>=~'<VALUE>'"

Query for Resource Groups

az graph query -q "ResourceContainers | project name, type, tags | where tags.<TAGNAME>=~'<VALUE>'"

Query For Multiple Resource Types

az graph query -q "project name, resourceGroup, type, tags | where tags.<TAGNAME>=~'<VALUE>' | where type =~ 'microsoft.sql/servers/databases' or type =~ 'microsoft.storage/storageaccounts'"
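If you generate these queries from a script, a small helper can assemble the where clauses for you. The following is only a sketch: build_tag_query and its parameters are hypothetical names (not part of the Azure CLI), and it assumes tag names are simple identifiers that work with the tags.<TAGNAME> dot syntax used above. It does not escape quotes inside values.

```python
from typing import Mapping, Optional, Sequence


def build_tag_query(tags: Mapping[str, str],
                    resource_types: Optional[Sequence[str]] = None) -> str:
    """Build a Resource Graph query string that filters on tags and, optionally, types."""
    clauses = ["project name, resourceGroup, type, tags"]
    # One case-insensitive filter per tag, mirroring the queries above.
    for tag_name, value in tags.items():
        clauses.append("where tags.%s=~'%s'" % (tag_name, value))
    # Resource types are combined with "or" inside a single where clause.
    if resource_types:
        type_filter = " or ".join("type =~ '%s'" % t for t in resource_types)
        clauses.append("where " + type_filter)
    return " | ".join(clauses)


# Hypothetical tag name and value, used only for illustration.
query = build_tag_query({"env": "prod"}, ["microsoft.storage/storageaccounts"])
print(query)
```

The resulting string can be passed to az graph query with the -q option.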

 

Databricks: Python SDK

This post shows how to use the Databricks Python SDK.

Install the Package

pip install databricks-sdk

Create Storage Credential

NOTE: Your SPN must be account admin to do this.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import AzureManagedIdentity

secret = dbutils.secrets.get(scope = "<SCOPE>", key = "<KEY>")

w = WorkspaceClient(
  host = 'https://<URL>/',
  azure_workspace_resource_id = '<RESOURCE_ID_OF_DATABRICKS>',
  azure_tenant_id = '<TENANT_ID>',
  azure_client_id = '<CLIENT_ID>',
  azure_client_secret = secret
)

storage_credential_name = '<CREDENTIAL_NAME>'
comment = '<COMMENT>'
connector_id = '<DATABRICKS_ACCESS_CONNECTOR>'
az_mi = AzureManagedIdentity(access_connector_id = connector_id)

w.storage_credentials.create(
  name = storage_credential_name,
  azure_managed_identity = az_mi,
  comment = comment
)

 

PySpark DataFrame Methods

This post shows several commonly used DataFrame methods.

Get the first value in a column

df = some_dataframe_definition

value = df.select("SOME_COLUMN_NAME").first()[0]

Convert Dataframe to JSON

df = some_dataframe_definition

result_json = df.toJSON()

Get a Row

df = some_dataframe_definition

row = df.collect()[0]  # Replace 0 with the index of the row you want.

Count rows of Dataframe

df = some_dataframe_definition

num_rows = df.count()

 

 

Databricks Unity Catalog SQL Commands

This post covers the basic commands you will need when working with Unity Catalog.

Display Current Metastore
SELECT CURRENT_METASTORE();
Display Current Catalog
SELECT CURRENT_CATALOG();
Create Catalog
CREATE CATALOG IF NOT EXISTS <Catalog_Name> COMMENT 'A COMMENT';
Create Catalog With Location
CREATE CATALOG IF NOT EXISTS <Catalog_Name> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>' COMMENT 'A COMMENT';
Describe Catalog
DESCRIBE CATALOG <Catalog_Name>;
Create Schema
CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> COMMENT '<COMMENT>';
Create Schema With Location
CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>/<SCHEMA_NAME>' COMMENT '<COMMENT>';
Show All Storage Credentials
SHOW STORAGE CREDENTIALS;
Describe Credential
DESCRIBE STORAGE CREDENTIAL <CREDENTIAL_NAME>;
Create External Location

You will first need a storage credential.

The URL can point all the way down to a table path or stop at the container.

CREATE EXTERNAL LOCATION IF NOT EXISTS <NAME>
URL 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/'
WITH (STORAGE CREDENTIAL <CREDENTIAL_NAME>)
COMMENT '<COMMENT>';
Create External Table
CREATE TABLE <CATALOG_NAME>.<SCHEMA_NAME>.<TABLE_NAME>
USING <FORMAT>
LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/FOLDER/PATH';
Grant Create Storage Credential on Metastore
GRANT CREATE STORAGE CREDENTIAL ON METASTORE TO `<USER>`;
Grant Permission to Create External Locations on Storage Credential
GRANT CREATE EXTERNAL LOCATION ON STORAGE CREDENTIAL <CREDENTIAL_NAME> TO `<USER>`;
Grant Permission to Create External Location On Metastore
GRANT CREATE EXTERNAL LOCATION ON METASTORE TO `<USER>`;
Grant Permission to Use Catalog
GRANT USE_CATALOG ON CATALOG <CATALOG_NAME> TO `<USER>`;
Show all Grants On Metastore
SHOW GRANTS `<USER>` ON METASTORE;
Grant Permission to Use Schema
GRANT USE_SCHEMA ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;
Grant Permission to Create Table
GRANT CREATE TABLE ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;
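
If you need to issue the same grant for many users, you could generate the statements from Python. This is a minimal sketch: build_grant is a hypothetical helper name, and the catalog, schema, and user values are made up. It wraps the principal in backticks, as in the statements above, and doubles any embedded backtick.

```python
def build_grant(privilege: str, securable_type: str,
                securable_name: str, principal: str) -> str:
    """Return a GRANT statement with the principal quoted in backticks."""
    # Double any backtick inside the name so the quoted identifier stays valid.
    quoted = "`%s`" % principal.replace("`", "``")
    return "GRANT %s ON %s %s TO %s;" % (privilege, securable_type, securable_name, quoted)


# Hypothetical schema and users, used only for illustration.
users = ["first.user@example.com", "second.user@example.com"]
statements = [build_grant("CREATE TABLE", "SCHEMA", "main.sales", u) for u in users]

for statement in statements:
    print(statement)
```

Each generated string can then be run with spark.sql or pasted into a SQL editor.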

 

Databricks Unity Catalog REST APIs

This post shows how to work with the Databricks Unity Catalog REST APIs.

Set Catalog Isolation Mode to ISOLATED

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/catalogs/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"isolation_mode": "ISOLATED"
}'

Bind Workspace to Catalog

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"add": [{ "workspace_id": "<WORKSPACE_ID>", "binding_type": "BINDING_TYPE_READ_WRITE" }],
"remove": []
}'

Unbind Workspace from Catalog

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"add": [],
"remove": [{ "workspace_id": "<WORKSPACE_ID>", "binding_type": "BINDING_TYPE_READ_WRITE" }]
}'

List Workspaces Assigned to Catalog

curl --location --request GET 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json'
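
The bind call can also be prepared from Python with only the standard library. The sketch below builds the same PATCH request as the "Bind Workspace to Catalog" example without sending it. build_bind_request is a hypothetical helper name, and the host, catalog name, and workspace ID are placeholders I made up.

```python
import json
import urllib.request


def build_bind_request(base_url, catalog_name, token, workspace_ids):
    """Prepare (but do not send) the PATCH request that binds workspaces to a catalog."""
    payload = {
        "add": [
            {"workspace_id": wid, "binding_type": "BINDING_TYPE_READ_WRITE"}
            for wid in workspace_ids
        ],
        "remove": [],
    }
    url = "%s/api/2.1/unity-catalog/bindings/catalog/%s" % (base_url.rstrip("/"), catalog_name)
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": "Bearer %s" % token, "Content-Type": "application/json"},
        method="PATCH",
    )


# Placeholder values; replace them with your own workspace URL, catalog, and token.
request = build_bind_request("https://example.azuredatabricks.net/", "my_catalog", "<TOKEN>", [1234567890])
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

To actually send it, pass the request to urllib.request.urlopen.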

 

Synapse: Get KeyVault Properties Using Token Library

This post shows how to get the Key Vault properties using the token library.

Ensure you have a spark session created. Refer to PySpark: Create a Spark Session

import sys
from pyspark.sql import SparkSession

linked_service_name = '<KEYVAULT_LINKED_SERVICE_NAME>'
spark = <GET_SPARK_SESSION>
token_library = spark._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
key_vault_url = token_library.getFullConnectionStringAsMap(linked_service_name).get('url')

print(key_vault_url)
print(token_library.getFullConnectionStringAsMap(linked_service_name))

 

Synapse: SAS Token

This post shows how to get the SAS token from a notebook.

Ensure you have a spark session created. Refer to PySpark: Create a Spark Session

from notebookutils import mssparkutils

linked_service_storage_account_name = '<LINKED_SERVICE_STORAGE_NAME>'
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_storage_account_name)

spark.conf.set('fs.azure.sas.<CONTAINER_NAME>.<ADLS_STORAGE_ACCOUNT_NAME>.blob.core.windows.net', blob_sas_token)

 

Synapse: Environment Variables

This post shows how to work with environment variables in Synapse.

Ensure you have a spark session created. Refer to PySpark: Create a Spark Session

Get Environment Variable

The "str" annotation below is only a type hint for the variable. The value itself is returned as a string.

var: str = spark.conf.get('spark.executorEnv.<ENV_NAME>')
Set Environment Variable
spark.conf.set('spark.executorEnv.<ENV_NAME>', '<VALUE>')
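
If you need the value as something other than a string, convert it explicitly after reading it. Here is a minimal sketch of that idea. A plain dictionary stands in for spark.conf so the example runs anywhere, and get_env_as and BATCH_SIZE are hypothetical names.

```python
from typing import Callable, Mapping, TypeVar

T = TypeVar("T")


def get_env_as(conf: Mapping[str, str], env_name: str, convert: Callable[[str], T]) -> T:
    """Read 'spark.executorEnv.<env_name>' from conf and convert the string value."""
    raw = conf["spark.executorEnv.%s" % env_name]
    return convert(raw)


# Stand-in for spark.conf; in a notebook you would read the value with spark.conf.get.
fake_conf = {"spark.executorEnv.BATCH_SIZE": "500"}

batch_size: int = get_env_as(fake_conf, "BATCH_SIZE", int)
print(batch_size + 1)  # Arithmetic works because the value is now an int.
```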

 

Synapse: List Python Packages

This post shows how to list the installed Python packages in various ways.

You can use %pip to list the python packages that are installed.

%pip freeze

However, that may not show the exact versions that are installed. To get a comprehensive list, do the following.

import pkg_resources

for package in pkg_resources.working_set:
    print(package)
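
Note that pkg_resources is deprecated in recent setuptools releases. As an alternative, the standard library's importlib.metadata (Python 3.8 and later) can produce a similar list. The following is a sketch of that approach; installed_packages is my own helper name.

```python
from importlib import metadata


def installed_packages():
    """Return sorted 'name==version' strings for every installed distribution."""
    packages = set()
    for dist in metadata.distributions():
        name = dist.metadata.get("Name")
        version = dist.metadata.get("Version")
        # Skip distributions whose metadata is missing or incomplete.
        if name and version:
            packages.add("%s==%s" % (name, version))
    return sorted(packages, key=str.lower)


for package in installed_packages():
    print(package)
```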

 

Synapse: Help Command

This post shows how to use the help command from mssparkutils.

You can use help at various levels of Synapse.

Root

The following command lists the areas that help covers. It responds with:

  • fs
  • notebook
  • credentials
  • env
from notebookutils import mssparkutils

mssparkutils.help()
FileSystem

If you leave the help command empty it will just return all options that are available for help. If you put a command in then it will explain that command in greater detail.

from notebookutils import mssparkutils

mssparkutils.fs.help()

mssparkutils.fs.help('cp')

 

Synapse: Mounts

This post shows how to work with mounts on Synapse.

I suggest mounting to an ADLS storage account. The examples below assume that.

List Mounts
from notebookutils import mssparkutils

mssparkutils.fs.mounts()
Get Mount Path

The output of this command will produce ‘/synfs/<number>/mnt/<CONTAINER_NAME>’

from notebookutils import mssparkutils

mount_name = "/mnt/<CONTAINER_NAME>"
mount_path = mssparkutils.fs.getMountPath(mount_name)
Unmount
from notebookutils import mssparkutils

mount_name = "/mnt/<CONTAINER_NAME>"
mssparkutils.fs.unmount(mount_name)
Mount Using a Linked Service

First you must have a linked service created to the storage account. This linked service must be hard-coded and not parameterized in any way.

from notebookutils import mssparkutils

container = '<CONTAINER_NAME>'
storage_account = '<STORAGE_ACCOUNT_NAME>'
sub_folder = '<SUB_FOLDER>'  # Optional.
linked_service_name = '<LINKED_SERVICE_NAME>'

mssparkutils.fs.mount(
    source='abfss://%s@%s.dfs.core.windows.net/%s/' % (container, storage_account, sub_folder),
    mountPoint='/mnt/%s' % (container),
    extraConfigs={'linkedService': linked_service_name, 'fileCacheTimeout': 120, 'timeout': 120}
)
Mount Using Configs

You will need to get the secret. Refer to Synapse: Get Secret

from notebookutils import mssparkutils

client_id = '<CLIENT_ID>'
tenant_id = '<TENANT_ID>'
container = '<CONTAINER_NAME>'
storage_account = '<STORAGE_ACCOUNT_NAME>'
sub_folder = '<SUB_FOLDER>'  # Optional.
secret = "<GET SECRET SECURELY>"  # See Synapse: Get Secret.

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token"
}

mssparkutils.fs.mount(
  source='abfss://%s@%s.dfs.core.windows.net/%s' % (container, storage_account, sub_folder),
  mountPoint='/mnt/%s' % (container),
  extraConfigs=configs
)
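
Because the sub-folder is optional, formatting the source with an empty sub_folder leaves stray slashes at the end of the URL. One way to handle that is a small helper that only appends the folder when one is given. This is a sketch; build_abfss_source is a hypothetical name and the container and account values are made up.

```python
def build_abfss_source(container: str, storage_account: str, sub_folder: str = "") -> str:
    """Return an abfss:// URL, appending the sub-folder only when one is given."""
    base = "abfss://%s@%s.dfs.core.windows.net" % (container, storage_account)
    # Drop leading and trailing slashes so the URL never contains '//' in the path.
    folder = sub_folder.strip("/")
    if folder:
        return "%s/%s" % (base, folder)
    return base + "/"


# Hypothetical names, used only for illustration.
print(build_abfss_source("raw", "mystorage"))
print(build_abfss_source("raw", "mystorage", "/landing/2024/"))
```

The returned string can be passed as the source argument of mssparkutils.fs.mount.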

 

PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake you can refer here.

First you need to install the “delta-spark” package for whatever version you require.

pip install delta-spark==3.1.0

Set up a Spark session.

To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.

To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

from delta.tables import DeltaTable

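# Retention in hours. Values below the 168-hour default fail unless
# spark.databricks.delta.retentionDurationCheck.enabled is set to false.
vacuum_hrs = 100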
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

delta_table = DeltaTable.forPath(spark, path)
delta_table.vacuum(vacuum_hrs)

Compaction

Improves reads by merging small files into larger ones.

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeCompaction()

Z-Order

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 
columns = ['<COLUMN_NAME>']  # One or more column names to Z-Order by.

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeZOrderBy(columns)

Delete

from delta.tables import DeltaTable
import pyspark.sql.functions as F

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

# You can also pass a SQL expression string.
delta_table.delete("column == 'some_VALUE'")

Modify Properties

You can refer here for more properties.

dataSkippingNumIndexedCols

You would change this if you need statistics collected on more columns than the default. Delta Lake collects statistics on the first 32 columns by default.

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
skip_cols = <SOME_VALUE>

spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
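
If you set this property from a reusable job, it can help to validate the value before building the statement. A minimal sketch follows. build_skip_cols_statement is a hypothetical helper, the path is made up, and it relies on -1 meaning "collect statistics on all columns".

```python
def build_skip_cols_statement(path: str, skip_cols: int) -> str:
    """Return the ALTER TABLE statement that sets delta.dataSkippingNumIndexedCols."""
    # -1 means statistics on every column; anything lower is not meaningful.
    if skip_cols < -1:
        raise ValueError("skip_cols must be -1 or greater, got %d" % skip_cols)
    return (
        "ALTER TABLE delta.`%s` SET TBLPROPERTIES "
        "('delta.dataSkippingNumIndexedCols' = '%d')" % (path, skip_cols)
    )


# Hypothetical path, used only for illustration.
statement = build_skip_cols_statement("abfss://raw@mystorage.dfs.core.windows.net/sales/", 40)
print(statement)
```

The string can then be passed to spark.sql.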

 

Synapse: Get Secret

This post shows how to get a secret from a Key Vault in Synapse.

If you have Data Exfiltration Protection enabled (which is recommended), then you need a Managed Private Endpoint set up to your Key Vault.

You also need to ensure your Synapse Managed Identity has access to your Key Vault.

You also need an un-parameterized Linked Service created.

Then you can query your Key Vault to get the secret with the following command.

from notebookutils import mssparkutils

secret = mssparkutils.credentials.getSecret('<KEY_VAULT_NAME>', '<SECRET_KEY>', '<LINKED_SERVICE_KEYVAULT_NAME>')

 

Databricks: Get Secret

This post shows how to get a secret from a Key Vault in Databricks.

First you need to set up dbutils.

Next you have to make sure your Databricks installation has a Key Vault-backed secret scope set up.

Then you need to make sure that Databricks is allowed to communicate with your Key Vault.

Then you can query your Key Vault to get the secret with the following command.

secret = dbutils.secrets.get(scope='<SCOPE>', key='<SECRET_KEY>')

 

Spark Connector Connect to SQL Server

This post shows how to use the Spark Connector to connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", secret) \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth.

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", secret) \
    .load()
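
All three examples share the same JDBC URL shape, so you may want to build it in one place. This is a sketch only: build_jdbc_url is a hypothetical helper, the server and database names are made up, and the port defaults to 1433, the standard SQL Server port.

```python
def build_jdbc_url(server: str, database: str, port: int = 1433, **properties: str) -> str:
    """Return a SQL Server JDBC URL with optional 'key=value;' connection properties."""
    url = "jdbc:sqlserver://%s:%d;database=%s;" % (server, port, database)
    # Extra keyword arguments are appended in the order they were passed.
    for key, value in properties.items():
        url += "%s=%s;" % (key, value)
    return url


# Hypothetical server and database, used only for illustration.
jdbc_url = build_jdbc_url("myserver.database.windows.net", "sales", encrypt="true")
print(jdbc_url)
```

The result can be passed to .option("url", jdbc_url) in any of the reads above.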