Python: pyodbc with SQL Server

This post is in regards to connecting to SQL Server using pyodbc.

Install package

pip install pyodbc

If you are running in Databricks then the current driver will be “{ODBC Driver 17 for SQL Server}”.

If you are running in Synapse then the current driver will be “{ODBC Driver 18 for SQL Server}”.

Check pyodbc Version

import pyodbc
pyodbc.drivers()

Check Which Version of pyodbc in Databricks

%sh
cat /etc/odbcinst.ini

Install Databricks driver 17

curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev

Connect using SQL Auth

I do not recommend SQL Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=<SECRET>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;')

Connect Using Domain Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=<SECRET>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;Authentication=ActiveDirectoryPassword')

Connect using Azure SPN

pip install msal
import struct
import msal

global_token_cache = msal.TokenCache()
secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    <CLIENT_ID>, Authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])
SQL_COPT_SS_ACCESS_TOKEN = 1256

token = bytes(result['access_token'], 'utf-8')
exptoken = b"";

for i in token:
    exptoken += bytes({i});
    exptoken += bytes(1);

token_struct = struct.pack("=i", len(exptoken)) + exptoken;

connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=<SECRET>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;' attrs_before = { SQL_COPT_SS_ACCESS_TOKEN:tokenstruct })

Once you have the connection you can setup the cursor.

cursor = connection.cursor()

Then execute a command

command = "<COMMAND>"
params = ()
cursor.execute(command, params)
connection.commit()

After you Are finish Close

cursor.close()
connection.close()

 

Python: Arguments

This post is in how do use argparse package.

First you must import the package.

import argparse

Next you setup the argument parser.

parser = argparse.ArgumentParser()

Then you create a list of arguments. See the link above for more options then the below set.

argument_list = [
    { "name": "<NAME>", "help": "<HELP_TEXT>", "type": "<TYPE>", "required": True}
]

Then we take your argument_list and create arguments and assign them to the parser.

for arg in argument_list:
    parser.add_argument("--{}".format(arg["name"], help=arg["help"], type=arg["type"], required=arg["required"])

Then we parse the args from “sys.argv”. Parsing args this way means that if anything is unknown to your program than your program won’t fail but instead it will set those variables to the unknown variable and continue your application.

args, unknown = parser.parse_known_args()

You could also parse the args from “sys.argv” this way. However that means that all the args passed to sys.argv must be known otherwise it will fail.

args = parser.parse_args()

Then as a final step we set the values with their key to the config.

config = vars(args)

 

 

 

Azure: Python SDK

This post is how to use the Azure Python SDK.

First we need to install the Packages

pip install azure-storage-file
pip install azure-identity
pip install azure-storage-file-datalake

Setup Credentials

Service Principal

from azure.common.credentials import ServicePrincipalCredentials
secret = "<GET_SECRET_SECURELY>"
credential = ServicePrincipalCredential("<SPN_CLIENT_ID>", secret, tenant="<TENANT_ID>")

Token Credential

from azure.identity import ClientSecretCredential
secret = "<GET_SECRET_SECURELY>"
token_credential = ClientSecretCredential("<TENANT_ID>", "<SPN_CLIENT_ID>", secret)

Subscription Client

Client

from azure.mgmt.resource import SubscriptionClient
subscription_client = SubscriptionClient(credential)

Get List

subscriptions = subscription_client.subscriptions.list()
for subscription in subscriptions:
    print(subscription.display_name)

Storage Account

Client

from azure.mgmt.storage import StorageManagementClient
storage_client = StorageManagementClient(credential, "<SUBSCRIPTION_ID>")

Get List by Resource Group

storage_accounts = storage_client.storage_accounts.list_by_resource_group("<RESOURCE_GROUP_NAME>")
for sa in storage_accounts:
    print(sa.name)

List Containers in Storage Account

containers = storage_client.blob_containers.list("<RESOURCE_GROUP_NAME>", sa.name)

Containers

Client

from azure.storage.blob import ContainerClient
account_url_blob = f"https://{sa.name}.blob.core.windows.net"
container_client = ContainerClient.from_container_url(
    container_url=account_url_blob + "/" + container.name,
    credential=token_credential
)

Get Container Properties

container_client.get_container_properties()

List Blobs

for b in container_client.list_blobs():
    print(b)

Data Lake Service

Client

from azure.storage.filedatalake import DataLakeServiceClient
storage_account_url_dfs = f"https://{sa.name}.df.core.windows.net"
data_lake_service_client = DataLakeServiceClient(storage_account_url_dfs, token_credential)

DataLake Directory

from azure.storage.filedatalake import DataLakeDirectoryClient
data_lake_directory_client = DataLakeDirectoryClient(account_url=account_url_dfs, credential=credential)

FileSystem

Client

file_system_client = data_lake_service_client.get_file_system_client(file_system="<CONTAINER_NAME>")

Get Directory Client

directory_client = file_system_client.get_directory_client("<CONTAINER_SUB_FOLDER>")

Get Directory Access Control

acl_props = directory_client.get_access_control()

 

Databricks: Notebook SQL

This post is how to work with Databricks SQL through a Notebook.

Create a Temp View of a DataFrame.

df = <SOMETHING>
df.createOrReplaceTempView("<TABLE_NAME>")

Drop a Table

%sql
drop table <SCHEMA>.<TABLE>;

Describe Table

%sql
desc table extended <SCHEMA>.<TABLE>;

Describe Detail

%sql
describe detail <SCHEMA>.<TABLE>;

Show Table Properties

%sql
SHOW TBLPROPERTIES <SCHEMA>.<TABLE>;

Describe History

%sql
describe history <SCHEMA>.<TABLE>;

Create Schema

%sql
CREATE SCHEMA IF NOT EXISTS <SCHEMA>;

Create Parquet Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING PARQUET LOCATION 'abfss://<COTNAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Create Delta Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING DELTA LOCATION 'abfss://<COTNAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Upsert

MERGE INTO schema.table t \
USING ( \
  SELECT columns \
  FROM table \
) AS source ON (source.column = t.column) \
WHEN NOT MATCHED THEN \
  INSERT ( \
    ( \
      column, column2 \
    ) \
  VALUES ( \
    source.column, source.column2 \
  ) \
WHEN MATCHED THEN \
  UPDATE SET \
    t.column = source.column \

 

Databricks: Mounts

This post is how to mount on Databricks.

List Mounts

dbutils.fs.mounts()

Unmount

dbutils.fs.unmount("<MOUNT>")

Mount

client_id = "<CLIENTID>"
secret = dbutils.secrets.get(scope = "<SCOPE_NAME>", key = "<SECRET_NAME>")
tenant_id = "<TENANT_ID>"
storage_account_name = "<STORAGE_ACCOUNT_NAME>"
container_name = "<CONTAINER_NAME>"

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/"  tenant_id + "/oauth2/token"
}

path = "abfss://%s@%s.dfs.core.windows.net/" % (container_name, storage_account_name)

dbutils.fs.mount(
    source = path,
    mount_point = "/mnt/<MOUNT_NAME>",
    extra_configs = configs
)

 

 

 

 

 

Databricks: Notebook Commands

This post is all about notebook commands.

List a directory on DBFS using Shell

%sh
ls /dbfs

List a Directory on DBFS using FS

%fs
ls "<DIRECTORY>"

List Python Packages

%pip list

Install a Python Requirements.txt

%pip install --index <URL> -r requirements.txt

Install a Single Python Package

%pip install --index <URL> <PACKAGE>==<VERSION>

 

Databricks: Bearer Token CLI

This post is how to get the bearer token using the CLI and setting the env variable.

First install Azure CLI.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Access Token

az account get-access-token --resource 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Set Access Token

Linux

export DATABRICKS_AAD_TOKEN="<TOKEN>"

Windows

set DATABRICKS_AAD_TOKEN="<TOKEN>"

Set Config File

Linux

export DATABRICKS_CONFIG_FILE="<LOCATION>"

Windows

set DATABRICKS_CONFIG_FILE="<LOCATION>"

 

Databricks: Rest API

This post is how to communicate with Databricks using Rest API’s.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Bearer Token for Service Principal

curl -X GET https://login.microsoft.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d'grant_type=client_credential&client_id=<CLIENTID>&resource=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d&client_secret=<SECRET>

Get Bearer Token for Service Principal Using management.core.windows.net

curl -X GET https://login.microsoftonline.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d'grant_type=client_credential&client_id=<CLIENTID>&resource=https://management.core.windows.net/&amp;client_secret=<SECRET>'

Start Cluster

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/start -d '{ "cluster_id": "<CLUSTER_ID>"}'

Stop Cluster

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/stop -d '{ "cluster_id": "<CLUSTER_ID>"}'

List Clusters

curl --location -g --trace -X --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/list

Job List

curl --location -g --trace -X --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/list

Job Python Run

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/run-now -d '{"job_id": <JOB_ID>, "python_params": [] }'

Job Get

curl --location -g --trace -X --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/runs/get?run_id=<JOB_RUN_ID>

Create Job

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/create -d '{
  "name": "<JOB_NAME>",
  "new_cluster": {
    "name": "<CLUSTER_NAME>",
    "spark_version": "<SPARK_VERSION>",
    "node_type_id": "<NODE_TYPE>",
    "autoscale": {
      "min_workers": 1,
      "max_workers": 3
    },
    "init_scripts": [
      {
        "dbfs": {
          "destination": "dbfs:/<LOCATION>"
        }
      }
    ],
    "cluster_log_conf": {
      "dbfs": {
        "destination": "dbfs:/mnt/<LOCATION>"
      },
      "spark_env_vars": {
        "<KEY>": "<VALUE>"
      }
    },
    "libraries": [
      {
        "pypi": {
          "package": "<PACKAGE>==<VERSION>"
        }
      }
    ],
    "timeout_seconds": <VALUE>,
    "max_retries: 1,
    "spark_python_task: {
      "python_file": "dbfs:/<SOURCE_LOCATION>",
      "parameters": []
    }
  }
}'

Job Permission Patch

curl --location -g --trace -X --request PATCH -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/permissions/jobs/<JOB_ID> -d '{ "access_control_list": [{ "group_name": "<GROUP_NAME>", "permission_level": "<PERMISSION>"}]}'

Get Service Principal List

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals

Delete Service Principal List From Databricks ONLY

curl --location -g --trace -X --request DELETE -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals/<APPLICATION_ID>

Add Service Principal To Databricks

curl --location --request POST 'https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals' --header 'Authorization: Bearer <TOKEN>' --header 'Content-Type: application/json' --data-raw '{ "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"], "applicationId": "<CLIENTID>", "displayName": "<DISPLAYNAME>", "groups": [{"value": "<GROUP_ID>"}], "entitlements": [{ "value": "allow-cluster-create"}] }'

List Secret Scopes

curl --location -g --trace -X --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/list

Create KeyVault Secret Scope

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/create -d '{"scope": "<Keyvault_name>", "scope_backend_type": "AZURE_KEYVAULT", "backend_azure_keyvault": {"resource_id": "<RESOURCE_ID>", "dns_name": "<KEYVAULT_URL>"}, "initial_manage_principal": "users"}'

IP Access Lists

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/ip-access-lists

List Git Repos

curl --location -g --trace -X --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos

Update Git Repo

curl --location -g --trace -X --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos/<REPO_ID> -d '{ "branch": "<BRANCH_NAME>" }'

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Python: lxml

This post focus’ on the lxml package.

First you need to install the package

from lxml import etree

Create xml object by string

xml_str = "<root><subitem attr='test'>rec</subitem></root>"
root = etree.fromstring(xml_str)

Get text in node

text_str = root.xpath('//root/subitem/text()')[0]

Get Attribute

attr = root.xpath('//root/subitem')[0].attrib['attr']

 

Azure: Install/Configure CLI

This post will show you how to install the Azure CLI.

First you need to install the CLI.

Once it is installed you can set your config directory. This is useful for having multiple logins going at the same time.

set AZURE_CONFIG_DIR=<YOUR_DIRECTORY>

You can then login. There are different ways to do that

Way 1: This will popup a login where you enter your login credentials

az login

Way 2: This will ask you for password via the command line

az login -u <YOUR_LOGIN>

Way 3:

az login -u <YOUR_LOGIN> -p <YOUR_PASSWORD>

Way 4: logs in as a service principal

az login --service-principal --user-name <SPN_ID> --password <SPN_KEY> --tenant <TENANTID>

Show your Account

az account show

Set Account Subscription

az account set -s <SUBSCRIPTION_ID>

List Tags For A Resource

az tag list --subscription <SUBSCRIPTION_NAME>

Install Graph

az extension add --name resource-graph

Query for Anything that Has a Tag

az graph query -q "resourceGraoup, type, tags" | where tags.<TAG_NAME>=~'<VALUE>'

Query for More than One Tag

az graph query -q "resourceGraoup, type, tags" | where tags.<TAG_NAME>=~'<VALUE>' | tags.<TAG_NAME>=='<VALUE>'

Query Type

az graph query -q "resourceGroup, type, tags" | where type =~ 'microsoft.sql/servers/databases'

 

Python: Create a Logger

This post is how-to create a logger.

First we need to import

import sys
import logging
from datetime import datetime
from pytz import timezone

Then we create a class for Formatter

class CustomFormatter(logging.Formatter):
    grey = "\x1b[38;20m"
    reset = "\x1b[0m"
    format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:)"
    FORMATS = {
        logging.DEBUG: '\x1b[38;5;23m' + format + reset,
        logging.INFO: grey + format + reset,
        logging.WARNING: '\x1b[38;5;56m' + format + reset,
        logging.ERROR: '\x1b[38;5;197m' + format + reset,
        logging.CRITICAL: '\x1b[38;5;1m' + format +reset
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)

Then we create a function set our logger up.

def set_logger(logging_level, name, log_dir, timezone):
    LOGGING_LEVELS = ['WARNING','INFO','DEBUG','ERROR']
    if logging_level not in LOGGING_LEVELS:
        logging_level = 'INFO'

    level_lookup = {
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'ERROR': logging.ERROR,
    }
    logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(timezone)).timetuple()
    logging.basicConfig(level=level_lookup[logging_level], format="[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d")
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(CustomFormatter())
    logger = logging.getLogger(name)
    logger.addHandler(stream_handler)
    logger.setLevel(logging_level)

    Path(log_dir).mkdir(parents=True, exist_ok=True)

    now = datetime.now(tz=timezone(timezone))
    now = now.strftime("%H-%M-%S")

    log_file = '%slog_%s.log' % (log_dir, now)
    file_handler = logging.FileHandler(log_file, mode='a')
    file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d"))
    logger.addHandler(file_handler)

    return logger

References

https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

Databricks: Set Spark Configs

This post is how to set the spark configs on Databricks or Synapse Notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

secret = 'value' #I highly suggest you get the password from the keyvault
storage_account = ''
application_id = ''
tenant_id = ''

spark.config.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')

spark.config.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')

spark.config.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)

spark.config.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)

spark.config.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))

If you are running in Databricks you could add them to cluster start. Although I recommand doing it in a notebook instead.

spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net secret
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

 

 

PySpark: Read From ADLS to DataFrame

This how-to is how to read from ADLS to a DataFrame.

First we need a spark Session. See PySpark: Create a Spark Session for my details on that.

Read a CSV from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'csv'

#you don't need "header" if it is not CSV

dataframe = spark.read.format(format) \
  .option('header', True) \
  .schema(schema) \
  .load(path)

Read Parquet from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/' format = 'parquet'

dataframe = spark.read.format(format) \
    .load(path)

Read Delta from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/' format = 'delta'

dataframe = spark.read.format(format) \
    .load(path)

 

PySpark: Save a DataFrame To ADLS

This how-to is how to save a DataFrame to ADLS

First we need a spark Session. See PySpark: Create a Spark Session for my details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following:

You should note you don’t need all the options below. I just gave an example.

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
mode = 'overwrite'
format = 'delta'
partitions = []

df.write.mode(mode).format(format).option('mergeSchema', False).partitionBy(*partitions).save(path)

 

 

 

 

 

 

PySpark: Create a Spark Session

This post is how to create a Spark Session

Imports

from pyspark.sql import SparkSession

Create the Spark Session

spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation. You would add this before the “.getOrCreate()”.

You can see a list here

  • .config(“spark.sql.jsonGenerator.ignoreNullFields”, “false”)
    • When reading JSON you will not ignore NULL fields
  • .config(“spark.sql.parquet.int96RebaseModeInWrite”, “CORRECTED”)
    • Fixes issues in timestamps in write operations
  • .config(“spark.sql.parquet.int96RebaseModeInRead”, “CORRECTED”)
    • Fixes issues in timestamps in read operations

 

PySpark: Create a DataFrame

This post is how to create a DataFrame in pyspark.

First we need a spark Session. See PySpark: Create a Spark Session for my details on that.

Next we need to import

from pyspark.sql import Row
from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

schema = StructType([
    StructField('id', IntegerType()),
    .....
])

data = [Row(id=1)]

Create the DataFrame

df = spark.createDataFrame(data, schema=schema)

If you want to use a JSON file to build your schema do the following

import json
from pyspark.sql.types import StructType

