Spark Connector Connect to SQL Server

This post shows how to use the Spark Connector to connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar
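
If you would rather pull the connector from Maven than install the jar manually, you can pass the package coordinate when starting Spark. This is a sketch; the coordinate below matches the jar version above, so adjust it for your Scala and connector versions.

spark-shell --packages com.microsoft.azure:spark-mssql-connector_2.12:1.2.0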

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
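
Writing back with the connector works the same way. Here is a minimal sketch, assuming you already have a DataFrame df and the token acquired above; the table name is a placeholder.

df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("dbtable", "<TABLE_NAME>") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .save()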

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", "<SECRET>") \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", "<SECRET>") \
    .load()


Python: pyodbc with SQL Server

This post covers connecting to SQL Server using pyodbc.

Install package

pip install pyodbc

If you are running in Databricks then the current driver will be “{ODBC Driver 17 for SQL Server}”.

If you are running in Synapse then the current driver will be “{ODBC Driver 18 for SQL Server}”.

Check Available ODBC Drivers

import pyodbc
pyodbc.drivers()

Check Which ODBC Drivers Are Installed in Databricks

%sh
cat /etc/odbcinst.ini

Install ODBC Driver 17 in Databricks

curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install msodbcsql17
apt-get -y install unixodbc-dev

Connect using SQL Auth

I do not recommend SQL Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=<SECRET>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;')

Connect Using Domain Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=<SECRET>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;Authentication=ActiveDirectoryPassword')

Connect using Azure SPN

pip install msal

import struct
import pyodbc
import msal

global_token_cache = msal.TokenCache()
secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

# Connection attribute used to pass an access token to the ODBC driver
SQL_COPT_SS_ACCESS_TOKEN = 1256

# The driver expects the token as UTF-16-LE bytes, prefixed with its length
token = result['access_token'].encode('utf-16-le')
token_struct = struct.pack('=i', len(token)) + token

# Do not pass Uid/Pwd when authenticating with an access token
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;', attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})

Once you have the connection you can set up the cursor.

cursor = connection.cursor()

Then execute a command

command = "<COMMAND>"
params = ()
cursor.execute(command, params)
connection.commit()
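
If the command returns rows you can fetch them from the cursor as well; a minimal sketch:

rows = cursor.execute("SELECT * FROM SOMETHING").fetchall()
for row in rows:
    print(row)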

After you are finished, close the cursor and connection.

cursor.close()
connection.close()


Python: Arguments

This post is a how-to on using the argparse package.

First you must import the package.

import argparse

Next you setup the argument parser.

parser = argparse.ArgumentParser()

Then you create a list of arguments. See the argparse documentation for more options than the set below.

argument_list = [
    { "name": "<NAME>", "help": "<HELP_TEXT>", "type": "<TYPE>", "required": True}
]

Then we take your argument_list and create arguments and assign them to the parser.

for arg in argument_list:
    parser.add_argument("--{}".format(arg["name"]), help=arg["help"], type=arg["type"], required=arg["required"])

Then we parse the args from “sys.argv”. Parsing args this way means that if anything unknown is passed, your program won’t fail; instead the unknown arguments are returned separately and your application continues.

args, unknown = parser.parse_known_args()

You could also parse the args from “sys.argv” this way. However, that means all the args passed must be known, otherwise parsing will fail.

args = parser.parse_args()

Then as a final step we convert the parsed args into a dict to use as our config.

config = vars(args)
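
Putting it all together, here is a minimal runnable sketch; the argument name and type are examples only.

import argparse

parser = argparse.ArgumentParser()
argument_list = [
    {"name": "input_path", "help": "Path to the input file", "type": str, "required": True}
]

for arg in argument_list:
    parser.add_argument("--{}".format(arg["name"]), help=arg["help"], type=arg["type"], required=arg["required"])

args, unknown = parser.parse_known_args()
config = vars(args)
print(config)  # e.g. {'input_path': 'data.csv'}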


Python: lxml

This post focuses on the lxml package.

First you need to install the package.

pip install lxml

Then import it.

from lxml import etree

Create xml object by string

xml_str = "<root><subitem attr='test'>rec</subitem></root>"
root = etree.fromstring(xml_str)

Get text in node

text_str = root.xpath('//root/subitem/text()')[0]

Get Attribute

attr = root.xpath('//root/subitem')[0].attrib['attr']
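
You can also iterate over every matching node instead of indexing a single one; a small sketch using the same xml_str as above:

for subitem in root.xpath('//root/subitem'):
    print(subitem.text, subitem.attrib.get('attr'))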


Python: Create a Logger

This post is a how-to on creating a logger.

First we need to import

import sys
import logging
from datetime import datetime
from pathlib import Path
from pytz import timezone

Then we create a custom Formatter class for colored console output.

class CustomFormatter(logging.Formatter):
    grey = "\x1b[38;20m"
    reset = "\x1b[0m"
    fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:)"
    FORMATS = {
        logging.DEBUG: '\x1b[38;5;23m' + fmt + reset,
        logging.INFO: grey + fmt + reset,
        logging.WARNING: '\x1b[38;5;56m' + fmt + reset,
        logging.ERROR: '\x1b[38;5;197m' + fmt + reset,
        logging.CRITICAL: '\x1b[38;5;1m' + fmt + reset,
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)

Then we create a function to set our logger up.

def set_logger(logging_level, name, log_dir, tz_name):
    LOGGING_LEVELS = ['WARNING', 'INFO', 'DEBUG', 'ERROR']
    if logging_level not in LOGGING_LEVELS:
        logging_level = 'INFO'

    level_lookup = {
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'ERROR': logging.ERROR,
    }
    # The parameter is named tz_name so it does not shadow pytz.timezone
    logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(tz_name)).timetuple()
    logging.basicConfig(level=level_lookup[logging_level], format="[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d")
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(CustomFormatter())
    logger = logging.getLogger(name)
    logger.addHandler(stream_handler)
    logger.setLevel(level_lookup[logging_level])

    Path(log_dir).mkdir(parents=True, exist_ok=True)

    now = datetime.now(tz=timezone(tz_name))
    now = now.strftime("%H-%M-%S")

    log_file = '%slog_%s.log' % (log_dir, now)
    file_handler = logging.FileHandler(log_file, mode='a')
    file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d"))
    logger.addHandler(file_handler)

    return logger
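
A quick usage sketch; the logger name, directory, and timezone here are just examples. Note the directory needs a trailing slash because of how log_file is built.

logger = set_logger('INFO', 'my_app', './logs/', 'UTC')
logger.info('Logger is ready')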

References

https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

Python: Unit Testing

This post focuses on common hurdles when trying to do unit testing.

Testing Values During Run

You add the following lines anywhere you want to pause the unit test to check values.

import pdb
pdb.set_trace()

How to Patch a Function

from unittest.mock import patch

@patch('src.path.to.file.my_function')
@patch('src.path.to.file.my_function_add')
def test_some_function(mock_my_function_add, mock_my_function):
    mock_my_function_add.return_value = <something>
    # ... rest of the test

How to Patch a Function With No Return Value

from unittest.mock import patch

def test_some_function():
    with patch('src.path.to.file.my_function'):
        ...

How to Patch a Function With 1 Return Value

from unittest.mock import patch, MagicMock

def test_some_function():
    with patch('src.path.to.file.my_function', MagicMock(return_value=[<MY_VALUES>])):
        ...

How to Patch a Function With Multiple Return Values

from unittest.mock import patch, MagicMock

def test_some_function():
    with patch('src.path.to.file.my_function', MagicMock(side_effect=[[<MY_VALUES>], [<OTHER_VALUES>]])):
        ...

How to Create a Test Module

from unittest import TestCase

class MyModule(TestCase):
    def setUp(self):
        some_class.my_variable = <something>
        # ... do any other setup

    def test_my_function(self):
        # ... do the function test

How to Patch a Method

from unittest.mock import patch

patch_methods = [
    "pyodbc.connect"
]

for method in patch_methods:
    patch(method).start()
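
Patches started with start() stay active until they are stopped, so remember to stop them when the test finishes (for example in tearDown):

patch.stopall()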

How to create a PySpark Session

Once you define the fixture below, you can add spark as an argument to your tests and pytest will inject the session.

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

How to Create a Spark SQL Example

import pytest
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

def test_function(spark):
    query = 'SELECT * FROM <table_name>'
    schema = StructType([
        StructField('column_a', StringType()),
        StructField('column_b', StringType()),
        StructField('column_c', StringType()),
    ])

    data = [Row(column_a='a', column_b='b', column_c='c')]
    table = spark.createDataFrame(data, schema=schema)
    table.createOrReplaceTempView('<table_name>')
    df = spark.sql(query).toPandas()

    assert not df.empty
    assert df.shape[0] == 1
    assert df.shape[1] == 3

    spark.catalog.dropTempView('<table_name>')

How to Mock a Database Call

First let’s assume you have an execute sql function. Note that the connection is passed in so the function can commit.

def execute_sql(connection, cursor, sql, params):
    result = cursor.execute(sql, params).fetchone()
    connection.commit()
    return result

Next, in your unit tests, you want to test that function

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    with patch('path.to.code.execute_sql', MagicMock(return_value=val)) as mock_execute:
        return_val = some_other_function_that_calls_execute_sql(....)
        assert return_val == val

If you need to close a cursor or DB connection

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    mock_cursor = MagicMock()
    mock_cursor.configure_mock(
        **{
              "close": MagicMock()
         }
    )
    mock_connection = MagicMock()
    mock_connection.configure_mock(
        **{
            "close": MagicMock()
        }
    )

    with patch('path.to.code.cursor', MagicMock(return_value=mock_cursor)) as mock_cursor_close:
        with patch('path.to.code.connection', MagicMock(return_value=mock_connection)) as mock_connection_close:
            return_val = some_other_function_that_calls_execute_sql(....)
            assert return_val == val

How to Mock Open a File Example 1

from unittest.mock import patch, mock_open

@patch('builtins.open', new_callable=mock_open, read_data='my_data')
def test_file_open(mock_file):
    assert open("my/file/path/filename.extension").read() == 'my_data'
    mock_file.assert_called_with("my/file/path/filename.extension")

    val = function_to_test(....)
    assert 'my_data' == val

How to Mock Open a File Example 2

from unittest.mock import patch, mock_open

def test_file_open():
    fake_file_path = 'file/path/to/mock'
    file_content_mock = 'test'
    with patch('{}.open'.format(__name__), new=mock_open(read_data=file_content_mock)) as mock_file:
        with patch('os.utime') as mock_utime:
            actual = function_to_test(fake_file_path)
            mock_file.assert_called_once_with(fake_file_path)
            assert actual is not None

Compare DataFrames

def as_dicts(df):
    df = [row.asDict() for row in df.collect()]
    return sorted(df, key=lambda row: str(row))

assert as_dicts(df1) == as_dicts(df2)

Python: Create a WHL File

This post will just be a how-to on creating a whl file.

You need the following files:

MANIFEST.in:

recursive-include <directory> *
recursive-exclude tests *.py

requirements.txt:

This file just holds your packages and their versions.

setup.py

We exclude pytest and coverage from the whl’s requirements because you don’t want those test-only packages to be required when you deploy your code.

from setuptools import find_packages
from distutils.core import setup
import os

if os.path.exists('requirements.txt'):
    req = [line.strip('\n') for line in open('requirements.txt') if 'pytest' not in line and 'coverage' not in line]

setup(
    include_package_data=True,
    name=<app_name>,
    version=<app_version>,
    description=<app_desc>,
    install_requires=req,
    packages=find_packages(exclude=["*tests.*", "*tests"]),
    classifiers=[
        "Programming Language :: Python :: <python_version>",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires='>=<python_version>',
    package_dir={<directory>: <directory>},
)
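
With those files in place you can build the wheel. This assumes the wheel package is installed.

pip install wheel
python setup.py bdist_wheel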

To Check Your Whl File

Install package

pip install check-wheel-contents

Check WHL

check-wheel-contents <PATH_TO_WHL>\<filename>.whl

Install WHL

This will deploy to <PATH_TO_PYTHON>\Lib\site-packages\<directory>

<PATH_TO_PYTHON>\Scripts\pip3.7.exe install <PATH_TO_WHL>\<filename>.whl


Python: xlrd (Read Excel File)

In this tutorial I will show you how to read an Excel file in Python.

Installation

pip install xlrd

Open The Workbook

import xlrd

my_excel = (r'C:\path\to\file')
wb = xlrd.open_workbook(my_excel)

Select Sheet

# Select the first sheet. If you want the third sheet, change the index to 2.
sheet = wb.sheet_by_index(0)

Get Data In Column

#This loops through all the rows in that sheet
for i in range(sheet.nrows):
    # if the value isn't empty then print it out.
    if sheet.cell_value(i, 0) != '':
        print(sheet.cell_value(i, 0))

Get All the Column Headers

#This loops through all the columns in that sheet
for i in range(sheet.ncols):
    # if the value isn't empty then print it out.
    if sheet.cell_value(0, i) != '':
        print(sheet.cell_value(0, i))


Django: React Website

In this tutorial I will demonstrate how to create a Django + React website using Django 2.0. You must have Eclipse installed and configured before you continue. We also require Postgres 9.4 and Node.js, so install both before continuing.

Pip Django Install:
pip install django
pip install django-webpack-loader
Django Version:

If you are not sure what version you are running do the following

python -c "import django; print(django.get_version())"
Eclipse Create Project:

(screenshots of the Eclipse project creation steps)

Eclipse Setup Project:

(screenshots of the Eclipse project setup steps)

Eclipse Django DB Settings:

(screenshots of the Eclipse Django database settings)

Eclipse Django Setup Successful:

Once you click “Finish” your project will look like the following.

Folder Structure:
  • Under djangoApp project.
  • folder: static
  • folder: djangoApp
    • folder: templates
      • file: index.html
      • folder: base
        • file: base.html
  • folder: assets
    • folder: bundles
    • folder: js
      • file: index.jsx
Node:

Inside the djangoApp application do the following

npm init
npm install --save-dev jquery react react-dom webpack webpack-bundle-tracker babel-loader babel-core babel-preset-es2015 babel-preset-react
npm install create-react-class --save
webpack.config.js:
var path = require('path')
var webpack = require('webpack')
var BundleTracker = require('webpack-bundle-tracker')

module.exports = {
    //the base directory (absolute path) for resolving the entry option
    context: __dirname,
    //the entry point we created earlier. Note that './' means 
    //your current directory.
    entry: {
		"index": [path.resolve(__dirname, "./assets/js/index.jsx")],
	},
	output: {
		path: path.resolve('./assets/bundles/'),
		filename: "[name]-[hash].js",
	},
    plugins: [
        //tells webpack where to store data about your bundles.
        new BundleTracker({filename: './webpack-stats.json'}), 
        //makes jQuery available in every module
        new webpack.ProvidePlugin({ 
            $: 'jquery',
            jQuery: 'jquery',
            'window.jQuery': 'jquery' 
        })
    ],
    module: {
        loaders: [
		{
			test: /\.jsx?$/,
			exclude: /(node_modules)/,
			loader: 'babel-loader',
			query: {
				presets: ['react','es2015']
			}
		}
        ]
    }
}
djangoApp\settings.py:

Installed Apps

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'webpack_loader',
]

Add/Edit the following template directive

TEMPLATES = [
 {
    'BACKEND': 'django.template.backends.django.DjangoTemplates',
    'DIRS': [os.path.join(BASE_DIR, 'djangoApp', 'templates'),],
    'APP_DIRS': True,
    'OPTIONS': {
        'context_processors': [
            'django.template.context_processors.debug',
            'django.template.context_processors.request',
            'django.contrib.auth.context_processors.auth',
            'django.contrib.messages.context_processors.messages',
        ],
    },
},]

Add the following static directive

STATIC_URL = '/static/'

STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'assets'),
]

Modify DATABASES

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql_psycopg2',
        'NAME': 'YOUR_DB_NAME',
        'USER': 'YOUR_USER',
        'PASSWORD': 'YOUR_PASSWORD',
        'HOST': 'localhost',
        'PORT': 5432
    }
}

Webpack Loader

WEBPACK_LOADER = {
    'DEFAULT': {
        'BUNDLE_DIR_NAME': 'bundles/',
        'STATS_FILE': os.path.join(BASE_DIR, 'webpack-stats.json'),
    }
}
djangoApp\views.py:

We will create our index page view. Notice the third argument, a dict: those are variables passed to the template to make our site dynamic.

from django.shortcuts import render

def index(request):
    return render(request, 'index.html', {'title': 'Index Page', 'script_name': 'index'})
djangoApp\urls.py:

Add the following imports

from django.conf.urls import url
from django.contrib import admin
from django.urls import path
#This is the index view we created above
from djangoApp.views import index

urlpatterns = [
    url(r'^$', index, name='index'),
    path('admin/', admin.site.urls),
]
djangoApp\templates\base\base.html:

Let’s setup our base template and setup our blocks that the other templates will inherit from.

<html>
	<head>
		<title>{% block title %}{% endblock %}</title>
	</head>
	<body>
		{% block content %}
		{% endblock %}
	</body>
</html>
djangoApp\templates\index.html:

The important part here is the extends tag; without it your base.html template won’t be inherited. As well, the {% with %} block and the title variable make our template dynamic and allow us to incorporate React into our site.

{% extends "base/base.html"  %}
{% load render_bundle from webpack_loader %}
{% load staticfiles %}
{% block title %}
	{{title}}
{% endblock %}
{% block content %}
	<div id="container"></div>
	{% with script=script_name %}
		{% render_bundle script 'js' %}
	{% endwith %} 
{% endblock %}
assets\js\index.jsx:

This is our react class.

var React = require('react');
var ReactDOM = require('react-dom');
var createReactClass = require('create-react-class');

var App = createReactClass({
    render: function() {
        return (
            <h1>
            React App Page
            </h1>
        )
    }
});

ReactDOM.render(<App />, document.getElementById('container'));
Database Setup/Migration:

For this tutorial we used postgres. At this point please make sure you create the djangoApp db and the user you specified in the settings.py file. Then run the following commands in order.

#Migrates the auth
python manage.py migrate auth
#migrates the rest
python manage.py migrate
#Create the user for accessing the django admin ui
#This will ask you for user names and passwords. Don't make it the same as in your settings.py file.
python manage.py createsuperuser
Start Server:
webpack -p
python manage.py runserver

Your site is now running at http://localhost:8000.

Your admin site is now running at http://localhost:8000/admin/.


References:

I used this video as a guideline to get the project started. However, some steps didn’t work as-is, so I adjusted them and reworked the setup to require just one template, etc.

Avro & Python: How to Schema, Write, Read

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using 1.8.2.

pip install avro-python3

Schema

There are so many different ways to work with the schema definition. There are primitive and complex types. You can find much more documentation on the schema definition in the Apache Avro specification.

import json
import avro.schema

my_schema = avro.schema.Parse(json.dumps(
{
    'namespace': 'test.avro',
    'type': 'record',
    'name': 'MY_NAME',
    'fields': [
        {'name': 'name_1', 'type': 'int'},
        {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
        {'name': 'name_3', 'type': 'float'},
    ]
}))

Method 1

Write

from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import io

#write binary
file = open(filename, 'wb')

datum_writer = DatumWriter()
fwriter = DataFileWriter(file, datum_writer, my_schema)
fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
fwriter.close()

Write Deflate

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

#write binary
file = open(filename, 'wb')

datum_writer = DatumWriter()
fwriter = DataFileWriter(file, datum_writer, my_schema, codec = 'deflate')
fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
fwriter.close()

Append

from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import io

#append binary
file = open(filename, 'a+b')

datum_writer = DatumWriter()
#Notice that the schema is not added to the DataFileWriter. This is because you are appending to an existing avro file
fwriter = DataFileWriter(file, datum_writer)
fwriter.append({'name_1': 645675, 'name_2': [5.6,34.9], 'name_3': 649.5645})
fwriter.close()

Read Schema

from avro.datafile import DataFileReader
from avro.io import DatumReader

file = open(filename, 'rb')
datum_reader = DatumReader()
file_reader = DataFileReader(file, datum_reader)

print(file_reader.meta)

Read

from avro.datafile import DataFileReader
from avro.io import DatumReader

#read binary
fd = open(filename, 'rb')
datum_reader = DatumReader()
file_reader = DataFileReader(fd, datum_reader)

for datum in file_reader:
	print(datum['name_1'])
	print(datum['name_2'])
	print(datum['name_3'])
file_reader.close()

Method 2

Write/Append BinaryEncoder

import io
from avro.io import DatumWriter, BinaryEncoder

#write binary
file = open(filename, 'wb')
#append binary
file = open(filename, 'a+b')
bytes_writer = io.BytesIO()
encoder = BinaryEncoder(bytes_writer)
writer_binary = DatumWriter(my_schema)
writer_binary.write({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645}, encoder)
file.write(bytes_writer.getvalue())

Read BinaryDecoder

import io
from avro.io import DatumReader, BinaryDecoder

file = open(filename, 'rb')
bytes_reader = io.BytesIO(file.read())
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(my_schema)

while True:
    try:
        rec = reader.read(decoder)
        print(rec['name_1'])
        print(rec['name_2'])
        print(rec['name_3'])
    except Exception:
        # no more records to decode
        break


Python: Run Process

If you want to run a jar from Python, or really any process, you can do so by leveraging the subprocess package.

from subprocess import Popen, PIPE

Then you need to call Popen. If you want to set java memory you can do so using -Xms and -Xmx in between java and -jar.

#bufsize of 1 is line buffered (this requires text mode, hence universal_newlines=True)
#stdout and stderr are piped so you can read the output
result = Popen(['java', '-jar', 'myapp.jar'], stdout=PIPE, stderr=PIPE, shell=False, bufsize=1, universal_newlines=True)

If you want your process to wait until finished you will need to call wait.

result.wait()

If you pushed the stderr and stdout then you can check the output.

if result.stdout is not None:
    for line in result.stdout:
        print(line)

if result.stderr is not None:
    for line in result.stderr:
        print(line)
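
Note that reading the pipes after wait() can deadlock if the process writes a lot of output. Using communicate() instead reads both streams safely; a minimal sketch:

stdout, stderr = result.communicate()
print(stdout)
print(stderr)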

Python: Logging

If you want to do some basic logging to a file, you can use the logging package that comes with Python. Here are some of the basic ways to log.

You first have to import the package.

import logging

You can setup your own logging configuration but for this we will just use the basic setup and log to a file.

#If you are going to have multiple handlers you should reset the handlers first
logging.root.handlers = []

#The file to log to
log_file = '/mnt/log/<FILENAME>.log'

#Setup the config with the level to log up to
logging.basicConfig(filename=log_file, level=logging.INFO)

Then you setup your logger

logger = logging.getLogger('my_awesome_log')

If you want your log to truncate after a certain size then you must add a RotatingFileHandler. If you do not use the RotatingFileHandler then the log will grow until your drive runs out of space.

from logging.handlers import RotatingFileHandler

handler = RotatingFileHandler(log_file, maxBytes=1024, backupCount=1)
logger.addHandler(handler)

If you also want to log to console you will need to add an additional handler for the console setting the level to log.

console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)

That’s it a basic example of how to use the logging package.


Python: Multiprocessing Pool

Sometimes we want to run a method across multiple processes due to a costly function. Below is an example of how you could do it. There are other APIs you could use, like map; a sketch of that follows the example below.

from multiprocessing import Pool
# Sets the pool to utilize 4 processes
pool = Pool(processes=4)
result = pool.apply_async(func=my_method, args=("some_info",))
# Blocks until the async call completes and returns its value
data = result.get()
pool.close()
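
For completeness, here is a minimal sketch of the map variant; my_method and the inputs are assumed to exist.

from multiprocessing import Pool

pool = Pool(processes=4)
# map blocks until every item is processed and returns the results in order
results = pool.map(my_method, ["info_1", "info_2", "info_3"])
pool.close()
pool.join()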

Python: Selenium Tests

Selenium is a great way to test your UI. It is compatible with different browsers. I will show you two.

Gecko Driver Installation:

Make sure you are using the latest version. At the time of this writing it is 0.19.0.

wget https://github.com/mozilla/geckodriver/releases/download/v0.19.0/geckodriver-v0.19.0-linux64.tar.gz
sudo tar -xvzf geckodriver-v0.19.0-linux64.tar.gz
sudo chmod +x geckodriver
sudo cp geckodriver /usr/local/bin/

You can use phantomjs, firefox, chrome, etc.

PhantomJS Installation:

sudo mv phantomjs-2.1.1-linux-x86_64.tar.bz2 /usr/local/share/.
cd /usr/local/share/
sudo tar xjf phantomjs-2.1.1-linux-x86_64.tar.bz2
sudo ln -s /usr/local/share/phantomjs-2.1.1-linux-x86_64 /usr/local/share/phantomjs
sudo ln -s /usr/local/share/phantomjs/bin/phantomjs /usr/local/bin/phantomjs

Firefox Installation:

sudo apt-get update
wget https://ftp.mozilla.org/pub/firefox/releases/50.0/linux-x86_64/en-US/firefox-50.0.tar.bz2
sudo tar -xjf firefox-50.0.tar.bz2
sudo rm -rf /opt/firefox
sudo mv firefox /opt/firefox
sudo mv /usr/bin/firefox /usr/bin/firefoxold
sudo ln -s /opt/firefox/firefox /usr/bin/firefox

Firefox Headless Installation:

sudo apt-get install xvfb
pip3 install pyvirtualdisplay==0.2.1

Selenium Installation:

pip3 install selenium==3.6.0

PyUnit Selenium Test Examples:

Setup:

#If you are using headless firefox
from pyvirtualdisplay import Display
#The selenium imports
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
import unittest, os, time

class MySeleniumTests(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.server_url = "http://" + os.getenv("WEBSITE_URL", 'localhost:5000')

    def setUp(self):
        #if you are using firefox headless browser
        display = Display(visible=0, size=(1080, 720))
        display.start()
		
        #Firefox selenium driver (use this OR the PhantomJS driver below, not both)
        self.driver = webdriver.Firefox()

        #PhantomJS selenium driver
        self.driver = webdriver.PhantomJS()
		
        self.driver.implicitly_wait(60)
        self.driver.set_page_load_timeout(60)
        self.driver.set_window_size(1080, 720)
        self.base_url = self.server_url

        self.driver.get(self.base_url + "/")
		
        #If your site has a login then you need to set the username and password first.
        self.driver.find_element_by_id("user").clear()
        self.driver.find_element_by_id("user").send_keys(USERNAME)
        self.driver.find_element_by_id("password").clear()
        self.driver.find_element_by_id("password").send_keys(PWD)
        self.driver.find_element_by_id("submit").click()
        time.sleep(1)

    def tearDown(self):
        self.driver.quit()

if __name__ == "__main__":
    unittest.main()

Test Title:

self.driver.get(self.server_url)
self.assertIn("MySite", self.driver.title)

Find Class:

self.assertTrue(WebDriverWait(self.driver, 10).until(EC.visibility_of_element_located((By.CLASS_NAME, "my-awesome-class"))))

Find ID:

self.assertTrue(WebDriverWait(self.driver, 10).until(EC.visibility_of_element_located((By.ID, "myId"))))

Find Partial Text:

self.driver.find_element_by_partial_link_text("My Text On Page")

Find Element Contains Text:

self.assertTrue('MyText' in self.driver.find_element_by_id('container').text)

Click Element:

self.driver.find_element_by_id('myId').click()

Wait Element To Show:

self.assertTrue(WebDriverWait(self.driver, 10).until(EC.text_to_be_present_in_element((By.ID, 'MyID'), "Text To See")))

xPath Click Second Element:

self.driver.find_element_by_xpath("(//div[@class='my-awesome-class'])[2]").click()

Clear Input:

self.driver.find_element_by_id("myId").clear()

Send Data To Input:

self.driver.find_element_by_id("myId").send_keys('My New Data')


Python: MRJob

If you use Hadoop and you want to run a map reduce type job using Python you can use MRJob.

Installation:

pip install mrjob

Here is an example that runs just the mapper and loads a JSON file. yield writes the data out.

from mrjob.job import MRJob
from mrjob.step import MRStep
import json

class MRTest(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_test)
        ]

    def mapper_test(self, _, line):
        result = {}
        doc = json.loads(line)
        key = doc['<KEY_FIELD>']  # pick the field to group by

        yield key, result

if __name__ == '__main__':
    MRTest.run()
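
You can then run the job locally against an input file; a minimal sketch, assuming the job above is saved as mr_test.py. Adding -r hadoop runs it on a Hadoop cluster instead of locally.

python mr_test.py input.json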

Python: Working with DateTimes

In this tutorial I will show you the different ways of working with dates and times in Python, including working with milliseconds. Note this isn’t all the available options, just some that I have encountered over the years.

Install Python Packages:

Open cmd/terminal and, if required, navigate to your site’s working folder. (Note: if you are working in a virtual env you should ensure you source it first.)

pip install python-dateutil

There are many different packages that we can use to work with dates and times. You need to decide what is right for you.

dateutil:

The following will convert the date string you give it fast and easily. This gives you back the datetime object. Notice how we don’t need to pass it a date time format. To me this is very convenient.

from dateutil import parser

date_str = '2017-06-06'
date_time_str = '2017-06-07 12:34'
date_time_str_2 = '2017-06-07 12:34:46'
date_time_str_3 = '2017-06-07 12:34:42.234'

result = parser.parse(date_str)
print(result) #2017-06-06 00:00:00
result = parser.parse(date_time_str)
print(result) #2017-06-07 12:34:00
result = parser.parse(date_time_str_2)
print(result) #2017-06-07 12:34:46
result = parser.parse(date_time_str_3)
print(result) #2017-06-07 12:34:42.234000

datetime:

The following will convert the date string you give it fast and easily. This gives you back the datetime object. Notice how we need to pass the format of the datetime; if you don’t you will get an exception. This is convenient if you know the format beforehand, but that might not always be the case.

import datetime

date_str = '2017-06-06'
date_time_str = '2017-06-07 12:34'
date_time_str_2 = '2017-06-07 12:34:46'
date_time_str_3 = '2017-06-07 12:34:42.234'

result = datetime.datetime.strptime(date_str, "%Y-%m-%d")
print(result) #2017-06-06 00:00:00
result = datetime.datetime.strptime(date_time_str, "%Y-%m-%d %H:%M")
print(result) #2017-06-07 12:34:00
result = datetime.datetime.strptime(date_time_str_2, "%Y-%m-%d %H:%M:%S")
print(result) #2017-06-07 12:34:46
result = datetime.datetime.strptime(date_time_str_3, "%Y-%m-%d %H:%M:%S.%f")
print(result) #2017-06-07 12:34:42.234000

The above all works; however, the following example will not. Why do you think this is?

import datetime

date_time_str = '2017-06-07 12:34:46'

try:
    datetime.datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
except:
    pass #just for this example don't do this lol

The reason is because datetime expects the correct format to be supplied. We gave it hour minute second but not milliseconds. You will get the following exception (ValueError: unconverted data remains: .234)

Timestamps:

Sometimes we want to convert the date to unix (epoch) time or vice versa.

From Date:
from dateutil import parser
from datetime import timezone

date_time_str = '2017-06-07 17:34:42.234'
result = parser.parse(date_time_str)

timestamp = result.replace(tzinfo=timezone.utc).timestamp()
print(timestamp) #1496856882.234

This gives us the timestamp as a float as 1496856882.234.

From Timestamp:
from dateutil import parser
import datetime

timestamp = 1496856882.234

result = datetime.datetime.fromtimestamp(timestamp)
print(result) #2017-06-07 13:34:42.234000

result = datetime.datetime.utcfromtimestamp(timestamp)
print(result) #2017-06-07 17:34:42.234000

Get Date Parts:

If you want to get specific date parts such as the year, month, day, hour, etc.

import datetime
from dateutil import parser

date_time_str_3 = '2017-06-07 12:34:42.234'
result = parser.parse(date_time_str_3)
print(result) #2017-06-07 12:34:42.234000

year = result.year #2017
month = result.month #6
day = result.day #7
hour = result.hour #12
minute = result.minute #34
second = result.second #42
microsecond = result.microsecond #234000

Add To Date:

If you want to add time to a date.

import datetime
from dateutil import parser
from datetime import timezone, timedelta

date_time_str = '2017-06-07 17:34:42.234'
result = parser.parse(date_time_str)
print(result) #2017-06-07 17:34:42.234000

timestamp = result.replace(tzinfo=timezone.utc).timestamp()
print(timestamp) #1496856882.234

#Add 10 seconds to datetime
new_time = int((datetime.datetime.fromtimestamp(timestamp) + timedelta(milliseconds=10000)).timestamp() * 1000)
print(new_time) #1496856892234

As you can see, 10 seconds have been added to the datetime.

datetime strftime

from datetime import datetime

now = datetime.now()
datetime_str = now.strftime("%Y-%m-%d %H:%M:%S")
print(datetime_str)

datetime fromisoformat

from datetime import datetime

print(datetime.fromisoformat("2024-04-09 13:48:20"))


Python: CSV from Array

In this tutorial I will explain how to turn an array to a csv file. I will show you two ways. One is in memory and the other is to a file.

For both ways you need to import the csv and io packages.

import csv, io
Way 1 Write (In Memory):
#Create the string buffer
output = io.StringIO()

#Setup the csv writer to write the results to a string buffer
wr = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
Way 2 Write (File):
#Create the file itself in write mode
f = open('filename.csv', 'w')

#Setup the csv writer to write the results to a file.
wr = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

Technically both ways have the same setup for the csv writer. To write results you then pass the csv writer an array of values like below.

wr.writerow(['123',5,4,'value'])

How to read the contents of the file or string buffer depends on which way you chose. I show both ways below.

Way 1 Read (In Memory):
b = bytes(output.getvalue(), 'utf-8')
Way 2 Read (File):
f.close()
file_data = open('filename.csv', 'r').read()

If you want to send the file down using something like flask send_file then you need to convert it to BytesIO.

from io import BytesIO

buffer = BytesIO()
buffer.write(b)
#You must seek to the beginning otherwise it won't send anything back.
buffer.seek(0)

Now if you are sending it as a file back to the user and are using something like flask, this is how you do that. Pretty straightforward.

return send_file(buffer, mimetype='application/octet-stream', as_attachment=True, attachment_filename='myFile.csv')