#!/usr/bin/env python3
# coding: utf-8
"""Count instance-event rows per collection for one cluster (figure 9a).

Usage:
    <script> {cluster} {tmpdir} {maxram} {basedir}

The script reads ``{basedir}/{cluster}/{cluster}_test.json``, keeps the rows
whose ``collection_id`` and ``instance_index`` are non-null and whose
``collection_type`` equals 0, counts the remaining rows per
``collection_id`` and writes the result as a Parquet dataset with the
columns ``collection_id`` and ``task_count``.
"""

import os
import json
import pandas as pd
import findspark
findspark.init()  # must run before pyspark is imported
import pyspark
import pyspark.sql
import sys
import gzip
from pyspark import AccumulatorParam
from pyspark.sql.functions import lit
from pyspark.sql import Window
from pyspark.sql.types import *
from decimal import *

# Compare the argument count by value; `is not` tests object identity and
# only appeared to work because of CPython's small-integer caching.
if len(sys.argv) != 5:
    print(sys.argv[0] + " {cluster} {tmpdir} {maxram} {basedir}")
    sys.exit(1)  # non-zero status so calling scripts can detect the misuse

cluster = sys.argv[1]

spark = pyspark.sql.SparkSession.builder \
    .appName("figure_9a") \
    .config("spark.driver.maxResultSize", "128g") \
    .config("spark.local.dir", sys.argv[2]) \
    .config("spark.driver.memory", sys.argv[3]) \
    .getOrCreate()
sc = spark.sparkContext

# READING INSTANCE EVENTS DATA
# NOTE(review): the active path reads the "_test.json" file while the
# commented-out line below targets the "*_instance_events*.json.gz" files --
# confirm which input is intended before running on real data.
#dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_instance_events*.json.gz"
dfepath = sys.argv[4] + "/" + cluster + "/" + cluster + "_test.json"
df = spark.read.json(dfepath)

# Spark DataFrames are immutable, so the column has to be replaced through
# withColumn(); item assignment always raises TypeError. When the input has
# no "collection_type" field at all, add it as an all-null column so the
# filter below can still reference it.
if "collection_type" in df.columns:
    df = df.withColumn("collection_type", df["collection_type"].cast(ByteType()))
else:
    df = df.withColumn("collection_type", lit(None).cast(ByteType()))

# Keep only complete rows with collection_type == 0 and count the rows of
# each collection. Staying in the DataFrame API (instead of dropping to an
# RDD) keeps the result writable as Parquet and avoids materialising every
# group just to take its length.
df = df.where(df["collection_id"].isNotNull()
              & df["instance_index"].isNotNull()
              & (df["collection_type"] == 0)) \
    .groupBy("collection_id") \
    .count() \
    .withColumnRenamed("count", "task_count")

df.write.parquet("/home/claudio/raid0/figure-9a-task-count-" + cluster + ".parquet")

# vim: set ts=4 sw=4 et tw=120: