超实用文件监控多线程FTP上传工具

时间:2021-12-09 19:09:01

这是自己很久以前写的一个多线程FTP 上传工具,支持多账户,自定义线程数,自定义文件监控目录,可用做文件发布使用,非常实用,今天有小伙伴问起,现分享出来:

超实用文件监控多线程FTP上传工具

 using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Text;
using System.Threading.Tasks; namespace NuFTPCmmndv5
{
public class TaskFile
{
public TaskFile()
{
GUID = Guid.NewGuid();
}
private long _fileSize { get; set; } public Guid GUID { get; set; }
public string HOST { get; set; }
public string DIR { get; set; }
public string LCD { get; set; }
public string Priority { get; set; }
public string Filename { get; set; }
public long Size
{
get
{
if (File.Exists(this.LCD))
{
_fileSize = new FileInfo(this.LCD).Length;
};
return _fileSize;
}
set { _fileSize = value; }
} }
}
 using System;
using System.IO;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms; using FluentScheduler; namespace NuFTPCmmndv5
{
public partial class FormMain : Form
{ private Setting setting;
private static Semaphore sema = new Semaphore(, );
private object syncRoot = new object();
public FormMain()
{
InitializeComponent();
setting = new Setting();
} private void FormMain_Load(object sender, EventArgs e)
{
setting = setting.loadSetting(); if (Directory.Exists(setting.FolderToMonitor + "err\\"))
Directory.CreateDirectory(setting.FolderToMonitor + "err\\"); if (Directory.Exists(setting.FolderToMonitor + "pending\\"))
Directory.CreateDirectory(setting.FolderToMonitor + "pending\\"); foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "pending\\").GetFiles())
{
f.CopyTo(setting.FolderToMonitor + "err\\" + f.Name, true);
f.Delete();
} SetStatus(setting.FolderToMonitor); //开启上传任务
StartRunUploadTask(); //开始监控任务
StartRunMonitorTask();
} /// <summary>
/// 启动监控任务
/// </summary>
/// <param name="timer"></param>
public void StartRunMonitorTask(int timer)
{
JobManager.AddJob(() =>
{
sema.WaitOne();
SetToolStripStatusStatus(DateTime.Now + " (" + DataGridFiles.Rows.Count + ")");
RunTask();
sema.Release();
},
t =>
{
t.WithName("StartRunMonitorTask").ToRunNow().AndEvery(timer).Seconds();
});
} /// <summary>
/// 运行监控任务
/// </summary>
public void RunTask()
{
//lock (syncRoot)
//{
#region
try
{
//每5分钟读取出错文件并追加到待上传
if (DateTime.Now.Second == && DateTime.Now.Minute % == )
{
foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "err\\").GetFiles())
{
f.CopyTo(setting.FolderToMonitor + f.Name, true);
f.Delete();
LogText("Retrying: " + f.Name);
}
}
//写监控
WriteMon(DateTime.Now.ToString()); //待上传文件大于200 报警
if (DataGridFiles.Rows.Count > )
{
WriteMon("=ERROR=NuFTPCmmndv4 pending upload: " + DataGridFiles.Rows.Count);
} //删除超过30天日志
foreach (var f in new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory + "logs\\").GetFiles())
{
if (f.LastWriteTime < DateTime.Now.AddDays(-))
f.Delete();
}
}
catch (Exception ex)
{
}
#endregion //读取dat文件
foreach (var f in new DirectoryInfo(setting.FolderToMonitor).GetFiles("*.dat"))
{
try
{
var task = new TaskFile(); #region
//按行读取
int curLine = ;
using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
{
fileStream.Seek(, SeekOrigin.Begin);
string[] lines = System.IO.File.ReadAllLines(f.FullName);
using (var streamReader = new StreamReader(fileStream, Encoding.Default))
{
string content = streamReader.ReadLine();
while (!string.IsNullOrEmpty(content))
{
if (content.Substring(, ) == "HOST=")
{
//FTP
task.HOST = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "LCD=")
{
//本地目录文件
task.LCD = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "DIR=")
{
//远程对应目录文件
task.DIR = content.Substring(content.IndexOf("=") + );
task.DIR = task.DIR.Replace("\\", "/");
}
else if (content.Substring(, ) == "priority=")
{
//优先级
task.Priority = content.Substring(content.IndexOf("=") + );
} content = streamReader.ReadLine();
curLine++;
}
}
} #endregion //上传文件名
task.Filename = f.Name; //拷贝到待上传目录
f.CopyTo(setting.FolderToMonitor + "pending\\" + f.Name, true);
f.Delete(); //遍历账户配置
var account = setting.FTPAccounts.Select(a => a).Where(a => a.FTPName == task.HOST).FirstOrDefault();
if (account != null)
{
//是否已经存在
if (!account.Contains(task))
{
//添加到待传队列
account.AddQueue(task);
//刷新GridView
InvokeAddGridView(this.DataGridFiles, task);
}
else
{
//存在则移除文件
LogText(task.HOST + " The file already exists in the Queue:" + task.HOST + "/" + task.DIR);
}
}
}
catch (Exception ex)
{
LogText(ex.Message + ";" + ex.StackTrace);
}
}
//}
}
/// <summary>
/// 开启上传任务
/// </summary>
public void StartRunUploadTask()
{
foreach (var account in setting.FTPAccounts)
{
account.setting = setting;
//注册上传完成事件
account.Completed += account_Completed;
//注册上传进度事件
account.ProcessProgress += account_ProcessProgress;
//注册删除上传事件
account.Deleted += account_Deleted;
//终止上传事件
account.Aborted += account_Aborted;
//注册上传日志事件
account.ProcessLog += account_ProcessLog;
//开始上传队列
account.Start();
}
} private void account_ProcessProgress(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
{
InvokeUpdateGridView(this.DataGridFiles, arg1, arg2);
}
private void account_Completed(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
{
InvokeRemoveGridView(this.DataGridFiles, arg1);
}
private void account_Aborted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
{
foreach (FileInfo f in new DirectoryInfo(setting.FolderToMonitor + "pending\\").GetFiles())
{
if (arg1.Filename == f.Name)
{
f.CopyTo(setting.FolderToMonitor + "err\\" + f.Name, true);
f.Delete();
break;
}
}
}
private void account_Deleted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
{
//删除行
InvokeRemoveGridView(this.DataGridFiles, arg1);
//删除文件
try
{
System.IO.File.Delete(setting.FolderToMonitor + "err\\" + arg1.Filename);
}
catch (Exception ex)
{
LogText(ex.Message + ";" + ex.StackTrace);
}
}
private void account_ProcessLog(string obj)
{
LogText(obj);
} public void InvokeAddGridView(DataGridView dataGridView, TaskFile task)
{
if (dataGridView.InvokeRequired)
{
this.Invoke(new Action(() =>
{
AddGridView(dataGridView, task);
}));
return;
}
AddGridView(dataGridView, task);
}
public void AddGridView(DataGridView dataGridView, TaskFile task)
{
try
{
int index = dataGridView.Rows.Add();
dataGridView.Rows[index].Cells[].Value = task.LCD;
dataGridView.Rows[index].Cells[].Value = FormatFileSize(task.Size);
dataGridView.Rows[index].Cells[].Value = "Pending";
dataGridView.Rows[index].Cells[].Value = task.Filename;
dataGridView.Rows[index].Cells[].Value = task.GUID.ToString();
}
catch (Exception ex)
{
LogText(ex.Message + ";" + ex.StackTrace);
}
} public String FormatFileSize(Int64 fileSize)
{
if (fileSize < )
return "";
else if (fileSize >= * * )
return string.Format("{0:########0.00} GB", ((Double)fileSize) / ( * * ));
else if (fileSize >= * )
return string.Format("{0:####0.00} MB", ((Double)fileSize) / ( * ));
else if (fileSize >= )
return string.Format("{0:####0.00} KB", ((Double)fileSize) / );
else
return string.Format("{0} bytes", fileSize);
} public void InvokeUpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
{
if (dataGridView.InvokeRequired)
{
this.Invoke(new Action(() =>
{
UpdateGridView(dataGridView, task, arg);
}));
return;
}
UpdateGridView(dataGridView, task, arg);
}
public void UpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
{
try
{
foreach (DataGridViewRow r in dataGridView.Rows)
{
if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
{
if (arg.uploadStatus == FTPAccount.UploadStatus.Failed)
{
r.Cells["Completed"].Value = "Failed";
r.Cells["Completed"].Style.BackColor = Color.LightPink;
}
else if (arg.uploadStatus == FTPAccount.UploadStatus.Timeout)
{
r.Cells["Completed"].Value = "Timeout";
r.Cells["Completed"].Style.BackColor = Color.LightPink;
}
else if (arg.uploadStatus == FTPAccount.UploadStatus.Cancel)
{
r.Cells["Completed"].Value = "Cancel";
r.Cells["Completed"].Style.BackColor = Color.LightPink;
}
else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent != )
{
r.Cells["Completed"].Value = arg.CompetedPrecent + "%";
r.Cells["Completed"].Style.BackColor = Color.White;
}
else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent == )
{
r.Cells["Completed"].Value = "Uploading";
r.Cells["Completed"].Style.BackColor = Color.White;
}
}
}
dataGridView.Sort(dataGridView.Columns["Completed"], ListSortDirection.Ascending);
}
catch (Exception ex)
{
LogText(ex.Message + ";" + ex.StackTrace);
}
} public void InvokeRemoveGridView(DataGridView dataGridView, TaskFile task)
{
if (dataGridView.InvokeRequired)
{
this.Invoke(new Action(() =>
{
RemoveGridView(dataGridView, task);
}));
return;
}
RemoveGridView(dataGridView, task);
}
public void RemoveGridView(DataGridView dataGridView, TaskFile task)
{
try
{
foreach (DataGridViewRow r in dataGridView.Rows)
{
if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
{
dataGridView.Rows.Remove(r);
}
}
}
catch (Exception ex)
{
LogText(ex.Message + ";" + ex.StackTrace);
}
} public void WriteMon(string txt)
{ } public void LogText(string msg)
{
var dir = AppDomain.CurrentDomain.BaseDirectory + "logs\\";
if (!Directory.Exists(dir))
Directory.CreateDirectory(dir); using (var w = new StreamWriter(dir + DateTime.Now.ToString("yyyy-MM-dd") + ".log", true))
{
w.WriteLine(DateTime.Now.ToString() + ": " + msg);
w.Close();
} if (TextBoxLog.InvokeRequired)
{
this.Invoke(new Action(() =>
{
TextBoxLog.AppendText(msg + "\r\n");
if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
{
this.TextBoxLog.Text = "";
}
Application.DoEvents();
}));
return;
} TextBoxLog.AppendText(msg + "\r\n");
if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
{
this.TextBoxLog.Text = "";
}
Application.DoEvents();
} public void SetStatus(string msg)
{
if (this.InvokeRequired)
{
this.Invoke(new Action(() =>
{
this.LabelStatus.Text = msg;
Application.DoEvents();
}));
return;
}
this.LabelStatus.Text = msg;
Application.DoEvents();
} public void SetToolStripStatusStatus(string msg)
{
if (this.InvokeRequired)
{
this.Invoke(new Action(() =>
{
this.toolStripStatusLabel2.Text = msg;
Application.DoEvents();
}));
return;
}
this.toolStripStatusLabel2.Text = msg;
Application.DoEvents();
} public void WriteLog(TextBox textBox, string msg)
{
if (textBox.InvokeRequired)
{
textBox.Invoke(new Action(() =>
{
textBox.AppendText(msg + "\r\n");
}));
return;
}
textBox.AppendText(msg + "\r\n");
} private void button2_Click(object sender, EventArgs e)
{ if (ButtonPauseResume.Text == "Pause")
{
JobManager.Stop();
ButtonPauseResume.Text = "Resume";
}
else
{
JobManager.Start();
ButtonPauseResume.Text = "Pause";
}
} /// <summary>
/// 取消上传
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void cancelUploadToolStripMenuItem_Click(object sender, EventArgs e)
{
DataGridViewRow row;
var task = new TaskFile();
row = DataGridFiles.SelectedRows[];
var fileName = row.Cells["FileName"].Value.ToString(); #region if (System.IO.File.Exists(setting.FolderToMonitor + "pending\\" + fileName))
{
var f = new FileInfo(setting.FolderToMonitor + "pending\\" + fileName);
//按行读取
int curLine = ;
using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
{
fileStream.Seek(, SeekOrigin.Begin);
string[] lines = System.IO.File.ReadAllLines(f.FullName);
using (var streamReader = new StreamReader(fileStream, Encoding.Default))
{
string content = streamReader.ReadLine();
while (!string.IsNullOrEmpty(content))
{
if (content.Substring(, ) == "HOST=")
{
//FTP
task.HOST = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "LCD=")
{
//本地目录文件
task.LCD = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "DIR=")
{
//远程对应目录文件
task.DIR = content.Substring(content.IndexOf("=") + );
task.DIR = task.DIR.Replace("\\", "/");
}
else if (content.Substring(, ) == "priority=")
{
//优先级
task.Priority = content.Substring(content.IndexOf("=") + );
} content = streamReader.ReadLine();
curLine++;
}
}
} foreach (var account in setting.FTPAccounts)
{
if (account.FTPName == task.HOST)
{
account.cancelTask = task;
break;
}
} //--------------Copyto file to err
f.CopyTo(setting.FolderToMonitor + "err\\" + f.Name, true);
System.IO.File.Delete(setting.FolderToMonitor + "pending\\" + f.Name);
//--------------Copyto file to err
} #endregion
} /// <summary>
/// 删除上传
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void deleteUploadToolStripMenuItem_Click(object sender, EventArgs e)
{
DataGridViewRow row;
var task = new TaskFile();
row = DataGridFiles.SelectedRows[];
var fileName = row.Cells["FileName"].Value.ToString();
var fileGuid = row.Cells["Guid"].Value.ToString();
#region if (System.IO.File.Exists(setting.FolderToMonitor + "pending\\" + fileName))
{
var f = new FileInfo(setting.FolderToMonitor + "pending\\" + fileName);
//按行读取
int curLine = ;
using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
{
fileStream.Seek(, SeekOrigin.Begin);
string[] lines = System.IO.File.ReadAllLines(f.FullName);
using (var streamReader = new StreamReader(fileStream, Encoding.Default))
{
string content = streamReader.ReadLine();
while (!string.IsNullOrEmpty(content))
{
if (content.Substring(, ) == "HOST=")
{
//FTP
task.HOST = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "LCD=")
{
//本地目录文件
task.LCD = content.Substring(content.IndexOf("=") + );
}
else if (content.Substring(, ) == "DIR=")
{
//远程对应目录文件
task.DIR = content.Substring(content.IndexOf("=") + );
task.DIR = task.DIR.Replace("\\", "/");
}
else if (content.Substring(, ) == "priority=")
{
//优先级
task.Priority = content.Substring(content.IndexOf("=") + );
} content = streamReader.ReadLine();
curLine++;
}
}
} if (!string.IsNullOrEmpty(task.HOST))
{
foreach (var account in setting.FTPAccounts)
{
if (account.FTPName == task.HOST)
{
account.delTask = task;
break;
}
}
} LogText("Delete File:" + "pending\\" + f.Name);
System.IO.File.Delete(setting.FolderToMonitor + "pending\\" + f.Name);
} if (!string.IsNullOrEmpty(task.HOST))
RemoveGridView(this.DataGridFiles, task);
else
{
task.GUID = System.Guid.Parse(fileGuid);
RemoveGridView(this.DataGridFiles, task);
} #endregion
} private void DataGridFiles_CellMouseDown(object sender, DataGridViewCellMouseEventArgs e)
{
if (e.Button == MouseButtons.Right)
{
if (e.RowIndex >= )
{
//若行已是选中状态就不再进行设置
if (DataGridFiles.Rows[e.RowIndex].Selected == false)
{
DataGridFiles.ClearSelection();
DataGridFiles.Rows[e.RowIndex].Selected = true;
}
//只选中一行时设置活动单元格
if (DataGridFiles.SelectedRows.Count == )
{
DataGridFiles.CurrentCell = DataGridFiles.Rows[e.RowIndex].Cells[e.ColumnIndex];
}
//弹出操作菜单
contextMenuStrip1.Show(MousePosition.X, MousePosition.Y);
}
}
} private void FormMain_FormClosing(object sender, FormClosingEventArgs e)
{
if (JobManager.GetSchedule("StartRunMonitorTask") != null)
{
JobManager.Stop();
JobManager.RemoveJob("StartRunMonitorTask");
}
System.Environment.Exit();
Application.Exit();
} private void button1_Click(object sender, EventArgs e)
{
var fa = new FormAccounts(setting);
fa.Owner = this;
fa.ShowDialog();
}
}
}
 using System;
using System.IO;
using System.Net;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml;
using System.Xml.Linq;
using System.Xml.Serialization;
using System.Threading;
using System.Security.Cryptography; namespace NuFTPCmmndv5
{ public class FTPAccount
{
[XmlIgnore]
private ProcessQueue<TaskFile> processQueue; [XmlIgnore]
public Setting setting; public FTPAccount()
{
//processQueue = new ProcessQueue<TaskFile>(UpLoadFileTest);
processQueue = new ProcessQueue<TaskFile>(UpLoadFile);
cancelTask = new TaskFile();
delTask = new TaskFile();
} [XmlElement]
/// <summary>
/// FTP名
/// </summary>
public string FTPName { get; set; }
[XmlElement]
/// <summary>
/// FTP对应IP地址
/// </summary>
public string IP { get; set; }
[XmlElement]
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
[XmlElement]
/// <summary>
/// 账户名
/// </summary>
public string Username { get; set; }
[XmlElement]
/// <summary>
/// 密码
/// </summary>
public string Password { get; set; }
[XmlElement]
public int MaxThreadNum { get; set; } [XmlIgnore]
public TaskFile cancelTask { get; set; }
[XmlIgnore]
public TaskFile delTask { get; set; } /// <summary>
/// 开始处理上传队列
/// </summary>
public void Start()
{
processQueue.Start();
} /// <summary>
/// 添加到队列
/// </summary>
/// <param name="task"></param>
public void AddQueue(TaskFile task)
{
processQueue.Enqueue(task);
} /// <summary>
/// 是否已经包含在队列
/// </summary>
/// <param name="task"></param>
/// <returns></returns>
public bool Contains(TaskFile task)
{
return processQueue.Contains(task);
} /// <summary>
/// 上传进度
/// </summary>
public event Action<TaskFile, CompetedEventArgs> ProcessProgress;
/// <summary>
/// 上传完成
/// </summary>
public event Action<TaskFile, CompetedEventArgs> Completed;
/// <summary>
/// 终止上传
/// </summary>
public event Action<TaskFile, CompetedEventArgs> Aborted;
/// <summary>
/// 删除上传
/// </summary>
public event Action<TaskFile, CompetedEventArgs> Deleted;
/// <summary>
/// 上传日志
/// </summary>
public event Action<string> ProcessLog; private void OnProcessProgress(TaskFile pendingValue, CompetedEventArgs args)
{
if (ProcessProgress != null)
{
try
{
ProcessProgress(pendingValue, args);
}
catch { }
}
} private void OnCompleted(TaskFile pendingValue, CompetedEventArgs args)
{
if (Completed != null)
{
try
{
Completed(pendingValue, args);
}
catch { }
}
}
private void OnAborted(TaskFile pendingValue, CompetedEventArgs args)
{
if (Aborted != null)
{
try
{
Aborted(pendingValue, args);
}
catch { }
}
}
private void OnDeleted(TaskFile pendingValue, CompetedEventArgs args)
{
if (Deleted != null)
{
try
{
Deleted(pendingValue, args);
}
catch { }
}
}
private void OnProcessLog(string log)
{
if (ProcessLog != null)
{
try
{
ProcessLog(log);
}
catch { }
}
} public void UpLoadFileTest(TaskFile task)
{
OnProcessLog("Thread:" + Thread.CurrentThread.ManagedThreadId);
OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Uploading });
for (int i = ; i <= ; i++)
{
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = i, uploadStatus = UploadStatus.Uploading });
Thread.Sleep(GenerateRandomInteger(,));
}
OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Completed });
//移除文件
File.Delete(setting.FolderToMonitor + "pending\\" + task.Filename);
} public int GenerateRandomInteger(int min = , int max = )
{
var randomNumberBuffer = new byte[];
new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);
return new Random(BitConverter.ToInt32(randomNumberBuffer, )).Next(min, max);
} /// <summary>
/// 上传文件
/// </summary>
/// <param name="task"></param>
public void UpLoadFile(TaskFile task)
{
FileInfo _FileInfo = null;
FtpWebRequest _FtpWebRequest;
Stream ftpStream = null;
FileStream _FileStream = null;
int buffLength = ;
byte[] buffer; int contentLen = ;
long uploaded = ; string _directory = "";
string dirStr = "";
FtpWebResponse response; var uploadDatetime = DateTime.Now; try
{
_FileInfo = new FileInfo(task.LCD);
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Uploading }); //创建目录
if (task.DIR.IndexOf("/") >= )
{
//"/data/test/1.XML"
_directory = task.DIR.Substring(, task.DIR.LastIndexOf("/") - );
OnProcessLog("Creating DIR: " + FTPName + "://" + IP + "/" + _directory);
dirStr = ""; #region
foreach (var dir in _directory.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries))
{
dirStr += dir + "/";
try
{
_FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + dirStr));
_FtpWebRequest.Method = WebRequestMethods.Ftp.MakeDirectory;
_FtpWebRequest.UseBinary = true;
_FtpWebRequest.Credentials = new NetworkCredential(Username, Password); response = (FtpWebResponse)_FtpWebRequest.GetResponse();
ftpStream = response.GetResponseStream();
ftpStream.Close();
response.Close();
}
catch (Exception ex)
{
if (ftpStream != null)
{
ftpStream.Close();
ftpStream.Dispose();
}
}
}
#endregion
} OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR); _FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + task.DIR));
_FtpWebRequest.Credentials = new NetworkCredential(Username, Password);
_FtpWebRequest.KeepAlive = false;
_FtpWebRequest.Timeout = ;
_FtpWebRequest.Method = WebRequestMethods.Ftp.UploadFile;
_FtpWebRequest.UseBinary = true;
_FtpWebRequest.ContentLength = _FileInfo.Length; buffLength = ;
buffer = new byte[buffLength]; _FileStream = _FileInfo.OpenRead();
ftpStream = _FtpWebRequest.GetRequestStream();
contentLen = _FileStream.Read(buffer, , buffLength);
uploaded = contentLen; var cancel = false;
var delete = false;
var timeOut = false; while (contentLen > )
{
if (cancelTask.Filename == task.Filename)
cancel = true; if (delTask.Filename == task.Filename)
delete = true; if (DateTime.Now.Subtract(uploadDatetime).Seconds > )
timeOut = true; if (cancel)
{
OnProcessLog("Thread Cancel: " + FTPName + "://" + IP + "/" + task.DIR);
throw new Exception("Cancel");
cancel = false;
}
else if (delete)
{
OnProcessLog("Thread Delete: " + FTPName + "://" + IP + "/" + task.DIR);
throw new Exception("Delete");
delete = false;
}
else if (timeOut)
{
OnProcessLog("Thread Timeout: " + FTPName + "://" + IP + "/" + task.DIR);
throw new Exception("Timeout");
timeOut = false;
} ftpStream.Write(buffer, , contentLen);
contentLen = _FileStream.Read(buffer, , buffLength);
uploaded += contentLen;
//上传进度
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = (int)(uploaded / _FileInfo.Length) * , uploadStatus = UploadStatus.Uploading });
} ftpStream.Close();
ftpStream.Dispose();
_FileStream.Close();
_FileStream.Dispose(); //上传完成
OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Completed }); //移除文件
File.Delete(setting.FolderToMonitor + "pending\\" + task.Filename);
}
catch (Exception ex)
{
//上传失败
OnProcessLog("Failed to upload: " + FTPName + "://" + IP + "/" + task.DIR); try
{
File.Move(setting.FolderToMonitor + "pending\\" + task.Filename, setting.FolderToMonitor + "err\\" + task.Filename);
}
catch (Exception ex1)
{
OnProcessLog("Moving Files Err:" + task.HOST + ": " + task.DIR + ex1.Message);
try
{
File.Copy(setting.FolderToMonitor + "pending\\" + task.Filename, setting.FolderToMonitor + "err\\" + task.Filename);
File.Delete(setting.FolderToMonitor + "pending\\" + task.Filename);
}
catch (Exception ex2)
{ OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex2.Message);
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Failed });
}
}
finally
{
OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex.Message); if (ex.Message == "TimeOut")
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Timeout });
else if (ex.Message == "Cancel")
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Cancel });
else if (ex.Message == "Delete")
OnDeleted(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Delete });
else
OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = , uploadStatus = UploadStatus.Failed });
} }
finally
{
if (ftpStream != null)
{
ftpStream.Close();
ftpStream.Dispose();
}
if (_FileStream != null)
{
_FileStream.Close();
_FileStream.Dispose();
}
}
} /// <summary>
/// 完成事件数据
/// </summary>
public class CompetedEventArgs : EventArgs
{
public CompetedEventArgs()
{
}
public UploadStatus uploadStatus { get; set; }
/// <summary>
/// 完成百分率
/// </summary>
public int CompetedPrecent { get; set; }
/// <summary>
/// 异常信息
/// </summary>
public Exception InnerException { get; set; }
} public enum DoWorkResult
{
/// <summary>
/// 继续运行,默认
/// </summary>
ContinueThread = ,
/// <summary>
/// 终止当前线程
/// </summary>
AbortCurrentThread = ,
/// <summary>
/// 终止全部线程
/// </summary>
AbortAllThread =
} public enum UploadStatus
{
Completed = ,
Failed = ,
Timeout = ,
Cancel = ,
Uploading = ,
Delete = ,
Abort = ,
}
}
}
 using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Collections;
