Databricks: Python SDK

This post is how to use the Databricks Python SDK.

Install the Package

    pip install databricks-sdk

Update Package

    pip install databricks-sdk --upgrade

Check Package Version

    pip show databricks-sdk | grep -oP '(?<=Version: )\S+'

Setup WorkspaceClient

    from databricks.sdk import WorkspaceClient

    secret = dbutils.secrets.get(scope="<SCOPE>", key="<KEY>")

    w = WorkspaceClient(
        host='https://<URL>/',
        azure_workspace_resource_id='<RESOURCE_ID_OF_DATABRICKS>',
        azure_tenant_id='<TENANT_ID>',
        azure_client_id='<CLIENT_ID>',
        azure_client_secret=secret
    )
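To confirm the client authenticated successfully, you can make a simple call; a minimal sketch using the SDK's current_user API:

    # Smoke test: fetch the identity the SDK authenticated as
    me = w.current_user.me()
    print(me.user_name)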

Setup AccountClient

You can get the account_id from the Databricks account portal. It is shown under your ID in the top right-hand corner.

    from databricks.sdk import AccountClient

    secret = dbutils.secrets.get(scope="<SCOPE>", key="<KEY>")

    a = AccountClient(
        host='https://accounts.azuredatabricks.net',
        account_id='<ACCOUNT_ID>',
        azure_tenant_id='<TENANT_ID>',
        azure_client_id='<CLIENT_ID>',
        azure_client_secret=secret
    )

List Workspace Groups

NOTE: You must also set up the WorkspaceClient to do this.

    w.groups.list()
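The list is an iterator of Group objects, so you can, for example, print each group's display name:

    # Iterate workspace-level groups and print their display names
    for group in w.groups.list():
        print(group.display_name)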

List Account Groups

NOTE: You must also set up the AccountClient to do this. You must also be an account admin.

    a.groups.list()

Create Storage Credential

NOTE: Your SPN must be an account admin to do this. You must also set up the WorkspaceClient.

    from databricks.sdk.service.catalog import AzureManagedIdentity

    storage_credential_name = '<CREDENTIAL_NAME>'
    comment = '<COMMENT>'
    connector_id = '<DATABRICKS_ACCESS_CONNECTOR>'
    az_mi = AzureManagedIdentity(access_connector_id=connector_id)

    w.storage_credentials.create(
        name=storage_credential_name,
        azure_managed_identity=az_mi,
        comment=comment
    )

 

Synapse: List Python Packages

This post is how to list the python packages in various ways.

You can use %pip to list the python packages that are installed.

    %pip freeze

However, doing it that way may not capture everything that is installed in the environment. To get a comprehensive list, do the following.

    import pkg_resources

    for package in pkg_resources.working_set:
        print(package)
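Note that pkg_resources is deprecated in newer setuptools releases. On Python 3.8+ the standard library's importlib.metadata gives the same listing; a minimal sketch:

    from importlib.metadata import distributions

    for dist in distributions():
        print(f"{dist.metadata['Name']}=={dist.version}")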

 

Python: pyodbc with SQL Server

This post covers connecting to SQL Server using pyodbc.

Install package

    pip install pyodbc

If you are running in Databricks then the current driver will be “{ODBC Driver 17 for SQL Server}”.

If you are running in Synapse then the current driver will be “{ODBC Driver 18 for SQL Server}”.

List Available ODBC Drivers

    import pyodbc
    pyodbc.drivers()

Check Which ODBC Drivers Are Installed in Databricks

    %sh
    cat /etc/odbcinst.ini

Install ODBC Driver 17 in Databricks

    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
    curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
    apt-get update
    ACCEPT_EULA=Y apt-get install msodbcsql17
    apt-get -y install unixodbc-dev

Connect using SQL Auth

I do not recommend using SQL Auth.

    import pyodbc

    secret = "<GET SECRET SECURELY>"

    connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;')

Connect Using Domain Auth

    import pyodbc

    secret = "<GET SECRET SECURELY>"

    connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;Authentication=ActiveDirectoryPassword')

Connect using Azure SPN

    pip install msal

    import struct
    import pyodbc
    import msal

    global_token_cache = msal.TokenCache()
    secret = "<GET SECRET SECURELY>"

    global_spn_app = msal.ConfidentialClientApplication(
        '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
        client_credential=secret,
        token_cache=global_token_cache,
    )

    result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    # The driver expects the token as length-prefixed UTF-16-LE,
    # so interleave a zero byte after each token byte
    token = bytes(result['access_token'], 'utf-8')
    exptoken = b""

    for i in token:
        exptoken += bytes({i})
        exptoken += bytes(1)

    token_struct = struct.pack("=i", len(exptoken)) + exptoken

    # Note: Uid and Pwd must not be in the connection string when authenticating with an access token
    connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;Database=<DATABASE>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;', attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})

Once you have the connection you can setup the cursor.

    cursor = connection.cursor()

Then execute a command

    command = "<COMMAND>"
    params = ()
    cursor.execute(command, params)
    connection.commit()
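For a query that returns rows, you fetch from the cursor after executing. A small sketch (the sys.tables query is just an example):

    # Run a query and read the results
    cursor.execute("SELECT name FROM sys.tables")
    for row in cursor.fetchall():
        # pyodbc rows support access by column name
        print(row.name)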

After you are finished, close the cursor and connection.

    cursor.close()
    connection.close()

 

Python: Arguments

This post covers how to use the argparse package.

First you must import the package.

    import argparse

Next you setup the argument parser.

    parser = argparse.ArgumentParser()

Then you create a list of arguments. See the argparse documentation for more options than the set below.

    argument_list = [
        {"name": "<NAME>", "help": "<HELP_TEXT>", "type": "<TYPE>", "required": True}
    ]

Then we take your argument_list and create arguments and assign them to the parser.

    for arg in argument_list:
        parser.add_argument("--{}".format(arg["name"]), help=arg["help"], type=arg["type"], required=arg["required"])

Then we parse the args from “sys.argv”. Parsing args this way means that arguments unknown to your program won’t cause a failure; they are collected into the unknown variable and your application continues.

    args, unknown = parser.parse_known_args()

You could also parse the args from “sys.argv” this way. However, that means all the args passed to sys.argv must be known, otherwise parsing will fail.

    args = parser.parse_args()

Then, as a final step, we convert the parsed args into a config dict of key/value pairs.

    config = vars(args)
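Putting it all together, a minimal end-to-end sketch (the --input_path argument is a hypothetical example):

    import argparse

    parser = argparse.ArgumentParser()
    # --input_path is a hypothetical example argument
    parser.add_argument("--input_path", help="Path to the input file", type=str, required=True)
    args, unknown = parser.parse_known_args()
    config = vars(args)
    print(config["input_path"])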

 

 

 

Azure: Python SDK

This post is how to use the Azure Python SDK.

If you are using Databricks you can get the secret by using the following Databricks: Get Secret

If you are using Synapse you can get the secret by using the following Synapse: Get Secret

Package Installations

    pip install azure-identity
    pip install azure-storage-file
    pip install azure-storage-file-datalake

Setup Credentials

Service Principal

    from azure.common.credentials import ServicePrincipalCredentials
    secret = "<GET_SECRET_SECURELY>"
    credential = ServicePrincipalCredentials("<SPN_CLIENT_ID>", secret, tenant="<TENANT_ID>")

Token Credential

    from azure.identity import ClientSecretCredential
    secret = "<GET_SECRET_SECURELY>"
    token_credential = ClientSecretCredential("<TENANT_ID>", "<SPN_CLIENT_ID>", secret)

Subscription Client

Client

    from azure.mgmt.resource import SubscriptionClient
    subscription_client = SubscriptionClient(credential)

Get List

    subscriptions = subscription_client.subscriptions.list()
    for subscription in subscriptions:
        print(subscription.display_name)

Storage Account

Client

    from azure.mgmt.storage import StorageManagementClient
    storage_client = StorageManagementClient(credential, "<SUBSCRIPTION_ID>")

Get List by Resource Group

    storage_accounts = storage_client.storage_accounts.list_by_resource_group("<RESOURCE_GROUP_NAME>")
    for sa in storage_accounts:
        print(sa.name)

List Containers in Storage Account

    containers = storage_client.blob_containers.list("<RESOURCE_GROUP_NAME>", sa.name)

Containers

Client

    from azure.storage.blob import ContainerClient
    # "container" comes from the blob_containers list above
    account_url_blob = f"https://{sa.name}.blob.core.windows.net"
    container_client = ContainerClient.from_container_url(
        container_url=account_url_blob + "/" + container.name,
        credential=token_credential
    )

Get Container Properties

    container_client.get_container_properties()

List Blobs

    for b in container_client.list_blobs():
        print(b)

Data Lake Service

Client

    from azure.storage.filedatalake import DataLakeServiceClient
    account_url_dfs = f"https://{sa.name}.dfs.core.windows.net"
    data_lake_service_client = DataLakeServiceClient(account_url_dfs, token_credential)

DataLake Directory

    from azure.storage.filedatalake import DataLakeDirectoryClient
    data_lake_directory_client = DataLakeDirectoryClient(account_url=account_url_dfs, file_system_name="<CONTAINER_NAME>", directory_name="<DIRECTORY_NAME>", credential=token_credential)

FileSystem

Client

    file_system_client = data_lake_service_client.get_file_system_client(file_system="<CONTAINER_NAME>")

Get Directory Client

    directory_client = file_system_client.get_directory_client("<CONTAINER_SUB_FOLDER>")

Get Directory Access Control

    acl_props = directory_client.get_access_control()
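The call returns a dict of POSIX-style access control information. A small sketch of reading it, assuming the standard keys returned by the Data Lake SDK:

    # acl_props is a dict; 'acl' holds the POSIX ACL entries as a comma-separated string
    print(acl_props['owner'])
    print(acl_props['permissions'])
    for entry in acl_props['acl'].split(','):
        print(entry)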

Microsoft Graph Client

Package Installations

    pip install msgraph-sdk
    pip install msrestazure
    pip install azure-identity

Credential

    from azure.identity.aio import ClientSecretCredential

    secret = "<GET_SECRET_SECURELY>"
    credential = ClientSecretCredential('<TENANT_ID>', '<CLIENT_ID>', secret)

Client

    from msgraph import GraphServiceClient

    def create_session(credential):
        scopes = ['https://graph.microsoft.com/.default']
        graph_client = GraphServiceClient(credential, scopes)
        return graph_client

    graph_client = create_session(credential)

Get Groups

    # This will only get the first 100 groups. If you have more, page through with odata_next_link.
    groups = await graph_client.groups.get()
    print(len(groups.value))

    while groups is not None and groups.odata_next_link is not None:
        groups = await graph_client.groups.with_url(groups.odata_next_link).get()
        print(len(groups.value))

Get Group Members

    id = '<GROUP_ID>'
    group_members = await graph_client.groups.by_group_id(id).members.get()
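The response's value holds the member objects. A minimal sketch printing their display names, falling back to the object id since members can be several directory object types:

    for member in group_members.value:
        # display_name exists on user and group members; fall back to the id otherwise
        print(getattr(member, 'display_name', None) or member.id)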

 

Python: lxml

This post focuses on the lxml package.

First you need to install the package (pip install lxml), then import it.

    from lxml import etree

Create xml object by string

    xml_str = "<root><subitem attr='test'>rec</subitem></root>"
    root = etree.fromstring(xml_str)

Get text in node

    text_str = root.xpath('//root/subitem/text()')[0]

Get Attribute

    attr = root.xpath('//root/subitem')[0].attrib['attr']
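You can also walk the tree and serialize it back out; a short sketch using the same root element:

    # Iterate every subitem element in the tree
    for node in root.iter('subitem'):
        print(node.tag, node.text, node.attrib)

    # Serialize the tree back to a string
    print(etree.tostring(root, pretty_print=True).decode())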

 

Python: Create a Logger

This post is how to create a logger.

First we need to import

    import sys
    import logging
    from datetime import datetime
    from pathlib import Path
    from pytz import timezone

Then we create a class for Formatter

    class CustomFormatter(logging.Formatter):
        grey = "\x1b[38;20m"
        reset = "\x1b[0m"
        format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:)"
        FORMATS = {
            logging.DEBUG: '\x1b[38;5;23m' + format + reset,
            logging.INFO: grey + format + reset,
            logging.WARNING: '\x1b[38;5;56m' + format + reset,
            logging.ERROR: '\x1b[38;5;197m' + format + reset,
            logging.CRITICAL: '\x1b[38;5;1m' + format + reset
        }

        def format(self, record):
            log_fmt = self.FORMATS.get(record.levelno)
            formatter = logging.Formatter(log_fmt)
            return formatter.format(record)

Then we create a function to set our logger up.

    def set_logger(logging_level, name, log_dir, tz_name):
        LOGGING_LEVELS = ['WARNING', 'INFO', 'DEBUG', 'ERROR']
        if logging_level not in LOGGING_LEVELS:
            logging_level = 'INFO'

        level_lookup = {
            'WARNING': logging.WARNING,
            'INFO': logging.INFO,
            'DEBUG': logging.DEBUG,
            'ERROR': logging.ERROR,
        }
        # tz_name is a string; it must not shadow the imported pytz "timezone" function
        logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(tz_name)).timetuple()
        logging.basicConfig(level=level_lookup[logging_level], format="[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d")
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setFormatter(CustomFormatter())
        logger = logging.getLogger(name)
        logger.addHandler(stream_handler)
        logger.setLevel(logging_level)

        Path(log_dir).mkdir(parents=True, exist_ok=True)

        now = datetime.now(tz=timezone(tz_name))
        now = now.strftime("%H-%M-%S")

        log_file = '%slog_%s.log' % (log_dir, now)
        file_handler = logging.FileHandler(log_file, mode='a')
        file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d"))
        logger.addHandler(file_handler)

        return logger
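A usage sketch (note that log_dir must end with a path separator, because the file name is appended directly to it):

    logger = set_logger('INFO', 'my_app', '/tmp/logs/', 'UTC')
    logger.info('Logger is ready')
    logger.error('Something went wrong')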

References

https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

PySpark: Create a Spark Session

This post is how to create a Spark Session.

Imports

    from pyspark.sql import SparkSession

Create the Spark Session

    spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation. You would add this before the “.getOrCreate()”.

You can find the full list of options in the Spark configuration documentation. Some useful examples (a sketch chaining them follows the list):

  • .config("spark.sql.jsonGenerator.ignoreNullFields", "false")
    • When writing JSON, NULL fields are kept instead of being omitted
  • .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    • Fixes issues with timestamps in write operations
  • .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    • Fixes issues with timestamps in read operations
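For example, chaining those configs during creation looks like this:

    spark = (
        SparkSession.builder
        .appName('pyspark_app_name')
        .config("spark.sql.jsonGenerator.ignoreNullFields", "false")
        .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
        .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
        .getOrCreate()
    )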

 

PySpark: Create a DataFrame

This post is how to create a DataFrame in pyspark.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Next we need to import

    from pyspark.sql import Row
    from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

    schema = StructType([
        StructField('id', IntegerType()),
        .....
    ])

    data = [Row(id=1)]

Create the DataFrame

    df = spark.createDataFrame(data, schema=schema)
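To verify the result, you can print the schema and the rows:

    df.printSchema()
    df.show()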

If you want to use a JSON file to build your schema do the following

    import json
    from pyspark.sql.types import StructType

    # The schema JSON must be a string (or loaded from a file) before json.loads
    schema_source = """
    {
      "fields": [
        {
          "metadata": {},
          "name": "column_a",
          "nullable": false,
          "type": "string"
        }
      ],
      "type": "struct"
    }
    """

    json_schema = json.loads(schema_source)
    table_schema = StructType.fromJson(json_schema)

    data = [Row(column_a='a')]
    df = spark.createDataFrame(data, schema=table_schema)

 

Python: Unit Testing

This post focuses on common hurdles when trying to do unit testing.

Testing Values During Run

You add the following line to anywhere you want to pause the unit test to check values.

    import pdb
    pdb.set_trace()

How to Patch a Function

    from unittest.mock import patch

    @patch('src.path.to.file.my_function')
    @patch('src.path.to.file.my_function_add')
    def test_some_function(mock_my_function_add, mock_my_function):
        mock_my_function_add.return_value = <something>
        .......

How to Patch a Function With No Return Value

    from unittest.mock import patch

    def test_some_function():
        with patch('src.path.to.file.my_function'):
            ...

How to Patch a Function With 1 Return Value

    from unittest.mock import patch, MagicMock

    def test_some_function():
        with patch('src.path.to.file.my_function', MagicMock(return_value=[<MY_VALUES>])):
            ...

How to Patch a Function With Multiple Return Values

    from unittest.mock import patch, MagicMock

    def test_some_function():
        with patch('src.path.to.file.my_function', MagicMock(side_effect=[[<MY_VALUES>], [<OTHER_VALUES>]])):
            ...

How to Create a Test Module

    from unittest import TestCase

    class MyModule(TestCase):
        def setUp(self):
            some_class.my_variable = <something>
            ... DO OTHER STUFF

        def test_my_function(self):
            ... DO Function Test Stuff

How to Patch a Method

    patch_methods = [
        "pyodbc.connect"
    ]

    for method in patch_methods:
        patch(method).start()
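Patches started with start() stay active until stopped, so remember to stop them when the test finishes; patch.stopall() undoes everything started this way:

    # e.g. in tearDown, stop every patch started with start()
    patch.stopall()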

How to create a PySpark Session

Once you define this fixture, you can just add spark as a parameter to your tests and pytest will provide the session.

    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope='module')
    def spark():
        return SparkSession.builder.appName('pyspark_test').getOrCreate()

How to Create a Spark SQL Example

    import pytest
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType, StructField, StringType

    @pytest.fixture(scope='module')
    def spark():
        return SparkSession.builder.appName('pyspark_test').getOrCreate()

    def test_function(spark):
        query = 'SELECT * FROM SOMETHING'
        schema = StructType([
            StructField('column_a', StringType()),
            StructField('column_b', StringType()),
            StructField('column_c', StringType()),
        ])

        data = [Row(column_a='a', column_b='b', column_c='c')]
        table = spark.createDataFrame(data, schema=schema)
        table.createOrReplaceTempView('<table_name>')
        df = spark.sql(query).toPandas()

        assert not df.empty
        assert df.shape[0] == 1
        assert df.shape[1] == 3

        spark.catalog.dropTempView('<table_name>')

How to Mock a Database Call

First let’s assume you have an execute sql function

    def execute_sql(connection, cursor, sql, params):
        result = cursor.execute(sql, params).fetchone()
        connection.commit()
        return result

Next, in your unit tests, you want to test that function

    def test_execute_sql():
        val = <YOUR_RETURN_VALUE>
        with patch('path.to.code.execute_sql', MagicMock(return_value=val)) as mock_execute:
            return_val = some_other_function_that_calls_execute_sql(....)
            assert return_val == val

If you need to close a cursor or DB connection

    def test_execute_sql():
        val = <YOUR_RETURN_VALUE>
        mock_cursor = MagicMock()
        mock_cursor.configure_mock(
            **{
                "close": MagicMock()
            }
        )
        mock_connection = MagicMock()
        mock_connection.configure_mock(
            **{
                "close": MagicMock()
            }
        )

        with patch('path.to.code.cursor', MagicMock(return_value=mock_cursor)) as mock_cursor_close:
            with patch('path.to.code.connection', MagicMock(return_value=mock_connection)) as mock_connection_close:
                return_val = some_other_function_that_calls_execute_sql(....)
                assert return_val == val

How to Mock Open a File Example 1

    from unittest.mock import patch, mock_open

    @patch('builtins.open', new_callable=mock_open, read_data='my_data')
    def test_file_open(mock_file):
        assert open("my/file/path/filename.extension").read() == 'my_data'
        mock_file.assert_called_with("my/file/path/filename.extension")

        val = function_to_test(....)
        assert 'my_data' == val

How to Mock Open a File Example 2

    def test_file_open():
        fake_file_path = 'file/path/to/mock'
        file_content_mock = 'test'
        with patch('path.to.code.open', new=mock_open(read_data=file_content_mock)) as mock_file:
            with patch('os.utime') as mock_utime:
                actual = function_to_test(fake_file_path)
                mock_file.assert_called_once_with(fake_file_path)
                assert actual is not None

Compare DataFrames

    def as_dicts(df):
        df = [row.asDict() for row in df.collect()]
        return sorted(df, key=lambda row: str(row))

    assert as_dicts(df1) == as_dicts(df2)

Python: Create a WHL File

This post will just be a how-to on creating a whl file.

You need the following files:

MANIFEST.in:

    recursive-include <directory> *
    recursive-exclude tests *.py

requirements.txt:

This file just holds your packages and the version.

setup.py

Pytest and coverage are filtered out of the requirements because you don’t want test-only packages to be required when you deploy your code.

    from setuptools import find_packages
    from distutils.core import setup
    import os

    if os.path.exists('requirements.txt'):
        req = [line.strip('\n') for line in open('requirements.txt') if 'pytest' not in line and 'coverage' not in line]

    setup(
        include_package_data=True,
        name=<app_name>,
        version=<app_version>,
        description=<app_desc>,
        install_requires=req,
        packages=find_packages(exclude=["*tests.*", "*tests"]),
        classifiers=[
            "Programming Language :: Python :: <python_version>",
            "License :: OSI Approved :: MIT License",
            "Operating System :: OS Independent",
        ],
        python_requires='>=<python_version>',
        package_dir={<directory>: <directory>},
    )
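To build the whl file from these files, a typical command (assuming the wheel package is installed) is:

    pip install wheel
    python setup.py bdist_wheel

The built file lands in the dist folder.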

To Check Your Whl File

Install package

    pip install check-wheel-contents

Check WHL

    check-wheel-contents <PATH_TO_WHL>\<filename>.whl

Install WHL

This will deploy to <PATH_TO_PYTHON>\Lib\site-packages\<directory>

    <PATH_TO_PYTHON>\Scripts\pip3.7.exe install <PATH_TO_WHL>\<filename>.whl

 

 

 

Azure: EventHub

In this tutorial I will show you how to connect to Event Hub from Python. Ensure you have first installed an IDE (Eclipse) and Python 3.7.

Python Package Installation

    pip3 install azure-eventhub

Create a Producer

This will publish events to Event Hub. The important part here is the “Endpoint”. You need to log in to the Azure Portal and get the endpoint from the “Shared Access Policies” of the Event Hub namespace.

    from azure.eventhub import EventHubProducerClient, EventData

    connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
    eventhub_name = '<<THE_EVENT_HUB_NAME>>'
    producer = EventHubProducerClient.from_connection_string(connection_str, eventhub_name=eventhub_name)

    event_data_batch = producer.create_batch()

    event_data_batch.add(EventData('My Test Data'))

    with producer:
        producer.send_batch(event_data_batch)

Create a Consumer

This will monitor the event hub for new messages.

    from azure.eventhub import EventHubConsumerClient

    connection_str = 'Endpoint=sb://testeventhubnamespace.servicebus.windows.net/;SharedAccessKeyName=<<THE_ACCESS_KEY_NAME>>;SharedAccessKey=<<THE_ACCESS_KEY>>'
    eventhub_name = '<<THE_EVENT_HUB_NAME>>'
    consumer_group = '<<THE_EVENT_HUB_CONSUMER_GROUP>>'
    client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group, eventhub_name=eventhub_name)

    def on_event(partition_context, event):
        print("Received event from partition {} - {}".format(partition_context.partition_id, event))
        partition_context.update_checkpoint(event)

    with client:
        # To read from the beginning of the partition instead, pass starting_position="-1"
        client.receive(
            on_event=on_event
        )

 

Jupyter Installation

In this tutorial I will show you how to install Jupyter. I will use self-signed certs for this example.

This assumes your hostname is “hadoop”

Prerequisites

Python3.5 Installation

    sudo apt install python3-pip

Update .bashrc

    sudo nano ~/.bashrc

    #Add the following
    alias python=python3.5

    source ~/.bashrc

Install

    pip3 install jupyter

    jupyter notebook --generate-config

    jupyter notebook password
    #ENTER PASSWORD

    cat ~/.jupyter/jupyter_notebook_config.json
    #Get the SHA1 value

Setup Configuration

    nano ~/.jupyter/jupyter_notebook_config.py

    #Find and change the values for the following
    c.NotebookApp.ip = '0.0.0.0'
    c.NotebookApp.port = 8888
    c.NotebookApp.password = u'sha1:1234567fbbd5:dfgy8e0a3l12fehh46ea89f23jjjkae54a2kk54g'
    c.NotebookApp.open_browser = False
    c.NotebookApp.certfile = '/etc/security/serverKeys/hadoop.pem'
    c.NotebookApp.keyfile = '/etc/security/serverKeys/hadoop.key'

Run Jupyter

    jupyter notebook

Then browse to https://hadoop:8888 (using the hostname assumed above).

References

https://jupyter.readthedocs.io/en/latest/index.html

Python: xlrd (Read Excel File)

In this tutorial I will show you how to read an excel file in Python.

Installation

    pip install xlrd
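Note: xlrd 2.0 and later only reads legacy .xls files. If you need to read .xlsx files, either pin an older release or use openpyxl instead:

    pip install "xlrd<2.0"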

Open The Workbook

    import xlrd

    my_excel = r'C:\path\to\file'
    wb = xlrd.open_workbook(my_excel)

Select Sheet

    # Select the first sheet. The index is zero-based, so for the third sheet use index 2
    sheet = wb.sheet_by_index(0)

Get Data In Column

    # This loops through all the rows in that sheet
    for i in range(sheet.nrows):
        # if the value isn't empty then print it out
        if sheet.cell_value(i, 0) != '':
            print(sheet.cell_value(i, 0))

Get All the Column Headers

    # This loops through all the columns in that sheet
    for i in range(sheet.ncols):
        # if the value isn't empty then print it out
        if sheet.cell_value(0, i) != '':
            print(sheet.cell_value(0, i))

 

Django: React Website

In this tutorial I will demonstrate how to create a Django + React website using Django 2.0. You must have Eclipse installed and configured before you continue. We will also require Postgres 9.4 and Node.js, so install both before you continue.

Pip Django Install:
    pip install django
    pip install django-webpack-loader
Django Version:

If you are not sure what version you are running do the following

    python -c "import django; print(django.get_version())"
Eclipse Create Project:

(screenshots: the Eclipse new-project wizard)
Eclipse Setup Project:

(screenshots: the PyDev Django project setup wizard)
Eclipse Django DB Settings:

(screenshots: the Django database settings pages of the wizard)
Eclipse Django Setup Successful:

Once you click “Finish”, your project will be created with the default Django layout.

Folder Structure:
  • Under djangoApp project.
  • folder: static
  • folder: djangoApp
    • folder: templates
      • file: index.html
      • folder: base
        • file: base.html
  • folder: assets
    • folder: bundles
    • folder: js
      • file: index.jsx
Node:

Inside the djangoApp application do the following

    npm init
    npm install --save-dev jquery react react-dom webpack webpack-bundle-tracker babel-loader babel-core babel-preset-es2015 babel-preset-react
    npm install create-react-class --save
webpack.config.js:
    var path = require('path')
    var webpack = require('webpack')
    var BundleTracker = require('webpack-bundle-tracker')

    module.exports = {
        //the base directory (absolute path) for resolving the entry option
        context: __dirname,
        //the entry point we created earlier. Note that './' means
        //your current directory.
        entry: {
            "index": [path.resolve(__dirname, "./assets/js/index.jsx")],
        },
        output: {
            path: path.resolve('./assets/bundles/'),
            filename: "[name]-[hash].js",
        },
        plugins: [
            //tells webpack where to store data about your bundles.
            new BundleTracker({filename: './webpack-stats.json'}),
            //makes jQuery available in every module
            new webpack.ProvidePlugin({
                $: 'jquery',
                jQuery: 'jquery',
                'window.jQuery': 'jquery'
            })
        ],
        module: {
            loaders: [
                {
                    test: /\.jsx?$/,
                    exclude: /(node_modules)/,
                    loader: 'babel-loader',
                    query: {
                        presets: ['react', 'es2015']
                    }
                }
            ]
        }
    }
djangoApp\settings.py:

Installed Apps

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'webpack_loader',
    ]

Add/Edit the following template directive

    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, 'djangoApp', 'templates'),],
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]

Add the following static directive

    STATIC_URL = '/static/'

    STATICFILES_DIRS = [
        os.path.join(BASE_DIR, 'assets'),
    ]

Modify DATABASES

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'NAME': 'YOUR_DB_NAME',
            'USER': 'YOUR_USER',
            'PASSWORD': 'YOUR_PASSWORD',
            'HOST': 'localhost',
            'PORT': 5432
        }
    }

Webpack Loader

    WEBPACK_LOADER = {
        'DEFAULT': {
            'BUNDLE_DIR_NAME': 'bundles/',
            'STATS_FILE': os.path.join(BASE_DIR, 'webpack-stats.json'),
        }
    }
djangoApp\views.py:

We will create our index page view. Notice the third argument, a dict: those are variables passed to the template to make our site dynamic.

    from django.shortcuts import render

    def index(request):
        return render(request, 'index.html', {'title': 'Index Page', 'script_name': 'index'})
djangoApp\urls.py:

Add the following imports and URL patterns

    from django.conf.urls import url
    from django.urls import path
    from django.contrib import admin
    #This is the index view we created above
    from djangoApp.views import index

    urlpatterns = [
        url(r'^$', index, name='index'),
        path('admin/', admin.site.urls),
    ]
djangoApp\templates\base\base.html:

Let’s set up our base template and the blocks that the other templates will inherit from.

    <html>
    <head>
        <title>{% block title %}{% endblock %}</title>
    </head>
    <body>
        {% block content %}
        {% endblock %}
    </body>
    </html>
djangoApp\templates\index.html:

The important part here is the extends tag; without it your base.html template won’t be inherited. As well, the {% with %} tag and title variable make our template dynamic and allow us to incorporate React into our site.

    {% extends "base/base.html" %}
    {% load render_bundle from webpack_loader %}
    {% load staticfiles %}
    {% block title %}
        {{title}}
    {% endblock %}
    {% block content %}
        <div id="container"></div>
        {% with script=script_name %}
            {% render_bundle script 'js' %}
        {% endwith %}
    {% endblock %}
assets\js\index.jsx:

This is our react class.

    var React = require('react');
    var ReactDOM = require('react-dom');
    var createReactClass = require('create-react-class');

    var App = createReactClass({
        render: function() {
            return (
                <h1>
                    React App Page
                </h1>
            )
        }
    });

    ReactDOM.render(<App />, document.getElementById('container'));
Database Setup/Migration:

For this tutorial we used Postgres. At this point, please make sure you create your djangoApp db and the user you specified in the settings.py file. Then run the following commands in order.

    #Migrates the auth
    python manage.py migrate auth
    #Migrates the rest
    python manage.py migrate
    #Create the user for accessing the django admin ui
    #This will ask you for user names and passwords. Don't make it the same as in your settings.py file.
    python manage.py createsuperuser
Start Server:
    webpack -p
    python manage.py runserver

Your site is now running at http://localhost:8000.

Your admin site is now running at http://localhost:8000/admin/.

 

References:

I used a video tutorial as a guideline to get the project started. However, some steps didn’t work as shown, so I made adjustments, such as requiring just one template.

Python: Run Process

If you want to run a jar (or really any process) from Python, you can do so by leveraging the subprocess package.

    from subprocess import Popen, PIPE

Then you need to call Popen. If you want to set java memory you can do so using -Xms and -Xmx in between java and -jar.

    #bufsize of 1 is line buffered
    #stdout and stderr set to PIPE capture std out and std error so you can read the output
    #with shell=False the command must be a list of separate arguments
    result = Popen(['java', '-jar', 'myapp.jar'], stdout=PIPE, stderr=PIPE, shell=False, bufsize=1)
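For example, setting the java memory flags mentioned above would look like this (the heap sizes are just examples):

    # 512 MB starting heap, 2 GB max heap
    result = Popen(['java', '-Xms512m', '-Xmx2g', '-jar', 'myapp.jar'], stdout=PIPE, stderr=PIPE, shell=False, bufsize=1)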

If you want your process to wait until finished you will need to call wait.

    result.wait()

If you pushed the stderr and stdout then you can check the output.

    if result.stdout is not None:
        for line in result.stdout:
            print(line)

    if result.stderr is not None:
        for line in result.stderr:
            print(line)

Python: Logging

If you want to do some basic logging to a file, you can use the logging package that comes with Python. Here are some of the basic ways to log.

You first have to import the package.

    import logging

You can setup your own logging configuration but for this we will just use the basic setup and log to a file.

    #If you are going to have multiple handlers you should reset the handlers first
    logging.root.handlers = []

    #The file to log to
    log_file = '/mnt/log/<LOG_FILE_NAME>'

    #Setup the config with the level to log up to
    logging.basicConfig(filename=log_file, level=logging.INFO)

Then you setup your logger

    logger = logging.getLogger('my_awesome_log')

If you want your log to roll over after a certain size, you must add a RotatingFileHandler. If you do not use it, the log will grow until your drive runs out of space.

    from logging.handlers import RotatingFileHandler

    handler = RotatingFileHandler(log_file, maxBytes=1024, backupCount=1)
    logger.addHandler(handler)

If you also want to log to console you will need to add an additional handler for the console setting the level to log.

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logger.addHandler(console)
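Then you can log as usual, and the messages go to both the file and the console:

    logger.info('application started')
    logger.error('something went wrong')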

That’s it, a basic example of how to use the logging package.

 

Python: Multiprocessing Pool

Sometimes we want to run a method across multiple processes because of a costly function. Below is an example of how you could do it. There are other APIs you could use, like “map”; here is just one example.

    from multiprocessing import Pool

    # A stand-in for your costly function
    def my_method(info):
        return info

    # Sets the pool to utilize 4 processes
    pool = Pool(processes=4)
    result = pool.apply_async(func=my_method, args=("some_info",))
    # Blocks until the async call completes and returns its result
    data = result.get()
    pool.close()
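For comparison, here is the same work done with “map”, which blocks until every input has been processed (a minimal sketch using the same my_method):

    from multiprocessing import Pool

    # map applies my_method to each input and returns the results in order
    with Pool(processes=4) as pool:
        results = pool.map(my_method, ["info_1", "info_2", "info_3"])
    print(results)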