-
Notifications
You must be signed in to change notification settings - Fork 1.2k
✨ Add ability to register runnables as pre-start hooks #2044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,7 @@ const ( | |
defaultRenewDeadline = 10 * time.Second | ||
defaultRetryPeriod = 2 * time.Second | ||
defaultGracefulShutdownPeriod = 30 * time.Second | ||
defaultHookPeriod = 15 * time.Second | ||
|
||
defaultReadinessEndpoint = "/readyz" | ||
defaultLivenessEndpoint = "/healthz" | ||
|
@@ -161,6 +162,13 @@ type controllerManager struct { | |
// internalProceduresStop channel is used internally to the manager when coordinating | ||
// the proper shutdown of servers. This channel is also used for dependency injection. | ||
internalProceduresStop chan struct{} | ||
|
||
// prestartHooks are functions that are run immediately before calling the Start functions | ||
// of the leader election runnables. | ||
prestartHooks []Runnable | ||
|
||
// hookTimeout is the duration given to each hook to return successfully. | ||
hookTimeout time.Duration | ||
} | ||
|
||
type hasCache interface { | ||
|
@@ -235,6 +243,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client { | |
return cm.cluster.GetHTTPClient() | ||
} | ||
|
||
// Hook allows you to add hooks. | ||
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error { | ||
cm.Lock() | ||
defer cm.Unlock() | ||
|
||
if cm.started { | ||
return fmt.Errorf("unable to add new hook because the manager has already been started") | ||
} | ||
|
||
switch hook { | ||
case HookPrestartType: | ||
cm.prestartHooks = append(cm.prestartHooks, runnable) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (cm *controllerManager) GetConfig() *rest.Config { | ||
return cm.cluster.GetConfig() | ||
} | ||
|
@@ -615,6 +640,27 @@ func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, | |
} | ||
|
||
func (cm *controllerManager) startLeaderElectionRunnables() error { | ||
cm.logger.Info("Running prestart hooks") | ||
for _, hook := range cm.prestartHooks { | ||
var ctx context.Context | ||
var cancel context.CancelFunc | ||
|
||
if cm.hookTimeout < 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is a 0 timeout valid? |
||
ctx, cancel = context.WithCancel(cm.internalCtx) | ||
} else { | ||
ctx, cancel = context.WithTimeout(cm.internalCtx, cm.hookTimeout) | ||
} | ||
|
||
if err := hook.Start(ctx); err != nil { | ||
cancel() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it make sense to |
||
return err | ||
} | ||
Comment on lines
+654
to
+657
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are expecting the runnable to behave correctly when it is passed a context and handle the timeout itself. Why are we not handling the timeout here as well and cancelling the runnable when the timeout is exceeded? AFAICT I could write an infinitely running hook if I just ignore the context passed to me |
||
cancel() | ||
} | ||
|
||
// All the prestart hooks have ben run, clear the slice to free the underlying resources. | ||
cm.prestartHooks = nil | ||
|
||
return cm.runnables.LeaderElection.Start(cm.internalCtx) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -79,6 +79,9 @@ type Manager interface { | |
// AddReadyzCheck allows you to add Readyz checker | ||
AddReadyzCheck(name string, check healthz.Checker) error | ||
|
||
// Hook allows to add Runnables as hooks to modify the behavior. | ||
Hook(hook HookType, runnable Runnable) error | ||
|
||
// Start starts all registered Controllers and blocks until the context is cancelled. | ||
// Returns an error if there is an error starting any controller. | ||
// | ||
|
@@ -269,6 +272,10 @@ type Options struct { | |
// +optional | ||
Controller config.Controller | ||
|
||
// HookTimeout is the duration given to each hook to return successfully. | ||
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1) | ||
terinjokes marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect a |
||
HookTimeout *time.Duration | ||
|
||
// makeBroadcaster allows deferring the creation of the broadcaster to | ||
// avoid leaking goroutines if we never call Start on this manager. It also | ||
// returns whether or not this is a "owned" broadcaster, and as such should be | ||
|
@@ -283,6 +290,15 @@ type Options struct { | |
newPprofListener func(addr string) (net.Listener, error) | ||
} | ||
|
||
// HookType defines hooks for use with AddHook. | ||
type HookType int | ||
|
||
const ( | ||
// HookPrestartType defines a hook that is run after leader election and immediately before | ||
// calling Start on the runnables that needed leader election. | ||
HookPrestartType HookType = iota | ||
) | ||
|
||
// BaseContextFunc is a function used to provide a base Context to Runnables | ||
// managed by a Manager. | ||
type BaseContextFunc func() context.Context | ||
|
@@ -438,6 +454,7 @@ func New(config *rest.Config, options Options) (Manager, error) { | |
livenessEndpointName: options.LivenessEndpointName, | ||
pprofListener: pprofListener, | ||
gracefulShutdownTimeout: *options.GracefulShutdownTimeout, | ||
hookTimeout: *options.HookTimeout, | ||
internalProceduresStop: make(chan struct{}), | ||
leaderElectionStopped: make(chan struct{}), | ||
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel, | ||
|
@@ -539,6 +556,11 @@ func setOptionsDefaults(options Options) Options { | |
options.GracefulShutdownTimeout = &gracefulShutdownTimeout | ||
} | ||
|
||
if options.HookTimeout == nil { | ||
hookTimeout := defaultHookPeriod | ||
options.HookTimeout = &hookTimeout | ||
} | ||
|
||
if options.Logger.GetSink() == nil { | ||
options.Logger = log.Log | ||
} | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -1197,6 +1197,121 @@ var _ = Describe("manger.Manager", func() { | |||||
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond)) | ||||||
}) | ||||||
|
||||||
It("should run prestart hooks before calling Start on leader election runnables", func() { | ||||||
m, err := New(cfg, options) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
for _, cb := range callbacks { | ||||||
cb(m) | ||||||
} | ||||||
|
||||||
runnableRan := make(chan struct{}) | ||||||
|
||||||
Expect(m.Add(RunnableFunc(func(ctx context.Context) error { | ||||||
close(runnableRan) | ||||||
return nil | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { | ||||||
Expect(m.Elected()).ShouldNot(BeClosed()) | ||||||
Consistently(runnableRan).ShouldNot(BeClosed()) | ||||||
return nil | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
defer cancel() | ||||||
go func() { | ||||||
defer GinkgoRecover() | ||||||
Expect(m.Elected()).ShouldNot(BeClosed()) | ||||||
Expect(m.Start(ctx)).NotTo(HaveOccurred()) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Flow of the BDD would be better this way no?
Suggested change
|
||||||
}() | ||||||
|
||||||
<-m.Elected() | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about wrapping this in an Eventually so that we can timeout in case this never gets closed Will have the same effect right?
Suggested change
|
||||||
}) | ||||||
|
||||||
It("should run prestart hooks with timeout", func() { | ||||||
m, err := New(cfg, options) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
for _, cb := range callbacks { | ||||||
cb(m) | ||||||
} | ||||||
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond | ||||||
|
||||||
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
return ctx.Err() | ||||||
case <-time.After(1 * time.Second): | ||||||
return errors.New("prestart hook timeout exceeded expected") | ||||||
} | ||||||
Comment on lines
+1241
to
+1245
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are relying on the fact that the context is started before this function is called? Possibly racy? |
||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
defer cancel() | ||||||
|
||||||
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded)) | ||||||
}) | ||||||
|
||||||
It("should run prestart hooks without timeout", func() { | ||||||
m, err := New(cfg, options) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
for _, cb := range callbacks { | ||||||
cb(m) | ||||||
} | ||||||
m.(*controllerManager).hookTimeout = -1 * time.Second | ||||||
|
||||||
Expect(m.Add(RunnableFunc(func(ctx context.Context) error { | ||||||
fmt.Println("runnable returning") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the printing just for debug? |
||||||
return nil | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { | ||||||
select { | ||||||
case <-ctx.Done(): | ||||||
return ctx.Err() | ||||||
case <-time.After(1 * time.Second): | ||||||
fmt.Println("prestart hook returning") | ||||||
return nil | ||||||
} | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
defer cancel() | ||||||
|
||||||
go func() { | ||||||
defer GinkgoRecover() | ||||||
Expect(m.Elected()).ShouldNot(BeClosed()) | ||||||
Expect(m.Start(ctx)).NotTo(HaveOccurred()) | ||||||
}() | ||||||
|
||||||
<-m.Elected() | ||||||
}) | ||||||
|
||||||
It("should not run leader election runnables if prestart hooks fail", func() { | ||||||
m, err := New(cfg, options) | ||||||
Expect(err).NotTo(HaveOccurred()) | ||||||
for _, cb := range callbacks { | ||||||
cb(m) | ||||||
} | ||||||
|
||||||
runnableRan := make(chan struct{}) | ||||||
|
||||||
Expect(m.Add(RunnableFunc(func(ctx context.Context) error { | ||||||
close(runnableRan) | ||||||
return nil | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error { | ||||||
Expect(m.Elected()).ShouldNot(BeClosed()) | ||||||
Consistently(runnableRan).ShouldNot(BeClosed()) | ||||||
return errors.New("prestart hook failed") | ||||||
}))).ToNot(HaveOccurred()) | ||||||
|
||||||
ctx, cancel := context.WithCancel(context.Background()) | ||||||
defer cancel() | ||||||
|
||||||
Expect(m.Elected()).ShouldNot(BeClosed()) | ||||||
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed"))) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check runnable ran is still open? |
||||||
}) | ||||||
} | ||||||
|
||||||
Context("with defaults", func() { | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we return an error for an unknown hook type?