From 637ee6818efc166f9d8d56f22587cc4c85e168ee Mon Sep 17 00:00:00 2001
From: "Claudio Maggioni (maggicl)"
Date: Tue, 20 Apr 2021 18:25:11 +0200
Subject: [PATCH] Added 9a task count

---
 figure_9/figure9a.py | 56 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 56 insertions(+)
 create mode 100755 figure_9/figure9a.py

diff --git a/figure_9/figure9a.py b/figure_9/figure9a.py
new file mode 100755
index 00000000..d55a6ca9
--- /dev/null
+++ b/figure_9/figure9a.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python3
+# coding: utf-8
+
+import os
+import json
+import pandas as pd
+import findspark
+findspark.init()
+import pyspark
+import pyspark.sql
+import sys
+import gzip
+from pyspark import AccumulatorParam
+from pyspark.sql.functions import lit
+from pyspark.sql import Window
+from pyspark.sql.types import *
+from decimal import *
+
+if len(sys.argv) != 5:
+    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
+    sys.exit(1)
+
+cluster = sys.argv[1]
+
+spark = pyspark.sql.SparkSession.builder \
+    .appName("figure_9a") \
+    .config("spark.driver.maxResultSize", "128g") \
+    .config("spark.local.dir", sys.argv[2]) \
+    .config("spark.driver.memory", sys.argv[3]) \
+    .getOrCreate()
+sc = spark.sparkContext
+
+# READING INSTANCE EVENTS DATA
+#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
+dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
+df = spark.read.json(dfepath)
+
+# Cast collection_type to a byte when the column is present; if the read
+# schema lacks it, add an all-null byte column so the filter below still resolves.
+try:
+    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
+except Exception:
+    df = df.withColumn("collection_type", lit(None).cast(ByteType()))
+
+# Count instance events per collection, keeping only type-0 collections (jobs).
+df = df.rdd.filter(lambda x: x.collection_id is not None and x.instance_index is not None and
+        (x.collection_type == 0)) \
+    .map(lambda x: (x.collection_id, x.instance_index)) \
+    .groupBy(lambda x: x[0]) \
+    .mapValues(lambda x: len(x))
+
+# The result is a pair RDD, which has no .write; convert it back to a DataFrame.
+df.toDF(["collection_id", "task_count"]) \
+    .write.parquet("/home/claudio/raid0/figure-9a-task-count-" + cluster + ".parquet")
+
+# vim: set ts=4 sw=4 et tw=120:
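
Usage: a hypothetical invocation of the script above; the cluster name,
scratch directory, driver memory, and trace base directory are all
placeholder values, assuming the trace files live under
{basedir}/{cluster}/ as the paths in the script imply:

    ./figure9a.py a /tmp/spark-scratch 128g /data/cluster-traces

The same per-collection count can also be expressed with the DataFrame API
instead of dropping to the RDD layer. A minimal sketch, assuming df is the
DataFrame returned by spark.read.json() above; note that countDistinct()
counts each (collection_id, instance_index) pair once, while the RDD chain
above counts every matching event row, so the two agree only when the input
holds a single event per instance:

    from pyspark.sql.functions import countDistinct

    task_counts = df.filter(df.collection_id.isNotNull()
                            & df.instance_index.isNotNull()
                            & (df.collection_type == 0)) \
        .groupBy("collection_id") \
        .agg(countDistinct("instance_index").alias("task_count"))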