data = {
    "fields": [
        {
            "metadata": {},
            "name": "column_a",
            "nullable": false,
            "type": "string"
        }
    ],
    "type": "struct"
}

json_schema = json.loads(data)
table_schema = StructType.fromJson(dict(json_schema))

df = spark.createDataFrame(data, schema=table_schema)

 

Python: Unit Testing

This post focus’ on common hurdles when trying to do unit testing.

Testing Values During Run

You add the following line to anywhere you want to pause the unit test to check values.

import pdb
pdb.set_trace()

How to Patch a Function

from unittest.mock import path

@patch('src.path.to.file.my_function')
@path('src.path.to.file.my_function_add')
def test_some_function(mock_my_function_add, mock_my_function):
    mock_function_add.return_value = <something>
    .......

How to Patch a Function With No Return Value

from unittest.mock import patch

def test_some_function():
    with(patch('src.path.to.file.my_function'):
        ...

How to Patch a Function With 1 Return Value

from unittest.mock import patch

def test_some_function():
    with(patch('src.path.to.file.my_function', MagicMock(return_value=[<MY_VALUES>])):
        ...

How to Patch a Function With Multiple Return Value

from unittest.mock import patch

def test_some_function():
    with(patch('src.path.to.file.my_function', MagicMock(side-effect=[[<MY_VALUES>], [<OTHER_VALUES>]])):
        ...

How to Create a Test Module

from unittest import TestCase

class MyModule(TestCase):
    def setUp(self):
        some_class.my_variable = <something>
        ... DO OTHER STUFF
    def test_my_function(self):
        ... DO Function Test Stuff

How to Patch a Method

patch_methods = [
    "pyodbc.connect"
]

for method in patch_methods:
    patch(method).start()

How to create a PySpark Session

Now once you do this you can just call spark and it will set it.

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

How to Create a Spark SQL Example

import pytest
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

def test_function(spark):
    query = 'SELECT * FROM SOMETHING'
    schema = StructType([
        StructField('column_a', StringType()),
        StructField('column_b', StringType()),
        StructField('column_c', StringType()),
    ])

data = [Row(column_a='a', column_b='b', column_c='c')]
table = spark.createDataFrame(data, schema=schema)
table.createOrReplaceTempView('<table_name>')
df = spark.sql(query).toPandas()

assert not df.empty
assert df.shape[0] == 1
assert df.shape(1) == 5

spark.catalog.dropTempView('<table_name>')

How to Mock a Database Call

First let’s assume you have an exeucte sql function

def execute_sql(cursor, sql, params):
    result = cursor.execute(sql, params).fetchone()
    connection.commit()
    return result

Next in your unit tests you want to test that funciton

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    with patch('path.to.code.execute_sql', MagicMock(return_value=val)) as mock_execute:
        return_val = some_other_function_that_calls_execute_sql(....)
        assert return_val == val

If you need to close a cursor or DB connection

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    mock_cursor = MagicMock()
    mock_cursor.configure_mock(
        **{
              "close": MagicMock()
         }
    )
    mock_connection = MagicMock()
    mock_connection.configure_mock(
        **{
            "close": MagicMock()
        }
    )

    with patch('path.to.code.cursor', MagicMock(return_value=mock_cursor)) as mock_cursor_close:
        with patch('path.to.code.connection', MagicMock(return_value=mock_connection)) as mock_connection_close:
            return_val = some_other_function_that_calls_execute_sql(....)
            assert return_val == val

How to Mock Open a File Example 1

@patch('builtins.open", new_callable=mock_open, read_data='my_data')
def test_file_open(mock_file):
    assert open("my/file/path/filename.extension").read() == 'my_data'
    mock_file.assert_called_with("my/file/path/filename.extension")

    val = function_to_test(....)
    assert 'my_data' == val

How to Mock Open a File Example 2

def test_file_open():
    fake_file_path = 'file/path/to/mock'
    file_content_mock = 'test'
    with patch('path.to.code.function'.format(__name__), new=mock_open(read_data=file_content_mock)) as mock_file:
        with patch(os.utime') as mock_utime:
            actual = function_to_test(fake_file_path)
            mock_file.assert_called_once_with(fake_file_path)
            assertIsNotNone(actual)

Compare DataFrames

def as_dicts(df):
    df = [row.asDict() for row in df.collect()]
    return sorted(df, key=lambda row: str(row))

assert as_dicts(df1) == as_dicts(df2)