top of page

PySpark Assignment Help | Practice Sample Set

Introduction

This notebook will introduce Spark capabilities to deal with data in a structured way. Basically, everything turns around the concept of Data Frame and using SQL language to query them. We will see how the data frame abstraction, very popular in other data analytics ecosystems (e.g. R and Python/Pandas), it is very powerfull when performing exploratory data analysis. In fact, it is very easy to express data queries when used together with the SQL language. Moreover, Spark distributes this column-based data structure transparently, in order to make the querying process as efficient as possible.

This lab session will assume that you have uploaded two data files into cloud, and note down the address:

  • mtcars.csv

  • kddcup_data_10_percent-d8e1d.gz


1. Loading in a DataFrame

To create a Spark DataFrame we load an external DataFrame, called mtcars. This DataFrame includes 32 observations on 11 variables.

[, 1] mpg Miles/(US) --> gallon [, 2] cyl --> Number of cylinders [, 3] disp --> Displacement (cu.in.) [, 4] hp --> Gross horsepower [, 5] drat --> Rear axle ratio [, 6] wt --> Weight (lb/1000) [, 7] qsec --> 1/4 mile time [, 8] vs --> V/S [, 9] am --> Transmission (0 = automatic, 1 = manual) [,10] gear --> Number of forward gears [,11] carb --> Number of carburetors



Read Data From Git Hub Using 'wget'


import wget
link_to_data = 'https://github.com/tulip-lab/sit742/raw/master/Jupyter/data/mtcars.csv'
DataSet = wget.download(link_to_data)
import pandas as pd
mtcars = pd.read_csv('mtcars.csv')
mtcars.head()

output:


Initialize the Spark

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

Creating Spark DataFrames

With SQLContext and a loaded local DataFrame, we create a Spark DataFrame:


sdf = sqlContext.createDataFrame(mtcars) 
sdf.printSchema()

output:

root |-- car: string (nullable = true) |-- mpg: double (nullable = true) |-- cyl: long (nullable = true) |-- disp: double (nullable = true) |-- hp: long (nullable = true) |-- drat: double (nullable = true) |-- wt: double (nullable = true) |-- qsec: double (nullable = true) |-- vs: long (nullable = true) |-- am: long (nullable = true) |-- gear: long (nullable = true) |-- carb: long (nullable = true)



You can also directly load this csv file into a Spark DataFrame.

sdf2 = sqlContext.read.format("com.databricks.spark.csv").load("mtcars.csv")


Displays the content of the DataFrame


sdf.show(5)

output:


Selecting columns


sdf.select('mpg').show(5)

Output:








Filtering Data

Filter the DataFrame to only retain rows with mpg less than 18


sdf.filter(sdf['mpg'] < 18).show(5)

output:


Operating on Columns

SparkR also provides a number of functions that can directly applied to columns for data processing and aggregation. The example below shows the use of basic arithmetic functions to convert lb to metric ton.


sdf.withColumn('wtTon', sdf['wt'] * 0.45).show(6)

output:



Grouping, Aggregation

Spark DataFrames support a number of commonly used functions to aggregate data after grouping. For example we can compute the average weight of cars by their cylinders as shown below:


sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)

output;








# We can also sort the output from the aggregation to get the most common cars
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)\
.show(5)

output:









Running SQL Queries from Spark DataFrames

A Spark DataFrame can also be registered as a temporary table in Spark SQL and registering a DataFrame as a table allows you to run SQL queries over its data. The sql function enables applications to run SQL queries programmatically and returns the result as a DataFrame.


# Register this DataFrame as a table.
sdf.registerTempTable("cars")

# SQL statements can be run by using the sql method
highgearcars = sqlContext.sql("SELECT gear FROM cars WHERE cyl >= 4 AND cyl <= 9")
highgearcars.show(6)

Output:

+----+ |gear| +----+ | 4| | 4| | 4| | 3| | 3| | 3| +----+ only showing top 6 rows




SparkSQL Application


Getting the data and creating the RDD

As we did in previous notebooks, we will use the reduced dataset (10 percent) provided for the KDD Cup 1999, containing nearly half million nework interactions. The file is provided as a Gzip file that we will download locally.


import wget

link_to_data = 'https://github.com/tulip-lab/sit742/blob/master/Jupyter/data/kddcup.data_10_percent.gz?raw=true'

DataSet = wget.download(link_to_data, out='kdd.gz')

data_file = "kdd.gz"

# assume that the data file has been uploaded to DBFS
#data_file = "kddcup_data_10_percent-d8e1d.gz"
raw_data = sc.textFile(data_file).cache()

Getting a Data Frame

A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R or Pandas. They can be constructed from a wide array of sources such as a existing RDD in our case.

The entry point into all SQL functionality in Spark is the SQLContext class. To create a basic instance, all we need is a SparkContext reference. Since we are running Spark in shell mode (using pySpark) we can use the global context object sc for this purpose.


from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

Inferring the schema

With a SQLContext, we are ready to create a DataFrame from our existing RDD. But first we need to tell Spark SQL the schema in our data.

Spark SQL can convert an RDD of Row objects to a DataFrame. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys define the column names, and the types are inferred by looking at the first row. Therefore, it is important that there is no missing data in the first row of the RDD in order to properly infer the schema.

In our case, we first need to split the comma separated data, and then use the information in KDD's 1999 task description to obtain the column names.


help(sqlContext)

output:

Help on SQLContext in module pyspark.sql.context object: class SQLContext(builtins.object) | SQLContext(sparkContext, sparkSession=None, jsqlContext=None) | | The entry point for working with structured data (rows and columns) in Spark, in Spark 1.x. | | As of Spark 2.0, this is replaced by :class:`SparkSession`. However, we are keeping the class | here for backward compatibility. | | A SQLContext can be used create :class:`DataFrame`, register :class:`DataFrame` as | tables, execute SQL over tables, cache tables, and read parquet files. | | .. deprecated:: 3.0.0 | Use :func:`SparkSession.builder.getOrCreate()` instead. |

...

...

...


Once we have our RDD of Row we can infer and register the schema.


interactions_df = sqlContext.createDataFrame(row_data)
interactions_df.registerTempTable("interactions")

Now we can run SQL queries over our data frame that has been registered as a table.


# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = sqlContext.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

output:

+--------+---------+ |duration|dst_bytes| +--------+---------+ | 5057| 0| | 5059| 0| | 5051| 0| | 5056| 0| | 5051| 0| | 5039| 0| | 5062| 0| | 5041| 0| | 5056| 0| | 5064| 0| | 5043| 0| | 5061| 0| | 5049| 0| | 5061| 0| | 5048| 0| | 5047| 0| | 5044| 0| | 5063| 0| | 5068| 0| | 5062| 0| +--------+---------+ only showing top 20 rows


The results of SQL queries are RDDs and support all the normal RDD operations.

# Output duration together with dst_bytes
tcp_interactions_out = tcp_interactions.rdd.map(lambda p: "Duration: {}, Dest. bytes: {}".format(p.duration, p.dst_bytes))
for ti_out in tcp_interactions_out.collect():
  print(ti_out)

output:

Duration: 5057, Dest. bytes: 0 Duration: 5059, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5051, Dest. bytes: 0 Duration: 5039, Dest. bytes: 0 Duration: 5062, Dest. bytes: 0 Duration: 5041, Dest. bytes: 0 Duration: 5056, Dest. bytes: 0 Duration: 5064, Dest. bytes: 0 Duration: 5043, Dest. bytes: 0 Duration: 5061, Dest. bytes: 0 Duration: 5049, Dest. bytes: 0 Duration: 5061, Dest. bytes: 0 Duration: 5048, Dest. bytes: 0 Duration: 5047, Dest. bytes: 0 Duration: 5044, Dest. bytes: 0

...

...


We can easily have a look at our data frame schema using printSchema.


interactions_df.printSchema()

output:

root |-- duration: long (nullable = true) |-- protocol_type: string (nullable = true) |-- service: string (nullable = true) |-- flag: string (nullable = true) |-- src_bytes: long (nullable = true) |-- dst_bytes: long (nullable = true)



Queries as DataFrame operations

Spark DataFrame provides a domain-specific language for structured data manipulation. This language includes methods we can concatenate in order to do selection, filtering, grouping, etc. For example, let's say we want to count how many interactions are there for each protocol type. We can proceed as follows.


