bachelorThesis/figure_9/figure9a.py

61 lines
1.9 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
import os
if len(sys.argv) is not 5:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
sys.exit()
cluster=sys.argv[1]
if os.path.exists("/home/claudio/google_2019/thesis_queries/figure_9/figure-9a-task-count-" + cluster + "-working"):
print("already done")
sys.exit()
os.system("touch /home/claudio/google_2019/thesis_queries/figure_9/figure-9a-task-count-" + cluster + "-working")
spark = pyspark.sql.SparkSession.builder \
.appName("figure_9a") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
df = df.rdd.filter(lambda x: x.collection_id is not None and x.instance_index is not None and
(x.collection_type == 0 or x.collection_type is None)) \
.map(lambda x: (x.collection_id, x.instance_index)) \
.groupBy(lambda x: x[0]) \
.map(lambda x: [x[0], len(set(x[1]))]) \
.toDF(["jobid", "task_count"])
df.write.parquet("/home/claudio/google_2019/thesis_queries/figure_9/figure-9a-task-count-" + cluster + ".parquet")
os.system("rm /home/claudio/google_2019/thesis_queries/figure_9/figure-9a-task-count-" + cluster + "-working")
# vim: set ts=4 sw=4 et tw=120: