PySpark

Fire Insights provides a PySpark processor for writing PySpark/Python code.

Interface

In the PySpark Processor, you implement the myfn function, which Fire Insights invokes with the following signature:

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):

* spark : the SparkSession object
* workflowContext : used to output results to the user
* id : the id of the current processor
* inDF : the input PySpark DataFrame
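
A minimal implementation simply returns a DataFrame, which (as in the examples further below) is passed on as the processor's output. The sketch here is only illustrative:

from pyspark.sql import SparkSession, DataFrame
from workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):
  # Pass the input through unchanged; the returned DataFrame becomes the processor output
  return inDF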

WorkflowContext

WorkflowContext provides the following methods for outputting data to the user:

* def outStr(self, text: str)
* def outNameValue(self, nm: str, val: str)
* def outSchema(self, id: int, title: str, df: DataFrame)
* def outDataFrame(self, id: int, title: str, df: DataFrame)
* def outPandasDataframe(self, id: int, title: str, df: pd.DataFrame)
* def outNumpy1darray(self, id: int, title: str, arr: np.ndarray)
* def outNumpy2darray(self, id: int, title: str, arr: np.ndarray)
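
As a sketch of how several of these methods can be combined (the titles and values below are illustrative, not prescribed by Fire Insights):

from pyspark.sql import SparkSession, DataFrame
from workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):
  # Free-form text output
  workflowContext.outStr("Processing started")
  # Name/value pair, e.g. the row count of the input
  workflowContext.outNameValue("Row Count", str(inDF.count()))
  # Schema of the incoming DataFrame
  workflowContext.outSchema(id, "Input Schema", inDF)
  # A small sample of the data itself
  workflowContext.outDataFrame(id, "Sample Rows", inDF.limit(10))
  return inDF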

Example 1

Below is example code for the PySpark node.

from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql import *
from workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):
  # Label each house as big or small based on its number of bedrooms
  house_type_udf = udf(lambda bedrooms: "big house" if int(bedrooms) > 2 else "small house", StringType())
  # Keep only the columns of interest, then add the derived column
  filter_df = inDF.select("id", "price", "lotsize", "bedrooms")
  outDF = filter_df.withColumn("house_type", house_type_udf(filter_df.bedrooms))
  return outDF
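
The same transformation can also be written with Spark's built-in when/otherwise column expressions, which avoids the overhead of a Python UDF. The sketch below assumes the same column names as Example 1:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import when, col
from workflowcontext import *

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):
  # Same labelling as Example 1, using built-in column expressions instead of a UDF
  filter_df = inDF.select("id", "price", "lotsize", "bedrooms")
  outDF = filter_df.withColumn(
      "house_type",
      when(col("bedrooms").cast("int") > 2, "big house").otherwise("small house"))
  return outDF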

Example 2

Below is another example, which uses scikit-learn.

from pyspark.sql.types import StringType
from pyspark.sql.functions import *
from pyspark.sql import *
from workflowcontext import *

import numpy as np
import pandas as pd

from sklearn.linear_model import LinearRegression
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn import metrics

from joblib import dump, load

def myfn(spark: SparkSession, workflowContext: WorkflowContext, id: int, inDF: DataFrame):
  # Convert the Spark DataFrame to a Pandas DataFrame using Arrow
  dataset = inDF.select("*").toPandas()

  dataset = dataset.fillna(method='ffill')

  X = dataset[
        ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide',
         'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol']].values

  y = dataset['quality'].values

  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

  # Steps to model the data with sklearn
  # 1. Set up the model
  model = LinearRegression()
  # 2. Use fit
  ft = model.fit(X_train, y_train)
  print(ft)
  # 3. Check the score
  scr = model.score(X_test, y_test)
  workflowContext.outStr("Model Score : " + str(scr))

  # 4. Print model
  workflowContext.outStr("Model Coeffient : " + str(model.coef_))
  workflowContext.outStr("Model Intercept : " + str(model.intercept_))

  # 5. Predict test data
  y_pred = model.predict(X_test)

  # 6. See difference between actual and predicted value
  df = pd.DataFrame({'Actual': y_test, 'Predicted': y_pred})
  df1 = df.head(25)
  workflowContext.outPandasDataframe(id, "Actual - Predicted : ", df1)

  # 7. Evaluate the performance
  workflowContext.outStr("Mean Absolute Error:" + str(metrics.mean_absolute_error(y_test, y_pred)))
  workflowContext.outStr("Mean Squared Error:" + str(metrics.mean_squared_error(y_test, y_pred)))
  workflowContext.outStr("Root Mean Squared Error:" + str(np.sqrt(metrics.mean_squared_error(y_test, y_pred))))

  return inDF
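
Example 2 imports dump and load from joblib but does not use them. To persist the fitted model for later reuse, a call such as the following could be placed before the return statement (the path is only an example and must be writable on the driver):

  # Save the fitted scikit-learn model to disk on the driver (example path)
  dump(model, "/tmp/linear_regression.joblib")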