correction 9c

Claudio Maggioni 2021-04-27 14:33:37 +00:00
parent a75b7b5d6c
commit 3417cae87a
22 changed files with 18772 additions and 71559 deletions

.gitignore (vendored, 2 lines changed)

@@ -4,5 +4,5 @@ task_slowdown/?_state_changes.json.gz
 figure_9/*.parquet/
 figure_9/?_task_count/
-figure_9/?_machine_count/
+figure_9/?_machine_locality/
 table_iii/*.parquet/

Changed file (path not shown):

@@ -49,13 +49,14 @@ def tabid(x):
 #
 # READING INSTANCE EVENTS DATA
 #
-dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
+#dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz"
+dfepath = "/home/claudio/" + cluster + "/" + cluster + "_instance_events*.json.gz"
 #dfepath = "/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json"
 df = spark.read.json(dfepath)

 # READING MACHINE EVENTS DATA, sort them and save them as broadcast variable
 print("Starting to read machine events...")
-dfm = pd.read_csv("~/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
+dfm = pd.read_csv("/home/claudio/google_2019/machine_events/" + cluster + "_machine_events.csv", converters={
     'time': lambda x: -1 if x == '' else int(x),
     'machine_id': lambda x: str(x),
     'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
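The hunk above moves the instance-events glob to a per-cluster copy under /home/claudio/ and switches the machine-events path from a ~-prefixed path to an absolute one. As a minimal sketch (not part of the commit), this is how those pd.read_csv converters treat missing fields; the two sample rows and column subset are made up:

import io
from decimal import Decimal
import pandas as pd

# Hypothetical two-row sample; the real <cluster>_machine_events.csv has more columns.
sample = io.StringIO(
    "time,machine_id,capacity.cpus\n"
    ",1234,0.5\n"          # missing time
    "3600000000,5678,\n"   # missing capacity
)

dfm = pd.read_csv(sample, converters={
    # empty strings become the sentinel -1; capacities keep exact precision via Decimal
    'time': lambda x: -1 if x == '' else int(x),
    'machine_id': lambda x: str(x),
    'capacity.cpus': lambda x: -1 if x == '' else Decimal(x),
})
print(dfm)   # time is -1 in the first row, capacity.cpus is -1 in the second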

Changed file (path not shown):

@@ -42,7 +42,7 @@ dfe = spark.read.json(dfepath)
 # READING INSTANCE USAGE DATA
 #
 dfupath = "/home/claudio/google_2019/instance_usage/" + cluster
-dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.csv.gz")
+dfupath += "/" + cluster + ("_instance_usage00000000000?.csv.gz" if TESTDATA else "_instance_usage*.json.gz")
 usage_schema = StructType() \
     .add("start_time", LongType(), True) \
     .add("end_time", LongType(), True) \

Changed file (path not shown):

@@ -2,6 +2,6 @@
 i=$1

-cut ${i}_machine_count/part-00000 -d, -f2-3 | \
+cut ${i}_machine_locality/part-00000 -d, -f2-3 | \
     sort | uniq -c | sed 's/^\s*//' | tr ' ' ',' | \
-awk 'BEGIN {print "count,term,task_count"} {print $0}' > ${i}_term_machine_count.csv
+awk 'BEGIN {print "count,term,machine_locality"} {print $0}' > ${i}_term_machine_locality.csv
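The renamed pipeline counts how many jobs share each (termination type, machine locality) pair. For reference, a rough pandas equivalent, assuming the part file holds unheadered jobid,term,machine_locality rows as the cut -f2-3 step implies (file names here are hypothetical):

import pandas as pd

# part-00000 as written by saveAsTextFile: "jobid,term,machine_locality" per line, no header
df = pd.read_csv("example_machine_locality/part-00000",
                 names=["jobid", "term", "machine_locality"])

counts = (df.groupby(["term", "machine_locality"])
            .size()
            .reset_index(name="count")
            [["count", "term", "machine_locality"]])

counts.to_csv("example_term_machine_locality.csv", index=False)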

12 file diffs suppressed because they are too large.

Changed file (path not shown):

@@ -14,6 +14,7 @@ from pyspark.sql.functions import lit
 from pyspark.sql import Window
 from pyspark.sql.types import ByteType
 from pyspark.sql.functions import col
+import math

 if len(sys.argv) is not 4:
     print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")

@@ -23,7 +24,6 @@ cluster=sys.argv[1]
 findspark.init()

 DIR = os.path.dirname(__file__)
-NAME = cluster + "_figure9a.csv"

 spark = pyspark.sql.SparkSession.builder \
     .appName("figure_9a_join") \

@@ -51,10 +51,18 @@ df = df.filter(((col("collection_type").isNull()) | (col("collection_type") ==
 jpath = DIR + "/figure-9c-machine-count-" + cluster + ".parquet"
 dfj = spark.read.parquet(jpath)

+jjpath = DIR + "/figure-9a-task-count-" + cluster + ".parquet"
+dfjj = spark.read.parquet(jjpath)
+
 df = df.join(dfj, 'jobid', 'left')
+df = df.join(dfjj, "jobid")

 MICROS = 1000000

+def round_down(n, decimals=0):
+    multiplier = 10 ** decimals
+    return math.floor(n * multiplier) / multiplier
+
 def for_each_job(data):
     jobid = data[0]
     ts = data[1]

@@ -69,7 +77,10 @@ def for_each_job(data):
     for i,t in enumerate(ts):
         if t["type"] >= 4 and t["type"] <= 8:
             last_term = t["type"]
-            machine_count = t["machine_count"]
+            if t["task_count"] <= 0 or t["machine_count"] <= 0:
+                machine_count = -1
+            else:
+                machine_count = round_down(t["machine_count"] / t["task_count"], 1)

     return str(jobid) + "," + str(last_term) + "," + str(machine_count)

@@ -78,6 +89,7 @@ def cleanup(x):
         "time": int(x.time),
         "type": 0 if x.type is None else int(x.type),
         "id": x.jobid,
+        "task_count": -1 if x.task_count is None else int(x.task_count),
         "machine_count": -1 if x.machine_count is None else int(x.machine_count)
     }

@@ -87,6 +99,6 @@ df2 = df.rdd \
     .repartition(100) \
     .map(for_each_job) \
     .coalesce(1) \
-    .saveAsTextFile(cluster + "_machine_count")
+    .saveAsTextFile(cluster + "_machine_locality")

 # vim: set ts=4 sw=4 et tw=80:
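Taken together, the change joins the per-job task counts onto the machine counts and reports machine locality as machines per task, floored to one decimal place, with -1 for jobs missing either count. A standalone sketch of just that computation, with made-up input values:

import math

def round_down(n, decimals=0):
    multiplier = 10 ** decimals
    return math.floor(n * multiplier) / multiplier

def machine_locality(task_count, machine_count):
    # -1 mirrors the sentinel cleanup() assigns when a count is missing
    if task_count <= 0 or machine_count <= 0:
        return -1
    return round_down(machine_count / task_count, 1)

print(machine_locality(10, 4))   # 0.4  (4 machines over 10 tasks)
print(machine_locality(8, 5))    # 0.6  (0.625 floored to one decimal)
print(machine_locality(-1, 5))   # -1   (task count unknown)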

4 file diffs suppressed because they are too large.