Connecting to Redshift with sparklyr or SparkR?

Problem description (votes: 0, answers: 1)

I am trying to understand how to connect R to Redshift using Spark. I cannot connect with plain RPostgres because the dataset is very large and needs distributed computation.

So far I am able to read and write CSVs from S3 into Spark data frames. Can someone show how to configure the jars and the other pieces so that I can connect sparklyr (spark_read_jdbc()) or SparkR to Redshift?

It would also be helpful if you could show how to add jars to the sparkContext.

So far I have found that Databricks provides some of the jars needed to access a JDBC URL to a Redshift database.
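For reference, the general pattern for attaching a jar to a sparklyr connection looks roughly like the sketch below (the jar path is only a placeholder, not a path assumed to exist):

library(sparklyr)

config <- spark_config()
# put the driver jar on the driver classpath ...
config$`sparklyr.shell.driver-class-path` <- "/path/to/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"
# ... and/or ship it with --jars so the executors can see it as well
config$`sparklyr.shell.jars` <- "/path/to/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"

sc <- spark_connect(master = "local[*]", config = config)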

r amazon-redshift databricks sparkr sparklyr
1 Answer
0 votes
rm(list=ls())
library(sparklyr)
#library(SparkR)
#detach('SparkR')
Sys.setenv("SPARK_MEM" = "15G")
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$`spark.yarn.executor.memoryOverhead` <- "6g"
config$`spark.dynamicAllocation.enabled`    <- "true"
# put the Redshift JDBC driver jar on the driver classpath
config$`sparklyr.shell.driver-class-path` <- "/home/root/spark/spark-2.1.0-bin-hadoop2.7/jars/RedshiftJDBC4-no-awssdk-1.2.20.1043.jar"
# use a dedicated temp dir for Spark scratch space
spark_dir <- "/tmp/spark_temp"
config$`sparklyr.shell.driver-java-options` <- paste0("-Djava.io.tmpdir=", spark_dir)
sc <- spark_connect(master = "local[*]", config = config)
#sc <- spark_connect(master = "local")

### get the underlying SparkContext
ctx <- sparklyr::spark_context(sc)
### wrap it in a JavaSparkContext ("org.apache.spark.api.java.JavaSparkContext")
jsc <- sparklyr::invoke_static(sc, "org.apache.spark.api.java.JavaSparkContext", "fromSparkContext", ctx)
### grab the Hadoop configuration so the s3a credentials can be set
hconf <- jsc %>% sparklyr::invoke("hadoopConfiguration")

hconf %>% sparklyr::invoke("set", "fs.s3a.access.key", "<your access key for s3>")
hconf %>% sparklyr::invoke("set", "fs.s3a.secret.key", "<your secret key for s3>")
# fs.s3a.endpoint expects an endpoint host, e.g. s3.<region>.amazonaws.com
hconf %>% sparklyr::invoke("set", "fs.s3a.endpoint", "<your s3 endpoint>")

hconf %>% sparklyr::invoke("set", "com.amazonaws.services.s3.enableV4", "true")
hconf %>% sparklyr::invoke("set", "fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hconf %>% sparklyr::invoke("set", "fs.s3a.impl.disable.cache", "true")
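## Optional (a sketch): read a value back from the Hadoop configuration
## to confirm the settings took effect; Configuration.get() returns the stored string
hconf %>% sparklyr::invoke("get", "fs.s3a.endpoint")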

### reading from s3 buckets
spark_read_csv(sc = sc, name = 'sr', path = "s3a://my-bucket/tmp/2district.csv", memory = TRUE)
spark_read_csv(sc = sc, name = 'sr_disk3', path = "s3a://my-bucket/tmp/changed/", memory = FALSE)
### reading from the local drive
spark_read_csv(sc = sc, name = 'raw_data_loc_in3', path = "/tmp/distance.csv", memory = TRUE)
spark_read_csv(sc = sc, name = 'raw_data_loc_in5', path = "/tmp/distance.csv", memory = TRUE)

#### reading from a redshift table over JDBC
t <- sparklyr::spark_read_jdbc(sc, "connection", options = list(
  url = "jdbc:redshift://<URL>:<Port>/<dbName>",
  user = "<user_name>",
  password = "<password>",
  # driver class shipped in the RedshiftJDBC4 jar
  driver = "com.amazon.redshift.jdbc4.Driver",
  # a subquery used as dbtable needs an alias
  dbtable = '(select * from sales limit 1000) as sales_sample',
  tempS3Dir = "s3a://my-bucket/migration"),
  memory = TRUE, overwrite = TRUE, repartition = 3)
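## Optional sanity check (a sketch): pull a few rows back into R to confirm the read worked;
## head() is pushed down to Spark, collect() brings the rows into a local data frame
library(dplyr)
t %>% head(10) %>% collect()
sdf_nrow(t)   # row count, computed on the Spark side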

#### write the Spark DataFrame to csv on the local filesystem
sparklyr::spark_write_csv(t, path = 'sample.csv')
#### write the Spark DataFrame to csv in s3
sparklyr::spark_write_csv(t, path = 's3a://my-bucket/output/')
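As an alternative to plain JDBC, the Databricks spark-redshift datasource mentioned in the question can be used; it unloads the table to S3 and has Spark read it from there, which tends to scale better for large tables. The sketch below is only an illustration under assumptions: the package coordinates and option names are those of the spark-redshift library, the tempdir bucket is a placeholder, and the Redshift JDBC jar plus the s3a credentials still need to be configured as above.

## re-connect with the spark-redshift package added to the session
config$`sparklyr.shell.packages` <- "com.databricks:spark-redshift_2.11:3.0.0-preview1"
sc <- spark_connect(master = "local[*]", config = config)

## read a Redshift table through the datasource
sales <- sparklyr::spark_read_source(
  sc,
  name    = "sales_rs",
  source  = "com.databricks.spark.redshift",
  options = list(
    url     = "jdbc:redshift://<URL>:<Port>/<dbName>?user=<user_name>&password=<password>",
    dbtable = "sales",
    tempdir = "s3a://my-bucket/migration",
    forward_spark_s3_credentials = "true"
  )
)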