\documentclass{usiinfbachelorproject}
\usepackage{multirow}
\usepackage{ifthen}
\usepackage{enumitem}
\usepackage{fontawesome5}
\usepackage{pgf}
\usepackage{tikz}
\usetikzlibrary{fit,arrows,calc,positioning}
\usepackage{parskip}
\usepackage{xcolor}
\usepackage{amsmath}
\usepackage{subcaption}
\usepackage{graphicx}
\usepackage[backend=biber,style=numeric,citestyle=ieee]{biblatex}
\usepackage{booktabs}
\usepackage{pgfplots}
\usepackage{array}
\newcolumntype{C}[1]{>{\centering\arraybackslash}p{#1}}
\usepgfplotslibrary{external}
\usepackage{titlesec}
\newcommand{\sectionbreak}{\clearpage}
\addbibresource{references.bib}
\setlength{\parskip}{5pt}
\setlength{\parindent}{0pt}
\captionsetup{labelfont={bf}}

\title{Understanding and Comparing Unsuccessful Executions in Large Datacenters}
%\subtitle{The (optional) subtitle}

\author{Claudio Maggioni}
\versiondate{\today}

\begin{committee}
\advisor[Universit\`a della Svizzera Italiana, Switzerland]{Prof.}{Walter}{Binder}
\assistant[Universit\`a della Svizzera Italiana, Switzerland]{Dr.}{Andrea}{Ros\`a}
\end{committee}

\abstract{The thesis aims at comparing two different traces coming from large
datacenters, focusing in particular on unsuccessful executions of jobs and
tasks submitted by users. The objective of this thesis is to compare the
resource waste caused by unsuccessful executions, their impact on application
performance, and their root causes. We show the strong negative impact on CPU
and RAM usage and on task slowdown. We analyze patterns of unsuccessful jobs
and tasks, focusing on their interdependency. Moreover, we uncover their root
causes by inspecting key workload and system attributes such as machine
locality and concurrency level.}

\begin{document}
\maketitle
\tableofcontents
\newpage

\section{Introduction}

In today's world there is an ever growing demand for efficient, large-scale
computations. The rising trend of ``big data'' puts the need for efficient
management of large-scale parallelized computing at an all-time high. This
fact also increases the demand for research in the field of distributed
systems, in particular on how to schedule computations effectively, avoid
wasting resources, and avoid failures.

In 2011 Google released a month-long data trace of their own cluster
management system~\cite{google-marso-11} \textit{Borg}, containing extensive
data regarding scheduling, priority management, and failures of a real
production workload. This data was the foundation of the 2015 Ros\`a et al.\
paper \textit{Understanding the Dark Side of Big Data Clusters: An Analysis
beyond Failures}~\cite{dsn-paper}, which, among its many conclusions,
highlighted the need for better cluster management given the high number of
failures found in the traces.

In 2019 Google released an updated version of the \textit{Borg} cluster
traces~\cite{google-marso-19}, not only containing data from a far bigger
workload due to improvements in computational technology, but also providing
data from 8 different \textit{Borg} cells from datacenters located all over
the world.

%\subsection{Motivation}

Even by glancing at some of the spatial and temporal analyses performed on the
Google Borg traces in this report, it is evident that unsuccessful executions
play a major role in the waste of resources in clustered computations. For
example, Figure~\ref{fig:machinetimewaste-rel} shows the distribution of
machine time over ``tasks'' (i.e.\ executables running in Borg) with different
termination ``states'', of which \texttt{FINISH} is the only successful one.
For the 2011 Borg traces, more than half of the machine time is spent carrying
out non-successful executions, i.e.\ executing programs that would eventually
``crash'' and potentially not lead to useful results\footnote{This is only a
speculation, since both the 2011 and the 2019 traces only provide a ``black
box'' view of the Borg cluster system. Neither the papers accompanying the two
traces~\cite{google-marso-11}~\cite{google-marso-19} nor the documentation for
the 2019 traces~\cite{google-drive-marso} ever mentions whether non-successful
tasks produce any useful result.}. The 2019 subplot paints an even darker
picture, with less than 5\% of machine time used for successful computation.

Given that even a major player in big data computation like Google struggles
to allocate computational resources efficiently, the impact of execution
failures is indeed significant and worthy of study. Given also the
significance and data richness of both trace packages, the analysis performed
in this report can be of interest for understanding the behaviour of failures
in similar clustered systems, and could potentially be used to build
predictive models to mitigate or erase the resource impact of unsuccessful
executions.

%\subsection{Challenges}

Given that the new 2019 Google Borg cluster traces are about 100 times larger
than the 2011 ones, and given that the entire compressed trace package has a
non-trivial size (weighing approximately 8 TiB~\cite{google-drive-marso}), the
computations required to perform the analysis we illustrate in this report
cannot be performed with classical data science techniques. A considerable
amount of computational power was needed to carry out the computations,
involving at their peak 3 dedicated Apache Spark servers over the span of 3
months. Additionally, the analysis scripts have been written by exploiting the
power of parallel computing, following most of the time a MapReduce-like
structure.

%\subsection{Contribution}

This project aims to repeat the analysis performed in the 2015 DSN paper by
Ros\`a et al.~\cite{dsn-paper} to highlight similarities and differences in
the Google Borg workload and in the behaviour and patterns of executions
within it. Thanks to this analysis, we aim to understand even better the
causes of failures and how to prevent them. Additionally, given the technical
challenge this analysis posed, the report aims to provide an overview of some
basic data engineering techniques for big data applications.

%\subsection{Outline}

The report is structured as follows. Section~\ref{sec2} contains information
about the current state of the art for Google Borg cluster traces.
Section~\ref{sec3} provides an overview including technical background
information on the data to analyze and its storage format. Section~\ref{sec4}
discusses the project requirements and the data science methods used to
perform the analysis. Section~\ref{sec5}, Section~\ref{sec6} and
Section~\ref{sec7} show the results obtained by analyzing, respectively, the
performance impact of unsuccessful executions, the patterns of task and job
events, and the potential causes of unsuccessful executions. Finally,
Section~\ref{sec8} concludes.
\section{State of the Art}\label{sec2}

\begin{figure}[t]
\begin{center}
\begin{tabular}{cc}
\toprule
\textbf{Cluster} & \textbf{Timezone} \\
\midrule
A & America/New York \\
B & America/Chicago \\
C & America/New York \\
D & America/New York \\
E & Europe/Helsinki \\
F & America/Chicago \\
G & Asia/Singapore \\
H & Europe/Brussels \\
\bottomrule
\end{tabular}
\end{center}
\caption{Approximate geographical location of each cluster in the 2019 Google
Borg traces, derived from the datacenter's timezone.}\label{fig:clusters}
\end{figure}

In 2015, Rosà et al.\ published a research paper titled \textit{Understanding
the Dark Side of Big Data Clusters: An Analysis beyond
Failures}~\cite{dsn-paper} in which they performed several analyses on
unsuccessful executions in Google's 2011 Borg cluster traces with the aim of
identifying their resource waste, their impact on application performance, and
any causes that may lie behind such failures. The salient conclusion of that
research is that a large fraction of the computations performed by Google
eventually end in failure, leading to large amounts of computational power
being wasted.

However, with the release of the new 2019 traces~\cite{google-marso-19}, the
results and conclusions found by that paper could be potentially outdated in
the current large-scale computing world. The new traces not only provide
updated data on Borg's workload, but provide more data as well: the new traces
contain data from 8 different Borg ``cells'' (i.e.\ clusters) in datacenters
across the world, from now on referred to as ``Cluster A'' to ``Cluster H''.
The geographical location of each cluster can be consulted in
Figure~\ref{fig:clusters}. The information in that table was provided by the
2019 traces documentation~\cite{google-drive-marso}.

The new 2019 traces provide richer data even on a cluster-by-cluster basis.
For example, the amount and variety of server configurations per cluster
increased significantly from 2011. An overview of the machine configurations
in the cluster covered by the 2011 traces and in the 8 clusters composing the
2019 traces can be found in Figure~\ref{fig:machineconfigs}, with a
cluster-by-cluster breakdown in Figure~\ref{fig:machineconfigs-csts}.

\input{figures/machine_configs}

There are two main works covering the new data, one being the paper
\textit{Borg: The Next Generation}~\cite{google-marso-19}, which compares the
overall features of the trace with the 2011
one~\cite{google-marso-11}~\cite{github-marso}, and one covering the features
and performance of \textit{Autopilot}~\cite{james-muratore}, a software that
provides autoscaling features in Borg. The new traces have also been analyzed
from the execution priority perspective~\cite{down-under}, as well as from a
cluster-by-cluster comparison~\cite{golf-course} given the multi-cluster
nature of the new traces. Other studies have been performed on similar
big-data systems focusing on the failure of hardware components and software
bugs~\cite{9}~\cite{10}~\cite{11}~\cite{12}. However, the community has not
yet performed any research on the new Borg traces analysing unsuccessful
executions, their possible causes, and the relationships between tasks and
jobs. Therefore, the only current research in this field is this very report,
providing an update to the 2015 Ros\`a et al.\ paper~\cite{dsn-paper} focusing
on the new traces.

\section{Background}\label{sec3}

\textit{Borg} is Google's own cluster management software able to run
thousands of different jobs.
Among the various cluster management services it provides, the main ones are:
job queuing, scheduling, allocation, and deallocation due to higher priority
computations.

The core structure of Borg is a cell, a set of machines usually all within the
same cluster, whose work is allocated by the same cluster-management system
and which is hence handled as a unit. Each cell may run a large computational
workload that is submitted to Borg. Such workload is organized in ``jobs'': a
job outlines the computations that a user wants to run and is made up of
several ``tasks''. A task is an executable program, consisting of multiple
processes, which has to be run on a single machine. Those tasks may be run
sequentially or in parallel, and the condition for a job's successful
termination is nontrivial.

The lifecycles of both tasks and jobs are represented by several events, which
are encoded and stored in the trace as rows of various tables. Among the
information carried by events, the field ``type'' indicates the execution
status of the job or task. We focus only on events whose type indicates a
termination, i.e.\ the end of a task or job's execution. These termination
event types are illustrated in Figure~\ref{fig:eventtypes}. We then define an
unsuccessful execution to be an execution characterized by a termination event
of type \texttt{EVICT}, \texttt{FAIL} or \texttt{KILL}. Conversely, a
successful execution is characterized by a \texttt{FINISH} termination event.

\subsection{Traces}

The data relative to the events happening while a Borg cell processes its
workload is encoded and stored as rows of several tables that make up a single
usage trace. Such data comes from the information obtained by the cell's
management system and by the single machines that make up the cell. Each table
record is identified by a key, usually a timestamp. In general, events are of
two kinds: those relative to the status of the schedule, and those relative to
the status of the task itself.

\begin{figure}[t]
\begin{center}
\begin{tabular}{p{3cm}p{12cm}}
\toprule
\textbf{Type code} & \textbf{Description} \\
\midrule
\texttt{EVICT} & The job or task was terminated in order to free computational
resources for a higher priority job\\
\texttt{FAIL} & The job or task terminated its execution unsuccessfully due to
a failure\\
\texttt{FINISH} & The job or task terminated successfully\\
\texttt{KILL} & The job or task terminated its execution because of a manual
request to stop it\\
\bottomrule
\end{tabular}
\end{center}
\caption{Overview of job and task termination event
types.}\label{fig:eventtypes}
\end{figure}

Figure~\ref{fig:eventTypes} shows the expected transitions between event
types.

\begin{figure}[t]
\centering
\resizebox{\textwidth}{!}{%
\includegraphics{./figures/event_types.png}}
\caption{Typical transitions between task/job event types according to
Google.\label{fig:eventTypes}}
\end{figure}

\hypertarget{traces-contents}{%
\subsection{Trace Contents}\label{traces-contents}}

The traces provided by Google mainly contain a collection of job and task
events spanning a month of execution of the 8 different clusters. In addition,
the traces contain data on the machines' configurations in terms of resources
(i.e.~amount of CPU and RAM) and additional machine-related metadata.

Due to Google's policy, most identification-related data (like job/task IDs,
raw resource amounts and other text values) was obfuscated prior to the
release of the traces.
One obfuscation that is noteworthy in the scope of this thesis is related to
CPU and RAM amounts, which are expressed respectively in NCUs
(\emph{Normalized Compute Units}) and NMUs (\emph{Normalized Memory Units}).

NCUs and NMUs are defined based on the raw resource distributions of the
machines within the 8 clusters. A machine having 1 NCU CPU power and 1 NMU
memory size has the maximum amount of raw CPU power and raw RAM size found in
the clusters. While RAM size is measured in bytes for normalization purposes,
CPU power is measured in GCU (\emph{Google Compute Units}), a proprietary CPU
power measurement unit used by Google that combines several parameters like
number of processors and cores, clock frequency, and architecture (i.e.~ISA).

\hypertarget{overview-of-traces-format}{%
\subsection{Trace Format}\label{overview-of-traces-format}}

The traces have a collective size of approximately 8 TiB and are stored in a
Gzip-compressed JSONL (JSON lines) format, which means that each table is
represented by a single logical ``file'' (stored in several file segments)
where each newline-separated line represents a single record for that table.

There are namely 5 different table ``files'':

\begin{description}
\item[\texttt{machine\_configs},] which is a table containing each physical
machine's configuration and its evolution over time;
\item[\texttt{instance\_events},] which is a table of task events;
\item[\texttt{collection\_events},] which is a table of job events;
\item[\texttt{machine\_attributes},] which is a table containing (obfuscated)
metadata about each physical machine and its evolution over time;
\item[\texttt{instance\_usage},] which contains resource (CPU/RAM) measures of
jobs and tasks running on the single machines.
\end{description}

The scope of this thesis focuses on the tables \texttt{machine\_configs},
\texttt{instance\_events} and \texttt{collection\_events}.

\hypertarget{remark-on-traces-size}{%
\subsection{Remark on Trace Size}\label{remark-on-traces-size}}

While the 2011 Google Borg traces were relatively small, with a total size in
the order of tens of gigabytes, the 2019 traces are quite challenging to
analyze due to their sheer size. As stated before, the traces have a total
size of 8 TiB when stored in the format provided by Google. Even when broken
down to table ``files'', individual sizes still reach the single tebibyte mark
(namely for \texttt{machine\_configs}, the largest table in the trace).

Due to these constraints, a careful data engineering based approach was used
when reproducing the 2015 DSN paper analysis. Bleeding-edge data science
technologies like Apache Spark were used to achieve efficient and parallelized
computations. This approach is discussed in further detail in the following
section.

\section{Project Requirements and Analysis Methodology}\label{sec4}

The aim of this project is to repeat the analysis performed in 2015 on the
dataset Google released in 2019 in order to find similarities and differences
with the previous analysis, and ultimately find whether computational power is
indeed wasted in this new workload as well. The 2019 data comes from 8 Borg
cells spanning 8 different datacenters located in different geographical
positions, all focused on computation-oriented workloads. The data collection
time span matches the entire month of May 2019.

Due to the inherent complexity in analyzing traces of this size, non-trivial
data engineering techniques were adopted to perform the required computations.
We used the framework Apache Spark to perform efficient and parallel
Map-Reduce computations. In this section, we discuss the technical details
behind our approach.

\hypertarget{introduction-on-apache-spark}{%
\subsection{Apache Spark}\label{introduction-on-apache-spark}}

Apache Spark is a unified analytics engine for large-scale data processing. In
layman's terms, Spark makes it possible to parallelize computations in a fast
and streamlined way. In the scope of this thesis, Spark was used essentially
as a Map-Reduce framework for computing aggregated results on the various
tables. Due to the sharded nature of table ``files'', Spark is able to spawn a
thread per file and run computations using all processors on the server
machines used to run the analysis.

Spark is also quite powerful since it provides automated thread pooling
services, and it is able to efficiently store and cache intermediate results
on secondary storage without any additional effort required from the data
engineer. This feature was especially useful due to the sheer size of the
analyzed data, since the computations required storing up to 1 TiB of
intermediate data on disk.

The chosen programming language for writing analysis scripts was Python. Spark
has very powerful native Python bindings in the form of the \emph{PySpark}
API, which were used to implement the various queries.

\hypertarget{query-architecture}{%
\subsection{Query Architecture}\label{query-architecture}}

\subsubsection{Overview}

In general, each query written to execute the analysis follows a Map-Reduce
template. Traces are first read and parsed, and then filtered by performing
selections and projections and by computing new derived fields. After this
preparation phase, the trace records are often passed through a
\texttt{groupby()} operation, which, by choosing one or many record fields,
sorts all the records into several ``bins'' containing records with matching
values for the selected fields. Then, a map operation is applied to each bin
in order to derive some aggregated property value for each grouping. Finally,
a reduce operation is applied to either further aggregate those computed
properties or to generate an aggregated data structure for storage purposes.

\subsubsection{Parsing Table Files}

As stated before, table ``files'' are composed of several Gzip-compressed
shards of JSONL record data. The specification for the types and constraints
of each record is outlined by Google in the form of a protocol buffer
specification file found in the trace release
package~\cite{google-proto-marso}. This file was used as the oracle
specification and was a critical reference for writing the query code that
checks, parses and carefully sanitizes the various JSONL records prior to
actual computations.

The JSONL encoding of trace records is often performed with non-trivial rules
that required careful attention. One of these rules involves fields that have
a logical ``zero'' value (i.e.~values like ``0'' or the empty string): for
these values, the key-value pair is outright omitted from the JSON object.
When reading the traces in Apache Spark, it is therefore necessary to check
for this possibility and insert the omitted record attributes back.

\subsubsection{The Queries}

Most queries use only two or three fields in each trace record, while the
original table records often consist of a couple of dozen fields.
In order to save memory during the query, a projection is often applied to the
data by means of a \texttt{.map()} operation over the entire trace set,
performed using Spark's RDD API.

Another operation that is often necessary to perform prior to the Map-Reduce
core of each query is a record filtering process, which is often motivated by
the presence of incomplete data (i.e.~records which contain fields whose
values are unknown). This filtering is performed using the \texttt{.filter()}
operation of Spark's RDD API.

The core of each query is often a \texttt{groupby()} followed by a
\texttt{map()} operation on the aggregated data. The \texttt{groupby()} groups
the set of all records into several subsets of records, each having something
in common. Then, each of these subsets is reduced with a \texttt{map()}
operation to a single record. The motivation behind this way of computing data
is that for the analysis in this thesis it is often necessary to analyze the
behaviour of either tasks or jobs over time by looking at their events. These
queries are therefore implemented by \texttt{groupby()}-ing records by task or
job, and then \texttt{map()}-ing each set of event records, sorting them by
time and performing the desired computation on the obtained chronological
event log.

Sometimes intermediate results are saved in Spark's Parquet format, so that
they can be computed once and reused by later processing steps.

\subsection{Query Script Design and the \textit{Task Slowdown} Script}

In this section we aim to show the general complexity behind the
implementation of query scripts by explaining in detail a sample pair of
scripts, to better appreciate their behaviour.

One example of an analysis script with average complexity and a fairly
straightforward structure is the pair of scripts \texttt{task\_slowdown.py}
and \texttt{task\_slowdown\_table.py} used to compute the ``task slowdown''
tables (namely the tables in Figure~\ref{fig:taskslowdown}).

``Slowdown'' is a task-wise measure of wasted execution time for tasks with a
\texttt{FINISH} termination type. It is computed as the total execution time
of the task divided by the execution time actually needed to complete the task
(i.e.\ the total time of the last execution attempt, successful by
definition).

The analysis requires computing the mean task slowdown for each task priority
value, and additionally the percentage of tasks with successful terminations
per priority. The query therefore needs to compute the execution time of each
execution attempt for each task, determine whether each task terminated
successfully or not, and finally combine this data to compute slowdown, mean
slowdown and ultimately the final table found in
Figure~\ref{fig:taskslowdown}.

\begin{figure}[t]
\hspace{-0.075\textwidth}
\includegraphics[width=1.15\textwidth]{figures/task_slowdown_query.png}
\caption{Diagram of the script used for the ``task slowdown''
query.}\label{fig:taskslowdownquery}
\end{figure}

Figure~\ref{fig:taskslowdownquery} shows a schematic representation of the
query structure. The query first starts reading the \texttt{instance\_events}
table, which contains (among other data) all task event logs containing
properties, event types and timestamps. As already explained in the previous
section, the logical table file is actually stored as several Gzip-compressed
JSONL shards. This is very useful for processing purposes, since Spark is able
to parse and load in memory each shard in parallel, i.e.\ using all processing
cores on the server used to run the queries.
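The following listing is a minimal sketch of this loading and sanitization
step using PySpark's RDD API. It is a simplified reconstruction rather than
the verbatim analysis code: the shard path and the field names shown are
illustrative, and the defaulted fields are only examples of the omitted
``zero''-valued attributes discussed in the previous section.

\begin{verbatim}
import json
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Fields that may be omitted in the JSONL encoding when their value is
# logically "zero"; they are re-inserted here with explicit defaults.
# (Illustrative selection; the real scripts follow the protocol buffer
# specification shipped with the traces.)
DEFAULTS = {"type": 0, "priority": 0, "alloc_collection_id": 0}

def parse_record(line):
    record = json.loads(line)
    for field, default in DEFAULTS.items():
        record.setdefault(field, default)
    return record

task_events = (
    sc.textFile("instance_events/*.json.gz")  # one partition per shard
      .map(parse_record)                      # parse and sanitize each row
      # drop incomplete records (missing timestamp or missing job/task ID)
      .filter(lambda r: r.get("time") is not None
              and r.get("collection_id") is not None)
)
\end{verbatim}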
After loading the data, a selection and a projection operation are performed
in the preparation phase so as to ``clean up'' the records and fields that are
not needed, leaving only useful information to feed into the ``group by''
phase. In this query, the selection phase removes all records that do not
represent task events or that contain an unknown task ID or a null event
timestamp. In the 2019 traces it is quite common to find incomplete records,
since the logging process is unable to capture the sheer amount of events
generated by all jobs in an exact and deterministic fashion.

Then, after the preparation stage is complete, the task event records are
grouped in several bins, one per task ID\@. Through this operation, the
collection of unsorted task events is rearranged into groups of task events,
each relating to a single task. These obtained collections of task events are
then sorted by timestamp and processed to compute intermediate data relating
to execution attempt times and task termination counts.

After the task events are sorted, the script iterates over the events in
chronological order, storing each execution attempt time and registering all
execution termination types by checking the event type field. The task
termination is then equal to the last execution termination type, following
the definition originally given in the 2015 Ros\`a et al.\ DSN paper. If the
task termination is determined to be unsuccessful, the tally counter of task
terminations for the matching task property is increased; otherwise, all the
execution attempt time deltas of the task are returned. Tallies and time
deltas are saved in an intermediate file for fine-grained processing.

Finally, the \texttt{task\_slowdown\_table.py} script processes these
intermediate results to compute the percentage of successful tasks per
priority and the slowdown values given the previously computed execution
attempt time deltas. The mean of the computed slowdown values is then
computed, resulting in the clear and concise tables found in
Figure~\ref{fig:taskslowdown}.

\section{Analysis: Performance Impact of Unsuccessful Executions}\label{sec5}

Our first investigation focuses on replicating the analysis done in the paper
of Ros\`a et al.~\cite{dsn-paper} regarding usage of machine time and
resources. In this section we perform several analyses focusing on how machine
time and resources are wasted, by means of a temporal vs.\ spatial resource
analysis from the perspective of single tasks as well as jobs. We then compare
the results from the 2019 traces to the ones that were obtained before to
understand the workload evolution inside Borg between 2011 and 2019.

We discover that the spatial and temporal impact of unsuccessful executions is
very significant, more than in the 2011 traces. In particular, resource usage
is overall dominated by tasks with a final \texttt{KILL} termination event.

\subsection{Temporal Impact: Machine Time Waste}

\input{figures/machine_time_waste}

The goal of this analysis is to understand how much time is spent doing
useless computations by exploring how machine time is distributed over task
events and submissions.
Before delving into the analysis itself, we define three kinds of events in a
task's lifecycle:

\begin{description}
\item[submission:] when a task is added or re-added to the Borg system queue,
waiting to be scheduled;
\item[scheduling:] when a task is removed from the Borg queue and its actual
execution of potentially useful computations starts;
\item[termination:] when a task terminates its computations either
successfully or unsuccessfully.
\end{description}

By partitioning the set of all terminating tasks by their termination event,
the analysis aims to measure the total time spent by tasks in 3 different
execution phases:

\begin{description}
\item[resubmission time:] the total of all time intervals between every task
termination event and the immediately succeeding task submission event, i.e.\
the total time spent by tasks waiting to be resubmitted in Borg after a
termination;
\item[queue time:] the total of all time intervals between every task
submission event and the following task scheduling event, i.e.\ the total time
spent by tasks queuing before execution;
\item[running time:] the total of all time intervals between every task
scheduling event and the following task termination event, i.e.\ the total
time spent by tasks ``executing'' (i.e.\ performing potentially useful
computations) in the clusters.
\end{description}

In the 2019 traces, an additional ``Unknown'' measure is counted. This measure
collects all the time intervals in which the transitions between the
registered events do not allow us to safely determine in which execution phase
a task is. Unknown measures are mostly caused by faults and missed event
writes in the task event log that was used to generate the traces. A
simplified sketch of how these per-phase totals can be accumulated from a
task's event log is given towards the end of this subsection.

The analysis results are depicted in Figure~\ref{fig:machinetimewaste-rel} as
a comparison between the 2011 and 2019 traces, aggregating the data from all
clusters. Additionally, in Figure~\ref{fig:machinetimewaste-rel-csts} a
cluster-by-cluster breakdown is provided for the 2019 traces.

The striking difference between 2011 and 2019 data is in the machine time
distribution per task termination type. In the 2019 traces, 94.38\% of global
machine time is spent on tasks that are eventually \texttt{KILL}ed.
\texttt{FINISH}, \texttt{EVICT} and \texttt{FAIL} tasks respectively register
totals of 4.20\%, 1.18\% and 0.25\% machine time.

Considering instead the distribution between execution phase times, the
comparison shows very similar behaviour between the two traces, with the
``Running'' time being dominant (at a total of 16.63\% across task
terminations in 2019) over the queue and resubmission phases (with respective
totals in 2019 of 3.26\% and 0.004\%). However, another noteworthy difference
between 2011 and 2019 data lies in the new ``Unknown'' measure present only in
the latter traces, registering a total of 80.12\% of global machine time
across all terminations. This data can be interpreted as a strong indication
of the ``poor quality'' of the 2019 traces w.r.t.\ the accuracy of task event
logging.

Considering instead the behaviour of each single cluster in the 2019 traces,
no significant difference between them can be observed. The only notable
difference lies in the ``Running time''--``Unknown time'' ratio of
\texttt{KILL}ed tasks, which is at its highest in cluster A (at 30.78\% by
58.71\% of global machine time) and at its lowest in cluster H (at 8.06\% by
84.77\% of global machine time).
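As anticipated above, the following is a simplified sketch of how the
resubmission, queue, running, and unknown totals can be derived from a single
task's chronologically sorted event log. It is a reconstruction for
illustration purposes only: the grouping of event type names into submission,
scheduling, and termination events is assumed here and does not reproduce the
verbatim analysis code.

\begin{verbatim}
# Assumed groupings of event type names (illustrative, not taken
# verbatim from the trace specification).
SUBMIT_EVENTS = {"SUBMIT", "QUEUE"}
SCHEDULE_EVENTS = {"SCHEDULE"}
TERMINATION_EVENTS = {"EVICT", "FAIL", "FINISH", "KILL"}

def phase_times(events):
    """events: list of (timestamp, type) pairs sorted by timestamp."""
    totals = {"resubmission": 0, "queue": 0, "running": 0, "unknown": 0}
    prev_time, prev_type = None, None
    for time, etype in events:
        if prev_type is not None:
            delta = time - prev_time
            if prev_type in TERMINATION_EVENTS and etype in SUBMIT_EVENTS:
                totals["resubmission"] += delta
            elif prev_type in SUBMIT_EVENTS and etype in SCHEDULE_EVENTS:
                totals["queue"] += delta
            elif prev_type in SCHEDULE_EVENTS and etype in TERMINATION_EVENTS:
                totals["running"] += delta
            else:
                # transition does not match any expected phase boundary
                totals["unknown"] += delta
        prev_time, prev_type = time, etype
    return totals
\end{verbatim}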
The takeaway from this analysis is that in the 2019 traces a lot of
computation time is wasted in the execution of tasks that are eventually
\texttt{KILL}ed, i.e.\ unsuccessful.

\subsection{Average Slowdown per Task}

\input{figures/task_slowdown}

This analysis aims to measure the average of an ad-hoc defined parameter we
call ``slowdown''. We define it as the ratio between the total response time
across all executions of the task and the response time (i.e.\ queue time and
running time) of the last execution of said task. This metric is especially
useful to analyze the impact of unsuccessful executions on each task's total
execution time w.r.t.\ the intrinsic workload (i.e.\ computational time) of
tasks.

Refer to Figure~\ref{fig:taskslowdown} for a comparison between the 2011 and
2019 mean task slowdown measures broken down by task priority. Additionally,
said means are computed on a cluster-by-cluster basis for 2019 data in
Figure~\ref{fig:taskslowdown-csts}.

In 2015 Ros\`a et al.~\cite{dsn-paper} measured mean task slowdown per each
task priority value, which at the time were numeric values between 0 and 11.
However, in the 2019 traces, task priorities are given as a numeric value
between 0 and 500. Therefore, to allow an easier comparison, mean task
slowdown values are computed by task priority tier over the 2019 data.
Priority tiers are semantically relevant priority ranges defined by Tirmazi et
al.\ in the 2020 paper~\cite{google-marso-19} that introduced the 2019 traces.
Equivalent priority tiers are also provided next to the 2011 priority values
in the table covering the 2015 analysis.

In the given tables, the \textbf{\% finished} column corresponds to the
percentage of \texttt{FINISH}ed tasks for that priority or tier. \textbf{Mean
response [s] (last execution)} instead shows the mean response time of the
last task execution of each task in that priority/tier. \textbf{Mean response
[s] (all executions)} provides a very similar figure, though this column shows
the mean response time across all executions. \textbf{Mean slowdown} instead
provides the mean slowdown value for each task priority/tier.

Comparing the tables in Figure~\ref{fig:taskslowdown} we observe that the
maximum mean slowdown measure for the 2019 data (i.e.\ 7.84, for the BEB tier)
is almost double the maximum measure in the 2011 data (i.e.\ 3.39, for
priority $3$ corresponding to the BEB tier). The ``Best effort batch'' tier,
as the name suggests, is a lower priority tier where failures are more
tolerated. Therefore, given the increased concurrency in the 2019 clusters
compared to 2011 and the higher machine time spent on unsuccessful executions
(as observed in the previous analysis), an increased slowdown for this class
is not particularly surprising. The amount of non-successful task terminations
in the 2019 traces is also rather high when compared to the 2011 data, as can
be evinced by the low percentage of \texttt{FINISH}ed tasks across priority
tiers.

Another noteworthy difference is in the mean response times for all and for
last executions: while mean response times are overall shorter by an order of
magnitude in the 2019 traces, the mean response time across all executions is
significantly higher relative to the last-execution one than in the 2011 data,
consistently with the higher slowdown values.

Across single 2019 clusters (as in Figure~\ref{fig:taskslowdown-csts}), the
data shows a mostly uniform behaviour, other than for some noteworthy mean
slowdown spikes.
Indeed, cluster A has a mean slowdown of 82.97 in the ``Free'' tier, cluster G
has mean slowdowns of 19.06 and 14.57 in the ``BEB'' and ``Production'' tiers
respectively, and cluster D has a mean slowdown of 12.04 in its ``Free'' tier.

\subsection{Spatial Impact: Resource Waste}

\input{figures/spatial_resource_waste}

In this analysis we aim to understand how the physical resources of machines
in the Borg clusters are used to complete tasks. In particular, we compare how
CPU and memory resource allocation and usage are distributed among tasks based
on their termination type.

Due to limited computational resources w.r.t.\ the data analysis process, the
resource usage for clusters E to H in the 2019 traces is missing. However, a
comparison between the 2011 resource usage and the aggregated resource usage
of clusters A to D in the 2019 traces can be found in
Figure~\ref{fig:spatialresourcewaste-actual}. Additionally, a
cluster-by-cluster breakdown for the 2019 data can be found in
Figure~\ref{fig:spatialresourcewaste-actual-csts}.

From these figures it is clear that, compared to the relatively even
distribution of used resources in the 2011 traces, the distribution of
resources in the 2019 Borg clusters became strikingly uneven, registering a
combined 86.29\% of CPU resource usage and 84.86\% of memory usage for
\texttt{KILL}ed tasks. Instead, all other task termination types have a
significantly lower resource usage: \texttt{EVICT}ed, \texttt{FAIL}ed and
\texttt{FINISH}ed tasks register respectively 8.53\%, 3.17\% and 2.02\% CPU
usage and 9.03\%, 4.45\%, and 1.66\% memory usage. This resource distribution
can also be found in the data from individual clusters in
Figure~\ref{fig:spatialresourcewaste-actual-csts}, with always more than 80\%
of resources devoted to \texttt{KILL}ed tasks.

Considering now requested resources instead of used ones, a comparison between
2011 and the aggregation of all A-H clusters of the 2019 traces can be found
in Figure~\ref{fig:spatialresourcewaste-requested}. Additionally, a
cluster-by-cluster breakdown for single 2019 clusters can be found in
Figure~\ref{fig:spatialresourcewaste-requested-csts}. Here \texttt{KILL}ed
jobs dominate the distribution of resources even more, reaching a global
97.21\% of CPU allocation and a global 96.89\% of memory allocation. Even in
allocations, the \texttt{KILL} lead is followed by (in order)
\texttt{EVICT}ed, \texttt{FAIL}ed and \texttt{FINISH}ed jobs, with respective
CPU allocation figures of 2.73\%, 0.06\% and 0.0012\% and memory allocation
figures of 3.04\%, 0.06\% and 0.012\%.

Behaviour across clusters (as evinced in
Figure~\ref{fig:spatialresourcewaste-requested-csts}) in terms of requested
resources is pretty homogeneous, with the exception of cluster A having a
relatively high 2.85\% CPU and 3.42\% memory resource requests from
\texttt{EVICT}ed tasks and cluster E having a noteworthy 1.67\% CPU and 1.31\%
memory resource requests from \texttt{FINISH}ed tasks.

With more than 98\% of both CPU and memory resources used by (and more than
99.99\% of both CPU and memory resources requested by) non-successful tasks,
it is clear that the spatial resource waste is high in the 2019 traces.

\section{Analysis: Patterns of Task and Job Events}\label{sec6}

This section aims to use some of the techniques used in Section IV of the
Ros\`a et al.\ paper~\cite{dsn-paper} to find patterns and interdependencies
between task and job events by gathering statistics on those events.
In particular, Section~\ref{tabIII-section} explores how the success of a task
is inter-correlated with its own event patterns, which
Section~\ref{figV-section} explores even further by computing task success
probabilities based on the number of task termination events of a specific
type. Finally, Section~\ref{tabIV-section} aims to find similar correlations,
but at the job level.

The results found in the 2019 traces seldom show the same patterns in terms of
task events and job/task distributions, in particular highlighting again the
overall non-trivial impact of \texttt{KILL} events, no matter the task and job
termination type.

\subsection{Unsuccessful Task Event Patterns}\label{tabIII-section}

\input{figures/table_iii}

In this analysis we compute the distribution of task-level termination events
by type, namely \texttt{EVICT}, \texttt{FAIL}, \texttt{FINISH} and
\texttt{KILL} termination events. A comparison of the termination event
distribution between the 2011 and 2019 traces is shown in
Figure~\ref{fig:tableIII}. Additionally, a cluster-by-cluster breakdown of the
same data for the 2019 traces is shown in Figure~\ref{fig:tableIII-csts}.

Each table from these figures shows the mean and the 95-th percentile of the
number of termination events per task, broken down by task termination. In
addition, the table shows the mean number of \texttt{EVICT}, \texttt{FAIL},
\texttt{FINISH}, and \texttt{KILL} events for each task termination type.

The first observation we make is that the mean number of events per
\texttt{EVICT}ed and \texttt{FAIL}ed tasks increased more than 5-fold (namely
from 2.372 to 78.710 and from 3.130 to 24.962, respectively). Also observing
the 95-th percentile, we can say that the number of events per task has
generally increased overall.

As observed in 2011, tasks in the 2019 Borg traces all have a multitude of
events of different types, with \texttt{FINISH}ed tasks almost always
experiencing \texttt{FINISH} events, and the same observation holding for
\texttt{KILL}ed tasks and their \texttt{KILL} events. Differently from the
2011 data, \texttt{EVICT}ed tasks seem to experience a high number of
\texttt{KILL} events as well (25.795 on average per task, out of 78.710
overall events on average). A similar phenomenon can be observed for
\texttt{KILL}ed tasks and their \texttt{EVICT} events (1.876 on average per
task, out of an overall average of 8.763 events).

Considering cluster-by-cluster behaviour in the 2019 traces (as reported in
Figure~\ref{fig:tableIII-csts}), the general observations still hold for each
cluster, albeit with event count averages of different magnitudes. Notably,
cluster E registers the highest per-event average, with \texttt{FAIL}ed tasks
experiencing 111.471 \texttt{FAIL} events out of an average of 112.384 events.

\subsection{Conditional Probability of Task Success}\label{figV-section}

\input{figures/figure_5}

In this analysis we measure the conditional probability of task success given
a number of specific unsuccessful (i.e.\ \texttt{EVICT}, \texttt{FAIL} and
\texttt{KILL}) events. This analysis was conducted to better understand how a
given number of unsuccessful events could affect the final termination of the
task they belong to.

Conditional probabilities of each unsuccessful event type are shown in the
form of a plot in Figure~\ref{fig:figureV}, comparing the 2011 traces with the
overall data from the 2019 ones, and in Figure~\ref{fig:figureV-csts}, as a
cluster-by-cluster breakdown of the same data for the 2019 traces.
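Formally, for an unsuccessful event type $e \in \{\texttt{EVICT},
\texttt{FAIL}, \texttt{KILL}\}$ and an event count $k$, the plotted quantity
can be read as the empirical conditional probability
\[
P\bigl(\texttt{FINISH} \;\big|\; N_e = k\bigr) =
\frac{\bigl|\{\text{tasks terminated with \texttt{FINISH} and } N_e = k\}\bigr|}
     {\bigl|\{\text{tasks with } N_e = k\}\bigr|},
\]
where $N_e$ denotes the number of events of type $e$ recorded for a task. This
notation is introduced here only for clarity and does not appear in the traces
or in the referenced papers.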
In Figure~\ref{fig:figureV} the 2011 and 2019 plots differ in their x-axis:
for the 2011 data conditional probabilities are computed for a maximum event
count of 30, while for the 2019 data they are computed for up to 50 events of
a specific kind. Nevertheless, another quite striking difference between the
two plots can be seen: while the 2011 data has relatively smooth decreasing
curves for all event types, the curves in the 2019 data almost immediately
plateau, with no significant change easily observed after 5 events of any
kind.

The presence of even one \texttt{KILL} event almost surely causes the
corresponding task to terminate in an unsuccessful way: a task with no
\texttt{KILL} events has a 97.16\% probability of success, but tasks with 1 to
5 \texttt{KILL} events have 0.02\%, 0.20\%, 0.44\%, 0.04\%, and 0.07\%
probabilities of success respectively. The same effect can be observed, albeit
in a less drastic fashion, for the \texttt{EVICT} and \texttt{FAIL} curves.
The \texttt{EVICT} curve shows, for tasks with 0 to 5 \texttt{EVICT} events,
success probabilities of 19.70\%, 15.94\%, 1.94\%, 1.67\%, 0.35\% and 0.00\%
respectively. The \texttt{FAIL} curve shows instead success probabilities of
18.55\%, 1.79\%, 14.49\%, 2.08\%, 2.40\%, and 1.29\% for the same range of
event counts.

Considering cluster-to-cluster behaviour in the 2019 traces (as shown in
Figure~\ref{fig:figureV-csts}), some clusters show quite similar behaviour to
the aggregated plot (namely clusters A, F, and H), while some other clusters
show strongly oscillating probability curves for \texttt{EVICT} and
\texttt{FINISH} events. \texttt{KILL} behaviour is instead homogeneous even on
a single-cluster basis.

\subsection{Unsuccessful Job Event Patterns}\label{tabIV-section}

\input{figures/table_iv}

This analysis uses very similar techniques to the ones used in
Section~\ref{tabIII-section}, but focuses on the job level instead. The aim is
to better understand the task-job level relationship and to understand how
task-level termination events can influence the termination state of a job.

A comparison of the analyzed parameters between the 2011 and 2019 traces is
shown in Figure~\ref{fig:tableIV}. Additionally, a cluster-by-cluster
breakdown of the same data for the 2019 traces is shown in
Figure~\ref{fig:tableIV-csts}.

Considering the distribution of the number of tasks in a job, the 2019 traces
show a decrease in the mean figure (e.g.\ for \texttt{FAIL}ed jobs, with a
mean of 60.5 tasks per job in 2011 and a mean of 43.126 tasks per job in 2019)
and a fluctuation of the 95-th percentile figure (e.g.\ for \texttt{FAIL}ed
jobs it rose from 110 to 200, but for \texttt{KILL}ed jobs the figure
decreased from 400 to 178).

Considering the distribution of the number of task-wise termination events
instead, the 2019 traces show values generally one or two orders of magnitude
below the ones in 2011. While the behaviour of \texttt{EVICT}ed jobs stays the
same, \texttt{FAIL}ed and \texttt{KILL}ed jobs show a dramatic difference in
the event distribution, with \texttt{KILL} becoming the most frequent
task-level event, with means of 12.833 and 11.337 task events per job
respectively. Finally, the \texttt{FINISH}ed job category has a new event
distribution too, with \texttt{FINISH} task events being the most frequent at
1.778 events per job in the 2019 traces.
The cluster-by-cluster comparison in Figure~\ref{fig:tableIV-csts} shows that
the number of tasks per job is generally distributed similarly to the
aggregated data, with only cluster H having remarkably low mean and 95-th
percentile figures overall. Event-wise, for \texttt{EVICT}ed,
\texttt{FINISH}ed, and \texttt{KILL}ed jobs the distributions are again
similar to the aggregated one. For some clusters (namely B, C, and D), the
mean number of \texttt{FAIL} and \texttt{KILL} task events for
\texttt{FINISH}ed jobs is almost the same. Additionally, it is noteworthy that
cluster A has no \texttt{EVICT}ed jobs.

\section{Analysis: Potential Causes of Unsuccessful Executions}\label{sec7}

This section re-applies the techniques used in Section V of the Ros\`a et al.\
paper~\cite{dsn-paper} to find causes for unsuccessful events related to
task-level parameters (analyzed in Section~\ref{fig7-section}), usage of
machine resources by tasks (analyzed in Section~\ref{fig8-section}), and
job-level parameters (analyzed in Section~\ref{fig9-section}).

In all the analyses we use the ``event rate'' metric, which represents the
relative percentage of termination events of a given type over a certain
task/job parameter configuration. We compute this metric for all the possible
terminations (i.e.\ \texttt{EVICT}, \texttt{FAIL}, \texttt{FINISH} and
\texttt{KILL}) in order to find correlations with the several trace
parameters.

\subsection{Task Event Rates vs.\ Task Priority, Event Execution Time, and
Machine Concurrency}\label{fig7-section}

\input{figures/figure_7}

This analysis shows event rates (i.e.\ the relative percentage of termination
events of a given type) over different configurations of task-level
parameters. Figure~\ref{fig:figureVII-a} and Figure~\ref{fig:figureVII-a-csts}
show the distribution of event rates over the various task priority tiers.
Figure~\ref{fig:figureVII-b} and Figure~\ref{fig:figureVII-b-csts} show the
distribution of event rates over the total event execution time. Finally,
Figure~\ref{fig:figureVII-c} and Figure~\ref{fig:figureVII-c-csts} show the
distribution of event rates over the metric of machine concurrency, defined as
the number of tasks co-executing on the same machine at the moment the
termination event is recorded.

From this analysis we can make the following observations:

\begin{itemize}
\item The behaviour of the curves in the task priority distributions (in
Figure~\ref{fig:figureVII-a} and Figure~\ref{fig:figureVII-a-csts}) for the
2019 traces is almost the opposite of the 2011 ones, i.e.\ in-between
priorities have higher kill rates while priorities at the extremes have lower
kill rates;
\item The event execution time curves (in Figure~\ref{fig:figureVII-b} and
Figure~\ref{fig:figureVII-b-csts}) for the 2019 traces are quite different
from the 2011 ones: here there seems to be a good correlation between short
task execution times and high finish event rates, instead of the ``U-shaped''
curve found in the Ros\`a et al.\ 2015 DSN paper~\cite{dsn-paper};
\item The behaviour among different clusters for the event execution time
distributions in Figure~\ref{fig:figureVII-b-csts} seems quite uniform;
\item The machine concurrency metric, for which a distribution of event rates
is computed in Figure~\ref{fig:figureVII-c} and
Figure~\ref{fig:figureVII-c-csts}, seems to play little role in the event
termination distribution, as for all concurrency factors the \texttt{KILL}
event rate is around 90\% with little fluctuation.
\end{itemize}

\subsection{Task Event Rates vs.\ Requested Resources, Resource Reservation,
and Resource Utilization}\label{fig8-section}

\input{figures/figure_8}

This analysis is concerned with the distribution of event rates over several
resource-related parameters. Figure~\ref{fig:figureVIII-a} and
Figure~\ref{fig:figureVIII-a-csts} show the distribution of task event rates
w.r.t.\ the amount of CPU the task has requested, while
Figure~\ref{fig:figureVIII-b} and Figure~\ref{fig:figureVIII-b-csts} show task
event rates vs.\ requested memory. Figure~\ref{fig:figureVIII-c} and
Figure~\ref{fig:figureVIII-c-csts} show the distribution of task event rates
w.r.t.\ the amount of CPU that has been collectively requested on the machine
where the task is running, while Figure~\ref{fig:figureVIII-d} and
Figure~\ref{fig:figureVIII-d-csts} show a similar distribution but for memory.
Finally, Figure~\ref{fig:figureVIII-e} and Figure~\ref{fig:figureVIII-e-csts}
show the distribution of task event rates w.r.t.\ the amount of CPU the task
has actually utilized, while Figure~\ref{fig:figureVIII-f} and
Figure~\ref{fig:figureVIII-f-csts} show task event rates vs.\ used memory.

From this analysis we can make the following observations:

\begin{itemize}
\item In the 2019 traces, the amount of requested CPU resources seems to have
little effect on task termination, as can be evinced in
Figure~\ref{fig:figureVIII-a} and Figure~\ref{fig:figureVIII-a-csts}.
Likewise, the event rate distributions w.r.t.\ the amount of requested memory
(Figure~\ref{fig:figureVIII-b} and Figure~\ref{fig:figureVIII-b-csts}) show no
discernible pattern;
\item Overall, a significant increase in the kill event rate can be observed;
it seems to dominate all event rate measures;
\item Among all clusters in Figure~\ref{fig:figureVIII-a-csts} the dominance
of the kill event rate can be noted, whereas in 2011 the success event rate
curve showed a more dominant behaviour;
\item For each analysed distribution, the clusters do not show a common
behaviour of the curves: some are similar, but they are generally
distinguishable;
\item In Figure~\ref{fig:figureVIII-e} it can be seen that, while the kill
event rate curve drastically decreases as CPU utilization increases, the
success event rate does not increase much.
\end{itemize}

\subsection{Job Event Rates vs.\ Job Size, Job Execution Time, and Machine
Locality}\label{fig9-section}

This analysis shows job event rates (i.e.\ the relative percentage of
termination events of a given type) over different configurations of job size,
job execution time and machine locality. Figure~\ref{fig:figureIX-a} and
Figure~\ref{fig:figureIX-a-csts} provide the plots of job event rates versus
the job size, defined as the number of tasks belonging to the job.
Figure~\ref{fig:figureIX-b} and Figure~\ref{fig:figureIX-b-csts} provide the
plots of the job event rates versus execution time.
Figure~\ref{fig:figureIX-c} and Figure~\ref{fig:figureIX-c-csts} provide the
plots of the job event rates versus machine locality, defined as the ratio
between the number of machines used to execute the tasks inside the job and
the job size.
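In symbols, denoting by $T_j$ the set of tasks of a job $j$ and by $M_j$ the
set of distinct machines on which those tasks were executed (notation
introduced here for clarity only), machine locality can be written as
\[
\text{locality}(j) = \frac{\lvert M_j \rvert}{\lvert T_j \rvert},
\]
so a locality of 1 means that, roughly speaking, every task of the job ran on
a distinct machine, while values close to 0 indicate that the job's tasks were
packed onto few machines.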
By analysing these plots, we can make the following observations:

\begin{itemize}
\item Significant variations in the behaviour of the curves can be noted
between clusters;
\item There are no smooth gradients in the various curves, unlike in the 2011
traces;
\item Killed jobs have higher event rates in general, and overall dominate all
event rate measures. As can be seen in Figure~\ref{fig:figureIX-a}, a higher
number of tasks (i.e., a larger job size) seems to be correlated with a higher
kill event rate in 2019 more than in 2011. In Figure~\ref{fig:figureIX-b}, we
observe the best success event rate for a job execution time of 4-10 minutes,
while in 2011 it seemed that the finish event rate increased along with the
job execution time;
\item There still seems to be a strong correlation between short job execution
times and successful final terminations, and likewise between longer execution
times and kill terminations. Especially for these two curves, in most cases
also across clusters, their behaviour suggests a mirrored trend;
\item As can be seen in Figure~\ref{fig:figureIX-c}, across all clusters, a
machine locality factor of 1 seems to lead to the highest success event rate,
while in 2011 the same machine locality factor led to the lowest success event
rate.
\end{itemize}

\input{figures/figure_9}

\section{Conclusions, Limitations and Future Work}\label{sec8}

In this report we analyzed the Google Borg 2019 traces and compared them with
their 2011 counterpart from the perspective of unsuccessful executions, their
impact on resources and their causes.

We discover that the impact of unsuccessful executions (especially of
\texttt{KILL}ed tasks and jobs) in the new traces is still very relevant in
terms of machine time and resources, even more so than in 2011. We also
discover that unsuccessful job and task event patterns still play a major role
in the overall execution success of Borg jobs and tasks. We finally discover
that unsuccessful job and task event rates dominate the overall landscape of
Borg's own logs, even when grouping tasks and jobs by parameters such as
priority, resource request, reservation and utilization, and machine locality.

We can therefore conclude that the performed analyses show many clear trends
regarding the correlation of execution success with several parameters and
metadata. These trends can potentially be exploited to build better scheduling
algorithms and new predictive models that could determine whether an execution
has a high probability of failure based on its own properties and metadata.
The creation of such models could allow computational resources to be saved
and used to either increase the throughput of higher priority workloads or to
allow for a larger workload altogether.

The biggest limitation and threat to validity posed to this project is the
relative lack of information provided by Google on the true meaning of
unsuccessful terminations. Indeed, given the ``black box'' nature of the
traces and the scarcity of information in the trace
documentation~\cite{google-drive-marso}, it is not clear whether unsuccessful
executions yield any useful computation result or not. Our assumption in this
report is that unsuccessful jobs and tasks do not produce any result and are
therefore just burdens on machine time and resources, but should this
assumption be incorrect, the interpretation of the analyses might change.
Given the significant computational time invested in obtaining the results
shown in this report and due to time and resource limitations, some of the
analyses were not completed on all clusters. Our future work will focus on
finishing these analyses, computing results for the missing clusters and
obtaining an overall picture of the 2019 Google Borg cluster traces w.r.t.\
failures and their causes.

\newpage

\printbibliography%

\end{document}

% vim: set ts=2 sw=2 et tw=80: