Skip to content

Commit 5a320e8

Browse files
authored
feat: align topics level policies admin apis to java restful apis (#1398)
* feat: add topic configuration methods for subscribe rate, dispatch rate, max consumers, message size, subscriptions, schema validation, deduplication, replicator dispatch rate, offload policies, auto subscription creation, and schema compatibility strategy * fix: lint error * fix: lint * add tests * fix tests * add unit tests
1 parent 0dee113 commit 5a320e8

File tree

10 files changed

+1486
-0
lines changed

10 files changed

+1486
-0
lines changed

integration-tests/blue-green/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ services:
130130
- loadBalancerDebugModeEnabled=true
131131
- brokerServiceCompactionThresholdInBytes=1000000
132132
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
133+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
133134
depends_on:
134135
green-zookeeper:
135136
condition: service_healthy
@@ -163,6 +164,7 @@ services:
163164
- loadBalancerDebugModeEnabled=true
164165
- brokerServiceCompactionThresholdInBytes=1000000
165166
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
167+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
166168
depends_on:
167169
green-zookeeper:
168170
condition: service_healthy

integration-tests/clustered/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ services:
122122
- advertisedListeners=internal:pulsar://broker-1:6650
123123
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
124124
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
125+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
125126
depends_on:
126127
zookeeper:
127128
condition: service_healthy
@@ -154,6 +155,7 @@ services:
154155
- advertisedListeners=internal:pulsar://broker-2:6650
155156
- PULSAR_MEM=-Xms256m -Xmx256m -XX:MaxDirectMemorySize=56m
156157
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
158+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
157159
depends_on:
158160
zookeeper:
159161
condition: service_healthy

integration-tests/extensible-load-manager/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ services:
129129
- clusterMigrationCheckDurationSeconds=1
130130
- brokerServiceCompactionThresholdInBytes=1000000
131131
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
132+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
132133
depends_on:
133134
zookeeper:
134135
condition: service_healthy
@@ -168,6 +169,7 @@ services:
168169
- clusterMigrationCheckDurationSeconds=1
169170
- brokerServiceCompactionThresholdInBytes=1000000
170171
- PULSAR_PREFIX_defaultNumberOfNamespaceBundles=1
172+
- PULSAR_PREFIX_topicLevelPoliciesEnabled=true
171173
depends_on:
172174
zookeeper:
173175
condition: service_healthy

pulsaradmin/pkg/admin/topic.go

Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,107 @@ type Topics interface {
389389
// @param data
390390
// list of replication cluster id
391391
SetReplicationClusters(topic utils.TopicName, data []string) error
392+
393+
// GetSubscribeRate Get subscribe rate configuration for a topic
394+
GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
395+
396+
// SetSubscribeRate Set subscribe rate configuration for a topic
397+
SetSubscribeRate(utils.TopicName, utils.SubscribeRate) error
398+
399+
// RemoveSubscribeRate Remove subscribe rate configuration for a topic
400+
RemoveSubscribeRate(utils.TopicName) error
401+
402+
// GetSubscriptionDispatchRate Get subscription dispatch rate for a topic
403+
GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
404+
405+
// SetSubscriptionDispatchRate Set subscription dispatch rate for a topic
406+
SetSubscriptionDispatchRate(utils.TopicName, utils.DispatchRateData) error
407+
408+
// RemoveSubscriptionDispatchRate Remove subscription dispatch rate for a topic
409+
RemoveSubscriptionDispatchRate(utils.TopicName) error
410+
411+
// GetMaxConsumersPerSubscription Get max consumers per subscription for a topic
412+
GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
413+
414+
// SetMaxConsumersPerSubscription Set max consumers per subscription for a topic
415+
SetMaxConsumersPerSubscription(utils.TopicName, int) error
416+
417+
// RemoveMaxConsumersPerSubscription Remove max consumers per subscription for a topic
418+
RemoveMaxConsumersPerSubscription(utils.TopicName) error
419+
420+
// GetMaxMessageSize Get max message size for a topic
421+
GetMaxMessageSize(utils.TopicName) (int, error)
422+
423+
// SetMaxMessageSize Set max message size for a topic
424+
SetMaxMessageSize(utils.TopicName, int) error
425+
426+
// RemoveMaxMessageSize Remove max message size for a topic
427+
RemoveMaxMessageSize(utils.TopicName) error
428+
429+
// GetMaxSubscriptionsPerTopic Get max subscriptions per topic
430+
GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
431+
432+
// SetMaxSubscriptionsPerTopic Set max subscriptions per topic
433+
SetMaxSubscriptionsPerTopic(utils.TopicName, int) error
434+
435+
// RemoveMaxSubscriptionsPerTopic Remove max subscriptions per topic
436+
RemoveMaxSubscriptionsPerTopic(utils.TopicName) error
437+
438+
// GetSchemaValidationEnforced Get schema validation enforced flag for a topic
439+
GetSchemaValidationEnforced(utils.TopicName) (bool, error)
440+
441+
// SetSchemaValidationEnforced Set schema validation enforced flag for a topic
442+
SetSchemaValidationEnforced(utils.TopicName, bool) error
443+
444+
// RemoveSchemaValidationEnforced Remove schema validation enforced flag for a topic
445+
RemoveSchemaValidationEnforced(utils.TopicName) error
446+
447+
// GetDeduplicationSnapshotInterval Get deduplication snapshot interval for a topic
448+
GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
449+
450+
// SetDeduplicationSnapshotInterval Set deduplication snapshot interval for a topic
451+
SetDeduplicationSnapshotInterval(utils.TopicName, int) error
452+
453+
// RemoveDeduplicationSnapshotInterval Remove deduplication snapshot interval for a topic
454+
RemoveDeduplicationSnapshotInterval(utils.TopicName) error
455+
456+
// GetReplicatorDispatchRate Get replicator dispatch rate for a topic
457+
GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
458+
459+
// SetReplicatorDispatchRate Set replicator dispatch rate for a topic
460+
SetReplicatorDispatchRate(utils.TopicName, utils.DispatchRateData) error
461+
462+
// RemoveReplicatorDispatchRate Remove replicator dispatch rate for a topic
463+
RemoveReplicatorDispatchRate(utils.TopicName) error
464+
465+
// GetOffloadPolicies Get offload policies for a topic
466+
GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
467+
468+
// SetOffloadPolicies Set offload policies for a topic
469+
SetOffloadPolicies(utils.TopicName, utils.OffloadPolicies) error
470+
471+
// RemoveOffloadPolicies Remove offload policies for a topic
472+
RemoveOffloadPolicies(utils.TopicName) error
473+
474+
// GetAutoSubscriptionCreation Get auto subscription creation override for a topic
475+
GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error)
476+
477+
// SetAutoSubscriptionCreation Set auto subscription creation override for a topic
478+
SetAutoSubscriptionCreation(utils.TopicName,
479+
utils.AutoSubscriptionCreationOverride) error
480+
481+
// RemoveAutoSubscriptionCreation Remove auto subscription creation override for a topic
482+
RemoveAutoSubscriptionCreation(utils.TopicName) error
483+
484+
// GetSchemaCompatibilityStrategy Get schema compatibility strategy for a topic
485+
GetSchemaCompatibilityStrategy(utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
486+
487+
// SetSchemaCompatibilityStrategy Set schema compatibility strategy for a topic
488+
SetSchemaCompatibilityStrategy(utils.TopicName,
489+
utils.SchemaCompatibilityStrategy) error
490+
491+
// RemoveSchemaCompatibilityStrategy Remove schema compatibility strategy for a topic
492+
RemoveSchemaCompatibilityStrategy(utils.TopicName) error
392493
}
393494

394495
type topics struct {
@@ -933,3 +1034,192 @@ func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error)
9331034
err := t.pulsar.Client.Get(endpoint, &data)
9341035
return data, err
9351036
}
1037+
1038+
func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, error) {
1039+
var subscribeRate utils.SubscribeRate
1040+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
1041+
err := t.pulsar.Client.Get(endpoint, &subscribeRate)
1042+
return &subscribeRate, err
1043+
}
1044+
1045+
func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error {
1046+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
1047+
return t.pulsar.Client.Post(endpoint, &subscribeRate)
1048+
}
1049+
1050+
func (t *topics) RemoveSubscribeRate(topic utils.TopicName) error {
1051+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
1052+
return t.pulsar.Client.Delete(endpoint)
1053+
}
1054+
1055+
func (t *topics) GetSubscriptionDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
1056+
var dispatchRate utils.DispatchRateData
1057+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
1058+
err := t.pulsar.Client.Get(endpoint, &dispatchRate)
1059+
return &dispatchRate, err
1060+
}
1061+
1062+
func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
1063+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
1064+
return t.pulsar.Client.Post(endpoint, &dispatchRate)
1065+
}
1066+
1067+
func (t *topics) RemoveSubscriptionDispatchRate(topic utils.TopicName) error {
1068+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
1069+
return t.pulsar.Client.Delete(endpoint)
1070+
}
1071+
1072+
func (t *topics) GetMaxConsumersPerSubscription(topic utils.TopicName) (int, error) {
1073+
var maxConsumers int
1074+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
1075+
err := t.pulsar.Client.Get(endpoint, &maxConsumers)
1076+
return maxConsumers, err
1077+
}
1078+
1079+
func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, maxConsumers int) error {
1080+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
1081+
return t.pulsar.Client.Post(endpoint, &maxConsumers)
1082+
}
1083+
1084+
func (t *topics) RemoveMaxConsumersPerSubscription(topic utils.TopicName) error {
1085+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
1086+
return t.pulsar.Client.Delete(endpoint)
1087+
}
1088+
1089+
func (t *topics) GetMaxMessageSize(topic utils.TopicName) (int, error) {
1090+
var maxMessageSize int
1091+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
1092+
err := t.pulsar.Client.Get(endpoint, &maxMessageSize)
1093+
return maxMessageSize, err
1094+
}
1095+
1096+
func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) error {
1097+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
1098+
return t.pulsar.Client.Post(endpoint, &maxMessageSize)
1099+
}
1100+
1101+
func (t *topics) RemoveMaxMessageSize(topic utils.TopicName) error {
1102+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
1103+
return t.pulsar.Client.Delete(endpoint)
1104+
}
1105+
1106+
func (t *topics) GetMaxSubscriptionsPerTopic(topic utils.TopicName) (int, error) {
1107+
var maxSubscriptions int
1108+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
1109+
err := t.pulsar.Client.Get(endpoint, &maxSubscriptions)
1110+
return maxSubscriptions, err
1111+
}
1112+
1113+
func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, maxSubscriptions int) error {
1114+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
1115+
return t.pulsar.Client.Post(endpoint, &maxSubscriptions)
1116+
}
1117+
1118+
func (t *topics) RemoveMaxSubscriptionsPerTopic(topic utils.TopicName) error {
1119+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
1120+
return t.pulsar.Client.Delete(endpoint)
1121+
}
1122+
1123+
func (t *topics) GetSchemaValidationEnforced(topic utils.TopicName) (bool, error) {
1124+
var enabled bool
1125+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
1126+
err := t.pulsar.Client.Get(endpoint, &enabled)
1127+
return enabled, err
1128+
}
1129+
1130+
func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled bool) error {
1131+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
1132+
return t.pulsar.Client.Post(endpoint, enabled)
1133+
}
1134+
1135+
func (t *topics) RemoveSchemaValidationEnforced(topic utils.TopicName) error {
1136+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
1137+
return t.pulsar.Client.Delete(endpoint)
1138+
}
1139+
1140+
func (t *topics) GetDeduplicationSnapshotInterval(topic utils.TopicName) (int, error) {
1141+
var interval int
1142+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
1143+
err := t.pulsar.Client.Get(endpoint, &interval)
1144+
return interval, err
1145+
}
1146+
1147+
func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, interval int) error {
1148+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
1149+
return t.pulsar.Client.Post(endpoint, &interval)
1150+
}
1151+
1152+
func (t *topics) RemoveDeduplicationSnapshotInterval(topic utils.TopicName) error {
1153+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
1154+
return t.pulsar.Client.Delete(endpoint)
1155+
}
1156+
1157+
func (t *topics) GetReplicatorDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
1158+
var dispatchRate utils.DispatchRateData
1159+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
1160+
err := t.pulsar.Client.Get(endpoint, &dispatchRate)
1161+
return &dispatchRate, err
1162+
}
1163+
1164+
func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
1165+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
1166+
return t.pulsar.Client.Post(endpoint, &dispatchRate)
1167+
}
1168+
1169+
func (t *topics) RemoveReplicatorDispatchRate(topic utils.TopicName) error {
1170+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
1171+
return t.pulsar.Client.Delete(endpoint)
1172+
}
1173+
1174+
func (t *topics) GetAutoSubscriptionCreation(topic utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) {
1175+
var autoSubCreation utils.AutoSubscriptionCreationOverride
1176+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
1177+
err := t.pulsar.Client.Get(endpoint, &autoSubCreation)
1178+
return &autoSubCreation, err
1179+
}
1180+
1181+
func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
1182+
autoSubCreation utils.AutoSubscriptionCreationOverride) error {
1183+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
1184+
return t.pulsar.Client.Post(endpoint, &autoSubCreation)
1185+
}
1186+
1187+
func (t *topics) RemoveAutoSubscriptionCreation(topic utils.TopicName) error {
1188+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
1189+
return t.pulsar.Client.Delete(endpoint)
1190+
}
1191+
1192+
func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error) {
1193+
var strategy utils.SchemaCompatibilityStrategy
1194+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
1195+
err := t.pulsar.Client.Get(endpoint, &strategy)
1196+
return strategy, err
1197+
}
1198+
1199+
func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
1200+
strategy utils.SchemaCompatibilityStrategy) error {
1201+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
1202+
return t.pulsar.Client.Put(endpoint, strategy)
1203+
}
1204+
1205+
func (t *topics) RemoveSchemaCompatibilityStrategy(topic utils.TopicName) error {
1206+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
1207+
return t.pulsar.Client.Delete(endpoint)
1208+
}
1209+
1210+
func (t *topics) GetOffloadPolicies(topic utils.TopicName) (*utils.OffloadPolicies, error) {
1211+
var offloadPolicies utils.OffloadPolicies
1212+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
1213+
err := t.pulsar.Client.Get(endpoint, &offloadPolicies)
1214+
return &offloadPolicies, err
1215+
}
1216+
1217+
func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error {
1218+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
1219+
return t.pulsar.Client.Post(endpoint, &offloadPolicies)
1220+
}
1221+
1222+
func (t *topics) RemoveOffloadPolicies(topic utils.TopicName) error {
1223+
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
1224+
return t.pulsar.Client.Delete(endpoint)
1225+
}

0 commit comments

Comments
 (0)