- 论坛徽章:
- 0
|
用一个线程类做守护线程。
package com.gmt.ftp;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import com.fy.framework.util.Contents;
import com.fy.framework.util.UT;
/**
*
* 下载与上传处理
*
* 设置一个守护线程,如果发现有下载或是上传线程已中断,则重新生成另一线程。
*
* @author
*
*/
public class FTPServer extends Thread {
private final static Logger log = Logger.getLogger(Contents.LOG_APP_NAME);
private static List downThreads = new ArrayList(0);
private static List upThreads = new ArrayList(0);
public FTPServer(){}
public void run(){
try{
if (downThreads.size() > 0 || upThreads.size() > 0 ){
while(true){
for (int i=0,loop=downThreads.size();i
Configuration config = Configuration.getInstance();
try{
List listNode = config.parser(new File(Configuration.class
.getClassLoader().getResource("ftp-config.xml").getPath()));
int size = 0;
if (listNode != null){
size = listNode.size();
DownloadBusiness[] download = new DownloadBusiness[size];
UploadBusiness[] upload = new UploadBusiness[size];
for (int i=0,loop=size;i
上传类:
package com.gmt.ftp;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.SocketException;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.log4j.Logger;
import com.fy.framework.util.Contents;
import com.fy.framework.util.UT;
import com.gmt.ftp.util.FTPContents;
import com.gmt.ftp.util.FTPUtil;
/**
*
* 上传处理类
*
* @author
*
*/
public class UploadBusiness extends Thread {
private final static Logger log = Logger.getLogger(Contents.LOG_APP_NAME);
// FTP客户端代理
private FTPClient ftpClient;
// 是否连接上
private boolean isConnect;
public boolean isDeadth;
private ConfigBean config;
public UploadBusiness(ConfigBean config) {
this.ftpClient = new FTPClient();
this.isConnect = false;
this.isDeadth = false;
this.config = config;
}
public void run() {
try {
// 是否已连接上FTP服务器,如果未连接则连接
if (!isConnect) {
connectFtpServer();
}
while (true) {
// 上传文件
uploadManyFile(config.getLocalUpDir());
long sleepTime = (Long.parseLong(config.getUpQuartz())*60*1000);
Thread.sleep(sleepTime);
}
} catch (Exception e) {
//线程中断标志
isDeadth = true;
log.debug(e.getMessage(), e);
} finally {
closeConnect();
}
}
/**
*
* 连接到FTP服务器
*
* @return true 连接服务器成功,false 连接服务器失败
*/
public boolean connectFtpServer() {
boolean isSuccess = false;
if (ftpClient == null) {
ftpClient = new FTPClient();
}
try {
int reply;
// 设置通讯的字符集
ftpClient.setControlEncoding("GBK");
// 登录FTP服务器的端口
ftpClient.setDefaultPort(Integer.parseInt(config.getPort()));
ftpClient.connect(config.getFtpIP());
// 验证登录FTP服务器
if (ftpClient.login(config.getUserName(), config.getPassWord())) {
// 发出链接请求后,应马上获取replyCode,检验链接是否成功
reply = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftpClient.disconnect();
log.debug("验证登录FTP服务器失败,时间:" + UT.getCurrDefaultTime());
return isSuccess;
}
//FTP客户端代理连接处于活动的FTPServer
ftpClient.enterLocalPassiveMode();
// 设置传输模式
ftpClient.setFileTransferMode(
FTP.STREAM_TRANSFER_MODE
);
// 防止超时断开
ftpClient.setDataTimeout(FTPContents.TIME_OUT);
// 转到文件上传到FTP服务器的存放目录
if (!ftpClient.changeWorkingDirectory(config.getFtpStoreDir())) {
log.debug("[上传]FTP服务器提供上传的文件夹不存在.");
return isSuccess;
}
}
isSuccess = true;
log.debug("[上传]验证登录FTP服务器成功,时间:"+UT.getCurrDefaultTime());
} catch (SocketException e) {
isSuccess = false;
log.debug("[上传]登录FTP服务器发生异常,时间:"+UT.getCurrDefaultTime()+e.getMessage());
} catch (IOException ie) {
isSuccess = false;
log.debug("[上传]登录FTP服务器出现异常,时间:"+UT.getCurrDefaultTime()+ie.getMessage());
}
this.isConnect = isSuccess;
return isSuccess;
}
/**
*
* 关闭FTP连接
*
* 关闭FTP连接,释放资源
*
*/
public void closeConnect() {
try {
if (ftpClient != null) {
ftpClient.logout();
ftpClient.disconnect();
}
this.isConnect = false;
} catch (Exception e) {
log.debug(e.getMessage(), e);
}
}
/**
*
* 上传单个文件。从本地上传单个文件到FTP指定的目录.
*
* @param fileName
* 本地文件名
* @return true 上传成功,false 下传失败
*/
public boolean uploadOneFile(String fileName) {
boolean uploadSuccess = false;
InputStream inputStream = null;
try {
String localUpDir = config.getLocalUpDir();
File localFile = new File(FTPUtil.formatPathAppendEnd(localUpDir)+ fileName);
// 上传时,在文件名后加上.temp,表示文件正在上传,其它程序不可作处理.
String tempName = fileName + FTPContents.APPEND_TEMP_NAME;
inputStream = new FileInputStream(localFile);
// FTP服务器保存文件
uploadSuccess = ftpClient.storeFile(tempName, inputStream);
// 关闭文件流
inputStream.close();
if (uploadSuccess) {
// 重命文件名
ftpClient.rename(tempName, fileName);
// 把已上传成功的文件移到备份目录
UT.copy(FTPUtil.formatPathAppendEnd(localUpDir)
+ fileName, FTPUtil.formatPathAppendEnd(config.getLocalUpBak())
+ fileName);
// 删除已上传成功的文件
UT.deleteFile(FTPUtil.formatPathAppendEnd(localUpDir)+ fileName);
}
} catch (Exception e) {
log.debug(e.getMessage(), e);
} finally {
if (inputStream != null) {
try {
inputStream.close();
} catch (Exception e) {
log.debug(e.getMessage(), e);
}
}
}
return uploadSuccess;
}
/**
*
* 上传多个文件
*
* @param localFile
* 待上传目录
* @return true 上传成功,false 上传失败
*/
public boolean uploadManyFile(String localFile) {
boolean uploadSuccess = true;
// 如果不提供上传目录或为空,则作为不用上传处理
if (localFile == null || "".equals(localFile)) {
return uploadSuccess;
}
// 判断是否还处于连接状态,如果不是连接状态,则登录FTP服务器
if (ftpClient != null && !ftpClient.isConnected()) {
connectFtpServer();
}
try {
// 判断本地上传目录存是与否.
if (!new File(localFile).exists()) {
new File(localFile).mkdirs();
}
File upFile = new File(localFile);
// 待上传的文件列表
File[] file = upFile.listFiles();
if (file != null) {
for (int i = 0, loop = file.length; i
File inFile = file.getAbsoluteFile();
// 如果是目录,再递归迭代
if (inFile.isDirectory()) {
uploadManyFile(inFile.getAbsolutePath());
// 如果是文件则上传
} else {
// 过滤上传文件类型,只能上传指定文件类型
if (config.getUpFilter().canUpload(config.getUpType(),inFile.getName())) {
if (!uploadOneFile(inFile.getName())) {
log.debug("文件:"+inFile.getName()+"上传失败,时间:"+UT.getCurrDefaultTime());
}
}
}
}
}
} catch (Exception e) {
log.debug(e.getMessage(), e);
}
return uploadSuccess;
}
public boolean isConnect() {
return isConnect;
}
public void setConnect(boolean isConnect) {
this.isConnect = isConnect;
}
public boolean isDeadth() {
return isDeadth;
}
public void setDeadth(boolean isDeadth) {
this.isDeadth = isDeadth;
}
public ConfigBean getConfig() {
return config;
}
public void setConfig(ConfigBean config) {
this.config = config;
}
}
下载类:
package com.gmt.ftp;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.SocketException;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.log4j.Logger;
import com.fy.framework.util.Contents;
import com.fy.framework.util.UT;
import com.gmt.ftp.util.FTPContents;
import com.gmt.ftp.util.FTPUtil;
/**
*
* 下载处理类
*
* @author
*
*/
public class DownloadBusiness extends Thread{
private final static Logger log = Logger.getLogger(Contents.LOG_APP_NAME);
//FTP客户端代理
private FTPClient ftpClient;
//是否连接上
private boolean isConnect;
public boolean isDeadth;
private ConfigBean config;
public DownloadBusiness(ConfigBean config){
this.ftpClient = new FTPClient();
this.isConnect = false;
this.isDeadth = false;
this.config = config;
}
public void run(){
try{
//是否已连接上FTP服务器,如果未连接则连接
if (!isConnect){
connectFtpServer();
}
while(true){
//下载文件
downLoadManyFile();
long sleepTime = (Long.parseLong(config.getDownQuartz())*60*1000);
Thread.sleep(sleepTime);
}
}catch(Exception e){
//设置线程中断
isDeadth = true;
log.debug(e.getMessage(),e);
}finally{
closeConnect();
}
}
/**
*
* 连接到FTP服务器
*
* @return
* true 连接服务器成功,false 连接服务器失败
*/
public boolean connectFtpServer(){
boolean isSuccess = false;
if(ftpClient == null){
ftpClient = new FTPClient();
}
try{
int reply;
//设置通讯的字符集
ftpClient.setControlEncoding("GBK");
//登录FTP服务器的端口
ftpClient.setDefaultPort(Integer.parseInt(config.getPort()));
ftpClient.connect(config.getFtpIP());
//验证登录FTP服务器
if (ftpClient.login(config.getUserName(), config.getPassWord())) {
//发出链接请求后,应马上获取replyCode,检验链接是否成功
reply = ftpClient.getReplyCode();
if (!FTPReply.isPositiveCompletion(reply)) {
ftpClient.disconnect();
log.debug("[下载]验证登录FTP服务器失败,"+UT.getCurrDefaultTime());
return isSuccess;
}
}
//转到FTP服务器下载目录
if(!ftpClient.changeWorkingDirectory(config.getFtpDownDir())){
log.debug("FTP服务器上提供下载的文件夹不存在.");
return isSuccess;
}
//防止超时断开
ftpClient.setDataTimeout(FTPContents.TIME_OUT);
isSuccess = true;
this.isConnect = isSuccess;
log.debug("[下载]验证登录FTP服务器成功,"+UT.getCurrDefaultTime());
}catch(SocketException e){
isSuccess = false;
log.debug("[下载]登录FTP服务器出现异常,"+UT.getCurrDefaultTime()+e.getMessage());
}catch(IOException ie){
isSuccess = false;
log.debug("[下载]登录FTP服务器出现异常,"+UT.getCurrDefaultTime()+ie.getMessage());
}
return isSuccess;
}
/**
*
* 关闭FTP连接
*
* 关闭FTP连接,释放资源
*
*/
public void closeConnect() {
try {
if (ftpClient != null) {
ftpClient.logout();
ftpClient.disconnect();
}
this.isConnect = false;
log.debug("[下载]关闭FTP服务器,"+UT.getCurrDefaultTime());
} catch (Exception e) {
log.debug("[下载]关闭FTP服务器出现异常,"+UT.getCurrDefaultTime()+e.getMessage());
}
}
/**
*
* 从FTP服务器上下载文件,下载单个文件
*
* @param fileName
*
* @return
* true 下载成功或待下载的文件不符合需求 false 下载失败
*/
public boolean downLoadOneFile(String fileName){
boolean downSuccess = true;
if (".".equals(fileName) || "..".equals(fileName)){
return downSuccess;
}
BufferedOutputStream outPutStream = null;
try{
String localStoreDir = config.getLocalStoreDir();
if (!new File(localStoreDir).exists()){
new File(localStoreDir).mkdirs();
}
//在此,给文件名加上后缀.temp表示文件正在下载,其它程序不可作处理
String tempFileName = FTPUtil.appendTempName(fileName);
File temFile = new File(FTPUtil.formatPathAppendEnd(localStoreDir)+tempFileName);
File reNameFile = new File(FTPUtil.formatPathAppendEnd(localStoreDir)+fileName);
outPutStream = new BufferedOutputStream(new FileOutputStream(temFile));
downSuccess = ftpClient.retrieveFile(fileName,outPutStream);
//关闭文件流
outPutStream.close();
if (downSuccess){
//待文件下载完成,再把文件名改回来
if (!temFile.renameTo(reNameFile)){
log.debug("下载成功,但改文件名时出错.文件名:"+fileName);
}
//删除FTP服务器上的下载文件
if (!ftpClient.deleteFile(fileName)){
log.debug("下载成功,但在删除FTP服务器上的文件时出错.文件名:"+fileName);
}
}
}catch(Exception e){
downSuccess = false;
log.debug(e.getMessage(),e);
}finally{
try{
if (outPutStream != null){
outPutStream.close();
}
}catch(Exception e){
log.debug(e.getMessage(),e);
}
}
return downSuccess;
}
/**
*
* 从FTP服务器上下载多个文件.
*
* @return
*
*/
public boolean downLoadManyFile(){
//是否下载成功的标志
boolean downSuccess = true;
//如果不提供下载目录或为空,则作为不用下载处理
String ftpDownloadDir = config.getFtpDownDir();
if (ftpDownloadDir == null || "".equals(ftpDownloadDir)){
return downSuccess;
}
//判断是否还处于连接状态,如果不是连接状态,则登录FTP服务器
if (ftpClient != null && !ftpClient.isConnected()){
connectFtpServer();
}
try{
//下载目录的文件列表
FTPFile[] ftpFile = ftpClient.listFiles();
for(FTPFile file:ftpFile){
String fileName = file.getName();
//判断待下载文件是否属于指定文件类型
if (config.getDownFilter().canDownload(config.downType,fileName)){
//如果下载文件不成功,记录下文件名
if (!downLoadOneFile(fileName)){
log.debug("文件:"+fileName+"下载失败,时间:"+UT.getCurrDefaultTime());
}
}
}
}catch(Exception e){
downSuccess = false;
log.debug(e.getMessage(),e);
}
return downSuccess;
}
public boolean isConnect() {
return isConnect;
}
public void setConnect(boolean isConnect) {
this.isConnect = isConnect;
}
public boolean isDeadth() {
return isDeadth;
}
public void setDeadth(boolean isDeadth) {
this.isDeadth = isDeadth;
}
public ConfigBean getConfig() {
return config;
}
public void setConfig(ConfigBean config) {
this.config = config;
}
}
本文来自ChinaUnix博客,如果查看原文请点:http://blog.chinaunix.net/u2/78176/showart_1420038.html |
|