独上高楼网站
  •    你所在位置:首页 VS.netC#C#编程〉大量数据转录的多线程和同步处理实现
  • 大量数据转录的多线程和同步处理实现
  • 作者:Justina Chen  文章来源:www.cnblogs.com  发布日期:2008-11-15  浏览次数:69
  • 打印这篇文章
  • 项目中需要对两个不同格式的存储设备进行数据转录,因为数据量非常大,所以时间非常缓慢;解决方案是使用ReaderWriterSlim类建立一个共享的同步数据,可以支持一个线程读取外部设备,向同步数据写入;多个线程从同步数据中读取,转换格式,然后写入到本地设备。

    本例中采用Queue作为存放数据的集合,写入线程向它的尾部写入对象,读取线程从它的头部获取对象。

    需要注意的是,由于Queue会抛弃已处理的对象,所以在同步数据队列中无法验证数据对象的唯一性,被写入的数据库需要去掉唯一约束,或在写入时向数据库请求验证。

    首先定义一个读写接口: 

     

    namespace Common
    {
        
    public interface IReaderWriter<T>
        {
            T Read(
    int argument);
            
    void Write(int arugment, T instance);
            
    void Delete(int argument);
            
    void Clear();
        }
    }

     

    然后实现一个队列的读写器:

     

    namespace Common
    {
        
    public class QueueReaderWriter<T> : IReaderWriter<T>
        {
            
    private Queue<T> queues;

            
    public QueueReaderWriter()
            {
                queues 
    = new Queue<T>();
            }

            
    public QueueReaderWriter(int capacity)
            {
                queues 
    = new Queue<T>(capacity);
            }

            
    #region IReadWrite 成员

            
    public T Read(int argument)
            {
                
    return queues.FirstOrDefault();
            }

            
    public void Write(int arugment, T instance)
            {
                queues.Enqueue(instance);
            }

            
    public void Delete(int argument)
            {
                queues.Dequeue();
            }

            
    public void Clear()
            {
                queues.Clear();
                queues.TrimExcess();
            }

            
    #endregion
        }
    }

     

    使用ReaderWriterLockSlim实现同步数据类:

    namespace Common
    {
        
    public class SynchronizedWriteData<T> : IDisposable
        {
            
    private ReaderWriterLockSlim _dataLock = new ReaderWriterLockSlim();
            
    private IReaderWriter<T> _innerData;

            
    private SynchronizedWriteData()
            { }

            
    public SynchronizedWriteData(IReaderWriter<T> innerData)
            {
                _innerData 
    = innerData;
            }

            
    public T Read()
            {
                _dataLock.EnterReadLock();
                
    try
                {
                    
    return _innerData.Read(0);
                }
                
    finally
                {
                    _dataLock.ExitReadLock();
                }
            }

            
    public T Read(int argument)
            {
                _dataLock.EnterReadLock();
                
    try
                {
                    
    return _innerData.Read(argument);
                }
                
    finally
                {
                    _dataLock.ExitReadLock();
                }
            }

            
    public void Add(T instance)
            {
                _dataLock.EnterWriteLock();
                
    try
                {
                    _innerData.Write(
    0, instance);
                }
                
    finally
                {
                    _dataLock.ExitWriteLock();
                }
            }

            
    public void Add(int argument, T instance)
            {
                _dataLock.EnterWriteLock();
                
    try
                {
                    _innerData.Write(argument, instance);
                }
                
    finally
                {
                    _dataLock.ExitWriteLock();
                }
            }

            
    public bool AddWithTimeout(T instance, int timeout)
            {
                
    if (_dataLock.TryEnterWriteLock(timeout))
                {
                    
    try
                    {
                        _innerData.Write(
    0, instance);
                    }
                    
    finally
                    {
                        _dataLock.ExitWriteLock();
                    }
                    
    return true;
                }
                
    else
                {
                    
    return false;
                }
            }

            
    public bool AddWithTimeout(int argument, T instance, int timeout)
            {
                
    if (_dataLock.TryEnterWriteLock(timeout))
                {
                    
    try
                    {
                        _innerData.Write(argument, instance);
                    }
                    
    finally
                    {
                        _dataLock.ExitWriteLock();
                    }
                    
    return true;
                }
                
    else
                {
                    
    return false;
                }
            }

            
    public void Delete()
            {
                _dataLock.EnterWriteLock();
                
    try
                {
                    _innerData.Delete(
    0);
                }
                
    finally
                {
                    _dataLock.ExitWriteLock();
                }
            }

            
    public void Delete(int argument)
            {
                _dataLock.EnterWriteLock();
                
    try
                {
                    _innerData.Delete(argument);
                }
                
    finally
                {
                    _dataLock.ExitWriteLock();
                }
            }

            
    #region IDisposable 成员

            
    public void Dispose()
            {
                
    try
                {
                    _dataLock.EnterWriteLock();
                    {
                        
    try
                        {
                            _innerData.Clear();
                        }
                        
    finally
                        {
                            _dataLock.ExitWriteLock();
                        }
                    }
                }
                
    finally
                {
                    _dataLock.Dispose();
                }
            }

            
    #endregion
        }
    }

     

    namespace ExternalDataHandle
    {
        
    /// 
        
    /// 从外部数据源获取到内部数据源的适配器抽象类
        
    /// 

        
    /// T 数据对象类型
        public abstract class ExternalDataAdapter<T> : IDisposable
        {
            
    /// 
            
    /// 外部数据源连接字符串
            
    /// 

            protected abstract string ConnectString { get; }

            
    /// 
            
    /// 提供初始化数据适配器的方法
            
    /// 

            protected abstract void Initialize();

            
    /// 
            
    /// 提供数据传递的方法
            
    /// 

            public abstract void Transmit();

            
    /// 
            
    /// 提供从外部数据设备读取数据的方法
            
    /// 

            protected abstract void ReadFromExternalDevice();

            
    /// 
            
    /// 提供保存数据到内部设备的方法
            
    /// 

            protected abstract void SaveToInternalDevice();

            
    #region IDisposable 成员

            
    public abstract void Dispose();

            
    #endregion
        }
    }

     

    多线程数据转录类,本例只使用了一个读取线程:

    namespace ExternalDataHandle
    {
        
    /// 
        
    /// 提供多线程方式从外部数据源获取到内部数据源的适配器类
        
    /// 

        
    /// 
        public abstract class MultiThreadAdapter<T> : ExternalDataAdapter<T>
        {
            
    protected SynchronizedWriteData<T> _data;
            
    protected Thread _readThread;

            
    protected abstract override string ConnectString { get; }

            
    protected abstract override void Initialize();

            
    public sealed override void Transmit()
            {
                _readThread 
    = new Thread(new ThreadStart(ReadFromExternalDevice));
                _readThread.Start();
                Thread.Sleep(
    10000);
                
    while (_readThread.IsAlive)
                {
                    SaveToInternalDevice();
                }
                _readThread.Join();
            }

            
    protected abstract override void ReadFromExternalDevice();

            
    protected abstract override void SaveToInternalDevice();

            
    public override void Dispose()
            {
                
    if (_data != null)
                {
                    _data.Dispose();
                }
            }
        }
    }

     

  • 打印这篇文章
  • 与本文主题相关的文章
  • 返回首页