我的火花作业引发了一个我无法追查的空指针异常 . 当我打印潜在的空变量时,它们都填充在每个工作者身上 . 我的数据不包含空值,因为同一作业在spark shell中有效 . 下面是作业的执行功能,然后是错误消息 .
函数中未定义的所有辅助方法都是在spark作业对象的主体内定义的,所以我认为闭包不是问题所在 .
override def execute(sc:SparkContext) = {
def construct_query(targetTypes:List[String]) = Map("query" ->
Map("nested" ->
Map("path"->"annotations.entities.items",
"query"-> Map("terms"->
Map("annotations.entities.items.type"-> targetTypes)))))
val sourceConfig = HashMap(
"es.nodes" -> params.targetClientHost
)
// Base elastic search RDD returning articles which match the above query on entity types
val rdd = EsSpark.esJsonRDD(sc,
params.targetIndex,
toJson(construct_query(params.entityTypes)),
sourceConfig
).sample(false,params.sampleRate)
// Mapping ES json into news article object, then extracting the entities list of
// well defined annotations
val objectsRDD = rdd.map(tuple => {
val maybeArticle =
try {
Some(JavaJsonUtils.fromJson(tuple._2, classOf[SearchableNewsArticle]))
}catch {
case e: Exception => None
}
(tuple._1,maybeArticle)
}
).filter(tuple => {tuple._2.isDefined && tuple._2.get.annotations.isDefined &&
tuple._2.get.annotations.get.entities.isDefined}).map(tuple => (tuple._1, tuple._2.get.annotations.get.entities.get))
// flat map the RDD of entities lists into a list of (entity text, (entity type, 1)) tuples
(line 79) val entityDataMap: RDD[(String, (String, Int))] = objectsRDD.flatMap(tuple => tuple._2.items.collect({
case item if (item.`type`.isDefined) && (item.text.isDefined) &&
(line 81)(params.entityTypes.contains(item.`type`.get)) => (cleanUpText(item.text.get), (item.`type`.get, 1))
}))
// bucketize the tuples RDD into entity text, List(entity_type, entity_count) to make count aggregation and file writeouts
// easier to follow
val finalResults: Array[(String, (String, Int))] = entityDataMap.reduceByKey((x, y) => (x._1, x._2+y._2)).collect()
val entityTypeMapping = Map(
"HealthCondition" -> "HEALTH_CONDITION",
"Drug" -> "DRUG",
"FieldTerminology" -> "FIELD_TERMINOLOGY"
)
for (finalTuple <- finalResults) {
val entityText = finalTuple._1
val entityType = finalTuple._2._1
if(entityTypeMapping.contains(entityType))
{
if(!Files.exists(Paths.get(entityTypeMapping.get(entityType).get+".txt"))){
val myFile = new java.io.FileOutputStream(new File(entityTypeMapping.get(entityType).get+".txt"),false)
printToFile(myFile) {p => p.println(entityTypeMapping.get(entityType))}
}
}
val myFile = new java.io.FileOutputStream(new File(entityTypeMapping.get(entityType).get+".txt"),true)
printToFile(myFile) {p => p.println(entityText)}
}
}
以下错误消息:
com.quid.gazetteers.GazetteerGenerator上的java.lang.NullPointerException $$ anonfun $ 4 $$ anonfun $ apply $ 1.isDefinedAt(GazetteerGenerator.scala:81)at com.quid.gazetteers.GazetteerGenerator $$ anonfun $ 4 $$ anonfun $ apply $ 1.isDefinedAt(GazetteerGenerator.scala:79)at scala.collection.TraversableLike $$ anonfun $ collect $ 1.apply(TraversableLike.scala:278)scala.collection.immutable.List.foreach(List.scala:318)at scala .collection.TraversableLike $ class.collect(TraversableLike.scala:278)at scala.collection.AbstractTraversable.collect(Traversable.scala:105)at com.quid.gazetteers.GazetteerGenerator $$ anonfun $ 4.apply(GazetteerGenerator.scala:79) )at com.quid.gazetteers.GazetteerGenerator $$ anonfun $ 4.apply(GazetteerGenerator.scala:79)at scala.collection.Iterator $$ anon $ 13.hasNext(Iterator.scala:371)at org.apache.spark.util . collection.ExternalSorter.insertAll(ExternalSorter.scala:189)位于org.apache.spark.scheduler.Shuffl的org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)位于org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)的eMapTask.runTask(ShuffleMapTask.scala:73)org.apache.spark.scheduler.Task.run(Task.scala:89)at org .apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:214)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor) .java:617)在java.lang.Thread.run(Thread.java:745)
1 回答
这个问题已经解决了 . params属性没有序列化,可用于激发 Worker . 解决方案是在需要params属性的区域范围内形成spark广播变量 .