Java 回调 的简单示例

  最近在写一些ES的入库代码,需要扫描磁盘上的上TB的数据,找到其中的元数据. 按照前人写的一次性读取所有的元数据来讲,是不可性的,最多在测试环境跑一下测试数据,线上的话,会导致内存泄漏的问题.

   所以用对递归回调的方式,扫描的一个元数据便处理一个元数据,避免占用太大的内存.这边做一个简单的demo.思路就是,每读取一个元数据,便把数据放入到队列中.让消费者去消费队列中的数据.

  • 首先,定义一个回调接口
import java.util.concurrent.BlockingQueue;

public interface ThreadCallback {

    void process(Integer count, String fileName, BlockingQueue<String> queue);
}
  • 继承Thread,然后实现上述接口
import java.util.concurrent.BlockingQueue;

public class ThreadImpl extends Thread implements ThreadCallback {
    private BlockingQueue<String> queue;
    private static Integer count = 0;
    public ThreadImpl(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void process(Integer count, String fileName, BlockingQueue<String> queue) {
        count = count + 1;
        try {
            queue.put(fileName);
            System.out.print(count);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        UtilHelper.processFile("/home/tmp/workspace/design", count, ThreadImpl.this, queue);
        try {
            queue.put("over");
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
  • 编写读取文件目录代码
import java.io.File;
import java.util.concurrent.BlockingQueue;

public class UtilHelper {

    public static void processFile(String filePath, Integer count, ThreadCallback callback, BlockingQueue<String> queue) {
        File file = new File(filePath);
        File[] files = file.listFiles();

        for (int i = 0; i < files.length; i++) {
            if (files[i].isDirectory()) {
                processFile(files[i].getPath(), count, callback, queue);
            } else {
                                // 此处回调方法
                callback.process(count, files[i].getName(), queue);
            }
        }
    }

}
  • 创建消费线程
import java.util.concurrent.BlockingQueue;

public class ThreadCustomer extends Thread {
    private BlockingQueue<String> queue;
    public ThreadCustomer(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            String str = queue.poll();
            if ("over".equals(str)) {
                System.out.println("结束");
                break;
            } else {
                System.out.println(str);
            }
        }

    }
}
  • 最后,main类

import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class Main { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10); new ThreadImpl(queue).start(); new ThreadCustomer(queue).start(); } }

  当然,代码还是有缺陷的,便是count只是起到了标识的作用,没有真正统计读取的文件个数.

0

发表评论

This site uses Akismet to reduce spam. Learn how your comment data is processed.