bachelorThesis/figure_9/figure9a.py

53 lines
1.5 KiB
Python
Raw Normal View History

2021-04-20 16:25:11 +00:00
#!/usr/bin/env python3
# coding: utf-8
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *
if len(sys.argv) is not 5:
print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
sys.exit()
cluster=sys.argv[1]
spark = pyspark.sql.SparkSession.builder \
.appName("figure_9a") \
.config("spark.driver.maxResultSize", "128g") \
.config("spark.local.dir", sys.argv[2]) \
.config("spark.driver.memory", sys.argv[3]) \
.getOrCreate()
sc = spark.sparkContext
# READING INSTANCE EVENTS DATA
2021-04-20 16:46:36 +00:00
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
2021-04-20 16:25:11 +00:00
df = spark.read.json(dfepath)
try:
df["collection_type"] = df["collection_type"].cast(ByteType())
except:
df = df.withColumn("collection_type", lit(None).cast(ByteType()))
df = df.rdd.filter(lambda x: x.collection_id is not None and x.instance_index is not None and
2021-04-20 16:46:36 +00:00
(x.collection_type == 0 or x.collection_type is None)) \
2021-04-20 16:25:11 +00:00
.map(lambda x: (x.collection_id, x.instance_index)) \
.groupBy(lambda x: x[0]) \
2021-04-20 16:46:36 +00:00
.map(lambda x: [x[0], len(x[1])]) \
.toDF(["jobid", "task_count"])
2021-04-20 16:25:11 +00:00
2021-04-20 16:46:36 +00:00
df.write.parquet("/home/claudio/google_2019/thesis_queries/figure_9/figure-9a-task-count-" + cluster + ".parquet")
2021-04-20 16:25:11 +00:00
# vim: set ts=4 sw=4 et tw=120: