SQLAlchemy - performing a bulk upsert (if exists, update, else insert) in PostgreSQL
There is an upsert-esque operation in SQLAlchemy:
db.session.merge()
After I found this command, I was able to perform upserts, but it is worth mentioning that this operation is slow for a bulk "upsert", because merge() looks up each object by primary key individually.
The alternative is to get a list of the primary keys you would like to upsert, and query the database for any matching ids:
    # Imagine that post1, post5, and post1000 are posts objects with ids 1, 5 and 1000 respectively
    # The goal is to "upsert" these posts.
    # We initialize a dict which maps id to the post object
    my_new_posts = {1: post1, 5: post5, 1000: post1000}

    for each in posts.query.filter(posts.id.in_(my_new_posts.keys())).all():
        # Only merge those posts which already exist in the database
        db.session.merge(my_new_posts.pop(each.id))

    # Only add those posts which did not exist in the database
    db.session.add_all(my_new_posts.values())

    # Now we commit our modifications (merges) and inserts (adds) to the database!
    db.session.commit()
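For anyone who wants to try the merge/add split end to end, here is a minimal self-contained sketch. It assumes SQLAlchemy 1.4 or newer and uses an in-memory SQLite database purely for illustration; the `Post` model and the `upsert_posts` helper are hypothetical names, not part of the original answer or of SQLAlchemy.

```python
from sqlalchemy import Column, Integer, Unicode, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Post(Base):
    # Hypothetical model standing in for the "posts" class above
    __tablename__ = "post"
    id = Column(Integer, primary_key=True)
    title = Column(Unicode)


def upsert_posts(session, new_posts):
    """Merge posts whose id already exists, add the rest, then commit."""
    pending = {post.id: post for post in new_posts}
    if not pending:
        return
    # One SELECT fetches every id that is already present in the table
    existing = session.query(Post.id).filter(Post.id.in_(list(pending))).all()
    for (post_id,) in existing:
        session.merge(pending.pop(post_id))
    # Whatever is left in the dict did not exist yet
    session.add_all(pending.values())
    session.commit()


engine = create_engine("sqlite://")  # in-memory database, illustration only
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Post(id=1, title="old title"))
    session.commit()
    upsert_posts(
        session,
        [Post(id=1, title="new title"), Post(id=5, title="fifth")],
    )
    titles = {p.id: p.title for p in session.query(Post).order_by(Post.id)}
    print(titles)
```

Note that merge() is still called once per existing row, so this mainly saves the lookups for rows that turn out to be new.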
You can leverage the on_conflict_do_update variant of the PostgreSQL-specific insert() construct. A simple example would be the following:
    from sqlalchemy import Column, Integer, Unicode
    from sqlalchemy.dialects.postgresql import insert

    # Base is assumed to be your declarative base, session an open Session
    class Post(Base):
        """
        A simple class for demonstration
        """
        # Assumed table name; it matches the "post_pkey" constraint used below
        __tablename__ = "post"

        id = Column(Integer, primary_key=True)
        title = Column(Unicode)

    # Prepare all the values that should be "upserted" to the DB
    values = [
        {"id": 1, "title": "mytitle 1"},
        {"id": 2, "title": "mytitle 2"},
        {"id": 3, "title": "mytitle 3"},
        {"id": 4, "title": "mytitle 4"},
    ]

    stmt = insert(Post).values(values)
    stmt = stmt.on_conflict_do_update(
        # Let's use the constraint name which was visible in the original post's error msg
        constraint="post_pkey",
        # The columns that should be updated on conflict
        set_={
            "title": stmt.excluded.title
        }
    )
    session.execute(stmt)
See the PostgreSQL docs on INSERT ... ON CONFLICT for more details (for example, where the "excluded" term comes from).
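To see where "excluded" ends up, it can help to look at the SQL that SQLAlchemy generates. In PostgreSQL, EXCLUDED is the special table name that refers to the row that was proposed for insertion. The sketch below compiles the statement against the PostgreSQL dialect without connecting to a database; the Core `Table` is a hypothetical stand-in for the `Post` model, and the exact bind parameter names in the output vary between SQLAlchemy versions.

```python
from sqlalchemy import Column, Integer, MetaData, Table, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

# Hypothetical Core table mirroring the Post model above
post = Table(
    "post",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("title", Unicode),
)

stmt = insert(post).values(
    [{"id": 1, "title": "mytitle 1"}, {"id": 2, "title": "mytitle 2"}]
)
stmt = stmt.on_conflict_do_update(
    constraint="post_pkey",
    set_={"title": stmt.excluded.title},
)

# Compile against the PostgreSQL dialect; no database connection is needed
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The printed statement should contain an ON CONFLICT clause naming post_pkey, followed by a SET that assigns title from excluded.title.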
Side-Note on duplicated column names
The above code uses the column names as dict keys both in the values list and in the argument to set_. If a column name is changed in the class definition, it needs to be changed everywhere or the code will break. This can be avoided by accessing the column definitions, making the code a bit uglier, but more robust:
    coldefs = Post.__table__.c

    values = [
        {coldefs.id.name: 1, coldefs.title.name: "mytitle 1"},
        ...
    ]

    stmt = stmt.on_conflict_do_update(
        ...
        set_={
            coldefs.title.name: stmt.excluded.title
            ...
        }
    )
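Taking the same idea one step further, the set_ dict can be derived from the table definition so that no column name is written out by hand. This is only a sketch: `build_upsert` is a hypothetical helper (not a SQLAlchemy API), and it assumes the table has a primary key and at least one non-key column, and that updating every non-key column on conflict is what you want.

```python
from sqlalchemy import Column, Integer, MetaData, Table, Unicode
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert


def build_upsert(table, rows):
    """Build INSERT ... ON CONFLICT DO UPDATE covering all non-key columns.

    Hypothetical helper for illustration, not part of SQLAlchemy.
    """
    stmt = insert(table).values(rows)
    # Conflict target: the primary key column names
    pk_names = [c.name for c in table.primary_key.columns]
    # Update every non-key column from the proposed (excluded) row
    update_cols = {
        c.name: stmt.excluded[c.name]
        for c in table.columns
        if not c.primary_key
    }
    return stmt.on_conflict_do_update(index_elements=pk_names, set_=update_cols)


# Hypothetical Core table mirroring the Post model above
post = Table(
    "post",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("title", Unicode),
)

stmt = build_upsert(post, [{"id": 1, "title": "mytitle 1"}])
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

With an ORM class, `Post.__table__` can be passed as the table argument.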
An alternative approach using the compilation extension (https://docs.sqlalchemy.org/en/13/core/compiler.html):
    from sqlalchemy.ext.compiler import compiles
    from sqlalchemy.sql.expression import Insert

    # Register the hook for the PostgreSQL dialect only
    @compiles(Insert, "postgresql")
    def compile_upsert(insert_stmt, compiler, **kwargs):
        """
        converts every SQL insert to an upsert, i.e.
        INSERT INTO test (foo, bar) VALUES (1, 'a')
        becomes:
        INSERT INTO test (foo, bar) VALUES (1, 'a') ON CONFLICT (foo) DO UPDATE SET foo=EXCLUDED.foo, bar=EXCLUDED.bar
        (assuming foo is a primary key)
        :param insert_stmt: Original insert statement
        :param compiler: SQL Compiler
        :param kwargs: optional arguments
        :return: upsert statement
        """
        pk = insert_stmt.table.primary_key
        insert = compiler.visit_insert(insert_stmt, **kwargs)
        ondup = f'ON CONFLICT ({",".join(c.name for c in pk)}) DO UPDATE SET'
        updates = ', '.join(f"{c.name}=EXCLUDED.{c.name}" for c in insert_stmt.table.columns)
        upsert = ' '.join((insert, ondup, updates))
        return upsert
This should ensure that all insert statements on tables with a primary key behave as upserts. The implementation targets the Postgres dialect, but it should be fairly easy to modify for the MySQL dialect.
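As a rough idea of what the MySQL modification could look like, here is a sketch that appends ON DUPLICATE KEY UPDATE instead. It has not been run against a real MySQL server; the function name `compile_mysql_upsert` and the `test_table` definition are made up for illustration, and the `VALUES(col)` form it relies on is deprecated in recent MySQL 8.0 releases, so treat it as a starting point only.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import mysql
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import Insert


@compiles(Insert, "mysql")
def compile_mysql_upsert(insert_stmt, compiler, **kwargs):
    """Append ON DUPLICATE KEY UPDATE to every INSERT compiled for MySQL."""
    insert = compiler.visit_insert(insert_stmt, **kwargs)
    # MySQL infers the conflict from the table's unique keys,
    # so no conflict target has to be spelled out
    updates = ", ".join(
        f"{c.name}=VALUES({c.name})" for c in insert_stmt.table.columns
    )
    return f"{insert} ON DUPLICATE KEY UPDATE {updates}"


# Hypothetical table matching the docstring example above
test_table = Table(
    "test",
    MetaData(),
    Column("foo", Integer, primary_key=True),
    Column("bar", String(50)),
)

# Compile against the MySQL dialect; no database connection is needed
sql = str(
    test_table.insert().values(foo=1, bar="a").compile(dialect=mysql.dialect())
)
print(sql)
```

Keep in mind that a hook registered this way applies to every INSERT compiled for that dialect in the process, which may not be what you want for tables where a plain insert should fail on duplicates.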