-
3 votesanswersviews
如何使用Java将unix epoch的列转换为Apache spark DataFrame中的Date?
我有一个json数据文件,其中包含一个属性[creationDate],它是“long”数字类型的unix epoc . Apache Spark DataFrame架构如下所示: root |-- creationDate: long (nullable = true) |-- id: long (nullable = true) |-- postTypeId: long (nul... -
3 votesanswersviews
如何将内存中的JSON字符串读入Spark DataFrame
我正在尝试将内存中的JSON string 读取到Spark DataFrame中: var someJSON : String = getJSONSomehow() val someDF : DataFrame = magic.convert(someJSON) 我花了很多时间查看Spark API,我能找到的最好的就是像这样使用_1544726: var someJSON : String... -
3 votesanswersviews
在Spark和Scala中转换数据框架构
我想使用Spark和Scala来转换数据帧的模式以更改某些列的类型 . 具体来说,我试图使用[U]函数,其描述如下:“返回一个新的数据集,其中每个记录已映射到指定的类型 . 用于映射列的方法取决于U的类型” 原则上这正是我想要的,但我无法让它发挥作用 . 这是一个简单的例子,取自https://github.com/apache/spark/blob/master/sql/core/src/tes... -
5 votesanswersviews
数据帧Spark scala爆炸json数组
假设我有一个如下所示的数据框: +--------------------+--------------------+--------------------------------------------------------------+ | id | Name | ... -
1 votesanswersviews
将数组值列转换为字符串列(包含序列化的json)
Background 我通过首先从 StructType 从 dataset.schema() 生成叶节点/类型的映射,然后使用 col.alias(...) 和 select(reAliasedCols) 生成展平数据集来展平给定Spark DataSet的嵌套模式 . 例如: Issue 我想将它们序列化为JSON字符串,而不是旋转/爆炸数组类型字段 . 这种方法似乎几乎可以使用to_json... -
4 votesanswersviews
展平嵌套的Spark Dataframe
有没有办法压缩任意嵌套的Spark Dataframe?我所看到的大部分工作都是针对特定的模式编写的,我希望能够通过不同的嵌套类型(例如StructType,ArrayType,MapType等)来泛化一个Dataframe . 假设我有一个类似的架构: StructType(List(StructField(field1,...), StructField(field2,...), ArrayT... -
3 votesanswersviews
获取spark数据帧中ArrayType列的不同元素
我有一个名为id,feat1和feat2的3列数据框 . feat1和feat2采用Array of String的形式: Id, feat1,feat2 ------------------ 1, ["feat1_1","feat1_2","feat1_3"],[] 2, ["feat1_2"],["... -
5 votesanswersviews
DataFrame partitionBy嵌套列
我试图在嵌套字段上调用partitionBy,如下所示: val rawJson = sqlContext.read.json(filename) rawJson.write.partitionBy("data.dataDetails.name").parquet(filenameParquet) 我运行时遇到以下错误 . 我确实看到“名称”列为以下架构中的字段 . 是否有不... -
0 votesanswersviews
如何提取Array的ElementType作为StructType的实例
我尝试在spark中分解复杂数据帧的结构 . 我只对root下的嵌套数组感兴趣 . 问题是我无法从StructField的类型中检索ElementType . 这是一个示例,这是StructType对象的模式: df.printSchema result>> root |-- ID: string (nullable = true) |-- creationDate: string... -
2 votesanswersviews
Spark Dataframe Arraytype列
我想在数据帧上创建一个新列,这是将函数应用于arraytype列的结果 . 像这样的东西: df = df.withColumn("max_$colname", max(col(colname))) 列的每一行都包含一个值数组? spark.sql.function中的函数似乎只在列基础上工作 . -
0 votesanswersviews
Spark SQL删除空格
我有一个简单的Spark程序,它读取JSON文件并发出CSV文件 . 在JSON数据中,值包含前导和尾随空格,当我发出CSV时,前导和尾随空格都消失了 . 有没有办法可以保留空间 . 我尝试了很多选项,如ignoreTrailingWhiteSpace,ignoreLeadingWhiteSpace,但没有运气 input.json {"key" : "k1"... -
0 votesanswersviews
在缺少json数据源的spark中读取json
我试图使用下面的代码将示例json文件读入SqlContext,但它失败并出现数据源错误 . val sqlContext = new org.apache.spark.sql.SQLContext(sc) val path = "C:\\samplepath\\sample.json" val jsondata = sqlContext.read.json(path) j... -
0 votesanswersviews
spark cross加入内存泄漏
我有两张 table 要交叉加入, 表1:查询300M行表2:产品描述3000行 以下查询执行交叉连接并计算元组之间的分数,并选择前3个匹配, query_df.repartition(10000).registerTempTable('queries') product_df.coalesce(1).registerTempTable('products') CREATE TABLE m... -
3 votesanswersviews
使用自定义列将Data DataFrame添加到DataSet
我们正在努力将Spark DataFrame转换为具有动态列的case类的DataSet . 每个用例的起点都是DataFrame,如下所示: root |-- id: string (nullable = true) |-- time: long (nullable = true) |-- c: struct (nullable = true) | |-- d: long (nullabl... -
1 votesanswersviews
如何使用SPARK在HDFS中编写大数据(大约800 GB)作为hive orc表?
我在最近3-4个月和最近工作在 Spark Project . 我正在使用巨大的历史文件(800 GB)和一个小的增量文件(3 GB)进行一些计算 . 计算使用 hqlContext & dataframe 很快发生火花,但是当我试图写的计算结果与 orc 格式的 hive table 其中将包含近20十亿的记录有近800 GB的数据大小花费过多时间(超过2小时,最后失败) . 我的群集详细信息... -
1 votesanswersviews
Spark将大文件作为输入流读取
我知道spark内置方法可以有分区和读取大块文件并使用textfile分发为rdd . 但是,我在一个定制的加密文件系统中读到这个,火花本质上不支持 . 我能想到的一种方法是读取输入流并加载多行并分发给执行程序 . 继续阅读,直到加载所有文件 . 因此,由于内存不足错误,执行程序不会爆炸 . 这有可能在火花中做到这一点吗? -
5 votesanswersviews
如何优化spark sql并行运行它
我是一个火花新手,并使用Spark SQL / hiveContext有一个简单的spark应用程序: 从蜂巢表中选择数据(10亿行) 做一些过滤,聚合包括row_number over window function选择第一行,group by,count()和max()等 . 将结果写入HBase(数亿行) 我提交作业在纱线集群(100个 Actuator )上运行它,它很慢,... -
2 votesanswersviews
为什么在本地模式下加入spark是如此之慢?
我在本地模式下使用spark,简单的连接花费的时间太长 . 我已经获取了两个数据帧:A(8列和230万行)和B(8列和120万行)并使用 A.join(B,condition,'left') 连接它们并最终调用一个动作 . 它创建了一个具有三个阶段的单个作业,每个阶段用于两个数据帧提取,一个用于连接 . 令人惊讶的是,提取数据帧A的阶段大约需要8分钟,而数据帧B的需要花费1分钟 . 并在几秒钟内加... -
1 votesanswersviews
OOM |无法查询Spark临时表
我在Hive表中有450万条记录 . 我的要求是通过Spark thrift服务器将此表缓存为临时表,直线使Tableau可以查询临时表并生成报告 . 我有4个节点集群,每个节点有50g RAM和25个vCores . 我在Spark 1.4.1中使用HDP2.3 问题: 我能够在不到一分钟的时间内缓存表,并能够从临时表中获取正确的计数 . 但问题是当我尝试用一列执行选择查询(使用beeline,... -
1 votesanswersviews
Dataframe将直接从Executor连接到RDBMS,还是会通过Driver?
在Spark Dataframe中我正在寻找以下引擎优化的解释 . 数据帧是特殊类型的RDD,它内部包含行RDD . 这些RowRDD分布在执行程序中 . 当我们从执行程序写入这些RowRDD时(特别是在YARN-CLIENT模式下运行时),行RDD将从EXECUTOR传输到DRIVER,DRIVER使用JDBC连接写入Oracle . (这是真的吗?) 在YARN-CLUSTER模式... -
1 votesanswersviews
Spark Window性能问题
我有一个镶木地板数据框,具有以下结构: ID字符串 日期日期 480其他类型为Double的要素列 我必须用相应的加权移动平均值替换480个特征列中的每一个,窗口为250.最初,我尝试对单个列执行此操作,使用以下简单代码: var data = sparkSession.read.parquet("s3://data-location") var window ... -
1 votesanswersviews
Spark 1.6向Cassandra插入数据帧
我想向cassandra插入一个数据帧 . 当我写rdd.tosaveToCasssandra(“keyspace”,“table”) 没问题,但我不能用这个功能写 myDataFrame.tosaveToCassandra("keyspace","table") 我也试过但没有保存 . myDataFrame.write.format("org... -
3 votesanswersviews
Spark DataFrame到RDD并返回
我正在使用Scala编写Apache Spark应用程序 . 为了处理和存储数据,我使用DataFrames . 我有一个很好的管道,包括特征提取和MultiLayerPerceptron分类器,使用ML API . 我也想使用SVM(用于比较目的) . 事情是(如果我弄错的话,纠正我)只有MLLib提供SVM . 并且MLLib还没有准备好处理DataFrames,只有RDD . 所以我想我可以... -
2 votesanswersviews
为什么Spark Cassandra连接器允许过滤,即使在使用DataFrame API通过partitiong键查询表时也是如此?
鉴于Cassandra表: CREATE TABLE data_storage.stack_overflow_test_table ( id int, text_id text, clustering date, some_other text, PRIMARY KEY (( id, text_id ), clustering) ) 以下查询是有效查询: ... -
0 votesanswersviews
如何将Cassandra设置为我的Spark Cluster的分布式存储(文件系统)
我是大数据和Spark(pyspark)的新手 . 最近我刚刚设置了一个spark集群,并希望在我的spark集群上使用Cassandra文件系统(CFS)来帮助上传文件 . 任何人都可以告诉我如何设置它并简要介绍如何使用CFS系统? (比如如何上传文件/从哪里) 顺便说一下,我甚至不知道如何使用HDFS(我下载了预先构建的spark-bin-hadoop但我在我的系统中找不到hadoop . )... -
0 votesanswersviews
如何将file.deflate.gz文件加载到spark数据帧?
我有以 delflate.gz 格式压缩的源文件 . 在将数据加载到Spark数据框架时,它失败并出现 ArrayOutofBound 异常 . val cf = spark.read.option("header", "false").option("delimiter", "\u0001").option(&quo... -
0 votesanswersviews
Cassandra与Spark的联系
我正在与Cassandra连接spark并且我在Cassandra中存储csv文件,当我输入此命令时出现错误 . dfprev.write.format("org.apache.spark.sql.cassandra") .options(Map("keyspace"->"sensorkeyspace","table&qu... -
0 votesanswersviews
如何使用scala获取包含csv.gz目录的tar目录中的所有csv文件?
我有以下问题:假设我有一个包含压缩目录.tar的目录,其中包含多个文件.csv.gz . 我想获取父压缩directorie * .tar中的所有csv.gz文件 . 我使用scala 2.11.7这棵树 file.tar |file1.csv.gz file11.csv |file2.csv.gz file21.cs... -
1 votesanswersviews
如何使用Spark&Scala将数据写入CouchBase?
我是CouchBase的新手 . 我正在尝试以本地模式将数据写入CouchBase . 我的示例代码如下, val cfg = new SparkConf() .setAppName("couchbaseQuickstart") .setMaster("local[*]") .set("com.couchbase.bucket.MyBucket&q... -
1 votesanswersviews
在 Spark 中更改表的架构
我有一个火花表,如下所示: Table1 col1 string col2 int col3 string col4 int col5 string 我有另一个表,如下所示: Table2 col1 ...