Spark DataFrame - Array[ByteBuffer] - IllegalArgumentException

IllegalArgumentException - ByteBuffer - Spark DataFrame


I was processing several million documents (~20 million) from which we needed to extract NLP features using NLP4J, OpenNLP, and WordNet. The combination of the three NLP feature sets blows up each record to 11 times its original size. We are using all three because we do not know yet which feature sets will be helpful to us.
The original dataset is in parquet files in HDFS (16 partitions). I thought it would be convenient to just use withColumn and pass a UDF (User Defined Function) over the column that needs those features; withColumn adds the calculated column back to the DataFrame. So I created the Spark job (I am on Spark 1.5.2-cdh5.5.2) for the above, and things started to get nasty: I was blowing up the ByteBuffer array in the in-memory columnar storage. This is the exception I am getting - there is no reference to my code anywhere in this stack trace.


       

java.lang.IllegalArgumentException
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
 at org.apache.spark.sql.columnar.ColumnBuilder$.ensureFreeSpace(ColumnBuilder.scala:142)
 at org.apache.spark.sql.columnar.BasicColumnBuilder.appendFrom(ColumnBuilder.scala:72)
 at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$NullableColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87)
 at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:62)
 at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87)
 at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78)
 at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
 at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:142)
 at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
 at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
 at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
 at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:88)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
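
For reference, the job itself boiled down to something like this (a sketch with hypothetical column and extractor names, not my actual code; sqlContext is the SQLContext configured later in this post):

     import org.apache.spark.sql.functions.udf

     // Hypothetical stand-in for the NLP4J / OpenNLP / WordNet extractors;
     // the real ones are what blow each record up ~11 times.
     def extractFeatures(text: String): Seq[String] =
       text.split("\\s+").toSeq   // placeholder only

     val featureUdf = udf((text: String) => extractFeatures(text))

     // withColumn adds the computed features back onto the DataFrame read from parquet.
     val docs = sqlContext.read.parquet("/path/to/documents")                       // hypothetical path
     val withFeatures = docs.withColumn("nlpFeatures", featureUdf(docs("text")))    // "text" is a hypothetical column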
 
So I googled this, and there seems to be no definitive answer to this issue.
So here is what I did:
1. Looked at the ColumnBuilder source code - there is a default size for each column type, which is roughly 128 MB (if my math is right)
2. I looked at how columnar storage works and how it relates to the ByteBuffer - the only thing I could find out there is this blog (oopsoom, a Chinese blog, so get ready to translate), whose pictures gave me a good idea of how this columnar storage works (in terms of the data structures it uses)
3. I had to review my code again and see if I made any mistakes in coding this for Spark - so, as a refresher, I looked at these slides again: top-5-mistakes-to-avoid-when-writing-apache-spark-applications. The presentation reminded me that no Spark shuffle block can be greater than 2 GB, which is especially problematic for Spark SQL (see the sketch right after this list for how that shows up as this exception).
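
My reading of the stack trace, putting the three items above together: ColumnBuilder.ensureFreeSpace asks java.nio.ByteBuffer.allocate for a bigger buffer, and ByteBuffer.allocate takes an int and throws IllegalArgumentException when that int is negative. So once a batch of these 11x-inflated rows pushes the requested capacity past the ~2 GB int limit, the size wraps around to a negative number. A toy illustration of that failure mode (not Spark code, hypothetical numbers):

     import java.nio.ByteBuffer

     // A requested capacity just over 2 GB overflows a 32-bit int...
     val requested: Long = Int.MaxValue.toLong + 1024L
     val capacity: Int   = requested.toInt      // wraps to a negative value
     ByteBuffer.allocate(capacity)              // throws java.lang.IllegalArgumentException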

So armed with this knowledge, I had to pull several tricks out of my sleeve. Here is what I did.
1. First, I increased the driver memory and executor memory - memory is cheap! (A sketch of steps 1 and 2 follows this list.)
2. Increased the number of partitions from the original 16 to 2500 before processing each row (Spark uses a compressed data structure for map output statuses once there are more than 2000 partitions - see line 48 of MapStatus.scala). This is so each executor deals with smaller chunks, since the dataset can grow to 11 times its original size.
3. Of course, I used the KryoSerializer; here are the settings I used:
     conf.set("spark.kryoserializer.buffer.max","2000m")
     conf.set("spark.kryoserializer.buffer","512m")
     conf.set("spark.rdd.compress","true")
     conf.set("spark.serializer.objectStreamReset","10")
4. In the SQLContext, I used the following settings as well:
   sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
   sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
   sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000000")
   sqlContext.setConf("spark.sql.shuffle.partitions", )
So after doing all that, the exception is gone.
I could start removing the changes one by one to determine which of them directly addressed the issue, but I am keeping all of my settings because the job that used to take 4 hours now runs in about 1.5 hours.

I think that in the next version of Spark, there should be a setting to define the column size for any columnar type. In short, there should be a legal argument for passing in the size of the column.

