Slick 3.0 bulk insert or update (upsert)
There are several ways that you can make this code faster (each one should be faster than the preceding ones, but it gets progressively less idiomatic-slick):
Run
insertOrUpdateAll
instead ofinsertOrUpdate
if on slick-pg 0.16.1+await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum
Run your DBIO events all at once, rather than waiting for each one to commit before you run the next:
val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) }val inOneGo = DBIO.sequence(toBeInserted)val dbioFuture = run(inOneGo)// Optionally, you can add a `.transactionally`// and / or `.withPinnedSession` here to pin all of these upserts// to the same transaction / connection// which *may* get you a little more speed:// val dbioFuture = run(inOneGo.transactionally)val rowsInserted = await(dbioFuture).sum
Drop down to the JDBC level and run your upsert all in one go (idea via this answer):
val SQL = """INSERT INTO table (a,b,c) VALUES (?, ?, ?)ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);"""SimpleDBIO[List[Int]] { session => val statement = session.connection.prepareStatement(SQL) rows.map { row => statement.setInt(1, row.a) statement.setInt(2, row.b) statement.setInt(3, row.c) statement.addBatch() } statement.executeBatch()}
As you can see at Slick examples, you can use ++=
function to insert using JDBC batch insert feature. Per instance:
val foos = TableQuery[FooTable]val rows: Seq[Foo] = ...foos ++= rows // here slick will use batch insert
You can also "size" you batch by "grouping" the rows sequence:
val batchSize = 1000rows.grouped(batchSize).foreach { group => foos ++= group }
use sqlu
this demo work
case ("insertOnDuplicateKey",answers:List[Answer])=>{ def buildInsert(r: Answer): DBIO[Int] = sqlu"insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time}) ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" val inserts: Seq[DBIO[Int]] = answers.map(buildInsert) val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts) DEST_DB.run(combined).onComplete(data=>{ println("insertOnDuplicateKey data result",data.get.mkString) if (data.isSuccess){ println(data.get) val lastid=answers.last.id Sync.lastActor !("upsert",tablename,lastid) }else{ //retry self !("insertOnDuplicateKey",answers) } })}
and i try to use sqlu in a single sql but error maybe sqlu dont supply String Interpolation
this demo dont work
case ("insertOnDuplicateKeyError",answers:List[Answer])=>{ def buildSql(execpre:String,values: String,execafter:String): DBIO[Int] = sqlu"$execpre $values $execafter" val execpre="insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time) values " val execafter=" ON DUPLICATE KEY UPDATE `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)" val valuesstr=answers.map(row=>("("+List(row.aid,row.bid,row.sbid,row.qid,row.ups,"'"+row.author+"'","'"+row.uid+"'","'"+row.nick+"'","'"+row.pub_time+"'","'"+row.content+"'",row.good,row.hot,row.id,row.reply,row.pic,"'"+row.spider_time+"'").mkString(",")+")")).mkString(",\n") val insertOrUpdateAction=DBIO.seq( buildSql(execpre,valuesstr,execafter) ) DEST_DB.run(insertOrUpdateAction).onComplete(data=>{ if (data.isSuccess){ println("insertOnDuplicateKey data result",data) //retry val lastid=answers.last.id Sync.lastActor !("upsert",tablename,lastid) }else{ self !("insertOnDuplicateKey2",answers) } })}
a mysql sync tool with scala slickhttps://github.com/cclient/ScalaMysqlSync