许昌市做网站公司,黄浦企业网站制作,外贸网站平台下载,小程序免费制作平台企业中心摘要#xff1a; Flink log connector是阿里云日志服务推出的#xff0c;用于对接Flink的工具#xff0c;包含两块#xff0c;分别是消费者和生产者#xff0c;消费者用于从日志服务中读数据#xff0c;支持exactly once语义#xff0c;生产者用于将数据写到日志服务中 Flink log connector是阿里云日志服务推出的用于对接Flink的工具包含两块分别是消费者和生产者消费者用于从日志服务中读数据支持exactly once语义生产者用于将数据写到日志服务中该Connector隐藏了日志服务的一些概念比如Shard的分裂合并等用户在使用时只需要专注在自己的业务逻辑即可。
阿里云日志服务是针对实时数据一站式服务用户只需要将精力集中在分析上过程中数据采集、对接各种存储计算、数据索引和查询等琐碎工作等都可以交给日志服务完成。
日志服务中最基础的功能是LogHub支持数据实时采集与消费实时消费家族除 Spark Streaming、Storm、StreamComputeBlink外目前新增Flink啦。Flink Connector Flink log connector是阿里云日志服务提供的用于对接flink的工具包括两部分消费者(Consumer)和生产者(Producer)。
消费者用于从日志服务中读取数据支持exactly once语义支持shard负载均衡. 生产者用于将数据写入日志服务使用connector时需要在项目中添加maven依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.11/artifactIdversion1.3.2/version
/dependency
dependencygroupIdcom.aliyun.openservices/groupIdartifactIdflink-log-connector/artifactIdversion0.1.3/version
/dependency
dependencygroupIdcom.google.protobuf/groupIdartifactIdprotobuf-java/artifactIdversion2.5.0/version
/dependencydependencygroupIdcom.aliyun.openservices/groupIdartifactIdaliyun-log/artifactIdversion0.6.10/version/dependency
dependencygroupIdcom.aliyun.openservices/groupIdartifactIdlog-loghub-producer/artifactIdversion0.1.8/version
/dependency
代码Github
用法 请参考日志服务文档正确创建Logstore。 如果使用子账号访问请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。 1. Log Consumer 在Connector中 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力实现了exactly once语义在使用时用户无需关心LogStore中shard数 量的变化consumer会自动感知。
flink中每一个子任务负责消费LogStore中部分shard如果LogStore中shard发生split或者merge子任务消费的shard也会随之改变。
1.1 配置启动参数
Properties configProps new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT cn-hangzhou.log.aliyuncs.com);
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID );
configProps.put(ConfigConstants.LOG_ACCESSKEY );
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT ali-cn-hangzhou-sls-admin);
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE sls_consumergroup_log);
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamRawLogGroupList logTestStream env.addSource(new FlinkLogConsumerRawLogGroupList(deserializer configProps));
上面是一个简单的消费示例我们使用java.util.Properties作为配置工具所有Consumer的配置都可以在ConfigConstants中找到。
注意flink stream的子任务数量和日志服务LogStore中的shard数量是独立的如果shard数量多于子任务数量每个子任务不重复的消费多个shard如果少于
那么部分子任务就会空闲等到新的shard产生。
1.2 设置消费起始位置 Flink log consumer支持设置shard的消费起始位置通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION就可以定制消费从shard的头尾或者某个特定时间开始消费具体取值如下
Consts.LOG_BEGIN_CURSOR 表示从shard的头开始消费也就是从shard中最旧的数据开始消费。 Consts.LOG_END_CURSOR 表示从shard的尾开始也就是从shard中最新的数据开始消费。 UnixTimestamp 一个整型数值的字符串用1970-01-01到现在的秒数表示 含义是消费shard中这个时间点之后的数据。 三种取值举例如下
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION 1512439000);
1.3 监控消费进度(可选) Flink log consumer支持设置消费进度监控所谓消费进度就是获取每一个shard实时的消费位置这个位置使用时间戳表示详细概念可以参考 文档消费组-查看状态消费组-监控报警 。
configProps.put(ConfigConstants.LOG_CONSUMERGROUP your consumer group name”);
注意上面代码是可选的如果设置了consumer会首先创建consumerGroup如果已经存在则什么都不做consumer中的snapshot会自动同步到日志服务的consumerGroup中用户可以在日志服务的控制台查看consumer的消费进度。
1.4 容灾和exactly once语义支持 当打开Flink的checkpointing功能时Flink log consumer会周期性的将每个shard的消费进度保存起来当作业失败时flink会恢复log consumer并 从保存的最新的checkpoint开始消费。
写checkpoint的周期定义了当发生失败时最多多少的数据会被回溯也就是重新消费使用代码如下
final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);
更多Flink checkpoint的细节请参考Flink官方文档Checkpoints。
1.5 补充材料关联 API与权限设置 Flink log consumer 会用到的阿里云日志服务接口如下
GetCursorOrData
用于从shard中拉数据 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制接口调用的时间间隔和每次调用拉取的日志数量shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS 100);
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH 100);
ListShards用于获取logStore中所有的shard列表获取shard状态等.如果您的shard经常发生分裂合并可以通过调整接口的调用周期来及时发现shard的变化。
// 设置每30s调用一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS 30000);
CreateConsumerGroup
该接口调用只有当设置消费进度监控时才会发生功能是创建consumerGroup用于同步checkpoint。
ConsumerGroupUpdateCheckPoint
该接口用户将flink的snapshot同步到日志服务的consumerGroup中。
子用户使用Flink log consumer需要授权如下几个RAM Policy2. Log Producer FlinkLogProducer 用于将数据写到阿里云日志服务中。
注意producer只支持Flink at-least-once语义这就意味着在发生作业失败的情况下写入日志服务中的数据有可能会重复但是绝对不会丢失。
用法示例如下我们将模拟产生的字符串写入日志服务
// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchemaString {public RawLogGroup serialize(String element) {RawLogGroup rlg new RawLogGroup();RawLog rl new RawLog();rl.setTime((int)(System.currentTimeMillis() / 1000));rl.addContent(message element);rlg.addLog(rl);return rlg;}
}
public class ProducerSample {public static String sEndpoint cn-hangzhou.log.aliyuncs.com;public static String sAccessKeyId ;public static String sAccessKey ;public static String sProject ali-cn-hangzhou-sls-admin;public static String sLogstore test-flink-producer;private static final Logger LOG LoggerFactory.getLogger(ConsumerSample.class);public static void main(String[] args) throws Exception {final ParameterTool params ParameterTool.fromArgs(args);final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setGlobalJobParameters(params);env.setParallelism(3);DataStreamString simpleStringStream env.addSource(new EventsGenerator());Properties configProps new Properties();// 设置访问日志服务的域名configProps.put(ConfigConstants.LOG_ENDPOINT sEndpoint);// 设置访问日志服务的akconfigProps.put(ConfigConstants.LOG_ACCESSSKEYID sAccessKeyId);configProps.put(ConfigConstants.LOG_ACCESSKEY sAccessKey);// 设置日志写入的日志服务projectconfigProps.put(ConfigConstants.LOG_PROJECT sProject);// 设置日志写入的日志服务logStoreconfigProps.put(ConfigConstants.LOG_LOGSTORE sLogstore);FlinkLogProducerString logProducer new FlinkLogProducerString(new SimpleLogSerializer() configProps);simpleStringStream.addSink(logProducer);env.execute(flink log producer);}// 模拟产生日志public static class EventsGenerator implements SourceFunctionString {private boolean running true;Overridepublic void run(SourceContextString ctx) throws Exception {long seq 0;while (running) {Thread.sleep(10);ctx.collect((seq) - RandomStringUtils.randomAlphabetic(12));}}Overridepublic void cancel() {running false;}}
}
2.1 初始化 Producer初始化主要需要做两件事情
初始化配置参数Properties 这一步和Consumer类似 Producer有一些定制的参数一般情况下使用默认值即可特殊场景可以考虑定制
// 用于发送数据的io线程的数量默认是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 该值定义日志数据被缓存发送的时间默认是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 缓存发送的包中日志的数量默认是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 缓存发送的包的大小默认是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作业可以使用的内存总的大小默认是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述参数不是必选参数用户可以不设置直接使用默认值。
重载LogSerializationSchema定义将数据序列化成RawLogGroup的方法。
RawLogGroup是log的集合每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。
如果用户需要使用日志服务的shardHashKey功能指定数据写到某一个shard中可以使用LogPartitioner产生数据的hashKey用法例子如下
FlinkLogProducerString logProducer new FlinkLogProducerString(new SimpleLogSerializer() configProps);
logProducer.setCustomPartitioner(new LogPartitionerString() {// 生成32位hash值public String getHashKey(String element) {try {MessageDigest md MessageDigest.getInstance(MD5);md.update(element.getBytes());String hash new BigInteger(1 md.digest()).toString(16);while(hash.length() 32) hash 0 hash;return hash;} catch (NoSuchAlgorithmException e) {}return 0000000000000000000000000000000000000000000000000000000000000000;}});
注意LogPartitioner是可选的不设置情况下 数据会随机写入某一个shard。
2.2 权限设置RAM Policy Producer依赖日志服务的API写数据如下
log:PostLogStoreLogs log:ListShards 当RAM子用户使用Producer时需要对上述两个API进行授权