
Bulk insert with SQLAlchemy ORM


SQLAlchemy introduced support for this in version 1.0.0:

Bulk operations - SQLAlchemy docs

With these operations, you can now do bulk inserts or updates!

For instance, you can do:

s = Session()
objects = [
    User(name="u1"),
    User(name="u2"),
    User(name="u3")
]
s.bulk_save_objects(objects)
s.commit()

Here, a bulk insert will be made.
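The suite also has dictionary-based variants that skip ORM object construction entirely. A minimal sketch, assuming the same User model with an integer id primary key and a name column:

s = Session()

# bulk_insert_mappings() takes plain dictionaries instead of ORM objects,
# avoiding most of the per-instance bookkeeping.
s.bulk_insert_mappings(
    User,
    [{"name": "u1"}, {"name": "u2"}, {"name": "u3"}]
)

# bulk_update_mappings() emits UPDATEs keyed on the primary key, so each
# dictionary must include the id of the row to change.
s.bulk_update_mappings(
    User,
    [{"id": 1, "name": "u1-renamed"}, {"id": 2, "name": "u2-renamed"}]
)
s.commit()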


The SQLAlchemy docs have a writeup on the performance of the various techniques that can be used for bulk inserts:

ORMs are basically not intended for high-performance bulk inserts - this is the whole reason SQLAlchemy offers the Core in addition to the ORM as a first-class component.

For the use case of fast bulk inserts, the SQL generation and execution system that the ORM builds on top of is part of the Core. Using this system directly, we can produce an INSERT that is competitive with using the raw database API directly.

Alternatively, the SQLAlchemy ORM offers the Bulk Operations suite of methods, which provide hooks into subsections of the unit of work process in order to emit Core-level INSERT and UPDATE constructs with a small degree of ORM-based automation.

The example below illustrates time-based tests for several different methods of inserting rows, going from the most automated to the least. Running under cPython 2.7, the observed runtimes were:

classics-MacBook-Pro:sqlalchemy classic$ python test.py
SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs
SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs
SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs
SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs
sqlite3: Total time for 100000 records 0.487842082977 sec

Script:

import time
import sqlite3

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

Base = declarative_base()
DBSession = scoped_session(sessionmaker())
engine = None


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String(255))


def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'):
    global engine
    engine = create_engine(dbname, echo=False)
    DBSession.remove()
    DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)


def test_sqlalchemy_orm(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer()
        customer.name = 'NAME ' + str(i)
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_pk_given(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    for i in xrange(n):
        customer = Customer(id=i + 1, name="NAME " + str(i))
        DBSession.add(customer)
        if i % 1000 == 0:
            DBSession.flush()
    DBSession.commit()
    print(
        "SQLAlchemy ORM pk given: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_orm_bulk_save_objects(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    n1 = n
    while n1 > 0:
        # insert in batches of up to 10000 rows; compute the batch size
        # before decrementing so that all n rows are actually inserted
        batch = min(10000, n1)
        n1 = n1 - batch
        DBSession.bulk_save_objects(
            [Customer(name="NAME " + str(i)) for i in xrange(batch)]
        )
    DBSession.commit()
    print(
        "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def test_sqlalchemy_core(n=100000):
    init_sqlalchemy()
    t0 = time.time()
    engine.execute(
        Customer.__table__.insert(),
        [{"name": 'NAME ' + str(i)} for i in xrange(n)]
    )
    print(
        "SQLAlchemy Core: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " secs")


def init_sqlite3(dbname):
    conn = sqlite3.connect(dbname)
    c = conn.cursor()
    c.execute("DROP TABLE IF EXISTS customer")
    c.execute(
        "CREATE TABLE customer (id INTEGER NOT NULL, "
        "name VARCHAR(255), PRIMARY KEY(id))")
    conn.commit()
    return conn


def test_sqlite3(n=100000, dbname='sqlite3.db'):
    conn = init_sqlite3(dbname)
    c = conn.cursor()
    t0 = time.time()
    for i in xrange(n):
        row = ('NAME ' + str(i),)
        c.execute("INSERT INTO customer (name) VALUES (?)", row)
    conn.commit()
    print(
        "sqlite3: Total time for " + str(n) +
        " records " + str(time.time() - t0) + " sec")


if __name__ == '__main__':
    test_sqlalchemy_orm(100000)
    test_sqlalchemy_orm_pk_given(100000)
    test_sqlalchemy_orm_bulk_save_objects(100000)
    test_sqlalchemy_core(100000)
    test_sqlite3(100000)
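One caveat worth noting alongside these numbers: the bulk methods are fast partly because they don't fetch newly generated primary keys back into your objects. Both bulk_save_objects() and bulk_insert_mappings() accept a return_defaults flag if you need them, but turning it on gives up much of the speed advantage (on backends without RETURNING support, rows fall back to being inserted one at a time). A minimal sketch, reusing the Customer model and DBSession from the script above:

objects = [Customer(name="NAME " + str(i)) for i in xrange(1000)]
# return_defaults=True populates server-generated values (here, Customer.id)
# back onto the objects, at the cost of much of the bulk speedup
DBSession.bulk_save_objects(objects, return_defaults=True)
DBSession.commit()
print objects[0].id  # now populated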


As far as I know, there is no way to get the ORM to issue bulk inserts. I believe the underlying reason is that SQLAlchemy needs to keep track of each object's identity (i.e., new primary keys), and bulk inserts interfere with that. For example, assuming your foo table contains an id column and is mapped to a Foo class:

x = Foo(bar=1)
print x.id
# None
session.add(x)
session.flush()
# BEGIN
# INSERT INTO foo (bar) VALUES(1)
# COMMIT
print x.id
# 1

Since SQLAlchemy picked up the value for x.id without issuing another query, we can infer that it got the value directly from the INSERT statement (through the DBAPI's cursor.lastrowid on SQLite, or a RETURNING clause on backends that support one). If you don't need subsequent access to the created objects via the same instances, you can skip the ORM layer for your insert:

Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}])
# INSERT INTO foo (bar) VALUES ((1,), (2,), (3,))

SQLAlchemy can't match these new rows with any existing objects, so you'll have to query them anew for any subsequent operations.
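A minimal sketch of that re-query, assuming the bar values above are enough to identify the rows:

# the session knows nothing about the rows inserted via the Core, so load
# them back with an ordinary query if you need ORM instances afterwards
foos = session.query(Foo).filter(Foo.bar.in_([1, 2, 3])).all()
for f in foos:
    print f.id, f.bar  # from here on, their identities are tracked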

As far as stale data is concerned, it's helpful to remember that the session has no built-in way to know when the database is changed outside of the session. In order to access externally modified data through existing instances, the instances must be marked as expired. This happens by default on session.commit(), but can be done manually by calling session.expire_all() or session.expire(instance). An example (SQL omitted):

x = Foo(bar=1)
session.add(x)
session.commit()
print x.bar
# 1
foo.update().execute(bar=42)
print x.bar
# 1
session.expire(x)
print x.bar
# 42

session.commit() expires x, so the first print statement implicitly opens a new transaction and re-queries x's attributes. If you comment out the first print statement, you'll notice that the second one now picks up the correct value, because the new query isn't emitted until after the update.

This makes sense from the point of view of transactional isolation - you should only pick up external modifications between transactions. If this is causing you trouble, I'd suggest clarifying or re-thinking your application's transaction boundaries instead of immediately reaching for session.expire_all().
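For instance, rather than expiring instances mid-transaction, you can put the commit where you expect the external change to land; a minimal sketch reusing the Foo mapping from above:

x = Foo(bar=1)
session.add(x)
session.commit()   # transaction boundary: x is expired automatically
# ... an external process updates the row here ...
print x.bar        # a new transaction begins and the fresh value is loaded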