使用Spark SQL从Cou chbase 5.x读取文档

问题描述 投票:0回答:1

我试图通过spark-shell使用Spark SQL读取存储桶的文档。

spark-shell --packages com.couchbase.client:spark-connector_2.11:2.2.0

import org.apache.spark.sql.SparkSession
import com.couchbase.spark.sql._
import com.couchbase.client.java.document.JsonDocument 
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.client.java.view.ViewQuery
import org.apache.spark.sql.sources.EqualTo 

// Configure Spark
val sparkConf = SparkSession.
      builder().
      appName("KeyValueExample").
      master("local[*]").
      config("spark.couchbase.nodes", "135.x.x.x").
      config("spark.couchbase.username", "Administrator").
      config("spark.couchbase.password", "password").
      config("spark.couchbase.bucket.transaction-datastore", "transaction-datastore").
      getOrCreate()

import sparkConf.implicits._
val sc = sparkConf.sparkContext
val sql = sparkConf.sqlContext

// Create a DataFrame with Schema Inference
val cc = sql.read.couchbase(EqualTo("type", "Credit Card"))

sql.read.couchbase()抛出错误如下:

19/03/13 13:04:19 WARN端点:[null] [KeyValueEndpoint]:身份验证失败。 19/03/13 13:04:19 WARN端点:[null] [KeyValueEndpoint]:身份验证失败。 19/03/13 13:04:19 WARN端点:重新连接时出错:com.couchbase.client.core.endpoint.kv.AuthenticationException:com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.checkIsAuthed上的身份验证失败( KeyValueAuthHandler.java:288)位于com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java:173)的com.couchbase.client.core.endpoint.kv.KeyValueAuthHandler.channelRead0(KeyValueAuthHandler.java: 52)at com.couchbase.client.deps.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java: 356)在com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)的com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java: 335)在com.couchbase.clie nt.deps.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)at com。在com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)上的couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)。 couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler $ DelegatingChannelHandlerContext.fireChannelRead(combinedChannelDuplexHandler.java:438)at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:312) )at com.couchbase.client.deps.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:286)at com.couchbase.client.deps.io.netty.channel.CombinedChannelDuplexHandler.channelRead(combinedChanne) lDuplexHandler.java:253)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead( AbstractChannelHandlerContext.java:342)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)at com.couchbase.client.deps.io.netty.handler.timeout.IdleStateHandler。 channel.com(IdleStateHandler.java:286)位于com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)的com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext。 invokeChannelRead(AbstractChannelHandlerContext.java:342)at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:335)at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipel位于com.couchbase.client.deps.io.netty的com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:356)上的$ $ HeadContext.channelRead(DefaultChannelPipeline.java:1302)。 channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:342)at com.couchbase.client.deps.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)at com.couchbase.client.deps.io.netty。 channel.nio.AbstractNioByteChannel $ NioByteUnsafe.read(AbstractNioByteChannel.java:131)位于com.couchbase.client的com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:646)。 deps.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:581)位于com的com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)。在com.couc上的couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:460) hbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run(SingleThreadEventExecutor.java:131)at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory $ DefaultRunnableDecorator.run(DefaultThreadFactory。 java:138)at java.lang.Thread.run(Thread.java:748)

请帮忙!提前致谢。 :)

apache-spark apache-spark-sql couchbase
1个回答
1
投票

1-通过安全性 - >添加用户创建新用户/密码

2-添加此用户访问存储桶的权限。 (如果您正在进行测试环境,只需添加“完全管理员”权限)

3-在您的代码中,执行以下操作:

val sparkConf = SparkSession.
  builder().
  appName("KeyValueExample").
  master("local[*]").
  config("spark.couchbase.nodes", "135.x.x.x").
  config("spark.couchbase.username", "myUser").
  config("spark.couchbase.password", "myPassword").
  config("spark.couchbase.bucket.myBucketName", ""). //it must have a empty string as a parameter (backward compatibility)
  getOrCreate()
© www.soinside.com 2019 - 2024. All rights reserved.