江阴营销网站建设,asp简单网站开发,云南app开发,南宁网站定制摘要#xff1a; MaxCompute 的数据上传接口#xff08;Tunnel#xff09;定义了数据 block 的概念#xff1a;一个 block 对应一个 http request#xff0c;多个 block 的上传可以并发而且是原子的#xff0c;一次同步请求要么成功要么失败#xff0c;不会污染其他的 b…摘要 MaxCompute 的数据上传接口Tunnel定义了数据 block 的概念一个 block 对应一个 http request多个 block 的上传可以并发而且是原子的一次同步请求要么成功要么失败不会污染其他的 block。这种设计对于服务端来讲十分简洁但是也把记录状态做 fa.
本文用到的
阿里云数加大数据计算服务MaxCompute产品地址https://www.aliyun.com/product/odps
MaxCompute 的数据上传接口Tunnel定义了数据 block 的概念一个 block 对应一个 http request多个 block 的上传可以并发而且是原子的一次同步请求要么成功要么失败不会污染其他的 block。这种设计对于服务端来讲十分简洁但是也把记录状态做 failover 的工作交给了客户端。
用户在使用 Tunnel SDK 编程时需要对 block 这一层的语义进行认知并且驱动数据上传的整个过程[1]并且自己进行容错毕竟『网络错误是正常而不是异常』。由于用户文档中并没有强调这一点的重要性导致很多用户踩了坑一种常见的出错场景是当客户端写数据的速度过慢两次 write 的间隔超时[2]导致整个 block 上传失败。
High Level API
MaxCompute Java SDK 在 0.21.3-public 之后新增了 BufferredWriter 这个更高层的 API简化了数据上传的过程并且提供了容错的功能。 BufferedWriter 对用户隐藏了 block 这个概念从用户角度看就是在 session 上打开一个 writer 然后往里面写记录即可
RecordWriter writer null;try {int i 0; writer uploadSession.openBufferedWriter();Record product uploadSession.newRecord();for (String item : items) {product.setString(name, item);product.setBigint(id, i);writer.write(product);i 1;}
} finally {if (writer ! null) {writer.close();}
}
uploadSession.commit();
具体实现时 BufferedWriter 先将记录缓存在客户端的缓冲区中并在缓冲区填满之后打开一个 http 连接进行上传。BufferedWriter 会尽最大可能容错保证数据上传上去。
由于屏蔽了底层细节这个接口可能并不适合数据预划分、断点续传、分批次上传等需要细粒度控制的场景。
多线程上传示例
多线程上传时每个线程只需要打开一个 writer 往里面写数据就行了。
class UploadThread extends Thread {private UploadSession session;private static int RECORD_COUNT 1200;public UploadThread(UploadSession session) {this.session session;}Overridepublic void run() {RecordWriter writer up.openBufferedWriter();Record r up.newRecord();for (int i 0; i RECORD_COUNT; i) {r.setBigint(0, i);writer.write(r);}writer.close();}
};public class Example {public static void main(String args[]) {// 初始化 MaxCompute 和 tunnel 的代码TableTunnel.UploadSession uploadSession tunnel.createUploadSession(projectName, tableName);UploadThread t1 new UploadThread(up);UploadThread t2 new UploadThread(up);t1.start();t2.start();t1.join();t2.join();uploadSession.commit();}
更多控制
重试策略
由于底层在上传出错时会回避一段固定的时间并进行重试但如果你的程序不想花太多时间在重试上或者你的程序位于一个极其恶劣的网络环境中为此 TunnelBufferedWriter 允许用户配置重试策略。
用户可以选择三种重试回避策略指数回避EXPONENTIAL_BACKOFF、线性时间回避LINEAR_BACKOFF、常数时间回避CONSTANT_BACKOFF。
例如下面这段代码可以将write 的重试次数调整为 6每一次重试之前先分别回避 4s、8s、16s、32s、64s 和 128s从 4 开始的指数递增的序列。
RetryStrategy retry new RetryStrategy(6, 4, RetryStrategy.BackoffStrategy.EXPONENTIAL_BACKOFF)writer (TunnelBufferedWriter) uploadSession.openBufferedWriter();
writer.setRetryStrategy(retry);
缓冲区控制
如果你的程序对 JVM 的内存有严格的要求可以通过下面这个接口修改缓冲区占内存的字节数bytes
writer.setBufferSize(1024*1024);
默认配置每一个 Writer 的 BufferSize 是 10 MiB。TunnelBufferedWriter 一次 flush buffer 的操作上传一个 block 的数据[3]。
多个进程共享 Session
由于一个 Session 的上传状态是通过维护一个 block list 实现的对于多线程程序来讲通过锁很容易实现资源的分配。但对于两个进程空间里的程序想要复用一个 Session 时必须通过一种机制对资源进行隔离。
具体地在 getUploadSession 的时候必须指定这个共享这个 Session 的进程数目以及一个用来区分进程的 global id
//程序1这个 session 将被两个 writer 共享我是其中第 0 个
TableTunnel.UploadSession up tunnel.getUploadSession(projectName, tableName, sid, 2, 0);
writer session.openBufferedWriter();//程序1这个 session 将被两个 writer 共享我是其中第 1 个
TableTunnel.UploadSession up tunnel.getUploadSession(projectName, tableName, sid, 2, 1);
writer session.openBufferedWriter();
Notes
[1] 一次完整的上传流程通常包括以下步骤
先对数据进行划分 为每个数据块指定 block id即调用 openRecordWriter(id) 然后用一个或多个线程分别将这些 block 上传上去 并在某个 block 上传失败以后需要对整个 block 进行重传 在所有 block 都上传以后向服务端提供上传成功的 blockid list 进行校验即调用 session.commit([1,2,3,…]) [2] 因为使用长连接服务端有计时器判断是否客户端是否 alive
[3] block 在服务端有 20000 个的数量上限如果 BufferSize 设得太小会导致 20000 个 block 很快被用光
[4] Session的有效期为24小时超过24小时会导致数据上传失败
原文链接
干货好文请关注扫描以下二维码