orthose

Reputation: 11

How to configure authentication for Spark Connect?

I recently discovered the new Spark Connect feature in Spark 3.4. I have tested it on a YARN cluster and it works well! However, anyone can access my Spark Connect server and run jobs as my user. Is it possible to configure authentication for Spark Connect? A simple password, or a username and password combination.

I am looking for something like:

./sbin/start-connect-server.sh \
  --jars jars/spark-connect_2.12-3.4.1.jar \
  --master yarn --name SparkConnectTest \
  --conf spark.sql.catalogImplementation=hive \
  --conf password=mysuperpassword

spark = SparkSession.builder.remote("sc://localhost").conf("password", "mysuperpassword").getOrCreate()

The official documentation says:

While Spark Connect does not have built-in authentication, it is designed to work seamlessly with your existing authentication infrastructure. Its gRPC HTTP/2 interface allows for the use of authenticating proxies, which makes it possible to secure Spark Connect without having to implement authentication logic in Spark directly.

But I don't know how to configure gRPC with Spark Connect.

Upvotes: 1

Views: 2037

Answers (1)

flowerbirds

Reputation: 31

gRPC Proxy

APISIX can proxy gRPC traffic: gRPC client -> APISIX -> gRPC/gRPCS server

https://apisix.apache.org/docs/apisix/grpc-proxy/

Maybe it can help you.

--------------2023-11-02 updated-------------------------

I used a simple gRPC example and proxied it with APISIX.

syntax = "proto3";

package user;

service UserService{
  rpc getUserInfo(UserRequest) returns (UserResponse);
}

message UserRequest{
  string id = 1;
}

message UserResponse{
  string id = 1;
  int32 phoneNumber = 2; 
  string email = 3; 
  int32 serialNumber = 4; 
}

Use the APISIX admin API to add the route and authentication:

# add route
curl http://127.0.0.1:30918/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["POST", "GET"],
    "uri": "/user.UserService/getUserInfo",
    "upstream": {
        "scheme": "grpc",
        "type": "roundrobin",
        "nodes": {
            "172.28.208.1:5001": 1
        }
    }
}'
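The same route can be created from Python instead of curl. Below is a minimal sketch using only the standard library; the admin endpoint, admin key, and upstream node are the values from the curl call above, so substitute your own deployment's values.

```python
import json
import urllib.request

# Values taken from the curl example above; replace with your own.
ADMIN = "http://127.0.0.1:30918/apisix/admin"
API_KEY = "edd1c9f034335f136f87ad84b625c8f1"

def route_payload(service_path: str, upstream_node: str) -> dict:
    """Build an APISIX route body that proxies one gRPC method (or wildcard)."""
    return {
        "methods": ["POST", "GET"],
        "uri": service_path,
        "upstream": {
            "scheme": "grpc",        # forward upstream as gRPC over HTTP/2
            "type": "roundrobin",
            "nodes": {upstream_node: 1},
        },
    }

def put_route(route_id: str, payload: dict) -> int:
    """PUT the route to the APISIX admin API; returns the HTTP status."""
    req = urllib.request.Request(
        f"{ADMIN}/routes/{route_id}",
        data=json.dumps(payload).encode(),
        headers={"X-API-KEY": API_KEY, "Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status

# To actually create the route (requires a running APISIX):
# put_route("1", route_payload("/user.UserService/getUserInfo", "172.28.208.1:5001"))
```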

# add key-auth info
curl -i "http://127.0.0.1:30918/apisix/admin/consumers" -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
  "username": "tom",
  "plugins": {
    "key-auth": {
      "key": "secret-key"
    }
  }
}'

# add key-auth plugin to route
curl -i "http://127.0.0.1:30918/apisix/admin/routes/1" -X PATCH -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -d '
{
  "plugins": {
    "key-auth": {}
  }
}'
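By default the key-auth plugin reads the key from the "apikey" request header, which for a gRPC client means sending it as call metadata. The helper below is plain Python; the commented usage sketch assumes grpcio plus stubs generated from the user.proto above (user_pb2 / user_pb2_grpc), which are not part of this snippet.

```python
def apikey_metadata(key: str) -> list[tuple[str, str]]:
    """Metadata pair satisfying APISIX key-auth (default header name: 'apikey')."""
    return [("apikey", key)]

# Hedged usage sketch (requires grpcio and generated stubs):
# import grpc
# import user_pb2, user_pb2_grpc
# channel = grpc.insecure_channel("127.0.0.1:<apisix-grpc-port>")
# stub = user_pb2_grpc.UserServiceStub(channel)
# resp = stub.getUserInfo(user_pb2.UserRequest(id="1"),
#                         metadata=apikey_metadata("secret-key"))
```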

When the client calls the service without an API key, or with a wrong value:

Please input user id: 1
Please input user id: io.grpc.StatusRuntimeException: UNAUTHENTICATED: HTTP status code 401
invalid content-type: text/plain; charset=utf-8
headers: Metadata(:status=401,date=Wed, 01 Nov 2023 03:39:18 GMT,content-type=text/plain; charset=utf-8,server=APISIX/3.6.0)
DATA-----------------------------
{"message":"Missing API key found in request"}

    at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:222)
    at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:203)
    at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:132)
    at com.meritdata.grpc.proto.UserServiceGrpc$UserServiceBlockingStub.getUserInfo(UserServiceGrpc.java:358)
    at com.meritdata.grpc.client.ClientApp.getUserInfo(ClientApp.java:46)
    at com.meritdata.grpc.client.ClientApp.main(ClientApp.java:23)

A Spark Connect gRPC proxy example is coming soon.

-----------------Last updated------------------

Add spark connect route:

curl http://127.0.0.1:30918/apisix/admin/routes/spark-connect -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "methods": ["POST", "GET"],
    "uri": "/spark.connect.SparkConnectService/*",
    "upstream": {
        "scheme": "grpc",
        "type": "roundrobin",
        "nodes": {
            "10.43.105.147:15002": 1
        }
    },
    "plugins": {
        "key-auth": {}
    }
}'

Use PySpark in Python code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://172.28.220.144:30981/;apikey=secret-key").getOrCreate()
...
spark.stop()
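Why the ";apikey=secret-key" suffix works: the Spark Connect URL has the (simplified) form sc://host[:port]/;key=value;key2=value2, and the PySpark client forwards non-reserved parameters as gRPC metadata headers on each request, which is exactly what APISIX's key-auth inspects. A minimal sketch of that grammar, under this simplified assumption (the real client also reserves parameters such as use_ssl and token):

```python
from urllib.parse import urlparse

def parse_spark_connect(url: str) -> tuple[str, int, dict]:
    """Split a Spark Connect URL into (host, port, params).

    Simplified grammar: sc://host[:port]/;key=value;key2=value2
    Custom params such as 'apikey' are sent by the client as gRPC
    metadata, which lets an authenticating proxy see them.
    """
    parsed = urlparse(url)
    if parsed.scheme != "sc":
        raise ValueError(f"not a Spark Connect URL: {url}")
    params = {}
    for part in parsed.path.split(";"):
        if "=" in part:
            key, _, value = part.partition("=")
            params[key] = value
    return parsed.hostname, parsed.port or 15002, params  # 15002 is the default port
```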

When no apikey is provided, or an incorrect value is used, an error is reported during the connection.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.remote("sc://172.28.220.144:30981/;apikey=secret-key1").getOrCreate()
E1102 14:17:30.511000000 16764 src/core/ext/transport/chttp2/transport/hpack_parser.cc:999] Error parsing 'content-type' metadata: invalid value
...\Miniconda3\envs\python310\lib\site-packages\pyspark\sql\connect\session.py:185: UserWarning: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Stream removed", grpc_status:2, created_time:"2023-11-02T06:17:30.5118727+00:00"}"
>
  warnings.warn(str(e))
E1102 14:17:30.546000000 16764 src/core/ext/transport/chttp2/transport/hpack_parser.cc:999] Error parsing 'content-type' metadata: invalid value
...Miniconda3\envs\python310\lib\site-packages\pyspark\sql\connect\session.py:185: UserWarning: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = "Stream removed"
        debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2023-11-02T06:17:30.5470989+00:00", grpc_status:2, grpc_message:"Stream removed"}"
>
  warnings.warn(str(e))

But the error message is not specific; you can use Wireshark to inspect the returned value.

Internet Protocol Version 4, Src: 172.28.220.144, Dst: 172.28.208.1
Transmission Control Protocol, Src Port: 30981, Dst Port: 59795, Seq: 207, Ack: 837, Len: 77
HyperText Transfer Protocol 2
    Stream: HEADERS, Stream ID: 3, Length 68, 401 Unauthorized
        Length: 68
        Type: HEADERS (1)
        Flags: 0x04, End Headers
        0... .... .... .... .... .... .... .... = Reserved: 0x0
        .000 0000 0000 0000 0000 0000 0000 0011 = Stream Identifier: 3
        [Pad Length: 0]
        Header Block Fragment: 48033430316197df3dbf4a004a693f75040132a01ab8d3b7196d4c5a37ff5f92497ca58a…
        [Header Length: 130]
        [Header Count: 4]
        Header: :status: 401 Unauthorized
        Header: date: Thu, 02 Nov 2023 04:47:35 GMT
        Header: content-type: text/plain; charset=utf-8
        Header: server: APISIX/3.6.0

Upvotes: 1