from time import time

t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").groupBy("protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

Output

+-------------+------+ |protocol_type| count| +-------------+------+ | tcp|190065| | udp| 20354| | icmp|283602| +-------------+------+ Query performed in 6.509 seconds


Now imagine that we want to count how many interactions last more than 1 second, with no data transfer from destination, grouped by protocol type. We can just add to filter calls to the previous.


t0 = time()
interactions_df.select("protocol_type", "duration", "dst_bytes").filter(interactions_df.duration>1000).filter(interactions_df.dst_bytes==0).groupBy("protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

Output:

+-------------+-----+ |protocol_type|count| +-------------+-----+ | tcp| 139| +-------------+-----+ Query performed in 6.583 seconds


We can use this to perform some exploratory data analysis. Let's count how many attack and normal interactions we have. First we need to add the label column to our data.


def get_label_type(label):
    if label!="normal.":
        return "attack"
    else:
        return "normal"
    
row_labeled_data = csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5]),
    label=get_label_type(p[41])
    )
)
interactions_labeled_df = sqlContext.createDataFrame(row_labeled_data)

This time we don't need to register the schema since we are going to use the OO query interface.

Let's check the previous actually works by counting attack and normal data in our data frame.


t0 = time()
interactions_labeled_df.select("label").groupBy("label").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

output"

+------+------+ | label| count| +------+------+ |normal| 97278| |attack|396743| +------+------+ Query performed in 5.863 seconds



Now we want to count them by label and protocol type, in order to see how important the protocol type is to detect when an interaction is or not an attack.


t0 = time()
interactions_labeled_df.select("label", "protocol_type").groupBy("label", "protocol_type").count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

output:

+------+-------------+------+ | label|protocol_type| count| +------+-------------+------+ |normal| udp| 19177| |normal| icmp| 1288| |normal| tcp| 76813| |attack| icmp|282314| |attack| tcp|113252| |attack| udp| 1177| +------+-------------+------+ Query performed in 5.846 seconds



At first sight it seems that udp interactions are in lower proportion between network attacks versus other protocol types.

And we can do much more sofisticated groupings. For example, add to the previous a "split" based on data transfer from target.


t0 = time()
interactions_labeled_df.select("label", "protocol_type", "dst_bytes").groupBy("label", "protocol_type", interactions_labeled_df.dst_bytes==0).count().show()
tt = time() - t0

print("Query performed in {} seconds".format(round(tt,3)))

output:

+------+-------------+---------------+------+
| label|protocol_type|(dst_bytes = 0)| count|
+------+-------------+---------------+------+
|normal|          udp|          false| 15583|
|attack|          udp|          false|    11|
|attack|          tcp|           true|110583|
|normal|          tcp|          false| 67500|
|attack|         icmp|           true|282314|
|attack|          tcp|          false|  2669|
|normal|          tcp|           true|  9313|
|normal|          udp|           true|  3594|
|normal|         icmp|           true|  1288|
|attack|          udp|           true|  1166|
+------+-------------+---------------+------+

Query performed in 6.616 seconds


We see how relevant is this new split to determine if a network interaction is an attack. We will stop here, but we can see how powerfull this type of queries are in order to explore our data. Actually we can replicate all the splits we saw in previous notebooks, when introducing classification trees, just by selecting, groping, and filtering our dataframe. For a more detailed (but less real-world) list of Spark's DataFrame operations and data sources, have a look at the oficial documentation here.



5 Comments


https://luck8.plus/ hôm bữa mình lướt thấy ai đó share nên tò mò bấm vào xem thử, kiểu xem giao diện họ làm ra sao thôi chứ không định “tham gia” gì. Vào trang cái thấy tiêu đề với mấy khối nội dung nổi bật ngay, nhìn phát là biết họ đang tập trung vào phần tin tức cập nhật. Mình có đọc lướt đoạn giới thiệu, thấy họ nói nền tảng có từ 2014 và số thành viên khá đông, nghe cũng thú vị nhưng mình chỉ xem cho biết. Điểm mình thích là chữ rõ, bố cục chia cụm nên kéo xuống không bị ngợp, chuyển qua lại cũng nhanh. Nhìn chung mấy box “tin tức cập nhật mới…

