bachelorThesis/figure_9/figure9a-join.py

93 lines
2.3 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
import findspark
import pyspark
import pyspark.sql
import sys
import gzip
import os
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
from pyspark.sql.functions import col
# The script needs exactly three positional arguments (cluster, tmpdir,
# maxram). Use a value comparison here: `is not` tests object identity, which
# only happens to work for small ints in CPython and triggers a SyntaxWarning
# on Python 3.8+.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()
# Name of the cluster to process (first CLI argument); it selects the input
# files and prefixes the output names below.
cluster = sys.argv[1]

findspark.init()

# Absolute directory containing this script. Resolving through abspath()
# matters: when the script is launched as "python3 figure9a-join.py" on
# interpreters older than 3.9, __file__ has no directory component, dirname()
# returns "" and any path built as DIR + "/..." would point at the filesystem
# root instead of next to the script.
DIR = os.path.dirname(os.path.abspath(__file__))

# Not referenced in the visible part of the script; kept for compatibility.
NAME = cluster + "_figure9a.csv"

# Spark session: scratch directory and driver memory come from the second and
# third CLI arguments respectively.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("figure_9a_join")
    .config("spark.driver.maxResultSize", "128g")
    .config("spark.local.dir", sys.argv[2])
    .config("spark.driver.memory", sys.argv[3])
    .getOrCreate()
)
sc = spark.sparkContext
# Collection-event dumps for the selected cluster (the glob matches every
# gzip-compressed JSON shard).
dfepath = "/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_collection_events*.json.gz"
# Alternative small input, kept for local testing:
#dfepath="/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_test.json"

df = spark.read.json(dfepath).withColumnRenamed("collection_id", "jobid")

# Normalise collection_type to a nullable byte column.
# Spark DataFrames are immutable, so the column must be replaced through
# withColumn(); item assignment (df["c"] = ...) always raises TypeError, and
# the bare `except` that used to wrap it silently replaced the whole column
# with nulls, which disabled the collection_type filter below.
# The existence check runs before select() so that input whose inferred schema
# has no collection_type field still works.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", col("collection_type").cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

df = df.select("jobid", "collection_type", "time", "type")

# Keep events that carry a timestamp and whose collection_type is either
# missing or 0.
# NOTE(review): presumably 0 (and an omitted value) denotes a job rather than
# an alloc set in the trace schema -- confirm against the trace documentation.
df = df.filter(
    (col("collection_type").isNull() | (col("collection_type") == "0"))
    & col("time").isNotNull()
)
# Number of microseconds in one second.
MICROS = 1000000

# Per-job task counts live in a parquet dataset located next to this script.
jpath = "".join([DIR, "/figure-9a-task-count-", cluster, ".parquet"])
dfj = spark.read.parquet(jpath)

# Left join on jobid: every collection event is kept, and task_count stays
# null for jobs that have no entry in the parquet dataset.
df = df.join(dfj, on="jobid", how="left")
def for_each_job(data):
    """Summarise one job's events as a ``jobid,last_term,task_count`` line.

    Args:
        data: A ``(jobid, events)`` pair where ``events`` is an iterable of
            dicts with at least the keys ``"time"``, ``"type"`` and
            ``"task_count"``.

    Returns:
        A comma-separated string holding the job id, the ``type`` of the
        chronologically last event whose type lies in 4..8 (inclusive), and
        that event's ``task_count``. When no event matches, the last two
        fields are ``-1`` and ``0``.
    """
    jobid, events = data[0], data[1]

    last_term = -1
    task_count = 0
    # Walk the events in time order so the latest matching one wins.
    # NOTE(review): types 4..8 are presumably the terminal event types of the
    # trace schema -- confirm.
    for event in sorted(events, key=lambda e: e["time"]):
        if 4 <= event["type"] <= 8:
            last_term = event["type"]
            task_count = event["task_count"]

    return ",".join(str(field) for field in (jobid, last_term, task_count))
def cleanup(x):
    """Convert one event row into a plain dict with normalised fields.

    Args:
        x: Object exposing the attributes ``time``, ``type``, ``jobid`` and
            ``task_count`` (here: a row of the joined DataFrame).

    Returns:
        A dict with the keys ``"time"`` (int), ``"type"`` (int, 0 when the
        source value is None), ``"id"`` (the job id, unchanged) and
        ``"task_count"`` (int, -1 when the source value is None).
    """
    record = {"time": int(x.time)}

    kind = x.type
    record["type"] = int(kind) if kind is not None else 0

    record["id"] = x.jobid

    tasks = x.task_count
    record["task_count"] = int(tasks) if tasks is not None else -1

    return record
# Reduce the joined events to one CSV line per job and write them out as text:
#   1. turn every row into a plain dict,
#   2. group the dicts by job id,
#   3. spread the groups over 100 partitions for the per-job reduction,
#   4. collapse to a single partition before writing to "<cluster>_task_count".
# df2 merely captures the return value of the final save call.
df2 = (
    df.rdd
    .map(cleanup)
    .groupBy(lambda record: record["id"])
    .repartition(100)
    .map(for_each_job)
    .coalesce(1)
    .saveAsTextFile(cluster + "_task_count")
)
# vim: set ts=4 sw=4 et tw=80: