@@ -125,7 +125,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSparkSession with K

val sources: Seq[SparkDataStream] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _, _) => source
case StreamingExecutionRelation(source: KafkaSource, _, _, _) => source
case r: StreamingDataSourceV2ScanRelation
if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
r.stream.isInstanceOf[KafkaContinuousStream] =>
@@ -857,7 +857,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
AssertOnQuery { q =>
val latestOffset: Option[(Long, OffsetSeq)] = q.offsetLog.getLatest()
latestOffset.exists { offset =>
!offset._2.offsets.exists(_.exists(_.json == "{}"))
!offset._2.offsets.values.exists(_.json == "{}")
}
}
)
@@ -1616,7 +1616,7 @@ abstract class KafkaMicroBatchV1SourceSuite extends KafkaMicroBatchSourceSuiteBa
makeSureGetOffsetCalled,
AssertOnQuery { query =>
query.logicalPlan.collectFirst {
case StreamingExecutionRelation(_: KafkaSource, _, _) => true
case StreamingExecutionRelation(_: KafkaSource, _, _, _) => true
}.nonEmpty
}
)
@@ -65,11 +65,11 @@ class KafkaSourceOffsetSuite extends OffsetSuite with SharedSparkSession {
val batch0 = OffsetSeq.fill(kso1)
val batch1 = OffsetSeq.fill(kso2, kso3)

val batch0Serialized = OffsetSeq.fill(batch0.offsets.flatMap(_.map(o =>
SerializedOffset(o.json))): _*)
val batch0Serialized = OffsetSeq.fill(batch0.offsets.values.map(o =>
SerializedOffset(o.json)).toSeq: _*)

val batch1Serialized = OffsetSeq.fill(batch1.offsets.flatMap(_.map(o =>
SerializedOffset(o.json))): _*)
val batch1Serialized = OffsetSeq.fill(batch1.offsets.values.map(o =>
SerializedOffset(o.json)).toSeq: _*)

assert(metadataLog.add(0, batch0))
assert(metadataLog.getLatest() === Some(0 -> batch0Serialized))
@@ -38,6 +38,19 @@ abstract class DataStreamReader {
*/
def format(source: String): this.type

/**
* Assigns a name to this streaming source to enable source evolution.
* When sources are named, they can be added, removed, or reordered without
* losing checkpoint state, enabling query evolution.
*
* If not specified, sources are automatically assigned ordinal names ("0", "1", "2", etc.)
* based on their position in the query, which maintains backward compatibility.
*
* @param sourceName The unique name for this source (alphanumeric, underscore, hyphen only)
* @since 4.1.0
*/
def name(sourceName: String): this.type

/**
* Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
* automatically from data. By specifying the schema here, the underlying data source can skip
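To make the intent of the new name() API above concrete, here is a minimal usage sketch, assuming a Kafka source; the source name, topic, and broker address are illustrative and not taken from this PR:

```scala
// Hypothetical usage of the proposed DataStreamReader.name() API.
// "orders_kafka" becomes the key under which this source's offsets are
// recorded in the offset log, so the source can later be reordered or
// combined with new sources without losing checkpoint state.
val orders = spark.readStream
  .format("kafka")
  .name("orders_kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "orders")
  .load()
```

If name() is never called, the source keeps its ordinal name ("0" for the first source in the query), which is what preserves compatibility with existing checkpoints.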
@@ -39,7 +39,8 @@ case class StreamingRelationV2(
output: Seq[AttributeReference],
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
v1Relation: Option[LogicalPlan])
v1Relation: Option[LogicalPlan],
userProvidedName: Option[String] = None)
extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
override lazy val resolved = v1Relation.forall(_.resolved)
override def isStreaming: Boolean = true
@@ -59,7 +60,7 @@ case class StreamingRelationV2(
val newMetadata = metadataOutput.filterNot(outputSet.contains)
if (newMetadata.nonEmpty) {
StreamingRelationV2(source, sourceName, table, extraOptions,
output ++ newMetadata, catalog, identifier, v1Relation)
output ++ newMetadata, catalog, identifier, v1Relation, userProvidedName)
} else {
this
}
@@ -180,13 +180,14 @@ case class StreamingDataSourceV2Relation(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
options: CaseInsensitiveStringMap,
metadataPath: String)
metadataPath: String,
userProvidedName: Option[String] = None)
extends DataSourceV2RelationBase(table, output, catalog, identifier, options) {

override def isStreaming: Boolean = true

override def newInstance(): StreamingDataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
copy(output = output.map(_.newInstance()), userProvidedName = userProvidedName)
}
}
/**
@@ -52,6 +52,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession)
this
}

def name(sourceName: String): this.type =
this

/** @inheritdoc */
override def schema(schemaString: String): this.type = {
sourceBuilder.setSchema(schemaString)
@@ -43,7 +43,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, false, paths) =>
case UnresolvedDataSource(
source, userSpecifiedSchema, extraOptions, false, paths, userProvidedName) =>
// Batch data source created from DataFrameReader
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
@@ -60,7 +61,8 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
source, paths: _*)
}.getOrElse(loadV1BatchSource(source, userSpecifiedSchema, extraOptions, paths: _*))

case UnresolvedDataSource(source, userSpecifiedSchema, extraOptions, true, paths) =>
case UnresolvedDataSource(
source, userSpecifiedSchema, extraOptions, true, paths, userProvidedName) =>
// Streaming data source created from DataStreamReader
if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
throw QueryCompilationErrors.cannotOperateOnHiveDataSourceFilesError("read")
@@ -83,7 +85,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
className = source,
options = optionsWithPath.originalMap)
val v1Relation = ds match {
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource))
case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource, userProvidedName))
case _ => None
}
ds match {
@@ -107,16 +109,16 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
StreamingRelationV2(
Some(provider), source, table, dsOptions,
toAttributes(table.columns.asSchema), None, None, v1Relation)
toAttributes(table.columns.asSchema), None, None, v1Relation, userProvidedName)

// fallback to v1
// TODO (SPARK-27483): we should move this fallback logic to an analyzer rule.
case _ => StreamingRelation(v1DataSource)
case _ => StreamingRelation(v1DataSource, userProvidedName)
}

case _ =>
// Code path for data source v1.
StreamingRelation(v1DataSource)
StreamingRelation(v1DataSource, userProvidedName)
}

}
@@ -27,7 +27,8 @@ case class UnresolvedDataSource(
userSpecifiedSchema: Option[StructType],
options: CaseInsensitiveMap[String],
override val isStreaming: Boolean,
paths: Seq[String])
paths: Seq[String],
userProvidedName: Option[String] = None)
extends UnresolvedLeafNode {

override def simpleString(maxFields: Int): String = toString
@@ -27,6 +27,7 @@ import org.apache.spark.sql.classic.ClassicConversions._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.json.JsonUtils.checkJsonSchema
import org.apache.spark.sql.execution.datasources.xml.XmlUtils.checkXmlSchema
import org.apache.spark.sql.execution.streaming.checkpointing.OffsetSeq
import org.apache.spark.sql.streaming
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -46,6 +47,13 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
this
}

/** @inheritdoc */
override def name(sourceName: String): this.type = {
OffsetSeq.validateSourceName(sourceName)
this.userSpecifiedName = Some(sourceName)
this
}

/** @inheritdoc */
def schema(schema: StructType): this.type = {
if (schema != null) {
@@ -76,7 +84,8 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)
userSpecifiedSchema,
extraOptions,
isStreaming = true,
path.toSeq
path.toSeq,
userSpecifiedName
)
Dataset.ofRows(sparkSession, unresolved)
}
@@ -160,5 +169,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession)

private var userSpecifiedSchema: Option[StructType] = None

private var userSpecifiedName: Option[String] = None

private var extraOptions = CaseInsensitiveMap[String](Map.empty)
}
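The classic implementation above delegates validation of the user-provided name to OffsetSeq.validateSourceName (added later in this diff). As a self-contained reference, the following sketch mirrors those rules; the object and method names here are hypothetical:

```scala
// Standalone sketch of the naming rules enforced by OffsetSeq.validateSourceName
// in this PR: non-empty, at most 64 characters, and only alphanumerics,
// underscores, or hyphens. Object and method names are hypothetical.
object SourceNameRules {
  def validate(name: String): Unit = {
    require(name != null && name.nonEmpty, "Source name cannot be null or empty")
    require(name.length <= 64, s"Source name '$name' exceeds maximum length of 64 characters")
    require(name.matches("^[a-zA-Z0-9_-]+$"),
      s"Source name '$name' contains invalid characters")
  }

  def main(args: Array[String]): Unit = {
    validate("orders_kafka")       // accepted
    // validate("orders.kafka")    // would fail: '.' is not an allowed character
  }
}
```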
@@ -336,7 +336,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
result

case s @ StreamingRelationV2(
_, _, table, extraOptions, _, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true))) =>
_, _, table, extraOptions,
_, _, _, Some(UnresolvedCatalogRelation(tableMeta, _, true)), _) =>
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
val v1Relation = getStreamingRelation(tableMeta, extraOptions)
if (table.isInstanceOf[SupportsRead]
@@ -34,44 +34,142 @@ import org.apache.spark.sql.internal.SQLConf._


/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* A collection of named offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to simplified, single-instance
* vector clock that must progress linearly forward.
*
* All sources are tracked by name - either user-provided names or auto-generated names
* from sourceId.
*/
case class OffsetSeq(offsets: Seq[Option[OffsetV2]], metadata: Option[OffsetSeqMetadata] = None) {
case class OffsetSeq(
offsets: Map[String, OffsetV2],
metadata: Option[OffsetSeqMetadata] = None) {

/**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the ordered list of
* sources.
* Unpacks offsets into [[StreamProgress]] by associating each named offset with the
* corresponding source from the ordered list.
*
* This method is typically used to associate a serialized offset with actual sources (which
* cannot be serialized).
* @deprecated Use toStreamProgress(namedSources: Map[String, SparkDataStream]) instead
*/
@deprecated("Use toStreamProgress with named sources map", "3.6.0")
def toStreamProgress(sources: Seq[SparkDataStream]): StreamProgress = {
assert(sources.size == offsets.size, s"There are [${offsets.size}] sources in the " +
s"checkpoint offsets and now there are [${sources.size}] sources requested by the query. " +
s"Cannot continue.")
new StreamProgress ++ sources.zip(offsets).collect { case (s, Some(o)) => (s, o) }
val sourceOffsetPairs = sources.zipWithIndex.flatMap { case (source, index) =>
offsets.get(index.toString).map(offset => (source, offset))
}
new StreamProgress ++ sourceOffsetPairs
}

/**
* Unpacks named offsets into [[StreamProgress]] by associating each named offset with the
* corresponding source.
*
* @param namedSources Map from source name to SparkDataStream
* @return The StreamProgress containing the offset mapping
*/
def toStreamProgress(namedSources: Map[String, SparkDataStream]): StreamProgress = {
// Map existing checkpoint offsets to current sources
val existingSourceOffsets = offsets.flatMap { case (name, offset) =>
namedSources.get(name) match {
case Some(source) => Some((source, offset))
case None =>
// Source from checkpoint no longer exists in current query - ignore
None
}
}

// For source evolution: identify new sources not in checkpoint and start them from beginning
val newSources = namedSources.values.filterNot { source =>
offsets.exists { case (name, _) => namedSources.get(name).contains(source) }
}

// New sources start with null offset (beginning)
val newSourceOffsets = newSources.map(source => (source, null.asInstanceOf[OffsetV2]))

new StreamProgress ++ (existingSourceOffsets ++ newSourceOffsets)
}

override def toString: String =
offsets.map(_.map(_.json).getOrElse("-")).mkString("[", ", ", "]")
/**
* Returns the named offsets map.
*/
def getNamedOffsets: Map[String, OffsetV2] = offsets

override def toString: String = {
offsets.map { case (name, offset) => s"$name: ${offset.json}" }.mkString("{", ", ", "}")
}
}

object OffsetSeq {

/**
* Returns a [[OffsetSeq]] with a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
* @deprecated Use fillNamed instead for explicit source naming
*/
def fill(offsets: OffsetV2*): OffsetSeq = OffsetSeq.fill(None, offsets: _*)
@deprecated("Use fillNamed with explicit source names", "3.6.0")
def fill(offsets: OffsetV2*): OffsetSeq = fill(None, offsets: _*)

/**
* Returns a [[OffsetSeq]] with metadata and a variable sequence of offsets.
* `nulls` in the sequence are converted to `None`s.
* @deprecated Use fillNamed instead for explicit source naming
*/
@deprecated("Use fillNamed with explicit source names", "3.6.0")
def fill(metadata: Option[String], offsets: OffsetV2*): OffsetSeq = {
OffsetSeq(offsets.map(Option(_)), metadata.map(OffsetSeqMetadata.apply))
val namedOffsets = offsets.zipWithIndex.filter(_._1 != null).map {
case (offset, index) => index.toString -> offset
}.toMap
OffsetSeq(namedOffsets, metadata.map(OffsetSeqMetadata.apply))
}

/**
* Creates an OffsetSeq with named sources and metadata.
*
* @param metadata Optional metadata string
* @param namedOffsets Map from source name to offset
* @return OffsetSeq with named sources
*/
def fillNamed(metadata: Option[String], namedOffsets: Map[String, OffsetV2]): OffsetSeq = {
OffsetSeq(namedOffsets, metadata.map(OffsetSeqMetadata.apply))
}

/**
* Creates an OffsetSeq with named sources.
* This is used for new V2 format checkpoints.
*
* @param namedOffsets Map from source name to offset
* @return OffsetSeq with named sources
*/
def fillNamed(namedOffsets: Map[String, OffsetV2]): OffsetSeq = {
fillNamed(None, namedOffsets)
}

/**
* Creates an OffsetSeq from source names, automatically generating ordinal names
* for sources without explicit names.
*
* @param metadata Optional metadata string
* @param sources Sequence of (sourceName, offset) pairs where sourceName may be None
* @return OffsetSeq with appropriate naming
*/
def fromSources(metadata: Option[String], sources: Seq[(Option[String], OffsetV2)]): OffsetSeq = {
val namedSources = sources.zipWithIndex.collect {
case ((Some(name), offset), _) => name -> offset
case ((None, offset), index) => index.toString -> offset
}.toMap

fillNamed(metadata, namedSources)
}

/**
* Validates a source name according to the naming rules.
*
* @param name The source name to validate
* @throws IllegalArgumentException if the name is invalid
*/
def validateSourceName(name: String): Unit = {
require(name != null && name.nonEmpty, "Source name cannot be null or empty")
require(name.length <= 64, s"Source name '$name' exceeds maximum length of 64 characters")
require(name.matches("^[a-zA-Z0-9_-]+$"),
s"Source name '$name' contains invalid characters. Only alphanumeric, underscore, " +
"and hyphen characters are allowed")
}
}

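To summarise the naming and evolution semantics introduced in OffsetSeq above, here is a self-contained sketch in which plain strings stand in for OffsetV2 and SparkDataStream; it mirrors the key assignment of fromSources and the matching behaviour of toStreamProgress, and is illustrative rather than the production code:

```scala
// Standalone sketch of the named-offset semantics described above.
// Plain strings stand in for OffsetV2 and SparkDataStream.
object NamedOffsetsSketch {
  // Explicit names win; unnamed sources fall back to their ordinal position,
  // which keeps old, purely positional checkpoints readable.
  def assignKeys(sources: Seq[(Option[String], String)]): Map[String, String] =
    sources.zipWithIndex.map {
      case ((Some(name), offset), _) => name -> offset
      case ((None, offset), index)   => index.toString -> offset
    }.toMap

  def main(args: Array[String]): Unit = {
    // A checkpoint written by a query with one named and one unnamed source.
    val checkpoint = assignKeys(Seq(
      Some("orders_kafka") -> """{"orders":{"0":42}}""",
      None -> "{}"))
    println(checkpoint) // Map(orders_kafka -> {"orders":{"0":42}}, 1 -> {})

    // A source added after the checkpoint was written ("clicks_kafka") has no
    // entry and starts from the beginning; entries for removed sources are ignored.
    val currentSources = Seq("orders_kafka", "clicks_kafka")
    currentSources.foreach { s =>
      println(s"$s -> ${checkpoint.getOrElse(s, "<start from beginning>")}")
    }
  }
}
```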