Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 112 additions & 3 deletions internal/managedkafka/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ import (
)

const (
clusterID = "fake-cluster"
topicID = "fake-topic"
consumerGroupID = "fake-consumergroup"
clusterID = "fake-cluster"
topicID = "fake-topic"
consumerGroupID = "fake-consumergroup"
connectClusterID = "fake-connect-cluster"
connectorID = "fake-connector"
)

// The reason why we have a fake server is because testing end-to-end will exceed the deadline of 10 minutes.
Expand All @@ -41,14 +43,20 @@ type fakeManagedKafkaServer struct {
managedkafkapb.UnimplementedManagedKafkaServer
}

type fakeManagedKafkaConnectServer struct {
managedkafkapb.UnimplementedManagedKafkaConnectServer
}

func Options(t *testing.T) []option.ClientOption {
server := &fakeManagedKafkaServer{}
connectServer := &fakeManagedKafkaConnectServer{}
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
gsrv := grpc.NewServer()
managedkafkapb.RegisterManagedKafkaServer(gsrv, server)
managedkafkapb.RegisterManagedKafkaConnectServer(gsrv, connectServer)
fakeServerAddr := listener.Addr().String()
go func() {
if err := gsrv.Serve(listener); err != nil {
Expand Down Expand Up @@ -165,3 +173,104 @@ func (f *fakeManagedKafkaServer) UpdateConsumerGroup(ctx context.Context, req *m
Name: consumerGroupID,
}, nil
}

// Connect server methods
func (f *fakeManagedKafkaConnectServer) CreateConnectCluster(ctx context.Context, req *managedkafkapb.CreateConnectClusterRequest) (*longrunningpb.Operation, error) {
anypb := &anypb.Any{}
err := anypb.MarshalFrom(req.ConnectCluster)
if err != nil {
return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err)
}
return &longrunningpb.Operation{
Done: true,
Result: &longrunningpb.Operation_Response{
Response: anypb,
},
}, nil
}

func (f *fakeManagedKafkaConnectServer) DeleteConnectCluster(ctx context.Context, req *managedkafkapb.DeleteConnectClusterRequest) (*longrunningpb.Operation, error) {
return &longrunningpb.Operation{
Done: true,
Result: &longrunningpb.Operation_Response{
Response: &anypb.Any{},
},
}, nil
}

func (f *fakeManagedKafkaConnectServer) GetConnectCluster(ctx context.Context, req *managedkafkapb.GetConnectClusterRequest) (*managedkafkapb.ConnectCluster, error) {
return &managedkafkapb.ConnectCluster{
Name: connectClusterID,
}, nil
}

func (f *fakeManagedKafkaConnectServer) ListConnectClusters(ctx context.Context, req *managedkafkapb.ListConnectClustersRequest) (*managedkafkapb.ListConnectClustersResponse, error) {
return &managedkafkapb.ListConnectClustersResponse{
ConnectClusters: []*managedkafkapb.ConnectCluster{{
Name: connectClusterID,
}},
}, nil
}

func (f *fakeManagedKafkaConnectServer) UpdateConnectCluster(ctx context.Context, req *managedkafkapb.UpdateConnectClusterRequest) (*longrunningpb.Operation, error) {
anypb := &anypb.Any{}
err := anypb.MarshalFrom(req.ConnectCluster)
if err != nil {
return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err)
}
return &longrunningpb.Operation{
Done: true,
Result: &longrunningpb.Operation_Response{
Response: anypb,
},
}, nil
}

// Connector methods
func (f *fakeManagedKafkaConnectServer) CreateConnector(ctx context.Context, req *managedkafkapb.CreateConnectorRequest) (*managedkafkapb.Connector, error) {
return req.Connector, nil
}

func (f *fakeManagedKafkaConnectServer) GetConnector(ctx context.Context, req *managedkafkapb.GetConnectorRequest) (*managedkafkapb.Connector, error) {
return &managedkafkapb.Connector{
Name: connectorID,
Configs: map[string]string{
"connector.class": "test.connector",
},
}, nil
}

func (f *fakeManagedKafkaConnectServer) ListConnectors(ctx context.Context, req *managedkafkapb.ListConnectorsRequest) (*managedkafkapb.ListConnectorsResponse, error) {
return &managedkafkapb.ListConnectorsResponse{
Connectors: []*managedkafkapb.Connector{{
Name: connectorID,
Configs: map[string]string{
"connector.class": "test.connector",
},
}},
}, nil
}

func (f *fakeManagedKafkaConnectServer) UpdateConnector(ctx context.Context, req *managedkafkapb.UpdateConnectorRequest) (*managedkafkapb.Connector, error) {
return req.Connector, nil
}

func (f *fakeManagedKafkaConnectServer) DeleteConnector(ctx context.Context, req *managedkafkapb.DeleteConnectorRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, nil
}

func (f *fakeManagedKafkaConnectServer) PauseConnector(ctx context.Context, req *managedkafkapb.PauseConnectorRequest) (*managedkafkapb.PauseConnectorResponse, error) {
return &managedkafkapb.PauseConnectorResponse{}, nil
}

func (f *fakeManagedKafkaConnectServer) ResumeConnector(ctx context.Context, req *managedkafkapb.ResumeConnectorRequest) (*managedkafkapb.ResumeConnectorResponse, error) {
return &managedkafkapb.ResumeConnectorResponse{}, nil
}

func (f *fakeManagedKafkaConnectServer) StopConnector(ctx context.Context, req *managedkafkapb.StopConnectorRequest) (*managedkafkapb.StopConnectorResponse, error) {
return &managedkafkapb.StopConnectorResponse{}, nil
}

func (f *fakeManagedKafkaConnectServer) RestartConnector(ctx context.Context, req *managedkafkapb.RestartConnectorRequest) (*managedkafkapb.RestartConnectorResponse, error) {
return &managedkafkapb.RestartConnectorResponse{}, nil
}
100 changes: 100 additions & 0 deletions managedkafka/connect/clusters/connect_clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clusters

import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/GoogleCloudPlatform/golang-samples/internal/managedkafka/fake"
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
)

const (
connectClusterPrefix = "connect-cluster"
region = "us-central1"
)

func TestConnectClusters(t *testing.T) {
tc := testutil.SystemTest(t)
buf := new(bytes.Buffer)
connectClusterID := fmt.Sprintf("%s-%d", connectClusterPrefix, time.Now().UnixNano())
kafkaClusterID := fmt.Sprintf("kafka-cluster-%d", time.Now().UnixNano())
options := fake.Options(t)

// First create a Kafka cluster that the Connect cluster will reference
kafkaClusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", tc.ProjectID, region, kafkaClusterID)

t.Run("CreateConnectCluster", func(t *testing.T) {
buf.Reset()
if err := createConnectCluster(buf, tc.ProjectID, region, connectClusterID, kafkaClusterPath, options...); err != nil {
t.Fatalf("failed to create a connect cluster: %v", err)
}
got := buf.String()
want := "Created connect cluster"
if !strings.Contains(got, want) {
t.Fatalf("createConnectCluster() mismatch got: %s\nwant: %s", got, want)
}
})
t.Run("GetConnectCluster", func(t *testing.T) {
buf.Reset()
if err := getConnectCluster(buf, tc.ProjectID, region, connectClusterID, options...); err != nil {
t.Fatalf("failed to get connect cluster: %v", err)
}
got := buf.String()
want := "Got connect cluster"
if !strings.Contains(got, want) {
t.Fatalf("getConnectCluster() mismatch got: %s\nwant: %s", got, want)
}
})
t.Run("UpdateConnectCluster", func(t *testing.T) {
buf.Reset()
memoryBytes := int64(25769803776) // 24 GiB in bytes
labels := map[string]string{"environment": "test"}
if err := updateConnectCluster(buf, tc.ProjectID, region, connectClusterID, memoryBytes, labels, options...); err != nil {
t.Fatalf("failed to update connect cluster: %v", err)
}
got := buf.String()
want := "Updated connect cluster"
if !strings.Contains(got, want) {
t.Fatalf("updateConnectCluster() mismatch got: %s\nwant: %s", got, want)
}
})
t.Run("ListConnectClusters", func(t *testing.T) {
buf.Reset()
if err := listConnectClusters(buf, tc.ProjectID, region, options...); err != nil {
t.Fatalf("failed to list connect clusters: %v", err)
}
got := buf.String()
want := "Got connect cluster"
if !strings.Contains(got, want) {
t.Fatalf("listConnectClusters() mismatch got: %s\nwant: %s", got, want)
}
})
t.Run("DeleteConnectCluster", func(t *testing.T) {
buf.Reset()
if err := deleteConnectCluster(buf, tc.ProjectID, region, connectClusterID, options...); err != nil {
t.Fatalf("failed to delete connect cluster: %v", err)
}
got := buf.String()
want := "Deleted connect cluster"
if !strings.Contains(got, want) {
t.Fatalf("deleteConnectCluster() mismatch got: %s\nwant: %s", got, want)
}
})
}
84 changes: 84 additions & 0 deletions managedkafka/connect/clusters/create_connect_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clusters

