HertzBeat-Collector-Go is the Go implementation of the collector for Apache HertzBeat. It supports multi-protocol and multi-type monitoring data collection, featuring high performance, easy extensibility, and seamless integration.
- Supports various protocols (HTTP, JDBC, SNMP, SSH, etc.) for monitoring data collection
- Flexible and extensible task scheduling, job management, and collection strategies
- Clean architecture, easy for secondary development and integration
- Rich development, testing, and deployment scripts
- Comprehensive documentation and community support
.
βββ cmd/ # Main entry point
βββ internal/ # Core collector implementation and common modules
β βββ collector/ # Various collectors
β βββ common/ # Common modules (scheduling, jobs, types, logging, etc.)
β βββ util/ # Utilities
βββ api/ # Protocol definitions (protobuf)
βββ examples/ # Example code
βββ docs/ # Architecture and development docs
βββ tools/ # Build, CI, scripts, and tools
βββ Makefile # Build entry
βββ README.md # Project description
The collector implements a unified configuration system with three main components:
Central configuration factory that provides:
- Default values management
- Environment variable processing
- Configuration validation
- Utility methods for configuration manipulation
// Create configuration with defaults
factory := config.NewConfigFactory()
cfg := factory.CreateDefaultConfig()
// Create from environment variables
envCfg := factory.CreateFromEnv()
// Merge file config with environment overrides
mergedCfg := factory.MergeWithEnv(fileCfg)
// Validate configuration
if err := factory.ValidateConfig(cfg); err != nil {
log.Fatal("Invalid configuration:", err)
}
Three distinct entry points for different use cases:
config.LoadFromFile(path)
: File-only configuration loadingconfig.LoadFromEnv()
: Environment-only configuration loadingconfig.LoadUnified(path)
: Combined file + environment loading (recommended)
type CollectorConfig struct {
Collector CollectorSection `yaml:"collector"`
}
type CollectorSection struct {
Info CollectorInfo `yaml:"info"`
Log CollectorLogConfig `yaml:"log"`
Manager ManagerConfig `yaml:"manager"`
Identity string `yaml:"identity"`
Mode string `yaml:"mode"`
}
type ManagerConfig struct {
Host string `yaml:"host"`
Port string `yaml:"port"`
Protocol string `yaml:"protocol"`
}
The system includes comprehensive validation:
- Required fields: Identity, mode, manager host/port
- Value validation: Port numbers, protocol types, mode values
- Format validation: IP addresses, log levels
Field | Default Value | Description |
---|---|---|
Identity | hertzbeat-collector-go |
Collector identifier |
Mode | public |
Collector mode |
Collector.Name | hertzbeat-collector-go |
Collector service name |
Collector.IP | 127.0.0.1 |
Collector bind address |
Collector.Port | 8080 |
Collector service port |
Manager.Host | 127.0.0.1 |
Manager server host |
Manager.Port | 1158 |
Manager server port |
Manager.Protocol | netty |
Communication protocol |
Log.Level | info |
Logging level |
If you have existing configurations, here's how to migrate:
server:
host: "0.0.0.0"
port: 1158
transport:
protocol: "netty"
server_addr: "127.0.0.1:1158"
collector:
info:
name: hertzbeat-collector-go
ip: 0.0.0.0
port: 8080
manager:
host: 127.0.0.1
port: 1158
protocol: netty
identity: hertzbeat-collector-go
mode: public
# Install dependencies
go mod tidy
# Build
make build
# Run
./bin/collector server --config etc/hertzbeat-collector.yaml
The collector supports multiple configuration methods with a unified configuration system:
# Run with configuration file
./bin/collector server --config etc/hertzbeat-collector.yaml
Example configuration file (etc/hertzbeat-collector.yaml
):
collector:
info:
name: hertzbeat-collector-go
ip: 127.0.0.1
port: 8080
log:
level: debug
# Manager/Transport configuration
manager:
host: 127.0.0.1
port: 1158
protocol: netty
# Collector identity and mode
identity: hertzbeat-collector-go
mode: public
The Go version is fully compatible with the Java version's environment variable configuration:
# Set environment variables
export IDENTITY=local
export COLLECTOR_NAME=hertzbeat-collector-go
export COLLECTOR_IP=127.0.0.1
export COLLECTOR_PORT=8080
export MANAGER_HOST=192.168.97.0
export MANAGER_PORT=1158
export MANAGER_PROTOCOL=grpc
export MODE=public
export LOG_LEVEL=info
# Run with environment variables
./bin/collector server
# Or use Docker
docker run -d \
-e IDENTITY=local \
-e MANAGER_HOST=192.168.97.0 \
-e MANAGER_PORT=1158 \
-e MANAGER_PROTOCOL=grpc \
-e MODE=public \
--name hertzbeat-collector-go \
hertzbeat-collector-go
The collector uses a unified configuration system that supports both file and environment variable configurations:
- File + Environment: Environment variables override file settings
- Environment Only: Pure environment variable configuration
- File Only: Pure file-based configuration
Configuration precedence (highest to lowest):
- Environment variables
- Configuration file values
- Built-in defaults
Environment Variable | Description | Default Value |
---|---|---|
IDENTITY |
Collector identity | hertzbeat-collector-go |
MODE |
Collector mode (public /private ) |
public |
COLLECTOR_NAME |
Collector name | hertzbeat-collector-go |
COLLECTOR_IP |
Collector bind IP | 127.0.0.1 |
COLLECTOR_PORT |
Collector bind port | 8080 |
MANAGER_HOST |
Manager server host | 127.0.0.1 |
MANAGER_PORT |
Manager server port | 1158 |
MANAGER_PROTOCOL |
Protocol (netty /grpc ) |
netty |
LOG_LEVEL |
Log level | info |
See examples/
directory for various usage examples:
examples/main.go
- Main example with environment variablesexamples/README.md
- Complete usage guideexamples/Dockerfile
- Docker build example
This Go collector is designed to be compatible with the Java version of HertzBeat manager server. The transport layer supports both gRPC and Netty protocols for seamless integration.
The Go collector supports two communication protocols:
-
Netty Protocol (Recommended for Java server compatibility)
- Uses length-prefixed protobuf message format
- Compatible with Java Netty server implementation
- Default port: 1158
-
gRPC Protocol
- Uses standard gRPC with protobuf
- Supports bidirectional streaming
- Default port: 1159
The collector supports flexible configuration through multiple entry points:
# etc/hertzbeat-collector.yaml
collector:
info:
name: hertzbeat-collector-go
ip: 127.0.0.1
port: 8080
log:
level: debug
# Manager/Transport configuration
manager:
host: 127.0.0.1
port: 1158
protocol: netty
# Collector identity and mode
identity: hertzbeat-collector-go
mode: public
The collector provides three configuration loading methods:
-
File-only Configuration:
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" cfg, err := config.LoadFromFile("etc/hertzbeat-collector.yaml") if err != nil { log.Fatal("Failed to load config:", err) }
-
Environment-only Configuration:
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" cfg := config.LoadFromEnv()
-
Unified Configuration (Recommended):
import "hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config" // Environment variables override file values cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml") if err != nil { log.Fatal("Failed to load config:", err) }
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"time"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/config"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/collector/common/transport"
)
func main() {
// Load configuration using unified loader (file + env)
cfg, err := config.LoadUnified("etc/hertzbeat-collector.yaml")
if err != nil {
log.Fatal("Failed to load configuration:", err)
}
// Create transport runner with unified config
runner := transport.New(cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start transport in background
go func() {
if err := runner.Start(ctx); err != nil {
log.Printf("Failed to start transport: %v", err)
cancel()
}
}()
// Wait for shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
log.Println("Shutting down...")
if err := runner.Close(); err != nil {
log.Printf("Failed to close transport: %v", err)
}
}
For more granular control, you can use the transport client directly:
package main
import (
"log"
"hertzbeat.apache.org/hertzbeat-collector-go/internal/transport"
pb "hertzbeat.apache.org/hertzbeat-collector-go/api/cluster_msg"
)
func main() {
// Create Netty client for Java server
factory := &transport.TransportClientFactory{}
client, err := factory.CreateClient("netty", "127.0.0.1:1158")
if err != nil {
log.Fatal("Failed to create client:", err)
}
// Start client
if err := client.Start(); err != nil {
log.Fatal("Failed to start client:", err)
}
defer client.Shutdown()
// Register message processor
client.RegisterProcessor(100, func(msg interface{}) (interface{}, error) {
if pbMsg, ok := msg.(*pb.Message); ok {
log.Printf("Received message: %s", string(pbMsg.Msg))
return &pb.Message{
Type: pb.MessageType_HEARTBEAT,
Direction: pb.Direction_RESPONSE,
Identity: pbMsg.Identity,
Msg: []byte("response"),
}, nil
}
return nil, nil
})
// Send heartbeat message
heartbeat := &pb.Message{
Type: pb.MessageType_HEARTBEAT,
Direction: pb.Direction_REQUEST,
Identity: "go-collector",
Msg: []byte("heartbeat"),
}
// Async send
if err := client.SendMsg(heartbeat); err != nil {
log.Printf("Failed to send message: %v", err)
}
// Sync send with timeout
resp, err := client.SendMsgSync(heartbeat, 5000)
if err != nil {
log.Printf("Failed to send sync message: %v", err)
} else if resp != nil {
if pbResp, ok := resp.(*pb.Message); ok {
log.Printf("Received response: %s", string(pbResp.Msg))
}
}
}
The Go collector supports all message types defined in the Java version:
Message Type | Value | Description |
---|---|---|
HEARTBEAT | 0 | Heartbeat/health check |
GO_ONLINE | 1 | Collector online notification |
GO_OFFLINE | 2 | Collector offline notification |
GO_CLOSE | 3 | Collector shutdown notification |
ISSUE_CYCLIC_TASK | 4 | Issue cyclic collection task |
DELETE_CYCLIC_TASK | 5 | Delete cyclic collection task |
ISSUE_ONE_TIME_TASK | 6 | Issue one-time collection task |
The transport layer provides robust connection management:
- Auto-reconnection: Automatically attempts to reconnect when connection is lost
- Connection monitoring: Background monitoring of connection health
- Heartbeat mechanism: Regular heartbeat messages to maintain connection
- Event handling: Connection state change notifications (connected, disconnected, connection failed)
The implementation includes comprehensive error handling:
- Connection timeouts: Proper timeout handling for connection attempts
- Message serialization: Protobuf marshaling/unmarshaling error handling
- Response correlation: Proper matching of requests and responses using identity field
- Graceful shutdown: Clean shutdown procedures with context cancellation
The Go collector implementation provides comprehensive compatibility with the Java version:
-
Transport Layer Compatibility
- Netty Protocol: Complete implementation with length-prefixed message format
- gRPC Protocol: Full gRPC service implementation with bidirectional streaming
- Message Types: All core message types (HEARTBEAT, GO_ONLINE, GO_OFFLINE, etc.) are supported
- Request/Response Pattern: Proper handling of synchronous and asynchronous communication
-
Connection Management
- Auto-reconnection: Robust reconnection logic when connection is lost
- Connection Monitoring: Background health checks with deadline management
- Event System: Comprehensive event handling for connection state changes
- Heartbeat Mechanism: Regular heartbeat messages for connection maintenance
-
Message Processing
- Processor Registry: Dynamic message processor registration and dispatch
- Response Correlation: Proper request-response matching using identity field
- Error Handling: Comprehensive error handling throughout the message pipeline
- Timeout Management: Configurable timeouts for all operations
-
Protocol Compatibility
- Protobuf Messages: Exact compatibility with Java protobuf definitions
- Message Serialization: Proper binary format handling for Netty protocol
- Stream Processing: Support for both unary and streaming gRPC operations
-
Task Processing Logic
- Current implementation returns placeholder responses for task processing
- Actual collection logic needs to be implemented based on specific requirements
- Task scheduling and execution engine needs integration
-
Configuration Management
- Configuration file format needs to be standardized with Java version
- Environment variable support could be enhanced
- Dynamic configuration reloading could be added
-
Monitoring and Metrics
- Comprehensive metrics collection could be added
- Performance monitoring integration could be enhanced
- Health check endpoints could be exposed
-
Netty Protocol Implementation
// Length-prefixed message format for Java compatibility func (c *NettyClient) writeMessage(msg *pb.Message) error { data, err := proto.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } // Write length prefix (varint32) length := len(data) if err := binary.Write(c.writer, binary.BigEndian, uint32(length)); err != nil { return fmt.Errorf("failed to write length: %w", err) } // Write message data if _, err := c.writer.Write(data); err != nil { return fmt.Errorf("failed to write message: %w", err) } return c.writer.Flush() }
-
Response Future Pattern
// Synchronous communication using ResponseFuture func (c *NettyClient) SendMsgSync(msg interface{}, timeoutMillis int) (interface{}, error) { // Create response future for this request future := NewResponseFuture() c.responseTable[pbMsg.Identity] = future defer delete(c.responseTable, pbMsg.Identity) // Send message if err := c.writeMessage(pbMsg); err != nil { future.PutError(err) return nil, err } // Wait for response with timeout return future.Wait(time.Duration(timeoutMillis) * time.Millisecond) }
-
Event-Driven Architecture
// Connection event handling func (c *NettyClient) triggerEvent(eventType EventType, err error) { if c.eventHandler != nil { c.eventHandler(Event{ Type: eventType, Address: c.addr, Error: err, }) } }
The Go implementation achieves high compatibility with the Java version:
- Protocol Level: 100% compatible with Netty message format
- Message Types: All core message types implemented
- Communication Patterns: Both sync and async patterns supported
- Connection Management: Robust connection handling with auto-recovery
- Error Handling: Comprehensive error handling throughout
-
For Production Use:
- Implement actual task processing logic based on specific monitoring requirements
- Add comprehensive logging and monitoring
- Implement configuration validation and management
- Add integration tests with Java server
-
For Development:
- The current implementation provides a solid foundation
- All core communication patterns are correctly implemented
- Protocol compatibility is thoroughly addressed
- Extensibility is built into the architecture
-
Testing Strategy:
- Test with actual Java server deployment
- Verify message format compatibility
- Test connection recovery scenarios
- Validate performance under load
The Go collector implementation successfully recreates the core communication capabilities of the Java version, providing a solid foundation for HertzBeat monitoring data collection in Go.
Contributions are welcome! Please see CONTRIBUTING.md for details, including code, documentation, tests, and discussions.
This project is licensed under the Apache 2.0 License.
For Chinese documentation, please see README-CN.md.