bachelorThesis/spatial_resource_waste/spatial_resource_waste/join.py

#!/usr/bin/env python3
# coding: utf-8
# vim: set ts=4 sw=4 et tw=80:
import os
import sys

import findspark
findspark.init()

import pyspark
import pyspark.sql
from pyspark.sql.types import StructType, StringType, DecimalType, ByteType
# Cluster name passed as the first command-line argument; it selects which
# input and output directories are used.
cluster = sys.argv[1]
# Schema of the actual per-job resource usage table: id, CPU and RAM usage.
schema_act = StructType() \
    .add("id", StringType(), False) \
    .add("cpu", DecimalType(38, 18), False) \
    .add("ram", DecimalType(38, 18), False)

# Schema of the task termination table: id and termination status code.
schema_term = StructType() \
    .add("id", StringType(), False) \
    .add("term", ByteType(), False)
spark = pyspark.sql.SparkSession.builder \
    .appName("spatial_resource_waste_join") \
    .config("spark.driver.maxResultSize", "256g") \
    .config("spark.local.dir", "/tmp") \
    .config("spark.driver.memory", "256g") \
    .getOrCreate()
sc = spark.sparkContext
dirname = os.path.dirname(__file__)

# Input part files are expected next to this script, one directory per cluster.
patha = os.path.join(dirname, cluster + "_res_micros_actual_per_job/part-*.gz")
patht = os.path.join(dirname, cluster + "_task_term_table/part-*.gz")
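
# Both tables are stored as headerless, gzip-compressed CSV part files, so they
# are read with the explicit schemas defined above.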
dfa = spark.read.format("csv") \
    .option("header", False) \
    .schema(schema_act) \
    .load(patha)
dft = spark.read.format("csv") \
    .option("header", False) \
    .schema(schema_term) \
    .load(patht)
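
# Join resource usage with termination status on the job id, sum CPU and RAM
# per termination state, and coalesce to a single partition so the result is
# written out as one CSV file.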
dfj = dfa.join(dft, dfa.id == dft.id) \
    .groupby("term") \
    .sum("cpu", "ram") \
    .coalesce(1)
dfj.write.csv(os.path.join(dirname, cluster + "_actual"))
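
# Example invocation (a sketch; the cluster name "a" is only illustrative and
# must match the prefix of the input directories):
#   python3 join.py a
# This reads a_res_micros_actual_per_job/ and a_task_term_table/ and writes the
# per-termination-state sums to the a_actual/ output directory. Spark will
# refuse to overwrite an existing a_actual/ directory.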