Day 23/100
Spark Streaming from Amazon SQS
So this is a journal log describing how I was able to integrate Spark Streaming with Amazon SQS, and the problems I faced while doing so.
Problem Statement
- It seems that when we read S3 files from Spark Streaming, Spark makes a lot of S3 list API calls and also reads the checkpoint directory to get the list of files already processed, in order to identify the new set of files to process, i.e. (file list from S3 list call) - (file list from checkpoints) = (files to process next). (A toy sketch of this logic follows at the end of this section.)
- This process is fast initially, but over time the checkpoint directory grows, and so does the number of S3 files, which slows down file processing; just getting the next set of files to process starts taking a long time.
- Apart from the slowness, the list API calls add a good amount of cost to the AWS bill.
- A good read on checkpoints - qubole.com/blog/structured-streaming-with-d..
- So we want to read from S3 but get over these drawbacks.
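To make the slowdown concrete, here is a toy sketch of the file-discovery logic described above. It is only an illustration of the set difference, not Spark's actual implementation, and the helper functions are made-up stand-ins.

// Toy illustration only: stand-ins for the S3 LIST calls and the checkpoint/metadata
// log that the file source consults, just to show the set difference above.
def listS3Prefix(prefix: String): Set[String] =
  Set("part-0001.json", "part-0002.json", "part-0003.json") // pretend these came from S3 LIST calls

def readCheckpointLog(checkpointDir: String): Set[String] =
  Set("part-0001.json", "part-0002.json")                   // pretend these came from the checkpoint dir

// (file list from S3 list call) - (file list from checkpoints) = (files to process next)
val toProcessNext = listS3Prefix("s3://my-bucket/input/") -- readCheckpointLog("s3://my-bucket/chk/")
// As both sets grow over time, every micro-batch pays for a bigger LIST and a bigger diff.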
Probable solution
- One of the best approaches is to use Amazon SQS (Simple Queue Service), which can be used in combination with SNS (Simple Notification Service) to publish a message to a queue for every new file created/deleted in the bucket.
- We can pick the file names from the queue and read only those files; that way we get rid of the list calls and save cost, and it should also be faster than the normal read due to the missing list calls. (A rough sketch of this flow follows this list.)
- And we need some custom stream source to pick up the file names and read the actual files from S3.
- Fortunately, there are ready-made solutions available for the above use case.
- The best one is github.com/apache/bahir/tree/master/sql-str.. , but that is only for Scala 2.12, and I had Spark 2.4.5 with Scala 2.11, so I couldn't use it.
- The other adaptation of the same lib, but for Scala 2.11, is this one: github.com/qubole/s3-sqs-connector
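For intuition, here is a rough, hand-rolled sketch of what such a source does under the hood: poll SQS for S3 event notifications and handle only the referenced files. This is my own illustration with a made-up region and queue URL, not code from either connector.

import com.amazonaws.services.sqs.AmazonSQSClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import scala.collection.JavaConverters._

// Hypothetical region and queue URL, assuming S3 event notifications reach this queue via SNS.
val queueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/new-files-queue"
val sqs = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()

val messages = sqs.receiveMessage(
  new ReceiveMessageRequest(queueUrl)
    .withMaxNumberOfMessages(10)
    .withWaitTimeSeconds(5) // long polling, similar in spirit to sqsLongPollingWaitTimeSeconds below
).getMessages.asScala

messages.foreach { msg =>
  // The body carries the SNS-wrapped S3 event JSON; extracting the s3:// path from it
  // and reading that one file is exactly the work the connector automates for us.
  println(msg.getBody)
  sqs.deleteMessage(queueUrl, msg.getReceiptHandle) // acknowledge once the file has been handled
}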
Integration issues and fixes
- So I started by adding the dependency to pom.xml, something like this:
<dependency>
    <groupId>com.qubole</groupId>
    <artifactId>spark-sql-streaming-sqs_2.11</artifactId>
    <version>0.5.1</version>
</dependency>
- And added the required options for the Spark Structured Streaming readStream, like this:
val inputDf = sparkSession
  .readStream
  .format("s3-sqs")
  .schema(schema)
  .option("sqsUrl", queueUrl)
  .option("region", awsRegion)
  .option("fileFormat", "json")
  .option("sqsFetchIntervalSeconds", "2")
  .option("useInstanceProfileCredentials", "true")
  .option("sqsLongPollingWaitTimeSeconds", "5")
  .load()
- For more information about the available options and their default values, please see github.com/qubole/s3-sqs-connector#configur.. (A minimal sink sketch follows.)
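For completeness, here is a minimal sketch of wiring this stream to a sink so the query actually runs. The output path and checkpoint location are hypothetical, not from the original job.

import org.apache.spark.sql.streaming.Trigger

// Hypothetical sink and checkpoint locations, just to show a runnable end of the pipeline.
val query = inputDf
  .writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/output/")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/sqs-job/")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()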
- Created a build and tried running it while passing the jar via the `--jars` option, since I was using a slim jar.
- The very first error that popped up looked something like this:
FATAL [main] org.apache.hadoop.mapred.YarnChild: Error running child : java.lang.NoSuchMethodError: com.amazonaws.util.StringUtils.trim(Ljava/lang/String;)Ljava/lang/String; ... ... ...
- So after doing some thorough debugging, I found out that the Spark and Hadoop libs were using `hadoop-aws-1.7.4.jar`, which was very old and did not even support SQS, a newer AWS feature.
- Whereas the application was expecting newer versions: `aws-java-sdk-core-1.11.271.jar`, along with the similar `aws-java-sdk-sqs-1.11.271.jar` (a minimal repro sketch follows).
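To make the clash concrete, here is a tiny snippet showing the kind of call that fails. It is my own repro sketch, assuming the connector is compiled against the 1.11.x SDK, where this method exists.

import com.amazonaws.util.StringUtils

// Compiles fine against aws-java-sdk-core 1.11.271, which has StringUtils.trim(String).
// At runtime, if the old 1.7.4-era SDK classes pulled in via the Hadoop libs win on the
// classpath, the method is missing and the JVM throws java.lang.NoSuchMethodError.
val trimmed: String = StringUtils.trim("  https://sqs.us-east-1.amazonaws.com/123456789012/new-files-queue  ")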
- Hence the error. I tried a lot of options to get rid of it:
- Passed both `aws-java-sdk-core-1.11.271.jar` and `aws-java-sdk-sqs-1.11.271.jar` via spark-submit's `--jars` option. [DID NOT WORK]
- Tried loading the user classpath first using `--conf "spark.driver.userClassPathFirst=true" --conf "spark.executor.userClassPathFirst=true"`. [DID NOT WORK]
- Cloned the SQS lib locally and tried creating a fat, shaded SQS connector jar for aws-java-sdk-core and aws-java-sdk-sqs, with relocations and transformers, something like this:
<executions>
  <execution>
    <phase>package</phase>
    <goals>
      <goal>shade</goal>
    </goals>
    <configuration>
      <transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
      </transformers>
      <relocations>
        <relocation>
          <pattern>com.amazonaws</pattern>
          <shadedPattern>sqs.com.amazonaws</shadedPattern>
        </relocation>
      </relocations>
      <artifactSet>
        <includes>
          <include>com.amazonaws:aws-java-sdk-sqs:*</include>
          <include>com.amazonaws:aws-java-sdk-core:*</include>
        </includes>
      </artifactSet>
      <filters>
        <filter>
          <artifact>*:*</artifact>
          <excludes>
            <exclude>META-INF/maven/</exclude>
            <exclude>META-INF/MANIFEST.MF</exclude>
          </excludes>
        </filter>
      </filters>
    </configuration>
  </execution>
</executions>
- Now this was interesting: basically all the classes under `com.amazonaws` will be moved/renamed under the shaded pattern, here `sqs.com.amazonaws`. This way, whenever the code inside this repo calls AWS functions, it will only hit the shaded classes and won't clash with production's `hadoop-aws-1.7.4.jar`.
- After building and running this, I got into one more new issue:
Unable to instantiate request handler chain for client for request handler 'com.amazonaws.services.sqs.QueueUrlHandler' ........
- After spending a good amount of time figuring out what was causing the issue, it turned out that, unfortunately for us, the AWS SDK has two hardcoded fully qualified names used inside the lib, which lead to missing-class errors after shading. Since we have renamed the classes and the hardcoded references inside the jar still point to the old names, they naturally cannot be found:
requestHandler2s.addAll(chainFactory.newRequestHandlerChain("/com/amazonaws/services/sqs/request.handlers"))
requestHandler2s.addAll(chainFactory.newRequestHandler2Chain("/com/amazonaws/services/sqs/request.handler2s"))
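As I understand it, the failure mode looks roughly like the following: the handler chain factory reads those resource paths, gets back the original class names, and loads them reflectively, so the relocated classes are never found. This is a simplified sketch of my reading of it, not the SDK's actual code.

// Simplified sketch: the name baked into the SDK's resource file is the original one...
val handlerName = "com.amazonaws.services.sqs.QueueUrlHandler"

// ...but after shading, only sqs.com.amazonaws.services.sqs.QueueUrlHandler is on the
// classpath, so the reflective lookup fails and the request handler chain cannot be built.
val handler = Class.forName(handlerName).newInstance()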
- After spending hours, probably 1-2 days, trying to get around this problem, I could not get through it, so I gave up on the shaded jar altogether.
- The last thing I tried was to use these options in the spark-submit command:
--conf spark.driver.extraClassPath=aws-java-sdk-sqs-1.11.271.jar:aws-java-sdk-core-1.11.271.jar --conf spark.executor.extraClassPath=aws-java-sdk-sqs-1.11.271.jar:aws-java-sdk-core-1.11.271.jar
- The documentation for this option suggests that it allows me to prepend dependencies/JARs to the classpath. Keyword being prepend, as in, put in front of Spark's built-in classpath and libs.
- The above option added the AWS jars at the very beginning of the classpath, allowing the job to load the new AWS jars instead of the ones from the Hadoop libs. Fortunately this worked, and I could use the stock version of the SQS lib without any changes. [PASSED]
- The next error I got was this one:
20/07/23 17:57:13 ERROR MicroBatchExecution: Query [id = c9b4658e-af4b-41ab-939f-22149636e025, runId = 7aede0d8-a016-48fd-8ad9-41ba08539f2d] terminated with error java.lang.NoSuchMethodError: org.apache.spark.sql.Dataset$.ofRows$default$3()Ljava/lang/String; .... ....
- To solve this problem, after trying various suggestions and options, I just built the SQS connector lib against my version of Spark, i.e. 2.4.5, producing a new jar now called `spark-sql-streaming-sqs_2.11-0.5.2-SNAPSHOT.jar`.
- This did the trick. The issue seems to have been a Spark version mismatch with the default jar, `spark-sql-streaming-sqs_2.11-0.5.1.jar`, which is built against Spark 2.4.0.
- After this, the job proceeded further than the earlier runs and ended up erroring out with something along the lines of:
22/04/12 17:49:44 ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.sql.streaming.StreamingQueryException: /path/to/file.json.gz doesn't exist Caused by: java.io.IOException: /path/to/file.json.gz doesn't exist
- After analysis, I thought the job was picking up only the `/path/to/file` part and the bucket name was missing.
- But after printing the path the job was trying to use, we got `s3://path/to/file`, so my assumption of an error in the S3 path was wrong.
- To fix this, I had to make a change in the SQS connector library, in the file `/path/to/git/clone/s3-sqs-connector/src/main/scala/org/apache/spark/sql/streaming/sqs/SqsSource.scala`:
Changed this:
paths = files.map(f => new Path(new URI(f.path)).toString),
to this:
paths = files.map(f => new Path(new URI(f.path.replaceAll("s3:", "s3n:"))).toString),
- With this change the job has started picking up the files from SQS. I'm not sure why it is not deleting the messages from SQS though; this might be due to the change in the file path, need to dig in more.
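To dig into the s3 vs s3n behaviour, one quick check I plan to try next (a hypothetical snippet, not part of the job yet) is printing which Hadoop FileSystem implementation backs each scheme on the cluster:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Prints the concrete FileSystem class resolved for each scheme on this classpath,
// which should hint at why the s3:// path failed while s3n:// works.
val conf = new Configuration()
Seq("s3://some-bucket/", "s3n://some-bucket/").foreach { p =>
  try {
    println(p + " -> " + FileSystem.get(new URI(p), conf).getClass.getName)
  } catch {
    case e: Exception => println(p + " -> " + e.getMessage) // e.g. no FileSystem registered for the scheme
  }
}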
[TO BE CONTINUED]