SDP(5):ScalikeJDBC- JDBC-Engine:Streaming
作为一种通用的数据库编程引擎,用Streaming来应对海量数据的处理是必备功能。同样,我们还是通过一种Context传递产生流的要求。因为StreamingContext比较简单,而且还涉及到数据抽取函数extractor的传递,所以我们分开来定义:
复制代码
case class JDBCQueryContext[M](
dbName: Symbol,
statement: String,
parameters: Seq[Any] = Nil,
fetchSize: Int = 100,
autoCommit: Boolean = false,
queryTimeout: Option[Int] = None,
extractor: WrappedResultSet => M)
复制代码
由于我们会将JDBCQueryContext传给JDBC-Engine去运算,所以Streaming函数的所有参数都必须明确定义,包括extractor函数。实际上JDBCQueryContext也完全满足了jdbcQueryResult函数。我们会在后面重新设计这个函数。JDBCStreaming函数产生一个akka-Source,如下:
复制代码
def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
(implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
sql.iterator
.withDBSessionForceAdjuster(session => {
session.connection.setAutoCommit(ctx.autoCommit)
session.fetchSize(ctx.fetchSize)
})
}
Source.fromPublisher[A](publisher)
}
复制代码
我们只需要提供一个Sink就可以使用这个akka-stream了:
复制代码
import akka.actor._
import akka.stream.scaladsl._
import akka.stream._
import scalikejdbc._
import configdbs._
import jdbccontext._
import JDBCEngine._
object JDBCStreaming extends App {
implicit val actorSys = ActorSystem("actor-system")
implicit val ec = actorSys.dispatcher
implicit val mat = ActorMaterializer()
ConfigDBsWithEnv("dev").setup('h2)
ConfigDBsWithEnv("dev").loadGlobalSettings()
case class DataRow(year: String, state: String, county: String, value: String)
//data row converter
val toRow = (rs: WrappedResultSet) => DataRow(
year = rs.string("REPORTYEAR"),
state = rs.string("STATENAME"),
county = rs.string("COUNTYNAME"),
value = rs.string("VALUE")
)
//construct the context
val ctx = JDBCQueryContext[DataRow](
dbName = 'h2,
statement = "select * from AIRQM",
extractor = toRow
)
//pass context to construct akka-source
val akkaSource = jdbcAkkaStream(ctx)
//a sink for display rows
val snk = Sink.foreach[(DataRow,Long)] { case (row,idx) =>
println(s"rec#: $idx - year: ${row.year} location: ${row.state},${row.county} value: ${row.value}")}
//can manual terminate stream by kill.shutdown
val kill: UniqueKillSwitch = (akkaSource.zipWithIndex).viaMat(KillSwitches.single)(Keep.right).to(snk).run
scala.io.StdIn.readLine()
kill.shutdown()
actorSys.terminate()
println("+++++++++++++++")
}
复制代码
试运行结果OK。下面是新版本的jdbcQueryResult函数:
复制代码
def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
ctx: JDBCQueryContext[A])(
implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
rawSql.fetchSize(ctx.fetchSize)
implicit val session = NamedAutoSession(ctx.dbName)
val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)
sql.collection.apply[C]()
}
复制代码
试运行:
复制代码
object SlickDAO {
import slick.jdbc.H2Profile.api._
case class CountyModel(id: Int, name: String)
case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
def name = column[String]("NAME",O.Length(64))
def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
}
val CountyQuery = TableQuery[CountyTable]
val filter = "Kansas"
val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
val statement = qry.result.statements.head
}
import SlickDAO._
def toRow: WrappedResultSet => CountyModel = rs =>
CountyModel(id=rs.int("id"),name=rs.string("name"))
//construct the context
val slickCtx = JDBCQueryContext[CountyModel](
dbName = 'h2,
statement = "select * from county where id > ? and id < ?",
parameters = Seq(6000,6200),
extractor = toRow
)
val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx)
vecCounty.foreach(r => println(s"${r.id},${r.name}"))
复制代码
下面是本次讨论的示范源代码:
build.sbt
复制代码
// Scala 2.10, 2.11, 2.12
libraryDependencies ++= Seq(
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
"com.h2database" % "h2" % "1.4.196",
"mysql" % "mysql-connector-java" % "6.0.6",
"org.postgresql" % "postgresql" % "42.2.0",
"commons-dbcp" % "commons-dbcp" % "1.4",
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"com.typesafe.slick" %% "slick" % "3.2.1",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.akka" %% "akka-actor" % "2.5.4",
"com.typesafe.akka" %% "akka-stream" % "2.5.4"
)
复制代码
resources/application.conf
复制代码
# JDBC settings
test {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolInitialSize = 5
poolMaxSize = 7
poolConnectionTimeoutMillis = 1000
poolValidationQuery = "select 1 as one"
poolFactoryName = "commons-dbcp2"
}
}
db.mysql.driver = "com.mysql.cj.jdbc.Driver"
db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
db.mysql.user = "root"
db.mysql.password = "123"
db.mysql.poolInitialSize = 5
db.mysql.poolMaxSize = 7
db.mysql.poolConnectionTimeoutMillis = 1000
db.mysql.poolValidationQuery = "select 1 as one"
db.mysql.poolFactoryName = "bonecp"
# scallikejdbc Global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
dev {
db {
h2 {
driver = "org.h2.Driver"
url = "jdbc:h2:tcp://localhost/~/slickdemo"
user = ""
password = ""
poolFactoryName = "hikaricp"
numThreads = 10
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
mysql {
driver = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://localhost:3306/testdb"
user = "root"
password = "123"
poolInitialSize = 5
poolMaxSize = 7
poolConnectionTimeoutMillis = 1000
poolValidationQuery = "select 1 as one"
poolFactoryName = "bonecp"
}
postgres {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/testdb"
user = "root"
password = "123"
poolFactoryName = "hikaricp"
numThreads = 10
maxConnections = 12
minConnections = 4
keepAliveConnection = true
}
}
# scallikejdbc Global settings
scalikejdbc.global.loggingSQLAndTime.enabled = true
scalikejdbc.global.loggingSQLAndTime.logLevel = info
scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
}
复制代码
JDBCEngine.scala
复制代码
package jdbccontext
import java.sql.PreparedStatement
import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import scala.util._
import scalikejdbc.TxBoundary.Try._
import scala.concurrent.ExecutionContextExecutor
object JDBCContext {
type SQLTYPE = Int
val SQL_SELECT: Int = 0
val SQL_EXEDDL= 1
val SQL_UPDATE = 2
val RETURN_GENERATED_KEYVALUE = true
val RETURN_UPDATED_COUNT = false
}
case class JDBCQueryContext[M](
dbName: Symbol,
statement: String,
parameters: Seq[Any] = Nil,
fetchSize: Int = 100,
autoCommit: Boolean = false,
queryTimeout: Option[Int] = None,
extractor: WrappedResultSet => M)
case class JDBCContext(
dbName: Symbol,
statements: Seq[String] = Nil,
parameters: Seq[Seq[Any]] = Nil,
fetchSize: Int = 100,
queryTimeout: Option[Int] = None,
queryTags: Seq[String] = Nil,
sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
batch: Boolean = false,
returnGeneratedKey: Seq[Option[Any]] = Nil,
// no return: None, return by index: Some(1), by name: Some("id")
preAction: Option[PreparedStatement => Unit] = None,
postAction: Option[PreparedStatement => Unit] = None) {
ctx =>
//helper functions
def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)
def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)
def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)
def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)
def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == 1)
ctx.copy(preAction = action)
else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
}
def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
!ctx.batch && ctx.statements.size == 1)
ctx.copy(postAction = action)
else
throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
}
def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(Seq(_parameters))
)
} else
throw new IllegalStateException("JDBCContex setting error: option not supported!")
}
def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
ctx.copy(
statements = ctx.statements ++ Seq(_statement),
parameters = ctx.parameters ++ Seq(_parameters),
returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
)
} else
throw new IllegalStateException("JDBCContex setting error: option not supported!")
}
def appendBatchParameters(_parameters: Any*): JDBCContext = {
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
var matchParams = true
if (ctx.parameters != Nil)
if (ctx.parameters.head.size != _parameters.size)
matchParams = false
if (matchParams) {
ctx.copy(
parameters = ctx.parameters ++ Seq(_parameters)
)
} else
throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
}
def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
ctx.copy(
returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
)
}
def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
sqlType = JDBCContext.SQL_EXEDDL,
batch = false
)
}
def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
ctx.copy(
statements = Seq(_statement),
parameters = Seq(_parameters),
returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
sqlType = JDBCContext.SQL_UPDATE,
batch = fals