最近在写一些ES的入库代码,需要扫描磁盘上的上TB的数据,找到其中的元数据. 按照前人写的一次性读取所有的元数据来讲,是不可性的,最多在测试环境跑一下测试数据,线上的话,会导致内存泄漏的问题.
所以用对递归回调的方式,扫描的一个元数据便处理一个元数据,避免占用太大的内存.这边做一个简单的demo.思路就是,每读取一个元数据,便把数据放入到队列中.让消费者去消费队列中的数据.
- 首先,定义一个回调接口
import java.util.concurrent.BlockingQueue;
public interface ThreadCallback {
void process(Integer count, String fileName, BlockingQueue<String> queue);
}
- 继承Thread,然后实现上述接口
import java.util.concurrent.BlockingQueue;
public class ThreadImpl extends Thread implements ThreadCallback {
private BlockingQueue<String> queue;
private static Integer count = 0;
public ThreadImpl(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void process(Integer count, String fileName, BlockingQueue<String> queue) {
count = count + 1;
try {
queue.put(fileName);
System.out.print(count);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run() {
UtilHelper.processFile("/home/tmp/workspace/design", count, ThreadImpl.this, queue);
try {
queue.put("over");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
- 编写读取文件目录代码
import java.io.File;
import java.util.concurrent.BlockingQueue;
public class UtilHelper {
public static void processFile(String filePath, Integer count, ThreadCallback callback, BlockingQueue<String> queue) {
File file = new File(filePath);
File[] files = file.listFiles();
for (int i = 0; i < files.length; i++) {
if (files[i].isDirectory()) {
processFile(files[i].getPath(), count, callback, queue);
} else {
// 此处回调方法
callback.process(count, files[i].getName(), queue);
}
}
}
}
- 创建消费线程
import java.util.concurrent.BlockingQueue;
public class ThreadCustomer extends Thread {
private BlockingQueue<String> queue;
public ThreadCustomer(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
String str = queue.poll();
if ("over".equals(str)) {
System.out.println("结束");
break;
} else {
System.out.println(str);
}
}
}
}
- 最后,main类
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
new ThreadImpl(queue).start();
new ThreadCustomer(queue).start();
}
}
当然,代码还是有缺陷的,便是count只是起到了标识的作用,没有真正统计读取的文件个数.
0