correction 9c

parent 455cc442d9
commit 32e3e143c6
@@ -4,5 +4,5 @@ task_slowdown/?_state_changes.json.gz
 figure_9/*.parquet/
 figure_9/?_task_count/
 figure_9/?_machine_count/
+figure_9/?_machine_locality/
 table_iii/*.parquet/
@@ -49,13 +49,14 @@ def tabid(x):
 #
 # READING INSTANCE EVENTS DATA
 #
-dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
+#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
+dfepath = "/home/claudio/" + cluster + "/" + cluster + "_instance_events*.json.gz"
 #dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json"
 df = spark.read.json(dfepath)

 # READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
 print("Starting to read machine events...")
-dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
+dfm = pd.read_csv("/home/claudio/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
     'time': lambda x: -1 if x == '' else int(x),
     'machine_id': lambda x: str(x),
     'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
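The converters above replace empty CSV fields with a -1 sentinel and keep machine ids as strings. A minimal standalone sketch of that behaviour, using made-up rows instead of the real machine-events file (editor's illustration, not part of the commit):

# Editor's sketch: how the pd.read_csv converters treat empty fields.
import io
from decimal import Decimal
import pandas as pd

sample = io.StringIO("time,machine_id,capacity.cpus\n,42,0.5\n100,43,\n")   # hypothetical rows
dfm = pd.read_csv(sample, converters={
    'time': lambda x: -1 if x == '' else int(x),                  # empty timestamp -> -1
    'machine_id': lambda x: str(x),                               # keep ids as strings
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),     # empty capacity -> -1
})
print(dfm.iloc[0]['time'])            # -1
print(dfm.iloc[1]['capacity.cpus'])   # -1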
@@ -42,7 +42,7 @@ dfe = spark.read.json(dfepath)
 # READING INSTANCE USAGE DATA
 #
 dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
-dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
+dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.json.gz")
 usage_schema = StructType() \
     .add("start_time", LongType(), True) \
     .add("end_time", LongType(), True) \
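The usage files are read with an explicit schema rather than relying on type inference. A self-contained sketch of how such a schema is built and applied, using an in-memory sample instead of the real CSVs (editor's illustration; only the two fields visible in the hunk are included):

# Editor's sketch: applying an explicit StructType schema, as the script does for usage data.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, LongType

spark = SparkSession.builder.appName("usage_schema_sketch").getOrCreate()

usage_schema = StructType() \
    .add("start_time", LongType(), True) \
    .add("end_time", LongType(), True)    # the real schema continues with more fields

# Tiny in-memory sample standing in for the CSV files.
dfu = spark.createDataFrame([(0, 300000000), (300000000, 600000000)], usage_schema)
dfu.show()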
@@ -2,6 +2,6 @@

 i=$1

-cut ${i}_machine_count/part-00000 -d, -f2-3 | \
+cut ${i}_machine_locality/part-00000 -d, -f2-3 | \
 sort | uniq -c | sed 's/^\s*//' | tr ' ' ',' | \
-awk 'BEGIN {print "count,term,task_count"} {print $0}' > ${i}_term_machine_count.csv
+awk 'BEGIN {print "count,term,machine_locality"} {print $0}' > ${i}_term_machine_locality.csv
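The pipeline above counts how many jobs share each (termination type, locality) pair and writes a headed CSV. A rough Python equivalent, assuming part-00000 rows of the form "jobid,term,machine_locality" (editor's sketch; the rows and output name are hypothetical):

# Editor's sketch: Python version of the cut | sort | uniq -c | awk pipeline.
from collections import Counter

rows = ["1,5,0.3", "2,5,0.3", "3,6,1.0"]                     # stand-in for part-00000
pairs = Counter(",".join(r.split(",")[1:3]) for r in rows)   # keep fields 2-3, like cut -f2-3
with open("a_term_machine_locality.csv", "w") as f:          # hypothetical output name
    f.write("count,term,machine_locality\n")
    for pair, count in sorted(pairs.items()):                # sorted pairs, like sort | uniq -c
        f.write(f"{count},{pair}\n")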
@@ -14,6 +14,7 @@ from pyspark.sql.functions import lit
 from pyspark.sql import Window
 from pyspark.sql.types import ByteType
 from pyspark.sql.functions import col
+import math

 if len(sys.argv) is not 4:
     print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
@@ -23,7 +24,6 @@ cluster=sys.argv[1]
 findspark.init()

 DIR = os.path.dirname(__file__)
-NAME = cluster + "_figure9a.csv"

 spark = pyspark.sql.SparkSession.builder \
     .appName("figure_9a_join") \
@@ -51,10 +51,18 @@ df = df.filter(((col("collection_type").isNull()) | (col("collection_type") ==
 jpath = DIR + "/figure-9c-machine-count-" + cluster + ".parquet"
 dfj = spark.read.parquet(jpath)

+jjpath = DIR + "/figure-9a-task-count-" + cluster + ".parquet"
+dfjj = spark.read.parquet(jjpath)
+
 df = df.join(dfj, 'jobid', 'left')
+df = df.join(dfjj, "jobid")

 MICROS = 1000000

+def round_down(n, decimals=0):
+    multiplier = 10 ** decimals
+    return math.floor(n * multiplier) / multiplier
+
 def for_each_job(data):
     jobid = data[0]
     ts = data[1]
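round_down, added here, floors a value at the given number of decimals instead of rounding to the nearest; a quick standalone check of that behaviour (editor's sketch, not part of the commit):

# Editor's sketch: round_down floors at one decimal, unlike the built-in round().
import math

def round_down(n, decimals=0):
    multiplier = 10 ** decimals
    return math.floor(n * multiplier) / multiplier

print(round_down(0.37, 1))   # 0.3
print(round_down(0.99, 1))   # 0.9
print(round(0.37, 1))        # 0.4, rounding to nearest, shown for contrast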
@@ -69,7 +77,10 @@ def for_each_job(data):
     for i,t in enumerate(ts):
         if t["type"] >= 4 and t["type"] <= 8:
             last_term = t["type"]
-            machine_count = t["machine_count"]
+            if t["task_count"] <= 0 or t["machine_count"] <= 0:
+                machine_count = -1
+            else:
+                machine_count = round_down(t["machine_count"] / t["task_count"], 1)

     return str(jobid) + "," + str(last_term) + "," + str(machine_count)
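With this change the emitted value is a locality ratio (machines used per task, floored to one decimal, with -1 for missing or non-positive counts) rather than the raw machine count. A standalone sketch of the calculation, mirroring rather than reusing the script's code (editor's illustration with made-up counts):

# Editor's sketch: the per-job machine-locality value after the correction.
import math

def locality(task_count, machine_count):
    if task_count <= 0 or machine_count <= 0:
        return -1                                            # sentinel for missing/invalid counts
    return math.floor(machine_count / task_count * 10) / 10  # floored to one decimal

print(locality(100, 37))   # 0.3 -> tasks packed onto relatively few machines
print(locality(10, 10))    # 1.0 -> one machine per task
print(locality(0, 5))      # -1  -> invalid counts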
@@ -78,6 +89,7 @@ def cleanup(x):
         "time": int(x.time),
         "type": 0 if x.type is None else int(x.type),
         "id": x.jobid,
+        "task_count": -1 if x.task_count is None else int(x.task_count),
         "machine_count": -1 if x.machine_count is None else int(x.machine_count)
     }
@@ -87,6 +99,6 @@ df2 = df.rdd \
     .repartition(100) \
     .map(for_each_job) \
     .coalesce(1) \
-    .saveAsTextFile(cluster + "_machine_count")
+    .saveAsTextFile(cluster + "_machine_locality")

 # vim: set ts=4 sw=4 et tw=80: