Skip to content

Commit ff48a3f

Browse files
salmanyglasnt
andauthored
feat: Add MKC Code Examples (#5359)
* feat: provide code examples for managedkafka connect clusters and connectors * Managed Kafka Connect Go Code Samples * Adds fake client for testing Connect clusters * Updated headers. * Reset buffer for sub-tests. * Align code samples with TF / Python / Java samples. * Add comment about accessible subnets and DNS domains. * Fix memory defaults for connect cluster example. * Fix name of MM2 Source sample. * Fix sample parameter for Cloud Storage Sink connector. * Fix linting errors. * Fix test error. * Fix CreateBigQuerySinkConnector parameters. --------- Co-authored-by: Katie McLaughlin <katie@glasnt.com>
1 parent 6acf46c commit ff48a3f

23 files changed

+1753
-128
lines changed

internal/managedkafka/fake/fake.go

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@ import (
3030
)
3131

3232
const (
33-
clusterID = "fake-cluster"
34-
topicID = "fake-topic"
35-
consumerGroupID = "fake-consumergroup"
33+
clusterID = "fake-cluster"
34+
topicID = "fake-topic"
35+
consumerGroupID = "fake-consumergroup"
36+
connectClusterID = "fake-connect-cluster"
37+
connectorID = "fake-connector"
3638
)
3739

3840
// The reason why we have a fake server is because testing end-to-end will exceed the deadline of 10 minutes.
@@ -41,14 +43,20 @@ type fakeManagedKafkaServer struct {
4143
managedkafkapb.UnimplementedManagedKafkaServer
4244
}
4345

46+
type fakeManagedKafkaConnectServer struct {
47+
managedkafkapb.UnimplementedManagedKafkaConnectServer
48+
}
49+
4450
func Options(t *testing.T) []option.ClientOption {
4551
server := &fakeManagedKafkaServer{}
52+
connectServer := &fakeManagedKafkaConnectServer{}
4653
listener, err := net.Listen("tcp", "localhost:0")
4754
if err != nil {
4855
t.Fatal(err)
4956
}
5057
gsrv := grpc.NewServer()
5158
managedkafkapb.RegisterManagedKafkaServer(gsrv, server)
59+
managedkafkapb.RegisterManagedKafkaConnectServer(gsrv, connectServer)
5260
fakeServerAddr := listener.Addr().String()
5361
go func() {
5462
if err := gsrv.Serve(listener); err != nil {
@@ -165,3 +173,104 @@ func (f *fakeManagedKafkaServer) UpdateConsumerGroup(ctx context.Context, req *m
165173
Name: consumerGroupID,
166174
}, nil
167175
}
176+
177+
// Connect server methods
178+
func (f *fakeManagedKafkaConnectServer) CreateConnectCluster(ctx context.Context, req *managedkafkapb.CreateConnectClusterRequest) (*longrunningpb.Operation, error) {
179+
anypb := &anypb.Any{}
180+
err := anypb.MarshalFrom(req.ConnectCluster)
181+
if err != nil {
182+
return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err)
183+
}
184+
return &longrunningpb.Operation{
185+
Done: true,
186+
Result: &longrunningpb.Operation_Response{
187+
Response: anypb,
188+
},
189+
}, nil
190+
}
191+
192+
func (f *fakeManagedKafkaConnectServer) DeleteConnectCluster(ctx context.Context, req *managedkafkapb.DeleteConnectClusterRequest) (*longrunningpb.Operation, error) {
193+
return &longrunningpb.Operation{
194+
Done: true,
195+
Result: &longrunningpb.Operation_Response{
196+
Response: &anypb.Any{},
197+
},
198+
}, nil
199+
}
200+
201+
func (f *fakeManagedKafkaConnectServer) GetConnectCluster(ctx context.Context, req *managedkafkapb.GetConnectClusterRequest) (*managedkafkapb.ConnectCluster, error) {
202+
return &managedkafkapb.ConnectCluster{
203+
Name: connectClusterID,
204+
}, nil
205+
}
206+
207+
func (f *fakeManagedKafkaConnectServer) ListConnectClusters(ctx context.Context, req *managedkafkapb.ListConnectClustersRequest) (*managedkafkapb.ListConnectClustersResponse, error) {
208+
return &managedkafkapb.ListConnectClustersResponse{
209+
ConnectClusters: []*managedkafkapb.ConnectCluster{{
210+
Name: connectClusterID,
211+
}},
212+
}, nil
213+
}
214+
215+
func (f *fakeManagedKafkaConnectServer) UpdateConnectCluster(ctx context.Context, req *managedkafkapb.UpdateConnectClusterRequest) (*longrunningpb.Operation, error) {
216+
anypb := &anypb.Any{}
217+
err := anypb.MarshalFrom(req.ConnectCluster)
218+
if err != nil {
219+
return nil, fmt.Errorf("anypb.MarshalFrom got err: %w", err)
220+
}
221+
return &longrunningpb.Operation{
222+
Done: true,
223+
Result: &longrunningpb.Operation_Response{
224+
Response: anypb,
225+
},
226+
}, nil
227+
}
228+
229+
// Connector methods
230+
func (f *fakeManagedKafkaConnectServer) CreateConnector(ctx context.Context, req *managedkafkapb.CreateConnectorRequest) (*managedkafkapb.Connector, error) {
231+
return req.Connector, nil
232+
}
233+
234+
func (f *fakeManagedKafkaConnectServer) GetConnector(ctx context.Context, req *managedkafkapb.GetConnectorRequest) (*managedkafkapb.Connector, error) {
235+
return &managedkafkapb.Connector{
236+
Name: connectorID,
237+
Configs: map[string]string{
238+
"connector.class": "test.connector",
239+
},
240+
}, nil
241+
}
242+
243+
func (f *fakeManagedKafkaConnectServer) ListConnectors(ctx context.Context, req *managedkafkapb.ListConnectorsRequest) (*managedkafkapb.ListConnectorsResponse, error) {
244+
return &managedkafkapb.ListConnectorsResponse{
245+
Connectors: []*managedkafkapb.Connector{{
246+
Name: connectorID,
247+
Configs: map[string]string{
248+
"connector.class": "test.connector",
249+
},
250+
}},
251+
}, nil
252+
}
253+
254+
func (f *fakeManagedKafkaConnectServer) UpdateConnector(ctx context.Context, req *managedkafkapb.UpdateConnectorRequest) (*managedkafkapb.Connector, error) {
255+
return req.Connector, nil
256+
}
257+
258+
func (f *fakeManagedKafkaConnectServer) DeleteConnector(ctx context.Context, req *managedkafkapb.DeleteConnectorRequest) (*emptypb.Empty, error) {
259+
return &emptypb.Empty{}, nil
260+
}
261+
262+
func (f *fakeManagedKafkaConnectServer) PauseConnector(ctx context.Context, req *managedkafkapb.PauseConnectorRequest) (*managedkafkapb.PauseConnectorResponse, error) {
263+
return &managedkafkapb.PauseConnectorResponse{}, nil
264+
}
265+
266+
func (f *fakeManagedKafkaConnectServer) ResumeConnector(ctx context.Context, req *managedkafkapb.ResumeConnectorRequest) (*managedkafkapb.ResumeConnectorResponse, error) {
267+
return &managedkafkapb.ResumeConnectorResponse{}, nil
268+
}
269+
270+
func (f *fakeManagedKafkaConnectServer) StopConnector(ctx context.Context, req *managedkafkapb.StopConnectorRequest) (*managedkafkapb.StopConnectorResponse, error) {
271+
return &managedkafkapb.StopConnectorResponse{}, nil
272+
}
273+
274+
func (f *fakeManagedKafkaConnectServer) RestartConnector(ctx context.Context, req *managedkafkapb.RestartConnectorRequest) (*managedkafkapb.RestartConnectorResponse, error) {
275+
return &managedkafkapb.RestartConnectorResponse{}, nil
276+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package clusters
16+
17+
import (
18+
"bytes"
19+
"fmt"
20+
"strings"
21+
"testing"
22+
"time"
23+
24+
"github.com/GoogleCloudPlatform/golang-samples/internal/managedkafka/fake"
25+
"github.com/GoogleCloudPlatform/golang-samples/internal/testutil"
26+
)
27+
28+
const (
29+
connectClusterPrefix = "connect-cluster"
30+
region = "us-central1"
31+
)
32+
33+
func TestConnectClusters(t *testing.T) {
34+
tc := testutil.SystemTest(t)
35+
buf := new(bytes.Buffer)
36+
connectClusterID := fmt.Sprintf("%s-%d", connectClusterPrefix, time.Now().UnixNano())
37+
kafkaClusterID := fmt.Sprintf("kafka-cluster-%d", time.Now().UnixNano())
38+
options := fake.Options(t)
39+
40+
// First create a Kafka cluster that the Connect cluster will reference
41+
kafkaClusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", tc.ProjectID, region, kafkaClusterID)
42+
43+
t.Run("CreateConnectCluster", func(t *testing.T) {
44+
buf.Reset()
45+
if err := createConnectCluster(buf, tc.ProjectID, region, connectClusterID, kafkaClusterPath, options...); err != nil {
46+
t.Fatalf("failed to create a connect cluster: %v", err)
47+
}
48+
got := buf.String()
49+
want := "Created connect cluster"
50+
if !strings.Contains(got, want) {
51+
t.Fatalf("createConnectCluster() mismatch got: %s\nwant: %s", got, want)
52+
}
53+
})
54+
t.Run("GetConnectCluster", func(t *testing.T) {
55+
buf.Reset()
56+
if err := getConnectCluster(buf, tc.ProjectID, region, connectClusterID, options...); err != nil {
57+
t.Fatalf("failed to get connect cluster: %v", err)
58+
}
59+
got := buf.String()
60+
want := "Got connect cluster"
61+
if !strings.Contains(got, want) {
62+
t.Fatalf("getConnectCluster() mismatch got: %s\nwant: %s", got, want)
63+
}
64+
})
65+
t.Run("UpdateConnectCluster", func(t *testing.T) {
66+
buf.Reset()
67+
memoryBytes := int64(25769803776) // 24 GiB in bytes
68+
labels := map[string]string{"environment": "test"}
69+
if err := updateConnectCluster(buf, tc.ProjectID, region, connectClusterID, memoryBytes, labels, options...); err != nil {
70+
t.Fatalf("failed to update connect cluster: %v", err)
71+
}
72+
got := buf.String()
73+
want := "Updated connect cluster"
74+
if !strings.Contains(got, want) {
75+
t.Fatalf("updateConnectCluster() mismatch got: %s\nwant: %s", got, want)
76+
}
77+
})
78+
t.Run("ListConnectClusters", func(t *testing.T) {
79+
buf.Reset()
80+
if err := listConnectClusters(buf, tc.ProjectID, region, options...); err != nil {
81+
t.Fatalf("failed to list connect clusters: %v", err)
82+
}
83+
got := buf.String()
84+
want := "Got connect cluster"
85+
if !strings.Contains(got, want) {
86+
t.Fatalf("listConnectClusters() mismatch got: %s\nwant: %s", got, want)
87+
}
88+
})
89+
t.Run("DeleteConnectCluster", func(t *testing.T) {
90+
buf.Reset()
91+
if err := deleteConnectCluster(buf, tc.ProjectID, region, connectClusterID, options...); err != nil {
92+
t.Fatalf("failed to delete connect cluster: %v", err)
93+
}
94+
got := buf.String()
95+
want := "Deleted connect cluster"
96+
if !strings.Contains(got, want) {
97+
t.Fatalf("deleteConnectCluster() mismatch got: %s\nwant: %s", got, want)
98+
}
99+
})
100+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package clusters
16+
17+
// [START managedkafka_create_connect_cluster]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
24+
"google.golang.org/api/option"
25+
26+
managedkafka "cloud.google.com/go/managedkafka/apiv1"
27+
)
28+
29+
func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
30+
// projectID := "my-project-id"
31+
// region := "us-central1"
32+
// clusterID := "my-connect-cluster"
33+
// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
34+
ctx := context.Background()
35+
client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
36+
if err != nil {
37+
return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
38+
}
39+
defer client.Close()
40+
41+
locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
42+
clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
43+
44+
// Capacity configuration with 12 vCPU and 12 GiB RAM
45+
capacityConfig := &managedkafkapb.CapacityConfig{
46+
VcpuCount: 12,
47+
MemoryBytes: 12884901888, // 12 GiB in bytes
48+
}
49+
50+
// Optionally, you can also specify accessible subnets and resolvable DNS
51+
// domains as part of your network configuration. For example:
52+
// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
53+
// {
54+
// PrimarySubnet: primarySubnet,
55+
// AdditionalSubnets: []string{"subnet-1", "subnet-2"},
56+
// DnsDomainNames: []string{"domain-1", "domain-2"},
57+
// },
58+
// }
59+
60+
connectCluster := &managedkafkapb.ConnectCluster{
61+
Name: clusterPath,
62+
KafkaCluster: kafkaCluster,
63+
CapacityConfig: capacityConfig,
64+
}
65+
66+
req := &managedkafkapb.CreateConnectClusterRequest{
67+
Parent: locationPath,
68+
ConnectClusterId: clusterID,
69+
ConnectCluster: connectCluster,
70+
}
71+
op, err := client.CreateConnectCluster(ctx, req)
72+
if err != nil {
73+
return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
74+
}
75+
// The duration of this operation can vary considerably, typically taking 5-15 minutes.
76+
resp, err := op.Wait(ctx)
77+
if err != nil {
78+
return fmt.Errorf("op.Wait got err: %w", err)
79+
}
80+
fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
81+
return nil
82+
}
83+
84+
// [END managedkafka_create_connect_cluster]
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package clusters
16+
17+
// [START managedkafka_delete_connect_cluster]
18+
import (
19+
"context"
20+
"fmt"
21+
"io"
22+
23+
"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
24+
"google.golang.org/api/option"
25+
26+
managedkafka "cloud.google.com/go/managedkafka/apiv1"
27+
)
28+
29+
func deleteConnectCluster(w io.Writer, projectID, region, clusterID string, opts ...option.ClientOption) error {
30+
// projectID := "my-project-id"
31+
// region := "us-central1"
32+
// clusterID := "my-connect-cluster"
33+
ctx := context.Background()
34+
client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
35+
if err != nil {
36+
return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
37+
}
38+
defer client.Close()
39+
40+
clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
41+
req := &managedkafkapb.DeleteConnectClusterRequest{
42+
Name: clusterPath,
43+
}
44+
op, err := client.DeleteConnectCluster(ctx, req)
45+
if err != nil {
46+
return fmt.Errorf("client.DeleteConnectCluster got err: %w", err)
47+
}
48+
err = op.Wait(ctx)
49+
if err != nil {
50+
return fmt.Errorf("op.Wait got err: %w", err)
51+
}
52+
fmt.Fprint(w, "Deleted connect cluster\n")
53+
return nil
54+
}
55+
56+
// [END managedkafka_delete_connect_cluster]

0 commit comments

Comments
 (0)