@PublicEvolving
public class DebeziumSourceFunction<T>
extends org.apache.flink.streaming.api.functions.source.RichSourceFunction<T>
implements org.apache.flink.streaming.api.checkpoint.CheckpointedFunction, org.apache.flink.api.common.state.CheckpointListener, org.apache.flink.api.java.typeutils.ResultTypeQueryable<T>
DebeziumSourceFunction is a streaming data source that pulls captured change data
from databases into Flink.
Two workers run at runtime. One worker periodically pulls records from the
database and pushes them into the Handover. The other worker consumes the records
from the Handover and converts them into Flink's record format. A single worker
is not used because Debezium behaves differently in the snapshot phase and the
streaming phase.
The Handover serves as the buffer that passes data from the producer to the
consumer. Because the two threads do not communicate with each other directly, error reporting
also relies on the Handover. When the engine encounters an error, it uses the DebeziumEngine.CompletionCallback to report the error to the Handover and wakes up the
consumer to check it. If the error originates on the Flink side, however, the source function simply closes the engine and wakes up the
producer.
If the execution is canceled or finishes (snapshot phase only), the exit logic is the same as the error-reporting logic.
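The hand-over and error-reporting flow described above can be sketched roughly as follows. This is a simplified illustration, not the connector's actual Handover class: the class name SimpleHandover and all of its method names are hypothetical, and the real implementation may differ in how it batches records and propagates errors.

```java
import java.util.List;

/** Hypothetical, simplified hand-over point between one producer and one consumer thread. */
public class SimpleHandover<T> {
    private final Object lock = new Object();
    private List<T> next;     // batch waiting for the consumer, or null
    private Throwable error;  // error reported by the producer (engine) side, or null
    private boolean closed;   // set when the consumer (Flink) side shuts down

    /** Producer side: blocks until the previous batch has been taken. */
    public void produce(List<T> batch) throws InterruptedException {
        synchronized (lock) {
            while (next != null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new IllegalStateException("handover closed");
            }
            next = batch;
            lock.notifyAll(); // wake up the consumer
        }
    }

    /** Consumer side: blocks until a batch or an error is available. */
    public List<T> pollNext() throws Exception {
        synchronized (lock) {
            while (next == null && error == null && !closed) {
                lock.wait();
            }
            if (error != null) {
                throw new Exception("producer failed", error);
            }
            if (next == null) {
                throw new IllegalStateException("handover closed");
            }
            List<T> batch = next;
            next = null;
            lock.notifyAll(); // wake up the producer
            return batch;
        }
    }

    /** Engine-side error: remember it and wake up the consumer so it can check. */
    public void reportError(Throwable t) {
        synchronized (lock) {
            if (error == null) {
                error = t;
            }
            next = null;
            lock.notifyAll();
        }
    }

    /** Flink-side shutdown: wake up a producer that may be blocked in produce(). */
    public void close() {
        synchronized (lock) {
            closed = true;
            next = null;
            lock.notifyAll();
        }
    }
}
```

In this sketch the consumer learns about an engine failure on its next call to pollNext(), while close() releases a producer that is blocked waiting for the consumer.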
The source function participates in checkpointing and guarantees that no data is lost during a failure, and that the computation processes elements "exactly once".
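One common way to combine snapshotState, notifyCheckpointComplete, and a bound such as MAX_NUM_PENDING_CHECKPOINTS is to remember the offset captured at each checkpoint and commit it only once that checkpoint is confirmed. The sketch below illustrates that general pattern under stated assumptions: PendingOffsetTracker and its methods are hypothetical names, offsets are modeled as plain strings, and java.util.LinkedHashMap stands in for the LinkedMap returned by getPendingOffsetsToCommit(). It is not the source function's actual code.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical tracker for offsets that wait for their checkpoint to complete. */
public class PendingOffsetTracker {
    private final int maxPending;
    // Insertion order equals checkpoint order, so the oldest entry comes first.
    private final LinkedHashMap<Long, String> pending = new LinkedHashMap<>();
    private String committedOffset;

    public PendingOffsetTracker(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Called when a checkpoint is taken: remember the offset, evict the oldest if over the bound. */
    public void onSnapshot(long checkpointId, String offset) {
        pending.put(checkpointId, offset);
        Iterator<Long> oldest = pending.keySet().iterator();
        while (pending.size() > maxPending) {
            oldest.next();
            oldest.remove();
        }
    }

    /** Called when a checkpoint is confirmed: commit its offset and drop it and all older entries. */
    public void onCheckpointComplete(long checkpointId) {
        String offset = pending.get(checkpointId);
        if (offset == null) {
            return; // unknown or already evicted checkpoint: nothing to commit
        }
        committedOffset = offset;
        Iterator<Map.Entry<Long, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            long id = it.next().getKey();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
    }

    public int pendingCount() {
        return pending.size();
    }

    public String committedOffset() {
        return committedOffset;
    }
}
```

Bounding the map keeps memory use constant even if completion notifications never arrive for some checkpoints.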
Note: currently, the source function can't run in multiple parallel instances.
Please refer to Debezium's documentation for the available configuration properties: https://debezium.io/documentation/reference/1.5/development/engine.html#engine-properties
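As a rough illustration, the properties argument of the constructor is a plain java.util.Properties object. The sketch below uses a few keys from Debezium's engine and MySQL connector documentation; the class name EnginePropertiesExample and all values are placeholders, and which keys the source function sets or overrides on its own is not stated on this page.

```java
import java.util.Properties;

/** Hypothetical helper that assembles a small set of Debezium properties. */
public class EnginePropertiesExample {

    public static Properties build() {
        Properties props = new Properties();
        // Engine-level properties (see the linked Debezium engine documentation).
        props.setProperty("name", "example-engine");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("offset.flush.interval.ms", "60000");
        // Connector-level properties; host and port are placeholder values.
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        return props;
    }
}
```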
| Modifier and Type | Field and Description |
|---|---|
| static String | HISTORY_RECORDS_STATE_NAME: State name of the consumer's history records state. |
| static String | LEGACY_IMPLEMENTATION_KEY: Configuration key that indicates whether the Debezium MySQL connector uses the legacy implementation. |
| static String | LEGACY_IMPLEMENTATION_VALUE: Configuration value that represents the legacy implementation. |
| protected static org.slf4j.Logger | LOG |
| static int | MAX_NUM_PENDING_CHECKPOINTS: The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. |
| static String | OFFSETS_STATE_NAME: State name of the consumer's partition offset states. |
| Constructor and Description |
|---|
| DebeziumSourceFunction(DebeziumDeserializationSchema<T> deserializer, Properties properties, DebeziumOffset specificOffset, Validator validator) |
| Modifier and Type | Method and Description |
|---|---|
| void | cancel() |
| void | close() |
| boolean | getDebeziumStarted() |
| String | getEngineInstanceName() |
| org.apache.commons.collections.map.LinkedMap | getPendingOffsetsToCommit() |
| org.apache.flink.api.common.typeinfo.TypeInformation<T> | getProducedType() |
| void | initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context) |
| void | notifyCheckpointComplete(long checkpointId) |
| void | open(org.apache.flink.configuration.Configuration parameters) |
| void | run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T> sourceContext) |
| void | snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext functionSnapshotContext) |
Methods inherited from class org.apache.flink.api.common.functions.AbstractRichFunction: getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
protected static final org.slf4j.Logger LOG
public static final String OFFSETS_STATE_NAME
public static final String HISTORY_RECORDS_STATE_NAME
public static final int MAX_NUM_PENDING_CHECKPOINTS
public static final String LEGACY_IMPLEMENTATION_KEY
public static final String LEGACY_IMPLEMENTATION_VALUE
public DebeziumSourceFunction(DebeziumDeserializationSchema<T> deserializer, Properties properties, @Nullable DebeziumOffset specificOffset, Validator validator)
public void open(org.apache.flink.configuration.Configuration parameters)
throws Exception
Specified by: open in interface org.apache.flink.api.common.functions.RichFunction
Overrides: open in class org.apache.flink.api.common.functions.AbstractRichFunction
Throws: Exception

public void initializeState(org.apache.flink.runtime.state.FunctionInitializationContext context)
throws Exception
Specified by: initializeState in interface org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
Throws: Exception

public void snapshotState(org.apache.flink.runtime.state.FunctionSnapshotContext functionSnapshotContext)
throws Exception
Specified by: snapshotState in interface org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
Throws: Exception

public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<T> sourceContext)
throws Exception

public void notifyCheckpointComplete(long checkpointId)
Specified by: notifyCheckpointComplete in interface org.apache.flink.api.common.state.CheckpointListener

public void cancel()
Specified by: cancel in interface org.apache.flink.streaming.api.functions.source.SourceFunction<T>

public void close()
throws Exception
Specified by: close in interface org.apache.flink.api.common.functions.RichFunction
Overrides: close in class org.apache.flink.api.common.functions.AbstractRichFunction
Throws: Exception

public org.apache.flink.api.common.typeinfo.TypeInformation<T> getProducedType()
Specified by: getProducedType in interface org.apache.flink.api.java.typeutils.ResultTypeQueryable<T>

@VisibleForTesting public org.apache.commons.collections.map.LinkedMap getPendingOffsetsToCommit()

@VisibleForTesting public boolean getDebeziumStarted()

@VisibleForTesting public String getEngineInstanceName()
Copyright © 2022 The Apache Software Foundation. All rights reserved.