
PySpark MemoryError continues to occur

I'm running a big data cleaning job on a Spark cluster of AWS m6i.xlarge instances (4 vCPU, 16 GB memory, 200 GB disk) with the following SparkContext:

    import pyspark

    conf = pyspark.SparkConf().setAppName("CommonCrawlProcessor")\
                              .set("spark.executor.cores", "3")
    sc = pyspark.SparkContext(conf=conf)

And I set the following config:

    --executor-memory 12G \
    --driver-memory 12G \
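
For reference, the executor memory setting can also be expressed directly in the SparkConf; the sketch below is my own illustration, not code from the original post. Note that spark.driver.memory generally has to be set before the driver JVM starts, which is why it is passed on the spark-submit command line rather than in application code.

    import pyspark

    # Sketch: the same 12G executor memory expressed as a SparkConf entry
    # instead of the --executor-memory flag (illustrative only).
    conf = (
        pyspark.SparkConf()
        .setAppName("CommonCrawlProcessor")
        .set("spark.executor.cores", "3")
        .set("spark.executor.memory", "12g")  # equivalent to --executor-memory 12G
    )
    sc = pyspark.SparkContext(conf=conf)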

I persist every RDD to disk with RDD.persist(storageLevel=StorageLevel.DISK_ONLY). However, no matter what I try, I keep getting tons of the following errors when the job reaches certain stages:

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/home/ec2-user/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/home/ec2-user/a22/code/spark.py", line 60, in parse_file
MemoryError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:732)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:438)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2066)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:272)

Really appreciate any help!

I persist every RDD to disk with RDD.persist(storageLevel=StorageLevel.DISK_ONLY) and unpersist them once they are no longer needed.
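
For completeness, that persist/unpersist pattern looks roughly like the sketch below; the RDDs and the toy cleaning step are placeholders of my own, not the original job's code:

    from pyspark import SparkContext, StorageLevel

    sc = SparkContext(appName="CommonCrawlProcessor")

    # Placeholder input; the real job reads Common Crawl data instead.
    raw = sc.parallelize(["  SOME Raw Record  ", "  another RECORD  "])
    raw.persist(storageLevel=StorageLevel.DISK_ONLY)      # cache on disk only, never in memory

    cleaned = raw.map(lambda line: line.strip().lower())  # placeholder cleaning step
    cleaned.persist(storageLevel=StorageLevel.DISK_ONLY)
    cleaned.count()                                        # materialize the downstream RDD

    raw.unpersist()                                        # drop the upstream RDD once it is no longer needed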


1 Reply

Karthik Sridhar (Syncfusion Team), April 24, 2023 11:36 AM UTC

Hello Manikanta Choudary,


Thanks for contacting Syncfusion. We are sorry to let you know that we have discontinued support and development of the Big Data product, and we no longer have a support team to respond to this query. We suggest checking online open-source tools to fulfill your request.


Regards,

Karthik Sridhar.