// [START managedkafka_create_connect_cluster]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
"google.golang.org/api/option"

managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
// projectID := "my-project-id"
// region := "us-central1"
// clusterID := "my-connect-cluster"
// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
ctx := context.Background()
client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
if err != nil {
return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
}
defer client.Close()

locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)

// Capacity configuration with 12 vCPU and 12 GiB RAM
capacityConfig := &managedkafkapb.CapacityConfig{
VcpuCount: 12,
MemoryBytes: 12884901888, // 12 GiB in bytes
}

// Optionally, you can also specify accessible subnets and resolvable DNS
// domains as part of your network configuration. For example:
// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
// {
// PrimarySubnet: primarySubnet,
// AdditionalSubnets: []string{"subnet-1", "subnet-2"},
// DnsDomainNames: []string{"domain-1", "domain-2"},
// },
// }

connectCluster := &managedkafkapb.ConnectCluster{
Name: clusterPath,
KafkaCluster: kafkaCluster,
CapacityConfig: capacityConfig,
}

req := &managedkafkapb.CreateConnectClusterRequest{
Parent: locationPath,
ConnectClusterId: clusterID,
ConnectCluster: connectCluster,
}
op, err := client.CreateConnectCluster(ctx, req)
if err != nil {
return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
}
// The duration of this operation can vary considerably, typically taking 5-15 minutes.
resp, err := op.Wait(ctx)
if err != nil {
return fmt.Errorf("op.Wait got err: %w", err)
}
fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
return nil
}

// [END managedkafka_create_connect_cluster]
56 changes: 56 additions & 0 deletions managedkafka/connect/clusters/delete_connect_cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clusters

// [START managedkafka_delete_connect_cluster]
import (
"context"
"fmt"
"io"

"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
"google.golang.org/api/option"

managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func deleteConnectCluster(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
// projectID := "my-project-id"
// region := "us-central1"
// clusterID := "my-connect-cluster"
ctx := context.Background()
client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
if err != nil {
return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
}
defer client.Close()

clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
req := &managedkafkapb.DeleteConnectClusterRequest{
Name: clusterPath,
}
op, err := client.DeleteConnectCluster(ctx, req)
if err != nil {
return fmt.Errorf("client.DeleteConnectCluster got err: %w", err)
}
err = op.Wait(ctx)
if err != nil {
return fmt.Errorf("op.Wait got err: %w", err)
}
fmt.Fprint(w, "Deleted connect cluster\n")
return nil
}

// [END managedkafka_delete_connect_cluster]
Loading