bachelorThesis/table_iii/bigtable.py

88 lines
2.5 KiB
Python
Executable File

#!/usr/bin/env python3
# coding: utf-8
import json
import pandas
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
import os
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import ByteType
# Directory containing this script; the "-working" marker and the Parquet
# output are created next to it.  abspath() is required because `__file__`
# may carry no directory component when the script is started through a bare
# relative name, in which case dirname() would return "" and the
# "/bigtable-..." paths below would point at the filesystem root.
DIR = os.path.dirname(os.path.abspath(__file__))

# Command line: {cluster} {tmpdir} {maxram}
if len(sys.argv) != 4:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram}")
    # A usage error is a failure: report it through a non-zero exit status.
    sys.exit(1)

cluster = sys.argv[1]

# Skip the run when another invocation left its marker file behind or when
# the output for this cluster has already been written.
if os.path.exists(DIR + "/bigtable-" + cluster + "-working") or os.path.exists(DIR + "/bigtable-" + cluster + ".parquet"):
    print("already launched")
    sys.exit()

# Create the marker file directly instead of shelling out to `touch`, so that
# a cluster name containing spaces or shell metacharacters cannot be
# misinterpreted by the shell.
with open(DIR + "/bigtable-" + cluster + "-working", "a"):
    pass

# sys.argv[2] is used as Spark's scratch directory and sys.argv[3] as the
# driver memory setting.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("bigtable")
    .config("spark.driver.maxResultSize", "32g")
    .config("spark.local.dir", sys.argv[2])
    .config("spark.driver.memory", sys.argv[3])
    .getOrCreate()
)
sc = spark.sparkContext

df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_instance_events*.json.gz")
#df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")

# Normalise "collection_type" to a nullable byte column.  PySpark DataFrames
# do not support item assignment (`df[...] = ...`), so the column has to be
# replaced through withColumn(); when the input has no such column at all, an
# all-NULL column is added so later stages can always read the attribute.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))
def for_each_task(data):
    """Collapse all events of one task into a single flat output row.

    Args:
        data: A ``(key, events)`` pair.  ``key`` is a string of the form
            ``"<jobid>-<taskid>"``; ``events`` is an iterable of mappings
            that each provide an integer ``"time"`` and an integer ``"type"``.

    Returns:
        A 14-element tuple ``(jobid, taskid, last_term, count_0, ...,
        count_10)``.  ``last_term`` is the type of the chronologically last
        event whose type lies in 4..8, or -1 when there is none;
        ``count_N`` is the number of events of type ``N``.

    Raises:
        IndexError: If ``key`` contains no ``"-"`` or if an event type is
            outside 0..10.
    """
    rowids = data[0].split("-")
    jobid = rowids[0]
    taskid = rowids[1]

    last_term = -1
    counts = [0] * 11

    # Walk the events in chronological order so that `last_term` ends up
    # holding the most recent matching type.
    for event in sorted(data[1], key=lambda e: e["time"]):
        kind = event["type"]
        # Reject out-of-range types explicitly: a negative value would
        # otherwise wrap around through negative indexing and be counted in
        # the wrong slot without any error.
        if not 0 <= kind < len(counts):
            raise IndexError("event type out of range: " + str(kind))
        # Types 4..8 are the ones tracked as "termination" here
        # (NOTE(review): presumably the trace's terminating event types;
        # confirm against the trace schema).
        if 4 <= kind <= 8:
            last_term = kind
        counts[kind] += 1

    return (jobid, taskid, last_term, *counts)
def cleanup(x):
    """Reduce one input record to the plain dict used by later stages.

    Args:
        x: A record exposing the attributes ``time``, ``type``,
            ``collection_id``, ``instance_index`` and ``priority``.
            ``time``, ``collection_id`` and ``instance_index`` must not be
            ``None``.

    Returns:
        A dict with the keys:
          * ``"time"``: ``x.time`` converted to ``int``;
          * ``"type"``: ``x.type`` converted to ``int``, 0 when it is ``None``;
          * ``"id"``: ``"<collection_id>-<instance_index>"``;
          * ``"priority"``: ``x.priority`` converted to ``int``, -1 when it
            is ``None``.
    """
    return {
        "time": int(x.time),
        "type": 0 if x.type is None else int(x.type),
        # str() keeps the key construction working whether the two id fields
        # arrive as strings or as numbers; for strings it is a no-op.
        "id": str(x.collection_id) + "-" + str(x.instance_index),
        "priority": -1 if x.priority is None else int(x.priority),
    }
# Build one output row per task:
#   1. keep records whose collection_type is NULL or 0;
#   2. drop records lacking a timestamp or either half of the task key;
#   3. reduce each record to a plain dict (cleanup);
#   4. group the dicts by their "id" key;
#   5. spread the groups over 1000 partitions;
#   6. collapse each group into a flat tuple (for_each_task);
#   7. name the tuple fields and convert back to a DataFrame.
df = (
    df.rdd
    .filter(lambda x: x.collection_type is None or x.collection_type == 0)
    .filter(lambda x: x.time is not None and x.instance_index is not None and x.collection_id is not None)
    .map(cleanup)
    .groupBy(lambda x: x["id"])
    .repartition(1000)
    .map(for_each_task)
    .toDF(["jobid", "taskid", "task_term", *["count_" + str(i) for i in range(11)]])
)

df.write.parquet(DIR + "/bigtable-" + cluster + ".parquet")

# The output is complete: drop the "-working" marker.  The file is removed
# directly instead of through a shell `rm`, so unusual characters in the
# cluster name cannot be misinterpreted.  Removal stays best-effort: a
# failure is reported but does not abort the already successful run.
try:
    os.remove(DIR + "/bigtable-" + cluster + "-working")
except OSError as err:
    print("could not remove working marker: " + str(err), file=sys.stderr)