commit 455cc442d9 (parent 8f9e5384e4)
Claudio Maggioni, 2021-04-27 13:32:15 +00:00
35 changed files with 1,413,585 additions and 5 deletions

@@ -83,7 +83,7 @@ usage_schema = StructType() \
     .add("cpu_usage_dist_98", DoubleType(), True) \
     .add("cpu_usage_dist_99", DoubleType(), True)
-dfu = spark.read.format("csv") \
+dfu = spark.read.format("json") \
     .option("header", False) \
     .schema(usage_schema) \
     .load(dfupath)
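Note that "header" is a CSV-specific option; Spark's JSON source ignores it and pairs schema fields with JSON keys by name rather than by position. A minimal sketch of the equivalent read, assuming usage_schema and dfupath as defined above:

    # The JSON reader matches usage_schema field names against the keys
    # in each input record; the CSV-only "header" option can be dropped.
    dfu = spark.read.format("json") \
        .schema(usage_schema) \
        .load(dfupath)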

@@ -62,15 +62,16 @@ spark = pyspark.sql.SparkSession.builder \
     .appName("task_slowdown") \
     .config("spark.driver.maxResultSize", "256g") \
     .config("spark.local.dir", "/home/claudio/tmp") \
-    .config("spark.driver.memory", "120g") \
+    .config("spark.driver.memory", "200g") \
     .getOrCreate()
 sc = spark.sparkContext
-path = "/home/claudio/" + cluster + "/" + cluster
-path += "_instance_usage*.csv.gz"
+# path = "/home/claudio/" + cluster + "/" + cluster
+# path += "_instance_usage*.csv.gz"
+path = "/home/claudio/raid0/google_2019/instance_usage/" + cluster + "/" + cluster + "_instance_usage*.json.gz"
 #path += "_test.csv"
-df = spark.read.format("csv") \
+df = spark.read.format("json") \
     .option("header", False) \
     .schema(schema) \
     .load(path)
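Spark decompresses the *.json.gz glob transparently, but gzip is not a splittable codec, so each part file is decoded by a single task. A hedged sketch of one way to restore parallelism for downstream stages, applied to the df loaded above (the partition count is illustrative, not from the original code):

    # Each .json.gz input maps to exactly one read task; spreading the
    # decoded rows over more partitions lets later wide operations
    # (groupBy, join) use the whole cluster.
    df = df.repartition(200)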

(Other changed files, a mix of binary files and diffs too large to display, are omitted.)

table_iii/fig5.py (new executable file, 68 additions)

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
# coding: utf-8

import sys
import os

import findspark
findspark.init()

import pyspark
import pyspark.sql
import pyspark.sql.functions as F


def main():
    global DIR
    global NAME
    global launched

    if len(sys.argv) != 4:
        print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
        sys.exit()

    cluster = sys.argv[1]

    DIR = os.path.dirname(__file__)
    NAME = "fig-5-" + cluster

    # Skip the run if another instance is in progress or the output
    # already exists (write.csv below produces the NAME + ".csv" directory).
    if os.path.exists(DIR + "/" + NAME + "-working") or \
            os.path.exists(DIR + "/" + NAME + ".csv"):
        print("already launched")
        launched = True
        sys.exit()

    # Sentinel file marking a run in progress.
    os.system("touch " + DIR + "/" + NAME + "-working")

    spark = pyspark.sql.SparkSession.builder \
        .appName(NAME) \
        .config("spark.driver.maxResultSize", "32g") \
        .config("spark.local.dir", sys.argv[2]) \
        .config("spark.driver.memory", sys.argv[3]) \
        .getOrCreate()

    df = spark.read.parquet(DIR + "/bigtable-" + cluster + ".parquet")

    # Conditional count: sum a 0/1 indicator instead of filtering twice.
    cnt_cond = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

    # For every combination of the four event counts, count tasks that
    # terminated successfully (task_term == 6) and tasks that did not.
    df = df.groupBy(["count_4", "count_5", "count_7", "count_8"]).agg(
        cnt_cond(F.col("task_term") == 6).alias("count_succ"),
        cnt_cond(F.col("task_term") != 6).alias("count_not_succ"))

    # Coalesce to a single output file for easy post-processing.
    df.repartition(1).write.csv(DIR + "/" + NAME + ".csv")


if __name__ == "__main__":
    launched = False
    DIR = None
    NAME = None
    try:
        main()
    finally:
        # Remove the sentinel unless another instance owns it.
        if not launched and DIR and NAME:
            os.system("rm -v " + DIR + "/" + NAME + "-working")

# vim: set ts=4 sw=4 et tw=120:
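Going by the usage string in main(), the script takes a cluster name, a Spark scratch directory, and a driver memory cap. A hypothetical invocation, with all three values illustrative:

    ./fig5.py a /mnt/scratch/spark-tmp 32g

The NAME + "-working" sentinel acts as a crude lock: it is created before the Spark job starts, causes concurrent or repeated launches to exit early, and is removed in the finally block only by the process that created it.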