I'm running a big data-cleaning job on a Spark cluster on AWS m6i.xlarge instances (4 vCPU, 16 GB memory, 200 GB disk) with the following SparkContext:
import pyspark

conf = pyspark.SparkConf().setAppName("CommonCrawlProcessor")\
    .set("spark.executor.cores", "3")
sc = pyspark.SparkContext(conf=conf)
And I set the following config:
--executor-memory 12G \
--driver-memory 12G \
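To rule out these settings being silently ignored, the effective values can be checked from the driver once the SparkContext is up. This is just a sanity-check sketch, not part of my actual job:

# Illustrative check: print the configuration the running context actually sees
print(sc.getConf().get("spark.executor.memory", "not set"))
print(sc.getConf().get("spark.driver.memory", "not set"))
print(sc.getConf().get("spark.executor.cores", "not set"))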
I persist every RDD to disk with RDD.persist(storageLevel=StorageLevel.DISK_ONLY) and unpersist them once they are no longer needed.
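Concretely, the pattern for every intermediate RDD looks roughly like this (the input path and the map step are placeholders, not my real code):

from pyspark import StorageLevel

rdd = sc.textFile("s3://some-bucket/commoncrawl-input/")   # placeholder input path
rdd.persist(storageLevel=StorageLevel.DISK_ONLY)

cleaned = rdd.map(lambda line: line.strip())               # stand-in for the real cleaning step
cleaned.persist(storageLevel=StorageLevel.DISK_ONLY)
cleaned.count()                                            # an action that materializes "cleaned"

rdd.unpersist()                                            # dropped once no later stage needs it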
However, no matter how hard I try, I keep getting tons of the following error when the job hits certain stages:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
process()
File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
serializer.dump_stream(out_iter, outfile)
File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
return f(*args, **kwargs)
File "/home/ec2-user/a22/code/spark.py", line 60, in parse_file
MemoryError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:732)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)
Really appreciate any help!