bachelorThesis/figure_7/figure7a.py

91 lines
2.5 KiB
Python
Raw Permalink Normal View History

2021-04-17 08:02:57 +00:00
#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
import os
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
if len(sys.argv) is not 4:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
sys.exit()
cluster=sys.argv[1]
if os.path.exists("/home/claudio/google_2019/thesis_queries/figure_7/" + cluster + "_priority"):
print("already computed")
sys.exit()
spark = pyspark.sql.SparkSession.builder \
.appName("task_slowdown") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
def for_each_task(data):
ts = data[1]
priority = -1
term = -1
min_time = -1
max_time = -1
for t in ts:
if (min_time == -1 or t["time"] <= min_time) and t["priority"] != -1:
priority = t["priority"]
min_time = t["time"]
if (max_time == -1 or t["time"] >= max_time) and t["type"] >= 4 and t["type"] <= 8:
term = t["type"]
max_time = t["time"]
return (term, priority)
def cleanup(x):
return {
"time": int(x.time),
"type": 0 if x.type is None else int(x.type),
"id": x.collection_id + "-" + x.instance_index,
"priority": -1 if x.priority is None else int(x.priority)
}
def sum_rows(xs):
csum = 0
for x in xs:
csum += x[3]
return csum
df2 = df.rdd \
.filter(lambda x: x.collection_type is None or x.collection_type == 0) \
.filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None) \
.map(cleanup) \
.groupBy(lambda x: x["id"]) \
.map(for_each_task) \
.groupBy(lambda x: x) \
.mapValues(lambda x: len(x)) \
.map(lambda x: str(x[0][0]) + "," + str(x[0][1]) + "," + str(x[1])) \
.coalesce(1) \
.saveAsTextFile(cluster + "_priority")
# vim: set ts=4 sw=4 et tw=80: