top of page

PySpark Practice Sets | Big Data PySpark Assignment Help

Practice Set 1

Install everything necessary to make spark work

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

Set the paths to the installs

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Find the spark installation

import findspark
findspark.init()

Start doing fancy pyspark stuff

from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("programming")
        .master("local")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config('spark.ui.port', '4050')
        .getOrCreate()
)

Start Here

Load the data from the files below into dataframes and answer the questions under 'Analyze the data'.

Resources

  • orders.csv

  • customers.csv


# TODO
orders_df = None
# TODO
customers_df = None

Analyze the data

Show total orders and revenue.

What's the average order revenue per day?

Which order has the most revenue each day?

How much revenue by customer zip code?

Who has been a customer the longest?

Which customer has the least amount of orders?



Practice Set 2

Install everything necessary to make spark work. :)

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

Set the paths to the installs

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Find the spark installation

import findspark
findspark.init()

Start doing fancy pyspark stuff

from pyspark.sql import SparkSession
spark = (
    SparkSession
        .builder
        .appName("programming")
        .master("local")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config('spark.ui.port', '4050')
        .getOrCreate()
)

Create the table

OUTPUT_DELTA_PATH = './output/delta/'
spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')

spark.sql('''
    CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
        worked_date date
        , worker_id int
        , delete_flag string
        , hours_worked double
    ) USING DELTA
    PARTITIONED BY (worked_date)
    LOCATION "{0}"
    '''.format(OUTPUT_DELTA_PATH)
)

Start Here Load all the files into the table created above, EXERCISE.WORKED_HOURS. Assume each file is received daily, to simulate a daily load process. Meaning, only one file should be read/loaded at a time. Rules:

  • A worker can only work once per day.

  • delete_flag = Y means the record should be removed from the table, if exists.

Resources:

  • 20210609_worked_hours.csv

  • 20210610_worked_hours.csv

  • 20210611_worked_hours.csv


Load 2021-06-09 file and output table contents

Load 2021-06-10 file and output table contents

Load 2021-06-11 file and output table contents



Practice Set 3

Install everything necessary to make spark work.

!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

Set the paths to the installs


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Find the spark installation


import findspark
findspark.init()

Start doing fancy pyspark stuff


from pyspark.sql import SparkSession, DataFrame
spark = (
    SparkSession
        .builder
        .appName("programming")
        .master("local")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config('spark.ui.port', '4050')
        .getOrCreate()
)


from pyspark.sql.functions import expr, rand, col, unix_timestamp

# This will generate 30 rows per emp_count requested.
def get_sample_data(emp_count: int) -> DataFrame:

  start_date = '2021-04-01'
  end_date = '2021-04-30'
  theseed = 42

  return (
    spark.sql("select explode(sequence({0}, {1})) as emp_id".format(1, emp_count))
      .withColumn('start_datetime', expr("explode(sequence(cast('{0}' as timestamp), cast('{1}' as timestamp), interval 1 day))".format(start_date, end_date)))
      .withColumn('rand_time', (rand(theseed) * 86401).cast('int'))  # find a time for the shift to start
      .withColumn('start_shift_datetime', (unix_timestamp(col('start_datetime')) + col('rand_time')).cast('timestamp'))
      .withColumn('rand_shift_length', (rand(theseed) * 21600 + 7200).cast('int'))  # assume a shift is between 2-8 hrs in length
      .withColumn('end_shift_datetime', (unix_timestamp(col('start_shift_datetime')) + col('rand_shift_length')).cast('timestamp'))
      .select(col('emp_id'), col('start_shift_datetime'), col('end_shift_datetime'))
  )

Start Here

Find what day part (AM, PM, LUNCH) a shift mostly occurs in. If there is a tie indicate as such.

  • AM is before 1100

  • LUNCH is between 1100 (inclusive) and 1400 (exclusive)

  • PM is 1400 or later

Each hour a shift occurs in counts toward that day part.

For example, if a shift starts at 1045 and ends at 1530 the breakdown is as follows.

  • AM: 10

  • LUNCH: 11, 12, 13

  • PM: 14, 15

This shift occurs the most during the LUNCH daypart. Sample output below.






Start with an outline of how the problem could be solved. It is expected that this will not be fully completed within the self-imposed time limit of the exercise. This is used to guage how you think about and approach complex problems.

How does your solution scale as data quantity increases? Keeping in mind the resource limitations that Google Collab has.


def get_results(df: DataFrame) -> DataFrame:

  result = (
    df
      # TODO: This is where the code goes to generate the result being returned.
  )

  # Make it actually do all the calculations but not save anywhere
  result.write.format('noop').mode('overwrite').save()

  return (
      result
        .select(
          col('emp_id')
          , col('start_shift_datetime')
          , col('end_shift_datetime')
          , col('day_part')
        )
  )

Use this section to do some light performance testing on your solution. Keep note of how the time changes as the records increase. Is the change linear, constant, etc?


import time
sample_data = get_sample_data(1000)  # 1000 = 30000 records
s = time.time()
rsp = get_results(df = sample_data)
print("Took {0} seconds\n".format((time.time() - s)))


Practice Set 4

Install everything necessary to make spark work. :)


!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

Set the paths to the installs


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

Find the spark installation

import findspark
findspark.init(

Start doing fancy pyspark stuff

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, ArrayType
from pyspark.sql import DataFrame, SparkSession
INPUT_FILE = './resources/Order.json'  # TODO: Change this based on actual location for your environment setup
OUTPUT_CSV_FILE = './output/files/output.csv'
OUTPUT_DELTA_PATH = './output/delta/'
spark = (
    SparkSession
        .builder
        .appName("programming")
        .master("local")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config('spark.ui.port', '4050')
        .getOrCreate()
)


def read_json(file_path: str, schema: StructType) -> DataFrame:
    """
    The goal of this method is to parse the input json data using the schema from another method.

    We are only interested in data starting at orderPaid attribute.

    :param file_path: Order.json will be provided
    :param schema: schema that needs to be passed to this method
    :return: Dataframe containing records from Order.json
    """
    return None


def get_struct_type() -> StructType:
    """
    Build a schema based on the the file Order.json

    :return: Structype of equivalent JSON schema
    """
    discount_type = StructType([StructField("amount", IntegerType(), True),
                                StructField("description", StringType(), True)
                                ])

    child_item_type = StructType([StructField("lineItemNumber", StringType(), True),
                                  StructField("itemLabel", StringType(), True),
                                  StructField("quantity", DoubleType(), True),
                                  StructField("price", IntegerType(), True),
                                  StructField("discounts", "TODO", True),
                                  ])

    item_type = StructType([StructField("lineItemNumber", StringType(), True),
                            StructField("itemLabel", StringType(), True),
                            StructField("quantity", DoubleType(), True),
                            StructField("price", IntegerType(), True),
                            StructField("discounts", "TODO", True),
                            StructField("childItems", "TODO", True),
                            ])

    order_paid_type = StructType([StructField("orderToken", StringType(), True),
                                  StructField("preparation", StringType(), True),
                                  StructField("items", "TODO", True),
                                  ])

    message_type = StructType([StructField("orderPaid", "TODO", True)])

    data_type = StructType([StructField("message", "TODO", True)])

    body_type = StructType([StructField("id", StringType(), True),
                            StructField("subject", StringType(), True),
                            StructField("data", "TODO", True),
                            StructField("eventTime", StringType(), True),
                            ])
    return None


def get_rows_from_array(df: DataFrame) -> DataFrame:
    """
    Input data frame contains columns of type array. Identify those columns and convert them to rows.

    :param df: Contains column with data type of type array.
    :return: The dataframe should not contain any columns of type array
    """
    return None


def get_unwrapped_nested_structure(df: DataFrame) -> DataFrame:
    """
    Convert columns that contain multiple attributes to columns of their own

    :param df: Contains columns that have multiple attributes
    :return: Dataframe should not contain any nested structures
    """
    return None


def write_df_as_csv(df: DataFrame) -> None:
    """
    Write the data frame to a local local destination of your choice with headers

    :param df: Contains flattened order data
    """
    return None


def create_delta_table(spark: SparkSession) -> None:
    spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')

    spark.sql('''
    CREATE TABLE IF NOT EXISTS EXERCISE.ORDERS(
        OrderToken String,
        Preparation  String,
        ItemLineNumber String,
        ItemLabel String,
        ItemQuantity Double,
        ItemPrice Integer,
        ItemDiscountAmount Integer,
        ItemDiscountDescription String,
        ChildItemLineNumber String, 
        ChildItemLabel String,
        ChildItemQuantity Double,
        ChildItemPrice Integer,
        ChildItemDiscountAmount Integer,
        ChildItemDiscountDescription String
    ) USING DELTA
    LOCATION "{0}"
    '''.format(OUTPUT_DELTA_PATH))

    return None


def write_df_as_delta(df: DataFrame) -> None:
    """
    Write the dataframe output to the table created, overwrite mode can be used

    :param df: flattened data
    :return: Data from the orders table
    """

    pass


def read_data_delta(spark: SparkSession) -> DataFrame:
    """
    Read data from the table created
    
    :param spark:
    :return:
    """

    return None


if __name__ == '__main__':
    input_schema = get_struct_type()

    input_df = read_json(INPUT_FILE, input_schema)

    arrays_to_rows_df = get_rows_from_array(input_df)

    unwrap_struct_df = get_unwrapped_nested_structure(arrays_to_rows_df)

    write_df_as_csv(unwrap_struct_df)

    create_delta_table(spark)
    write_df_as_delta(unwrap_struct_df)

    result_df = read_data_delta(spark)
    result_df.show(truncate=False)


Contact Us to get solution of above problem or need any other help related to Big data Spark at:


realcode4you@gmail.com
47 views0 comments
bottom of page