bachelorThesis/figure_9/figure9c-join.py

#!/usr/bin/env python3
# coding: utf-8
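# Joins the per-cluster collection_events table of the Google 2019 trace with the
# per-job machine counts (figure 9c) and task counts (figure 9a) produced by the
# companion scripts, and writes one line per job:
#   <jobid>,<last termination event type>,<machines per task>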
import json
import pandas
import findspark
import pyspark
import pyspark.sql
import sys
import gzip
import os
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
from pyspark.sql.functions import col
import math
# Expect exactly three arguments: cluster name, Spark scratch directory, driver memory.
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    sys.exit()
cluster = sys.argv[1]
findspark.init()
DIR = os.path.dirname(__file__)
spark = pyspark.sql.SparkSession.builder \
    .appName("figure_9c_join") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext
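# Read the gzipped JSON shards of the collection_events table for the selected cluster.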
dfepath="/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_collection_events*.json.gz"
#dfepath="/home/claudio/google_2019/collection_events/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)
df = df.withColumnRenamed("collection_id", "jobid") \
    .select("jobid", "collection_type", "time", "type")
# Cast collection_type to a compact integer type; if the column is missing or the
# cast fails, add it as an all-null ByteType column instead.
try:
    df = df.withColumn("collection_type", col("collection_type").cast(ByteType()))
except Exception:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))
# Keep only job-level collections (type 0 or unknown) that carry a timestamp.
df = df.filter(((col("collection_type").isNull()) | (col("collection_type") == "0")) &
               (col("time").isNotNull()))
# Join in the per-job machine counts (figure 9c) and task counts (figure 9a)
# precomputed by the companion scripts.
jpath = DIR + "/figure-9c-machine-count-" + cluster + ".parquet"
dfj = spark.read.parquet(jpath)
jjpath = DIR + "/figure-9a-task-count-" + cluster + ".parquet"
dfjj = spark.read.parquet(jjpath)
df = df.join(dfj, "jobid", "left")
df = df.join(dfjj, "jobid")
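# Summarise each job's event stream into a single output record.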
MICROS = 1000000
def round_down(n, decimals=0):
    # Floor n to the given number of decimal places, e.g. round_down(2.67, 1) == 2.6.
    multiplier = 10 ** decimals
    return math.floor(n * multiplier) / multiplier
def for_each_job(data):
    # data is (jobid, iterable of events); emit "jobid,last_termination_type,machines_per_task".
    jobid = data[0]
    ts = sorted(data[1], key=lambda x: x["time"])
    last_term = -1
    machine_count = 0
    for t in ts:
        # Event types 4 through 8 are termination events; remember the last one seen.
        if 4 <= t["type"] <= 8:
            last_term = t["type"]
        if t["task_count"] <= 0 or t["machine_count"] <= 0:
            machine_count = -1
        else:
            # Machines per task, rounded down to one decimal place.
            machine_count = round_down(t["machine_count"] / t["task_count"], 1)
    return str(jobid) + "," + str(last_term) + "," + str(machine_count)
def cleanup(x):
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        "id": x.jobid,
        "task_count": -1 if x.task_count is None else int(x.task_count),
        "machine_count": -1 if x.machine_count is None else int(x.machine_count)
    }
# Group events by job id, summarise each job, and write one CSV line per job.
df.rdd \
    .map(cleanup) \
    .groupBy(lambda x: x["id"]) \
    .repartition(100) \
    .map(for_each_job) \
    .coalesce(1) \
    .saveAsTextFile(cluster + "_machine_locality")
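# Example invocation (hypothetical values; adjust the scratch dir and memory limit):
#   ./figure9c-join.py a /tmp/spark-scratch 128g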
# vim: set ts=4 sw=4 et tw=80: