如何连接PySpark使用SSL来Elasticsearch并验证设置为False证书?

问题描述 投票:1回答:1

先前我已成功连接到用下面的代码的Elasticsearch簇直接从Python中:

ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

es = Elasticsearch(
    ES_HOST,
    http_auth=(ES_USERNAME, ES_PASSWORD),
    scheme="https",
    port=ES_PORT,
    use_ssl=True,
    verify_certs=False,
    ssl_context=ssl_context,
    ca_certs=False
)

现在,我试图连接到使用Pyspark到Elasticsearch连接器相同的集群。星火已经创建了版本2.4.0Hadoop 2.7。我使用elasticsearch-hadoop-6.1.1连接两个。

我用下面的配置PySpark与ES连接:

es_live_conf = {

    "es.nodes" : ES_HOST,

    "es.port" : ES_PORT,

    "es.resource" : 'testindex/testdoc',

    "es.net.http.auth.user" : ES_USERNAME,

    "es.net.http.auth.pass" : ES_PASSWORD,

    "es.net.ssl":"true",

    "es.nodes.resolve.hostname": "false",

    "es.net.ssl.cert.allow.self.signed": "true"

}

然后这个代码来激活连接:

sc = SparkContext(appName="PythonSparkStreaming")  
sc.setLogLevel("WARN")

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_live_conf)

我得到与在宁静的通话启用TRACE记录以下错误:

19/02/07 07:00:31 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:31 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:31 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:31 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:31 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:31 TRACE CommonsHttpTransport:Tx [HEAD]@[xx.xx.xx.xx:xxxxx][testindex]?[null] w/ payload [null]
19/02/07 07:00:31 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:31 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:35 TRACE CommonsHttpTransport: Rx @[xx.xx.xx.xx] [404-Not Found] [null]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:35 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:35 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:35 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:35 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:35 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][]?[null] w/ payload [null]
19/02/07 07:00:35 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:35 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:39 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{
  "name" : "iad1esapp2vz742",
  "cluster_name" : "68fc89bc2c36e7188782e4f226ed3948",
  "cluster_uuid" : "7xk6py25R_mRgLgLTYeavg",
  "version" : {
    "number" : "5.6.5",
    "build_hash" : "6a37571",
    "build_date" : "2017-12-04T07:50:10.466Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:39 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:39 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:39 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:39 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:39 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:39 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:39 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
19/02/07 07:00:43 TRACE CommonsHttpTransport: Rx @[yyy.yy.y.y] [200-OK] [{"_nodes":{"total":14,"successful":14,"failed":0},"cluster_name":"68fc89bc2c36e7188782e4f226ed3948","nodes":{"VJl4scTeQbuPmnXcsb5MOA":{"name":"iad1esdn20vz166","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn20"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"MEq2hOGkSdeBMldoBiv2AQ":{"name":"iad1esdn23vz187","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn23"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"AHDBAMeQTg2LMyhvnxYxeQ":{"name":"iad1esmst4vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst4"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"nJjtbzRuTP2Bkp9E0EOLBw":{"name":"iad1esapp0vz755","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp0"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5PrI09xWQRuNHPPUYCKOcQ":{"name":"iad1esdn31vz12","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn31"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"f1l6borzQt6-d_QEy9hY9Q":{"name":"iad1esapp1vz782","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp1"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"bO6AoGXFSgGzrISe-pCW8g":{"name":"iad1esdn21vz99","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn21"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"z7UA-JHlQ7SCNyUYfbwR1A":{"name":"iad1esmst5vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst5"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"53RAotd0QbiE28swz3fyOg":{"name":"iad1esapp2vz742","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp2"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"5Lje4zCgSryonTouZMBqyA":{"name":"iad1esdn22vz156","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn22"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"YHFxFv5SQzCt9OesKD_a-g":{"name":"iad1esapp3vz748","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["ingest"],"attributes":{"phy_host":"iad1esapp3"},"http":{"bound_address":["[::]:9200"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"yZOAHjgoSz-Qi_eNO7HHxg":{"name":"iad1esmst3vz277","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["master"],"attributes":{"phy_host":"iad1esmst3"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"kg9NYEJiRy60InOxIh796A":{"name":"iad1esdn1vz163","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn1"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}},"PvQ0PobMTaSNnzw0l03z4A":{"name":"iad1esdn13vz80","transport_address":"xx.xx.xx.xx:xxxxx","host":"xx.xx.xx.xx","ip":"xx.xx.xx.xx","version":"5.6.5","build_hash":"6a37571","roles":["data"],"attributes":{"phy_host":"iad1esdn13"},"http":{"bound_address":["[::]:11203"],"publish_address":"xx.xx.xx.xx:xxxxx","max_content_length_in_bytes":104857600}}}}]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:00:43 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:00:43 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:00:43 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:00:43 DEBUG HeaderProcessor: Added HTTP Headers to method: [Content-Type: application/json
, Accept: application/json
]
19/02/07 07:00:43 TRACE CommonsHttpTransport: Tx [GET]@[xx.xx.xx.xx:xxxxx][_nodes/http]?[null] w/ payload [null]
19/02/07 07:00:43 DEBUG SSLSocketFactory: No keystore location specified! SSL is continuing with no keystore.
19/02/07 07:00:43 DEBUG SSLSocketFactory: No truststore location specified! SSL is continuing with no truststore.
[I 07:01:55.454 NotebookApp] Saving file at /work/sth-baseline.ipynb
[W 07:01:55.455 NotebookApp] Notebook work/sth-baseline.ipynb is not trusted
19/02/07 07:05:48 TRACE NetworkClient: Caught exception while performing request [xx.xx.xx.xx:xxxxx][_nodes/http] - falling back to the next node in line...
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:666)
    at sun.security.ssl.SSLSocketImpl.<init>(SSLSocketImpl.java:471)
    at sun.security.ssl.SSLSocketFactoryImpl.createSocket(SSLSocketFactoryImpl.java:153)
    at org.elasticsearch.hadoop.rest.commonshttp.SSLSocketFactory.createSocket(SSLSocketFactory.java:129)
    at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
    at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
    at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
    at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
    at org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport.execute(CommonsHttpTransport.java:478)
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:466)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:430)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:155)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpNodes(RestClient.java:112)
    at org.elasticsearch.hadoop.rest.RestClient.getHttpDataNodes(RestClient.java:129)
    at org.elasticsearch.hadoop.rest.InitializationUtils.filterNonDataNodesIfNeeded(InitializationUtils.java:157)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:223)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:405)
    at org.elasticsearch.hadoop.mr.EsInputFormat.getSplits(EsInputFormat.java:386)
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:130)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1343)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1337)
    at org.apache.spark.api.python.SerDeUtil$.pairRDDToPython(SerDeUtil.scala:239)
    at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:302)
    at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
19/02/07 07:05:48 TRACE CommonsHttpTransport: Closing HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 DEBUG CommonsHttpTransportFactory: Creating new CommonsHttpTransport
19/02/07 07:05:48 DEBUG CommonsHttpTransport: SSL Connection enabled
19/02/07 07:05:48 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
19/02/07 07:05:48 TRACE CommonsHttpTransport: Opening HTTP transport to xx.xx.xx.xx:xxxxx
19/02/07 07:05:48 ERROR NetworkClient: Node [xx.xx.xx.xx:xxxxx] failed (Connection refused (Connection refused)); selected next node [xx.xx.xx.xx:xxxxx]

我要指出,利用与当地ES集群连接的配置相同的代码工作的成功。因此,错误的是纯粹用SSL连接到远程集群做。我能够从客户端环境的命令curl --insecure --user ES_USER:ES_PASS -XGET 'https://ES_HOST:ES_PORT/'访问群集 - 如此看来,客户端应该能够连接。

如何设置使用正确的SSL细节星火ES连接?我试图消除在配置es.net.ssl.cert.allow.self.signedes.nodes.resolve.hostname性能,但仍然收到了同样的错误。

python apache-spark elasticsearch pyspark elasticsearch-5
1个回答
0
投票

好了,想通了。

答案就在这里详细介绍:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/cloud.html

简单地说,你可以在日志中看到张贴的客户端能够连接到群集并取回节点的列表。一旦有这单子它会尝试连接到他们,但他们都是在专用网络上访问。从本质上讲,你需要把与尽管它指出,这可能会影响性能的群集节点关闭,直接连接的能力。做到这一点的设置是"es.nodes.wan.only": "true",虽然我还设置"es.nodes.discovery": "false"摸不着头脑,所以发现设置可能不是必要的,但我会离开这里的情况下,它是非常有用的。

© www.soinside.com 2019 - 2024. All rights reserved.