Problems encountered in Spark development

2017/9/12 posted in Spark

Connecting Spark to MySQL

Problem description

The job keeps failing with errors such as "No suitable driver", or errors referring to the MySQL JDBC driver class (com.mysql.jdbc.Driver).

Solution 1

When submitting the job, prefix the command with SPARK_CLASSPATH to point at the MySQL connector jar explicitly:
SPARK_CLASSPATH=/usr/local/spark-1.4.1-bin-hadoop2.6/lib/mysql-connector-java-5.1.38.jar ./bin/spark-submit --class sparkDemo /root/data/demon-parent-1.0-SNAPSHOT-jar-with-dependencies.jar hdfs://192.168.119.100:9000/examples/custom.txt

Solution 2

Alternatively, edit the SPARK_HOME/conf/spark-env.sh file and add the following line:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/hdp/2.4.0.0-169/spark/lib/mysql-connector-java-5.1.38.jar
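With the connector on the classpath, the application reads MySQL through Spark's JDBC data source. Below is a minimal sketch in Scala (host, database, table, and credentials are placeholders, not values from this setup); the "driver" option names the class that the connector jar provides:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch of reading a MySQL table through the JDBC data source in
// Spark 1.x. All connection values below are placeholders.
object MysqlReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MysqlReadSketch"))
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read.format("jdbc")
      .option("url", "jdbc:mysql://db-host:3306/testdb")  // placeholder host and database
      .option("driver", "com.mysql.jdbc.Driver")          // class shipped in mysql-connector-java
      .option("dbtable", "some_table")                    // placeholder table
      .option("user", "root")
      .option("password", "123456")
      .load()

    df.show()
    sc.stop()
  }
}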

Error thrown when using Hive in Spark

Error log

17/08/09 12:11:51 WARN DataNucleus.Persistence: Error creating validator of type org.datanucleus.properties.CorePropertyValidator
ClassLoaderResolver for class "" gave error on creation : {1}
org.datanucleus.exceptions.NucleusUserException: ClassLoaderResolver for class "" gave error on creation : {1}
    at org.datanucleus.NucleusContext.getClassLoaderResolver(NucleusContext.java:1087)
    at org.datanucleus.PersistenceConfiguration.validatePropertyValue(PersistenceConfiguration.java:797)
    at org.datanucleus.PersistenceConfiguration.setProperty(PersistenceConfiguration.java:714)
    at org.datanucleus.PersistenceConfiguration.setPersistenceProperties(PersistenceConfiguration.java:693)
    at org.datanucleus.NucleusContext.<init>(NucleusContext.java:273)
    at org.datanucleus.NucleusContext.<init>(NucleusContext.java:247)
    at org.datanucleus.NucleusContext.<init>(NucleusContext.java:225)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.<init>(JDOPersistenceManagerFactory.java:416)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:301)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)

Problem analysis

Judging from the log, some Hive-related jars are missing. Searching around pointed to the DataNucleus jars that ship with Spark:

[hadoop@U007 lib]$ pwd
/opt/spark-1.6.0/lib
[hadoop@U007 lib]$ ll
total 305220
-rw-r--r-- 1 hadoop hadoop    339666 Apr 15  2016 datanucleus-api-jdo-3.2.6.jar
-rw-r--r-- 1 hadoop hadoop   1890075 Apr 15  2016 datanucleus-core-3.2.10.jar
-rw-r--r-- 1 hadoop hadoop   1809447 Apr 15  2016 datanucleus-rdbms-3.2.9.jar
...

So when submitting the Spark job, these jars just need to be added to the classpath.

Solution

In the spark-submit script, add these jars and the hive-site.xml file, like so:

nohup spark-submit \
 --master yarn \
 --deploy-mode cluster \
 --class ${className} \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 4 \
 --num-executors 4 \
 --jars ./lib/datanucleus-api-jdo-3.2.6.jar,./lib/datanucleus-core-3.2.10.jar,./lib/datanucleus-rdbms-3.2.9.jar \
 --files ./lib/hive-site.xml \
 ./app-jar-with-dependencies.jar

Adding --jars and --files is all that is needed.
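Once the DataNucleus jars and hive-site.xml are shipped with the job, Hive can be queried from the application through HiveContext. A minimal sketch, with a hypothetical database and table name:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch of querying Hive from Spark 1.x; the metastore access behind
// this call is what needs the DataNucleus jars and hive-site.xml.
object HiveQuerySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("HiveQuerySketch"))
    val hiveContext = new HiveContext(sc)

    // Hypothetical database and table names.
    hiveContext.sql("SELECT COUNT(*) FROM some_db.some_table").show()

    sc.stop()
  }
}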

Inserting data into Hive dynamic partitions from Spark

Problem description

When I submitted the job in standalone or yarn-client mode there was no error, but after switching to yarn-cluster mode the job would sometimes fail with the error below.
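For context, the failing code path is a dynamic-partition insert issued through HiveContext (the loadDynamicPartitions call in the stack trace below). A minimal sketch of that kind of statement, with hypothetical table and column names:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Minimal sketch of a dynamic-partition insert in Spark 1.x.
// Table and column names are hypothetical.
object DynamicPartitionInsertSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DynamicPartitionInsertSketch"))
    val hiveContext = new HiveContext(sc)

    // Dynamic partitioning must be enabled before the insert.
    hiveContext.sql("SET hive.exec.dynamic.partition=true")
    hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // The partition column (dt) takes its value from the SELECT result,
    // so Hive picks the target partitions at load time.
    hiveContext.sql(
      """INSERT OVERWRITE TABLE target_table PARTITION (dt)
        |SELECT id, name, dt FROM source_table""".stripMargin)

    sc.stop()
  }
}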

Error log

17/08/09 10:08:01 ERROR scheduler.JobScheduler: Error running job streaming job 1502188440000 ms.0
java.lang.NoSuchMethodException: org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(org.apache.hadoop.fs.Path, java.lang.String, java.util.Map, boolean, int, boolean, boolean)
    at java.lang.Class.getMethod(Class.java:1670)
    at org.apache.spark.sql.hive.client.Shim.findMethod(HiveShim.scala:114)
    at org.apache.spark.sql.hive.client.Shim_v0_12.loadDynamicPartitionsMethod$lzycompute(HiveShim.scala:168)
    at org.apache.spark.sql.hive.client.Shim_v0_12.loadDynamicPartitionsMethod(HiveShim.scala:167)
    at org.apache.spark.sql.hive.client.Shim_v0_12.loadDynamicPartitions(HiveShim.scala:261)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(ClientWrapper.scala:560)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:560)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$loadDynamicPartitions$1.apply(ClientWrapper.scala:560)
    at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:279)
    at org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:226)
    at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:225)
    at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:268)
    at org.apache.spark.sql.hive.client.ClientWrapper.loadDynamicPartitions(ClientWrapper.scala:559)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:225)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:127)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:276)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)

Analysis

In client mode the driver ran on node 13, and there was no problem.
In cluster mode the job sometimes failed and sometimes did not.
That made me wonder: why does cluster mode fail only some of the time?

Then I submitted in client mode from node 14 and, as expected, almost every job threw that error.
So the problem is that nodes other than 13 are missing something.
Since the exception is java.lang.NoSuchMethodException, a missing jar is the obvious suspect.
The reason cluster mode failed only intermittently must be that, whenever the job succeeded, the driver happened to have been placed on node 13.

So the task now is to find out which jars the other machines are missing.

Then I noticed this setting in the Spark configuration:

spark.sql.hive.metastore.jars : /usr/lib/hive/lib/*:/opt/spark-1.6.0/lib/spark-assembly-1.6.0-hadoop2.4.0.jar

Sure enough, the path /usr/lib/hive/lib/* exists on node 13, but not on nodes 14 or 15.
Problem found.

Solution 1

Copy /usr/lib/hive/lib/* from node 13 to nodes 14 and 15 so that every node has its own copy. Then no matter where the driver ends up, it can find the jars it needs.
That solved it.
So when you hit a problem, analyze it patiently instead of flailing around like a headless fly.
None of the fixes I found online worked for this one. Sometimes you have to work through the specific situation until you find the root cause; once you have the cause, the bug practically solves itself.

Solution 2

Delete spark.sql.hive.metastore.jars from the Spark configuration. You can't realistically copy the Hive libraries onto every node, and if Hive is ever upgraded you would have to replace those jars again, which is too much trouble. The approach below is better.

Add the Hive dependencies to the pom file instead:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib_2.10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.10</artifactId>
    <version>1.6.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.32</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>0.13.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>0.13.1</version>
</dependency>

Spark Streaming reading data from Kafka

Problem description: Couldn't find leader offsets for Set

While running, the Spark Streaming job that reads from Kafka threw the exception named in the title.
Monitoring showed that a broker had gone down. But the topic's replication factor is 2, so even with one broker down, a replica should have taken over.
It turned out that some partition replicas had not kept in sync with their leader, i.e. they had not "caught up".

To check whether a topic has replicas that are behind:

kafka-topics.sh --describe --zookeeper XXX --topic XXX
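For reference, the stream in question uses the direct (receiver-less) Kafka API, as the DirectKafkaInputDStream in the error log below shows. A minimal sketch of such a stream; the broker list, batch interval, and processing logic are placeholders, and only the topic name is taken from the log:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Minimal sketch of a direct Kafka stream in Spark 1.x. At every batch the
// stream asks each partition's leader for its latest offsets; if a leader
// cannot be found, the job fails with "Couldn't find leader offsets".
object DirectKafkaSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaSketch")
    val ssc = new StreamingContext(conf, Seconds(60))  // placeholder batch interval

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")  // placeholder brokers
    val topics = Set("dsp_request_event")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Placeholder processing: count the messages in each batch.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}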

Error log

17/10/13 09:41:13 ERROR DirectKafkaInputDStream: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([dsp_request_event,2]))
17/10/13 09:41:13 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([dsp_request_event,2]))
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)

Solution

In the command's output, check whether Replicas and Isr match. If Isr lists fewer replicas than Replicas, the corresponding partition has replicas that have fallen behind.
Fix:
Increase num.replica.fetchers, the number of threads a broker uses to replicate data from partition leaders (the default is 1); raising it increases the replication I/O. After increasing this value in testing, replicas no longer fell behind.
How to confirm the problem is solved:
Start the affected Spark Streaming job and, while it is processing normally, kill any one broker and observe the behavior. Before the fetcher threads were increased, the job threw the same exception after the kill; afterwards it kept running normally, so the problem is solved.

Reference: http://blog.csdn.net/yanshu2012/article/details/53995159