PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake, refer to the Delta Lake documentation.

First, install the “delta-spark” package for the version you require.

pip install delta-spark==3.1.0

Set up a Spark session.
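
If you are not on a platform that pre-configures Delta for you (for example Databricks), a minimal session setup looks like the following; the app name is only an example.

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Enable the Delta SQL extension and catalog, then let delta-spark
# add the matching Delta Lake jars to the session.
builder = (
    SparkSession.builder
    .appName('delta-example')  # example name
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()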

To read Delta tables you can refer to PySpark: Read From ADLS to DataFrame.
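
As a minimal sketch, assuming the session above and an existing Delta table at the ADLS path:

df = spark.read.format('delta').load('abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/')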

To write Delta tables you can refer to PySpark: Save a DataFrame To ADLS.
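
Again as a minimal sketch, writing a DataFrame out in Delta format; the write mode here is only an example.

df.write.format('delta').mode('overwrite').save('abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/')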

Vacuum Delta Table

from delta.tables import DeltaTable

vacuum_hrs = 100  # retention threshold: files older than this many hours are removed
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

delta_table = DeltaTable.forPath(spark, path)
delta_table.vacuum(vacuum_hrs)
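
Note that 100 hours is below the default retention period of 168 hours (7 days), so the call above will fail Delta's retention safety check. If you are sure a shorter retention is what you want, disable the check first:

# Only do this if you understand the risk to concurrent readers and time travel.
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')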

Compaction

Improves read performance by merging small files into larger ones.

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeCompaction()
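
On large tables you can limit compaction to a subset of partitions with a predicate; the partition column and value below are placeholders.

delta_table.optimize().where("<PARTITION_COL> = '<SOME_VAL>'").executeCompaction()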

Z-Order

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 
columns = '<MY_COL>'

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeZOrderBy(columns)
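
executeZOrderBy accepts one or more column names, so you can also z-order on several columns at once; the names below are placeholders.

delta_table.optimize().executeZOrderBy('<MY_COL_1>', '<MY_COL_2>')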

Delete

from delta.tables import DeltaTable
import pyspark.sql.functions as F

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

# You can also use a SQL-formatted predicate string
delta_table.delete("<MY_COL> = '<SOME_VAL>'")

Modify Properties

Refer to the Delta Lake table properties documentation for the full list of properties.

dataSkippingNumIndexedCols

You would set this if your table has more columns than Delta Lake collects statistics on by default. The default value is 32, meaning statistics are collected on the first 32 columns.

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
skip_cols = <SOME_VALUE>

spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' == '%s')" % (path, skip_cols))


Postgres: Vacuum

You should make sure autovacuum is turned on (it is on by default). If it is turned off, manual vacuuming and analyzing will have to be performed.

You can run vacuuming by:

VACUUM (VERBOSE, ANALYZE);
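
To verify that vacuuming is actually happening, you can check the per-table statistics timestamps:

SELECT relname, last_vacuum, last_autovacuum, last_analyze, last_autoanalyze
FROM pg_stat_user_tables;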

You can turn autovacuum on with the settings below. You will need to modify the “postgresql.conf” file for this. On Windows it is located at “C:\Program Files\PostgreSQL\9.4\data\postgresql.conf” and on Ubuntu at “/etc/postgresql/9.4/main/postgresql.conf”. In the off chance it isn’t in those locations, run the following to find it.

find / -type f -name "postgresql.conf"

Locate the “# AUTOVACUUM PARAMETERS” section and apply the settings below. Note that with autovacuum enabled, PostgreSQL will also run the “VACUUM ANALYZE” command automatically to keep statistics up to date.

autovacuum = on                       # uncomment this line
autovacuum_analyze_threshold = 100
autovacuum_vacuum_threshold = 100
track_counts = on

Now restart the PostgreSQL service.

Ubuntu (either command works):

/etc/init.d/postgresql restart
invoke-rc.d postgresql restart

Windows (from an elevated command prompt, or restart the “postgresql-x64-9.4” service via services.msc):

net stop postgresql-x64-9.4
net start postgresql-x64-9.4

Postgres: Tables

Below are some common statements for table creation and maintenance.

Table Creation:

CREATE TABLE public.mytable (
      id bigserial PRIMARY KEY,
      text_column varchar NOT NULL,
      int_column integer NOT NULL,
      date_column timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Create Schema:

CREATE SCHEMA IF NOT EXISTS test;

Create Schema with Authorization:

CREATE SCHEMA IF NOT EXISTS test AUTHORIZATION myUser;

Drop Schema Cascade:

DROP SCHEMA IF EXISTS test CASCADE;

Comment On Table:

COMMENT ON TABLE public.mytable IS 'A list of data.';

Vacuum:

VACUUM has several options; it is best to review them in the PostgreSQL documentation.

VACUUM (ANALYZE, VERBOSE);

Drop Constraint:

ALTER TABLE mytable DROP CONSTRAINT mytable_id_pkey;

Add Constraint:

ALTER TABLE public.mytable ADD CONSTRAINT mytable_id_pkey PRIMARY KEY (id);

Rename Constraint:

ALTER TABLE mytable RENAME CONSTRAINT "mytable_id2_fkey" TO "mytable_id3_fkey";

Rename Table Column:

ALTER TABLE mytable RENAME COLUMN text_column TO text_column2;

Rename Table:

ALTER TABLE mytable RENAME TO mytable2;

Drop Table:

DROP TABLE public.mytable;

Add Column to Table:

ALTER TABLE public.mytable ADD COLUMN column_name boolean NOT NULL DEFAULT false;

Alter Column Data Type Json:

ALTER TABLE public.mytable ALTER COLUMN json_col TYPE json USING (json_col::json);

Rename Sequence:

ALTER SEQUENCE mytable_id_seq RENAME TO mytable2_id_seq;

Sequence Table Owner:

ALTER SEQUENCE mytable_id_seq OWNED BY mytable.id;

Sequence Next Value:

ALTER TABLE mytable ALTER COLUMN mytable_id SET DEFAULT nextval('mytable_id_seq');

Add Foreign Key:

ALTER TABLE mytable ADD FOREIGN KEY (foreign_id) REFERENCES public.mytable2 (foreign_id);

Create Index Json:

CREATE INDEX mytable_idx ON public.mytable ((data->'Key'->'Key'->>'value'));

Create Index:

CREATE INDEX mytable_idx ON public.mytable (id);

Drop Index:

DROP INDEX public.mytable_idx;

Re-Cluster Table:

CLUSTER mytable USING mytable_pkey;

Trigger:

CREATE TRIGGER "tg_mytrigger" BEFORE UPDATE OF my_column OR INSERT ON public.mytable FOR EACH ROW EXECUTE PROCEDURE public.mytablestrigger();
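
The trigger above assumes a function named public.mytablestrigger() already exists. A minimal sketch of such a trigger function follows; the body is illustrative only, stamping date_column on every insert or update.

CREATE OR REPLACE FUNCTION public.mytablestrigger() RETURNS trigger AS $$
BEGIN
    -- Illustrative body: keep date_column current on each write.
    NEW.date_column := CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;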