machine time waste

This commit is contained in:
Claudio Maggioni 2021-02-19 18:47:56 +00:00
parent 6248d8ca27
commit 9f26534e80
8 changed files with 1976 additions and 0 deletions

View File

@ -0,0 +1,895 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "proper-gnome",
"metadata": {},
"source": [
"# Temporal impact: machine time waste"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "fantastic-harrison",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"import pandas\n",
"from IPython import display\n",
"import findspark\n",
"findspark.init()\n",
"import pyspark\n",
"import pyspark.sql\n",
"import sys\n",
"\n",
"from pyspark.sql.functions import col, lag, when, concat_ws, last, first\n",
"from pyspark.sql import Window\n",
"from pyspark.sql.types import LongType"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "failing-rebecca",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"cluster=\"b\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "transsexual-baptist",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ERROR:root:Exception while sending command.\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1207, in send_command\n",
" raise Py4JNetworkError(\"Answer from Java side is empty\")\n",
"py4j.protocol.Py4JNetworkError: Answer from Java side is empty\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1033, in send_command\n",
" response = connection.send_command(command)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1212, in send_command\n",
" \"Error while receiving\", e, proto.ERROR_ON_RECEIVE)\n",
"py4j.protocol.Py4JNetworkError: Error while receiving\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n"
]
},
{
"ename": "Py4JError",
"evalue": "An error occurred while calling o26.json",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-3-047bb33e6a05>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mgetOrCreate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 5\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjson\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"/home/claudio/google_2019/instance_events/\"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mcluster\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0;34m\"/\"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mcluster\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0;34m\"_instance_events*.json.gz\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/opt/spark/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mjson\u001b[0;34m(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 298\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 299\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 300\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjson\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 301\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m \u001b[0;32mdef\u001b[0m 
\u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1304\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1305\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1307\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 126\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 127\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 128\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 129\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 130\u001b[0m \u001b[0mconverted\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mconvert_exception\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 334\u001b[0m raise Py4JError(\n\u001b[1;32m 335\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 336\u001b[0;31m format(target_id, \".\", name))\n\u001b[0m\u001b[1;32m 337\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 338\u001b[0m \u001b[0mtype\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0manswer\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mPy4JError\u001b[0m: An error occurred while calling o26.json"
]
}
],
"source": [
"spark = pyspark.sql.SparkSession.builder \\\n",
" .appName(\"machine_time_waste\") \\\n",
" .getOrCreate()\n",
"\n",
"df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "juvenile-absolute",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "lucky-western",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "normal-settlement",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# .filter(df.collection_type == 0) \\\n",
"df2 = df \\\n",
" .withColumn(\"time\", col(\"time\").cast(LongType())) \\\n",
" .withColumn(\"type\", col(\"type\").cast(LongType())) \\\n",
" .withColumn(\"type\", when(col(\"type\").isNull(), 0).otherwise(col(\"type\"))) \\\n",
" .withColumn(\"id\", concat_ws(\"-\", \"collection_id\", \"instance_index\")) \\\n",
" .where(col(\"time\").isNotNull()) \\\n",
" .where(col(\"type\").isNotNull()) \\\n",
" .where((col(\"instance_index\").isNotNull()) & (col(\"collection_id\").isNotNull())) \\\n",
" .select(\"machine_id\", \"id\", \"time\", \"type\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "typical-homeless",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df2.show()\n",
"print(\"Total: \" + str(df.count()))\n",
"print(\"Filtered: \" + str(df2.count()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "collect-saying",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# my_window = Window.partitionBy(\"machine_id\", \"id\").orderBy(df2.time.asc())\n",
"\n",
"w2 = Window.partitionBy(\"id\").orderBy(df2.time.asc()).rowsBetween(Window.currentRow, Window.unboundedFollowing)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cooperative-appraisal",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# .withColumn(\"prev_time\", lag(df2.time).over(my_window)) \\\n",
"# .withColumn(\"prev_type\", lag(df2.type).over(my_window)) \\\n",
"\n",
"df3 = df2 \\\n",
" .withColumn(\"t3_time\", when((df2.type != 3), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t45678_time\", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t45678_type\", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \\\n",
" .withColumn(\"t01_time\", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t01_type\", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \\\n",
" .withColumn(\"next_time\", when(df2.type == 3, first(col(\"t45678_time\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type == 0) | (df2.type == 1), first(col(\"t3_time\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type >= 4) | (df2.type <= 8), first(col(\"t01_time\"), ignorenulls=True).over(w2)) \\\n",
" .otherwise(None)) \\\n",
" .withColumn(\"next_type\", when(df2.type == 3, first(col(\"t45678_type\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type == 0) | (df2.type == 1), 3) \\\n",
" .when((df2.type >= 4) | (df2.type <= 8), first(col(\"t01_type\"), ignorenulls=True).over(w2)) \\\n",
" .otherwise(None)) \\\n",
" .withColumn(\"last_term_type\", last(col(\"t45678_type\"), ignorenulls=True).over(w2)) \\\n",
" .withColumn(\"time_delta\", col(\"next_time\") - col(\"time\")) \\\n",
" .select(\"machine_id\", \"id\", \"time\", \"type\", \"last_term_type\", \"time_delta\", \"t01_time\", \"t01_type\", \"t3_time\", \"t45678_time\", \"t45678_type\", \"next_time\", \"next_type\") \\"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ideal-angle",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df4 = df3.where(df3.next_type.isNotNull()).groupby(\"type\", \"next_type\", \"last_term_type\").sum(\"time_delta\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "working-difficulty",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# df3.orderBy(df3.machine_id, df3.time).show(n=100)\n",
"# df3.printSchema()\n",
"df4.show(n=1000000)\n",
"df4.write.csv(\"/home/claudio/google_2019/thesis_queries/machine_time_waste/\" + cluster + \"_state_change.csv\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,895 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "proper-gnome",
"metadata": {},
"source": [
"# Temporal impact: machine time waste"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "fantastic-harrison",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"import pandas\n",
"from IPython import display\n",
"import findspark\n",
"findspark.init()\n",
"import pyspark\n",
"import pyspark.sql\n",
"import sys\n",
"\n",
"from pyspark.sql.functions import col, lag, when, concat_ws, last, first\n",
"from pyspark.sql import Window\n",
"from pyspark.sql.types import LongType"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "failing-rebecca",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"cluster=\"b\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "transsexual-baptist",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ERROR:root:Exception while sending command.\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1207, in send_command\n",
" raise Py4JNetworkError(\"Answer from Java side is empty\")\n",
"py4j.protocol.Py4JNetworkError: Answer from Java side is empty\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1033, in send_command\n",
" response = connection.send_command(command)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1212, in send_command\n",
" \"Error while receiving\", e, proto.ERROR_ON_RECEIVE)\n",
"py4j.protocol.Py4JNetworkError: Error while receiving\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n",
"ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:36135)\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 3343, in run_code\n",
" exec(code_obj, self.user_global_ns, self.user_ns)\n",
" File \"<ipython-input-3-047bb33e6a05>\", line 5, in <module>\n",
" df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")\n",
" File \"/opt/spark/python/pyspark/sql/readwriter.py\", line 300, in json\n",
" return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1305, in __call__\n",
" answer, self.gateway_client, self.target_id, self.name)\n",
" File \"/opt/spark/python/pyspark/sql/utils.py\", line 128, in deco\n",
" return f(*a, **kw)\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\", line 336, in get_return_value\n",
" format(target_id, \".\", name))\n",
"py4j.protocol.Py4JError: An error occurred while calling o26.json\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/home/claudio/python-venv/lib/python3.6/site-packages/IPython/core/interactiveshell.py\", line 2044, in showtraceback\n",
" stb = value._render_traceback_()\n",
"AttributeError: 'Py4JError' object has no attribute '_render_traceback_'\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 977, in _get_connection\n",
" connection = self.deque.pop()\n",
"IndexError: pop from an empty deque\n",
"\n",
"During handling of the above exception, another exception occurred:\n",
"\n",
"Traceback (most recent call last):\n",
" File \"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\", line 1115, in start\n",
" self.socket.connect((self.address, self.port))\n",
"ConnectionRefusedError: [Errno 111] Connection refused\n"
]
},
{
"ename": "Py4JError",
"evalue": "An error occurred while calling o26.json",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mPy4JError\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-3-047bb33e6a05>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m\u001b[0m\n\u001b[1;32m 3\u001b[0m \u001b[0;34m.\u001b[0m\u001b[0mgetOrCreate\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 4\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 5\u001b[0;31m \u001b[0mdf\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mspark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mread\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjson\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m\"/home/claudio/google_2019/instance_events/\"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mcluster\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0;34m\"/\"\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0mcluster\u001b[0m \u001b[0;34m+\u001b[0m \u001b[0;34m\"_instance_events*.json.gz\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[0;32m/opt/spark/python/pyspark/sql/readwriter.py\u001b[0m in \u001b[0;36mjson\u001b[0;34m(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, lineSep, samplingRatio, dropFieldIfAllNull, encoding, locale, pathGlobFilter, recursiveFileLookup)\u001b[0m\n\u001b[1;32m 298\u001b[0m \u001b[0mpath\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 299\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0mtype\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m \u001b[0;34m==\u001b[0m \u001b[0mlist\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 300\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_df\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jreader\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjson\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_spark\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_sc\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jvm\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPythonUtils\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoSeq\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 301\u001b[0m \u001b[0;32melif\u001b[0m \u001b[0misinstance\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mpath\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRDD\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 302\u001b[0m \u001b[0;32mdef\u001b[0m 
\u001b[0mfunc\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0miterator\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1303\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1304\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1305\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1306\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1307\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 126\u001b[0m \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 127\u001b[0m \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 128\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 129\u001b[0m \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 130\u001b[0m \u001b[0mconverted\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mconvert_exception\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 334\u001b[0m raise Py4JError(\n\u001b[1;32m 335\u001b[0m \u001b[0;34m\"An error occurred while calling {0}{1}{2}\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 336\u001b[0;31m format(target_id, \".\", name))\n\u001b[0m\u001b[1;32m 337\u001b[0m \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 338\u001b[0m \u001b[0mtype\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0manswer\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mPy4JError\u001b[0m: An error occurred while calling o26.json"
]
}
],
"source": [
"spark = pyspark.sql.SparkSession.builder \\\n",
" .appName(\"machine_time_waste\") \\\n",
" .getOrCreate()\n",
"\n",
"df = spark.read.json(\"/home/claudio/google_2019/instance_events/\" + cluster + \"/\" + cluster + \"_instance_events*.json.gz\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "juvenile-absolute",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "lucky-western",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "normal-settlement",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# .filter(df.collection_type == 0) \\\n",
"df2 = df \\\n",
" .withColumn(\"time\", col(\"time\").cast(LongType())) \\\n",
" .withColumn(\"type\", col(\"type\").cast(LongType())) \\\n",
" .withColumn(\"type\", when(col(\"type\").isNull(), 0).otherwise(col(\"type\"))) \\\n",
" .withColumn(\"id\", concat_ws(\"-\", \"collection_id\", \"instance_index\")) \\\n",
" .where(col(\"time\").isNotNull()) \\\n",
" .where(col(\"type\").isNotNull()) \\\n",
" .where((col(\"instance_index\").isNotNull()) & (col(\"collection_id\").isNotNull())) \\\n",
" .select(\"machine_id\", \"id\", \"time\", \"type\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "typical-homeless",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df2.show()\n",
"print(\"Total: \" + str(df.count()))\n",
"print(\"Filtered: \" + str(df2.count()))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "collect-saying",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# my_window = Window.partitionBy(\"machine_id\", \"id\").orderBy(df2.time.asc())\n",
"\n",
"w2 = Window.partitionBy(\"id\").orderBy(df2.time.asc()).rowsBetween(Window.currentRow, Window.unboundedFollowing)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cooperative-appraisal",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# .withColumn(\"prev_time\", lag(df2.time).over(my_window)) \\\n",
"# .withColumn(\"prev_type\", lag(df2.type).over(my_window)) \\\n",
"\n",
"df3 = df2 \\\n",
" .withColumn(\"t3_time\", when((df2.type != 3), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t45678_time\", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t45678_type\", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type)) \\\n",
" .withColumn(\"t01_time\", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time)) \\\n",
" .withColumn(\"t01_type\", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type)) \\\n",
" .withColumn(\"next_time\", when(df2.type == 3, first(col(\"t45678_time\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type == 0) | (df2.type == 1), first(col(\"t3_time\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type >= 4) | (df2.type <= 8), first(col(\"t01_time\"), ignorenulls=True).over(w2)) \\\n",
" .otherwise(None)) \\\n",
" .withColumn(\"next_type\", when(df2.type == 3, first(col(\"t45678_type\"), ignorenulls=True).over(w2)) \\\n",
" .when((df2.type == 0) | (df2.type == 1), 3) \\\n",
" .when((df2.type >= 4) | (df2.type <= 8), first(col(\"t01_type\"), ignorenulls=True).over(w2)) \\\n",
" .otherwise(None)) \\\n",
" .withColumn(\"last_term_type\", last(col(\"t45678_type\"), ignorenulls=True).over(w2)) \\\n",
" .withColumn(\"time_delta\", col(\"next_time\") - col(\"time\")) \\\n",
" .select(\"machine_id\", \"id\", \"time\", \"type\", \"last_term_type\", \"time_delta\", \"t01_time\", \"t01_type\", \"t3_time\", \"t45678_time\", \"t45678_type\", \"next_time\", \"next_type\") \\"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ideal-angle",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"df4 = df3.where(df3.next_type.isNotNull()).groupby(\"type\", \"next_type\", \"last_term_type\").sum(\"time_delta\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "working-difficulty",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"# df3.orderBy(df3.machine_id, df3.time).show(n=100)\n",
"# df3.printSchema()\n",
"df4.show(n=1000000)\n",
"df4.write.csv(\"/home/claudio/google_2019/thesis_queries/machine_time_waste/\" + cluster + \"_state_change.csv\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}

View File

@ -0,0 +1,79 @@
#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
# Single-letter name of the cluster whose instance events are analysed.
cluster = "b"

# Spark session for the analysis: scratch files go to spark.local.dir and the
# driver heap is raised to 124g.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("machine_time_waste")
    .config("spark.local.dir", "/run/tmpfiles.d/spark")
    .config("spark.driver.memory", "124g")
    .getOrCreate()
)

# Load every gzipped instance-events shard of the selected cluster.
df = spark.read.json(
    "/home/claudio/google_2019/instance_events/" + cluster + "/"
    + cluster + "_instance_events*.json.gz"
)

df.printSchema()
df.show()

# Normalise the raw events:
#   * "time" and "type" are cast to 64-bit integers;
#   * a "type" that is null after the cast is replaced by 0;
#   * "id" is "<collection_id>-<instance_index>" and identifies one task;
#   * rows lacking a time, a collection id or an instance index are dropped.
# To restrict the analysis to one collection type, add
# `.filter(df.collection_type == 0)` at the start of the chain.
df2 = (
    df
    .withColumn("time", col("time").cast(LongType()))
    .withColumn("type", col("type").cast(LongType()))
    .withColumn("type", when(col("type").isNull(), 0).otherwise(col("type")))
    .withColumn("id", concat_ws("-", "collection_id", "instance_index"))
    .where(col("time").isNotNull())
    .where(col("type").isNotNull())
    .where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull()))
    .select("time", "type", "id")
)

df2.show()
print("Total: " + str(df.count()))
print("Filtered: " + str(df2.count()))

# Per-task window, ordered by time, spanning the current event and every
# later event of the same task.
w2 = (
    Window.partitionBy("id")
    .orderBy(df2.time.asc())
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

# Event-type groups used below. Types 4..8 are the ones recorded as
# "termination" types (see last_term_type).
# NOTE(review): the meaning of each numeric type comes from the trace schema,
# which is not visible in this file -- confirm against its documentation.
is_t3 = df2.type == 3
is_t01 = (df2.type == 0) | (df2.type == 1)
# The range test must use `&`: with `|` the condition is true for every type,
# so events outside 0, 1, 3 and 4..8 were treated as terminations and the
# `.otherwise(None)` branches below could never be reached.
is_t45678 = (df2.type >= 4) & (df2.type <= 8)

df3 = (
    df2
    # Helper columns: the row's own time/type, kept only when the row belongs
    # to the named type group (null otherwise).
    .withColumn("t3_time", when(df2.type != 3, None).otherwise(df2.time))
    .withColumn("t45678_time", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.time))
    .withColumn("t45678_type", when((df2.type < 4) | (df2.type > 8), None).otherwise(df2.type))
    .withColumn("t01_time", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.time))
    .withColumn("t01_type", when((df2.type != 0) & (df2.type != 1), None).otherwise(df2.type))
    # Time of the next relevant event of the same task:
    #   type 3    -> next 4..8 event
    #   type 0/1  -> next type-3 event
    #   type 4..8 -> next 0/1 event
    # Any other type gets null.
    .withColumn(
        "next_time",
        when(is_t3, first(col("t45678_time"), ignorenulls=True).over(w2))
        .when(is_t01, first(col("t3_time"), ignorenulls=True).over(w2))
        .when(is_t45678, first(col("t01_time"), ignorenulls=True).over(w2))
        .otherwise(None),
    )
    # Type of that next event (constant 3 for 0/1 rows).
    .withColumn(
        "next_type",
        when(is_t3, first(col("t45678_type"), ignorenulls=True).over(w2))
        .when(is_t01, 3)
        .when(is_t45678, first(col("t01_type"), ignorenulls=True).over(w2))
        .otherwise(None),
    )
    # Last 4..8 type seen from this event to the end of the task.
    .withColumn("last_term_type", last(col("t45678_type"), ignorenulls=True).over(w2))
    .withColumn("time_delta", col("next_time") - col("time"))
    .select(
        "id", "time", "type", "last_term_type", "time_delta", "t01_time",
        "t01_type", "t3_time", "t45678_time", "t45678_type", "next_time", "next_type",
    )
)

# Total time per (type, next_type, last_term_type) combination; rows without
# a next event type are excluded.
df4 = (
    df3
    .where(df3.next_type.isNotNull())
    .groupby("type", "next_type", "last_term_type")
    .sum("time_delta")
)

df4.show(n=1000000)
df4.write.csv("/home/claudio/google_2019/thesis_queries/machine_time_waste/" + cluster + "_state_change.csv")
# vim: set ts=2 sw=2 et tw=120:

View File

@ -0,0 +1,103 @@
#!/usr/bin/env python3
# coding: utf-8
# # Temporal impact: machine time waste
import json
import pandas
from IPython import display
import findspark
findspark.init()
import pyspark
import pyspark.sql
import sys
from pyspark.sql.functions import col, lag, when, concat_ws, last, first
from pyspark.sql import Window
from pyspark.sql.types import LongType
# Command line: exactly one argument, a single-character cluster name.
if not (len(sys.argv) == 2 and len(sys.argv[1]) == 1):
    print("usage: " + sys.argv[0] + " {cluster}", file=sys.stderr)
    sys.exit(1)

cluster = sys.argv[1]

# Spark session: scratch files go to spark.local.dir and the driver heap is
# raised to 124g.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("machine_time_waste")
    .config("spark.local.dir", "/tmp/ramdisk/spark")
    .config("spark.driver.memory", "124g")
    .getOrCreate()
)

# Every gzipped instance-events shard of the requested cluster.
df = spark.read.json(
    "/home/claudio/google_2019/instance_events/" + cluster + "/"
    + cluster + "_instance_events*.json.gz"
)
# Alternative input (disabled):
# df = spark.read.json("/home/claudio/google_2019/instance_events/" + cluster + "/" + cluster + "_test.json")

df.printSchema()
df.show()

# Normalised events: integer "time" and "type" (a type that is null after the
# cast becomes 0), plus a task identifier "id" built as
# "<collection_id>-<instance_index>". Rows missing a time, a collection id or
# an instance index are dropped.
# Disabled filter, kept for reference: .filter(df.collection_type == 0)
df2 = (
    df
    .withColumn("time", col("time").cast(LongType()))
    .withColumn("type", col("type").cast(LongType()))
    .withColumn("type", when(col("type").isNull(), 0).otherwise(col("type")))
    .withColumn("id", concat_ws("-", "collection_id", "instance_index"))
    .where(col("time").isNotNull())
    .where(col("type").isNotNull())
    .where((col("instance_index").isNotNull()) & (col("collection_id").isNotNull()))
    .select("time", "type", "id")
)
df2.show()

# Row counts before and after filtering; both are also written to the output.
total = df.count()
filtered = df2.count()
print("Total: " + str(total))
print("Filtered: " + str(filtered))

# The per-task processing below works on the underlying RDD of rows.
r = df2.rdd
def for_each_task(ts):
    """Summarise the event-type transitions of a single task.

    Args:
        ts: iterable of events, each exposing a ``time`` and a ``type``
            attribute. The events need not be sorted.

    Returns:
        A dict with two entries:

        * ``"last_term"``: the type of the latest processed event whose type
          lies in 4..8, or ``None`` when the task has no such event;
        * ``"tr"``: a dict mapping ``"<prev type>-<next type>"`` to the total
          time elapsed across all occurrences of that transition.
    """
    events = sorted(ts, key=lambda ev: ev.time)
    last_index = len(events) - 1
    last_term = None
    prev = None
    tr = {}

    for i, t in enumerate(events):
        # Collapse runs of events with the same type: the elapsed time is
        # measured from the first event of the run. A repeated type is kept
        # only when it is the very last event, so the trailing "loop" shows up
        # as an "<x>-<x>" transition.
        if prev is not None and t.type == prev.type and i != last_index:
            continue

        if 4 <= t.type <= 8:
            last_term = t.type

        if prev is not None:
            key = str(prev.type) + "-" + str(t.type)
            # Accumulate: a task can go through the same transition several
            # times, and a plain assignment would keep only the last duration.
            tr[key] = tr.get(key, 0) + (t.time - prev.time)

        prev = t

    return {"last_term": last_term, "tr": tr}
def sum_values(ds):
    """Add up the per-transition durations of several task summaries.

    Args:
        ds: iterable of dicts, each holding a ``"tr"`` entry that maps a
            transition key to a duration.

    Returns:
        A dict mapping every transition key found in ``ds`` to the sum of its
        durations.
    """
    totals = {}
    for summary in ds:
        for transition, elapsed in summary["tr"].items():
            totals[transition] = totals.get(transition, 0) + elapsed
    return totals
# Gather the events of each task (keyed by its id), reduce every task to its
# transition summary, then drop the id and regroup the summaries by their
# "last_term" value so that the durations can be added up per group.
r2 = (
    r
    .groupBy(lambda row: row.id)
    .mapValues(for_each_task)
    .map(lambda keyed: keyed[1])
    .groupBy(lambda summary: summary["last_term"])
    .mapValues(sum_values)
    .collect()
)

# Persist the row counts and the aggregated transitions in the working directory.
with open(cluster + "_state_changes.json", "w") as out:
    json.dump({"filtered": filtered, "total": total, "data": r2}, out)
# .withColumn("prev_time", lag(df2.time).over(my_window)) \
# .withColumn("prev_type", lag(df2.type).over(my_window)) \
# vim: set ts=2 sw=2 et tw=120: