Python Transforms
In addition to SQL transforms, you can run Python transforms in your pipeline. A Python transform is defined by a Python function that takes a DataFrame as input and returns a DataFrame as output; the function is then registered as a transform in the pipeline.
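As a rough sketch, such a function might look as follows. The function name transform, the column name amount, and the filter are purely illustrative assumptions, not part of the Starlake API:

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def transform(df: DataFrame) -> DataFrame:
    # Illustrative only: keep rows where the hypothetical column `amount` is positive
    return df.filter(F.col("amount") > 0)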
Exactly like a SQL transform, you define a Python transform by creating a Python file and adding it to the metadata/transform/<domain> directory.
You can also define a YAML configuration file for the Python script, using exactly the same format as for SQL transforms.
Arguments specified on the command line through the --options flag are passed to the Python script. For example:
$ starlake transform --name <domain>.<transform_name> --options key1=value1,key2=value2
is equivalent to invoking the script with the options as command-line arguments:
<transform-name>.py --key1 value1 --key2 value2
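Inside the script, these options can then be read like ordinary command-line arguments. Below is a minimal sketch using argparse, assuming the keys key1 and key2 from the example above (the option names are illustrative only):

import argparse

# Parse the options forwarded by:
#   starlake transform --name <domain>.<transform_name> --options key1=value1,key2=value2
parser = argparse.ArgumentParser()
parser.add_argument("--key1")
parser.add_argument("--key2")
args, _ = parser.parse_known_args()

print(args.key1, args.key2)  # prints: value1 value2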
The DataFrame produced by the Python function is saved as the output table of the transform. Before returning, the function must register the DataFrame as a temporary view named SL_THIS; Starlake reads this view and saves its contents as the output table.
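For example, the following script estimates Pi with Spark and registers the resulting single-row DataFrame as the SL_THIS view: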
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

if __name__ == "__main__":
    """
    Usage: pi [partitions]
    """
    spark = SparkSession \
        .builder \
        .getOrCreate()

    # The number of partitions may be passed as the first command-line argument
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_: int) -> int:
        # Draw a random point in the unit square and test whether it falls
        # inside the unit circle (Monte Carlo estimation of Pi)
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    result = "Pi is roughly %f" % (4.0 * count / n)

    # Register the result as the SL_THIS temporary view so that Starlake
    # can save it as the transform's output table
    df = spark.createDataFrame([[result]])
    df.createOrReplaceTempView("SL_THIS")