using System.Threading; namespace NuFTPCmmndv5
{
/// <summary>
/// 表示一个线程(同步)安全的通用泛型处理队列
/// </summary>
/// <typeparam name="T">ProcessQueue中包含的数据类型</typeparam>
public class ProcessQueue<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
{
#region Instance Variables private object syncRoot = new object();
private Queue<T> queue;
private List<WorkerThread> threads = new List<WorkerThread>();
private Action<T> operation;
bool isDisposed;
bool isRunning; private int _maxThreadCount; public int MaxThreadCount
{
get { return _maxThreadCount == ? : _maxThreadCount; }
set { value = _maxThreadCount; }
} #endregion #region Constructors
/// <summary>
/// 初始一个新的ProcessQueue 并指定具有特定容量的工作线程
/// </summary>
public ProcessQueue(Action<T> action) : this(action, ) { }
/// <summary>
/// 初始一个新的ProcessQueue
/// </summary>
public ProcessQueue(int capacity, Action<T> action) : this(capacity, action, ) { }
/// <summary>
/// 初始一个新的ProcessQueue
/// </summary>
public ProcessQueue(IEnumerable<T> collection, Action<T> action) : this(collection, action, ) { } /// <summary>
/// 初始一个新的ProcessQueue
/// </summary>
public ProcessQueue(Action<T> action, int threadCount)
{
queue = new Queue<T>();
operation = action; SetThreadCount(MaxThreadCount);
} /// <summary>
/// 初始一个新的ProcessQueue
/// </summary>
/// <param name="capacity">初始容量</param>
public ProcessQueue(int capacity, Action<T> action, int threadCount)
{
queue = new Queue<T>(capacity);
operation = action; SetThreadCount(MaxThreadCount);
} /// <summary>
/// 初始一个新的ProcessQueue
/// </summary>
/// <param name="collection">将数据复制到ProcessQueue.</param>
public ProcessQueue(IEnumerable<T> collection, Action<T> action, int threadCount)
{
queue = new Queue<T>(collection);
operation = action; SetThreadCount(MaxThreadCount);
}
#endregion #region Processing Control /// <summary>
/// 停止 (挂起)
/// </summary>
public void Stop()
{
lock (syncRoot)
{
foreach (WorkerThread thread in threads)
{
thread.Pause();
} isRunning = false;
}
} /// <summary>
/// 开始运行
/// </summary>
public void Start()
{
lock (syncRoot)
{
//清空队列集合重新创建新的线程
RegenerateIfDisposed();
//如果新进的项目少于当前线程集合总的线程数,则创建当前新进的项目数
//如果新进 的项目多余当前线程集合的线程数,则创建同样多的数程集合的线程数
for (int i = ; i < Math.Min(threads.Count, queue.Count); i++)
{
//设置信号让其运行
threads[i].Signal();
}
isRunning = true;
}
} /// <summary>
/// 获取此ProcessQueue使用的工作线程数。 使用SetThreadCount更改此值。
/// </summary>
public int ThreadCount { get { return threads.Count; } } /// <summary>
/// 设置此ProcessQueue使用的工作线程数,并根据需要分配或释放线程。
/// </summary>
/// <param name="threadCount">线程数</param>
public void SetThreadCount(int threadCount)
{
//至少要有一个线程
if (threadCount < ) throw new ArgumentOutOfRangeException("threadCount", "The ProcessQueue class requires at least one worker thread.");
//同步线程
lock (syncRoot)
{
// 等待队列
int pending = queue.Count;
// 创建一个指定最大工作线程数的线程集合,每个线程用来处理排队的项目
for (int i = threads.Count; i < threadCount; i++)
{
//注意:在实例化工作线程WorkerThread 时,已经创建了一个ThreadProc 无限循环方法,改方法检测 signalEvent, abortEvent 信号
//在收到 abortEvent 时终止并退出,收到 Signal时 循环调用 ProcessItems() 用来处理队列里排队的项目
WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);
//添加到列表
threads.Add(thread);
//线程启动
thread.Start();
//如果队列总有待排项目时
if (pending > )
{
//设置信号,让当前工作线程运行(不等待)
thread.Signal();
}
//待排队数减一
pending--;
} //如果其它线程调用了SetThreadCount,或者多次调用了 SetThreadCount,从而导致当前实际的线程集合有可能远远大于最大线程数
//在这种情况下,需要移除多余的线程,从而保证当前threadCount有效
//移除的线程数 = 当前创建的工作线程集合总数 - 设置的最大线程数
int toRemove = threads.Count - threadCount;
if (toRemove > )
{
//IsSignaled 如果当前实例收到信号,则为 true;否则为 false
//从线程集合里取出正在等待的线程
foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
{
//设置信号使得该线程终止
thread.Abort();
//从集合中移除改项
threads.Remove(thread);
//移除数减一
toRemove--;
}
//如果待移除的线程正在运行中
//则强制移除该线程直到移除完为止
while (toRemove > )
{
WorkerThread thread = threads[threads.Count - ];
thread.Abort();
threads.Remove(thread);
toRemove--;
}
}
}
} /// <summary>
/// 处理队列项
/// </summary>
/// <param name="item"></param>
private void ProcessItem(T item)
{
operation(item);
} /// <summary>
/// 释放时重置线程
/// </summary>
private void RegenerateIfDisposed()
{
if (isDisposed)
{
int threadCount = threads.Count; threads.Clear(); SetThreadCount(threadCount);
} isDisposed = false;
}
#endregion /// <summary>
/// 从ProcessQueue清除所有未处理的项目
/// </summary>
public void Clear()
{
lock (syncRoot)
{
queue.Clear();
}
} /// <summary>
/// 尝试从ProcessQueue中检索获取下一个项目(如果存在)。 如果不存在,则将值设置为其默认值。
/// </summary>
/// <param name="value">如果不存在,则该变量将被设置为默认值.</param>
/// <returns>如果ProcessQueue包含一个项,则为真,如果没有则为False</returns>
public bool TryDequeue(out T value)
{
lock (syncRoot)
{
if (queue.Count > )
{
value = queue.Dequeue(); return true;
}
else
{
value = default(T); return false;
}
}
} /// <summary>
/// 确定队列是否包含指定项
/// </summary>
/// <param name="item">当前项</param>
/// <returns>存在则 True, 不存在则 False</returns>
public bool Contains(T item)
{
lock (syncRoot)
{
return queue.Contains(item);
}
} /// <summary>
/// 将ProcessQueue的内容复制到外部数组,而不影响ProcessQueue的内容
/// </summary>
/// <param name="array">The array to copy the items into</param>
/// <param name="arrayIndex">The starting index in the array</param>
public void CopyTo(T[] array, int arrayIndex)
{
lock (syncRoot)
{
queue.CopyTo(array, arrayIndex);
}
} /// <summary>
/// 从ProcessQueue中检索下一个未处理的项目并将其删除
/// </summary>
/// <returns>The next unprocessed item in the ProcessQueue</returns>
public T Dequeue()
{
lock (syncRoot)
{
return queue.Dequeue();
}
} /// <summary>
/// 将一个项目添加到处理队列的末尾
/// </summary>
/// <param name="item">添加项</param>
public void Enqueue(T item)
{
lock (syncRoot)
{
//新进队列项
queue.Enqueue(item);
//当前处理队列正在运行时
if (isRunning)
{
//清空队列集合重新创建新的线程
RegenerateIfDisposed();
//取出一个等待的线程
WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
//存在则运行它
if (firstThread != null) firstThread.Signal();
}
}
} /// <summary>
/// 从ProcessQueue中检索下一个未处理的项目,而不删除它
/// </summary>
/// <returns>The next unprocessed item in the ProcessQueue</returns>
public T Peek()
{
lock (syncRoot)
{
return queue.Peek();
}
} /// <summary>
/// 返回一个包含ProcessQueue中所有未处理项目的数组
/// </summary>
/// <returns></returns>
public T[] ToArray()
{
lock (syncRoot)
{
return queue.ToArray();
}
} /// <summary>
/// 将ProcessQueue的容量设置为其包含的项目的实际数量,除非该数量超过当前容量的90%。
/// </summary>
public void TrimExcess()
{
lock (syncRoot)
{
queue.TrimExcess();
}
} #region IEnumerable<T> Members public IEnumerator<T> GetEnumerator()
{
return queue.GetEnumerator();
} #endregion #region IEnumerable Members IEnumerator IEnumerable.GetEnumerator()
{
return queue.GetEnumerator();
} #endregion #region ICollection Members void ICollection.CopyTo(Array array, int index)
{
lock (syncRoot)
{
((ICollection)queue).CopyTo(array, index);
}
} public int Count
{
get
{
lock (syncRoot)
{
return queue.Count;
}
}
} bool ICollection.IsSynchronized
{
get { return true; }
} object ICollection.SyncRoot
{
get { return syncRoot; }
} #endregion #region IDisposable Members public void Dispose()
{
Dispose(true);
} private void Dispose(bool disposing)
{
if (disposing)
{
foreach (WorkerThread thread in threads) thread.Abort();
} isDisposed = true;
} #endregion /// <summary>
/// 封装.NET Thread对象并管理与控制其行为相关联的WaitHandles
/// </summary>
private class WorkerThread
{
private ManualResetEvent abortEvent;
private ManualResetEvent signalEvent;
private ProcessQueue<T> queue; private Thread thread; public WorkerThread(ProcessQueue<T> queue)
{
abortEvent = new ManualResetEvent(false);
signalEvent = new ManualResetEvent(false);
this.queue = queue; thread = new Thread(ThreadProc);
thread.Name = "ProcessQueue Worker ID " + thread.ManagedThreadId;
} /// <summary>
/// 运行当前线程
/// </summary>
public void Start()
{
thread.Start();
} /// <summary>
/// 终止当前线程
/// </summary>
public void Abort()
{
abortEvent.Set(); thread.Join();
} /// <summary>
/// 清除信号WaitHandle,导致线程完成当前的迭代后暂停
/// </summary>
public void Pause()
{
signalEvent.Reset();
} /// <summary>
/// 设置信号WaitHandle,等待线程使其恢复运行(如果暂停)
/// </summary>
public void Signal()
{
signalEvent.Set();
} public bool IsSignaled
{
get { return signalEvent.WaitOne(); }
} /// <summary>
/// ThreadProc 总线程方法由一个无限循环组成,在发出中止事件时退出
/// </summary>
private void ThreadProc()
{
WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent }; while (true)
{
//等待指定数组中的任一元素收到信号
switch (WaitHandle.WaitAny(handles))
{
case : // signal
{
ProcessItems();
}
break;
case : // abort
{
return;
}
}
}
} /// <summary>
/// 处理项目
/// </summary>
private void ProcessItems()
{
T item;
//从队列中取出一项,这是一个同步的过程
while (queue.TryDequeue(out item))
{
//处理队列项
queue.ProcessItem(item);
//如果当前实例收到信号,则为 true;否则为 false。
//等待当前队列完成 在 调用 signalEvent.Set() 或者 abortEvent.Set() 时
if (!signalEvent.WaitOne() || abortEvent.WaitOne()) return;
}
//线程状态设置为非终止状态
signalEvent.Reset();
}
}
}
}