Spark Connector: Connect to SQL Server

This post shows how to use the Spark Connector to connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar
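One way to make the connector available is to pull it in by Maven coordinates when building the Spark session. The coordinates below are an assumption based on the jar name above; adjust them for your environment, or attach the jar to the cluster however your platform expects.

from pyspark.sql import SparkSession

# Minimal sketch: resolve the connector via Maven coordinates at session start-up.
spark = SparkSession.builder \
    .appName("mssql-connector-example") \
    .config("spark.jars.packages", "com.microsoft.azure:spark-mssql-connector_2.12:1.2.0") \
    .getOrCreate()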

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    "<CLIENT_ID>", authority="https://login.microsoftonline.com/<TENANT_ID>",
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])
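acquire_token_for_client returns a dictionary containing either an access_token or error details, so it is worth failing fast before handing the token to Spark; a minimal check:

# MSAL returns error details in the same dict when acquisition fails.
if "access_token" not in result:
    raise RuntimeError("Token acquisition failed: {}".format(result.get("error_description")))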

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
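Writing back uses the same format and access token. Below is a minimal sketch, assuming a target table named dbo.SOMETHING (the table name and save mode are illustrative):

jdbc_df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("dbtable", "dbo.SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .save()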

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", "<SECRET>") \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth, since it means handling a static SQL login and password instead of using Azure AD identities; prefer the SPN or domain options above.

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", "<SECRET>") \
    .load()


Phoenix & Java: Connecting Secure

In this tutorial I will show you how to connect to a secure Phoenix instance using Java. It's rather straightforward.

POM.xml

<dependency>
	<groupId>org.apache.phoenix</groupId>
	<artifactId>phoenix-queryserver</artifactId>
	<version>5.0.0-HBase-2.0</version>
</dependency>

Imports:

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.ResultSet;
import java.sql.Statement;

Initiate Kerberos Authentication

System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
System.setProperty("java.security.krb5.realm", "REALM.CA");
System.setProperty("java.security.krb5.kdc", "REALM.CA");
System.setProperty("sun.security.krb5.debug", "true");
System.setProperty("javax.net.debug", "all");

Connect:

Now we create the connection. The Phoenix JDBC URL is built from the ZooKeeper quorum, port, znode, Kerberos principal and keytab path.

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
String url = "jdbc:phoenix:hadoop:2181:/hbase-secure:hbase/hadoop@REALM.CA:\\data\\hbase.service.keytab";
Connection connection = DriverManager.getConnection(url);

System.out.println("Connected");

Statement statement = connection.createStatement();

//Drop table
String deleteTableSql = "DROP TABLE IF EXISTS employee";		 
System.out.println("Deleting Table: " + deleteTableSql);
statement.executeUpdate(deleteTableSql);
System.out.println("Created Table");
 
//Create a table
String createTableSql = "CREATE TABLE employee ( eid bigint primary key, name varchar)";		 
System.out.println("Creating Table: " + createTableSql);
statement.executeUpdate(createTableSql);
System.out.println("Created Table");

//Insert Data
String insertTableSql = "UPSERT INTO employee VALUES(1, 'Oliver')";
System.out.println("Inserting Data: " + insertTableSql);
statement.executeUpdate(insertTableSql);
System.out.println("Inserted Data");

connection.commit();

//Select Data
String selectTablesSql = "select * from employee";
System.out.println("Show records: " + selectTablesSql);
ResultSet res = statement.executeQuery(selectTablesSql);
 
while (res.next()) {
	System.out.println(String.format("id: %s name: %s", res.getInt("eid"), res.getString("name")));
}
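If you are upserting more than a couple of rows, a parameterized UPSERT is cleaner than building SQL strings by hand. A minimal sketch (the values are illustrative, and it requires adding import java.sql.PreparedStatement; to the imports above), followed by closing the resources:

//Parameterized upsert (illustrative values)
PreparedStatement upsert = connection.prepareStatement("UPSERT INTO employee VALUES(?, ?)");
upsert.setLong(1, 2L);
upsert.setString(2, "Jane");
upsert.executeUpdate();
connection.commit();
upsert.close();

//Clean up when finished
res.close();
statement.close();
connection.close();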


Hive & Java: Connect to Remote Kerberos Hive using KeyTab

In this tutorial I will show you how to connect to a remote Kerberos Hive cluster using Java. If you haven't installed Hive yet, follow the Hive installation tutorial first.

Import SSL Cert to Java:

Follow the "Installing unlimited strength encryption Java libraries" tutorial first.

If you are on Windows, do the following:

#Import it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -import -file hadoop.csr -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts" -alias "hadoop"

#Check it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -list -v -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

#If you want to delete it
"C:\Program Files\Java\jdk1.8.0_171\bin\keytool" -delete -alias hadoop -keystore "C:\Program Files\Java\jdk1.8.0_171\jre\lib\security\cacerts"

POM.xml:

<dependency>
	<groupId>org.apache.hive</groupId>
	<artifactId>hive-jdbc</artifactId>
	<version>2.3.3</version>
	<exclusions>
		<exclusion>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
		</exclusion>
	</exclusions>
</dependency>

Imports:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.sql.SQLException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.DriverManager;

Connect:

// Setup the configuration object.
final Configuration config = new Configuration();

config.set("fs.defaultFS", "swebhdfs://hadoop:50470");
config.set("hadoop.security.authentication", "kerberos");
config.set("hadoop.rpc.protection", "integrity");

System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2");
System.setProperty("java.security.krb5.conf", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\krb5.conf");
System.setProperty("java.security.krb5.realm", "REALM.CA");
System.setProperty("java.security.krb5.kdc", "REALM.CA");
System.setProperty("sun.security.krb5.debug", "true");
System.setProperty("javax.net.debug", "all");
System.setProperty("javax.net.ssl.keyStorePassword","changeit");
System.setProperty("javax.net.ssl.keyStore","C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStore", "C:\\Program Files\\Java\\jdk1.8.0_171\\jre\\lib\\security\\cacerts");
System.setProperty("javax.net.ssl.trustStorePassword","changeit");
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");

UserGroupInformation.setConfiguration(config);
UserGroupInformation.setLoginUser(UserGroupInformation.loginUserFromKeytabAndReturnUGI("hive/hadoop@REALM.CA", "c:\\data\\hive.service.keytab"));

System.out.println(UserGroupInformation.getLoginUser());
System.out.println(UserGroupInformation.getCurrentUser());

//Add the hive driver
Class.forName("org.apache.hive.jdbc.HiveDriver");

//Connect to hive jdbc
Connection connection = DriverManager.getConnection("jdbc:hive2://hadoop:10000/default;principal=hive/hadoop@REALM.CA");
Statement statement = connection.createStatement();

//Create a table
String createTableSql = "CREATE TABLE IF NOT EXISTS "
		+" employee ( eid int, name String, "
		+" salary String, designation String)"
		+" COMMENT 'Employee details'"
		+" ROW FORMAT DELIMITED"
		+" FIELDS TERMINATED BY '\t'"
		+" LINES TERMINATED BY '\n'"
		+" STORED AS TEXTFILE";

System.out.println("Creating Table: " + createTableSql);
statement.executeUpdate(createTableSql);

//Show all the tables to ensure we successfully added the table
String showTablesSql = "show tables";
System.out.println("Show All Tables: " + showTablesSql);
ResultSet res = statement.executeQuery(showTablesSql);

while (res.next()) {
	System.out.println(res.getString(1));
}
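If you also want to query the table before dropping it, the same statement object works; a minimal sketch (the table has no data at this point, so the loop prints nothing):

//Query the table (returns no rows unless data has been loaded)
String selectSql = "SELECT eid, name FROM employee";
System.out.println("Select: " + selectSql);
ResultSet employees = statement.executeQuery(selectSql);

while (employees.next()) {
	System.out.println(String.format("id: %d name: %s", employees.getInt("eid"), employees.getString("name")));
}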

//Drop the table
String dropTablesSql = "DROP TABLE IF EXISTS employee";

System.out.println("Dropping Table: " + dropTablesSql);
statement.executeUpdate(dropTablesSql);

System.out.println("Finish!");