Spark Extension

The Spark extension provides the code generator which targets Spark.

Examples

Create an instance of a SparkCodeGenerator:

from nldsl import SparkCodeGenerator

self.code_gen = SparkCodeGenerator()

Generated code: x = y.filter((y.col1 == m) & y.col3.isin([v1, v2, v3])):

model = "## x = on y | select rows y.col1 == m and y.col3 in [v1, v2, v3]"
result = self.code_gen(model)

Generated code: x = df.dropDuplicates():

model = "## x = on df | drop duplicates"
result = self.code_gen(model)

A DSL statement that adds a new rule to the grammar:

model_def = "#$ sort and clean $x = sort by $x | drop duplicates\n"
model_eval = "## x = on df | sort and clean col1"
result = self.code_gen(model_def + model_eval)
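
Assuming the built-in sort by rule maps to DataFrame.sort (a sketch, not verified generator output; drop duplicates maps to dropDuplicates as shown above), evaluating the new rule would produce code resembling:

x = df.sort("col1").dropDuplicates()  # sketch: sort mapping assumed, dropDuplicates confirmed above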

Grammar Rules

nldsl.spark_extension.expression_only(code, args, env=None)

Parses Python expressions outside of a pipeline.

Examples

  1. x = 5

  2. x = y % (5.7 + z / 2)

  3. x = w and (y or not z)

Grammar

!expr

Parameters

expr (expression) – The expression to be evaluated.

Type

Function

nldsl.spark_extension.on_dataframe(code, args, env=None)

Starts a pipeline on the given DataFrame.

Examples

  1. x = on df

  2. x = on df | transformer 1 … | transformer n … | operation

Grammar

on $dataframe

Parameters

dataframe (variable) – The name of the DataFrame.

Type

Initialization

nldsl.spark_extension.create_dataframe(code, args, env=None)

Creates a new DataFrame from a list.

Examples

  1. x = create dataframe from my_data with header 'col1', 'col2', 'col3'

Grammar

create dataframe from $data with header $header[$col_name]

Parameters
  • data (variable) – The data from which to create the dataframe.

  • header (list) – A list of column names.

Type

Initialization
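
Assuming this rule maps onto SparkSession.createDataFrame (a sketch, not verified against the generator), example 1 would translate to roughly:

x = spark.createDataFrame(my_data, ["col1", "col2", "col3"])  # sketch: assumed createDataFrame(data, header) mapping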

nldsl.spark_extension.load_from(code, args, env=None)

Load a DataFrame from a file.

Examples

  1. x = load from "my_file.json" as json

  2. x = load from "my_file.csv" as csv | drop duplicates

Grammar

load from $path as $type
type := { json, csv }

Parameters
  • path (variable) – A string containing the path to a file.

  • type (variable) – The type of the file.

Type

Initialization
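
A plausible translation, assuming the rule dispatches on type to the spark.read API (a sketch, not verified generator output):

x = spark.read.json("my_file.json")  # sketch for the json case
x = spark.read.csv("my_file.csv")    # sketch for the csv case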

nldsl.spark_extension.save_to(code, args, env=None)

Save a DataFrame to a file.

Examples

  1. on df | save to "my_file.json" as json

  2. on df | save to "my_file.csv" as csv

Grammar

save to $path as $type
type := { json, csv }

Parameters
  • path (variable) – A string containing the path to a file.

  • type (variable) – The type of the file.

Type

Operation
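
Analogously, assuming a dispatch to the DataFrame.write API (a sketch, not verified generator output):

df.write.json("my_file.json")  # sketch for the json case
df.write.csv("my_file.csv")    # sketch for the csv case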

nldsl.spark_extension.union(code, args, env=None)

Compute the union of rows.

Examples

  1. x = on df | union other

Grammar

union $table

Parameters

table (variable) – The table from which all rows will be added.

Type

Transformation
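
A plausible translation via PySpark's DataFrame.union (an assumed mapping, not confirmed by this page):

x = df.union(other)  # sketch: assumes union $table -> DataFrame.union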

nldsl.spark_extension.difference(code, args, env=None)

Remove all rows which are contained in table.

Examples

  1. x = on df | difference other

Grammar

difference $table

Parameters

table (variable) – The table which contains all rows that shall be removed.

Type

Transformation
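
The natural PySpark counterpart is DataFrame.subtract (an assumed mapping, not confirmed by this page):

x = df.subtract(other)  # sketch: assumes difference $table -> DataFrame.subtract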

nldsl.spark_extension.intersection(code, args, env=None)

Remove all rows which are not contained in table.

Examples

  1. x = on df | intersection other

Grammar

intersection $table

Parameters

table (variable) – The table which contains all rows that shall not be removed.

Type

Transformation
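
The natural PySpark counterpart is DataFrame.intersect (an assumed mapping, not confirmed by this page):

x = df.intersect(other)  # sketch: assumes intersection $table -> DataFrame.intersect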

nldsl.spark_extension.select_columns(code, args, env=None)

Select certain columns from a DataFrame.

Examples

  1. ## x = on df | select columns df.col1, col2, df.col3

  2. ## x = on df | select columns "col1", "col2", "col3"

Grammar

select columns $columns[$col]

Parameters

columns (varlist) – A list of column names.

Type

Transformation
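
Assuming a mapping to DataFrame.select (a sketch, not verified generator output), example 2 would translate to roughly:

x = df.select("col1", "col2", "col3")  # sketch: assumed select columns -> DataFrame.select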

nldsl.spark_extension.select_rows(code, args, env=None)

Select the rows of a DataFrame based on some condition.

The condition can be composed of boolean, comparison, and arithmetic expressions. Operator precedence is the same as in Python, and parentheses can be used to override it.

Examples

  1. ## x = on df | select rows df.col1 > (14.2 + z) and df.col2 == 'A'

  2. ## x = on df | select rows df.col1 != 0 and not df.col2 in [3, 5, 7]

  3. ## x = on df | select rows df.col3 % df.col1 != 2 or df.col1 <= 12

Grammar

select rows !condition

Parameters

condition (expression) – A boolean expression used as a row filter.

Type

Transformation

nldsl.spark_extension.drop_columns(code, args, env=None)

Drop certain columns from a DataFrame.

Examples

  1. ## x = on df | drop columns df.col1, col2, df.col3

  2. ## x = on df | drop columns "col1", "col2", "col3"

Grammar

drop columns $columns[$col]

Parameters

columns (varlist) – A list of column names.

Type

Transformation
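
Assuming a mapping to DataFrame.drop (a sketch, not verified generator output), example 2 would become roughly:

x = df.drop("col1", "col2", "col3")  # sketch: assumed drop columns -> DataFrame.drop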

nldsl.spark_extension.join(code, args, env=None)

Compute join with this DataFrame.

Examples

  1. x = on df | join inner df2 on 'col1', 'col2'

  2. x = on df | join left df2 on 'col1'

Grammar

join $how $table on $columns[$col]
how := { left, right, outer, inner }

Parameters
  • how (variable) – How the join shall be performed.

  • table (variable) – The table with which to join.

  • columns (varlist) – A list of columns on which to join.

Type

Transformation
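
A plausible translation via DataFrame.join (a sketch; the exact argument order emitted by the generator is an assumption):

x = df.join(df2, ["col1", "col2"], "inner")  # sketch: join(other, on, how)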

nldsl.spark_extension.group_by(code, args, env=None)

Group a DataFrame and apply an aggregation.

Examples

  1. ## x = on df | group by df.col1 apply min

  2. ## x = on df | group by df.col1, df.col2 apply mean

Grammar

group by $columns[$col] apply $aggregation
aggregation := { min, max, sum, avg, mean, count }

Parameters
  • columns (varlist) – A list of column names.

  • aggregation (variable) – The aggregation operation to be performed.

Type

Operation
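
Assuming the aggregation maps to the corresponding GroupedData method (a sketch, not verified generator output), example 1 would become roughly:

x = df.groupBy("col1").min()  # sketch: assumed groupBy followed by GroupedData.min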

nldsl.spark_extension.replace_values(code, args, env=None)

Replace a value with another.

Every occurrence of old_value will be substituted with new_value.

Examples

  1. ## x = on df | replace values 1 by 0

  2. ## x = on df | replace values "old" by "new"

Grammar

replace values $old_value by $new_value

Parameters
  • old_value (variable) – The value to be replaced.

  • new_value (variable) – The value it will be replaced with.

Type

Operation
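
PySpark's DataFrame.replace has exactly this semantics, so a plausible translation is (a sketch, unverified):

x = df.replace(1, 0)  # sketch: assumes replace values -> DataFrame.replace(old_value, new_value)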

nldsl.spark_extension.append_column(code, args, env=None)

Append a new column to the DataFrame.

Example

  1. x = on df | append column df.col1 * 2 as 'new_col'

Grammar

append column !col_expr as $col_name

Parameters
  • col_expr (expression) – An expression defining the new value of the column.

  • col_name (variable) – The name of the new column.

Type

Transformation
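
Assuming a mapping to DataFrame.withColumn (a sketch, not verified generator output), the example would translate to roughly:

x = df.withColumn("new_col", df.col1 * 2)  # sketch: withColumn(new name, column expression)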

nldsl.spark_extension.sort_by(code, args, env=None)

Sort the DataFrame by certain columns.

Examples

  1. ## x = on df | sort by df.col1 descending

  2. ## x = on df | sort by "col1" ascending, "col2" descending

Grammar

sort by $columns[$col $order]
order := { ascending, descending }

Parameters

columns (varlist) – A list of column names and sorting order pairs.

Type

Transformation
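
A plausible translation uses DataFrame.sort with Column.desc/asc (a sketch; the generator may emit a different but equivalent form):

x = df.sort(df.col1.desc())  # sketch for example 1: assumed sort/desc mapping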

nldsl.spark_extension.drop_duplicates(code, args, env=None)

Drop duplicate rows from a DataFrame.

Examples

  1. x = on df | drop duplicates

  2. x = on df | select rows df.col1 != 0 | drop duplicates

Grammar

drop duplicates

Type

Transformation

nldsl.spark_extension.rename_columns(code, args, env=None)

Rename some columns in a DataFrame.

Examples

  1. x = on df | rename columns col1 to col2

  2. x = on df | rename columns col1 to col2, col3 to col4 | show

Grammar

rename columns $columns[$current to $new]

Parameters

columns (list) – A list of pairs of current and new column names.

Type

Transformation
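
Assuming a mapping to DataFrame.withColumnRenamed (a sketch, not verified generator output), example 1 would become roughly:

x = df.withColumnRenamed("col1", "col2")  # sketch: presumably one call per current/new pair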

nldsl.spark_extension.show(code, args, env=None)

Print the DataFrame to stdout.

Examples

  1. on df | show

  2. on df | drop duplicates | show

Grammar

show

Type

Operation

nldsl.spark_extension.show_schema(code, args, env=None)

Print the schema of the DataFrame to stdout.

Examples

  1. on df | show schema

Grammar

show schema

Type

Operation

nldsl.spark_extension.describe(code, args, env=None)

Print a description of the DataFrame to stdout.

Examples

  1. on df | describe

  2. on df | drop duplicates | describe

Grammar

describe

Type

Operation

nldsl.spark_extension.head(code, args, env=None)

Get the topmost num_rows rows of the DataFrame.

Examples

  1. on df | head 10

  2. on df | drop duplicates | head 100

Grammar

head $num_rows

Parameters

num_rows (variable) – The number of rows to return.

Type

Operation

nldsl.spark_extension.count(code, args, env=None)

Count the number of rows in the DataFrame.

Examples

  1. on df | count

  2. on df | drop duplicates | count

Grammar

count

Type

Operation

Code Generator

class nldsl.spark_extension.SparkCodeGenerator(recommend=True, spark_name='spark', import_name='SparkSession', start_session_named='', stop_session=False, **kwargs)[source]

Bases: nldsl.core.codegen.CodeGenerator

A SparkCodeGenerator translates DSL statements into executable Spark code.

There are two kinds of DSL statements: those which are evaluated to executable code and those which extend the DSL grammar. Consequently, parsing a set of DSL statements usually has two effects: executable code is generated, and the SparkCodeGenerator modifies itself such that it is capable of parsing statements according to the new grammar rules.

Furthermore, the SparkCodeGenerator derives from the CodeMap class, and its grammar can also be extended with rules which cannot be expressed within the DSL. This is done via the __setitem__ and registerFunction methods.

Example

Adding new rules:

def show(code, args):
    return "print({})".format(code)

SparkCodeGenerator.registerFunction("show entire table", show)  # add the rule to the class

myCodeGen = SparkCodeGenerator()
myCodeGen["show entire table"] = show  # add the rule only to this instance

Parameters
  • recommend (bool) – Whether to return a Recommendation if possible or always raise an error.

  • spark_name (str) – The name of the variable which holds, or should hold, the Spark session.

  • import_name (str) – The name under which the Spark session is imported.

  • start_session_named (str) – If not empty, a SparkSession with this name will be created.

  • stop_session (bool) – If True, code will be added to stop the SparkSession.

  • kwargs (dict) – Additional keyword arguments, which will be added to the environment.
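
A minimal usage sketch, combining the import shown in the examples above with the constructor parameters documented here (the session-related argument values are illustrative choices, not required ones):

from nldsl import SparkCodeGenerator

# Sketch: create a generator that wraps its output in SparkSession
# setup/teardown code, then translate a single DSL statement.
code_gen = SparkCodeGenerator(start_session_named="spark", stop_session=True)
print(code_gen("## x = on df | drop duplicates"))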

postprocessing(code_list)[source]

Optionally adds lines of code, which create and stop a SparkSession. The behavior of this function is determined by the start_session_named and stop_session parameters of the __init__ method.

Parameters

code_list (list) – A list of strings containing executable code.

Returns

(list) The modified code list
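
For instance, with start_session_named="spark" and stop_session=True, the returned list would plausibly be framed as follows (an illustrative sketch; the exact builder call emitted here is an assumption):

spark = SparkSession.builder.getOrCreate()  # assumed session-creation line
# ... the original entries of code_list ...
spark.stop()  # assumed session-teardown line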

extract_environment(source_lines)[source]

Extract the name of the spark session from source_lines if it exists.

Parameters

source_lines (list) – A list of strings containing source code.

class nldsl.spark_extension.SparkExpressionRule(expr_name, next_keyword)[source]

Bases: nldsl.core.rules.ExpressionRule

An ExpressionRule dedicated to parsing Spark expressions.

Parameters
  • expr_name (str) – The name of the expression.

  • next_keyword (str) – The keyword following the expression or None