多个inputstream的情况下,watermark的值怎么赋值? kakfa中多个partition提取 watermark

2018-06-01 来源: 暖风的风 发布在  https://www.cnblogs.com/WCFGROUP/p/9121401.html

1,org.apache.flink.streaming.api.operators; AbstractStreamOperator

public void processWatermark1(Watermark mark) throws Exception {   input1Watermark = mark.getTimestamp();   long newMin = Math.min(input1Watermark, input2Watermark);   if (newMin > combinedWatermark) {      combinedWatermark = newMin;      processWatermark(new Watermark(combinedWatermark));   }}

public void processWatermark2(Watermark mark) throws Exception {   input2Watermark = mark.getTimestamp();   long newMin = Math.min(input1Watermark, input2Watermark);   if (newMin > combinedWatermark) {      combinedWatermark = newMin;      processWatermark(new Watermark(combinedWatermark));   }}

2,http://vinoyang.com/2016/10/29/flink-streaming-window-operator-analysis/

3, kakfa中多个partition提取 watermark
private static class PeriodicWatermarkEmitter<KPH> implements ProcessingTimeCallback 
public void onProcessingTime(long timestamp) throws Exception {

   long minAcrossAll = Long.MAX_VALUE;   boolean isEffectiveMinAggregation = false;   for (KafkaTopicPartitionState<?> state : allPartitions) {

      // we access the current watermark for the periodic assigners under the state      // lock, to prevent concurrent modification to any internal variables      final long curr;      //noinspection SynchronizationOnLocalVariableOrMethodParameter      synchronized (state) {         curr = ((KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>) state).getCurrentWatermarkTimestamp();      }

      minAcrossAll = Math.min(minAcrossAll, curr);      isEffectiveMinAggregation = true;   }

   // emit next watermark, if there is one   if (isEffectiveMinAggregation && minAcrossAll > lastWatermarkTimestamp) {      lastWatermarkTimestamp = minAcrossAll;      emitter.emitWatermark(new Watermark(minAcrossAll));   }

   // schedule the next watermark   timerService.registerTimer(timerService.getCurrentProcessingTime() + interval, this);}

相关文章