-
14 votesanswersviews
将数据导入Spark时如何设置分区/节点数
Problem: 我想使用以下方法将数据从S3导入Spark EMR: data = sqlContext.read.json("s3n://.....") 有没有办法可以设置Spark用来加载和处理数据的节点数量?这是我处理数据的示例: data.registerTempTable("table") SqlData = sqlContext.sql(&q... -
0 votesanswersviews
无法通过pyspark创建spark数据帧
我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了这段代码: from pyspark.sql import SparkSession Spark_Session:SparkSession.builder\ .enableHiveSupport()\ .master("local"\ .getOrCreate() 但是,它会返回此错误: 使用Spar... -
0 votesanswersviews
无法将数据框保存为 Hive 表,找不到文件引发异常
当我尝试将数据框保存为 pyspark 中的 Hive 表时 df_writer.saveAsTable('hive_table', format='parquet', mode='overwrite') 我收到以下错误: 由以下原因引起:org.apache.hadoop.mapred.InvalidInputException:输入路径不存在:org.apache.hadoop.mapre... -
3 votesanswersviews
如何在pyspark中合并两个条件的条件?
我能够合并和排序值,但如果值相等,则无法确定不合并的条件 df = sqlContext.createDataFrame([("foo", "bar","too","aaa"), ("bar", "bar","aaa","foo")], (... -
2 votesanswersviews
如何按多列分组并在PySpark中列表?
这是我的问题:我有这个RDD: a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']] rdd= sc.parallelize (a) 然后我尝试: rdd.map(lambda x: (x[0],x[1],x[... -
0 votesanswersviews
PySpark中 Map 的汇总列表
我有一个 Map 列表,例如 [{'a' : 10,'b': 20}, {'a' : 5,'b': 20} , {'b': 20} ,{'a' : 0,'b': 20} } 我想得到a和b的平均值 . 所以预期的产出是 a = (10 + 5 + 0 + 0) /3 = 5 ; b = 80/4 = 20. 我怎样才能有效地使用RDD -
-4 votesanswersviews
如何在不使用for循环的情况下从pyspark中的列表创建数据框?
我有如下列表: rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2)),(1,(4,1)),(2, (4,2))]] df_input = [] 接下来我定义了如下 Headers : df_header=['sid', 'tid', 'srank'] 使用for循环将数据附加到空列表中: for i in rrr: for j in i: ... -
0 votesanswersviews
在pyspark中将数据框保存为文本文件格式? [重复]
这个问题在这里已有答案: how to export a table dataframe in pyspark to csv? 5个答案 我有如下数据框: +-------+------+----+----+ | a| b|c |d | +-------+-----------+----+ | 101| 244| 4| 1| | 101| ... -
0 votesanswersviews
PySpark:数据并不总是符合模式 - 逻辑来改变数据
我是PySpark的新手,正在编写一个脚本,从 .csv 文件中读取 . 我已明确定义了下面的模式,并且脚本运行完美......大部分时间 . 问题是,有时值会输入不符合架构的文件 - 例如'-'可能出现在整数字段中,因此,我们得到一个类型错误 - 在脚本中达到 df1.show() 时会抛出错误 . 我试图想办法有效地说 - 如果值与定义的数据类型不匹配,则替换为'' 有谁知道这可能吗?任何建议... -
0 votesanswersviews
Pyspark - saveAsTable - 如何将新数据插入现有表?
How to Insert new data to existing table??? 我正在尝试使用pyspark将新数据插入现有表 . 这是我的计划 from pyspark import SparkContext from pyspark.sql import SQLContext, DataFrameWriter sc = SparkContext("local[*]"... -
3 votesanswersviews
Pyspark DataFrame - 如何使用变量进行连接?
我在python上使用Spark数据帧在两个数据帧上进行连接时遇到了一些麻烦 . 我有两个数据框,我必须更改列的名称,以使它们对每个数据框唯一,所以稍后我可以告诉哪个列是哪个 . 我这样做是为了重命名列(firstDf和secondDf是使用createDataFrame函数创建的Spark DataFrames): oldColumns = firstDf.schema.names newCol... -
2 votesanswersviews
Pyspark 数据帧连接需要很长时间
我在 pyspark 中有 2 个数据框,是使用 2 个 sparksql 查询从 hive 数据库中加载的。 当我尝试使用df1.join(df2,df1.id_1=df2.id_2)联接两个数据框时,需要花费很长时间。当我调用 JOIN 时,Spark 是否会重新执行 df1 和 df2 的 SQL? 基础数据库是 HIVE -
1 votesanswersviews
PySpark Dataframes:如何使用紧凑代码在多种条件下进行过滤?
如果我有一个列名列表,并且如果这些列的值大于零,我想对行进行过滤,是否可以执行类似的操作? columns = ['colA','colB','colC','colD','colE','colF'] new_df = df.filter(any([df[c]>0 for c in columns])) 返回: ValueError:无法将列转换为布尔值:请对“和”,“ |”使用“&”构建... -
1 votesanswersviews
在pyspark中使用Scala UDF中的默认参数值?
我在Scala中定义了一个UDF,默认参数值如下: package myUDFs import org.apache.spark.sql.api.java.UDF3 class my_udf extends UDF3[Int, Int, Int, Int] { override def call(a: Int, b: Int, c: Int = 6): Int = { c*(a ... -
5 votesanswersviews
Pyspark圆形功能的问题
在pyspark中使用圆形函数时遇到一些麻烦 - 我有下面的代码块,我试图将 new_bid 列舍入到2位小数,然后将列重命名为 bid - 我导入 pyspark.sql.functions AS func 以供参考,并使用其中包含的 round 函数: output = output.select(col("ad").alias("ad_id"), ... -
0 votesanswersviews
如何解决PySpark中的SQL ParseError?
我是使用Jupyter Notebook的PySpark和AWS Sagemaker的新手 . 我知道如何编写SQL语句来回答我的问题 . 此代码段应该:1 . 在第2年提取我的数据集(CDC死亡数据-in CSV)中的可用死亡处置方法 . 按年度计算每个处置的频率 我能够在MySQL数据库中的同一数据集上运行SQL语句 . 但是一旦我将查询添加到我的PySpark代码中,我得到了 ParseEr... -
-1 votesanswersviews
对象没有属性'na' - pyspark数据帧
我正在使用pyspark 2.4并尝试使用pyspark.sql.dataframe中的dropna函数 . from pyspark.sql.dataframe import * dropna_data = data.na.drop() 数据是一个pyspark数据帧 . 我收到错误 object has no attribute 'na' . 帮助他人? -
0 votesanswersviews
Pyspark转换复杂的Dataformat
用火花读一块镶木地板文件 df = spark.read.parquet("path_to_file") df.show(2) 我的df包含 **Output** +------+-----------------+ | col1 | col2 | +------+-----------------+ | "A1" | {&quo... -
3 votesanswersviews
有没有办法提高PySpark输出的效率?
我试图测试PySpark迭代一些非常大(10s的GB到1s的TB)数据的能力 . 对于大多数脚本,我发现PySpark具有与Scala代码大致相同的效率 . 在其他情况下(如下面的代码),我会遇到严重的速度问题,速度要慢10到12倍 . path = "path/to/file" spark = SparkSession.builder.appName("siteLi... -
2 votesanswersviews
模糊匹配pyspark数据帧字符串中的单词
我有一些数据,其中'X'列包含字符串 . 我正在编写一个函数,使用pyspark,其中传递search_word,并且过滤掉列'X'字符串中不包含子字符串search_word的所有行 . 该功能还必须允许单词的拼写错误,即模糊匹配 . 我已将数据加载到pyspark数据框中,并使用NLTK和fuzzywuzzy python库编写函数,如果字符串包含search_word,则返回True或Fal... -
1 votesanswersviews
如何在Pyspark中注册没有参数的UDF
我已经尝试使用lambda函数的参数spark UDF并注册它 . 但我怎么能创建没有参数和注册商的udf它我试过这个我的示例代码将显示当前时间 from datetime import datetime from pyspark.sql.functions import udf def getTime(): timevalue=datetime.now() ... -
4 votesanswersviews
在PySpark中使用Scala UDF
我希望能够在PySpark中将Scala函数用作UDF package com.test object ScalaPySparkUDFs extends Serializable { def testFunction1(x: Int): Int = { x * 2 } def testUDFFunction1 = udf { x: Int => testFunction1(... -
0 votesanswersviews
Pyspark:在窗口内使用udf
我需要使用Pyspark检测时间序列上的阈值 . 在下面的示例图中,我想检测(通过存储相关的时间戳)参数ALT_STD的每次出现都大于5000然后低于5000 . 对于这个简单的情况,我可以运行简单的查询,如 t_start = df.select('timestamp')\ .filter(df.ALT_STD > 5000)\ ... -
4 votesanswersviews
PySpark - 添加一个按用户排名的新列
海兰 我有这个PySpark DataFrame df = pd.DataFrame(np.array([ ["aa@gmail.com",2,3], ["aa@gmail.com",5,5], ["bb@gmail.com",8,2], ["cc@gmail.com",9,3] ]), columns... -
0 votesanswersviews
你能在pyspark中拥有一列数据帧吗?
我对pyspark / bigdata有点新意,所以这可能是一个坏主意,但我有大约一百万个单独的CSV文件,每个文件都与一些元数据相关联 . 我想要一个包含所有元数据字段列的pyspark数据框,但也有一个列,其条目是与每组元数据相关联的(整个)CSV文件 . 我现在不在工作,但我记得几乎确切的代码 . 我尝试了类似的玩具示例 outer_pandas_df = pd.DataFrame.from... -
1 votesanswersviews
将数据添加到存储在磁盘上的Spark / Parquet数据
我处于类似the one mentioned here的情况 . 问题没有得到满意的答复 . 此外,我处理的数据较少(每天约1G) . 我的情况:我已经有一定数量的数据(~500G)可用作镶木地板(这是商定的“存储格式”),我定期进行增量更新 . 我希望以后能够处理ETL部分以及分析部分 . 为了能够有效地生成某些“中间数据产品”的更新,我看到三个选项: 使用 append mode 保存,保... -
6 votesanswersviews
Spark:对数据进行排序和分区的最有效方法是将其写为镶木地板
我的数据原则上是一个表,除了其他'data'之外,它还包含一列 ID 和一列 GROUP_ID . 在第一步中,我将CSV读入Spark,进行一些处理以准备第二步的数据,并将数据写为镶木地板 . 第二步做了很多 groupBy('GROUP_ID') 和 Window.partitionBy('GROUP_ID').orderBy('ID') . 现在的目标是 - 为了避免第二步中的混乱 - ... -
3 votesanswersviews
如何将数据框中的数据写入HDFS中的单个.parquet文件(单个文件中的数据和元数据)?
如何将数据框中的数据写入HDFS中的单个.parquet文件(单个文件中的数据和元数据)? df.show() --> 2 rows +------+--------------+----------------+ | name|favorite_color|favorite_numbers| +------+--------------+----------------+ |Alyssa|... -
0 votesanswersviews
无法在火花中本地读取镶木地板文件
我在本地运行Pyspark并尝试读取镶木地板文件并从笔记本加载到数据框中 . df = spark.read.parquet(“metastore_db / tmp / userdata1.parquet”) 我得到了这个例外 An error occurred while calling o738.parquet. : org.apache.spark.sql.AnalysisException... -
2 votesanswersviews
为每组pyspark RDD / dataframe选择随机列
我的数据帧有10,0000列,我必须对每个组应用一些逻辑(键是区域和部门) . 每组将使用10k列中的最多30列,30列列表来自第二个数据集列“colList” . 每组将有2-3百万行 . 我的方法是按键分组和调用函数,如下所示 . 但它失败了 - 1. shuffle和2.data组超过2G(可以通过重新分区来解决,但是它的成本很高),3 . 非常慢 def testfunc(iter): ...