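Wherever you go,
remember
that the past was a lie,
that memory is a road with no end,
that every spring gone by is lost for good,
and that even the most tenacious and frenzied love
is in the end only a fleeting reality.
Only solitude is eternal.

-- Gabriel García Márquez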

ElasticSearch Sink source code

Maven dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.5.0</version>
</dependency>

Calling it from Java

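import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.VersionConflictEngineException;

// YmlReader, LocalDateTimeUtils, ElasticConstants, log and JSON (presumably fastjson)
// come from the author's project and are not shown in this post.

// Parse "host1:port1,host2:port2" into transport addresses.
List<InetSocketAddress> transportAddresses = new ArrayList<>();
try {
    String clusterNodes = YmlReader.getValue("es.cluster.nodes");
    for (String clusterNode : clusterNodes.split(",")) {
        int sep = clusterNode.indexOf(':');
        String hostName = clusterNode.substring(0, sep);
        String port = clusterNode.substring(sep + 1);
        log.info("adding transport node : " + clusterNode);
        transportAddresses.add(new InetSocketAddress(InetAddress.getByName(hostName), Integer.parseInt(port)));
    }
} catch (Exception e) {
    log.error(e.getMessage(), e);
}

// Cluster name plus the BulkProcessor flush thresholds.
Map<String, String> userConfig = new HashMap<>(7);
userConfig.put("cluster.name", YmlReader.getValue("es.cluster.name"));
userConfig.put("bulk.flush.max.actions", "1000");
userConfig.put("bulk.flush.max.size.mb", "4");
userConfig.put("bulk.flush.interval.ms", "60000");

ActionRequestFailureHandler failureHandler = (action, failure, restStatusCode, indexer) -> {
    if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
        // The request was rejected: re-add it to the indexer so it is retried.
        indexer.add(action);
        log.error("es reject, action: {}", action);
    } else if (ExceptionUtils.findThrowable(failure, VersionConflictEngineException.class).isPresent()) {
        // Version conflict: log it and drop the request.
        log.info("version conflict, action: {}", action);
    } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
        // Parse exception: log it and rethrow, which fails the sink.
        log.info("parse exception, action: {}", action);
        throw failure;
    } else {
        // Any other failure is not handled yet: rethrow it.
        log.info("todo action: {}", action);
        throw failure;
    }
};

// PickupNetworkDetail is a placeholder name for the stream's element type,
// which the original snippet does not show.
ElasticsearchSinkFunction<PickupNetworkDetail> elasticsearchSinkFunction = (o, runtimeContext, requestIndexer) -> {
    // One index per month: <prefix><separator><year-month>.
    String dateStr = LocalDateTimeUtils.formatToString(o.getStmsInsertTime(), LocalDateTimeUtils.DATE_FORMAT_YM);
    String indexName = ElasticConstants.PICKUP_NETWORK_DETAIL + ElasticConstants.SPLIT + dateStr;
    String indexType = ElasticConstants.PICKUP_NETWORK_DETAIL;

    IndexRequest indexRequest = Requests.indexRequest()
            .index(indexName)
            .type(indexType)
            .id(o.getUniqueId())
            .source(JSON.toJSONString(o));
    // External versioning: the update timestamp acts as the document version,
    // so an older record cannot overwrite a newer one.
    indexRequest.versionType(VersionType.EXTERNAL_GTE);
    indexRequest.version(o.getStmsUpdateTime().getTime());
    requestIndexer.add(indexRequest);
};

// Attach the sink to a DataStream<PickupNetworkDetail> with stream.addSink(sink).
ElasticsearchSink<PickupNetworkDetail> sink =
        new ElasticsearchSink<>(userConfig, transportAddresses, elasticsearchSinkFunction, failureHandler);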

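Inheritance structure

ElasticsearchSink inheritance structure

  • ElasticsearchSink extends ElasticsearchSinkBase

    The base class contains most of the implementation.

  • ElasticsearchSinkBase extends RichSinkFunction<T>

    This provides the Flink sink lifecycle methods open(Configuration parameters) and close().

  • ElasticsearchSinkBase implements the CheckpointedFunction interface

    This provides snapshot initialization (initializeState) and snapshot storage (snapshotState).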
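
To make the hierarchy easier to picture, here is a stripped-down model of it. None of these types are Flink classes: `SimpleRichSink`, `SimpleCheckpointed`, `LoggingSink` and `LifecycleDemo` are hypothetical stand-ins, and the call order in `run()` is my understanding of how the runtime drives a checkpointed sink (state initialization before `open`, snapshots between records), not something taken from the connector source.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for RichSinkFunction<T>: lifecycle hooks plus a per-record invoke().
abstract class SimpleRichSink<T> {
    void open() {}
    abstract void invoke(T value);
    void close() {}
}

// Stand-in for the CheckpointedFunction interface.
interface SimpleCheckpointed {
    void initializeState();
    void snapshotState();
}

// Stand-in for ElasticsearchSinkBase: it only records which hook was called.
class LoggingSink<T> extends SimpleRichSink<T> implements SimpleCheckpointed {
    final List<String> calls = new ArrayList<>();

    @Override public void initializeState() { calls.add("initializeState"); }
    @Override void open() { calls.add("open"); }
    @Override void invoke(T value) { calls.add("invoke:" + value); }
    @Override public void snapshotState() { calls.add("snapshotState"); }
    @Override void close() { calls.add("close"); }
}

class LifecycleDemo {
    // Drives the sink the way a runtime would (assumed order).
    static List<String> run() {
        LoggingSink<String> sink = new LoggingSink<>();
        sink.initializeState();
        sink.open();
        sink.invoke("a");
        sink.snapshotState();   // a checkpoint is triggered between records
        sink.invoke("b");
        sink.close();
        return sink.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the model is only that one class ends up carrying both the sink lifecycle and the checkpoint hooks, which is why ElasticsearchSinkBase is where most of the logic lives.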

Source code walkthrough

The ElasticsearchSink class

There are two constructors.

/**
 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 *
 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 */
public ElasticsearchSink(
        Map<String, String> userConfig,
        List<InetSocketAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {

    this(userConfig, transportAddresses, elasticsearchSinkFunction, new NoOpFailureHandler());
}

/**
 * Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link TransportClient}.
 *
 * @param userConfig The map of user settings that are used when constructing the {@link TransportClient} and {@link BulkProcessor}
 * @param transportAddresses The addresses of Elasticsearch nodes to which to connect using a {@link TransportClient}
 * @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element
 * @param failureHandler This is used to handle failed {@link ActionRequest}
 */
public ElasticsearchSink(
        Map<String, String> userConfig,
        List<InetSocketAddress> transportAddresses,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler) {

    super(new Elasticsearch5ApiCallBridge(transportAddresses), userConfig, elasticsearchSinkFunction, failureHandler);
}
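  • NoOpFailureHandler (the default used by the three-argument constructor) does not handle failures: if an exception occurs while writing, it is rethrown as-is, which stops the sink thread;
  • A custom ActionRequestFailureHandler lets you treat each kind of exception differently, for example retrying the request, dropping it, or rethrowing to stop the sink;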
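
The difference between the two strategies can be sketched without any Flink or Elasticsearch dependency. Everything below is a hypothetical stand-in: `FailureHandler` plays the role of ActionRequestFailureHandler, a plain `List<String>` plays the role of the request indexer, and the JDK's `RejectedExecutionException` and `IllegalArgumentException` stand in for the Elasticsearch rejection and parse exceptions. Treat it as an illustration of the dispatch idea, not as the connector's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-in for ActionRequestFailureHandler.
interface FailureHandler {
    void onFailure(String action, Throwable failure, List<String> retryQueue) throws Throwable;
}

// The "no-op" strategy: every failure is rethrown unchanged.
class RethrowingHandler implements FailureHandler {
    @Override
    public void onFailure(String action, Throwable failure, List<String> retryQueue) throws Throwable {
        throw failure;
    }
}

// A custom strategy: retry rejected requests, drop malformed ones, rethrow the rest.
class SelectiveHandler implements FailureHandler {
    @Override
    public void onFailure(String action, Throwable failure, List<String> retryQueue) throws Throwable {
        if (hasCause(failure, RejectedExecutionException.class)) {
            retryQueue.add(action);          // re-queue so the request is sent again
        } else if (hasCause(failure, IllegalArgumentException.class)) {
            // malformed request: drop it silently in this sketch
        } else {
            throw failure;                   // unknown failure: let the caller fail
        }
    }

    // Walks the cause chain looking for an exception of the given type.
    static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (type.isInstance(c)) {
                return true;
            }
        }
        return false;
    }
}

class FailureHandlerDemo {
    // Returns true if the handler absorbed the failure, false if it rethrew.
    static boolean handled(FailureHandler handler, Throwable failure, List<String> retryQueue) {
        try {
            handler.onFailure("index doc-1", failure, retryQueue);
            return true;
        } catch (Throwable t) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> queue = new ArrayList<>();
        Throwable rejected = new RuntimeException(new RejectedExecutionException("queue full"));
        System.out.println("no-op handled:     " + handled(new RethrowingHandler(), rejected, queue));
        System.out.println("selective handled: " + handled(new SelectiveHandler(), rejected, queue));
        System.out.println("retry queue:       " + queue);
    }
}
```

Walking the cause chain matters because bulk failures often arrive wrapped in another exception, which is presumably why the usage example earlier relies on ExceptionUtils.findThrowable rather than a plain instanceof check.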

The ElasticsearchSinkBase class

Fields

// Configuration keys for the Elasticsearch BulkProcessor API
public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = "bulk.flush.max.actions";
public static final String CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB = "bulk.flush.max.size.mb";
public static final String CONFIG_KEY_BULK_FLUSH_INTERVAL_MS = "bulk.flush.interval.ms";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE = "bulk.flush.backoff.enable";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE = "bulk.flush.backoff.type";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES = "bulk.flush.backoff.retries";
public static final String CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY = "bulk.flush.backoff.delay";

// BulkProcessor: maximum number of actions per batch
private final Integer bulkProcessorFlushMaxActions;
// BulkProcessor: maximum size of a batch (MB)
private final Integer bulkProcessorFlushMaxSizeMb;
// BulkProcessor: flush interval (milliseconds)
private final Integer bulkProcessorFlushIntervalMillis;
// BulkProcessor: backoff (retry) policy applied when a flush fails
private final ElasticsearchSinkBase.BulkFlushBackoffPolicy bulkProcessorFlushBackoffPolicy;
// User-supplied Elasticsearch configuration
private final Map<String, String> userConfig;
// User function that turns each incoming element into action requests
private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;
// Handler for failed action requests
private final ActionRequestFailureHandler failureHandler;
// Whether pending requests are flushed when a checkpoint is taken
private boolean flushOnCheckpoint = true;
private transient BulkProcessorIndexer requestIndexer;
private final ElasticsearchApiCallBridge callBridge;
// Number of requests that have been added but not yet acknowledged
private AtomicLong numPendingRequests = new AtomicLong(0L);
// Elasticsearch client
private transient Client client;
// Elasticsearch BulkProcessor
private transient BulkProcessor bulkProcessor;
// Failure recorded by the asynchronous callbacks, rethrown later on the sink thread
private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
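  • Writes to Elasticsearch are implemented on top of BulkProcessor.
  • It implements snapshotState(FunctionSnapshotContext context), which is called when a checkpoint snapshot is requested. The FunctionSnapshotContext is supplied at snapshot time, whereas the FunctionInitializationContext is supplied at initialization.
  • It implements initializeState(FunctionInitializationContext context), which is called when a parallel function instance is initialized during distributed execution; this is usually where the data structures for state storage are set up.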
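
How the fields above could cooperate when a checkpoint is taken can be sketched with a small in-memory model. This is a simplified illustration under my own assumptions, not the connector's implementation: `BufferingSink` is a hypothetical class, a `List<String>` replaces the BulkProcessor, batches always "succeed" synchronously, and only the max-actions threshold is modelled (no size or interval trigger). The names `numPendingRequests`, `failureThrowable` and `snapshotState` are borrowed from the listing above to show their roles.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a bulk-writing sink.
class BufferingSink {
    private final int maxActions;
    private final List<String> buffer = new ArrayList<>();
    final List<List<String>> sentBatches = new ArrayList<>();
    private final AtomicLong numPendingRequests = new AtomicLong(0L);
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    BufferingSink(int maxActions) {
        this.maxActions = maxActions;
    }

    // Called once per record: buffer it and flush when the batch is full.
    void invoke(String request) {
        checkErrorAndRethrow();
        numPendingRequests.incrementAndGet();
        buffer.add(request);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    // Sends whatever is buffered as one batch (always succeeds in this model).
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        sentBatches.add(batch);
        numPendingRequests.addAndGet(-batch.size());
    }

    // Called on checkpoint: do not return until nothing is pending.
    void snapshotState() {
        checkErrorAndRethrow();
        while (numPendingRequests.get() != 0) {
            flush();
            checkErrorAndRethrow();
        }
    }

    // Simulates an asynchronous callback reporting a failed batch.
    void reportFailure(Throwable t) {
        failureThrowable.compareAndSet(null, t);
    }

    long pending() {
        return numPendingRequests.get();
    }

    // Surfaces a previously recorded failure on the calling thread.
    private void checkErrorAndRethrow() {
        Throwable cause = failureThrowable.get();
        if (cause != null) {
            throw new RuntimeException("An error occurred in the sink.", cause);
        }
    }
}

class FlushOnCheckpointDemo {
    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink(3);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            sink.invoke(r);
        }
        System.out.println("pending before checkpoint: " + sink.pending());
        sink.snapshotState();
        System.out.println("pending after checkpoint:  " + sink.pending());
        System.out.println("batches: " + sink.sentBatches);
    }
}
```

Draining the buffer before a checkpoint completes is what lets a sink of this kind offer at-least-once delivery: a record counted in a completed checkpoint has already been handed to the store, so nothing buffered is lost on recovery.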

  • © 2015-2020 Andrew