Added 9a task count

This commit is contained in:
Claudio Maggioni 2021-04-20 18:25:11 +02:00
parent 7346d7a785
commit 637ee6818e
1 changed file with 51 additions and 0 deletions

51
figure_9/figure9a.py Executable file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env python3
# coding: utf-8
#
# figure9a.py — count task records per job (collection_type == 0) in a
# cluster trace read from JSON instance-event files, and write the
# (collection_id, task_count) result as a Parquet file.
#
# Usage: figure9a.py {cluster} {tmpdir} {maxram} {basedir}
import os
import json
import pandas as pd
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

# Require exactly 4 arguments. Use "!=", not "is not": identity comparison
# of ints is an implementation accident and raises a SyntaxWarning on
# CPython >= 3.8.
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit()

cluster=sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("figure_9a") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

# Normalize collection_type to ByteType. DataFrames are immutable, so the
# cast must go through withColumn — the old `df[...] = df[...].cast(...)`
# item assignment always raised TypeError, and the bare `except:` then
# silently replaced the column with all-NULLs, which made the
# collection_type == 0 filter below drop every row.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    # Column absent in this trace: add it as NULL so the filter still runs.
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

# Keep task-level events that belong to jobs (collection_type == 0) and
# count the events recorded per collection_id.
# NOTE(review): this counts (collection_id, instance_index) event records,
# not distinct instance_index values — preserved from the original logic;
# confirm whether distinct task counts were intended.
counts = df.rdd.filter(lambda x: x.collection_id is not None and x.instance_index is not None and
        (x.collection_type == 0)) \
    .map(lambda x: (x.collection_id, x.instance_index)) \
    .groupBy(lambda x: x[0]) \
    .mapValues(lambda x: len(x))

# RDDs have no `.write` attribute — convert the pair RDD back to a
# DataFrame before writing Parquet.
spark.createDataFrame(counts, ["collection_id", "task_count"]) \
    .write.parquet("/home/claudio/raid0/figure-9a-task-count-" + cluster + ".parquet")

# vim: set ts=4 sw=4 et tw=120: