#!/usr/bin/env python3
# coding: utf-8
"""Count the distinct machines used by each collection of one cluster.

Usage:
    <script> {cluster} {tmpdir} {maxram} {basedir}

Reads ``{basedir}/{cluster}/{cluster}_instance_events*.json.gz`` with Spark,
keeps the events that have a collection id, a machine id other than -1 or 0,
and a ``collection_type`` that is 0 or missing, then writes one
``(jobid, machine_count)`` row per collection id to
``figure-9c-machine-count-{cluster}.parquet`` in the directory of this script.

``{tmpdir}`` is passed to Spark as ``spark.local.dir`` and ``{maxram}`` as
``spark.driver.memory``.
"""

import os
import json
import pandas as pd
import findspark
findspark.init()  # must run before pyspark is imported
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

# `!=` rather than `is not`: identity comparison against an int literal only
# works by accident of CPython's small-integer cache.
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit()

cluster = sys.argv[1]

# Resolve to an absolute directory: dirname(__file__) is "" when the script is
# started as `python3 script.py`, which would make every path below start at
# the filesystem root.
DIR = os.path.dirname(os.path.abspath(__file__))
NAME = "figure-9c-machine-count-" + cluster
WORKING_MARKER = DIR + "/" + NAME + "-working"
OUTPUT_PATH = DIR + "/" + NAME + ".parquet"

# The marker file makes a second invocation for the same cluster exit early.
# NOTE(review): the marker is only removed after a successful write, so a
# failed run leaves it behind and must be cleaned up by hand - confirm that
# this is intended.
if os.path.exists(WORKING_MARKER):
    print("already done")
    sys.exit()

# Create the marker directly instead of shelling out to `touch`, so a cluster
# name containing spaces or shell metacharacters cannot alter the command.
with open(WORKING_MARKER, "a"):
    pass

spark = pyspark.sql.SparkSession.builder \
    .appName(NAME) \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

# Normalise collection_type to a byte column. Spark DataFrames do not support
# item assignment, so the column has to be replaced through withColumn; when
# the input has no such column at all, add it as an all-NULL column so the
# filter below can still read the attribute.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))


def _is_countable_event(row):
    """Return True when the event row should contribute to the machine count.

    A row is kept when it has a collection id, its machine id is present and
    is neither -1 nor 0, and its collection_type is 0 or missing.
    """
    if row.collection_id is None or row.machine_id is None:
        return False
    if int(row.machine_id) in (-1, 0):
        return False
    return row.collection_type == 0 or row.collection_type is None


# Each group holds (collection_id, machine_id) pairs sharing one collection
# id, so the size of the de-duplicated group is the number of distinct
# machines seen for that collection.
df = df.rdd.filter(_is_countable_event) \
    .map(lambda x: (x.collection_id, int(x.machine_id))) \
    .groupBy(lambda x: x[0]) \
    .map(lambda x: [x[0], len(set(x[1]))]) \
    .toDF(["jobid", "machine_count"])

df.write.parquet(OUTPUT_PATH)

# Best-effort removal, mirroring the original `rm`, which did not abort the
# script when the marker had already disappeared.
try:
    os.remove(WORKING_MARKER)
except FileNotFoundError:
    pass

# vim: set ts=4 sw=4 et tw=120: