Homework 2 and 3

This commit is contained in:
Claudio Maggioni 2022-12-21 10:10:29 +01:00
parent b9d0e00634
commit 6a7e32a020
10 changed files with 4734 additions and 7 deletions

11
.gitignore vendored
View file

@ -300,4 +300,15 @@ TSWLatexianTemp*
# option is specified. Footnotes are the stored in a file with suffix Notes.bib.
# Uncomment the next line to have this generated file ignored.
#*Notes.bib
# gitignore template for Jupyter Notebooks
# website: http://jupyter.org/
.ipynb_checkpoints
*/.ipynb_checkpoints/*
# IPython
profile_default/
ipython_config.py
# Remove previous ipynb_checkpoints
# git rm -r .ipynb_checkpoints/

558
hw02/Untitled.ipynb Normal file
View file

@ -0,0 +1,558 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 31,
"id": "4abbae95",
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import time\n",
"from pymongo import MongoClient"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c1d3a065",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": 75,
"id": "f3196535",
"metadata": {},
"outputs": [],
"source": [
"mongo_conn_str = \"mongodb://localhost:27017\"\n",
"mongo = MongoClient(mongo_conn_str)\n",
"db = mongo[\"ddm\"]"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "264e24b7",
"metadata": {},
"outputs": [],
"source": [
"def start_the_time():\n",
" global start_time\n",
" start_time = time.time()\n",
" \n",
"def end_the_time():\n",
" print(\"--- %s seconds ---\" % (time.time() - start_time))"
]
},
{
"cell_type": "markdown",
"id": "9becfc2a",
"metadata": {},
"source": [
"### Top 10 journals for numbers of papers"
]
},
{
"cell_type": "code",
"execution_count": 80,
"id": "8e98cd86",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--- 0.0001361370086669922 seconds ---\n",
"['Morphometric MRI as a diagnostic biomarker of frontotemporal dementia: A systematic review to determine clinical applicability', 'Low adherence of Swiss children to national dietary guidelines', 'Decomposing broadcast algorithms using abstract MAC layers']\n"
]
}
],
"source": [
"start_the_time()\n",
"result = db[\"papers\"].find({ \n",
" \"authors.email\": {\"$regex\": \"@usi\\.ch\"}\n",
"}, {\n",
" 'title': 1\n",
"})\n",
"end_the_time()\n",
"\n",
"titles = [doc['title'] for doc in result]\n",
"print(titles)"
]
},
{
"cell_type": "markdown",
"id": "c0e9ad5e",
"metadata": {},
"source": [
"### Most 3 cited authors in 'Strategic info-mediaries'"
]
},
{
"cell_type": "code",
"execution_count": 101,
"id": "a8781d01",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--- 0.302872896194458 seconds ---\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>_id</th>\n",
" <th>referenceCount</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>Freshwater</td>\n",
" <td>12</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>Biodiversity</td>\n",
" <td>9</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>Marine</td>\n",
" <td>8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>Climate change</td>\n",
" <td>8</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>Ecosystem-based management</td>\n",
" <td>7</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>Coastal</td>\n",
" <td>6</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>Eutrophication</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>Phosphorus</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>Policy</td>\n",
" <td>5</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>Agriculture</td>\n",
" <td>4</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" _id referenceCount\n",
"0 Freshwater 12\n",
"1 Biodiversity 9\n",
"2 Marine 8\n",
"3 Climate change 8\n",
"4 Ecosystem-based management 7\n",
"5 Coastal 6\n",
"6 Eutrophication 5\n",
"7 Phosphorus 5\n",
"8 Policy 5\n",
"9 Agriculture 4"
]
},
"execution_count": 101,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pipeline = [\n",
" {\n",
" \"$match\": {\n",
" \"publicationDetails.journal\": \"Vertical e-markets\"\n",
" }\n",
" },\n",
" { \n",
" \"$unwind\": \"$authors\" \n",
" }, \n",
" { \n",
" \"$lookup\": {\n",
" \"from\": \"authors\",\n",
" \"localField\": \"authors.authorId\",\n",
" \"foreignField\": \"_id\",\n",
" \"as\": \"authors\"\n",
" }\n",
" },\n",
" {\n",
" \"$match\": {\n",
" \"authors.bio\": {\n",
" \"$regex\": \"[Ss]uccess\"\n",
" }\n",
" }\n",
" },\n",
" { \n",
" \"$unwind\": \"$keywords\" \n",
" },\n",
" { \n",
" \"$group\": {\n",
" \"_id\": \"$keywords\", \n",
" \"referenceCount\": { \n",
" \"$sum\": 1\n",
" } \n",
" } \n",
" },\n",
" {\n",
" \"$sort\": {\n",
" \"referenceCount\": -1\n",
" }\n",
" },\n",
" {\n",
" \"$limit\": 10\n",
" }\n",
"]\n",
"\n",
"start_the_time()\n",
"result = db[\"papers\"].aggregate(pipeline)\n",
"end_the_time()\n",
"\n",
"pd.DataFrame(result)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5fc8b56f",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"id": "39ae3826",
"metadata": {},
"source": [
"### Title"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "146fef1e",
"metadata": {},
"outputs": [],
"source": [
"pipeline = [\n",
" {\n",
" \"$match\": {\n",
" \"publicationDetails.journal\": \"Next-generation users\",\n",
" }\n",
" },\n",
" { \"$unwind\": \"$authors\" }, \n",
" { \n",
" \"$group\": {\n",
" \"_id\": \"$references.authors.name\", \n",
" \"referenceCount\": { \n",
" \"$sum\": 1\n",
" } \n",
" } \n",
" },\n",
" {\n",
" \"$sort\": {\n",
" \"referenceCount\": -1\n",
" }\n",
" },\n",
" {\n",
" \"$limit\": 3\n",
" }\n",
"]\n",
"\n",
"start_the_time()\n",
"result = db[\"papers\"].aggregate(pipeline)\n",
"end_the_time()\n",
"\n",
"pd.DataFrame(result)"
]
},
{
"cell_type": "code",
"execution_count": 48,
"id": "5c02ad39",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--- 0.0010950565338134766 seconds ---\n"
]
}
],
"source": [
"new_journal = { \n",
" 'issn': '89012388',\n",
" 'name': 'Advanced Topics on Databases',\n",
" 'volumes': []\n",
"}\n",
"start_the_time()\n",
"new_journal_id = db[\"journals\"].insert_one(new_journal).inserted_id\n",
"end_the_time()"
]
},
{
"cell_type": "code",
"execution_count": 95,
"id": "2f128b04",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--- 0.09116077423095703 seconds ---\n"
]
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>_id</th>\n",
" <th>paper_number</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>One-to-one content</td>\n",
" <td>744</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>Vertical e-markets</td>\n",
" <td>515</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>Bricks-and-clicks web-readiness</td>\n",
" <td>483</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>Plug-and-play web-readiness</td>\n",
" <td>361</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>Back-end partnerships</td>\n",
" <td>354</td>\n",
" </tr>\n",
" <tr>\n",
" <th>5</th>\n",
" <td>Next-generation users</td>\n",
" <td>334</td>\n",
" </tr>\n",
" <tr>\n",
" <th>6</th>\n",
" <td>Distributed mindshare</td>\n",
" <td>329</td>\n",
" </tr>\n",
" <tr>\n",
" <th>7</th>\n",
" <td>Enterprise e-services</td>\n",
" <td>281</td>\n",
" </tr>\n",
" <tr>\n",
" <th>8</th>\n",
" <td>Strategic info-mediaries</td>\n",
" <td>276</td>\n",
" </tr>\n",
" <tr>\n",
" <th>9</th>\n",
" <td>Clicks-and-mortar channels</td>\n",
" <td>271</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" _id paper_number\n",
"0 One-to-one content 744\n",
"1 Vertical e-markets 515\n",
"2 Bricks-and-clicks web-readiness 483\n",
"3 Plug-and-play web-readiness 361\n",
"4 Back-end partnerships 354\n",
"5 Next-generation users 334\n",
"6 Distributed mindshare 329\n",
"7 Enterprise e-services 281\n",
"8 Strategic info-mediaries 276\n",
"9 Clicks-and-mortar channels 271"
]
},
"execution_count": 95,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pipeline = [{\n",
" \"$group\": {\n",
" \"_id\":\"$publicationDetails.journal\",\n",
" \"paper_number\":{\n",
" \"$sum\":1\n",
" }\n",
" }\n",
"},{\n",
" \"$sort\":{\n",
" \"paper_number\":-1\n",
" }\n",
"},{\n",
" \"$limit\":10\n",
"}]\n",
"\n",
"start_the_time()\n",
"result = db[\"papers\"].aggregate(pipeline)\n",
"end_the_time()\n",
"\n",
"pd.DataFrame(result)"
]
},
{
"cell_type": "code",
"execution_count": 132,
"id": "8f12712b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"--- 1.3057661056518555 seconds ---\n"
]
},
{
"data": {
"text/plain": [
"'{\"_id\":{\"0\":{\"journal\":\"One-to-one content\",\"sectionTitle\":\"Introduction\"},\"1\":{\"journal\":\"One-to-one content\",\"sectionTitle\":\"Discussion\"},\"2\":{\"journal\":\"Vertical e-markets\",\"sectionTitle\":\"Introduction\"},\"3\":{\"journal\":\"One-to-one content\",\"sectionTitle\":\"Results\"},\"4\":{\"journal\":\"Bricks-and-clicks web-readiness\",\"sectionTitle\":\"Method details\"},\"5\":{\"journal\":\"Plug-and-play web-readiness\",\"sectionTitle\":\"Introduction\"},\"6\":{\"journal\":\"Back-end partnerships\",\"sectionTitle\":\"Introduction\"},\"7\":{\"journal\":\"Next-generation users\",\"sectionTitle\":\"Introduction\"},\"8\":{\"journal\":\"Plug-and-play web-readiness\",\"sectionTitle\":\"Discussion\"},\"9\":{\"journal\":\"Next-generation users\",\"sectionTitle\":\"Results\"}},\"sectionCount\":{\"0\":630,\"1\":512,\"2\":506,\"3\":503,\"4\":371,\"5\":353,\"6\":350,\"7\":332,\"8\":330,\"9\":322}}'"
]
},
"execution_count": 132,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pipeline = [\n",
" { \n",
" \"$unwind\": \"$content\"\n",
" }, {\n",
" \"$group\": {\n",
" \"_id\": {\n",
" \"journal\": \"$publicationDetails.journal\",\n",
" \"sectionTitle\": \"$content.title\"\n",
" }, \n",
" \"sectionCount\": {\n",
" \"$sum\": 1\n",
" }\n",
" }\n",
" }, {\n",
" \"$sort\": {\n",
" \"sectionCount\": -1\n",
" }\n",
" }, {\n",
" \"$limit\":10\n",
" }\n",
"]\n",
"\n",
"start_the_time()\n",
"result = db[\"papers\"].aggregate(pipeline)\n",
"end_the_time()\n",
"\n",
"pd.DataFrame(result).to_json()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ecfd45d9",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View file

@ -101,12 +101,9 @@ def transform_section(sec: dict, figures: [dict], references: [dict]) -> dict:
content += arr
if len(content) > 0 and isinstance(content[-1], list) and len(content[-1]) == 0:
del content[-1]
return {
"title": sec["title"],
"content": content
@ -130,7 +127,6 @@ def json_to_paper(filename: str, jsonObj: dict) -> dict:
author = fake_author(get_author_name(author), email)
# TODO: authorID
authors.append({
"email": author["email"],
"name": author["name"],
@ -141,7 +137,7 @@ def json_to_paper(filename: str, jsonObj: dict) -> dict:
paper["keywords"] = getProp(jsonObj, "metadata.keywords")
publicationDetails = {}
publicationDetails["issn"] = getProp(jsonObj, "metadata.issn") # ISBN-like, not a name
publicationDetails["issn"] = getProp(jsonObj, "metadata.issn")
date = fake.date_object()
volume = getProp(jsonObj, "metadata.volume")
@ -243,7 +239,8 @@ def main():
d = json.JSONDecoder()
jsonObj = d.decode(jsonStr)
if getProp(jsonObj, "metadata.issn") is None or getProp(jsonObj, "metadata.doi") is None:
if getProp(jsonObj, "metadata.issn") is None or \
getProp(jsonObj, "metadata.doi") is None:
j += 1
continue # SKIP papers with no journal ISSN or paper DOI
@ -335,4 +332,4 @@ def main():
print("Journals updated with refs: ", i)
if __name__ == "__main__":
main()
main()

BIN
hw02/report.pdf Normal file

Binary file not shown.

1122
hw02/report.tex Normal file

File diff suppressed because it is too large Load diff

4
hw03/.gitignore vendored Normal file
View file

@ -0,0 +1,4 @@
csv-import/
.ipynb_checkpoints/
*.parquet/
*.tar.gz

30
hw03/commands.md Normal file
View file

@ -0,0 +1,30 @@
# How to generate dblp csvs
```shell
curl -o dblp.xml.gz https://dblp.org/xml/dblp.xml.gz
gunzip dblp.xml.gz
# download the DTD specification of the DBLP XML format
curl -o dblp.dtd https://dblp.org/xml/dblp.dtd
git clone https://github.com/ThomHurks/dblp-to-csv
dblp-to-csv/XMLToCSV.py --annotate dblp.xml dblp.dtd dblp_csv.csv \
--relations journal:article_journal author:article_author
for t in article; do
tr ';' '\n' <dblp_csv_${t}_header.csv | sed 's/:.*//g' | \
tr '\n' ';' | awk 1 | cat - dblp_csv_${t}.csv | \
sed -E 's/\{?\\""\}?/""/g' > csv-import/${t}.csv;
done
cp dblp_csv_{author|journal}_* dblp_csv_{author|journal}.csv csv-import
```
# Archive
The csv-import files are compressed. To decompress them run:
```shell
tar -xzvf csv-import.tar.gz
```

893
hw03/dblp_spark.ipynb Normal file
View file

@ -0,0 +1,893 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "e159835e",
"metadata": {},
"source": [
"## Basic setup"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "72909aec-742a-452b-a118-6c500790b96a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"22/12/08 15:56:43 WARN Utils: Your hostname, martilo-Aspire-A315-42 resolves to a loopback address: 127.0.1.1; using 10.21.72.130 instead (on interface wlp4s0)\n",
"22/12/08 15:56:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"22/12/08 15:56:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
}
],
"source": [
"# Import the basic spark library\n",
"from pyspark.sql import SparkSession\n",
"\n",
"# Create an entry point to the PySpark Application\n",
"spark = SparkSession.builder \\\n",
" .master(\"local\") \\\n",
" .appName(\"MyFirstSparkApplication\") \\\n",
" .getOrCreate()\n",
"# master contains the URL of your remote spark instance or 'local'"
]
},
{
"cell_type": "markdown",
"id": "2c1c8f93",
"metadata": {},
"source": [
"## Schema definition and data import"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "44a02813",
"metadata": {},
"outputs": [],
"source": [
"# Schema definitions\n",
"from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType, IntegerType, DateType\n",
"\n",
"article_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Title\", StringType(), False), \\\n",
" StructField(\"Content\", StringType(), True), \\\n",
" StructField(\"MetadataDate\", DateType(), True), \\\n",
" StructField(\"JournalId\", IntegerType(), True), \\\n",
" StructField(\"Volume\", StringType(), True), \\\n",
" StructField(\"NumberInVolume\", StringType(), True), \\\n",
" StructField(\"PagesInVolume\", StringType(), True), \\\n",
" StructField(\"ObjectIds\", ArrayType(StringType()), True)\n",
"])\n",
"\n",
"article_author_schema = StructType([ \\\n",
" StructField(\"ArticleId\", IntegerType(), False), \\\n",
" StructField(\"AuthorId\", IntegerType(), False)\n",
"])\n",
"\n",
"\n",
"author_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Name\", StringType(), False), \\\n",
" StructField(\"Email\", StringType(), True), \\\n",
" StructField(\"Affiliation\", DateType(), True), \\\n",
" StructField(\"Bio\", IntegerType(), True)\n",
"])\n",
"\n",
"reference_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"ArticleId\", IntegerType(), False), \\\n",
" StructField(\"RefNumber\", IntegerType(), True), \\\n",
" StructField(\"InternalPaperId\", IntegerType(), True), \\\n",
" StructField(\"Title\", StringType(), True), \\\n",
" StructField(\"Authors\", ArrayType(StringType()), True), \\\n",
" StructField(\"Journal\", StringType(), True), \\\n",
" StructField(\"JournalId\", IntegerType(), True), \\\n",
" StructField(\"Volume\", StringType(), True), \\\n",
" StructField(\"NumberInVolume\", StringType(), True)\n",
"])\n",
"\n",
"journal_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Name\", StringType(), False)\n",
"])"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "1b8bf1b1",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Title: string (nullable = false)\n",
" |-- Content: string (nullable = true)\n",
" |-- MetadataDate: date (nullable = true)\n",
" |-- JournalId: integer (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
" |-- PagesInVolume: string (nullable = true)\n",
" |-- ObjectIds: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 0:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+---------+-------+------------+---------+------+--------------+-------------+---------+\n",
"| ID| Title|Content|MetadataDate|JournalId|Volume|NumberInVolume|PagesInVolume|ObjectIds|\n",
"+---+---------+-------+------------+---------+------+--------------+-------------+---------+\n",
"| 1|MyArticle| null| null| 1| null| null| null| null|\n",
"+---+---------+-------+------------+---------+------+--------------+-------------+---------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"mock_article_data = [(1, \"MyArticle\", None, None, 1, None, None, None, None)]\n",
"article_df = spark.createDataFrame(data = mock_article_data, schema = article_schema)\n",
"article_df.printSchema()\n",
"article_df.show(truncate=True)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "f1c04253",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ArticleId: integer (nullable = false)\n",
" |-- AuthorId: integer (nullable = false)\n",
"\n",
"+---------+--------+\n",
"|ArticleId|AuthorId|\n",
"+---------+--------+\n",
"| 1| 1|\n",
"+---------+--------+\n",
"\n"
]
}
],
"source": [
"mock_article_author_data = [(1,1)]\n",
"article_author_df = spark.createDataFrame(data = mock_article_author_data, schema = article_author_schema)\n",
"article_author_df.printSchema()\n",
"article_author_df.show(truncate=True)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "946c48c8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Name: string (nullable = false)\n",
" |-- Email: string (nullable = true)\n",
" |-- Affiliation: date (nullable = true)\n",
" |-- Bio: integer (nullable = true)\n",
"\n",
"+---+--------+-----+-----------+----+\n",
"| ID| Name|Email|Affiliation| Bio|\n",
"+---+--------+-----+-----------+----+\n",
"| 1|MyAuthor| null| null|null|\n",
"+---+--------+-----+-----------+----+\n",
"\n"
]
}
],
"source": [
"mock_author_data = [(1, \"MyAuthor\", None, None, None)]\n",
"author_df = spark.createDataFrame(data = mock_author_data, schema = author_schema)\n",
"author_df.printSchema()\n",
"author_df.show(truncate=True)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "3409cb93",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- ArticleId: integer (nullable = false)\n",
" |-- RefNumber: integer (nullable = true)\n",
" |-- InternalPaperId: integer (nullable = true)\n",
" |-- Title: string (nullable = true)\n",
" |-- Authors: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
" |-- Journal: string (nullable = true)\n",
" |-- JournalId: integer (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
"\n",
"+---+---------+---------+---------------+--------+-------+-------+---------+------+--------------+\n",
"| ID|ArticleId|RefNumber|InternalPaperId| Title|Authors|Journal|JournalId|Volume|NumberInVolume|\n",
"+---+---------+---------+---------------+--------+-------+-------+---------+------+--------------+\n",
"| 1| 1| 1| null|RefTitle| null| null| null| null| null|\n",
"+---+---------+---------+---------------+--------+-------+-------+---------+------+--------------+\n",
"\n"
]
}
],
"source": [
"mock_reference_data = [(1,1,1,None, \"RefTitle\", None, None, None, None, None)]\n",
"reference_df = spark.createDataFrame(data = mock_reference_data, schema = reference_schema)\n",
"reference_df.printSchema()\n",
"reference_df.show(truncate=True)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "cff65f33",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Name: string (nullable = false)\n",
"\n",
"+---+---------+\n",
"| ID| Name|\n",
"+---+---------+\n",
"| 1|MyJournal|\n",
"+---+---------+\n",
"\n"
]
}
],
"source": [
"mock_journal_data = [(1,\"MyJournal\")]\n",
"journal_df = spark.createDataFrame(data = mock_journal_data, schema = journal_schema)\n",
"journal_df.printSchema()\n",
"journal_df.show(truncate=True)"
]
},
{
"cell_type": "markdown",
"id": "b0df92a6",
"metadata": {},
"source": [
"## Queries"
]
},
{
"cell_type": "markdown",
"id": "3e42653f",
"metadata": {},
"source": [
"### 5 data creation/update queries"
]
},
{
"cell_type": "markdown",
"id": "1fd2fb0d",
"metadata": {},
"source": [
"#### Insert a new article"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "f7bacc0d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+--------------+-------+------------+---------+------+--------------+-------------+---------+\n",
"| ID| Title|Content|MetadataDate|JournalId|Volume|NumberInVolume|PagesInVolume|ObjectIds|\n",
"+---+--------------+-------+------------+---------+------+--------------+-------------+---------+\n",
"| 1| MyArticle| null| null| 1| null| null| null| null|\n",
"| 2|MyOtherArticle| null| null| 1| null| null| null| null|\n",
"+---+--------------+-------+------------+---------+------+--------------+-------------+---------+\n",
"\n"
]
}
],
"source": [
"new_article_id = 2\n",
"new_article = spark.createDataFrame(data = [(new_article_id, \"MyOtherArticle\", None, None, 1, None, None, None, None)], schema=article_schema)\n",
"# check if the primary key is already present\n",
"temp_article = article_df.filter(article_df.ID == new_article_id)\n",
"if temp_article.isEmpty():\n",
" article_df = article_df.union(new_article)\n",
"\n",
"article_df.show(truncate=True)"
]
},
{
"cell_type": "markdown",
"id": "28c4d2c8",
"metadata": {},
"source": [
"#### Update the affiliation of an author"
]
},
{
"cell_type": "markdown",
"id": "108d3fa8",
"metadata": {},
"source": [
"### 10 queries with specified complexity"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4e69f492",
"metadata": {},
"outputs": [],
"source": [
"# join idea: remove duplicates"
]
},
{
"cell_type": "markdown",
"id": "466e112e",
"metadata": {},
"source": [
"# DO NOT EXECUTE FROM HERE ON!!!"
]
},
{
"cell_type": "markdown",
"id": "906fc50f-a3e1-4e13-b4e3-60a12abaabfc",
"metadata": {},
"source": [
"<h4>Data Upload</h4>"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "0f617ed1-5bce-4e2f-9e7b-5a89161792d9",
"metadata": {},
"outputs": [],
"source": [
"# Upload data from a list \n",
"data = [(\"Margherita\", 5.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Basil\"]),\n",
" (\"Calzone\", 7.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Prosciutto Cotto\"])]\n",
"\n",
"# Create an RDD\n",
"rdd = spark.sparkContext.parallelize(data)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "a4df7f8d-b127-4671-9f20-d8d6e2d2dcfd",
"metadata": {},
"outputs": [],
"source": [
"# Upload list from a file\n",
"rdd_2 = spark.sparkContext.textFile(\"menu.txt\")"
]
},
{
"cell_type": "markdown",
"id": "97f80f26-6a96-4da0-b1a8-787156ef7306",
"metadata": {},
"source": [
"<h4>Dataframe Creation</h4>"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "01ebb85c-13d4-4ab7-8f3f-f5e5534c6763",
"metadata": {},
"outputs": [],
"source": [
"# Create a Dataframe\n",
"df_data = [(\"Margherita\", 5.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Basil\"]),\n",
" (\"Calzone\", 7.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Prosciutto Cotto\"]),\n",
" (\"Diavola\", 5.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Spicy Salame\"]),\n",
" (\"Prosciutto\", 7.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Prosciutto Cotto\"]),\n",
" (\"Speck & Brie\", 7.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Speck\", \"Brie\"]),\n",
" (\"Tonno & Cipolle\", 7.95, [\"Tomato Sauce\", \"Mozzarella Cheese\", \"Tuna\", \"Onions\"]),\n",
" (\"Fries\", 3.95, [\"Potatoes\"])]\n",
" \n",
"columns = [\"Pizza Name\", \"Price\", \"Ingredients\"]\n",
"df = spark.createDataFrame(data = df_data, schema = columns)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "72e85e8b-cfa2-45a0-b88f-7cf3af1fa54e",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 0:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------+-----+--------------------+\n",
"| Pizza Name|Price| Ingredients|\n",
"+---------------+-----+--------------------+\n",
"| Margherita| 5.95|[Tomato Sauce, Mo...|\n",
"| Calzone| 7.95|[Tomato Sauce, Mo...|\n",
"| Diavola| 5.95|[Tomato Sauce, Mo...|\n",
"| Prosciutto| 7.95|[Tomato Sauce, Mo...|\n",
"| Speck & Brie| 7.95|[Tomato Sauce, Mo...|\n",
"|Tonno & Cipolle| 7.95|[Tomato Sauce, Mo...|\n",
"| Fries| 3.95| [Potatoes]|\n",
"+---------------+-----+--------------------+\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Show the first 20 elements of a dataframe\n",
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "c3db8f5c-fd71-4a78-9b94-09c2a2d3617f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: string (nullable = true)\n",
" |-- Ingredients: string (nullable = true)\n",
"\n",
"+---------------+------+--------------------+\n",
"| Pizza Name| Price| Ingredients|\n",
"+---------------+------+--------------------+\n",
"| Margherita| 5.95| [\"Tomato Sauce\",...|\n",
"| Calzone| 7.95| [\"Tomato Sauce\",...|\n",
"| Diavola| 5.95| [\"Tomato Sauce\",...|\n",
"| Prosciutto| 7.95| [\"Tomato Sauce\",...|\n",
"| Speck & Brie| 7.95| [\"Tomato Sauce\",...|\n",
"|Tonno & Cipolle| 7.95| [\"Tomato Sauce\",...|\n",
"+---------------+------+--------------------+\n",
"\n"
]
}
],
"source": [
"# Load a DataFrame from a file: all types are strings\n",
"df = spark.read.option(\"header\", True).option(\"delimiter\", \";\").csv(\"menu_csv.txt\")\n",
"\n",
"# Print detected \n",
"df.printSchema()\n",
"\n",
"df.show()"
]
},
{
"cell_type": "markdown",
"id": "40df7837-91d6-4220-a23c-cfc04a71d790",
"metadata": {},
"source": [
"<h4>Dataframes from RDDs</h4>"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "b361785f-81cd-4039-91a0-1471891e816d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- _1: string (nullable = true)\n",
" |-- _2: double (nullable = true)\n",
" |-- _3: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
"\n"
]
}
],
"source": [
"# Transform the RDD into a Dataframe\n",
"df_from_rdd = rdd.toDF()\n",
"\n",
"# Print the schema of the Dataframe\n",
"df_from_rdd.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "0755516e-a93f-40d7-8167-13c622ce6e83",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: double (nullable = true)\n",
" |-- Ingredients: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
"\n"
]
}
],
"source": [
"#Transform the RDD into a Dataframe, specifying the columns\n",
"columns = [\"Pizza Name\", \"Price\", \"Ingredients\"]\n",
"df_from_rdd = rdd.toDF(columns)\n",
"df_from_rdd.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "0faf942b-8160-4d82-8f4b-70321b01fe62",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: double (nullable = true)\n",
" |-- Ingredients: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
"\n"
]
}
],
"source": [
"df_2_from_rdd = spark.createDataFrame(rdd).toDF(*columns)\n",
"df_from_rdd.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "418c4704-59d3-4f0f-9582-e2d9134ff1bf",
"metadata": {},
"source": [
"<h4>Custom Dataframe</h4>"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "0ebfda87-f209-4aa2-b989-6cfd5aed57d0",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: float (nullable = true)\n",
" |-- Ingredients: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
"\n",
"+---------------+-----+---------------------------------------------------+\n",
"|Pizza Name |Price|Ingredients |\n",
"+---------------+-----+---------------------------------------------------+\n",
"|Margherita |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil] |\n",
"|Calzone |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Diavola |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame] |\n",
"|Prosciutto |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Speck & Brie |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie] |\n",
"|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions] |\n",
"|Fries |3.95 |[Potatoes] |\n",
"+---------------+-----+---------------------------------------------------+\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType\n",
"\n",
"#Createe the schema using StructField(Name, Type, Nullable)\n",
"schema = StructType([ \\\n",
" StructField(\"Pizza Name\", StringType(), True), \\\n",
" StructField(\"Price\", FloatType(), True), \\\n",
" StructField(\"Ingredients\", ArrayType(StringType()), True) \\\n",
"])\n",
" \n",
"df = spark.createDataFrame(data = df_data, schema = schema)\n",
"df.printSchema()\n",
"df.show(truncate=False)"
]
},
{
"cell_type": "markdown",
"id": "6991a58a-c4d8-48a7-9b74-f20267254efb",
"metadata": {
"tags": []
},
"source": [
"<h4>Organizing Data</h4>"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "741212ed-9671-4b45-abd1-dfd99021632f",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------+-----+---------------------------------------------------+\n",
"|Pizza Name |Price|Ingredients |\n",
"+---------------+-----+---------------------------------------------------+\n",
"|Fries |3.95 |[Potatoes] |\n",
"|Margherita |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil] |\n",
"|Diavola |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame] |\n",
"|Calzone |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Prosciutto |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Speck & Brie |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie] |\n",
"|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions] |\n",
"+---------------+-----+---------------------------------------------------+\n",
"\n"
]
}
],
"source": [
"# Sorting depending on the fields (default = ascending order)\n",
"df.sort(\"Price\").show(truncate = False)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "ec0f8118-a616-43e4-b501-2d7e0cce34a5",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------------+-----+---------------------------------------------------+\n",
"|Pizza Name |Price|Ingredients |\n",
"+---------------+-----+---------------------------------------------------+\n",
"|Fries |3.95 |[Potatoes] |\n",
"|Diavola |5.95 |[Tomato Sauce, Mozzarella Cheese, Spicy Salame] |\n",
"|Margherita |5.95 |[Tomato Sauce, Mozzarella Cheese, Basil] |\n",
"|Calzone |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Prosciutto |7.95 |[Tomato Sauce, Mozzarella Cheese, Prosciutto Cotto]|\n",
"|Speck & Brie |7.95 |[Tomato Sauce, Mozzarella Cheese, Speck, Brie] |\n",
"|Tonno & Cipolle|7.95 |[Tomato Sauce, Mozzarella Cheese, Tuna, Onions] |\n",
"+---------------+-----+---------------------------------------------------+\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import col\n",
"# Sorting depending on the fields\n",
"df.sort(col(\"Price\"), col(\"Pizza Name\")).show(truncate = False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1857f478-be58-4acb-8235-99ffc5230879",
"metadata": {},
"outputs": [],
"source": [
"# Sorting using orderBy\n",
"df.orderBy(col(\"Price\"), col(\"Pizza Name\")).show(truncate = False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "821c2b65-e3ab-4fd9-8538-07d0bb081ca5",
"metadata": {},
"outputs": [],
"source": [
"# Expliciting the sorting (work the same with orderBy)\n",
"df.sort(col(\"Price\").asc(), col(\"Pizza Name\").desc()).show(truncate = False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dc3e4b71-ac6c-4419-b26e-01c00e2a93c2",
"metadata": {},
"outputs": [],
"source": [
"# We could also use raw SQL\n",
"# No spoilers -> We'll see how to use it later on"
]
},
{
"cell_type": "markdown",
"id": "cc888070-3830-424b-be01-29dc552df799",
"metadata": {},
"source": [
"<h4>Explode Arrays in Individual Rows</h4>"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "4f11d458-11b0-4dd9-b0bf-9707c599fdd2",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: float (nullable = true)\n",
" |-- col: string (nullable = true)\n",
"\n",
"+---------------+-----+-----------------+\n",
"|Pizza Name |Price|col |\n",
"+---------------+-----+-----------------+\n",
"|Margherita |5.95 |Tomato Sauce |\n",
"|Margherita |5.95 |Mozzarella Cheese|\n",
"|Margherita |5.95 |Basil |\n",
"|Calzone |7.95 |Tomato Sauce |\n",
"|Calzone |7.95 |Mozzarella Cheese|\n",
"|Calzone |7.95 |Prosciutto Cotto |\n",
"|Diavola |5.95 |Tomato Sauce |\n",
"|Diavola |5.95 |Mozzarella Cheese|\n",
"|Diavola |5.95 |Spicy Salame |\n",
"|Prosciutto |7.95 |Tomato Sauce |\n",
"|Prosciutto |7.95 |Mozzarella Cheese|\n",
"|Prosciutto |7.95 |Prosciutto Cotto |\n",
"|Speck & Brie |7.95 |Tomato Sauce |\n",
"|Speck & Brie |7.95 |Mozzarella Cheese|\n",
"|Speck & Brie |7.95 |Speck |\n",
"|Speck & Brie |7.95 |Brie |\n",
"|Tonno & Cipolle|7.95 |Tomato Sauce |\n",
"|Tonno & Cipolle|7.95 |Mozzarella Cheese|\n",
"|Tonno & Cipolle|7.95 |Tuna |\n",
"|Tonno & Cipolle|7.95 |Onions |\n",
"+---------------+-----+-----------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import explode\n",
"\n",
"exploded_df = df.select(col(\"Pizza Name\"), df.Price, explode(df.Ingredients))\n",
"exploded_df.printSchema()\n",
"exploded_df.show(truncate = False)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "91dbc7d0-ee23-4c83-99be-4d1d15523f1a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- Pizza Name: string (nullable = true)\n",
" |-- Price: float (nullable = true)\n",
" |-- Ingredient: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"# How can we rename a column?\n",
"exploded_df = exploded_df.withColumnRenamed(\"col\", \"Ingredient\").printSchema()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
},
"vscode": {
"interpreter": {
"hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}

BIN
hw03/report.pdf Normal file

Binary file not shown.

2112
hw03/spark_import.ipynb Normal file

File diff suppressed because it is too large Load diff