@@ -103,6 +103,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
103
103
vpaCheckpointClient : m .VpaCheckpointClient ,
104
104
vpaCheckpointLister : m .VpaCheckpointLister ,
105
105
vpaLister : m .VpaLister ,
106
+ podLister : m .PodLister ,
106
107
clusterState : m .ClusterState ,
107
108
specClient : spec .NewSpecClient (m .PodLister ),
108
109
selectorFetcher : m .SelectorFetcher ,
@@ -211,6 +212,7 @@ type clusterStateFeeder struct {
211
212
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
212
213
vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
213
214
vpaLister vpa_lister.VerticalPodAutoscalerLister
215
+ podLister v1lister.PodLister
214
216
clusterState model.ClusterState
215
217
selectorFetcher target.VpaTargetSelectorFetcher
216
218
memorySaveMode bool
@@ -220,55 +222,6 @@ type clusterStateFeeder struct {
220
222
vpaObjectNamespace string
221
223
}
222
224
223
- func (feeder * clusterStateFeeder ) InitFromHistoryProvider (historyProvider history.HistoryProvider ) {
224
- klog .V (3 ).InfoS ("Initializing VPA from history provider" )
225
- clusterHistory , err := historyProvider .GetClusterHistory ()
226
- if err != nil {
227
- klog .ErrorS (err , "Cannot get cluster history" )
228
- }
229
- for podID , podHistory := range clusterHistory {
230
- klog .V (4 ).InfoS ("Adding pod with labels" , "pod" , podID , "labels" , podHistory .LastLabels )
231
- feeder .clusterState .AddOrUpdatePod (podID , podHistory .LastLabels , apiv1 .PodUnknown )
232
- for containerName , sampleList := range podHistory .Samples {
233
- containerID := model.ContainerID {
234
- PodID : podID ,
235
- ContainerName : containerName ,
236
- }
237
- klog .V (0 ).InfoS ("Adding" , "container" , containerID )
238
- // TODO @jklaw90: pass the container type here
239
- if err = feeder .clusterState .AddOrUpdateContainer (containerID , nil , model .ContainerTypeStandard ); err != nil {
240
- klog .V (0 ).InfoS ("Failed to add container" , "container" , containerID , "error" , err )
241
- }
242
- klog .V (4 ).InfoS ("Adding samples for container" , "sampleCount" , len (sampleList ), "container" , containerID )
243
- for _ , sample := range sampleList {
244
- if err := feeder .clusterState .AddSample (
245
- & model.ContainerUsageSampleWithKey {
246
- ContainerUsageSample : sample ,
247
- Container : containerID ,
248
- }); err != nil {
249
- klog .V (0 ).InfoS ("Failed to add sample" , "sample" , sample , "error" , err )
250
- }
251
- }
252
- }
253
- }
254
- }
255
-
256
- func (feeder * clusterStateFeeder ) setVpaCheckpoint (checkpoint * vpa_types.VerticalPodAutoscalerCheckpoint ) error {
257
- vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
258
- vpa , exists := feeder .clusterState .VPAs ()[vpaID ]
259
- if ! exists {
260
- return fmt .Errorf ("cannot load checkpoint to missing VPA object %s/%s" , vpaID .Namespace , vpaID .VpaName )
261
- }
262
-
263
- cs := model .NewAggregateContainerState ()
264
- err := cs .LoadFromCheckpoint (& checkpoint .Status )
265
- if err != nil {
266
- return fmt .Errorf ("cannot load checkpoint for VPA %s/%s. Reason: %v" , vpaID .Namespace , vpaID .VpaName , err )
267
- }
268
- vpa .ContainersInitialAggregateState [checkpoint .Spec .ContainerName ] = cs
269
- return nil
270
- }
271
-
272
225
func (feeder * clusterStateFeeder ) InitFromCheckpoints (ctx context.Context ) {
273
226
klog .V (3 ).InfoS ("Initializing VPA from checkpoints" )
274
227
feeder .LoadVPAs (ctx )
@@ -300,6 +253,49 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
300
253
}
301
254
}
302
255
256
+ func (feeder * clusterStateFeeder ) InitFromHistoryProvider (historyProvider history.HistoryProvider ) {
257
+ pods := feeder .podSpecLookup ()
258
+ klog .V (3 ).InfoS ("Initializing VPA from history provider" )
259
+ clusterHistory , err := historyProvider .GetClusterHistory ()
260
+ if err != nil {
261
+ klog .ErrorS (err , "Cannot get cluster history" )
262
+ }
263
+ for podID , podHistory := range clusterHistory {
264
+ // no need to load history if the pod no longer exists
265
+ podSpec , ok := pods [podID ]
266
+ if ! ok {
267
+ continue
268
+ }
269
+ klog .V (4 ).InfoS ("Adding pod with labels" , "pod" , podID , "labels" , podHistory .LastLabels )
270
+ feeder .clusterState .AddOrUpdatePod (podID , podHistory .LastLabels , podSpec .Phase )
271
+ for containerName , sampleList := range podHistory .Samples {
272
+ containerID := model.ContainerID {
273
+ PodID : podID ,
274
+ ContainerName : containerName ,
275
+ }
276
+ klog .V (0 ).InfoS ("Adding" , "container" , containerID )
277
+
278
+ containerSpec := podSpec .GetContainerSpec (containerName )
279
+ if containerSpec == nil {
280
+ continue
281
+ }
282
+ if err = feeder .clusterState .AddOrUpdateContainer (containerID , nil , containerSpec .ContainerType ); err != nil {
283
+ klog .V (0 ).InfoS ("Failed to add container" , "container" , containerID , "error" , err )
284
+ }
285
+ klog .V (4 ).InfoS ("Adding samples for container" , "sampleCount" , len (sampleList ), "container" , containerID )
286
+ for _ , sample := range sampleList {
287
+ if err := feeder .clusterState .AddSample (
288
+ & model.ContainerUsageSampleWithKey {
289
+ ContainerUsageSample : sample ,
290
+ Container : containerID ,
291
+ }); err != nil {
292
+ klog .V (0 ).InfoS ("Failed to add sample" , "sample" , sample , "error" , err )
293
+ }
294
+ }
295
+ }
296
+ }
297
+ }
298
+
303
299
func (feeder * clusterStateFeeder ) GarbageCollectCheckpoints (ctx context.Context ) {
304
300
klog .V (3 ).InfoS ("Starting garbage collection of checkpoints" )
305
301
@@ -341,82 +337,6 @@ func (feeder *clusterStateFeeder) GarbageCollectCheckpoints(ctx context.Context)
341
337
}
342
338
}
343
339
344
- func (feeder * clusterStateFeeder ) shouldIgnoreNamespace (namespace string ) bool {
345
- // 1. `vpaObjectNamespace` is set but doesn't match the current namespace.
346
- if feeder .vpaObjectNamespace != "" && namespace != feeder .vpaObjectNamespace {
347
- return true
348
- }
349
- // 2. `ignoredNamespaces` is set, and the current namespace is in the list.
350
- if len (feeder .ignoredNamespaces ) > 0 && slices .Contains (feeder .ignoredNamespaces , namespace ) {
351
- return true
352
- }
353
- return false
354
- }
355
-
356
- func (feeder * clusterStateFeeder ) cleanupCheckpointsForNamespace (ctx context.Context , namespace string , allVPAKeys map [model.VpaID ]bool ) error {
357
- var err error
358
- checkpointList , err := feeder .vpaCheckpointLister .VerticalPodAutoscalerCheckpoints (namespace ).List (labels .Everything ())
359
-
360
- if err != nil {
361
- return err
362
- }
363
- for _ , checkpoint := range checkpointList {
364
- vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
365
- if ! allVPAKeys [vpaID ] {
366
- if errFeeder := feeder .vpaCheckpointClient .VerticalPodAutoscalerCheckpoints (namespace ).Delete (ctx , checkpoint .Name , metav1.DeleteOptions {}); errFeeder != nil {
367
- err = fmt .Errorf ("failed to delete orphaned checkpoint %s: %w" , klog .KRef (namespace , checkpoint .Name ), err )
368
- continue
369
- }
370
- klog .V (3 ).InfoS ("Orphaned VPA checkpoint cleanup - deleting" , "checkpoint" , klog .KRef (namespace , checkpoint .Name ))
371
- }
372
- }
373
- return err
374
- }
375
-
376
- func implicitDefaultRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector ) bool {
377
- return len (selectors ) == 0
378
- }
379
-
380
- func selectsRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector , name * string ) bool {
381
- for _ , s := range selectors {
382
- if s .Name == * name {
383
- return true
384
- }
385
- }
386
- return false
387
- }
388
-
389
- // Filter VPA objects whose specified recommender names are not default
390
- func filterVPAs (feeder * clusterStateFeeder , allVpaCRDs []* vpa_types.VerticalPodAutoscaler ) []* vpa_types.VerticalPodAutoscaler {
391
- klog .V (3 ).InfoS ("Start selecting the vpaCRDs." )
392
- var vpaCRDs []* vpa_types.VerticalPodAutoscaler
393
- for _ , vpaCRD := range allVpaCRDs {
394
- if feeder .recommenderName == DefaultRecommenderName {
395
- if ! implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) && ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
396
- klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
397
- continue
398
- }
399
- } else {
400
- if implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) {
401
- klog .V (6 ).InfoS ("Ignoring vpaCRD as recommender doesn't process CRDs implicitly destined to default recommender" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName , "defaultRecommenderName" , DefaultRecommenderName )
402
- continue
403
- }
404
- if ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
405
- klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
406
- continue
407
- }
408
- }
409
-
410
- if feeder .shouldIgnoreNamespace (vpaCRD .Namespace ) {
411
- klog .V (6 ).InfoS ("Ignoring vpaCRD as this namespace is ignored" , "vpaCRD" , klog .KObj (vpaCRD ))
412
- continue
413
- }
414
-
415
- vpaCRDs = append (vpaCRDs , vpaCRD )
416
- }
417
- return vpaCRDs
418
- }
419
-
420
340
// LoadVPAs fetches VPA objects and loads them into the cluster state.
421
341
func (feeder * clusterStateFeeder ) LoadVPAs (ctx context.Context ) {
422
342
// List VPA API objects.
@@ -468,14 +388,7 @@ func (feeder *clusterStateFeeder) LoadVPAs(ctx context.Context) {
468
388
469
389
// LoadPods loads pod into the cluster state.
470
390
func (feeder * clusterStateFeeder ) LoadPods () {
471
- podSpecs , err := feeder .specClient .GetPodSpecs ()
472
- if err != nil {
473
- klog .ErrorS (err , "Cannot get SimplePodSpecs" )
474
- }
475
- pods := make (map [model.PodID ]* spec.BasicPodSpec )
476
- for _ , spec := range podSpecs {
477
- pods [spec .ID ] = spec
478
- }
391
+ pods := feeder .podSpecLookup ()
479
392
for key := range feeder .clusterState .Pods () {
480
393
if _ , exists := pods [key ]; ! exists {
481
394
klog .V (3 ).InfoS ("Deleting Pod" , "pod" , klog .KRef (key .Namespace , key .PodName ))
@@ -488,13 +401,13 @@ func (feeder *clusterStateFeeder) LoadPods() {
488
401
}
489
402
feeder .clusterState .AddOrUpdatePod (pod .ID , pod .PodLabels , pod .Phase )
490
403
for _ , container := range pod .Containers {
491
- if err = feeder .clusterState .AddOrUpdateContainer (container .ID , container .Request , container .ContainerType ); err != nil {
404
+ if err : = feeder .clusterState .AddOrUpdateContainer (container .ID , container .Request , container .ContainerType ); err != nil {
492
405
klog .V (0 ).InfoS ("Failed to add container" , "container" , container .ID , "error" , err )
493
406
}
494
407
}
495
408
for _ , initContainer := range pod .InitContainers {
496
409
if features .Enabled (features .NativeSidecar ) && initContainer .ContainerType == model .ContainerTypeInitSidecar {
497
- if err = feeder .clusterState .AddOrUpdateContainer (initContainer .ID , initContainer .Request , initContainer .ContainerType ); err != nil {
410
+ if err : = feeder .clusterState .AddOrUpdateContainer (initContainer .ID , initContainer .Request , initContainer .ContainerType ); err != nil {
498
411
klog .V (0 ).InfoS ("Failed to add initContainer" , "container" , initContainer .ID , "error" , err )
499
412
}
500
413
} else {
@@ -551,6 +464,109 @@ Loop:
551
464
metrics_recommender .RecordAggregateContainerStatesCount (feeder .clusterState .StateMapSize ())
552
465
}
553
466
467
+ func (feeder * clusterStateFeeder ) podSpecLookup () map [model.PodID ]* spec.BasicPodSpec {
468
+ podSpecs , err := feeder .specClient .GetPodSpecs ()
469
+ if err != nil {
470
+ klog .ErrorS (err , "Cannot get SimplePodSpecs" )
471
+ }
472
+ pods := make (map [model.PodID ]* spec.BasicPodSpec )
473
+ for _ , spec := range podSpecs {
474
+ pods [spec .ID ] = spec
475
+ }
476
+ return pods
477
+ }
478
+
479
+ func (feeder * clusterStateFeeder ) setVpaCheckpoint (checkpoint * vpa_types.VerticalPodAutoscalerCheckpoint ) error {
480
+ vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
481
+ vpa , exists := feeder .clusterState .VPAs ()[vpaID ]
482
+ if ! exists {
483
+ return fmt .Errorf ("cannot load checkpoint to missing VPA object %s/%s" , vpaID .Namespace , vpaID .VpaName )
484
+ }
485
+
486
+ cs := model .NewAggregateContainerState ()
487
+ err := cs .LoadFromCheckpoint (& checkpoint .Status )
488
+ if err != nil {
489
+ return fmt .Errorf ("cannot load checkpoint for VPA %s/%s. Reason: %v" , vpaID .Namespace , vpaID .VpaName , err )
490
+ }
491
+ vpa .ContainersInitialAggregateState [checkpoint .Spec .ContainerName ] = cs
492
+ return nil
493
+ }
494
+ func (feeder * clusterStateFeeder ) shouldIgnoreNamespace (namespace string ) bool {
495
+ // 1. `vpaObjectNamespace` is set but doesn't match the current namespace.
496
+ if feeder .vpaObjectNamespace != "" && namespace != feeder .vpaObjectNamespace {
497
+ return true
498
+ }
499
+ // 2. `ignoredNamespaces` is set, and the current namespace is in the list.
500
+ if len (feeder .ignoredNamespaces ) > 0 && slices .Contains (feeder .ignoredNamespaces , namespace ) {
501
+ return true
502
+ }
503
+ return false
504
+ }
505
+
506
+ func (feeder * clusterStateFeeder ) cleanupCheckpointsForNamespace (ctx context.Context , namespace string , allVPAKeys map [model.VpaID ]bool ) error {
507
+ var err error
508
+ checkpointList , err := feeder .vpaCheckpointLister .VerticalPodAutoscalerCheckpoints (namespace ).List (labels .Everything ())
509
+
510
+ if err != nil {
511
+ return err
512
+ }
513
+ for _ , checkpoint := range checkpointList {
514
+ vpaID := model.VpaID {Namespace : checkpoint .Namespace , VpaName : checkpoint .Spec .VPAObjectName }
515
+ if ! allVPAKeys [vpaID ] {
516
+ if errFeeder := feeder .vpaCheckpointClient .VerticalPodAutoscalerCheckpoints (namespace ).Delete (ctx , checkpoint .Name , metav1.DeleteOptions {}); errFeeder != nil {
517
+ err = fmt .Errorf ("failed to delete orphaned checkpoint %s: %w" , klog .KRef (namespace , checkpoint .Name ), err )
518
+ continue
519
+ }
520
+ klog .V (3 ).InfoS ("Orphaned VPA checkpoint cleanup - deleting" , "checkpoint" , klog .KRef (namespace , checkpoint .Name ))
521
+ }
522
+ }
523
+ return err
524
+ }
525
+
526
+ func implicitDefaultRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector ) bool {
527
+ return len (selectors ) == 0
528
+ }
529
+
530
+ func selectsRecommender (selectors []* vpa_types.VerticalPodAutoscalerRecommenderSelector , name * string ) bool {
531
+ for _ , s := range selectors {
532
+ if s .Name == * name {
533
+ return true
534
+ }
535
+ }
536
+ return false
537
+ }
538
+
539
+ // Filter VPA objects whose specified recommender names are not default
540
+ func filterVPAs (feeder * clusterStateFeeder , allVpaCRDs []* vpa_types.VerticalPodAutoscaler ) []* vpa_types.VerticalPodAutoscaler {
541
+ klog .V (3 ).InfoS ("Start selecting the vpaCRDs." )
542
+ var vpaCRDs []* vpa_types.VerticalPodAutoscaler
543
+ for _ , vpaCRD := range allVpaCRDs {
544
+ if feeder .recommenderName == DefaultRecommenderName {
545
+ if ! implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) && ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
546
+ klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
547
+ continue
548
+ }
549
+ } else {
550
+ if implicitDefaultRecommender (vpaCRD .Spec .Recommenders ) {
551
+ klog .V (6 ).InfoS ("Ignoring vpaCRD as recommender doesn't process CRDs implicitly destined to default recommender" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName , "defaultRecommenderName" , DefaultRecommenderName )
552
+ continue
553
+ }
554
+ if ! selectsRecommender (vpaCRD .Spec .Recommenders , & feeder .recommenderName ) {
555
+ klog .V (6 ).InfoS ("Ignoring vpaCRD as current recommender's name doesn't appear among its recommenders" , "vpaCRD" , klog .KObj (vpaCRD ), "recommenderName" , feeder .recommenderName )
556
+ continue
557
+ }
558
+ }
559
+
560
+ if feeder .shouldIgnoreNamespace (vpaCRD .Namespace ) {
561
+ klog .V (6 ).InfoS ("Ignoring vpaCRD as this namespace is ignored" , "vpaCRD" , klog .KObj (vpaCRD ))
562
+ continue
563
+ }
564
+
565
+ vpaCRDs = append (vpaCRDs , vpaCRD )
566
+ }
567
+ return vpaCRDs
568
+ }
569
+
554
570
func (feeder * clusterStateFeeder ) matchesVPA (pod * spec.BasicPodSpec ) bool {
555
571
for vpaKey , vpa := range feeder .clusterState .VPAs () {
556
572
podLabels := labels .Set (pod .PodLabels )
0 commit comments