ddm/hw03/spark_import.ipynb


{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "12cd7c2e",
"metadata": {},
"outputs": [],
"source": [
"# Import the basic spark library\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType, IntegerType, DateType\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql.types import StringType\n",
"from pyspark.sql.functions import explode, col, sum, avg, count, max, first, last\n",
"\n",
"from faker import Factory\n",
"import random\n",
"\n",
"from datetime import datetime\n",
"from time import time"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "77262795",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"22/12/18 17:40:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
]
}
],
"source": [
"# Create an entry point to the PySpark Application\n",
"spark = SparkSession.builder \\\n",
" .master(\"local\") \\\n",
" .appName(\"DDMProjectWorkPart3\") \\\n",
" .getOrCreate()"
]
},
{
"cell_type": "markdown",
"id": "db4391c6",
"metadata": {},
"source": [
"## Schema definition and faker setup"
]
},
{
"cell_type": "markdown",
"id": "ac9ceba7",
"metadata": {},
"source": [
"Here we define a series of Spark user-defined functions (or UDFs) to account for the missing fields in the DBLP dataset. We generate all these fields with the _Faker_ library, namely:\n",
"\n",
"- The `Content` field for article records;\n",
"- The `Bio`, `Email` and `Affiliation` fields for authors. Note that we generate affiliation using a pool of 500 random university names, generated by appending the string `\"University of \"` to a random town name (the same strategy used for Project Work Part 2).\n",
"\n",
"We define more UDFs for the references dataframe, however we declare them next to the references import code."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "71ee8de0",
"metadata": {},
"outputs": [],
"source": [
"fake = Factory.create()\n",
"\n",
"# Article\n",
"fake_content_udf = F.udf(lambda: fake.paragraph(nb_sentences=25), StringType())\n",
"\n",
"# Author\n",
"fake_unis = []\n",
"for i in range(0, 500):\n",
" (_, _, loc, _, _) = fake.location_on_land()\n",
" fake_unis.append(\"University of \" + loc)\n",
"\n",
"fake_bio_udf = F.udf(lambda: fake.paragraph(nb_sentences=5), StringType())\n",
"fake_email_udf = F.udf(lambda: fake.ascii_email(), StringType())\n",
"fake_affiliation_udf = F.udf(lambda: fake_unis[random.randint(0, len(fake_unis) - 1)], StringType())"
]
},
{
"cell_type": "markdown",
"id": "4bfaafd5",
"metadata": {},
"source": [
"Here we define the schema for all dataframes we import. This is ostensibly for documentation, as the schema is automatically defined during the import process thanks to the `inferSchema` Spark CSV reader option. \n",
"\n",
"Note that `article` and `author` are in a N-to-N relation, with `article_author` acting as a join table. Additionally, N `article` rows may reference a `journal` and N `reference` rows may reference an `article`. "
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "1a4daf81",
"metadata": {},
"outputs": [],
"source": [
"article_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Title\", StringType(), False), \\\n",
" StructField(\"Content\", StringType(), True), \\\n",
" StructField(\"MetadataDate\", DateType(), True), \\\n",
" StructField(\"JournalId\", IntegerType(), True), \\\n",
" StructField(\"Volume\", StringType(), True), \\\n",
" StructField(\"NumberInVolume\", StringType(), True), \\\n",
" StructField(\"PagesInVolume\", StringType(), True), \\\n",
" StructField(\"ObjectIds\", ArrayType(StringType()), True)\n",
"])\n",
"\n",
"article_author_schema = StructType([ \\\n",
" StructField(\"ArticleId\", IntegerType(), False), \\\n",
" StructField(\"AuthorId\", IntegerType(), False)\n",
"])\n",
"\n",
"author_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Name\", StringType(), False), \\\n",
" StructField(\"Email\", StringType(), True), \\\n",
" StructField(\"Affiliation\", DateType(), True), \\\n",
" StructField(\"Bio\", IntegerType(), True)\n",
"])\n",
"\n",
"journal_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"Name\", StringType(), False)\n",
"])\n",
"\n",
"reference_schema = StructType([ \\\n",
" StructField(\"ID\", IntegerType(), False), \\\n",
" StructField(\"ArticleId\", IntegerType(), False), \\\n",
" StructField(\"RefNumber\", IntegerType(), True), \\\n",
" StructField(\"InternalPaperId\", IntegerType(), True), \\\n",
" StructField(\"Title\", StringType(), True), \\\n",
" StructField(\"Authors\", ArrayType(StringType()), True), \\\n",
" StructField(\"Journal\", StringType(), True), \\\n",
" StructField(\"JournalId\", IntegerType(), True), \\\n",
" StructField(\"Volume\", StringType(), True), \\\n",
" StructField(\"NumberInVolume\", StringType(), True)\n",
"])"
]
},
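{
"cell_type": "markdown",
"id": "3c5d9a1f",
"metadata": {},
"source": [
"As a side note, these hand-written schemas could also be enforced at read time instead of relying on inference. The sketch below is illustrative only: the raw _dblp-to-csv_ export has more columns, in a different order and with different names (e.g. `id`, `title`, `mdate`), so the `StructType` passed to the reader would have to match the columns of the CSV file for this to work as-is.\n",
"\n",
"```python\n",
"# Illustrative sketch: pass an explicit StructType to the CSV reader\n",
"# instead of using inferSchema (the fields must match the CSV columns).\n",
"df_articles_typed = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .schema(article_schema) \\\n",
" .csv(\"./csv-import/article.csv\")\n",
"```"
]
},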
{
"cell_type": "markdown",
"id": "c01b4f97",
"metadata": {},
"source": [
"## The DBLP dataset"
]
},
{
"cell_type": "markdown",
"id": "a9ac5372",
"metadata": {},
"source": [
"Like in Project Work Part 1, we choose to use the XML dataset of the DBLP scientific publication directory. We use the script in the [ThomHurks/dblp-to-csv](https://github.com/ThomHurks/dblp-to-csv) GitHub repo to generate CSV files from the downloaded XML dump. For the scope of this part we only consider the publications of type \"article\".\n",
"\n",
"We use the ``--relations`` option of the _dblp-to-csv_ script to extract author and journal names into separate CSV files. The script also generates IDs for all entities (articles, authors and journals), and two files akin to join tables in a relational model to combine article rows with the extracted author and journal rows.\n",
"\n",
"The following list of shell commands downloads the DBLP dataset, the _dblp-to-csv_ script and generates the CSV files according to the specification required by this notebook:\n",
"\n",
"```shell\n",
"curl -o dblp.xml.gz https://dblp.org/xml/dblp.xml.gz\n",
"gunzip dblp.xml.gz\n",
"\n",
"# download the DTD specification of the DBLP XML format\n",
"curl -o dblp.dtd https://dblp.org/xml/dblp.dtd\n",
"\n",
"git clone https://github.com/ThomHurks/dblp-to-csv\n",
"\n",
"mkdir csv-import\n",
"\n",
"dblp-to-csv/XMLToCSV.py --annotate dblp.xml dblp.dtd dblp_csv.csv \\\n",
" --relations journal:article_journal author:article_author\n",
"\n",
"for t in article; do\n",
" tr ';' '\\n' <dblp_csv_${t}_header.csv | sed 's/:.*//g' | \\\n",
" tr '\\n' ';' | awk 1 | cat - dblp_csv_${t}.csv | \\\n",
" sed -E 's/\\{?\\\\\"\"\\}?/\"\"/g' > csv-import/${t}.csv;\n",
"done\n",
"\n",
"cp dblp_csv_{author|journal}_* dblp_csv_{author|journal}.csv csv-import\n",
"```"
]
},
{
"cell_type": "markdown",
"id": "5cffc425",
"metadata": {},
"source": [
"## Data import and dataframes creation"
]
},
{
"cell_type": "markdown",
"id": "2564cf1e",
"metadata": {},
"source": [
"Here we create the `article` dataframe. To create it we mainly need to read the article CSV file and join it with the article-to-journal join table created by _dblp-to-csv_ script, which we can do while preserving 1NF since we know that an article may be present only in a single journal.\n",
"\n",
"For the scope of this assignment we decide to fix the number of articles to 10000, selecting them at random from the export."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "674da4e3",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Title: string (nullable = false)\n",
" |-- MetadataDate: timestamp (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
" |-- PagesInVolume: string (nullable = true)\n",
" |-- ObjectIds: array (nullable = true)\n",
" | |-- element: string (containsNull = false)\n",
" |-- JournalId: integer (nullable = true)\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 8:==========================================> (5 + 1) / 7]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+-------------------+--------------+--------------+-------------+--------------------+---------+\n",
"| ID| Title| MetadataDate| Volume|NumberInVolume|PagesInVolume| ObjectIds|JournalId|\n",
"+-------+--------------------+-------------------+--------------+--------------+-------------+--------------------+---------+\n",
"|7476125|Offline Drawing o...|2018-08-13 00:00:00|abs/1608.08385| null| null|[http://arxiv.org...| 12791182|\n",
"|7798842|Reasoning about S...|2020-09-02 00:00:00| 22| 2| 241-262|[https://doi.org/...| 12791281|\n",
"|9477159|System Dynamics M...|2021-10-14 00:00:00| 11| 5| 677|[https://doi.org/...| 12792549|\n",
"|7286983|Wireless Transmis...|2018-08-13 00:00:00|abs/1805.09923| null| null|[http://arxiv.org...| 12791182|\n",
"|7316024|Distributed Struc...|2018-08-13 00:00:00| abs/1207.1345| null| null|[http://arxiv.org...| 12791182|\n",
"|7863079|A Novel Human Aut...|2018-11-14 00:00:00| 12| 6| 7828-7854|[https://doi.org/...| 12791310|\n",
"|6720476|Biogenetic Temper...|2021-10-14 00:00:00| 11| 6| 735-737|[https://doi.org/...| 12790859|\n",
"|7912232|Routing of multip...|2020-04-02 00:00:00| 6| 9| 1617-1622|[https://doi.org/...| 12791324|\n",
"|9296802|The Third Version...|2021-10-14 00:00:00| 5| 4| null|[https://doi.org/...| 12792370|\n",
"|7021903|Demand for housin...|2020-09-17 00:00:00| 18| 1| 61-68|[https://doi.org/...| 12791062|\n",
"|8624192|Visualization of ...|2018-11-14 00:00:00| 18| 12| 2198-2207|[http://doi.ieeec...| 12791925|\n",
"|9028185|Network Topology ...|2020-05-20 00:00:00| 56| 10| 2262-2275|[https://doi.org/...| 12792144|\n",
"|8756232|Modelling, analys...|2020-07-03 00:00:00| 12| 3| 207-216|[https://doi.org/...| 12791998|\n",
"|7841131|Person re-identif...|2020-01-15 00:00:00| 97| null| null|[https://doi.org/...| 12791303|\n",
"|8571383|Social media and ...|2022-01-03 00:00:00| 39| null| 404-412|[https://doi.org/...| 12791876|\n",
"|9211872|Reclaiming Spare ...|2020-03-13 00:00:00| 2011| null| null|[https://doi.org/...| 12792282|\n",
"|7654415|Towards Automatic...|2019-10-02 00:00:00|abs/1909.13184| null| null|[http://arxiv.org...| 12791182|\n",
"|8165836|Smart Real Time A...|2021-02-26 00:00:00| 20| 3| 347-364|[https://doi.org/...| 12791533|\n",
"|8549734|Large-scale image...|2020-05-11 00:00:00| 79| 13-14| 9663|[https://doi.org/...| 12791859|\n",
"|7535983|On Coreset Constr...|2018-08-13 00:00:00|abs/1612.07516| null| null|[http://arxiv.org...| 12791182|\n",
"+-------+--------------------+-------------------+--------------+--------------+-------------+--------------------+---------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 8:==================================================> (6 + 1) / 7]\r",
"\r",
" \r"
]
}
],
"source": [
"articles_path = \"./csv-import/article.csv\"\n",
"articles_journals_path = \"./csv-import/dblp_csv_journal_article_journal.csv\"\n",
"\n",
"df_articles_csv = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(articles_path, inferSchema=True) \\\n",
" .select(\"id\", \"title\", \"mdate\", \"volume\", \"number\", \"pages\", \"ee\") \\\n",
" .where(\"id > 2\") \\\n",
" .withColumn('Content', fake_content_udf()) \\\n",
" .withColumnRenamed(\"id\", \"ID\") \\\n",
" .withColumnRenamed(\"title\", \"Title\") \\\n",
" .withColumnRenamed(\"mdate\", \"MetadataDate\") \\\n",
" .withColumnRenamed(\"volume\", \"Volume\") \\\n",
" .withColumnRenamed(\"number\", \"NumberInVolume\") \\\n",
" .withColumnRenamed(\"pages\", \"PagesInVolume\") \\\n",
" .withColumnRenamed(\"ee\", \"ObjectIds\") \\\n",
" .na.drop(subset=[\"ID\", \"Title\"]) \\\n",
" .withColumn(\"ID\", F.coalesce(F.col(\"ID\"), F.lit(0))) \\\n",
" .withColumn(\"Title\", F.coalesce(F.col(\"Title\"), F.lit(\"\"))) \\\n",
" .select(\"ID\", \"Title\", \"MetadataDate\", \"Volume\", \"NumberInVolume\", \"PagesInVolume\", \\\n",
" F.split(\"ObjectIds\", \"\\\\|\").alias(\"ObjectIds\"))\n",
"\n",
"# force ID and Title to be not-nullable, given the .na.drop coalescing never actually happens\n",
"# https://stackoverflow.com/a/58515908\n",
"\n",
"df_article_journals_csv = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(articles_journals_path, inferSchema=True) \\\n",
" .withColumnRenamed(\":END_ID\", \"JournalId\")\n",
"\n",
"df_articles = df_articles_csv \\\n",
" .join(df_article_journals_csv, \\\n",
" df_article_journals_csv[':START_ID'] == df_articles_csv.ID, 'left') \\\n",
" .drop(\":START_ID\") \\\n",
" .orderBy(F.rand()).limit(10000)\n",
" \n",
"df_articles.printSchema()\n",
"df_articles.show()"
]
},
{
"cell_type": "markdown",
"id": "921f9e8f",
"metadata": {},
"source": [
"Here we import the `article-to-author` CSV file as a dataframe. We filter its rows to make sure we only include the articles we selected at random from the previous step from performance reasons.\n",
"\n",
"Note the use of the `F.coalesce` function to coalesce `ArticleId` and `AuthorId` with 0. As we know that the columns are never null by analyzing the source data, this is simply a workaround to force the schema of the resulting DataFrame to mark these columns as not-null."
]
},
{
"cell_type": "code",
"execution_count": 100,
"id": "11172912",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ArticleId: integer (nullable = false)\n",
" |-- AuthorId: integer (nullable = false)\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 1578:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+--------+\n",
"|ArticleId|AuthorId|\n",
"+---------+--------+\n",
"| 6636860| 9567664|\n",
"| 6636860| 9570761|\n",
"| 7374783| 9573169|\n",
"| 7693829| 9601657|\n",
"| 6555847| 9608005|\n",
"| 7374783| 9626452|\n",
"| 7946825| 9631289|\n",
"| 8139540| 9636064|\n",
"| 7565583| 9636102|\n",
"| 9321630| 9644172|\n",
"| 7668271| 9650410|\n",
"| 7668271| 9651402|\n",
"| 7371049| 9658197|\n",
"| 8037829| 9665927|\n",
"| 8037829| 9667969|\n",
"| 8170953| 9671694|\n",
"| 6636860| 9672057|\n",
"| 6649339| 9673588|\n",
"| 7324396| 9674337|\n",
"| 8362140| 9674444|\n",
"+---------+--------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"articles_authors_path = \"./csv-import/dblp_csv_author_article_author.csv\"\n",
"\n",
"df_article_ids = df_articles.select(F.col(\"ID\").alias(\"ArtId\"))\n",
"\n",
"df_articles_authors = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(articles_authors_path, inferSchema=True) \\\n",
" .withColumnRenamed(\":START_ID\", \"ArticleId\") \\\n",
" .withColumnRenamed(\":END_ID\", \"AuthorId\") \\\n",
" .withColumn(\"ArticleId\", F.coalesce(F.col(\"ArticleId\"), F.lit(0))) \\\n",
" .withColumn(\"AuthorId\", F.coalesce(F.col(\"AuthorId\"), F.lit(0))) \\\n",
" .join(df_article_ids, df_article_ids[\"ArtId\"] == F.col(\"ArticleId\"), 'inner') \\\n",
" .select(\"ArticleId\", \"AuthorId\") \\\n",
" .distinct()\n",
"\n",
"df_articles_authors.printSchema()\n",
"df_articles_authors.show()"
]
},
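{
"cell_type": "markdown",
"id": "7e2f4b9c",
"metadata": {},
"source": [
"A minimal sketch of the nullability trick on a toy dataframe (independent of the DBLP data): coalescing a column with a non-nullable literal makes Spark mark the result as not-null, while the values themselves are unchanged when no nulls are present.\n",
"\n",
"```python\n",
"df_toy = spark.createDataFrame([(1,), (2,)], [\"x\"])\n",
"df_toy.printSchema()  # x: long (nullable = true)\n",
"\n",
"# Coalescing with a non-nullable literal flips the nullable flag\n",
"df_toy.withColumn(\"x\", F.coalesce(F.col(\"x\"), F.lit(0))).printSchema()  # x: long (nullable = false)\n",
"```"
]
},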
{
"cell_type": "markdown",
"id": "32906fe0",
"metadata": {},
"source": [
"Here we import the `authors`. We perform a similar filter process to the aforementioned one, dropping the authors that did not publish any article that we randomly selected for performance reasons."
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "abc8efd3",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Name: string (nullable = false)\n",
" |-- Email: string (nullable = true)\n",
" |-- Affiliation: string (nullable = true)\n",
" |-- Bio: string (nullable = true)\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 32:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+--------------------+--------------------+--------------------+\n",
"| ID| Name| Email| Affiliation| Bio|\n",
"+-------+--------------------+--------------------+--------------------+--------------------+\n",
"|9566872| Adnan Abid|wjohnson@clark-mc...|University of Vernon|Already sell deci...|\n",
"|9567627| Roberto Rossi|hmontgomery@hotma...| University of Biu|Seven exist risk ...|\n",
"|9569592| Hannes Hartenstein|robertsbeth@ramir...|University of Losser|Card data with be...|\n",
"|9572255| Bülent Karasözen| joshua34@barber.net| University of Kulai|Ready effort duri...|\n",
"|9608965| Xiaowei Zhao|timothyvazquez@gm...|University of Par...|Build wall push c...|\n",
"|9638842| Li Yan|loganjohnson@gmai...|University of Artsyz|School eye about....|\n",
"|9641092| Karen Livescu| dyoung@hotmail.com|University of Eas...|Some up no. Follo...|\n",
"|9641772| Edna Dias Canedo| cindy01@rivera.com|University of Sas...|Consider parent l...|\n",
"|9652591| Petrik Clarberg|harristiffany@gil...|University of Lan...|Establish black m...|\n",
"|9653203| Jan Jerabek|zrodriguez@black-...| University of Tavda|Deep event hold w...|\n",
"|9666198|Mihaela van der S...|hvelasquez@hotmai...|University of Voz...|Study foot over a...|\n",
"|9666973| Esther Pacitti|edwardmelton@hotm...| University of Tonga|Represent respons...|\n",
"|9667290| Yannis Kotidis| paul36@yahoo.com|University of Glo...|Wish special guy....|\n",
"|9669347| Ove Frank| pmartin@yahoo.com|University of New...|Art other radio e...|\n",
"|9669369| Weldon A. Lodwick|hughesrodney@vill...|University of Bel...|Exactly relate se...|\n",
"|9676721| Glen Takahara|jenniferbaker@hot...|University of Bru...|Catch still follo...|\n",
"|9734805| Hibiki Tsukada|christinahensley@...|University of Aklera|Success economic ...|\n",
"|9749941| Tobias Gemaßmer|brownjohn@hotmail...|University of Agu...|Lot conference ca...|\n",
"|9819186| Tarak Nandy|kevinduncan@hotma...| University of Lille|Election environm...|\n",
"|9857863| Jun Zhang 0024|perezolivia@hotma...|University of Kan...|Chair pretty rece...|\n",
"+-------+--------------------+--------------------+--------------------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"authors_path = \"./csv-import/dblp_csv_author.csv\"\n",
"\n",
"df_author_ids = df_articles_authors.select(F.col(\"AuthorId\").alias(\"AutId\"))\n",
"\n",
"# More info on coalescing: https://youtu.be/ysPtBjY8o_A\n",
"df_authors = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(authors_path, inferSchema=True) \\\n",
" .withColumnRenamed(\":ID\", \"ID\") \\\n",
" .withColumnRenamed(\"author:string\", \"Name\") \\\n",
" .join(df_author_ids, df_author_ids[\"AutId\"] == F.col(\"ID\"), 'inner') \\\n",
" .select(\"ID\", \"Name\") \\\n",
" .distinct() \\\n",
" .withColumn(\"ID\", F.coalesce(F.col(\"ID\"), F.lit(0))) \\\n",
" .withColumn(\"Name\", F.coalesce(F.col(\"Name\"), F.lit(\"\"))) \\\n",
" .withColumn('Email', fake_email_udf()) \\\n",
" .withColumn('Affiliation', fake_affiliation_udf()) \\\n",
" .withColumn('Bio', fake_bio_udf())\n",
"\n",
"df_authors.printSchema()\n",
"df_authors.show()"
]
},
{
"cell_type": "markdown",
"id": "ab9952a7",
"metadata": {},
"source": [
"Finally, here we import the `journals`. Note that here we actually import journals that may have no article in them out of the randomly selected articles. As this does not affect the queries or performance, we simply ignore this."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "6fa14e0d",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = false)\n",
" |-- Name: string (nullable = false)\n",
"\n",
"+--------+--------------------+\n",
"| ID| Name|\n",
"+--------+--------------------+\n",
"|12790675| World Wide Web|\n",
"|12790676| SIGMOD Rec.|\n",
"|12790677| SIGMOD Record|\n",
"|12790678|EAI Endorsed Tran...|\n",
"|12790679|Int. J. Trust. Ma...|\n",
"|12790680|Expert Syst. Appl. X|\n",
"|12790681|Bull. dInformatiq...|\n",
"|12790682|Trans. Large Scal...|\n",
"|12790683| Stud. Inform. Univ.|\n",
"|12790684|IEEE Intell. Tran...|\n",
"|12790685| Control. Cybern.|\n",
"|12790686|Syst. Control. Lett.|\n",
"|12790687|Sci. Comput. Prog...|\n",
"|12790688|Found. Comput. Math.|\n",
"|12790689| Theor. Comput. Sci.|\n",
"|12790690| Inf. Technol. Dev.|\n",
"|12790691|EURASIP J. Adv. S...|\n",
"|12790692|IEEE Embed. Syst....|\n",
"|12790693|J. Assoc. Inf. Syst.|\n",
"|12790694|EAI Endorsed Tran...|\n",
"+--------+--------------------+\n",
"only showing top 20 rows\n",
"\n"
]
}
],
"source": [
"journals_path = \"./csv-import/dblp_csv_journal.csv\"\n",
"\n",
"df_journals = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(journals_path, inferSchema=True) \\\n",
" .withColumnRenamed(\":ID\", \"ID\") \\\n",
" .withColumnRenamed(\"journal:string\", \"Name\") \\\n",
" .withColumn(\"ID\", F.coalesce(F.col(\"ID\"), F.lit(0))) \\\n",
" .withColumn(\"Name\", F.coalesce(F.col(\"Name\"), F.lit(\"\")))\n",
"\n",
"df_journals.printSchema()\n",
"df_journals.show()"
]
},
{
"cell_type": "markdown",
"id": "4ed3ec8c",
"metadata": {},
"source": [
"The DBLP XML dataset has no `reference` data, therefore we generate our own with the aid of UDF functions using Faker. For each article we generate 1 to 15 random references with may either point to an article in our dataset or to an external article."
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "f2c17a35",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- RefNumber: integer (nullable = true)\n",
" |-- InternalPaperId: integer (nullable = true)\n",
" |-- Title: string (nullable = true)\n",
" |-- Authors: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
" |-- Journal: string (nullable = true)\n",
" |-- JournalId: integer (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
" |-- ArticleId: integer (nullable = false)\n",
" |-- ID: long (nullable = false)\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 52:> (0 + 1) / 1]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+---------------+--------------------+--------------------+--------------------+---------+--------------+--------------+---------+---+\n",
"|RefNumber|InternalPaperId| Title| Authors| Journal|JournalId| Volume|NumberInVolume|ArticleId| ID|\n",
"+---------+---------------+--------------------+--------------------+--------------------+---------+--------------+--------------+---------+---+\n",
"| 1| 7490681|UNIFUZZ: A Holist...|[Chenyang Lyu, Ch...| CoRR| 12791182|abs/2010.01785| null| 7318490| 0|\n",
"| 2| null|Customer-focused ...| [Steven Gordon]|generate compelli...| null| 9| 85| 7318490| 1|\n",
"| 3| 8969478|Total Variation F...|[Ahmed B. Altamim...| IEEE Access| 12792132| 8| null| 7318490| 2|\n",
"| 4| 7156412|Enhancing the Stu...|[Antoni Lluís Mes...| IEEE Trans. Educ.| 12791148| 64| 1| 7318490| 3|\n",
"| 5| null|Secured zero admi...| [Dwayne Lane]|facilitate effici...| null| 169| 99| 7318490| 4|\n",
"| 6| null|Inverse attitude-...|[Matthew Gill, Ni...|harness web-enabl...| null| 224| 123| 7318490| 5|\n",
"| 1| 8255791|On the Calculatio...|[R. L. Smelianski...| Inf. Process. Lett.| 12791629| 22| 5| 8067977| 6|\n",
"| 2| 8159350|A high-order fuzz...| [Mu-Yen Chen]|Future Gener. Com...| 12791529| 37| null| 8067977| 7|\n",
"| 3| 6553754|Attitude Estimati...|[Byungjin Lee, Ju...|IEEE Trans. Instr...| 12790705| 70| null| 8067977| 8|\n",
"| 4| null|Configurable 24/7...|[Joan Lambert, Li...|re-intermediate l...| null| 499| 167| 8067977| 9|\n",
"| 5| 6849169|Automatic evaluat...|[Akinori Ito, Mas...|Comput. Speech Lang.| 12790941| 28| 2| 8067977| 10|\n",
"| 6| 8070896|Acquiring and App...|[Debbie Richards ...|J. Inf. Knowl. Ma...| 12791460| 2| 2| 8067977| 11|\n",
"| 7| 7330047|Stochastic Optimi...| [Rong Jin 0001]| CoRR| 12791182| abs/1312.0048| null| 8067977| 12|\n",
"| 8| 9508705| SIGCOMM news.| [Erich M. Nahum]|Comput. Commun. Rev.| 12792585| 36| 5| 8067977| 13|\n",
"| 9| 6509187|Stabilizing unsta...|[Ahmet Cetinkaya,...|Syst. Control. Lett.| 12790686| 113| null| 8067977| 14|\n",
"| 1| null|Progressive full-...|[Casey Harris, An...|target distribute...| null| 312| 39| 7289500| 15|\n",
"| 2| null|Distributed multi...|[Stephen Oconnor,...|incentivize dot-c...| null| 58| 7| 7289500| 16|\n",
"| 1| 8245635|Investigation of ...|[Li Li 0013, Qi W...|J. Intell. Transp...| 12791625| 18| 3| 9151933| 17|\n",
"| 2| 7846928|First British Com...| null| Comput. J.| 12791305| 1| 3| 9151933| 18|\n",
"| 3| 8521523|Neuro-fuzzy archi...|[Bogdan M. Wilamo...|IEEE Trans. Ind. ...| 12791844| 46| 6| 9151933| 19|\n",
"+---------+---------------+--------------------+--------------------+--------------------+---------+--------------+--------------+---------+---+\n",
"only showing top 20 rows\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"reference_to_explode_schema = ArrayType(StructType([ \\\n",
" StructField(\"RefNumber\", IntegerType(), True), \\\n",
" StructField(\"InternalPaperId\", IntegerType(), True), \\\n",
" StructField(\"Title\", StringType(), True), \\\n",
" StructField(\"Authors\", ArrayType(StringType()), True), \\\n",
" StructField(\"Journal\", StringType(), True), \\\n",
" StructField(\"JournalId\", IntegerType(), True), \\\n",
" StructField(\"Volume\", StringType(), True), \\\n",
" StructField(\"NumberInVolume\", StringType(), True)\n",
"]))\n",
"\n",
"# Pool of articles to randomly cite\n",
"# We can't select them ad-hoc in the UDF function as DataFrames cannot be pickled\n",
"# See: https://stackoverflow.com/q/47249292\n",
"articles: list[dict] = spark.read \\\n",
" .option(\"delimiter\", \";\") \\\n",
" .option(\"header\", True) \\\n",
" .csv(articles_path, inferSchema=True) \\\n",
" .alias(\"art1\") \\\n",
" .join(df_articles, F.col(\"art1.ID\") == df_articles_csv[\"ID\"], 'inner') \\\n",
" .select(\"art1.*\") \\\n",
" .select(\"id\", \"title\", \"volume\", \"number\", \"pages\", \"journal\", \n",
" F.split(F.col(\"author\"), '\\|', -1).alias('author')) \\\n",
" .withColumn(\"id\", F.coalesce(F.col(\"id\"), F.lit(0))) \\\n",
" .withColumn(\"title\", F.coalesce(F.col(\"title\"), F.lit(\"\"))) \\\n",
" .alias(\"art2\") \\\n",
" .join(df_article_journals_csv, df_article_journals_csv[':START_ID'] == F.col(\"art2.id\"), 'left') \\\n",
" .drop(\":START_ID\") \\\n",
" .collect() \n",
"\n",
"def internal_fake_reference(ref: int): \n",
" # Select Random article\n",
" article: dict = articles[random.randint(0, len(articles) - 1)]\n",
" \n",
" return {\n",
" \"RefNumber\": ref,\n",
" \"InternalPaperId\": article[\"id\"],\n",
" \"Title\": article[\"title\"],\n",
" \"Authors\": article[\"author\"],\n",
" \"Journal\": article[\"journal\"],\n",
" \"JournalId\": article[\"JournalId\"],\n",
" \"Volume\": article[\"volume\"],\n",
" \"NumberInVolume\": article[\"number\"]\n",
" }\n",
"\n",
"def external_fake_reference(ref: int):\n",
" return {\n",
" \"RefNumber\": ref,\n",
" \"InternalPaperId\": None,\n",
" \"Title\": fake.catch_phrase() + \": \" + fake.catch_phrase(),\n",
" \"Authors\": [fake.name() for x in range(random.randint(1, 6))],\n",
" \"Journal\": fake.bs(),\n",
" \"JournalId\": None,\n",
" \"Volume\": random.randint(1, 500),\n",
" \"NumberInVolume\": random.randint(1, 200)\n",
" }\n",
"\n",
"def rand_bool() -> bool: \n",
" return bool(random.getrandbits(1))\n",
"\n",
"def rand_references(min_count: int, max_count: int) -> list[dict]:\n",
" return [internal_fake_reference(x + 1) if rand_bool() else external_fake_reference(x + 1) \\\n",
" for x in range(random.randint(min_count, max_count))]\n",
"\n",
"fake_references_udf = F.udf(lambda : rand_references(1, 15), reference_to_explode_schema)\n",
"\n",
"df_references = df_articles \\\n",
" .withColumn(\"explode\", fake_references_udf()) \\\n",
" .withColumnRenamed(\"ID\", \"ArticleId\") \\\n",
" .select(\"ArticleId\", F.explode(\"explode\").alias(\"explode\")) \\\n",
" .select(\"explode.*\", \"ArticleId\") \\\n",
" .withColumn(\"ID\", F.monotonically_increasing_id())\n",
"\n",
"df_references.printSchema()\n",
"df_references.show()"
]
},
{
"cell_type": "markdown",
"id": "5818c7f8",
"metadata": {},
"source": [
"We now print a short overview of the size of all datasets.\n",
"\n",
"Finally, after the import process we save all the dataframes to a _Parquet_ file in order to cache the operations we made until now. Saving and reloading the dataframes also solves a known Spark bug related to re-computing UDFs, which would be problematic since our UDF outputs are random. More info here: https://stackoverflow.com/questions/40320563/"
]
},
{
"cell_type": "code",
"execution_count": 125,
"id": "33ba4e4e",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Articles 10000\n",
"Authors 30643\n",
"Journals 1955\n",
"References 79752\n"
]
}
],
"source": [
"print(\"Articles\", df_articles.count())\n",
"print(\"Authors\", df_authors.count())\n",
"print(\"Journals\", df_journals.count())\n",
"print(\"References\", df_references.count())\n",
"\n",
"df_articles.write.mode('overwrite').parquet('df_articles.parquet')\n",
"df_articles_authors.write.mode('overwrite').parquet('df_articles_authors.parquet')\n",
"df_authors.write.mode('overwrite').parquet('df_authors.parquet')\n",
"df_journals.write.mode('overwrite').parquet('df_journals.parquet')\n",
"df_references.write.mode('overwrite').parquet('df_references.parquet')"
]
},
{
"cell_type": "markdown",
"id": "0f42bb00",
"metadata": {},
"source": [
"## Query"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "443e1ba6",
"metadata": {},
"outputs": [],
"source": [
"df_articles = spark.read.parquet('df_articles.parquet')\n",
"df_articles_authors = spark.read.parquet('df_articles_authors.parquet')\n",
"df_authors = spark.read.parquet('df_authors.parquet')\n",
"df_journals = spark.read.parquet('df_journals.parquet')\n",
"df_references = spark.read.parquet('df_references.parquet')"
]
},
{
"cell_type": "markdown",
"id": "e934ea72",
"metadata": {},
"source": [
"#### Actual schemas for the queries"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "d00bef3c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- ID: integer (nullable = true)\n",
" |-- Title: string (nullable = true)\n",
" |-- MetadataDate: timestamp (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
" |-- PagesInVolume: string (nullable = true)\n",
" |-- ObjectIds: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
" |-- JournalId: integer (nullable = true)\n",
"\n",
"root\n",
" |-- ArticleId: integer (nullable = true)\n",
" |-- AuthorId: integer (nullable = true)\n",
"\n",
"root\n",
" |-- ID: integer (nullable = true)\n",
" |-- Name: string (nullable = true)\n",
" |-- Email: string (nullable = true)\n",
" |-- Affiliation: string (nullable = true)\n",
" |-- Bio: string (nullable = true)\n",
"\n",
"root\n",
" |-- ID: integer (nullable = true)\n",
" |-- Name: string (nullable = true)\n",
"\n",
"root\n",
" |-- RefNumber: integer (nullable = true)\n",
" |-- InternalPaperId: integer (nullable = true)\n",
" |-- Title: string (nullable = true)\n",
" |-- Authors: array (nullable = true)\n",
" | |-- element: string (containsNull = true)\n",
" |-- Journal: string (nullable = true)\n",
" |-- JournalId: integer (nullable = true)\n",
" |-- Volume: string (nullable = true)\n",
" |-- NumberInVolume: string (nullable = true)\n",
" |-- ArticleId: integer (nullable = true)\n",
" |-- ID: long (nullable = true)\n",
"\n"
]
}
],
"source": [
"df_articles.printSchema()\n",
"df_articles_authors.printSchema()\n",
"df_authors.printSchema()\n",
"df_journals.printSchema()\n",
"df_references.printSchema()"
]
},
{
"cell_type": "markdown",
"id": "1b2ca94d",
"metadata": {},
"source": [
"### 5 data creation/update queries"
]
},
{
"cell_type": "markdown",
"id": "2c10d20f",
"metadata": {},
"source": [
"#### 01. Insert a new article in the relative DataFrame"
]
},
{
"cell_type": "markdown",
"id": "b4a0996e",
"metadata": {},
"source": [
"We extract the article with the max id in order to guarantee that the inserted id is unique because computed as maximum plus one. We compute the execution time in two cases: the first one considers the case in which the id may not be unique and does not insert the article in that case, but the presence of the id in the database is to be checked; the second one just inserts the new article, assuming the id is unique."
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "33b976cd",
"metadata": {},
"outputs": [],
"source": [
"new_article_id = df_articles.groupBy().max(\"ID\").collect()[0][0] + 1\n",
"new_article_journal_id = df_journals.filter(df_journals.Name == \"World Wide Web\").select(\"ID\").collect()[0][0]\n",
"df_new_article = spark.createDataFrame(data = [(new_article_id, \"On The Origin Of Species\", None, new_article_journal_id, None, None, None, None)], schema=df_articles.schema)"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "5a53dbc8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Article inserted\n",
"Execution time: 0.07419419288635254\n"
]
}
],
"source": [
"start_time = time()\n",
"# check if the primary key is already present\n",
"temp_article = df_articles.filter(df_articles.ID == new_article_id)\n",
"if temp_article.isEmpty():\n",
" df_articles = df_articles.union(df_new_article)\n",
" print(\"Article inserted\")\n",
"else:\n",
" print(\"Article id already present\")\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "75098c0c",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 94:===================> (1 + 1) / 3]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 1.206272840499878\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"[Stage 94:======================================> (2 + 1) / 3]\r",
"\r",
" \r"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_articles = df_articles.union(df_new_article)\n",
"df_articles.collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "99786ab0",
"metadata": {},
"source": [
"#### 02. Insert a reference as an article in the database"
]
},
{
"cell_type": "markdown",
"id": "5d34807e",
"metadata": {},
"source": [
"This query consists of three steps: \n",
"\n",
"- the first one is to acquire a single reference which has no internal reference\n",
"- the second one is to create the new article DataFrame from the reference data\n",
"- the third one is to insert the new article in the DataFrame through the union"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "a447542c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Row(RefNumber=1, InternalPaperId=None, Title='Customer-focused hybrid alliance: Balanced web-enabled instruction set', Authors=['Steven Gordon', 'Sarah Castillo MD', 'Margaret Saunders', 'Douglas Cummings'], Journal='empower ubiquitous vortals', JournalId=None, Volume='63', NumberInVolume='86', ArticleId=7318490, ID=0)\n",
"Execution time: 0.11755108833312988\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"source_reference = df_references.filter(df_references.InternalPaperId.isNull()).limit(1).collect()[0]\n",
"print(source_reference)\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "e09dea48",
"metadata": {},
"outputs": [],
"source": [
"new_article_id = df_articles.groupBy().max(\"ID\").collect()[0][0] + 1\n",
"\n",
"data = [(new_article_id,\n",
" source_reference[\"Title\"],\n",
" datetime.now(),\n",
" source_reference[\"Volume\"],\n",
" source_reference[\"NumberInVolume\"],\n",
" None,\n",
" None,\n",
" source_reference[\"JournalId\"]\n",
")]\n",
"\n",
"df_new_article = spark.createDataFrame(data = data, schema=df_articles.schema)"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "67e7fa81",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.42409396171569824\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_articles = df_articles.union(df_new_article)\n",
"df_articles.collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "e31065dd",
"metadata": {},
"source": [
"#### 03. Update the metadata version of the articles of a given journal"
]
},
{
"cell_type": "markdown",
"id": "392ddc00",
"metadata": {},
"source": [
"This query updates the metadata version of all the articles published on a given journal to the current date. In order to do that, the following steps are performed:\n",
"\n",
"- Retrieve a journal name from the database\n",
"- Retrieve the articles related to the journal\n",
"- Creation of the new DataFrame with the articles' metadata version updated\n",
"- Replacement of the old articles with the new ones in the DataFrame (subtract + union)"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "7dad6b5b",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'World Wide Web'"
]
},
"execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"journal_name = df_journals.limit(10).collect()[0][\"Name\"]\n",
"journal_name"
]
},
{
"cell_type": "code",
"execution_count": 20,
"id": "1c40f02a",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.4636058807373047\n",
"[Row(ID=7154398, Title='Direction-based spatial skyline for retrieving surrounding objects.', MetadataDate=datetime.datetime(2021, 7, 25, 0, 0), Volume='23', NumberInVolume='1', PagesInVolume='207-239', ObjectIds=['https://doi.org/10.1007/s11280-019-00694-w'], JournalId=12790675), Row(ID=7154450, Title='A threat recognition solution of edge data security in industrial internet.', MetadataDate=datetime.datetime(2022, 10, 18, 0, 0), Volume='25', NumberInVolume='5', PagesInVolume='2109-2138', ObjectIds=['https://doi.org/10.1007/s11280-022-01054-x'], JournalId=12790675), Row(ID=7153948, Title='Prediction of Web Page Accesses by Proxy Server Log.', MetadataDate=datetime.datetime(2017, 5, 20, 0, 0), Volume='5', NumberInVolume='1', PagesInVolume='67-88', ObjectIds=['https://doi.org/10.1023/A:1015750423727'], JournalId=12790675), Row(ID=7154603, Title='Enhancing decision-making in user-centered web development: a methodology for card-sorting analysis.', MetadataDate=datetime.datetime(2022, 5, 13, 0, 0), Volume='24', NumberInVolume='6', PagesInVolume='2099-2137', ObjectIds=['https://doi.org/10.1007/s11280-021-00950-y'], JournalId=12790675)]\n"
]
}
],
"source": [
"# find the articles to be updated\n",
"start_time = time()\n",
"\n",
"df_articles_to_update = df_articles\\\n",
" .join(df_journals, df_articles.JournalId == df_journals.ID)\\\n",
" .filter(df_journals.Name == journal_name)\\\n",
" .select(\n",
" df_articles.ID,\n",
" df_articles.Title,\n",
" df_articles.MetadataDate,\n",
" df_articles.Volume,\n",
" df_articles.NumberInVolume,\n",
" df_articles.PagesInVolume,\n",
" df_articles.ObjectIds,\n",
" df_articles.JournalId\n",
" )\n",
" \n",
"articles_to_update = df_articles_to_update.collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")\n",
"\n",
"print(articles_to_update)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "82a1b765",
"metadata": {},
"outputs": [],
"source": [
"# preparation of the data\n",
"\n",
"data: list[tuple[int, str, datetime, str, str, str, list[str], int]] = []\n",
"for article in articles_to_update:\n",
" data.append((\n",
" article[\"ID\"],\n",
" article[\"Title\"],\n",
" datetime.now(),\n",
" article[\"Volume\"],\n",
" article[\"NumberInVolume\"],\n",
" article[\"PagesInVolume\"],\n",
" article[\"ObjectIds\"],\n",
" article[\"JournalId\"]\n",
" ))\n",
"\n",
"df_updated_articles = spark.createDataFrame(data = data, schema=df_articles.schema)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "9f624e9b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 1.5061960220336914\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"# remove the old articles and add the new ones\n",
"df_articles = df_articles.subtract(df_articles_to_update)\n",
"df_articles = df_articles.union(df_updated_articles)\n",
"\n",
"df_articles.collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "7354c188",
"metadata": {},
"source": [
"#### 04. Change the affiliation of an author"
]
},
{
"cell_type": "markdown",
"id": "4e1ab7ca",
"metadata": {},
"source": [
"We take a random author to change his/her affiliation to USI. First the author is retrieved, then the dataframe of the new author is created with the modified data according to the new affiliation. Working with Spark, in order to change a single value in a column we need to create a new dataframe without the author and then create another dataframe as its union with the dataframe of the modified author. This is because the DataFrame structure of Spark is immutable."
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "62404936",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Before: Author Adnan Abid with affiliation University of Voznesensk\n",
"After: Author Adnan Abid with affiliation USI\n"
]
}
],
"source": [
"author = df_authors.limit(1).collect()[0]\n",
"print(\"Before: Author\", author[\"Name\"], \"with affiliation\", author[\"Affiliation\"])\n",
"\n",
"data_new_author: list[tuple[int, str, str, str, str]] = []\n",
"data_new_author.append((author[\"ID\"], author[\"Name\"], author[\"Email\"], \"USI\", author[\"Bio\"]))\n",
"df_new_author = spark.createDataFrame(data=data_new_author, schema=df_authors.schema)\n",
"\n",
"# remove the old author and add the new one\n",
"df_authors = df_authors.filter((df_authors.ID != author[\"ID\"]))\n",
"df_authors = df_authors.union(df_new_author)\n",
"\n",
"author = df_authors.filter(df_authors.ID == author[\"ID\"]).collect()[0]\n",
"print(\"After: Author\", author[\"Name\"], \"with affiliation\", author[\"Affiliation\"])\n"
]
},
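{
"cell_type": "markdown",
"id": "5b8c2d71",
"metadata": {},
"source": [
"As an alternative sketch (same assumptions as above: `author` is the row retrieved before the update), the single-value change could also be expressed by rebuilding the column with `when`/`otherwise`. This still produces a new DataFrame, since Spark DataFrames remain immutable, but it avoids splitting and re-unioning the rows.\n",
"\n",
"```python\n",
"# Sketch: conditional column rewrite instead of filter + union\n",
"df_authors = df_authors.withColumn(\n",
" \"Affiliation\",\n",
" F.when(F.col(\"ID\") == author[\"ID\"], F.lit(\"USI\")).otherwise(F.col(\"Affiliation\"))\n",
")\n",
"```"
]
},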
{
"cell_type": "markdown",
"id": "1fc59021",
"metadata": {},
"source": [
"#### 05. Remove duplicates in all dataframes"
]
},
{
"cell_type": "markdown",
"id": "a9dde7ea",
"metadata": {},
"source": [
"Since in theory it is possible to have duplicate rows in a Pyspark DataFrame, it could be useful to check their presence and remove them in case some are found. Anyway, the data creation pipeline we designed ensures that the IDs are unique."
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "fac9fd2d",
"metadata": {},
"outputs": [],
"source": [
"df_articles = df_articles.distinct()\n",
"df_articles_authors = df_articles_authors.distinct()\n",
"df_authors = df_authors.distinct()\n",
"df_journals = df_journals.distinct()\n",
"df_references = df_references.distinct()"
]
},
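{
"cell_type": "markdown",
"id": "9d4e6a20",
"metadata": {},
"source": [
"If the goal were to deduplicate by primary key rather than by entire rows, `dropDuplicates` with a column subset would be the natural variant; a sketch, assuming `ID` (and the pair `ArticleId`/`AuthorId` for the join table) is the intended key:\n",
"\n",
"```python\n",
"# Sketch: key-based deduplication keeps one arbitrary row per key\n",
"df_articles = df_articles.dropDuplicates([\"ID\"])\n",
"df_articles_authors = df_articles_authors.dropDuplicates([\"ArticleId\", \"AuthorId\"])\n",
"df_authors = df_authors.dropDuplicates([\"ID\"])\n",
"df_journals = df_journals.dropDuplicates([\"ID\"])\n",
"df_references = df_references.dropDuplicates([\"ID\"])\n",
"```"
]
},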
{
"cell_type": "markdown",
"id": "f664b74e",
"metadata": {},
"source": [
"### 10 queries with specified complexity"
]
},
{
"cell_type": "markdown",
"id": "7bec6b81",
"metadata": {},
"source": [
"#### Count the number of internal references to clean"
]
},
{
"cell_type": "markdown",
"id": "e235d59f",
"metadata": {},
"source": [
"Some internal references may be not present as articles in the database: this can happen after some data change, such as the removal of an article from the database. This query retrieves the number of references with are not present in the database."
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "1dc839e3",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----+\n",
"|count|\n",
"+-----+\n",
"|79610|\n",
"+-----+\n",
"\n"
]
}
],
"source": [
"df_consistent_references = df_references.join(df_articles, df_references.InternalPaperId == df_articles.ID)\\\n",
" .select(\"RefNumber\",\n",
" \"InternalPaperId\",\n",
" df_references.Title,\n",
" \"Authors\",\n",
" \"Journal\",\n",
" df_references.JournalId,\n",
" df_references.Volume,\n",
" df_references.NumberInVolume,\n",
" \"ArticleId\",\n",
" df_references.ID\n",
" )\n",
"\n",
"df_old_references = df_references.subtract(df_consistent_references)\n",
"\n",
"df_old_references.groupBy().count().show()"
]
},
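{
"cell_type": "markdown",
"id": "c1f7e3a8",
"metadata": {},
"source": [
"A left anti join arguably expresses the same idea more directly. The sketch below counts only the dangling internal references; unlike the `subtract` above, it explicitly excludes external references, whose `InternalPaperId` is null.\n",
"\n",
"```python\n",
"# Sketch: dangling internal references via a left anti join\n",
"df_references \\\n",
" .filter(df_references.InternalPaperId.isNotNull()) \\\n",
" .join(df_articles, df_references.InternalPaperId == df_articles.ID, \"left_anti\") \\\n",
" .count()\n",
"```"
]
},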
{
"cell_type": "markdown",
"id": "f235ffd2",
"metadata": {},
"source": [
"#### 01. Retrieve the articles of a journal given the journal's name\n",
"Complexity: WHERE, JOIN"
]
},
{
"cell_type": "markdown",
"id": "2ef34446",
"metadata": {},
"source": [
"This query starts from a given journal name and retrieves all the articles published on that journal. The performed steps are the following:\n",
"\n",
"- Retrieve the journal ID given the name\n",
"- Join with the articles on the journal ID\n",
"- Drop the columns related to the journal"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "7751ebf8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------------+--------------------+------+--------------+-------------+--------------------+---------+\n",
"| ID| Title| MetadataDate|Volume|NumberInVolume|PagesInVolume| ObjectIds|JournalId|\n",
"+-------+--------------------+--------------------+------+--------------+-------------+--------------------+---------+\n",
"|7154603|Enhancing decisio...|2022-12-18 17:46:...| 24| 6| 2099-2137|[https://doi.org/...| 12790675|\n",
"|7154450|A threat recognit...|2022-12-18 17:46:...| 25| 5| 2109-2138|[https://doi.org/...| 12790675|\n",
"|7154398|Direction-based s...|2022-12-18 17:46:...| 23| 1| 207-239|[https://doi.org/...| 12790675|\n",
"|7153948|Prediction of Web...|2022-12-18 17:46:...| 5| 1| 67-88|[https://doi.org/...| 12790675|\n",
"+-------+--------------------+--------------------+------+--------------+-------------+--------------------+---------+\n",
"\n",
"Execution time: 1.213135004043579\n"
]
}
],
"source": [
"journal_name = \"World Wide Web\"\n",
"\n",
"start_time = time()\n",
"\n",
"df_result = df_journals\\\n",
" .filter(df_journals.Name == journal_name)\\\n",
" .join(df_articles, df_articles.JournalId == df_journals.ID)\\\n",
" .drop(df_journals.ID)\\\n",
" .drop(df_journals.Name)\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "b1ef6a18",
"metadata": {},
"source": [
"#### 02. Retrieve 5 articles without the DOI registered which have been recently published\n",
"Complexity: WHERE, LIMIT, LIKE"
]
},
{
"cell_type": "markdown",
"id": "56a6fed0",
"metadata": {},
"source": [
"This query retrieves 5 articles for which the DOI is registered in the database and which have been published this year. The steps of the query are the following:\n",
"\n",
"- Explode the ObjectIds column of articles\n",
"- Filter the articles which have been published this year\n",
"- Filter the articles which have an ObjectId containing a DOI reference\n",
"- Limit the query to 5 entries"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "fff244c7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+-------------------+--------------------+\n",
"| Title| MetadataDate| ObjectId|\n",
"+--------------------+-------------------+--------------------+\n",
"|Sparse Count Data...|2022-03-16 00:00:00|https://doi.org/1...|\n",
"|Automatic classif...|2022-11-11 00:00:00|https://doi.org/1...|\n",
"|ST-FMT*: A Fast O...|2022-01-08 00:00:00|https://doi.org/1...|\n",
"|DOA estimation al...|2022-08-16 00:00:00|https://doi.org/1...|\n",
"|Uniform convergen...|2022-06-23 00:00:00|https://doi.org/1...|\n",
"+--------------------+-------------------+--------------------+\n",
"\n",
"Execution time: 1.1717472076416016\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_result = df_articles\\\n",
" .select(col(\"Title\"), col(\"MetadataDate\"), explode(\"ObjectIds\"))\\\n",
" .withColumnRenamed(\"col\", \"ObjectId\")\\\n",
" .filter(df_articles.MetadataDate > datetime(datetime.now().year, 1, 1))\\\n",
" .filter(col(\"ObjectId\").startswith(\"https://doi.org/\"))\\\n",
" .limit(5)\n",
"\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "2155ef91",
"metadata": {},
"source": [
"#### 03. Articles of the authors with a given affiliation\n",
"Complexity: WHERE, IN, Nested Query"
]
},
{
"cell_type": "markdown",
"id": "4c584adc",
"metadata": {},
"source": [
"This query retrieves all the articles with some author related to a given affiliation. The steps of the query are the following:\n",
"\n",
"- Filter the authors with the given affiliation and create a list\n",
"- Filter the bridge table between authors and articles with author IDs in the previous list, and create a list of article IDs out of them\n",
"- Filter the articles DataFrame with the article ID in the just created list, and select the name of the articles"
]
},
{
"cell_type": "code",
"execution_count": 39,
"id": "36cb22c4",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
"|Title |\n",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
"|Almost winning: Induced MEG theta power in insula and orbitofrontal cortex increases during gambling near-misses and is associated with BOLD signal and gambling severity.|\n",
"+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+\n",
"\n",
"Execution time: 2.1684679985046387\n"
]
}
],
"source": [
"affiliation = \"University of Vernon\"\n",
"\n",
"start_time = time()\n",
"\n",
"df_authors_of_affiliation = df_authors\\\n",
" .filter(df_authors.Affiliation == affiliation)\n",
"\n",
"authors_of_affiliation = [author[\"ID\"] for author in df_authors_of_affiliation.collect()]\n",
"\n",
"df_articles_of_authors_of_affiliation = df_articles_authors\\\n",
" .join(df_articles, df_articles[\"ID\"] == df_articles_authors[\"ArticleId\"], 'inner') \\\n",
" .filter(df_articles_authors.AuthorId.isin(authors_of_affiliation)) \\\n",
" .select(\"ArticleId\")\n",
"\n",
"articles_of_authors_of_affiliation = [article[\"ArticleId\"] for article in df_articles_of_authors_of_affiliation.collect()]\n",
"\n",
"df_result = df_articles\\\n",
" .filter(df_articles.ID.isin(articles_of_authors_of_affiliation))\\\n",
" .select(df_articles.Title)\n",
"\n",
"df_result.show(truncate=False)\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
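{
"cell_type": "markdown",
"id": "a8d5e4f2",
"metadata": {},
"source": [
"If the nested-query/IN requirement were not a constraint, the same articles could arguably be retrieved with joins only, avoiding the two intermediate `collect()` round trips; a sketch using the dataframes defined above:\n",
"\n",
"```python\n",
"# Sketch: join-only variant (does not exercise IN / nested queries)\n",
"df_result = df_authors.filter(df_authors.Affiliation == affiliation) \\\n",
" .join(df_articles_authors, df_articles_authors.AuthorId == df_authors.ID) \\\n",
" .join(df_articles, df_articles.ID == df_articles_authors.ArticleId) \\\n",
" .select(df_articles.Title) \\\n",
" .distinct()\n",
"```"
]
},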
{
"cell_type": "markdown",
"id": "8eeafcd4",
"metadata": {},
"source": [
"#### 04. Count the number of publications on a few journals\n",
"Complexity: GROUP BY, JOIN, AS"
]
},
{
"cell_type": "markdown",
"id": "7bd04f9a",
"metadata": {},
"source": [
"This query counts the publications on each element of a list of journals. The steps of the query are the following:\n",
"\n",
"- Filter the journal DataFrame for journals in the given list\n",
"- Join with articles on journal ID\n",
"- Group by journal name (journal ID works as well)\n",
"- Count the entries for each journal name"
]
},
{
"cell_type": "code",
"execution_count": 29,
"id": "2fb014e8",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------+------------+\n",
"| Name|Publications|\n",
"+--------------+------------+\n",
"|World Wide Web| 4|\n",
"+--------------+------------+\n",
"\n",
"Execution time: 1.2528290748596191\n"
]
}
],
"source": [
"journal_names = [\"World Wide Web\", \"SIGMOD Record\"]\n",
"\n",
"start_time = time()\n",
"\n",
"df_result = df_journals\\\n",
" .filter(df_journals.Name.isin(journal_names))\\\n",
" .join(df_articles, df_journals.ID == df_articles.JournalId, \"inner\")\\\n",
" .groupBy(df_journals.Name)\\\n",
" .count()\\\n",
" .withColumnRenamed(\"count\", \"Publications\")\n",
"\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "e45e4b3e",
"metadata": {},
"source": [
"#### 05. Count the registered contacts for each affiliation\n",
"Complexity: WHERE, GROUP BY"
]
},
{
"cell_type": "markdown",
"id": "2b8e28aa",
"metadata": {},
"source": [
"This query counts how many authors for each affiliation have a registered email, from which they can be contacted. The steps of the query are the following:\n",
"\n",
"- Filter the authors who have a registered email\n",
"- Group by affiliation\n",
"- Count the number of entries for each affiliation"
]
},
{
"cell_type": "code",
"execution_count": 30,
"id": "72d13b51",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+------------------+\n",
"| Affiliation|Number of contacts|\n",
"+--------------------+------------------+\n",
"|University of Parola| 61|\n",
"|University of Ksa...| 63|\n",
"|University of Pri...| 83|\n",
"|University of Vahdat| 122|\n",
"| University of Ina| 128|\n",
"| University of Split| 57|\n",
"|University of Bat...| 52|\n",
"|University of Gua...| 71|\n",
"|University of Nonsan| 73|\n",
"|University of Dav...| 73|\n",
"|University of Pal...| 58|\n",
"|University of Kas...| 129|\n",
"|University of Sai...| 54|\n",
"|University of Pil...| 73|\n",
"|University of Sch...| 51|\n",
"|University of Mar...| 58|\n",
"|University of Alb...| 66|\n",
"|University of Mar...| 58|\n",
"|University of Wes...| 59|\n",
"|University of Hun...| 172|\n",
"+--------------------+------------------+\n",
"only showing top 20 rows\n",
"\n",
"Execution time: 0.532426118850708\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_result = df_authors\\\n",
" .filter(df_authors.Email.isNotNull())\\\n",
" .groupBy(df_authors.Affiliation)\\\n",
" .count()\\\n",
" .withColumnRenamed(\"count\", \"Number of contacts\")\n",
"\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "b6ddd156",
"metadata": {},
"source": [
"#### 06. Journals with many publications\n",
"Complexity: GROUP BY, HAVING, AS"
]
},
{
"cell_type": "markdown",
"id": "5a91be9f",
"metadata": {},
"source": [
"This query retrieves the IDs of the journals for which more than 5 publications are registered in the database. The steps are the following:\n",
"\n",
"- Group articles by journal ID\n",
"- Count the entries for each group\n",
"- Filter the entries with more than 5 publications"
]
},
{
"cell_type": "code",
"execution_count": 31,
"id": "42fa5350",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---------+----------------------+\n",
"|JournalId|Number of publications|\n",
"+---------+----------------------+\n",
"| 12792492| 21|\n",
"| 12791222| 6|\n",
"| 12791288| 23|\n",
"| 12792037| 24|\n",
"| 12791371| 22|\n",
"| 12791712| 6|\n",
"| 12791500| 6|\n",
"| 12791219| 8|\n",
"| 12790733| 10|\n",
"| 12792375| 9|\n",
"| 12790705| 48|\n",
"| 12791668| 11|\n",
"| 12792121| 6|\n",
"| 12790862| 19|\n",
"| 12792462| 16|\n",
"| 12790945| 45|\n",
"| 12790804| 11|\n",
"| 12791047| 14|\n",
"| 12792154| 19|\n",
"| 12791637| 15|\n",
"+---------+----------------------+\n",
"only showing top 20 rows\n",
"\n",
"Execution time: 1.3681659698486328\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_result = df_articles\\\n",
" .groupBy(df_articles.JournalId)\\\n",
" .agg(count(df_articles.ID).alias(\"Number of publications\"))\\\n",
" .filter(col(\"Number of publications\") > 5)\n",
" \n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
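  {
   "cell_type": "markdown",
   "id": "a1f06e01",
   "metadata": {},
   "source": [
    "The filter on the aliased count corresponds to a SQL HAVING clause. The following sketch expresses the same query in Spark SQL with an explicit HAVING; the temporary view name `articles_view` is an arbitrary choice introduced here."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1f06e02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: query 06 in Spark SQL, with the count filter written as a HAVING clause.\n",
    "# Assumes df_articles as imported above; the view name \"articles_view\" is arbitrary.\n",
    "df_articles.createOrReplaceTempView(\"articles_view\")\n",
    "\n",
    "spark.sql(\"\"\"\n",
    "    SELECT JournalId, COUNT(ID) AS `Number of publications`\n",
    "    FROM articles_view\n",
    "    GROUP BY JournalId\n",
    "    HAVING COUNT(ID) > 5\n",
    "\"\"\").show()"
   ]
  },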
{
"cell_type": "markdown",
"id": "b26b365d",
"metadata": {},
"source": [
"#### 07. Registered references with volume cited only once\n",
"Complexity: WHERE, GROUP BY, HAVING, AS"
]
},
{
"cell_type": "markdown",
"id": "4197fa8b",
"metadata": {},
"source": [
"This query retrieves the citations which refer to an article registered in the database and which are the only ones for a specific volume of a journal. The steps are the following:\n",
"\n",
"- Filter the references which have an internal paper id (they do refer to a registered article)\n",
"- Group by journal and volume\n",
"- Count the entries for each combination of journal and volume\n",
"- Filter the combinations with only one citation"
]
},
{
"cell_type": "code",
"execution_count": 32,
"id": "e11d2db7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+--------------+---------+\n",
"| Journal| Volume|Citations|\n",
"+--------------------+--------------+---------+\n",
"| J. Appl. Log.| 15| 1|\n",
"|Model. Assist. St...| 14| 1|\n",
"|J. Optim. Theory ...| 190| 1|\n",
"|Multimodal Techno...| 5| 1|\n",
"|Math. Comput. Model.| 50| 1|\n",
"| J. Sensors| 2018| 1|\n",
"|Comput. Commun. Rev.| 31| 1|\n",
"|Discuss. Math. Gr...| 32| 1|\n",
"| SIGACT News| 41| 1|\n",
"| Comput. Chem. Eng.| 121| 1|\n",
"|Int. J. Bus. Inf....| 25| 1|\n",
"| Scientometrics| 2| 1|\n",
"|J. Inf. Process. ...| 20| 1|\n",
"| J. Sensors| 2021| 1|\n",
"| Computing| 64| 1|\n",
"| IET Image Process.| 12| 1|\n",
"| Robotica| 37| 1|\n",
"|IEEE Wirel. Commu...| 3| 1|\n",
"| CoRR|abs/1903.09080| 1|\n",
"|IEEE Signal Proce...| 22| 1|\n",
"+--------------------+--------------+---------+\n",
"only showing top 20 rows\n",
"\n",
"Execution time: 0.7499880790710449\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"df_result = df_references\\\n",
" .filter(df_references.InternalPaperId.isNotNull())\\\n",
" .groupBy(df_references.Journal, df_references.Volume)\\\n",
" .agg(count(df_references.ID).alias(\"Citations\"))\\\n",
" .filter(col(\"Citations\") == 1)\n",
"\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
{
"cell_type": "markdown",
"id": "de6151b5",
"metadata": {},
"source": [
"#### 08. Number of citations for each author of an affiliation\n",
"Complexity: WHERE, Nested Query, GROUP BY"
]
},
{
"cell_type": "markdown",
"id": "3416276b",
"metadata": {},
"source": [
"This query retrieves the number of citations for each author of a given affiliation. The steps are the following:\n",
"\n",
"- Filter authors of the given affiliation and create a list of author names\n",
"- Explode authors in references\n",
"- Filter references of authors in the list\n",
"- Group by author\n",
"- Count the number of entries"
]
},
{
"cell_type": "code",
"execution_count": 33,
"id": "2340405c",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+--------------------+---------+\n",
"| Author|Citations|\n",
"+--------------------+---------+\n",
"| Tatiana Glotova| 1|\n",
"| Charles H. Knapp| 6|\n",
"| Jonathan Huang| 3|\n",
"|Md. Jahangir Hoss...| 5|\n",
"| Xiaoli Wang| 2|\n",
"|Abdon E. Choque R...| 4|\n",
"| Zhen Xu| 4|\n",
"| Qinglai Guo| 2|\n",
"| Hugh E. Williams| 2|\n",
"| Sotirios Brotsis| 4|\n",
"| Ryan Cordell| 5|\n",
"| James Guo Ming Fu| 8|\n",
"| Pavel A. Brodskiy| 4|\n",
"| Shuai Zheng| 7|\n",
"| Danya Song| 10|\n",
"| Alexander Roßnagel| 4|\n",
"| Rajendra Kumar| 6|\n",
"| Stefan Buckreuss| 3|\n",
"| Yupu Yang| 2|\n",
"| Yi Liu 0053| 4|\n",
"+--------------------+---------+\n",
"only showing top 20 rows\n",
"\n",
"Execution time: 1.0813488960266113\n"
]
}
],
"source": [
"affiliation = \"University of Gorakhpur\"\n",
"\n",
"start_time = time()\n",
"\n",
"df_sub_result = df_authors\\\n",
" .filter(col(\"Affiliation\") == affiliation)\n",
"\n",
"sub_result = [author[\"Name\"] for author in df_sub_result.collect()]\n",
"\n",
"df_result = df_references\\\n",
" .select(df_references.Title, explode(df_references.Authors).alias(\"Author\"))\\\n",
" .filter(col(\"Author\").isin(sub_result))\\\n",
" .groupBy(col(\"Author\"))\\\n",
" .agg(count(df_references.Title).alias(\"Citations\"))\n",
"\n",
"df_result.show()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")"
]
},
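  {
   "cell_type": "markdown",
   "id": "a1f08e01",
   "metadata": {},
   "source": [
    "The `collect()` above brings the author names of the chosen affiliation to the driver before filtering with `isin()`. As a sketch, the same result can be computed entirely within Spark by joining the exploded references with the filtered authors on the author name (the matching is still by name, exactly as above)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1f08e02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: join-based variant of query 08 that avoids collecting author names on the driver.\n",
    "# Assumes df_references, df_authors and the `affiliation` variable defined above.\n",
    "df_affiliation_authors = df_authors\\\n",
    " .filter(col(\"Affiliation\") == affiliation)\\\n",
    " .select(col(\"Name\").alias(\"Author\"))\n",
    "\n",
    "df_result_join = df_references\\\n",
    " .select(df_references.Title, explode(df_references.Authors).alias(\"Author\"))\\\n",
    " .join(df_affiliation_authors, \"Author\", \"inner\")\\\n",
    " .groupBy(col(\"Author\"))\\\n",
    " .agg(count(\"Title\").alias(\"Citations\"))\n",
    "\n",
    "df_result_join.show()"
   ]
  },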
{
"cell_type": "markdown",
"id": "dd307eee",
"metadata": {},
"source": [
"#### 09. Titles of internal journals with a reference count in \\[200, 1000)\n",
"Complexity: WHERE, GROUP BY, HAVING, 1 JOIN"
]
},
{
"cell_type": "markdown",
"id": "62874b5a",
"metadata": {},
"source": [
"This query computes the names of the journals that have between 200 and 999 references in the dataset. Results are returned in decreasing order of the reference count. The steps are the following:\n",
"\n",
"- Filter for internal references (i.e. when the journal id is not NULL);\n",
"- Group references by internal journal id;\n",
"- Count references and filter for the \\[200,1000) range;\n",
"- Perform an INNER JOIN with the journals dataframe;\n",
"- Select the name and the count."
]
},
{
"cell_type": "code",
"execution_count": 123,
"id": "80a5d4ea",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 0.8328709602355957\n",
"[Row(Name='IEEE Access', count=848), Row(Name='Sensors', count=643), Row(Name='Remote. Sens.', count=367), Row(Name='NeuroImage', count=294), Row(Name='IACR Cryptol. ePrint Arch.', count=288), Row(Name='Neurocomputing', count=276), Row(Name='Appl. Math. Comput.', count=269), Row(Name='Expert Syst. Appl.', count=248), Row(Name='Discret. Math.', count=231), Row(Name='IEEE Trans. Geosci. Remote. Sens.', count=229), Row(Name='Eur. J. Oper. Res.', count=227), Row(Name='IEEE Trans. Commun.', count=222), Row(Name='IEEE Trans. Signal Process.', count=211)]\n"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"result = df_references \\\n",
" .filter(F.col(\"JournalId\").isNotNull()) \\\n",
" .groupBy(\"JournalId\") \\\n",
" .count() \\\n",
" .filter((F.col(\"count\") >= 200) & (F.col(\"count\") < 1000)) \\\n",
" .join(df_journals, df_journals[\"Id\"] == F.col(\"JournalId\"), 'inner') \\\n",
" .select(\"Name\", \"count\") \\\n",
" .sort(F.col(\"count\").desc()) \\\n",
" .collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")\n",
"\n",
"print(result)"
]
},
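  {
   "cell_type": "markdown",
   "id": "a1f09e01",
   "metadata": {},
   "source": [
    "As a possible variant, assuming the journals dataframe is small enough to fit in executor memory, an explicit broadcast hint can be added so that the final join is performed as a broadcast hash join instead of shuffling the aggregated counts. This is only a sketch of the idea, not a measured optimization."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a1f09e02",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch: query 09 with a broadcast hint on the (presumably small) journals dataframe.\n",
    "# Assumes df_references and df_journals as imported above.\n",
    "result_broadcast = df_references \\\n",
    " .filter(F.col(\"JournalId\").isNotNull()) \\\n",
    " .groupBy(\"JournalId\") \\\n",
    " .count() \\\n",
    " .filter((F.col(\"count\") >= 200) & (F.col(\"count\") < 1000)) \\\n",
    " .join(F.broadcast(df_journals), df_journals[\"Id\"] == F.col(\"JournalId\"), 'inner') \\\n",
    " .select(\"Name\", \"count\") \\\n",
    " .sort(F.col(\"count\").desc())\n",
    "\n",
    "result_broadcast.show(truncate=False)"
   ]
  },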
{
"cell_type": "markdown",
"id": "b2b86356",
"metadata": {},
"source": [
"#### 10. Top 5 authors by published articles \n",
"Complexity: WHERE, GROUP BY, HAVING, 2 JOINs"
]
},
{
"cell_type": "markdown",
"id": "b8b2b6d0",
"metadata": {},
"source": [
"This cell computes the top 5 authors by number of articles published in the last 3000 articles (by ID) in the dataset. We also require the number of articles published to be greater than 1. The steps are the following:\n",
"\n",
"- Filter for the last 3000 rows of the dataset;\n",
"- Inner join with the articles-authors dataframe;\n",
"- Group by author id;\n",
"- Join with the authors dataframe;\n",
"- Filter by count greater than 1;\n",
"- Select name and count;\n",
"- Sort and limit to 5."
]
},
{
"cell_type": "code",
"execution_count": 122,
"id": "70e4a346",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 2138:==========================================> (3 + 1) / 4]\r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Execution time: 29.39945387840271\n",
"[Row(Name='H. Vincent Poor', count=9), Row(Name='Mohamed-Slim Alouini', count=6), Row(Name='Licheng Jiao', count=6), Row(Name='Kim-Kwang Raymond Choo', count=6), Row(Name='Sam Kwong', count=6)]\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
" \r"
]
}
],
"source": [
"start_time = time()\n",
"\n",
"result = df_articles \\\n",
" .filter(F.col(\"ID\") > 7000) \\\n",
" .join(df_articles_authors, df_articles_authors[\"ArticleId\"] == F.col(\"ID\")) \\\n",
" .drop(\"ID\") \\\n",
" .groupBy(\"AuthorId\") \\\n",
" .count() \\\n",
" .join(df_authors, df_authors[\"ID\"] == F.col(\"AuthorId\")) \\\n",
" .filter(F.col(\"count\") >= 2) \\\n",
" .select(\"Name\", \"count\") \\\n",
" .sort(F.col(\"Count\").desc()) \\\n",
" .limit(5) \\\n",
" .collect()\n",
"\n",
"print(f\"Execution time: {time() - start_time}\")\n",
"\n",
"print(result)"
]
}
],
"metadata": {
"authors": [
{
"name": "Claudio Maggioni"
},
{
"name": "Lorenzo Martignetti"
}
],
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
},
"title": "Data Design and Modelling -- Project Work Part 3",
"vscode": {
"interpreter": {
"hash": "916dbcbb3f70747c44a77c7bcd40155683ae19c65e1c03b4aa3499c5328201f1"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}