1,org.apache.flink.streaming.api.operators; AbstractStreamOperator public void processWatermark1(Watermark mark) throws Exception { input1Watermark = mark.getTimestamp(); long newMin = Math.min(input1Watermark, input2Watermark); if (newMin > combinedWatermark) { combinedWatermark = newMin; processWatermark(new Watermark(combinedWatermark)); }} public void processWatermark2(Watermark mark) throws Exception { input2Watermark = mark.getTimestamp(); long newMin = Math.min(input1Watermark, input2Watermark); if (newMin > combinedWatermark) { combinedWatermark = newMin; processWatermark(new Watermark(combinedWatermark)); }} 2,http://vinoyang.com/2016/10/29/flink-streaming-window-operator-analysis/ 3, Kafka中多个partition提取 watermark
private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback
public void onProcessingTime(long timestamp) throws Exception { long minAcrossAll = Long.MAX_VALUE; boolean isEffectiveMinAggregation = false; for (KafkaTopicPartitionState<?> state : allPartitions) { // we access the current watermark for the periodic assigners under the state // lock, to prevent concurrent modification to any internal variables final long curr; //noinspection SynchronizationOnLocalVariableOrMethodParameter synchronized (state) { curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp(); } minAcrossAll = Math.min(minAcrossAll, curr); isEffectiveMinAggregation = true; } // emit next watermark, if there is one if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) { lastWatermarkTimestamp = minAcrossAll; emitter.emitWatermark(new Watermark(minAcrossAll)); } // schedule the next watermark timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}