#!/usr/bin/env python3
# coding: utf-8
# vim: set ts=4 sw=4 et tw=80:

import json
import gzip
import os
import sys

# findspark must be initialized before pyspark is imported when the script is
# run with a plain Python interpreter rather than spark-submit.
import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *

from decimal import *

# The cluster name is taken from the first command-line argument and is used
# as a prefix for the input and output directories.
cluster = sys.argv[1]

# Schema of the per-job actual resource usage table: ID plus CPU and RAM
# usage stored as high-precision decimals.
schema_act = StructType() \
    .add("id", StringType(), False) \
    .add("cpu", DecimalType(38, 18), False) \
    .add("ram", DecimalType(38, 18), False)

# Schema of the task termination table: ID and termination state.
schema_term = StructType() \
    .add("id", StringType(), False) \
    .add("term", ByteType(), False)

spark = pyspark.sql.SparkSession.builder \
    .appName("spatial_resource_waste_join") \
    .config("spark.driver.maxResultSize", "256g") \
    .config("spark.local.dir", "/tmp") \
    .config("spark.driver.memory", "256g") \
    .getOrCreate()
sc = spark.sparkContext

dirname = os.path.dirname(__file__)

# Gzipped, headerless CSV part files under the per-cluster input directories.
patha = os.path.join(dirname, cluster + "_res_micros_actual_per_job/part-*.gz")
patht = os.path.join(dirname, cluster + "_task_term_table/part-*.gz")

dfa = spark.read.format("csv") \
    .option("header", False) \
    .schema(schema_act) \
    .load(patha)

dft = spark.read.format("csv") \
    .option("header", False) \
    .schema(schema_term) \
    .load(patht)

# Join the actual resource usage with the termination table on the ID column,
# sum CPU and RAM per termination state, and collapse the result into a
# single output partition.
dfj = dfa.join(dft, dfa.id == dft.id) \
    .groupby("term") \
    .sum("cpu", "ram") \
    .coalesce(1)

dfj.write.csv(os.path.join(dirname, cluster + '_actual'))
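
# Usage sketch (the script filename below is a placeholder, not confirmed by
# the source): the single positional argument selects the cluster prefix for
# the input and output directories, e.g.
#   python3 spatial_resource_waste_join.py <cluster>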