`

JDK7 NIO2 监控文件夹文件拷贝完成

阅读更多

基于JDK7,监控某文件夹拷贝文件,监控本身很简单用WatcherService实现,主要是如何确定文件拷贝完成,这里使用移动文件的思路

 

import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.io.File;
import java.io.IOException;
import java.nio.file.*;
import java.util.UUID;

import static java.nio.file.StandardWatchEventKinds.*;

/**
 * JDK7 NIO2 文件监控
 */
public class MyWatcherService {

    // 时间格式化Format
    private static final DateTimeFormatter dirNameFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH mm ss ");

    public static void main(String[] args) throws IOException, InterruptedException {
        String dir = "C:\\Users\\Administrator\\Desktop\\test";
        Path path = Paths.get(dir);
        WatchService watcher = FileSystems.getDefault().newWatchService();
        path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);

        while (true) {
            /**
             * 拷贝文件会触发 一次ENTRY_CREATE 两次ENTRY_MODIFY(文件可能只触发一次ENTRY_MODIFY)
             * 拷贝文件会触发 一次ENTRY_CREATE 两次ENTRY_MODIFY 一次ENTRY_DELETE
             * 删除文件会触发 一次ENTRY_DELETE
             */
            WatchKey key = watcher.take();
            for (WatchEvent<?> event : key.pollEvents()) {
                WatchEvent.Kind kind = event.kind();

                if (kind == OVERFLOW) {//事件可能lost or discarded
                    System.out.println("事件可能lost or discarded");
                    continue;
                }

                WatchEvent<Path> e = (WatchEvent<Path>) event;
                String kindName = kind.name();
                Path fileName = e.context();

                System.out.printf("Event %s has happened,which fileName is %s%n", kindName, fileName);

                if ("ENTRY_MODIFY".equals(kindName)) {
                    rename(dir, fileName.toString());
                }
            }
            // 重置 key 如果失败结束监控
            if (!key.reset()) {
                break;
            }
        }
    }

    /**
     * 通过修改文件名判断文件拷贝完成
     */
    public static void rename(String dir, String fileName) {
        File src = new File(dir + File.separator + fileName);
        String tmp = "C:\\Users\\Administrator\\Desktop\\test2";
        File dest = new File(tmp + File.separator
                + DateTime.now().toString(dirNameFormat)
                + UUID.randomUUID().toString().replaceAll("-", "")
                + File.separator + fileName);
        try {
            // 未拷贝完成移动失败
            FileUtils.moveFile(src, dest);
        } catch (Exception e) {
            FileUtils.deleteQuietly(dest.getParentFile());
        }
        if (dest.exists()) {
            System.out.printf("file %s copy over , you can do next with %s%n", src.getAbsolutePath(), dest.getAbsolutePath());
        }
    }
}

 

 

完善版本

使用中发现,如果一次导入大量数据会存在有些event来不及处理就过去的情况,从结果来看就是文件夹中留存一些文件没有处理,进一步的解决方法是采用阻塞队列,文件监控进程只负责把文件名放到队列中

 

1.主程序启动一个文件监控进程,一个文件处理进程

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * <pre>
 * 功能说明:启动两个线程一个监控文件夹改变 一个处理文件
 * </pre>
 */
public class MyWatcherService2 {
    public static void main(String[] args) {

        // 创建单向队列queue DirWatch进程将监控到的文件放到队列中 Processor检查处理队列中的文件
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

        DirWatch watch = new DirWatch(queue);
        Thread watchThread = new Thread(watch);
        watchThread.start();

        Processor processor = new Processor(queue);
        Thread processorThread = new Thread(processor);
        processorThread.start();

        while (true) {
            try {
                Thread.sleep(9999999);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

 

2.文件监控进程

import java.nio.file.*;
import java.util.concurrent.BlockingQueue;

import static java.nio.file.StandardWatchEventKinds.*;

/**
 * <pre>
 * 功能说明:通过NIO 监控文件夹并放入队列中
 * </pre>
 */
public class DirWatch implements Runnable {

    private BlockingQueue<String> queue;

    public DirWatch(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String sourceDir = "D:\\bigdata\\src";

        try {

            // 对文件夹进行监控
            Path path = Paths.get(sourceDir);
            WatchService watcher = FileSystems.getDefault().newWatchService();
            path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);

            while (true) {
                WatchKey key = watcher.take();
                for (WatchEvent<?> event : key.pollEvents()) {
                    WatchEvent.Kind kind = event.kind();

                    if (kind == OVERFLOW) {
                        System.out.println("事件可能lost or discarded");
                        continue;
                    }

                    WatchEvent<Path> e = (WatchEvent<Path>) event;
                    String kindName = kind.name();
                    Path fileName = e.context();

//                    System.out.printf("Event %s has happened,which fileName is %s%n", kindName, fileName);

                    if ("ENTRY_MODIFY".equals(kindName)) {
                        // 放到队列中 如果队列满抛出异常 证明程序逻辑出现问题 理论上不可能满
                        queue.add(fileName.toString());
                    }
                }
                // 重置 key 如果失败结束监控
                if (!key.reset()) {
                    throw new RuntimeException("重置Key失败,结束监控");
//                    break;
                }
            }

        } catch (Exception e) {
            System.out.println("监控异常, 停止程序");
            // 无法处理QAR数据 结束程序
            System.exit(1);
        }
    }
}

 

 

3.文件处理进程

import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.io.File;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;

/**
 * <pre>
 * 功能说明:通过移动文件的方法判断文件拷贝完成 并处理文件
 * </pre>
 */
public class Processor implements Runnable {

    // 时间格式化Format
    private DateTimeFormatter dirNameFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH mm ss ");

    private BlockingQueue<String> queue;

    public Processor(BlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        String sourceDir = "D:\\bigdata\\src";
        String resDir = "D:\\bigdata\\res";
        try {
            while (true) {
                String fileName = queue.take();
                File src = new File(sourceDir + File.separator + fileName);
                // 再一次移动防止提交重名文件覆盖正在处理的文件
                File dest = new File(resDir + File.separator
                        + DateTime.now().toString(dirNameFormat)
                        + UUID.randomUUID().toString().replaceAll("-", "")
                        + File.separator + fileName);
                try {
                    // 未拷贝完成不能移动
                    FileUtils.moveFile(src, dest);
                } catch (Exception e) {
                    // 删除建立的空目录
                    FileUtils.deleteQuietly(dest.getParentFile());
                }

                if (dest.exists()) {
                    try {
                        System.out.printf("file %s copy over , you can do next with %s%n", src.getAbsolutePath(), dest.getAbsolutePath());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

        } catch (Exception e) {
            System.out.println("处理异常, 停止程序");
            // 无法处理QAR数据 结束程序
            System.exit(1);
        }
    }
}

 

 

 

 

 

 

1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics