
Keeping Data in memory


Sorry, but I would say that it is a bad idea, for the following reasons:

  • If the application pool recycles before the data is written to the database, you will lose data.
  • Keeping all the data in a single collection means you must lock that collection both when data is inserted and when it is written to disk and the collection cleared. This could cause the whole site to pause during the bulk insert (see the sketch after this list).
  • The extra step makes your code more complicated, and threading problems are hard to fix.
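
To illustrate the second point, here is a minimal sketch of the pattern being warned against (the type and member names are hypothetical): the flush holds the same lock that every request needs in order to enqueue, so all requests stall while the bulk insert runs.

    // Hypothetical sketch of the problematic shared-collection pattern.
    // Assumes: using System.Collections.Generic;
    public class InMemoryBuffer
    {
        private readonly object _lock = new object();
        private List<string> _pending = new List<string>();

        // Called on every request thread.
        public void Add(string requestData)
        {
            lock (_lock)                 // blocked while FlushToDatabase holds the lock
            {
                _pending.Add(requestData);
            }
        }

        // Called periodically to persist the buffer.
        public void FlushToDatabase()
        {
            lock (_lock)                 // held for the full duration of the bulk insert
            {
                BulkInsert(_pending);    // every Add() on the site waits for this
                _pending.Clear();
            }
        }

        private void BulkInsert(List<string> items) { /* bulk insert omitted */ }
    }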

We have written web applications that write 1000 rows per second to an SQL Server database at peak load.

Try writing your application as simply as possible first, and then performance-test it.

The speed at which you can insert into the database depends a lot on your hardware, but there are also things that you can do in your program:

  • Only have one index on the table: a clustered index on an autonumber (identity) key.
  • Make sure that you release the connection to the database as soon as possible (see the sketch after this list).
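
To illustrate the second point, a minimal sketch (the table, column, and type names are hypothetical): a using block returns the connection to the pool the moment the insert completes.

    // Hypothetical sketch; assumes: using System.Data.SqlClient;
    public void SaveRequest(RequestRecord record)
    {
        using (SqlConnection cnn = new SqlConnection(connectionString))
        using (SqlCommand cmd = new SqlCommand(
            "INSERT INTO RequestLog (Url, CreatedUtc) VALUES (@url, @created)", cnn))
        {
            cmd.Parameters.AddWithValue("@url", record.Url);
            cmd.Parameters.AddWithValue("@created", record.CreatedUtc);
            cnn.Open();            // open as late as possible
            cmd.ExecuteNonQuery();
        }                          // disposed here: the connection goes straight back to the pool
    }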


I have done pretty much exactly what you described with the code below. It is thread safe and has a Flush method you can call to write out any pending records. Once it reaches a threshold number of objects to write, it sends the queue (a List in my case) to a different thread for saving. Note that it uses a ManualResetEvent to handle flushing the data at the end: there is a limit of 64 wait handles that you can wait on at once, which is why it manually waits if more than 64 background writes are pending (but that should almost never happen unless your database is really slow). This code was used to process tens of millions of records that were streamed into it (from memory it took about 5 minutes to write 20 million rows, but it was running on the same server as the database, so no network hop). SQL Server can certainly handle thousands of rows a second using the SqlBulkCopy object and an IDataReader, so it should handle your request load (although of course that will depend on what you are writing and on your database, I think the code is up to the task!).

Also, to facilitate bulk writing, I created a minimal implementation of IDataReader to stream my data. You will need to do the same for your requests to use the code below.

    public class DataImporter<T>
    {
        public DataImporter(string tableName, string readerName)
        {
            _tableName = tableName;
            _readerName = readerName;
        }

        /// <summary>
        /// This is the size of our bulk staging list.
        /// </summary>
        /// <remarks>
        /// Note that the SqlBulkCopy object has a batch size property, which may not be the same
        /// as this value, so records may not be written to the database in batches of this size.
        /// </remarks>
        private int _bulkStagingListSize = 20000;
        private List<ManualResetEvent> _tasksWaiting = new List<ManualResetEvent>();
        private string _tableName = String.Empty;
        private string _readerName = String.Empty;

        public void QueueForImport(T record)
        {
            lock (_listLock)
            {
                _items.Add(record);
                if (_items.Count > _bulkStagingListSize)
                {
                    SaveItems(_items);
                    _items = new List<T>();
                }
            }
        }

        /// <summary>
        /// This method should be called at the end of the queueing work to clear down our list.
        /// </summary>
        public void Flush()
        {
            lock (_listLock)
            {
                if (_items.Count > 0)
                {
                    SaveItems(_items);
                    _items = new List<T>();
                }

                // WaitHandle.WaitAll is limited to 64 handles, so poll the count down first.
                while (PendingTaskCount() > 64)
                {
                    Thread.Sleep(2000);
                }

                ManualResetEvent[] pending;
                lock (_tasksWaiting)
                {
                    pending = _tasksWaiting.ToArray();
                }
                if (pending.Length > 0)
                {
                    WaitHandle.WaitAll(pending);
                }
            }
        }

        private int PendingTaskCount()
        {
            lock (_tasksWaiting)
            {
                return _tasksWaiting.Count;
            }
        }

        private void SaveItems(List<T> items)
        {
            ManualResetEvent evt = new ManualResetEvent(false);
            lock (_tasksWaiting)
            {
                _tasksWaiting.Add(evt);
            }
            // Use the list that was passed in, not the live _items field, so the
            // background write works on a stable snapshot.
            IDataReader reader = DataReaderFactory.GetReader<T>(_readerName, items);
            Tuple<ManualResetEvent, IDataReader> stateInfo =
                new Tuple<ManualResetEvent, IDataReader>(evt, reader);
            ThreadPool.QueueUserWorkItem(new WaitCallback(SaveData), stateInfo);
        }

        private void SaveData(object info)
        {
            using (new ActivityTimer("Saving bulk data to " + _tableName))
            {
                Tuple<ManualResetEvent, IDataReader> stateInfo = info as Tuple<ManualResetEvent, IDataReader>;
                IDataReader r = stateInfo.Item2;
                try
                {
                    Database.DataImportStagingDatabase.BulkLoadData(r, _tableName);
                }
                catch (Exception ex)
                {
                    // Do something (log the failure at minimum).
                }
                finally
                {
                    lock (_tasksWaiting)
                    {
                        _tasksWaiting.Remove(stateInfo.Item1);
                    }
                    stateInfo.Item1.Set();
                }
            }
        }

        private object _listLock = new object();
        private List<T> _items = new List<T>();
    }
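
A minimal usage sketch (the ProductRecord type and reader name match the classes shown below; the table name and the source of the records are hypothetical):

    // Hypothetical usage: queue records as they arrive, then flush at the end.
    var importer = new DataImporter<ProductRecord>("ProductRecordStaging", "ProductRecordDataReader");

    foreach (ProductRecord record in incomingRecords)   // e.g. records parsed from requests
    {
        importer.QueueForImport(record);
    }

    importer.Flush();   // blocks until all background bulk writes have completed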

The DataReaderFactory referred to above just selects the right IDataReader implementation to use for streaming, and looks as follows:

    internal static class DataReaderFactory
    {
        internal static IDataReader GetReader<T>(string typeName, List<T> items)
        {
            IDataReader reader = null;
            switch (typeName)
            {
                case "ProductRecordDataReader":
                    reader = new ProductRecordDataReader(items as List<ProductRecord>) as IDataReader;
                    break;
                case "RetailerPriceRecordDataReader":
                    reader = new RetailerPriceRecordDataReader(items as List<RetailerPriceRecord>) as IDataReader;
                    break;
                default:
                    break;
            }
            return reader;
        }
    }
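
As a design note, the switch could be replaced with a lookup table so that adding a new record type only needs one registration line; a minimal sketch of that alternative (not the code I used):

    // Hypothetical alternative: a registry of reader constructors keyed by name.
    // Assumes: using System; using System.Collections.Generic; using System.Data;
    internal static class DataReaderFactory
    {
        private static readonly Dictionary<string, Func<object, IDataReader>> _factories =
            new Dictionary<string, Func<object, IDataReader>>
            {
                { "ProductRecordDataReader",       items => new ProductRecordDataReader((List<ProductRecord>)items) },
                { "RetailerPriceRecordDataReader", items => new RetailerPriceRecordDataReader((List<RetailerPriceRecord>)items) },
            };

        internal static IDataReader GetReader<T>(string typeName, List<T> items)
        {
            Func<object, IDataReader> factory;
            return _factories.TryGetValue(typeName, out factory) ? factory(items) : null;
        }
    }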

The data reader implementation that I used in this case (although this code will work with any data reader) is as follows. Note that SqlBulkCopy only exercises a handful of members (Read, FieldCount, GetValue, and GetOrdinal when column mappings are used), which is why everything else can simply throw NotImplementedException:

    /// <summary>
    /// This class creates a data reader for ProductRecord data. This is used to stream the records
    /// to the SqlBulkCopy object.
    /// </summary>
    public class ProductRecordDataReader : IDataReader
    {
        public ProductRecordDataReader(List<ProductRecord> products)
        {
            _products = products.ToList();
        }

        List<ProductRecord> _products;
        int currentRow;
        int rowCounter = 0;

        public int FieldCount
        {
            get { return 14; }
        }

        #region IDataReader Members

        public void Close()
        {
            // Do nothing.
        }

        public bool Read()
        {
            if (rowCounter < _products.Count)
            {
                currentRow = rowCounter;
                rowCounter++;
                return true;
            }
            else
            {
                return false;
            }
        }

        public int RecordsAffected
        {
            get { throw new NotImplementedException(); }
        }

        public string GetName(int i)
        {
            switch (i)
            {
                case 0: return "ProductSku";
                case 1: return "UPC";
                case 2: return "EAN";
                case 3: return "ISBN";
                case 4: return "ProductName";
                case 5: return "ShortDescription";
                case 6: return "LongDescription";
                case 7: return "DFFCategoryNumber";
                case 8: return "DFFManufacturerNumber";
                case 9: return "ManufacturerPartNumber";
                case 10: return "ManufacturerModelNumber";
                case 11: return "ProductImageUrl";
                case 12: return "LowestPrice";
                case 13: return "HighestPrice";
                default: return null;
            }
        }

        public int GetOrdinal(string name)
        {
            switch (name)
            {
                case "ProductSku": return 0;
                case "UPC": return 1;
                case "EAN": return 2;
                case "ISBN": return 3;
                case "ProductName": return 4;
                case "ShortDescription": return 5;
                case "LongDescription": return 6;
                case "DFFCategoryNumber": return 7;
                case "DFFManufacturerNumber": return 8;
                case "ManufacturerPartNumber": return 9;
                case "ManufacturerModelNumber": return 10;
                case "ProductImageUrl": return 11;
                case "LowestPrice": return 12;
                case "HighestPrice": return 13;
                default: return -1;
            }
        }

        public object GetValue(int i)
        {
            switch (i)
            {
                case 0: return _products[currentRow].ProductSku;
                case 1: return _products[currentRow].UPC;
                case 2: return _products[currentRow].EAN;
                case 3: return _products[currentRow].ISBN;
                case 4: return _products[currentRow].ProductName;
                case 5: return _products[currentRow].ShortDescription;
                case 6: return _products[currentRow].LongDescription;
                case 7: return _products[currentRow].DFFCategoryNumber;
                case 8: return _products[currentRow].DFFManufacturerNumber;
                case 9: return _products[currentRow].ManufacturerPartNumber;
                case 10: return _products[currentRow].ManufacturerModelNumber;
                case 11: return _products[currentRow].ProductImageUrl;
                case 12: return _products[currentRow].LowestPrice;
                case 13: return _products[currentRow].HighestPrice;
                default: return null;
            }
        }

        #endregion

        #region IDisposable Members

        public void Dispose()
        {
            // Do nothing.
        }

        #endregion

        #region IDataRecord Members

        public bool NextResult() { throw new NotImplementedException(); }
        public int Depth { get { throw new NotImplementedException(); } }
        public DataTable GetSchemaTable() { throw new NotImplementedException(); }
        public bool IsClosed { get { throw new NotImplementedException(); } }
        public bool GetBoolean(int i) { throw new NotImplementedException(); }
        public byte GetByte(int i) { throw new NotImplementedException(); }
        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) { throw new NotImplementedException(); }
        public char GetChar(int i) { throw new NotImplementedException(); }
        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) { throw new NotImplementedException(); }
        public IDataReader GetData(int i) { throw new NotImplementedException(); }
        public string GetDataTypeName(int i) { throw new NotImplementedException(); }
        public DateTime GetDateTime(int i) { throw new NotImplementedException(); }
        public decimal GetDecimal(int i) { throw new NotImplementedException(); }
        public double GetDouble(int i) { throw new NotImplementedException(); }
        public Type GetFieldType(int i) { throw new NotImplementedException(); }
        public float GetFloat(int i) { throw new NotImplementedException(); }
        public Guid GetGuid(int i) { throw new NotImplementedException(); }
        public short GetInt16(int i) { throw new NotImplementedException(); }
        public int GetInt32(int i) { throw new NotImplementedException(); }
        public long GetInt64(int i) { throw new NotImplementedException(); }
        public string GetString(int i) { throw new NotImplementedException(); }
        public int GetValues(object[] values) { throw new NotImplementedException(); }
        public bool IsDBNull(int i) { throw new NotImplementedException(); }
        public object this[string name] { get { throw new NotImplementedException(); } }
        public object this[int i] { get { throw new NotImplementedException(); } }

        #endregion
    }

Finally, the bulk-load method looks as follows:

    public void BulkLoadData(IDataReader reader, string tableName)
    {
        using (SqlConnection cnn = new SqlConnection(cnnString))
        {
            SqlBulkCopy copy = new SqlBulkCopy(cnn);
            copy.DestinationTableName = tableName;
            copy.BatchSize = 10000;
            cnn.Open();
            copy.WriteToServer(reader);
        }
    }
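
One assumption worth making explicit: since no ColumnMappings are set, SqlBulkCopy maps the reader's fields to the destination table by ordinal, so the field order in the reader must match the table's column order. If it does not, name-based mappings can be added before the write (a small sketch, using the column names from the reader above):

    // Inside BulkLoadData, before copy.WriteToServer(reader):
    // map source fields to destination columns by name instead of by position.
    copy.ColumnMappings.Add("ProductSku", "ProductSku");
    copy.ColumnMappings.Add("UPC", "UPC");
    // ...one mapping per column. With named mappings, SqlBulkCopy resolves the
    // source field via the reader's GetOrdinal implementation.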

However, having said all that, I would recommend that you DO NOT use this code in ASP.NET, for the reasons pointed out in another answer (particularly the recycling of worker processes in IIS). I would suggest instead that you use a very lightweight queue to first send the request data to another service that won't restart (we use ZeroMQ to stream request and logging data out of an ASP.NET app that I am writing, and it is very performant); a sketch of that hand-off follows.
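
A minimal sketch of that hand-off, assuming the NetMQ binding for ZeroMQ (the endpoint and message format are hypothetical; the original answer does not specify a binding):

    // Hypothetical sketch using NetMQ (a C# ZeroMQ binding).
    // Producer side, inside the ASP.NET app: push request data to a separate service.
    using NetMQ;
    using NetMQ.Sockets;

    public class RequestForwarder : IDisposable
    {
        private readonly PushSocket _socket;

        public RequestForwarder()
        {
            _socket = new PushSocket();
            _socket.Connect("tcp://localhost:5555"); // the collector service binds this port
        }

        public void Forward(string serializedRequest)
        {
            _socket.SendFrame(serializedRequest);    // hands the message to ZeroMQ's internal queue
        }

        public void Dispose()
        {
            _socket.Dispose();
        }
    }

    // Consumer side, in the separate service: pull messages and bulk-insert them.
    // var pull = new PullSocket();
    // pull.Bind("tcp://*:5555");
    // string msg = pull.ReceiveFrameString();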

Mike.


I do see some discrepancies [...] due to threading

The basic approach here is to use two queues and cycle them: one for receiving and one for inserting. You only need to lock on the receiving queue, and only for the brief swap, so there is very little contention. A sketch follows.
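
A minimal sketch of that two-queue pattern (the type and member names are hypothetical; the answer does not include code):

    // Hypothetical sketch: two lists cycled between a receive role and an insert role.
    // Assumes: using System.Collections.Generic;
    public class DoubleBufferedWriter
    {
        private readonly object _swapLock = new object();
        private List<string> _receiving = new List<string>();

        // Request threads: lock only long enough to add one item.
        public void Enqueue(string requestData)
        {
            lock (_swapLock)
            {
                _receiving.Add(requestData);
            }
        }

        // Background thread: swap the queues, then insert without holding the lock.
        public void WriteBatch()
        {
            List<string> toInsert;
            lock (_swapLock)
            {
                toInsert = _receiving;
                _receiving = new List<string>();    // a fresh queue takes over receiving
            }
            if (toInsert.Count > 0)
            {
                BulkInsert(toInsert);               // runs with no lock held
            }
        }

        private void BulkInsert(List<string> items) { /* SqlBulkCopy etc. */ }
    }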