SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in PostgreSQL


There is an upsert-esque operation in SQLAlchemy:

db.session.merge()

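After I found this command, I was able to perform upserts, but this operation is slow for a bulk "upsert": merge() handles one object at a time and, for each object not already loaded in the session, issues a SELECT by primary key before deciding whether to insert or update.

A faster alternative is to collect the primary keys you would like to upsert and query the database once for any matching ids: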

    # Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
    # The goal is to "upsert" these posts.
    # We initialize a dict which maps id to the post object
    my_new_posts = {1: post1, 5: post5, 1000: post1000}

    for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
        # Only merge those posts which already exist in the database
        db.session.merge(my_new_posts.pop(each.id))

    # Only add those posts which did not exist in the database
    db.session.add_all(my_new_posts.values())

    # Now we commit our modifications (merges) and inserts (adds) to the database!
    db.session.commit()
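The pattern above can be tried end to end without a PostgreSQL server. What follows is only a minimal sketch, assuming SQLAlchemy 1.4 or newer and an in-memory SQLite database. The `Post` model and the `bulk_upsert` helper are hypothetical names introduced for illustration. The existing rows are fetched as full objects and kept in a list, so that `merge()` can find them in the session instead of issuing one SELECT per object.

```python
from sqlalchemy import Column, Integer, Unicode, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Post(Base):
    # Hypothetical model used only for this sketch
    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    title = Column(Unicode)


def bulk_upsert(session, new_posts):
    """Merge posts whose id already exists, add the others, then commit."""
    pending = {post.id: post for post in new_posts}

    # One query loads every row that already exists; keeping the list alive
    # keeps those objects in the session while we merge.
    existing = session.query(Post).filter(Post.id.in_(list(pending))).all()
    for row in existing:
        session.merge(pending.pop(row.id))

    # Whatever is left in the dict was not found in the database
    session.add_all(pending.values())
    session.commit()


# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Post(id=1, title="old title"))
    session.commit()

    bulk_upsert(session, [Post(id=1, title="new title"), Post(id=5, title="fresh")])
    titles = {p.id: p.title for p in session.query(Post).order_by(Post.id)}
```

Here post 1 already exists and is updated through `merge()`, while post 5 is new and goes through `add_all()`.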


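You can leverage the on_conflict_do_update variant of the PostgreSQL-specific insert construct, which emits a single INSERT ... ON CONFLICT ... DO UPDATE statement for all rows. A simple example would be the following: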

    from sqlalchemy import Column, Integer, Unicode
    from sqlalchemy.dialects.postgresql import insert

    # Base (the declarative base) and session (an open Session) are assumed
    # to be defined elsewhere in your application.


    class Post(Base):
        """
        A simple class for demonstration
        """
        # Assumed table name, inferred from the "post_pkey" constraint used below
        __tablename__ = "post"

        id = Column(Integer, primary_key=True)
        title = Column(Unicode)


    # Prepare all the values that should be "upserted" to the DB
    values = [
        {"id": 1, "title": "mytitle 1"},
        {"id": 2, "title": "mytitle 2"},
        {"id": 3, "title": "mytitle 3"},
        {"id": 4, "title": "mytitle 4"},
    ]

    stmt = insert(Post).values(values)
    stmt = stmt.on_conflict_do_update(
        # Let's use the constraint name which was visible in the original post's error message
        constraint="post_pkey",
        # The columns that should be updated on conflict
        set_={
            "title": stmt.excluded.title
        }
    )
    session.execute(stmt)

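See the PostgreSQL docs for more details (for example, where the "excluded" term comes from: EXCLUDED is the special table name that refers to the row that was proposed for insertion).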
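To see what the excluded row refers to without a PostgreSQL server, here is a small sketch using Python's built-in sqlite3 module. This is a stand-in, not the PostgreSQL setup from the answer: it assumes the bundled SQLite is version 3.24 or newer, whose upsert syntax is modeled on PostgreSQL's. The table and its contents are made up for illustration.

```python
import sqlite3

# Assumption: SQLite >= 3.24 stands in for PostgreSQL here
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT)")
conn.execute("INSERT INTO post (id, title) VALUES (1, 'old title')")

rows = [(1, "mytitle 1"), (2, "mytitle 2")]

# "excluded" names the row that was proposed for insertion and rejected
# because of the conflict on id; its title overwrites the stored one.
conn.executemany(
    "INSERT INTO post (id, title) VALUES (?, ?) "
    "ON CONFLICT (id) DO UPDATE SET title = excluded.title",
    rows,
)

result = conn.execute("SELECT id, title FROM post ORDER BY id").fetchall()
```

Row 1 existed, so its title is replaced by the proposed value; row 2 did not exist, so it is inserted as usual.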

Side-Note on duplicated column names

The above code uses the column names as dict keys both in the values list and in the argument to set_. If a column name is changed in the class definition, it needs to be changed everywhere or the code will break. This can be avoided by accessing the column definitions, which makes the code a bit uglier but more robust:

    coldefs = Post.__table__.c

    values = [
        {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
        ...
    ]

    stmt = stmt.on_conflict_do_update(
        ...
        set_={
            coldefs.title.name: stmt.excluded.title
            ...
        }
    )
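Taking the same idea one step further, the set_ mapping can be derived from the table definition so that no column name is typed by hand. The following is only a sketch under some assumptions: SQLAlchemy 1.4 or newer, a hypothetical helper named `build_upsert`, and `index_elements` (the primary key columns) used as the conflict target in place of the constraint name. The statement is only compiled to a string here, so no database connection is needed.

```python
from sqlalchemy import Column, Integer, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Post(Base):
    # Hypothetical model used only for this sketch
    __tablename__ = "post"

    id = Column(Integer, primary_key=True)
    title = Column(Unicode)


def build_upsert(model, rows):
    """Build INSERT ... ON CONFLICT DO UPDATE for every non-key column."""
    table = model.__table__
    stmt = insert(table).values(rows)

    # Update each non-primary-key column from the proposed (excluded) row
    update_cols = {
        col.name: stmt.excluded[col.name]
        for col in table.columns
        if not col.primary_key
    }
    return stmt.on_conflict_do_update(
        # Assumption: conflicts are detected on the primary key columns
        index_elements=list(table.primary_key.columns),
        set_=update_cols,
    )


stmt = build_upsert(
    Post,
    [{"id": 1, "title": "mytitle 1"}, {"id": 2, "title": "mytitle 2"}],
)

# Render the statement for the PostgreSQL dialect without executing it
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Executing the result would still be `session.execute(stmt)` as in the example above. A table consisting only of primary key columns would leave set_ empty, which this sketch does not handle.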


An alternative approach uses the compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):

    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql.expression import Insert


    @compiles(Insert)
    def compile_upsert(insert_stmt, compiler, **kwargs):
        """
        converts every SQL insert to an upsert, i.e.
        INSERT INTO test (foo, bar) VALUES (1, 'a')
        becomes:
        INSERT INTO test (foo, bar) VALUES (1, 'a')
        ON CONFLICT (foo) DO UPDATE SET foo=EXCLUDED.foo, bar=EXCLUDED.bar
        (assuming foo is a primary key)
        :param insert_stmt: Original insert statement
        :param compiler: SQL Compiler
        :param kwargs: optional arguments
        :return: upsert statement
        """
        pk = insert_stmt.table.primary_key
        # Render the plain INSERT first, then append the ON CONFLICT clause
        insert = compiler.visit_insert(insert_stmt, **kwargs)
        ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
        # Every column (including the primary key) is set from the proposed row
        updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
        upsert = ' '.join((insert, ondup, updates))
        return upsert

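This should ensure that all insert statements behave as upserts. Note that the clause is appended to the end of the rendered INSERT, so it assumes the table has a primary key and the statement has no RETURNING clause (ON CONFLICT must come before RETURNING). The generated SQL uses PostgreSQL syntax, but as written the hook is registered for every dialect; @compiles(Insert, "postgresql") would restrict it to PostgreSQL. It should be fairly easy to modify for the MySQL dialect.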