Slick 3.0 bulk insert or update (upsert)


There are several ways to make this code faster. Each should be faster than the one before it, but each is also progressively less idiomatic Slick:

  • Run insertOrUpdateAll instead of insertOrUpdate if on slick-pg 0.16.1+

    await(run(TableQuery[FooTable].insertOrUpdateAll rows)).sum
  • Run your DBIO actions all at once, rather than waiting for each one to commit before you run the next:

    val toBeInserted = rows.map { row => TableQuery[FooTable].insertOrUpdate(row) }
    val inOneGo = DBIO.sequence(toBeInserted)
    val dbioFuture = run(inOneGo)
    // Optionally, you can add a `.transactionally`
    // and / or `.withPinnedSession` here to pin all of these upserts
    // to the same transaction / connection
    // which *may* get you a little more speed:
    // val dbioFuture = run(inOneGo.transactionally)
    val rowsInserted = await(dbioFuture).sum
  • Drop down to the JDBC level and run your upsert all in one go (idea via this answer):

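    val SQL =
      """INSERT INTO table (a,b,c) VALUES (?, ?, ?)
    ON DUPLICATE KEY UPDATE c=VALUES(a)+VALUES(b);"""
    SimpleDBIO[List[Int]] { session =>
      val statement = session.connection.prepareStatement(SQL)
      try {
        // foreach, not map: we only want the side effect of queuing each row
        rows.foreach { row =>
          statement.setInt(1, row.a)
          statement.setInt(2, row.b)
          statement.setInt(3, row.c)
          statement.addBatch()
        }
        // executeBatch returns Array[Int] (one update count per row), so convert it
        statement.executeBatch().toList
      } finally statement.close()
    }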


As the Slick examples show, you can use the ++= function to insert rows with JDBC's batch insert feature. For instance:

    val foos = TableQuery[FooTable]
    val rows: Seq[Foo] = ...
    foos ++= rows // here slick will use batch insert

You can also control the size of each batch by grouping the rows sequence:

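    val batchSize = 1000
    // Building an action does not execute it, so collect the batches and run them together
    val batches = rows.grouped(batchSize).map(group => foos ++= group).toSeq
    val insertAll = DBIO.sequence(batches) // pass this to your database's run method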


Use sqlu.

This demo works:

    case ("insertOnDuplicateKey", answers: List[Answer]) => {
      // One upsert per row; sqlu binds every ${...} as a statement parameter.
      def buildInsert(r: Answer): DBIO[Int] =
        sqlu"""insert into answer
          (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time)
          values (${r.aid},${r.bid},${r.sbid},${r.qid},${r.ups},${r.author},${r.uid},${r.nick},
                  ${r.pub_time},${r.content},${r.good},${r.hot},${r.id},${r.reply},${r.pic},${r.spider_time})
          ON DUPLICATE KEY UPDATE
            `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),
            `ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),
            `pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),
            `id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"""
      val inserts: Seq[DBIO[Int]] = answers.map(buildInsert)
      val combined: DBIO[Seq[Int]] = DBIO.sequence(inserts)
      DEST_DB.run(combined).onComplete(data => {
        if (data.isSuccess) {
          // Only call data.get after the success check; on a failed Try it would throw
          println("insertOnDuplicateKey data result", data.get.mkString)
          println(data.get)
          val lastid = answers.last.id
          Sync.lastActor ! ("upsert", tablename, lastid)
        } else {
          // retry
          self ! ("insertOnDuplicateKey", answers)
        }
      })
    }

I also tried to use sqlu to build everything as a single SQL statement, but it fails with an error. Maybe sqlu does not support this kind of string interpolation.

This demo does not work:

    case ("insertOnDuplicateKeyError", answers: List[Answer]) => {
      def buildSql(execpre: String, values: String, execafter: String): DBIO[Int] =
        sqlu"$execpre $values $execafter"
      val execpre = "insert into answer (aid,bid,sbid,qid,ups,author,uid,nick,pub_time,content,good,hot,id,reply,pic,spider_time)  values "
      val execafter = " ON DUPLICATE KEY UPDATE  `aid`=values(aid),`bid`=values(bid),`sbid`=values(sbid),`qid`=values(qid),`ups`=values(ups),`author`=values(author),`uid`=values(uid),`nick`=values(nick),`pub_time`=values(pub_time),`content`=values(content),`good`=values(good),`hot`=values(hot),`id`=values(id),`reply`=values(reply),`pic`=values(pic),`spider_time`=values(spider_time)"
      // Build "(v1,v2,...),\n(v1,v2,...)" by hand, quoting the string-like columns
      val valuesstr = answers.map(row =>
        "(" + List(
          row.aid, row.bid, row.sbid, row.qid, row.ups,
          "'" + row.author + "'", "'" + row.uid + "'", "'" + row.nick + "'",
          "'" + row.pub_time + "'", "'" + row.content + "'",
          row.good, row.hot, row.id, row.reply, row.pic,
          "'" + row.spider_time + "'"
        ).mkString(",") + ")"
      ).mkString(",\n")
      val insertOrUpdateAction = DBIO.seq(
        buildSql(execpre, valuesstr, execafter)
      )
      DEST_DB.run(insertOrUpdateAction).onComplete(data => {
        if (data.isSuccess) {
          println("insertOnDuplicateKey data result", data)
          val lastid = answers.last.id
          Sync.lastActor ! ("upsert", tablename, lastid)
        } else {
          // retry
          self ! ("insertOnDuplicateKey2", answers)
        }
      })
    }
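
A likely reason the second demo fails: Slick's plain SQL interpolators turn every `$value` into a bind parameter, so `sqlu"$execpre $values $execafter"` sends three placeholders rather than SQL text. Slick's `#$value` form splices a string in literally instead, but the hand-quoted values would then be open to SQL injection and would break on any content containing a quote. Below is a minimal sketch of a safer single-statement variant, assuming plain JDBC and MySQL syntax: it generates the `?` placeholders and binds every value. The `MultiRowUpsert` object and its `sql` and `execute` methods are hypothetical names for illustration, not part of Slick.

```scala
import java.sql.Connection

// Hypothetical helper (not a Slick API): one multi-row MySQL upsert with bound parameters.
object MultiRowUpsert {

  /** Builds INSERT ... VALUES (?, ?), (?, ?) ON DUPLICATE KEY UPDATE `a`=VALUES(`a`), ... */
  def sql(table: String, columns: Seq[String], rowCount: Int): String = {
    require(columns.nonEmpty && rowCount > 0, "need at least one column and one row")
    val cols = columns.map(c => s"`$c`")
    val oneRow = cols.map(_ => "?").mkString("(", ", ", ")")
    val allRows = Seq.fill(rowCount)(oneRow).mkString(", ")
    val updates = cols.map(c => s"$c=VALUES($c)").mkString(", ")
    s"INSERT INTO `$table` (${cols.mkString(", ")}) VALUES $allRows ON DUPLICATE KEY UPDATE $updates"
  }

  /** Binds every value as a parameter and executes the whole statement once. */
  def execute(conn: Connection, table: String, columns: Seq[String], rows: Seq[Seq[Any]]): Int = {
    require(rows.forall(_.length == columns.length), "every row needs one value per column")
    val stmt = conn.prepareStatement(sql(table, columns, rows.length))
    try {
      rows.flatten.zipWithIndex.foreach { case (value, i) =>
        // JDBC parameter indexes are 1-based; the cast boxes primitives for setObject
        stmt.setObject(i + 1, value.asInstanceOf[AnyRef])
      }
      stmt.executeUpdate()
    } finally stmt.close()
  }
}
```

Inside Slick, `execute` could be called from a `SimpleDBIO` block with `session.connection`, in the same way as the JDBC example in the first answer. Very large row counts may still need to be split into groups to stay under the server's packet size limit.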

A MySQL sync tool built with Scala and Slick: https://github.com/cclient/ScalaMysqlSync