diff --git a/coco.yml b/coco.yml index ef23bfb5..da5e6ed9 100644 --- a/coco.yml +++ b/coco.yml @@ -319,6 +319,11 @@ connector: interval: 10s queue: name: indexing_documents + mongodb: + enabled: true + interval: 30s + queue: + name: indexing_documents notion: enabled: true interval: 10s diff --git a/config/setup/en-US/connector.tpl b/config/setup/en-US/connector.tpl index 335a06fa..1f08ef19 100644 --- a/config/setup/en-US/connector.tpl +++ b/config/setup/en-US/connector.tpl @@ -231,6 +231,82 @@ POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/n }, "builtin": true } +POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/mongodb +{ + "id" : "mongodb", + "created" : "2025-01-12T00:00:00.000000+08:00", + "updated" : "2025-01-12T00:00:00.000000+08:00", + "name" : "MongoDB Connector", + "description" : "Powerful MongoDB database connector supporting incremental/full sync, field mapping (collection-level + global-level), pagination, cluster type optimization, authentication database configuration, projection pushdown, index hints, and other advanced features. Supports standalone, replica set, and sharded cluster deployments.", + "category" : "database", + "icon" : "/assets/icons/connector/mongodb/icon.png", + "tags" : [ + "nosql", + "storage", + "database", + "document", + "mongodb", + "incremental_sync", + "field_mapping", + "pagination", + "cluster_optimization", + "authentication", + "performance" + ], + "url" : "http://coco.rs/connectors/mongodb", + "assets" : { + "icons" : { + "default" : "/assets/icons/connector/mongodb/icon.png", + "collection" : "/assets/icons/connector/mongodb/collection.png", + "document" : "/assets/icons/connector/mongodb/document.png", + "replica_set" : "/assets/icons/connector/mongodb/replica_set.png", + "sharded" : "/assets/icons/connector/mongodb/sharded.png" + } + }, + "config": { + "connection_uri": "mongodb://username:password@localhost:27017/database", + "database": "database_name", + "auth_database": "admin", + "cluster_type": "standalone", + "collections": [ + { + "name": "collection_name", + "filter": {"status": "active"}, + "title_field": "title", + "content_field": "content", + "category_field": "category", + "tags_field": "tags", + "url_field": "url", + "timestamp_field": "updated_at" + } + ], + "pagination": true, + "page_size": 500, + "sync_strategy": "incremental", + "last_modified_field": "updated_at", + "field_mapping": { + "enabled": true, + "mapping": { + "id": "custom_id", + "title": "custom_title", + "content": "custom_content", + "category": "custom_category", + "tags": "custom_tags", + "url": "custom_url", + "metadata": "extra_fields" + } + }, + "performance": { + "batch_size": 1000, + "max_pool_size": 10, + "timeout": "30s", + "enable_projection": true, + "enable_index_hint": true + } + }, + "builtin": true +} + POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/postgresql { "id" : "postgresql", diff --git a/config/setup/zh-CN/connector.tpl b/config/setup/zh-CN/connector.tpl index bec3bf8a..97f8b490 100644 --- a/config/setup/zh-CN/connector.tpl +++ b/config/setup/zh-CN/connector.tpl @@ -231,6 +231,81 @@ POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/n }, "builtin": true } +POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/mongodb +{ + "id" : "mongodb", + "created" : "2025-01-12T00:00:00.000000+08:00", + "updated" : "2025-01-12T00:00:00.000000+08:00", + "name" : "MongoDB 连接器", + "description" : 
"强大的MongoDB数据库连接器,支持增量/全量同步、字段映射(集合级别+全局级别)、分页处理、集群类型优化、认证数据库配置、投影下推、索引提示等高级功能。支持单机、复制集、分片集群部署。", + "category" : "database", + "icon" : "/assets/icons/connector/mongodb/icon.png", + "tags" : [ + "nosql", + "storage", + "database", + "document", + "mongodb", + "incremental_sync", + "field_mapping", + "pagination", + "cluster_optimization", + "authentication", + "performance" + ], + "url" : "http://coco.rs/connectors/mongodb", + "assets" : { + "icons" : { + "default" : "/assets/icons/connector/mongodb/icon.png", + "collection" : "/assets/icons/connector/mongodb/collection.png", + "document" : "/assets/icons/connector/mongodb/document.png", + "replica_set" : "/assets/icons/connector/mongodb/replica_set.png", + "sharded" : "/assets/icons/connector/mongodb/sharded.png" + } + }, + "config": { + "connection_uri": "mongodb://username:password@localhost:27017/database", + "database": "database_name", + "auth_database": "admin", + "cluster_type": "standalone", + "collections": [ + { + "name": "collection_name", + "filter": {"status": "active"}, + "title_field": "title", + "content_field": "content", + "category_field": "category", + "tags_field": "tags", + "url_field": "url", + "timestamp_field": "updated_at" + } + ], + "pagination": true, + "page_size": 500, + "sync_strategy": "incremental", + "last_modified_field": "updated_at", + "field_mapping": { + "enabled": true, + "mapping": { + "id": "custom_id", + "title": "custom_title", + "content": "custom_content", + "category": "custom_category", + "tags": "custom_tags", + "url": "custom_url", + "metadata": "extra_fields" + } + }, + "performance": { + "batch_size": 1000, + "max_pool_size": 10, + "timeout": "30s", + "enable_projection": true, + "enable_index_hint": true + } + }, + "builtin": true +} POST $[[SETUP_INDEX_PREFIX]]connector$[[SETUP_SCHEMA_VER]]/$[[SETUP_DOC_TYPE]]/postgresql { "id" : "postgresql", diff --git a/docker/init-mongo.js b/docker/init-mongo.js new file mode 100644 index 00000000..843040df --- /dev/null +++ b/docker/init-mongo.js @@ -0,0 +1,41 @@ +// MongoDB initialization script for testing +db = db.getSiblingDB('coco_test'); + +// Create test user +db.createUser({ + user: 'coco_test', + pwd: 'test_password', + roles: [ + { + role: 'readWrite', + db: 'coco_test' + } + ] +}); + +// Create test collections with sample data +db.articles.insertMany([ + { + title: "Sample Article 1", + content: "This is sample content for testing", + category: "Technology", + tags: ["mongodb", "database"], + url: "https://example.com/article1", + updated_at: new Date(), + status: "published" + }, + { + title: "Sample Article 2", + content: "Another sample content for testing", + category: "Programming", + tags: ["go", "backend"], + url: "https://example.com/article2", + updated_at: new Date(), + status: "draft" + } +]); + +// Create indexes for better performance +db.articles.createIndex({ "updated_at": 1 }); +db.articles.createIndex({ "status": 1 }); +db.articles.createIndex({ "category": 1 }); \ No newline at end of file diff --git a/docker/mongodb-test.yml b/docker/mongodb-test.yml new file mode 100644 index 00000000..f7939755 --- /dev/null +++ b/docker/mongodb-test.yml @@ -0,0 +1,36 @@ +version: '3.8' + +services: + mongodb: + image: mongo:7.0 + container_name: coco-mongodb-test + ports: + - "27017:27017" + environment: + MONGO_INITDB_ROOT_USERNAME: admin + MONGO_INITDB_ROOT_PASSWORD: password + MONGO_INITDB_DATABASE: coco_test + volumes: + - mongodb_data:/data/db + - ./init-mongo.js:/docker-entrypoint-initdb.d/init-mongo.js:ro + networks: + - 
coco-test

+  mongodb-replica:
+    image: mongo:7.0
+    container_name: coco-mongodb-replica-test
+    ports:
+      - "27018:27017"
+    command: mongod --replSet rs0 --bind_ip_all
+    volumes:
+      - mongodb_replica_data:/data/db
+    networks:
+      - coco-test
+
+volumes:
+  mongodb_data:
+  mongodb_replica_data:
+
+networks:
+  coco-test:
+    driver: bridge
\ No newline at end of file
diff --git a/examples/mongodb.yml b/examples/mongodb.yml
new file mode 100644
index 00000000..53982ce4
--- /dev/null
+++ b/examples/mongodb.yml
@@ -0,0 +1,22 @@
+# MongoDB Connector Default Configuration
+mongodb:
+  # Default connection settings
+  default_timeout: "30s"
+  default_batch_size: 1000
+  default_max_pool_size: 10
+
+  # Default sync settings
+  default_sync_strategy: "full"
+
+  # Performance tuning
+  max_concurrent_collections: 5
+  memory_gc_interval: 10000
+
+  # Retry settings
+  connection_retry_attempts: 3
+  connection_retry_delay: "30s"
+
+  # Logging
+  log_level: "info"
+  log_slow_queries: true
+  slow_query_threshold: "5s"
\ No newline at end of file
diff --git a/plugins/connectors/mongodb/config.go b/plugins/connectors/mongodb/config.go
new file mode 100644
index 00000000..714f05a4
--- /dev/null
+++ b/plugins/connectors/mongodb/config.go
@@ -0,0 +1,145 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"fmt"
+)
+
+// Config defines the configuration for the MongoDB connector
type Config struct {
+	// Connection configuration
+	ConnectionURI string `config:"connection_uri"`
+	Database      string `config:"database"`
+	AuthDatabase  string `config:"auth_database"` // Authentication database (e.g. "admin")
+	ClusterType   string `config:"cluster_type"`  // Cluster type: "standalone", "replica_set", or "sharded"
+
+	// Collections configuration
+	Collections []CollectionConfig `config:"collections"`
+
+	// Pagination configuration
+	Pagination bool `config:"pagination"`
+	PageSize   int  `config:"page_size"`
+
+	// Last modified field for incremental sync
+	LastModifiedField string `config:"last_modified_field"`
+
+	// Performance optimization configuration
+	BatchSize   int    `config:"batch_size"`
+	Timeout     string `config:"timeout"`
+	MaxPoolSize int    `config:"max_pool_size"`
+
+	// Sync strategy: "full" or "incremental"
+	SyncStrategy string `config:"sync_strategy"`
+
+	// Global field mapping configuration, applied on top of the
+	// per-collection mappings in CollectionConfig
+	FieldMapping *FieldMappingConfig `config:"field_mapping"`
+
+	// Advanced query optimization
+	EnableProjection bool `config:"enable_projection"` // Enable projection pushdown
+	EnableIndexHint  bool `config:"enable_index_hint"` // Enable index hints for better performance
+}
+
+// CollectionConfig defines collection-specific configuration, including the
+// per-collection field mappings used by the scanner and transformer
+type CollectionConfig struct {
+	Name   string                 `config:"name"`   // Collection name
+	Filter map[string]interface{} `config:"filter"` // MongoDB query filter for this collection
+
+	// Per-collection field mappings; the global FieldMapping configuration
+	// (when enabled) can override these
+	TitleField     string `config:"title_field"`     // MongoDB field name for the document title
+	ContentField   string `config:"content_field"`   // MongoDB field name for the document content
+	CategoryField  string `config:"category_field"`  // MongoDB field name for the document category
+	TagsField      string `config:"tags_field"`      // MongoDB field name for the document tags
+	URLField       string `config:"url_field"`       // MongoDB field name for the document URL
+	TimestampField string `config:"timestamp_field"` // MongoDB field name for the document timestamp
+}
+
+// FieldMappingConfig defines the global field mapping configuration. It is
+// applied after the per-collection mappings and takes precedence over them.
+type FieldMappingConfig struct {
+	Enabled bool                   `config:"enabled"`
+	Mapping map[string]interface{} `config:"mapping"`
+
+	// Standard field mappings for common document fields
+	TitleField     string `config:"title_field"`     // MongoDB field name for document title
+	ContentField   string `config:"content_field"`   // MongoDB field name for document content
+	CategoryField  string `config:"category_field"`  // MongoDB field name for document category
+	TagsField      string `config:"tags_field"`      // MongoDB field name for document tags
+	URLField       string `config:"url_field"`       // MongoDB field name for document URL
+	TimestampField string `config:"timestamp_field"` // MongoDB field name for document timestamp
+}
+
+func (p *Plugin) setDefaultConfig(config *Config) {
+	if config.BatchSize <= 0 {
+		config.BatchSize = 1000
+	}
+	if config.MaxPoolSize <= 0 {
+		config.MaxPoolSize = 10
+	}
+	if config.Timeout == "" {
+		config.Timeout = "30s"
+	}
+	if config.SyncStrategy == "" {
+		config.SyncStrategy = "full"
+	}
+	if config.PageSize <= 0 {
+		config.PageSize = 500
+	}
+	if config.AuthDatabase == "" {
+		config.AuthDatabase = "admin" // Default to admin database for authentication
+	}
+	if config.ClusterType == "" {
+		config.ClusterType = "standalone" // Default to standalone MongoDB instance
+	}
+	if config.FieldMapping == nil {
+		config.FieldMapping = &FieldMappingConfig{
+			Enabled: false,
+			Mapping: make(map[string]interface{}),
+		}
+	}
+
+	// Enable the query optimizations by default. NOTE: plain bool fields
+	// cannot distinguish "unset" from "false"; switch these to *bool if an
+	// explicit opt-out ever needs to be supported.
+	config.EnableProjection = true
+	config.EnableIndexHint = true
+}
+
+func (p *Plugin) validateConfig(config *Config) error {
+	if config.ConnectionURI == "" {
+		return fmt.Errorf("connection_uri must be specified")
+	}
+
+	if config.Database == "" {
+		return fmt.Errorf("database must be specified")
+	}
+
+	if len(config.Collections) == 0 {
+		return fmt.Errorf("at least one collection must be configured")
+	}
+
+	for i, coll := range config.Collections {
+		if coll.Name == "" {
+			return fmt.Errorf("collection[%d].name is required", i)
+		}
+	}
+
+	// Zero values are allowed here; they are replaced by defaults afterwards
+	if config.BatchSize < 0 {
+		return fmt.Errorf("batch_size must not be negative")
+	}
+
+	if config.MaxPoolSize < 0 {
+		return fmt.Errorf("max_pool_size must not be negative")
+	}
+
+	if config.PageSize < 0 {
+		return fmt.Errorf("page_size must not be negative")
+	}
+
+	if config.SyncStrategy != "" && config.SyncStrategy != "full" && config.SyncStrategy != "incremental" {
+		return fmt.Errorf("sync_strategy must be 'full' or 'incremental'")
+	}
+
+	if config.ClusterType != "" && config.ClusterType != "standalone" && config.ClusterType != "replica_set" && config.ClusterType != "sharded" {
+		return fmt.Errorf("cluster_type must be 'standalone', 'replica_set', or 'sharded'")
+	}
+
+	return nil
+}
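For reference, validation and defaulting are meant to compose in that order, matching the call sequence in `Scan` (plugin.go below). A minimal sketch — the `loadConfig` wrapper and the literal values are illustrative, not part of the connector:

```go
func loadConfig() (*Config, error) {
	cfg := &Config{
		ConnectionURI: "mongodb://localhost:27017",
		Database:      "coco_test",
		Collections:   []CollectionConfig{{Name: "articles"}},
	}
	p := &Plugin{}
	// Zero values (e.g. BatchSize: 0) pass validation and are then filled in:
	// BatchSize=1000, PageSize=500, Timeout="30s", SyncStrategy="full".
	if err := p.validateConfig(cfg); err != nil {
		return nil, err
	}
	p.setDefaultConfig(cfg)
	return cfg, nil
}
```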
diff --git a/plugins/connectors/mongodb/connection.go b/plugins/connectors/mongodb/connection.go
new file mode 100644
index 00000000..446eee97
--- /dev/null
+++ b/plugins/connectors/mongodb/connection.go
@@ -0,0 +1,137 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"context"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readpref"
+	"go.mongodb.org/mongo-driver/mongo/writeconcern"
+)
+
+func (p *Plugin) getOrCreateClient(datasourceID string, config *Config) (*mongo.Client, error) {
+	// First check: use read lock to check if connection exists and is valid
+	p.mu.RLock()
+	if client, exists := p.clients[datasourceID]; exists {
+		// Test if the connection is still valid
+		if err := client.Ping(context.Background(), readpref.Primary()); err == nil {
+			p.mu.RUnlock()
+			return client, nil
+		}
+		p.mu.RUnlock()
+	} else {
+		p.mu.RUnlock()
+	}
+
+	// Acquire write lock to prepare for creating new connection
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Second check: re-check connection status under write lock protection.
+	// Prevents connection overwrite when multiple goroutines create connections simultaneously.
+	if client, exists := p.clients[datasourceID]; exists {
+		// Test connection again (may have been fixed by another goroutine)
+		if err := client.Ping(context.Background(), readpref.Primary()); err == nil {
+			return client, nil
+		}
+		// Connection indeed failed, remove it and disconnect
+		delete(p.clients, datasourceID)
+		client.Disconnect(context.Background())
+	}
+
+	// Create new MongoDB client connection
+	client, err := p.createMongoClient(config)
+	if err != nil {
+		return nil, err
+	}
+
+	// Store new connection in the connection pool
+	p.clients[datasourceID] = client
+
+	return client, nil
+}
+
+func (p *Plugin) createMongoClient(config *Config) (*mongo.Client, error) {
+	clientOptions := options.Client()
+
+	// Set connection string
+	clientOptions.ApplyURI(config.ConnectionURI)
+
+	// Apply the authentication database if specified. ApplyURI has already
+	// parsed any credentials embedded in the URI; replacing the whole
+	// Credential here would drop them, so only the auth source is overridden,
+	// and only when credentials are actually present.
+	if config.AuthDatabase != "" && clientOptions.Auth != nil {
+		clientOptions.Auth.AuthSource = config.AuthDatabase
+	}
+
+	// Connection pool configuration
+	if config.MaxPoolSize > 0 {
+		clientOptions.SetMaxPoolSize(uint64(config.MaxPoolSize))
+	}
+
+	// Timeout configuration
+	if config.Timeout != "" {
+		if timeout, err := time.ParseDuration(config.Timeout); err == nil {
+			clientOptions.SetServerSelectionTimeout(timeout)
+			clientOptions.SetConnectTimeout(timeout)
+		}
+	}
+
+	// Configure cluster-specific settings
+	switch config.ClusterType {
+	case "replica_set":
+		// For replica sets, prefer secondary nodes for read operations to distribute load
+		clientOptions.SetReadPreference(readpref.SecondaryPreferred())
+		// Enable retry writes for replica sets
+		clientOptions.SetRetryWrites(true)
+		// Set write concern for replica sets
+		clientOptions.SetWriteConcern(writeconcern.New(
+			writeconcern.WMajority(),
+			writeconcern.J(true),
+			writeconcern.WTimeout(10*time.Second),
+		))
+	case "sharded":
+		// For sharded clusters, route reads to the lowest-latency member
+		clientOptions.SetReadPreference(readpref.Nearest())
+		// Enable retry writes for sharded clusters
+		clientOptions.SetRetryWrites(true)
+		// Set write concern for sharded clusters
+		clientOptions.SetWriteConcern(writeconcern.New(
+			writeconcern.WMajority(),
+			writeconcern.J(true),
+			writeconcern.WTimeout(10*time.Second),
+		))
+	default:
+		// For standalone instances, use primary preferred
+		clientOptions.SetReadPreference(readpref.PrimaryPreferred())
+	}
+
+	return mongo.Connect(context.Background(), clientOptions)
+}
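The pooled client is intended to be consumed as below — a sketch mirroring the flow in `Scan` (plugin.go); `connectAndCheck` is illustrative and not part of the connector:

```go
func connectAndCheck(p *Plugin, datasourceID string, cfg *Config) (*mongo.Client, error) {
	// Reuses a cached client while its ping succeeds; otherwise a fresh client
	// is created with the cluster-type-specific read/write settings above.
	client, err := p.getOrCreateClient(datasourceID, cfg)
	if err != nil {
		p.handleConnectionError(err, datasourceID)
		return nil, err
	}
	if err := p.healthCheck(client); err != nil {
		p.handleConnectionError(err, datasourceID)
		return nil, err
	}
	return client, nil
}
```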
+
+func (p *Plugin) healthCheck(client *mongo.Client) error {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	return client.Ping(ctx, readpref.Primary())
+}
+
+func (p *Plugin) handleConnectionError(err error, datasourceID string) {
+	// Clean up failed connection
+	p.mu.Lock()
+	if client, exists := p.clients[datasourceID]; exists {
+		client.Disconnect(context.Background())
+		delete(p.clients, datasourceID)
+	}
+	p.mu.Unlock()
+
+	// Log error and return immediately.
+	// Let the scheduler decide when to retry the failed scan task.
+	log.Errorf("[mongodb connector] connection error for datasource [%s]: %v", datasourceID, err)
+}
diff --git a/plugins/connectors/mongodb/integration_test.go b/plugins/connectors/mongodb/integration_test.go
new file mode 100644
index 00000000..ead5cfba
--- /dev/null
+++ b/plugins/connectors/mongodb/integration_test.go
@@ -0,0 +1,168 @@
+//go:build integration
+// +build integration
+
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"context"
+	"os"
+	"testing"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/queue"
+)
+
+// TestMongoDBIntegration requires a running MongoDB instance
+func TestMongoDBIntegration(t *testing.T) {
+	// Skip if no MongoDB connection string provided
+	mongoURI := os.Getenv("MONGODB_TEST_URI")
+	if mongoURI == "" {
+		t.Skip("MONGODB_TEST_URI not set, skipping integration test")
+	}
+
+	// Setup test data
+	client, err := mongo.Connect(context.Background(), options.Client().ApplyURI(mongoURI))
+	if err != nil {
+		t.Fatalf("Failed to connect to MongoDB: %v", err)
+	}
+	defer client.Disconnect(context.Background())
+
+	// Create test database and collection
+	testDB := "coco_test"
+	testCollection := "test_articles"
+
+	collection := client.Database(testDB).Collection(testCollection)
+
+	// Insert test documents
+	testDocs := []interface{}{
+		bson.M{
+			"title":      "Test Article 1",
+			"content":    "This is the content of test article 1",
+			"category":   "Technology",
+			"tags":       []string{"mongodb", "database", "nosql"},
+			"url":        "https://example.com/article1",
+			"updated_at": time.Now(),
+			"status":     "published",
+		},
+		bson.M{
+			"title":      "Test Article 2",
+			"content":    "This is the content of test article 2",
+			"category":   "Programming",
+			"tags":       []string{"go", "golang", "backend"},
+			"url":        "https://example.com/article2",
+			"updated_at": time.Now(),
+			"status":     "published",
+		},
+	}
+
+	_, err = collection.InsertMany(context.Background(), testDocs)
+	if err != nil {
+		t.Fatalf("Failed to insert test documents: %v", err)
+	}
+
+	// Clean up after test
+	defer func() {
+		collection.Drop(context.Background())
+	}()
+
+	// Setup plugin
+	plugin := &Plugin{
+		syncManager: NewSyncManager(),
+	}
+	plugin.Queue = &queue.QueueConfig{Name: "test_queue"}
+
+	// Setup test configuration
+	config := &Config{
+		ConnectionURI: mongoURI,
+		Database:      testDB,
+		BatchSize:     10,
+		MaxPoolSize:   5,
+		Timeout:       "10s",
+		Collections: []CollectionConfig{
+			{
+				Name:           testCollection,
+				TitleField:     "title",
+				ContentField:   "content",
+				CategoryField:  "category",
+				TagsField:      "tags",
+				URLField:       "url",
+				TimestampField: "updated_at",
+				Filter: map[string]interface{}{
+					"status": "published",
+				},
+			},
+		},
+	}
+
+	// Test connection creation
+	mongoClient, err := plugin.createMongoClient(config)
+	if err != nil {
+		t.Fatalf("Failed to create MongoDB client: %v", err)
+	}
+	defer mongoClient.Disconnect(context.Background())
+
+	// Test health check
+	if err := plugin.healthCheck(mongoClient); err != nil {
+		t.Fatalf("Health check failed: %v", err)
+	}
+
+	// Test collection stats
+	stats, err := plugin.getCollectionStats(mongoClient, testDB, testCollection)
+	if err != nil {
+		t.Fatalf("Failed to get collection stats: %v", err)
+	}
+
+	if stats["documentCount"].(int64) != 2 {
+		t.Errorf("Expected 2 documents, got %v", stats["documentCount"])
+	}
+
+	// Test document scanning; the datasource must exist before the filter is built
+	datasource := &common.DataSource{
+		ID:   "test-datasource",
+		Name: "Test MongoDB Integration",
+	}
+
+	coll := mongoClient.Database(testDB).Collection(testCollection)
+	filter := plugin.buildFilter(config, config.Collections[0], datasource)
+
+	cursor, err := coll.Find(context.Background(), filter)
+	if err != nil {
+		t.Fatalf("Failed to query collection: %v", err)
+	}
+	defer cursor.Close(context.Background())
+
+	documents := plugin.processCursor(cursor, config, config.Collections[0], datasource)
+
+	if len(documents) != 2 {
+		t.Errorf("Expected 2 documents, got %d", len(documents))
+	}
+
+	// Verify document transformation
+	doc := documents[0]
+	if doc.Title == "" {
+		t.Errorf("Expected non-empty title")
+	}
+	if doc.Content == "" {
+		t.Errorf("Expected non-empty content")
+	}
+	if doc.Category == "" {
+		t.Errorf("Expected non-empty category")
+	}
+	if len(doc.Tags) == 0 {
+		t.Errorf("Expected non-empty tags")
+	}
+	if doc.URL == "" {
+		t.Errorf("Expected non-empty URL")
+	}
+	if doc.Updated == nil {
+		t.Errorf("Expected non-nil updated time")
+	}
+}
\ No newline at end of file
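A usage note: the build tag keeps this test out of regular runs. It should be exercisable against the compose file in `docker/mongodb-test.yml`, roughly via `docker compose -f docker/mongodb-test.yml up -d` followed by `MONGODB_TEST_URI="mongodb://admin:password@localhost:27017" go test -tags integration ./plugins/connectors/mongodb/` — the exact URI and credentials depend on the compose environment above.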
diff --git a/plugins/connectors/mongodb/plugin.go b/plugins/connectors/mongodb/plugin.go
new file mode 100644
index 00000000..ce139689
--- /dev/null
+++ b/plugins/connectors/mongodb/plugin.go
@@ -0,0 +1,143 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"context"
+	"sync"
+
+	log "github.com/cihub/seelog"
+	"go.mongodb.org/mongo-driver/mongo"
+	"infini.sh/coco/modules/common"
+	"infini.sh/coco/plugins/connectors"
+	"infini.sh/framework/core/global"
+	"infini.sh/framework/core/module"
+)
+
+const ConnectorMongoDB = "mongodb"
+
+type Plugin struct {
+	connectors.BasePlugin
+	mu          sync.RWMutex
+	ctx         context.Context
+	cancel      context.CancelFunc
+	clients     map[string]*mongo.Client
+	syncManager *SyncManager
+}
+
+func init() {
+	module.RegisterUserPlugin(&Plugin{})
+}
+
+func (p *Plugin) Name() string {
+	return ConnectorMongoDB
+}
+
+func (p *Plugin) Setup() {
+	p.BasePlugin.Init("connector.mongodb", "indexing mongodb documents", p)
+}
+
+func (p *Plugin) Start() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.ctx, p.cancel = context.WithCancel(context.Background())
+	p.clients = make(map[string]*mongo.Client)
+	p.syncManager = NewSyncManager()
+	return p.BasePlugin.Start(connectors.DefaultSyncInterval)
+}
+
+func (p *Plugin) Stop() error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.cancel != nil {
+		p.cancel()
+	}
+
+	// Clean up all connections
+	for _, client := range p.clients {
+		if client != nil {
+			client.Disconnect(context.Background())
+		}
+	}
+	p.clients = nil
+
+	return nil
+}
+
+func (p *Plugin) Scan(connector *common.Connector, datasource *common.DataSource) {
+	// Get the parent context
+	p.mu.RLock()
+	parentCtx := p.ctx
+	p.mu.RUnlock()
+
+	// Check if the plugin has been stopped
+	if parentCtx == nil {
+		log.Warnf("[mongodb connector] plugin is stopped, skipping scan for datasource [%s]", datasource.Name)
+		return
+	}
+
+	config := &Config{}
+	err := connectors.ParseConnectorConfigure(connector, datasource, config)
+	if err != nil {
+		log.Errorf("[mongodb connector] parsing configuration failed: %v", err)
+		return
+	}
+
+	// Validate configuration
+	if err := p.validateConfig(config); err != nil {
+		log.Errorf("[mongodb connector] invalid configuration for datasource [%s]: %v", datasource.Name, err)
+		return
+	}
+
+	// Set default values
+	p.setDefaultConfig(config)
+
+	// Avoid logging the parsed config itself: the connection URI may embed credentials
+	log.Debugf("[mongodb connector] handling datasource [%s]", datasource.Name)
+
+	client, err := p.getOrCreateClient(datasource.ID, config)
+	if err != nil {
+		log.Errorf("[mongodb connector] failed to create client for datasource [%s]: %v", datasource.Name, err)
+		p.handleConnectionError(err, datasource.ID)
+		return
+	}
+
+	// Health check
+	if err := p.healthCheck(client); err != nil {
+		log.Errorf("[mongodb connector] health check failed for datasource [%s]: %v", datasource.Name, err)
+		p.handleConnectionError(err, datasource.ID)
+		return
+	}
+
+	// Simple sequential scanning for each collection.
+	// Since the connector is already wrapped in a background task, we use a simple implementation.
+	for _, collConfig := range config.Collections {
+		if global.ShuttingDown() {
+			log.Debugf("[mongodb connector] shutting down, stopping scan for collection [%s]", collConfig.Name)
+			break
+		}
+
+		// Check if context is cancelled
+		select {
+		case <-parentCtx.Done():
+			log.Debugf("[mongodb connector] context cancelled, stopping scan for collection [%s]", collConfig.Name)
+			return
+		default:
+		}
+
+		log.Debugf("[mongodb connector] scanning collection [%s]", collConfig.Name)
+
+		// Execute collection scanning
+		if err := p.scanCollectionWithContext(parentCtx, client, config, collConfig, datasource); err != nil {
+			log.Errorf("[mongodb connector] failed to scan collection [%s]: %v", 
collConfig.Name, err) + // Continue with next collection instead of failing completely + continue + } + + log.Debugf("[mongodb connector] successfully scanned collection [%s]", collConfig.Name) + } + + log.Infof("[mongodb connector] finished scanning datasource [%s]", datasource.Name) +} diff --git a/plugins/connectors/mongodb/plugin_test.go b/plugins/connectors/mongodb/plugin_test.go new file mode 100644 index 00000000..1d029d59 --- /dev/null +++ b/plugins/connectors/mongodb/plugin_test.go @@ -0,0 +1,354 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package mongodb + +import ( + "testing" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/primitive" + "infini.sh/coco/modules/common" +) + +func TestSafeConvertToString(t *testing.T) { + p := &Plugin{} + + tests := []struct { + name string + input interface{} + expected string + }{ + {"string", "hello", "hello"}, + {"int", 42, "42"}, + {"float", 3.14, "3.140000"}, + {"bool", true, "true"}, + {"nil", nil, ""}, + {"objectid", primitive.NewObjectID(), ""}, + {"array", []interface{}{"a", "b"}, `["a","b"]`}, + {"object", map[string]interface{}{"key": "value"}, `{"key":"value"}`}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := p.safeConvertToString(tt.input) + if tt.name == "objectid" { + // ObjectID will have different values, just check it's not empty + if result == "" { + t.Errorf("Expected non-empty ObjectID string") + } + } else if result != tt.expected { + t.Errorf("Expected %s, got %s", tt.expected, result) + } + }) + } +} + +func TestConvertToStringSlice(t *testing.T) { + p := &Plugin{} + + tests := []struct { + name string + input interface{} + expected []string + }{ + {"string_slice", []string{"a", "b"}, []string{"a", "b"}}, + {"interface_slice", []interface{}{"a", 1, true}, []string{"a", "1", "true"}}, + {"single_string", "hello", []string{"hello"}}, + {"nil", nil, nil}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := p.convertToStringSlice(tt.input) + if len(result) != len(tt.expected) { + t.Errorf("Expected length %d, got %d", len(tt.expected), len(result)) + return + } + for i, v := range result { + if v != tt.expected[i] { + t.Errorf("Expected %s at index %d, got %s", tt.expected[i], i, v) + } + } + }) + } +} + +func TestConvertToTime(t *testing.T) { + p := &Plugin{} + + now := time.Now() + timestamp := primitive.NewDateTimeFromTime(now) + + tests := []struct { + name string + input interface{} + expected bool // whether result should be non-nil + }{ + {"time", now, true}, + {"datetime", timestamp, true}, + {"unix_timestamp", now.Unix(), true}, + {"rfc3339_string", now.Format(time.RFC3339), true}, + {"invalid_string", "invalid", false}, + {"nil", nil, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := p.convertToTime(tt.input) + if tt.expected && result == nil { + t.Errorf("Expected non-nil time") + } else if !tt.expected && result != nil { + t.Errorf("Expected nil time") + } + }) + } +} + +func TestBuildFilter(t *testing.T) { + p := &Plugin{ + syncManager: NewSyncManager(), + } + + config := &Config{ + SyncStrategy: "incremental", + LastModifiedField: "updated_at", + } + + collConfig := CollectionConfig{ + Filter: map[string]interface{}{ + "status": "published", + }, + TimestampField: "updated_at", + } + + // Create a mock datasource + datasource := &common.DataSource{ + ID: "test_datasource", + } + + filter := 
p.buildFilter(config, collConfig, datasource)
+
+	// Check base filter
+	if filter["status"] != "published" {
+		t.Errorf("Expected status filter to be preserved")
+	}
+
+	// Check timestamp filter - should not exist initially since no sync time is set
+	if _, exists := filter["updated_at"]; exists {
+		t.Errorf("Expected no timestamp filter initially since no sync time is set")
+	}
+}
+
+func TestValidateConfig(t *testing.T) {
+	p := &Plugin{}
+
+	tests := []struct {
+		name    string
+		config  *Config
+		wantErr bool
+	}{
+		{
+			name: "valid_config",
+			config: &Config{
+				ConnectionURI: "mongodb://localhost:27017",
+				Database:      "test",
+				Collections: []CollectionConfig{
+					{Name: "collection1"},
+				},
+			},
+			wantErr: false,
+		},
+		{
+			name: "missing_connection_uri",
+			config: &Config{
+				Database: "test",
+				Collections: []CollectionConfig{
+					{Name: "collection1"},
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "missing_database",
+			config: &Config{
+				ConnectionURI: "mongodb://localhost:27017",
+				Collections: []CollectionConfig{
+					{Name: "collection1"},
+				},
+			},
+			wantErr: true,
+		},
+		{
+			name: "no_collections",
+			config: &Config{
+				ConnectionURI: "mongodb://localhost:27017",
+				Database:      "test",
+				Collections:   []CollectionConfig{},
+			},
+			wantErr: true,
+		},
+		{
+			name: "collection_without_name",
+			config: &Config{
+				ConnectionURI: "mongodb://localhost:27017",
+				Database:      "test",
+				Collections: []CollectionConfig{
+					{Name: ""},
+				},
+			},
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := p.validateConfig(tt.config)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("validateConfig() error = %v, wantErr %v", err, tt.wantErr)
+			}
+		})
+	}
+}
+
+func TestTransformToDocument(t *testing.T) {
+	p := &Plugin{}
+
+	mongoDoc := bson.M{
+		"_id":        primitive.NewObjectID(),
+		"title":      "Test Article",
+		"content":    "This is test content",
+		"category":   "Technology",
+		"tags":       []interface{}{"mongodb", "database"},
+		"url":        "https://example.com/article",
+		"updated_at": primitive.NewDateTimeFromTime(time.Now()),
+	}
+
+	collConfig := CollectionConfig{
+		Name:           "articles",
+		TitleField:     "title",
+		ContentField:   "content",
+		CategoryField:  "category",
+		TagsField:      "tags",
+		URLField:       "url",
+		TimestampField: "updated_at",
+	}
+
+	datasource := &common.DataSource{
+		Name: "Test MongoDB",
+	}
+
+	config := &Config{}
+	doc, err := p.transformToDocument(mongoDoc, collConfig, datasource, config)
+	if err != nil {
+		t.Fatalf("transformToDocument() error = %v", err)
+	}
+
+	if doc.Title != "Test Article" {
+		t.Errorf("Expected title 'Test Article', got '%s'", doc.Title)
+	}
+
+	if doc.Content != "This is test content" {
+		t.Errorf("Expected content 'This is test content', got '%s'", doc.Content)
+	}
+
+	if doc.Category != "Technology" {
+		t.Errorf("Expected category 'Technology', got '%s'", doc.Category)
+	}
+
+	if doc.Tags[0] != "mongodb" || doc.Tags[1] != "database" {
+		t.Errorf("Expected tags ['mongodb', 'database'], got %v", doc.Tags)
+	}
+
+	if doc.URL != "https://example.com/article" {
+		t.Errorf("Expected URL 'https://example.com/article', got '%s'", doc.URL)
+	}
+
+	if doc.Type != ConnectorMongoDB {
+		t.Errorf("Expected type '%s', got '%s'", ConnectorMongoDB, doc.Type)
+	}
+
+	if doc.Updated == nil {
+		t.Errorf("Expected non-nil Updated time")
+	}
+
+	// Check metadata
+	if doc.Metadata["mongodb_collection"] != "articles" {
+		t.Errorf("Expected collection metadata to be 'articles'")
+	}
+
+	if doc.Metadata["mongodb_id"] != mongoDoc["_id"] {
+		t.Errorf("Expected mongodb_id metadata to match original _id")
+	}
+}
diff --git a/plugins/connectors/mongodb/scanner.go b/plugins/connectors/mongodb/scanner.go
new file mode 100644
index 00000000..84696541
--- /dev/null
+++ b/plugins/connectors/mongodb/scanner.go
@@ -0,0 +1,225 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"context"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/mongo"
+	"go.mongodb.org/mongo-driver/mongo/options"
+	"go.mongodb.org/mongo-driver/mongo/readconcern"
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/global"
+)
+
+func (p *Plugin) scanCollectionWithContext(ctx context.Context, client *mongo.Client, config *Config, collConfig CollectionConfig, datasource *common.DataSource) error {
+	select {
+	case <-ctx.Done():
+		log.Debugf("[mongodb connector] context cancelled, stopping scan for collection [%s]", collConfig.Name)
+		return ctx.Err()
+	default:
+	}
+
+	if global.ShuttingDown() {
+		return nil
+	}
+
+	// Create the sync strategy once; it is reused for the sync-time bookkeeping below
+	strategyFactory := &SyncStrategyFactory{}
+	strategy := strategyFactory.CreateStrategy(config.SyncStrategy)
+	strategyName := strategyFactory.GetStrategyName(config.SyncStrategy)
+
+	log.Infof("[mongodb connector] starting %s sync for collection [%s] in datasource [%s]",
+		strategyName, collConfig.Name, datasource.Name)
+
+	collection := client.Database(config.Database).Collection(collConfig.Name)
+
+	// Get collection stats for monitoring
+	if stats, err := p.getCollectionStats(client, config.Database, collConfig.Name); err == nil {
+		log.Debugf("[mongodb connector] collection [%s] stats: %v", collConfig.Name, stats)
+	}
+
+	// Build query filter
+	filter := p.buildFilter(config, collConfig, datasource)
+
+	// Set query options
+	findOptions := options.Find()
+
+	// Use page size if pagination is enabled, otherwise use batch size
+	if config.Pagination {
+		findOptions.SetBatchSize(int32(config.PageSize))
+	} else {
+		findOptions.SetBatchSize(int32(config.BatchSize))
+	}
+
+	// Set projection if fields are specified in collection config and projection is enabled.
+	// This enables projection pushdown for better performance.
+	if config.EnableProjection && (collConfig.TitleField != "" || collConfig.ContentField != "" ||
+		collConfig.CategoryField != "" || collConfig.TagsField != "" ||
+		collConfig.URLField != "" || collConfig.TimestampField != "") {
+		projection := bson.D{}
+
+		// Always include _id field for document identification
+		projection = append(projection, bson.E{Key: "_id", Value: 1})
+
+		// Add configured fields to projection
+		if collConfig.TitleField != "" {
+			projection = append(projection, bson.E{Key: collConfig.TitleField, Value: 1})
+		}
+		if collConfig.ContentField != "" {
+			projection = append(projection, bson.E{Key: collConfig.ContentField, Value: 1})
+		}
+		if collConfig.CategoryField != "" {
+			projection = append(projection, bson.E{Key: collConfig.CategoryField, Value: 1})
+		}
+		if collConfig.TagsField != "" {
+			projection = append(projection, bson.E{Key: collConfig.TagsField, Value: 1})
+		}
+		if collConfig.URLField != "" {
+			projection = append(projection, bson.E{Key: collConfig.URLField, Value: 1})
+		}
+		if collConfig.TimestampField != "" {
+			projection = append(projection, bson.E{Key: collConfig.TimestampField, Value: 1})
+		}
+
+		// Add any additional fields specified in the filter for proper filtering
+		for field := range collConfig.Filter {
+			projection = append(projection, bson.E{Key: field, Value: 1})
+		}
+
+		findOptions.SetProjection(projection)
+	}
+
+	// Optimize query
+	p.optimizeQuery(findOptions, collConfig, config)
+
+	// Paginated processing for large datasets; the per-query limit must match
+	// the page size when pagination is enabled, otherwise the batch size is used
+	limit := config.BatchSize
+	if config.Pagination {
+		limit = config.PageSize
+	}
+
+	var skip int64 = 0
+	for {
+		select {
+		case <-ctx.Done():
+			log.Debugf("[mongodb connector] context cancelled during scan for collection [%s]", collConfig.Name)
+			return ctx.Err()
+		default:
+		}
+
+		if global.ShuttingDown() {
+			return nil
+		}
+
+		findOptions.SetSkip(skip)
+		findOptions.SetLimit(int64(limit))
+
+		cursor, err := collection.Find(ctx, filter, findOptions)
+		if err != nil {
+			log.Errorf("[mongodb connector] query failed for collection [%s]: %v", collConfig.Name, err)
+			return err
+		}
+
+		documents := p.processCursor(cursor, config, collConfig, datasource)
+		cursor.Close(ctx)
+
+		if len(documents) == 0 {
+			break
+		}
+
+		// Batch push to queue
+		p.pushDocuments(documents)
+
+		skip += int64(len(documents))
+
+		// Update last sync time based on sync strategy; the strategy created
+		// at the top of this function is reused here
+		if strategy.ShouldUpdateSyncTime() && config.LastModifiedField != "" {
+			// Get the latest timestamp from the current batch
+			latestTime := p.getLatestTimestampFromBatch(documents, config.LastModifiedField)
+			if !latestTime.IsZero() {
+				// Update sync time using sync manager with datasource ID and collection name
+				if err := p.syncManager.UpdateLastSyncTime(datasource.ID, collConfig.Name, latestTime, latestTime); err != nil {
+					log.Warnf("[mongodb connector] failed to update last sync time: %v", err)
+				}
+			}
+		} else if strategy.GetStrategyName() == "full" {
+			// For full sync strategy, we don't need to track sync time.
+			// All documents will be processed regardless of their modification time.
+			log.Debugf("[mongodb connector] full sync strategy - processing all documents in collection [%s]", collConfig.Name)
+		}
+	}
+
+	log.Infof("[mongodb connector] finished scanning collection [%s] in datasource [%s]", collConfig.Name, datasource.Name)
+	return nil
+}
+
+func (p *Plugin) buildFilter(config *Config, collConfig CollectionConfig, datasource *common.DataSource) bson.M {
+	// Create sync strategy
+	strategyFactory := &SyncStrategyFactory{}
+	strategy := strategyFactory.CreateStrategy(config.SyncStrategy)
+
+	// Use strategy to build filter
+	return strategy.BuildFilter(config, collConfig, datasource.ID, p.syncManager)
+}
+
+func (p *Plugin) optimizeQuery(findOptions *options.FindOptions, collConfig CollectionConfig, config *Config) {
+	// Set read concern level
+	findOptions.SetReadConcern(readconcern.Local())
+
+	// If there's a timestamp field and index hints are enabled, suggest the
+	// matching index. MongoDB rejects hints for indexes that do not exist, so
+	// this assumes an index on the timestamp field (see docker/init-mongo.js).
+	if config.EnableIndexHint && collConfig.TimestampField != "" {
+		findOptions.SetHint(bson.D{{Key: collConfig.TimestampField, Value: 1}})
+	}
+}
+
+func (p *Plugin) getCollectionStats(client *mongo.Client, database, collection string) (map[string]interface{}, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+
+	db := client.Database(database)
+	coll := db.Collection(collection)
+
+	// Get collection stats
+	var result bson.M
+	err := db.RunCommand(ctx, bson.D{
+		{Key: "collStats", Value: collection},
+	}).Decode(&result)
+
+	if err != nil {
+		return nil, err
+	}
+
+	// Get document count
+	count, err := coll.CountDocuments(ctx, bson.D{})
+	if err != nil {
+		log.Warnf("[mongodb connector] failed to get document count: %v", err)
+	} else {
+		result["documentCount"] = count
+	}
+
+	return result, nil
+}
+
+// getLatestTimestampFromBatch finds the latest timestamp from a batch of documents
+func (p *Plugin) getLatestTimestampFromBatch(documents []*common.Document, timestampField string) time.Time {
+	var latestTime time.Time
+
+	for _, doc := range documents {
+		if doc.Updated != nil && !doc.Updated.IsZero() {
+			if latestTime.IsZero() || doc.Updated.After(latestTime) {
+				latestTime = *doc.Updated
+			}
+		}
+	}
+
+	return latestTime
+}
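To make the generated query shape concrete: with a collection filter of `{"status": "active"}`, `sync_strategy: incremental`, and `last_modified_field: updated_at`, the filter built above is equivalent to the sketch below (`lastSyncTime` comes from the sync manager introduced next; on the first run it is zero and the `$gt` clause is omitted):

```go
func exampleIncrementalFilter(lastSyncTime time.Time) bson.M {
	// Base filter from the collection config plus the incremental window.
	return bson.M{
		"status":     "active",
		"updated_at": bson.M{"$gt": lastSyncTime},
	}
}
```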
diff --git a/plugins/connectors/mongodb/sync_manager.go b/plugins/connectors/mongodb/sync_manager.go
new file mode 100644
index 00000000..e3f5ddfc
--- /dev/null
+++ b/plugins/connectors/mongodb/sync_manager.go
@@ -0,0 +1,190 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+	"time"
+
+	log "github.com/cihub/seelog"
+)
+
+// SyncState represents the synchronization state for a specific datasource and collection
+type SyncState struct {
+	DatasourceID   string    `json:"datasource_id"`
+	CollectionName string    `json:"collection_name"`
+	LastSyncTime   time.Time `json:"last_sync_time"`
+	LastModified   time.Time `json:"last_modified"`
+	UpdatedAt      time.Time `json:"updated_at"`
+}
+
+// SyncManager manages the synchronization state for MongoDB collections
+type SyncManager struct {
+	mu         sync.RWMutex
+	states     map[string]*SyncState // key: datasourceID_collectionName
+	storageDir string
+}
+
+// NewSyncManager creates a new sync manager instance
+func NewSyncManager() *SyncManager {
+	return &SyncManager{
+		states:     make(map[string]*SyncState),
+		storageDir: getDefaultSyncStorageDir(),
+	}
+}
+
+// GetSyncKey generates a unique key for datasource and collection
+func (sm *SyncManager) GetSyncKey(datasourceID, collectionName string) string {
+	return fmt.Sprintf("%s_%s", datasourceID, collectionName)
+}
+
+// GetLastSyncTime retrieves the last sync time for a specific datasource and collection
+func (sm *SyncManager) GetLastSyncTime(datasourceID, collectionName string) time.Time {
+	// A write lock is required here: a cache miss populates sm.states below,
+	// and writing to the map under a read lock would be a data race
+	sm.mu.Lock()
+	defer sm.mu.Unlock()
+
+	key := sm.GetSyncKey(datasourceID, collectionName)
+
+	// First check in-memory cache
+	if state, exists := sm.states[key]; exists {
+		return state.LastSyncTime
+	}
+
+	// If not in memory, try to load from persistent storage
+	state := sm.loadFromStorage(datasourceID, collectionName)
+	if state != nil {
+		sm.states[key] = state
+		return state.LastSyncTime
+	}
+
+	return time.Time{} // Return zero time if no sync state found
+}
+
+// UpdateLastSyncTime updates the last sync time for a specific datasource and collection
+func (sm *SyncManager) UpdateLastSyncTime(datasourceID, collectionName string, syncTime, lastModified time.Time) error {
+	sm.mu.Lock()
+	defer sm.mu.Unlock()
+
+	key := sm.GetSyncKey(datasourceID, collectionName)
+
+	state := &SyncState{
+		DatasourceID:   datasourceID,
+		CollectionName: collectionName,
+		LastSyncTime:   syncTime,
+		LastModified:   lastModified,
+		UpdatedAt:      time.Now(),
+	}
+
+	// Update in-memory cache
+	sm.states[key] = state
+
+	// Persist to storage
+	return sm.saveToStorage(state)
+}
+
+// GetLastModifiedTime retrieves the last modified time for a specific datasource and collection
+func (sm *SyncManager) GetLastModifiedTime(datasourceID, collectionName string) time.Time {
+	// As above, a write lock is needed because the cache may be populated
+	sm.mu.Lock()
+	defer sm.mu.Unlock()
+
+	key := sm.GetSyncKey(datasourceID, collectionName)
+	if state, exists := sm.states[key]; exists {
+		return state.LastModified
+	}
+
+	// Try to load from storage
+	state := sm.loadFromStorage(datasourceID, collectionName)
+	if state != nil {
+		sm.states[key] = state
+		return state.LastModified
+	}
+
+	return time.Time{}
+}
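For orientation, the expected bookkeeping around one scanned batch looks like this sketch (the IDs are illustrative; state files land under `~/.coco/mongodb/sync`, one JSON file per datasource/collection key):

```go
func exampleSyncBookkeeping(sm *SyncManager, latestDocTime time.Time) error {
	// Zero time on the first sync, so an incremental scan degrades to a full pass.
	_ = sm.GetLastSyncTime("ds-1", "articles")

	// After a batch is pushed to the queue, persist the newest document timestamp.
	return sm.UpdateLastSyncTime("ds-1", "articles", time.Now(), latestDocTime)
}
```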
+
+// loadFromStorage loads sync state from persistent storage
+func (sm *SyncManager) loadFromStorage(datasourceID, collectionName string) *SyncState {
+	key := sm.GetSyncKey(datasourceID, collectionName)
+	filename := sanitizeFilename(key) + ".json"
+	// Named "path" rather than "filepath" to avoid shadowing the package
+	path := filepath.Join(sm.storageDir, filename)
+
+	data, err := os.ReadFile(path)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil // File doesn't exist, no previous sync
+		}
+		log.Warnf("[mongodb connector] failed to read sync state file %s: %v", path, err)
+		return nil
+	}
+
+	var state SyncState
+	if err := json.Unmarshal(data, &state); err != nil {
+		log.Warnf("[mongodb connector] failed to parse sync state file %s: %v", path, err)
+		return nil
+	}
+
+	return &state
+}
+
+// saveToStorage saves sync state to persistent storage
+func (sm *SyncManager) saveToStorage(state *SyncState) error {
+	// Ensure storage directory exists
+	if err := os.MkdirAll(sm.storageDir, 0755); err != nil {
+		return fmt.Errorf("failed to create sync storage directory: %v", err)
+	}
+
+	key := sm.GetSyncKey(state.DatasourceID, state.CollectionName)
+	filename := sanitizeFilename(key) + ".json"
+	path := filepath.Join(sm.storageDir, filename)
+
+	// Marshal to JSON
+	data, err := json.MarshalIndent(state, "", "  ")
+	if err != nil {
+		return fmt.Errorf("failed to marshal sync state: %v", err)
+	}
+
+	// Write to file atomically (write to temp file first, then rename)
+	tempFile := path + ".tmp"
+	if err := os.WriteFile(tempFile, data, 0644); err != nil {
+		return fmt.Errorf("failed to write temp sync state file: %v", err)
+	}
+
+	if err := os.Rename(tempFile, path); err != nil {
+		// Clean up temp file on error
+		os.Remove(tempFile)
+		return fmt.Errorf("failed to rename temp sync state file: %v", err)
+	}
+
+	return nil
+}
+
+// getDefaultSyncStorageDir returns the default directory for storing sync state files
+func getDefaultSyncStorageDir() string {
+	homeDir, err := os.UserHomeDir()
+	if err != nil {
+		homeDir = "."
+	}
+	return filepath.Join(homeDir, ".coco", "mongodb", "sync")
+}
+
+// sanitizeFilename sanitizes a string to be used as a filename
+func sanitizeFilename(name string) string {
+	// Replace invalid characters with underscores
+	invalid := []rune{'/', '\\', ':', '*', '?', '"', '<', '>', '|'}
+	result := []rune(name)
+	for i, r := range result {
+		for _, inv := range invalid {
+			if r == inv {
+				result[i] = '_'
+				break
+			}
+		}
+	}
+	return string(result)
+}
diff --git a/plugins/connectors/mongodb/sync_storage_test.go b/plugins/connectors/mongodb/sync_storage_test.go
new file mode 100644
index 00000000..946492fb
--- /dev/null
+++ b/plugins/connectors/mongodb/sync_storage_test.go
@@ -0,0 +1,182 @@
+package mongodb
+
+import (
+	"testing"
+	"time"
+
+	"infini.sh/coco/modules/common"
+)
+
+func TestSyncTimeStorage(t *testing.T) {
+	// Keep sync state files inside the test's temporary directory
+	sm := NewSyncManager()
+	sm.storageDir = t.TempDir()
+
+	// Test data
+	datasourceID := "test_datasource"
+	collectionName := "test_collection"
+	testTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+
+	// Test storing sync time
+	if err := sm.UpdateLastSyncTime(datasourceID, collectionName, testTime, testTime); err != nil {
+		t.Fatalf("Failed to store sync time: %v", err)
+	}
+
+	// Test retrieving sync time
+	retrievedTime := sm.GetLastSyncTime(datasourceID, collectionName)
+	if !retrievedTime.Equal(testTime) {
+		t.Errorf("Retrieved time %v does not match stored time %v", retrievedTime, testTime)
+	}
+
+	// Test updating sync time
+	newTime := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
+	if err := sm.UpdateLastSyncTime(datasourceID, collectionName, newTime, newTime); err != nil {
+		t.Fatalf("Failed to update sync time: %v", err)
+	}
+
+	// Verify the update survives a reload from persistent storage
+	reloaded := NewSyncManager()
+	reloaded.storageDir = sm.storageDir
+	updatedTime := reloaded.GetLastSyncTime(datasourceID, collectionName)
+	if !updatedTime.Equal(newTime) {
+		t.Errorf("Updated time %v does not match expected time %v", updatedTime, newTime)
+	}
+}
+
+func TestSyncTimeStorageWithConfig(t *testing.T) {
+	// Create a test plugin instance with its own sync manager and an
+	// isolated storage directory
+	plugin := &Plugin{
+		syncManager: NewSyncManager(),
+	}
+	plugin.syncManager.storageDir = t.TempDir()
+
+	datasourceID := "test_datasource"
+	collectionName := "testcollection"
+	testTime := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+
+	// Test updating last sync time
+	if err := plugin.syncManager.UpdateLastSyncTime(datasourceID, collectionName, testTime, testTime); err != nil {
+		t.Fatalf("Failed to update last sync time: %v", err)
+	}
+
+	// Test getting last sync time
+	retrievedTime := plugin.syncManager.GetLastSyncTime(datasourceID, collectionName)
+	if !retrievedTime.Equal(testTime) {
+		t.Errorf("Retrieved time %v does not match stored time %v", retrievedTime, testTime)
+	}
+}
+
+func TestSyncTimeStorageNonExistent(t *testing.T) {
+	// Create a test plugin instance with sync manager
+	plugin := &Plugin{
+		syncManager: NewSyncManager(),
+	}
+
+	// Test retrieving non-existent sync time
+	datasourceID := "test_datasource"
+	collectionName := "test_collection"
+	retrievedTime := plugin.syncManager.GetLastSyncTime(datasourceID, collectionName)
+
+	if !retrievedTime.IsZero() {
+		t.Errorf("Expected zero time for non-existent key, got %v", retrievedTime)
+	}
+}
+
+func TestSyncTimeStorageInvalidData(t *testing.T) {
+	// Create a test plugin instance with sync manager
+	plugin := &Plugin{
+		syncManager: NewSyncManager(),
+	}
+
+	// Test retrieving from non-existent datasource/collection
+	datasourceID := "invalid_datasource"
+	collectionName := "invalid_collection"
+	retrievedTime := plugin.syncManager.GetLastSyncTime(datasourceID, collectionName)
+
+	if !retrievedTime.IsZero() {
+		t.Errorf("Expected zero time for invalid datasource/collection, got %v", retrievedTime)
+	}
+}
+
+func TestGetLatestTimestampFromBatch(t *testing.T) {
+	plugin := &Plugin{}
+
+	// Create test documents with different timestamps; time.Date results are
+	// bound to variables because the address of a call expression cannot be taken
+	t1 := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
+	t2 := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
+	t3 := time.Date(2024, 1, 3, 12, 0, 0, 0, time.UTC)
+
+	documents := []*common.Document{
+		{Updated: &t1},
+		{Updated: &t2},
+		{Updated: &t3},
+	}
+
+	// Test getting latest timestamp
+	latestTime := plugin.getLatestTimestampFromBatch(documents, "updated_at")
+	if !latestTime.Equal(t3) {
+		t.Errorf("Expected latest time %v, got %v", t3, latestTime)
+	}
+}
+
+func TestGetLatestTimestampFromBatchWithNil(t *testing.T) {
+	plugin := &Plugin{}
+
+	// Create test documents with some nil timestamps
+	t2 := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC)
+
+	documents := []*common.Document{
+		{Updated: nil},
+		{Updated: &t2},
+	}
+
+	// Test getting latest timestamp
+	latestTime := plugin.getLatestTimestampFromBatch(documents, "updated_at")
+	if !latestTime.Equal(t2) {
+		t.Errorf("Expected latest time %v, got %v", t2, latestTime)
+	}
+}
+
+func TestGetLatestTimestampFromBatchEmpty(t *testing.T) {
+	plugin := &Plugin{}
+
+	// Test with empty documents slice
+	documents := []*common.Document{}
+
+	latestTime := 
plugin.getLatestTimestampFromBatch(documents, "updated_at") + + if !latestTime.IsZero() { + t.Errorf("Expected zero time for empty documents, got %v", latestTime) + } +} diff --git a/plugins/connectors/mongodb/sync_strategy.go b/plugins/connectors/mongodb/sync_strategy.go new file mode 100644 index 00000000..6263c0f0 --- /dev/null +++ b/plugins/connectors/mongodb/sync_strategy.go @@ -0,0 +1,111 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package mongodb + +import ( + "go.mongodb.org/mongo-driver/bson" + "time" + + log "github.com/cihub/seelog" +) + +// SyncStrategy defines the interface for different synchronization strategies +type SyncStrategy interface { + BuildFilter(config *Config, collConfig CollectionConfig, datasourceID string, syncManager *SyncManager) bson.M + ShouldUpdateSyncTime() bool + GetStrategyName() string +} + +// FullSyncStrategy implements full synchronization strategy +type FullSyncStrategy struct{} + +func (f *FullSyncStrategy) BuildFilter(config *Config, collConfig CollectionConfig, datasourceID string, syncManager *SyncManager) bson.M { + filter := bson.M{} + + // Copy base filter from collection configuration + for k, v := range collConfig.Filter { + filter[k] = v + } + + // Full sync strategy - no timestamp filtering, process all documents + log.Debugf("[mongodb connector] full sync strategy for collection [%s] - processing all documents", collConfig.Name) + return filter +} + +func (f *FullSyncStrategy) ShouldUpdateSyncTime() bool { + // Full sync doesn't need to track sync time + return false +} + +func (f *FullSyncStrategy) GetStrategyName() string { + return "full" +} + +// IncrementalSyncStrategy implements incremental synchronization strategy +type IncrementalSyncStrategy struct{} + +func (i *IncrementalSyncStrategy) BuildFilter(config *Config, collConfig CollectionConfig, datasourceID string, syncManager *SyncManager) bson.M { + filter := bson.M{} + + // Copy base filter from collection configuration + for k, v := range collConfig.Filter { + filter[k] = v + } + + // Add timestamp filter for incremental sync + if config.LastModifiedField != "" { + // Get last sync time from sync manager using datasource ID and collection name + lastSyncTime := syncManager.GetLastSyncTime(datasourceID, collConfig.Name) + if !lastSyncTime.IsZero() { + filter[config.LastModifiedField] = bson.M{"$gt": lastSyncTime} + log.Debugf("[mongodb connector] incremental sync for collection [%s] - filtering documents newer than %v", + collConfig.Name, lastSyncTime) + } else { + log.Debugf("[mongodb connector] incremental sync for collection [%s] - no previous sync time, processing all documents", + collConfig.Name) + } + } else { + log.Warnf("[mongodb connector] incremental sync strategy specified but LastModifiedField not configured for collection [%s]", + collConfig.Name) + } + + return filter +} + +func (i *IncrementalSyncStrategy) ShouldUpdateSyncTime() bool { + // Incremental sync needs to track sync time + return true +} + +func (i *IncrementalSyncStrategy) GetStrategyName() string { + return "incremental" +} + +// SyncStrategyFactory creates sync strategy instances +type SyncStrategyFactory struct{} + +// CreateStrategy creates a sync strategy based on the configuration +func (f *SyncStrategyFactory) CreateStrategy(strategyName string) SyncStrategy { + switch strategyName { + case "incremental": + return &IncrementalSyncStrategy{} + case "full": + fallthrough + default: + return &FullSyncStrategy{} + } +} + 
+// GetStrategyName returns the display name for logging purposes +func (f *SyncStrategyFactory) GetStrategyName(strategyName string) string { + switch strategyName { + case "incremental": + return "incremental" + case "full": + return "full" + default: + return "full (default)" + } +} diff --git a/plugins/connectors/mongodb/sync_strategy_test.go b/plugins/connectors/mongodb/sync_strategy_test.go new file mode 100644 index 00000000..83a5cca3 --- /dev/null +++ b/plugins/connectors/mongodb/sync_strategy_test.go @@ -0,0 +1,122 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package mongodb + +import ( + "testing" +) + +func TestFullSyncStrategy(t *testing.T) { + strategy := &FullSyncStrategy{} + + config := &Config{} + collConfig := CollectionConfig{ + Filter: map[string]interface{}{ + "status": "published", + }, + } + datasourceID := "test_datasource" + syncManager := &SyncManager{} + + // Test filter building + filter := strategy.BuildFilter(config, collConfig, datasourceID, syncManager) + + // Should preserve base filter + if filter["status"] != "published" { + t.Errorf("Expected status filter to be preserved, got %v", filter["status"]) + } + + // Should not have timestamp filtering + if _, exists := filter["updated_at"]; exists { + t.Errorf("Expected no timestamp filtering for full sync strategy") + } + + // Test strategy properties + if strategy.ShouldUpdateSyncTime() { + t.Error("Expected full sync strategy to not update sync time") + } + + if strategy.GetStrategyName() != "full" { + t.Errorf("Expected strategy name to be 'full', got %s", strategy.GetStrategyName()) + } +} + +func TestIncrementalSyncStrategy(t *testing.T) { + strategy := &IncrementalSyncStrategy{} + + config := &Config{ + LastModifiedField: "updated_at", + } + collConfig := CollectionConfig{ + Filter: map[string]interface{}{ + "status": "published", + }, + } + datasourceID := "test_datasource" + syncManager := &SyncManager{} + + // Test filter building + filter := strategy.BuildFilter(config, collConfig, datasourceID, syncManager) + + // Should preserve base filter + if filter["status"] != "published" { + t.Errorf("Expected status filter to be preserved, got %v", filter["status"]) + } + + // Should not have timestamp filtering initially (no previous sync time) + if _, exists := filter["updated_at"]; exists { + t.Errorf("Expected no timestamp filtering initially for incremental sync strategy") + } + + // Test strategy properties + if !strategy.ShouldUpdateSyncTime() { + t.Error("Expected incremental sync strategy to update sync time") + } + + if strategy.GetStrategyName() != "incremental" { + t.Errorf("Expected strategy name to be 'incremental', got %s", strategy.GetStrategyName()) + } +} + +func TestSyncStrategyFactory(t *testing.T) { + factory := &SyncStrategyFactory{} + + // Test full strategy creation + fullStrategy := factory.CreateStrategy("full") + if fullStrategy.GetStrategyName() != "full" { + t.Errorf("Expected full strategy, got %s", fullStrategy.GetStrategyName()) + } + + // Test incremental strategy creation + incStrategy := factory.CreateStrategy("incremental") + if incStrategy.GetStrategyName() != "incremental" { + t.Errorf("Expected incremental strategy, got %s", incStrategy.GetStrategyName()) + } + + // Test default strategy creation + defaultStrategy := factory.CreateStrategy("") + if defaultStrategy.GetStrategyName() != "full" { + t.Errorf("Expected default strategy to be full, got %s", defaultStrategy.GetStrategyName()) + } + + // 
+	// Test invalid strategy creation
+	invalidStrategy := factory.CreateStrategy("invalid")
+	if invalidStrategy.GetStrategyName() != "full" {
+		t.Errorf("Expected invalid strategy to default to full, got %s", invalidStrategy.GetStrategyName())
+	}
+
+	// Test strategy name display
+	if factory.GetStrategyName("full") != "full" {
+		t.Errorf("Expected strategy name 'full', got %s", factory.GetStrategyName("full"))
+	}
+
+	if factory.GetStrategyName("incremental") != "incremental" {
+		t.Errorf("Expected strategy name 'incremental', got %s", factory.GetStrategyName("incremental"))
+	}
+
+	if factory.GetStrategyName("") != "full (default)" {
+		t.Errorf("Expected strategy name 'full (default)', got %s", factory.GetStrategyName(""))
+	}
+}
diff --git a/plugins/connectors/mongodb/transformer.go b/plugins/connectors/mongodb/transformer.go
new file mode 100644
index 00000000..910ded10
--- /dev/null
+++ b/plugins/connectors/mongodb/transformer.go
@@ -0,0 +1,188 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"context"
+	"fmt"
+
+	log "github.com/cihub/seelog"
+	"go.mongodb.org/mongo-driver/bson"
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"go.mongodb.org/mongo-driver/mongo"
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/global"
+	"infini.sh/framework/core/queue"
+	"infini.sh/framework/core/util"
+)
+
+// processCursor drains the cursor into a bounded batch of documents. The
+// per-datasource config is passed in explicitly so transformToDocument can
+// apply its field mapping.
+func (p *Plugin) processCursor(cursor *mongo.Cursor, collConfig CollectionConfig, config *Config, datasource *common.DataSource) []*common.Document {
+	count := 0
+	maxBatchSize := 1000 // cap the batch size to bound memory use
+
+	// Pre-allocate with capacity to reduce re-allocations
+	documents := make([]*common.Document, 0, maxBatchSize)
+
+	for cursor.Next(context.Background()) && count < maxBatchSize {
+		if global.ShuttingDown() {
+			break
+		}
+
+		var mongoDoc bson.M
+		if err := cursor.Decode(&mongoDoc); err != nil {
+			log.Warnf("[mongodb connector] decode document failed: %v", err)
+			continue
+		}
+
+		doc, err := p.transformToDocument(mongoDoc, &collConfig, config)
+		if err != nil {
+			log.Warnf("[mongodb connector] transform document failed: %v", err)
+			continue
+		}
+
+		documents = append(documents, doc)
+		count++
+	}
+
+	return documents
+}
+
+// transformToDocument transforms a MongoDB document into a common Document
+func (p *Plugin) transformToDocument(mongoDoc bson.M, collConfig *CollectionConfig, config *Config) (*common.Document, error) {
+	doc := &common.Document{}
+
+	// Extract the MongoDB ObjectID
+	objectID, ok := mongoDoc["_id"].(primitive.ObjectID)
+	if !ok {
+		// Fall back to a string ID when _id is not an ObjectID
+		if idStr, ok := mongoDoc["_id"].(string); ok {
+			doc.ID = idStr
+		} else {
+			doc.ID = fmt.Sprintf("%v", mongoDoc["_id"])
+		}
+	} else {
+		doc.ID = objectID.Hex()
+	}
+
+	// Apply field mapping configuration
+	p.applyFieldMapping(doc, mongoDoc, config)
+
+	// Store original metadata; keep the raw _id so non-ObjectID keys survive
+	// intact instead of degrading to a zero-valued ObjectID
+	doc.Metadata = make(map[string]interface{})
+	doc.Metadata["mongodb_collection"] = collConfig.Name
+	doc.Metadata["mongodb_id"] = mongoDoc["_id"]
+	doc.Metadata["raw_document"] = mongoDoc
+
+	return doc, nil
+}
+
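+// Example (illustrative): a stored document such as
+//
+//	{"_id": ObjectId("..."), "title": "Hello", "updated_at": ISODate("2025-01-12T00:00:00Z")}
+//
+// becomes a common.Document whose ID is the 24-character hex form of the
+// ObjectID; a non-ObjectID _id falls back to its string representation.
+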
+// applyFieldMapping applies field mapping configuration to the document.
+// It handles all field mappings using the centralized FieldMapping configuration.
+func (p *Plugin) applyFieldMapping(doc *common.Document, mongoDoc bson.M, config *Config) {
+	if config.FieldMapping == nil || !config.FieldMapping.Enabled {
+		return
+	}
+
+	// Apply standard field mappings
+	if config.FieldMapping.TitleField != "" {
+		if title, ok := mongoDoc[config.FieldMapping.TitleField]; ok {
+			doc.Title = p.safeConvertToString(title)
+		}
+	}
+
+	if config.FieldMapping.ContentField != "" {
+		if content, ok := mongoDoc[config.FieldMapping.ContentField]; ok {
+			doc.Content = p.safeConvertToString(content)
+		}
+	}
+
+	if config.FieldMapping.CategoryField != "" {
+		if category, ok := mongoDoc[config.FieldMapping.CategoryField]; ok {
+			doc.Category = p.safeConvertToString(category)
+		}
+	}
+
+	// Handle tags
+	if config.FieldMapping.TagsField != "" {
+		if tags, ok := mongoDoc[config.FieldMapping.TagsField]; ok {
+			doc.Tags = p.convertToStringSlice(tags)
+		}
+	}
+
+	// Handle URL
+	if config.FieldMapping.URLField != "" {
+		if url, ok := mongoDoc[config.FieldMapping.URLField]; ok {
+			doc.URL = p.safeConvertToString(url)
+		}
+	}
+
+	// Handle timestamp
+	if config.FieldMapping.TimestampField != "" {
+		if timestamp, ok := mongoDoc[config.FieldMapping.TimestampField]; ok {
+			if t := p.convertToTime(timestamp); t != nil {
+				doc.Updated = t
+			}
+		}
+	}
+
+	// Apply custom field mappings from the mapping configuration; these run
+	// last and therefore override the standard mappings above on conflict
+	for targetField, sourceField := range config.FieldMapping.Mapping {
+		if sourceFieldStr, ok := sourceField.(string); ok {
+			if value, exists := mongoDoc[sourceFieldStr]; exists {
+				switch targetField {
+				case "id":
+					// Handle ID field specially
+					doc.ID = p.safeConvertToString(value)
+				case "title":
+					doc.Title = p.safeConvertToString(value)
+				case "content":
+					doc.Content = p.safeConvertToString(value)
+				case "category":
+					doc.Category = p.safeConvertToString(value)
+				case "tags":
+					doc.Tags = p.convertToStringSlice(value)
+				case "url":
+					doc.URL = p.safeConvertToString(value)
+				case "metadata":
+					// Store under the source field name inside metadata
+					if doc.Metadata == nil {
+						doc.Metadata = make(map[string]interface{})
+					}
+					doc.Metadata[sourceFieldStr] = value
+				}
+			}
+		}
+	}
+}
+
+func (p *Plugin) pushDocuments(documents []*common.Document) {
+	for _, doc := range documents {
+		if global.ShuttingDown() {
+			return
+		}
+
+		data := util.MustToJSONBytes(doc)
+		if err := queue.Push(p.Queue, data); err != nil {
+			log.Errorf("[mongodb connector] failed to push document to queue: %v", err)
+		}
+	}
+}
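+
+// Example (illustrative): with field_mapping.enabled = true and
+// mapping = {"title": "headline"}, a document {"headline": "Intro"} ends up
+// with doc.Title == "Intro" even when a standard TitleField also matched,
+// because the custom mappings run after the standard ones.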
diff --git a/plugins/connectors/mongodb/utils.go b/plugins/connectors/mongodb/utils.go
new file mode 100644
index 00000000..b09463f1
--- /dev/null
+++ b/plugins/connectors/mongodb/utils.go
@@ -0,0 +1,139 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package mongodb
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+
+	"go.mongodb.org/mongo-driver/bson/primitive"
+	"infini.sh/framework/core/global"
+)
+
+// safeConvertToString renders any BSON value as a string, falling back to
+// JSON serialization for composite values
+func (p *Plugin) safeConvertToString(value interface{}) string {
+	if value == nil {
+		return ""
+	}
+
+	switch v := value.(type) {
+	case string:
+		return v
+	case primitive.ObjectID:
+		return v.Hex()
+	case int, int32, int64:
+		return fmt.Sprintf("%d", v)
+	case float32, float64:
+		return fmt.Sprintf("%f", v)
+	case bool:
+		return fmt.Sprintf("%t", v)
+	case time.Time:
+		return v.Format(time.RFC3339)
+	case primitive.DateTime:
+		return v.Time().Format(time.RFC3339)
+	case primitive.Timestamp:
+		return time.Unix(int64(v.T), 0).Format(time.RFC3339)
+	case []interface{}:
+		// Convert array to JSON string
+		if jsonBytes, err := json.Marshal(v); err == nil {
+			return string(jsonBytes)
+		}
+		return fmt.Sprintf("%v", v)
+	case map[string]interface{}:
+		// Convert object to JSON string
+		if jsonBytes, err := json.Marshal(v); err == nil {
+			return string(jsonBytes)
+		}
+		return fmt.Sprintf("%v", v)
+	default:
+		// Try JSON serialization as fallback
+		if jsonBytes, err := json.Marshal(v); err == nil {
+			return string(jsonBytes)
+		}
+		return fmt.Sprintf("%v", v)
+	}
+}
+
+func (p *Plugin) convertToStringSlice(value interface{}) []string {
+	if value == nil {
+		return nil
+	}
+
+	switch v := value.(type) {
+	case []string:
+		return v
+	case []interface{}:
+		var result []string
+		for _, item := range v {
+			result = append(result, p.safeConvertToString(item))
+		}
+		return result
+	case string:
+		// If it's a single string, treat as one tag
+		return []string{v}
+	default:
+		// Convert to string and treat as single tag
+		return []string{p.safeConvertToString(v)}
+	}
+}
+
+func (p *Plugin) convertToTime(value interface{}) *time.Time {
+	if value == nil {
+		return nil
+	}
+
+	switch v := value.(type) {
+	case time.Time:
+		return &v
+	case primitive.DateTime:
+		t := v.Time()
+		return &t
+	case primitive.Timestamp:
+		t := time.Unix(int64(v.T), 0)
+		return &t
+	case int64:
+		// Unix timestamp in seconds
+		t := time.Unix(v, 0)
+		return &t
+	case string:
+		// Try to parse various time formats
+		formats := []string{
+			time.RFC3339,
+			time.RFC3339Nano,
+			"2006-01-02T15:04:05Z",
+			"2006-01-02 15:04:05",
+			"2006-01-02",
+		}
+		for _, format := range formats {
+			if t, err := time.Parse(format, v); err == nil {
+				return &t
+			}
+		}
+	}
+
+	return nil
+}
+
+func (p *Plugin) shouldStop() bool {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.ctx == nil {
+		return true
+	}
+
+	select {
+	case <-p.ctx.Done():
+		return true
+	default:
+		return global.ShuttingDown()
+	}
+}
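+
+// Example (illustrative): convertToTime accepts time.Time, primitive.DateTime,
+// primitive.Timestamp, Unix-second int64 values, and several common string
+// layouts; convertToTime("2024-03-01") yields midnight UTC on that date.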
diff --git a/web/src/components/datasource/type/index.jsx b/web/src/components/datasource/type/index.jsx
index 44b044de..f20b02ac 100644
--- a/web/src/components/datasource/type/index.jsx
+++ b/web/src/components/datasource/type/index.jsx
@@ -17,6 +17,7 @@ export const Types = {
   RSS: 'rss',
   S3: 's3',
   Yuque: 'yuque',
+  MongoDB: 'mongodb',
   Postgresql: 'postgresql',
   Mysql: 'mysql',
   GitHub: 'github'
@@ -148,6 +149,13 @@ export const TypeList = ({
           text="Notion"
           onChange={onItemClick}
         />
+
+
+
+
+
+
+
         {/* */}
@@ -189,6 +197,14 @@
         {/* */}
       )}
+      {v.id === Types.MongoDB && (
+        <div>
+          <div>MongoDB Configuration</div>
+          <div>
+            Configure MongoDB connection and collection settings in the next step.
+          </div>
+        </div>
+      )}
     );
 };
diff --git a/web/src/pages/data-source/edit/[id].tsx b/web/src/pages/data-source/edit/[id].tsx
index 3fb784ae..9fde0085 100644
--- a/web/src/pages/data-source/edit/[id].tsx
+++ b/web/src/pages/data-source/edit/[id].tsx
@@ -21,13 +21,14 @@ import Confluence from '../new/confluence';
 import GitHub from '../new/github';
 import HugoSite from '../new/hugo_site';
 import LocalFS from '../new/local_fs';
-import { GithubConfig, NetworkDriveConfig, RdbmsConfig } from '../new/models';
+import { GithubConfig, NetworkDriveConfig, RdbmsConfig, MongoDBConfig } from '../new/models';
 import NetworkDrive from '../new/network_drive';
 import Notion from '../new/notion';
 import Rdbms from '../new/rdbms';
 import Rss from '../new/rss';
 import S3 from '../new/s3';
 import Yuque from '../new/yuque';
+import MongoDB from '../new/mongodb';
 
 // eslint-disable-next-line complexity
 export function Component() {
@@ -256,6 +257,7 @@ export function Component() {
         break;
       case Types.GoogleDrive:
         break;
+
       case Types.S3:
         if (datasource.connector?.config) {
           datasource.config = {
@@ -297,6 +299,12 @@ export function Component() {
         }
         break;
       }
+      case Types.MongoDB: {
+        if (datasource.connector?.config) {
+          datasource.config = MongoDBConfig(datasource.connector);
+        }
+        break;
+      }
       case Types.Postgresql:
       case Types.Mysql: {
         if (datasource.connector?.config) {
@@ -365,6 +373,7 @@ export function Component() {
       {type === Types.S3 && <S3 />}
       {type === Types.Confluence && <Confluence />}
       {type === Types.NetworkDrive && <NetworkDrive />}
+      {type === Types.MongoDB && <MongoDB />}
       {type === Types.Postgresql && <Rdbms />}
       {type === Types.Mysql && <Rdbms />}
       {type === Types.GitHub && <GitHub />}
diff --git a/web/src/pages/data-source/new/index.tsx b/web/src/pages/data-source/new/index.tsx
index a8b9bda0..003541fc 100644
--- a/web/src/pages/data-source/new/index.tsx
+++ b/web/src/pages/data-source/new/index.tsx
@@ -19,13 +19,14 @@ import GitHub from './github';
 import GoogleDrive from './google_drive';
 import HugoSite from './hugo_site';
 import LocalFS from './local_fs';
-import { GithubConfig, NetworkDriveConfig, RdbmsConfig } from './models';
+import { GithubConfig, NetworkDriveConfig, RdbmsConfig, MongoDBConfig } from './models';
 import NetworkDrive from './network_drive';
 import Notion from './notion';
 import Rdbms from './rdbms';
 import Rss from './rss';
 import S3 from './s3';
 import Yuque from './yuque';
+import MongoDB from './mongodb';
 
 // eslint-disable-next-line complexity
 export function Component() {
@@ -112,7 +113,10 @@ export function Component() {
       case Types.NetworkDrive:
         connectorType = 'Network Drive';
         break;
-      case Types.Postgresql:
+      case Types.MongoDB:
+        connectorType = 'MongoDB';
+        break;
+      case Types.Postgresql:
         connectorType = 'Postgresql';
         break;
       case Types.Mysql:
@@ -223,6 +227,10 @@ export function Component() {
         config = NetworkDriveConfig(values);
         break;
       }
+      case Types.MongoDB: {
+        config = MongoDBConfig(values);
+        break;
+      }
       case Types.Postgresql: {
         config = RdbmsConfig(values);
         break;
@@ -354,6 +362,7 @@ export function Component() {
       {type === Types.S3 && <S3 />}
       {type === Types.Confluence && <Confluence />}
       {type === Types.NetworkDrive && <NetworkDrive />}
+      {type === Types.MongoDB && <MongoDB />}
       {type === Types.Postgresql && <Rdbms />}
       {type === Types.Mysql && <Rdbms />}
       {type === Types.GitHub && <GitHub />}
diff --git a/web/src/pages/data-source/new/models.ts b/web/src/pages/data-source/new/models.ts
index cb68ca54..e9db2189 100644
--- a/web/src/pages/data-source/new/models.ts
+++ b/web/src/pages/data-source/new/models.ts
@@ -30,6 +30,29 @@ export const RdbmsConfig = (values: any) => {
   };
 };
 
+export const MongoDBConfig = (values: any) => {
+  // Start from the RdbmsConfig base settings so the two configs stay compatible
+  const baseConfig = RdbmsConfig(values);
+
+  // Then add the MongoDB-specific configuration parameters
+  return {
+    ...baseConfig, // all base parameters from RdbmsConfig
+    // MongoDB-specific connection parameters
+    database: values.config?.database || '',
+    auth_database: values.config?.auth_database || 'admin',
+    cluster_type: values.config?.cluster_type || 'standalone',
+    collections: values.config?.collections || [],
+    // MongoDB-specific performance tuning parameters
+    batch_size: values.config?.batch_size || 1000,
+    max_pool_size: values.config?.max_pool_size || 10,
+    timeout: values.config?.timeout || '30s',
+    sync_strategy: values.config?.sync_strategy || 'full',
+    // MongoDB-specific query optimization parameters
+    enable_projection: values.config?.enable_projection !== false,
+    enable_index_hint: values.config?.enable_index_hint !== false
+  };
+};
+
 export const GithubConfig = (values: any) => {
   return {
     index_issues: values.config?.index_issues,
diff --git a/web/src/pages/data-source/new/mongodb.tsx b/web/src/pages/data-source/new/mongodb.tsx
new file mode 100644
index 00000000..8d8a0ff5
--- /dev/null
+++ b/web/src/pages/data-source/new/mongodb.tsx
@@ -0,0 +1,331 @@
+import { Button, Form, Input, InputNumber, Select, Space, Switch } from 'antd';
+import React, { useState } from 'react';
+import { useTranslation } from 'react-i18next';
+import { MinusCircleOutlined, PlusOutlined } from '@ant-design/icons';
+
+import { FieldMapping } from '../modules/FieldMapping';
+
+const { Option } = Select;
+
+export default function MongoDB() {
+  const { t } = useTranslation();
+  const [showAdvanced, setShowAdvanced] = useState(false);
+
+  return (
+    <>
+      {/* Basic connection settings */}
+
+
+
+      {/* Database name */}
+
+
+
+      {/* Authentication database */}
+
+
+
+      {/* Cluster type */}
+
+
+
+      {/* Collection settings */}
+
+        {(fields, { add, remove }) => (
+          <>
+            {fields.map(({ key, name, ...restField }) => (
+
+
+
+
+
+
+
+
+
+
+                <MinusCircleOutlined onClick={() => remove(name)} />
+
+
+              {/* Field mapping settings */}
+ + + + + + + + + + + + + + + + + + + + + + + +
+
+ ))} + + + + + + )} +
+
+
+      {/* Pagination settings */}
+
+
+
+
+      <Form.Item
+        shouldUpdate={(prevValues, currentValues) =>
+          prevValues.config?.pagination !== currentValues.config?.pagination
+        }
+      >
+        {({ getFieldValue }) =>
+          getFieldValue(['config', 'pagination']) ? (
+
+
+
+          ) : null
+        }
+      </Form.Item>
+
+      {/* Last modified field */}
+
+
+
+      {/* Advanced settings */}
+
+      <Switch onChange={(checked) => setShowAdvanced(checked)} />
+
+
+      {showAdvanced && (
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+      )}
+
+      {/* Field mapping */}
+
+
+
+
+      <Form.Item
+        shouldUpdate={(prevValues, currentValues) =>
+          prevValues.config?.field_mapping?.enabled !== currentValues.config?.field_mapping?.enabled
+        }
+      >
+        {({ getFieldValue }) => <FieldMapping />}
+      </Form.Item>
+
+    </>
+  );
+}
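Taken together, the diff wires a per-collection scan: the factory picks a sync strategy, the strategy builds the Mongo filter, processCursor turns cursor batches into common documents, and pushDocuments hands them to the indexing queue. The sketch below is illustrative glue only; it assumes the scan loop supplies ctx, config, collConfig, datasourceID, syncManager, datasource, and a *mongo.Collection, that Config exposes the configured sync_strategy as config.SyncStrategy, and that SyncManager has an UpdateLastSyncTime counterpart to the GetLastSyncTime call seen above.

```go
// Hypothetical glue code, not part of the diff above.
strategy := (&SyncStrategyFactory{}).CreateStrategy(config.SyncStrategy)
filter := strategy.BuildFilter(config, collConfig, datasourceID, syncManager)

cursor, err := collection.Find(ctx, filter)
if err != nil {
	log.Errorf("[mongodb connector] find failed for collection [%s]: %v", collConfig.Name, err)
	return
}
defer cursor.Close(ctx)

// Transform the batch and hand it to the indexing queue.
docs := p.processCursor(cursor, collConfig, config, datasource)
p.pushDocuments(docs)

// Only the incremental strategy records a new high-water mark.
if strategy.ShouldUpdateSyncTime() {
	syncManager.UpdateLastSyncTime(datasourceID, collConfig.Name, time.Now()) // assumed API
}
```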