-
34 votesanswersviews
根据RDD / Spark DataFrame中的特定列从行中删除重复项
假设我有一个相当大的数据集,形式如下: data = sc.parallelize([('Foo',41,'US',3), ('Foo',39,'UK',1), ('Bar',57,'CA',2), ('Bar',72,'CA',2), ... -
2 votesanswersviews
如何处理spark中数据框列名称中的空格
我在df中注册了一个tmp表,在列header中有空格 . 我可以通过sqlContext使用sql查询时提取列 . 我尝试使用后退但它不起作用 df1 = sqlContext.sql("""select Company, Sector, Industry, `Altman Z-score as Z_Score` from tmp1 ""&qu... -
0 votesanswersviews
java.io.IOException:不完整的HDFS URI,没有主机:在AWS上运行pyspark
我是第一次使用AWS,并且已将我的文件存储在AWS上 . 到目前为止,这是我尝试用来读取文件的内容 . artist_data = sc.textFile('hdfs:///<aws_server>:<port>/home/ubuntu/artist_stuff/_artist_data') 还尝试过: artist_data = sc.textFile('hdfs://... -
0 votesanswersviews
spark cross加入内存泄漏
我有两张 table 要交叉加入, 表1:查询300M行表2:产品描述3000行 以下查询执行交叉连接并计算元组之间的分数,并选择前3个匹配, query_df.repartition(10000).registerTempTable('queries') product_df.coalesce(1).registerTempTable('products') CREATE TABLE m... -
5 votesanswersviews
Spark中两个大数据集之间的模糊连接
我需要根据两列字符串的相似性在两个大数据集之间进行模糊连接(假设每个数据集为30Gb) . 例如: 表格1: Key1 |Value1 ------------- 1 |qsdm fkq jmsk fqj msdk 表2: Key2 |Value2 ------------- 1 |qsdm fkqj mskf qjm sdk 我们的目的是计算value1的每一行与valu... -
2 votesanswersviews
为什么在本地模式下加入spark是如此之慢?
我在本地模式下使用spark,简单的连接花费的时间太长 . 我已经获取了两个数据帧:A(8列和230万行)和B(8列和120万行)并使用 A.join(B,condition,'left') 连接它们并最终调用一个动作 . 它创建了一个具有三个阶段的单个作业,每个阶段用于两个数据帧提取,一个用于连接 . 令人惊讶的是,提取数据帧A的阶段大约需要8分钟,而数据帧B的需要花费1分钟 . 并在几秒钟内加... -
8 votesanswersviews
Spark中的潜在Dirichlet分配(LDA)
我正在尝试在Spark中编写一个progor来执行Latent Dirichlet分配(LDA) . Spark文档page提供了一个很好的示例,用于在示例数据上执行LDA . 以下是该计划 from pyspark.mllib.clustering import LDA, LDAModel from pyspark.mllib.linalg import Vectors # Load and... -
0 votesanswersviews
来自RDD的PySpark LDA模型密集向量
我将我的数据设置为Apache Spark LDA模型 . 我正在进行的一个挂机是将列表转换为密集向量,因为我的RDD中有一些字母数字值 . 我在尝试运行示例代码时收到的错误是将字符串转换为float . 我知道这个错误,知道我对密集向量和浮点数的了解,但必须有一种方法将这些字符串值加载到LDA模型中,因为这是一个主题模型 . 我应该先说明我是Python和Spark的新手,所以如果我误解了某些东... -
0 votesanswersviews
将PySpark连接到AWS Redshift时出错
一直在尝试将我的EMR 5.11.0集群上的Spark 2.2.1连接到我们的Redshift商店 . 我遵循的方法是 - 使用内置的Redshift JDBC pyspark --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar from pyspark.sql import SQLContext sc sql_context = S... -
0 votesanswersviews
无法初始化spark上下文
我试图在Python中初始化Spark Context变量 . from pyspark import SparkContext, SparkConf conf = SparkConf().setAppName("test").setMaster("local") sc = SparkContext(conf=conf) 但我收到以下错误: py4j.p... -
0 votesanswersviews
如何在pyspark AWS上跨群集添加文件
我是新来的火花 . 我试图从我的主实例读取一个文件,但我收到此错误 . 经过研究,我发现您需要将数据加载到hdfs或跨群集复制 . 我无法找到执行其中任何一项的命令 . ------------------------------------------------- -------------------------- Py4JJavaError Traceback(最近一次调用last)in... -
3 votesanswersviews
为什么我只能分阶段看到200个任务?
我有一台火花机群,每台机器有8台机器,256个核心,180Gb内存 . 我已经启动了32个 Actuator ,每个 Actuator 有32个内核和40Gb内存 . 我正在尝试优化复杂的应用程序,我注意到很多阶段都有200个任务 . 在我的情况下,这似乎是次优的 . 我已经尝试将参数spark.default.parallelism设置为1024,但它似乎没有任何效果 . 我运行spark 2.... -
1 votesanswersviews
如何使用`py4j.java_gateway:Error`优雅地完成pyspark单元测试?
如何在测试结束时没有看到这个恼人的错误,优雅地完成pyspark单元测试?似乎python客户端正在失去与spark上下文或类似的东西的通信 . 错误:py4j.java_gateway:发送或接收时出错 . 回溯(最近一次调用最后一次):文件“C:\ Spark \ python \ lib \ py4j-0.10.1-src.zip \ py4j \ java_gateway.py”,第82... -
0 votesanswersviews
WARN cluster.YarnScheduler:初始作业未接受任何资源
我运行的任何火花作业都将失败,并显示以下错误消息 17/06/16 11:10:43 WARN cluster.YarnScheduler:初始工作没有接受任何资源;检查群集UI以确保工作人员已注册并具有足够的资源 Spark版本是1.6,在Yarn上运行 . 我正在从pyspark发布工作 . 您可以从作业时间表中注意到它无限期地运行并且没有添加或删除任何资源 . 1 -
3 votesanswersviews
获取错误py4j.protocol.Py4JNetworkError:来自Java端的答案在spark流中是空的
我试图通过使用python API的spark流来从kafka获取数据 . 任何人都知道为什么会出现这个错误 我收到以下错误:错误:root:发送命令时异常 . 回溯(最近一次调用最后一次):文件“/home/shadan/softwares/spark-2.1.1-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py”... -
0 votesanswersviews
Spark shell没有执行'Initial job has not accepted any resources'
我正在使用AWS EC2实例,我安装了Spark 2.2.0,我有8 GB的RAM和2个内核 . 我正在按照本教程使用pyspark shell进行一些操作:https://sparkour.urizone.net/recipes/managing-clusters/ 我开始了主人,我开始了一个奴隶 Worker ,他们出现在网上ui . 但是,在shell中,当我尝试执行如下命令时: >&... -
1 votesanswersviews
结构化流错误py4j.protocol.Py4JNetworkError:来自Java端的答案为空
我正在尝试使用PySpark和Structured Streaming(Spark 2.3)在两个Kafka Stream之间 Build 左外连接 . import os import time from pyspark.sql.types import * from pyspark.sql.functions import from_json, col, struct, explode, g... -
0 votesanswersviews
write.save上的Spark分区将所有数据带到驱动程序?
所以基本上我有一个python spark作业,读取一些简单的json文件,然后尝试将它们写为由一个字段分区的orc文件 . 分区不是很 balancer ,因为有些键非常大,而其他键非常小 . 做这样的事情我有记忆问题: events.write.mode('append').partitionBy("type").save("s3n://mybucket/tofo... -
1 votesanswersviews
Spark 1.6向Cassandra插入数据帧
我想向cassandra插入一个数据帧 . 当我写rdd.tosaveToCasssandra(“keyspace”,“table”) 没问题,但我不能用这个功能写 myDataFrame.tosaveToCassandra("keyspace","table") 我也试过但没有保存 . myDataFrame.write.format("org... -
0 votesanswersviews
Apache Spark:在Spark Standalone模式下禁用Spark Web UI
我在具有客户端部署模式的独立模式下运行Apache Spark 2.1.1 . I want to disable Spark web UI for master and all workers . 提到:https://spark.apache.org/docs/latest/configuration.html#spark-ui并在$ SPARK_HOME / conf / spark-... -
0 votesanswersviews
执行者可以在spark中共享核心吗?
在配置spark作业时,我有时会看到人们建议每个执行程序的核心数大于核心总数除以执行程序数 . 值得注意的是,在这个example中,@ 0x0FFF建议如下: --num-executors 4 --executor-memory 12g --executor-cores 4 如果我们计算 Actuator 核心的总数,我们得到 4 cores per executor * 4 execu... -
10 votesanswersviews
使用spark-submit, - length-executor-cores选项的行为是什么?
我正在使用python包装的C代码运行一个spark集群 . 我目前正在测试多线程选项的不同配置(在Python级别或Spark级别) . 我在HDFS 2.5.4集群上使用带有独立二进制文件的spark . 该集群目前由10个从站组成,每个从站有4个核心 . 从我所看到的,默认情况下,Spark每个节点启动4个从站(我一次有4个python在从属节点上工作) . 我怎样才能限制这个数字?我可以看... -
8 votesanswersviews
执行程序丢失时Spark应用程序无法恢复
我在一个独立的集群中运行Spark . 与Master和2个工作节点在同一节点上的python驱动程序应用程序 . 业务逻辑是由在Worker节点上创建的执行程序运行的python代码 . 如果其中一个遗嘱执行人死亡,我最终会陷入困境 . 如果我强行杀死Worker 0上的一个后端进程,Master输出: 16/06/07 16:20:35 ERROR TaskSchedulerImpl: Los... -
1 votesanswersviews
Spark启动的执行程序多于指定的
我正在使用Pyspark在独立(客户端)模式下运行Spark 1.5.1 . 我正在尝试开始一个似乎内存繁重的工作(在python中,因此它不应该是执行程序 - 内存设置的一部分) . 我正在使用96核和128 GB RAM的机器上进行测试 . 我有一个master和worker正在运行,开始使用/ sbin中的start-all.sh脚本 . 这些是我在/ conf中使用的配置文件 . 火花de... -
0 votesanswersviews
Spark Submit Executors == Spark Shell任务?
我试图了解我的spark-submit和spark shell工作之间的速度差异 . 我启动shell或使用相同的资源分配提交,但我似乎得到了非常不同的性能 . 当我在shell中运行它需要~10分钟与小时火花提交 . 那么我的问题是,REPL进度条中显示的任务数量与spark提交中运行的执行程序数量相同吗?我看到每个人都有不同的数字,我很想知道我做错了什么 . 在shell中我开始使用它 --e... -
2 votesanswersviews
从数据框中选择时重命名列名
我有2个数据帧:df1和df2,我在id列上将它们连接起来并保存到另一个名为df3的数据帧 . 下面是我正在使用的代码,它可以正常工作 . val df3 = df1.alias("tab1").join(df2.alias("tab2"),Seq("id"),"left_outer").select("ta... -
0 votesanswersviews
AWS Glue - 是否使用Crawlers
对于以镶木地板格式运行来自S3存储桶的数据的作业,有两种方法: 创建一个爬虫来创建一个模式表,使用 glueContext.create_dynamic_frame.from_catalog(dbname, tablename) 在Glue作业中形成动态框架 . 使用 glueContext.create_dynamic_frame.from_options("s3",... -
1 votesanswersviews
AWS Glue无法访问输入数据集
我在Glue / Athena注册了一个数据集,称之为 my_db.table . 我可以通过雅典娜查询它,一切似乎都井然有序 . 我正在尝试在Glue作业中使用此表,但我收到以下相当不透明的错误消息: py4j.protocol.Py4JJavaError: An error occurred while calling o54.getCatalogSource. : java.lang.Er... -
1 votesanswersviews
AWS Glue PySpark替换NULL
我正在运行AWS Glue作业,使用来自Glue的自动生成的PySpark脚本,将S3上的管道分隔文件加载到RDS Postgres实例中 . 最初,它在某些列中抱怨NULL值: pyspark.sql.utils.IllegalArgumentException: u"Can't get JDBC type for null" 在谷歌搜索和阅读之后,我尝试通过将我的AWS ... -
1 votesanswersviews
从AWS胶水作业中的数据源中读取 Headers
我有一个AWS Glue作业,它从数据源读取如下: datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "dev-data", table_name = "contacts", transformation_ctx = "datasource0") ...