Like

https://ea88com.io/ mình lướt thử vì thấy bạn bè nhắc hoài, kiểu vào xem trang họ làm ra sao thôi chứ không có ý tìm hiểu sâu. Cảm giác đầu tiên là bố cục khá dễ chịu, chữ nghĩa không bị dồn dập nên đọc nhanh vẫn nắm được ý. Mình thấy họ để phần nói về link vào chính thức 2026 khá nổi, nhìn cái là biết họ muốn người mới khỏi bị lạc giữa mớ link linh tinh. Mấy đoạn giới thiệu cũng viết gọn, không kiểu kéo dài lê thê, nên lướt qua vẫn hiểu họ đang nói gì. Menu với các khối nội dung chia tách rõ ràng, đặc biệt cái box “link chính thức 2026” đặt…

Like

s666 trang chủ mình mới lướt thử vì thấy bạn bè nói nhiều, chủ yếu xem giao diện chứ chưa chơi gì. Ấn tượng đầu là bố cục khá gọn, nhìn vào là biết từng phần nằm đâu, không kiểu nhồi chữ khiến rối mắt. Mình mở trên điện thoại thấy tải ổn, chuyển qua lại giữa các mục cũng mượt, không phải chờ lâu. Có cái mình thích là họ để thông tin theo từng khối rõ ràng, đọc lướt vẫn nắm được ý, nhất là mấy đoạn nói về bảo mật tài khoản với hỗ trợ 24 7 nhìn khá dễ hiểu. Nói chung cảm giác trang làm chỉn chu, chữ dễ đọc, khoảng cách thoáng, và các…

Like

TG88 mình thấy mọi người bàn tán hoài nên cũng bấm vào xem thử cho biết. Mình không kiểu ngồi soi game hay gì đâu, chỉ muốn coi trang có dễ dùng không thôi. Vừa vào là thấy giao diện khá sáng sủa, khoảng trống vừa đủ nên nhìn không bị ngộp. Mấy mục trên trang được chia theo nhóm rõ ràng, lướt nhẹ một vòng là biết nên bấm chỗ nào tiếp theo, không phải đoán mò. Mình cũng thích cách họ để thông tin theo từng khối gọn gàng, kiểu nhìn phát nắm ý chính luôn, khỏi phải đọc cả đống chữ liền tù tì. Chuyển qua lại giữa các phần cũng nhanh vì menu đặt khá dễ…

Like

LX88 mình lướt thử cho biết thôi, kiểu xem giao diện có dễ dùng không. Ấn tượng đầu là trang nhìn gọn, các mục được chia thành từng khối nên đọc không bị ngợp, kéo xuống là nắm được ý chính khá nhanh. Mình có thấy họ để phần nói về giấy phép Curacao eGaming và mấy chứng chỉ kiểm tra độc lập như iTech Labs GLI ngay dạng box thông tin, nên ai tò mò về độ minh bạch thì liếc phát hiểu luôn, không phải đào sâu. Menu cũng đặt chỗ dễ nhìn, bấm qua lại giữa các trang không bị rối hay lag gì. Nói chung cảm giác giống một site được sắp xếp có chủ đích,…

Like

REALCODE4YOU

Realcode4you is the one of the best website where you can get all computer science and mathematics related help, we are offering python project help, java project help, Machine learning project help, and other programming language help i.e., C, C++, Data Structure, PHP, ReactJs, NodeJs, React Native and also providing all databases related help.

Hire Us to get Instant help from realcode4you expert with an affordable price.

USEFUL LINKS

Discount

ADDRESS

Noida, Sector 63, India 201301

Follows Us!

  • Facebook
  • Twitter
  • Instagram
  • LinkedIn

OUR CLIENTS BELONGS TO

  • india
  • australia
  • canada
  • hong-kong
  • ireland
  • jordan
  • malaysia
  • new-zealand
  • oman
  • qatar
  • saudi-arabia
  • singapore
  • south-africa
  • uae
  • uk
  • usa

© 2023 IT Services provided by Realcode4you.com

bottom of page