SDP(6): A Distributed-Database Runtime Environment - Cassandra-Engine

Date: 2022-07-06 06:36:23

No modern information system can avoid big-data processing, so a general-purpose system-integration tool must also be able to store and read data at scale. Cassandra is a distributed database with built-in high availability, which makes it a core pillar for a large real-time distributed integration system. Compared with a traditional relational database, Cassandra is fundamentally different in how data is stored and read: table design is denormalized, and schemas are designed backwards from the queries they must serve, among other differences. Fortunately, Cassandra provides CQL for database operations. Put simply, CQL is Cassandra's SQL: a query language whose syntax resembles SQL. Most importantly, CQL presents Cassandra's underlying storage model in SQL-like terms, so programmers already familiar with relational SQL can get started with Cassandra easily. Like SQL, CQL is a plain-text language and can be run through a variety of client interfaces, including a Java client. There are off-the-shelf Cassandra client libraries on the market, some of which even offer a LINQ-style DSL (language-integrated query) for type safety. But because we need to serve all kinds of Cassandra database users, we decided to provide a CQL script runtime instead; that is, Cassandra-Engine accepts CQL scripts and executes them to produce results.
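
To make "tables are designed from the queries" concrete, here is a minimal sketch of the typical pattern (the keyspace, table and column names are made up for illustration): instead of normalizing and joining, we create one denormalized table per query path and duplicate the data across them:

  // a sketch only: one table per query pattern, partitioned by the lookup key
  val postsByAuthor =
    """CREATE TABLE demo.posts_by_author (
      |  author TEXT,
      |  posted_at TIMESTAMP,
      |  title TEXT,
      |  body TEXT,
      |  PRIMARY KEY (author, posted_at)
      |) WITH CLUSTERING ORDER BY (posted_at DESC)""".stripMargin

  // the same rows duplicated under a different partition key - CQL has no joins
  val postsByTag =
    """CREATE TABLE demo.posts_by_tag (
      |  tag TEXT,
      |  posted_at TIMESTAMP,
      |  title TEXT,
      |  body TEXT,
      |  PRIMARY KEY (tag, posted_at)
      |)""".stripMargin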

The execution structure closely resembles JDBC: a CQL operation also builds a statement first and then executes it. Unlike JDBC, though, CQL additionally offers non-blocking script execution:

   /**
* Executes the provided query asynchronously.
* <p/>
* This method does not block. It returns as soon as the query has been
* passed to the underlying network stack. In particular, returning from
* this method does not guarantee that the query is valid or has even been
* submitted to a live node. Any exception pertaining to the failure of the
* query will be thrown when accessing the {@link ResultSetFuture}.
* <p/>
* Note that for queries that don't return a result (INSERT, UPDATE and
* DELETE), you will need to access the ResultSetFuture (that is, call one of
* its {@code get} methods to make sure the query was successful.
*
* @param statement the CQL query to execute (that can be any {@code Statement}).
* @return a future on the result of the query.
* @throws UnsupportedFeatureException if the protocol version 1 is in use and
* a feature not supported has been used. Features that are not supported by
* the version protocol 1 include: BatchStatement, ResultSet paging and binary
* values in RegularStatement.
*/
ResultSetFuture executeAsync(Statement statement);

executeAsync returns a ResultSetFuture, which is a Google Guava ListenableFuture. We can turn it into a Scala Future with an implicit conversion:

  implicit def listenableFutureToFuture[T](
      listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }
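
With this conversion in scope, a ResultSetFuture composes like any other Scala Future. A quick sketch (assuming an implicit ExecutionContext, and two already-bound statements stmtA and stmtB, which are hypothetical here):

  val both: Future[Boolean] = for {
    a <- session.executeAsync(stmtA)  // ResultSetFuture converted to Future[ResultSet]
    b <- session.executeAsync(stmtB)
  } yield a.wasApplied() && b.wasApplied()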

With this implicit instance in scope, the result of executeAsync is automatically converted to a Future, as in:

  def cqlSingleUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    ...
    session.executeAsync(boundStmt).map(_.wasApplied())
  }

As before, we use a Context type to assemble a complete, executable statement:

case class CQLContext(
  statements: Seq[String],
  parameters: Seq[Seq[Object]] = Nil,
  consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
}
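
Each setter returns a fresh copy, so a context is assembled by chaining. For example (a sketch against a hypothetical demo.events table):

  val ctx = CQLContext()
    .setCommand("insert into demo.events(id, name) values(uuid(), ?)", "startup")
    .setConsistencyLevel(CQLContext.QUORUM)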

What sets this apart from JDBCContext is the consistencyLevel. Because data is replicated across multiple cluster nodes, a consistency level must specify how distributed reads and writes are acknowledged:

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM  // was missing: a LOCAL_QUORUM setting would throw MatchError
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }

CQL statements come in three kinds: SimpleStatement, PreparedStatement and BoundStatement. A BoundStatement can cover the construction requirements of every statement type, so that is what we build. Here is an example of constructing a BoundStatement:

  val prepStmt = session.prepare(ctx.statement)

  var boundStmt = prepStmt.bind()
  if (ctx.parameter != Nil) {
    val params = processParameters(ctx.parameter)
    boundStmt = prepStmt.bind(params: _*)
  }
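
The payoff of the prepare/bind split is that the cluster parses a statement once and we can then bind it many times with different values. A sketch (again using the hypothetical demo.events table):

  val prep = session.prepare("insert into demo.events(id, name) values(uuid(), ?)")
  Seq("startup", "login", "logout").foreach { name =>
    session.execute(prep.bind(name))  // re-bind the same prepared statement
  }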

CQL statement parameters can be of fairly involved types; values such as date and timestamp must first be pre-processed by a processParameters function:

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, month: Int,
    day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now()
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
          // zero-padded ISO-8601 so that Instant.parse accepts it
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
        case p@_ => p
      }
    }
  }

CassandraEngine update operations come in two forms: single update and batch update. A batch update achieves much the same effect as a transaction: if any statement in the batch fails, none of the batch's updates take effect:

  def cqlExecute(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val prepStmt = session.prepare(ctx.statements.head)
    var boundStmt = prepStmt.bind()
    if (ctx.parameters != Nil) {  // was ctx.statements != Nil: the parameters must be tested
      val params = processParameters(ctx.parameters.head)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
    session.executeAsync(boundStmt).map(_.wasApplied())
  }
  def cqlMultiUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String, Seq[Object])] = ctx.statements zip ctx.parameters
    val batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }
    ctx.consistency.foreach { consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency)) }
    session.executeAsync(batch).map(_.wasApplied())
  }
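
As a usage sketch (assuming the implicit session and ExecutionContext are in scope, and with a hypothetical demo.accounts table): two updates submitted as one batch either both take effect or neither does:

  val transferCtx = CQLContext()
    .setCommand("update demo.accounts set balance = ? where id = ?",
      java.lang.Double.valueOf(90.0), "acct-1")
    .appendCommand("update demo.accounts set balance = ? where id = ?",
      java.lang.Double.valueOf(110.0), "acct-2")

  val ok: Future[Boolean] = cqlExecute(transferCtx)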

A CassandraEngine update returns its outcome as a Future[Boolean]. Here is an update demo:

  val createCQL = """
    CREATE TABLE testdb.members (
      id UUID primary key,
      name TEXT,
      description TEXT,
      birthday DATE,
      created_at TIMESTAMP,
      picture BLOB
    )"""

  val ctxCreate = CQLContext().setCommand(createCQL)
  val ctxInsert = CQLContext().setCommand(
    "insert into testdb.members(id,name,description,birthday,created_at,picture)" +
      " values(uuid(),?,?,?,?,?)",
    "alan xu", "alan-xu",
    CQLDate(1990, 1, 1),  // sample birthday: the original values were lost from this copy
    CQLDateTimeNow,
    cqlFileToBytes("/users/tiger/Nobody.png"))

  val createData = for {
    createTable <- cqlExecute(ctxCreate)
    insertData <- cqlExecute(ctxInsert)
  } yield (createTable, insertData)

  createData.onComplete {
    case Success((c, i)) => println(s"Create Table: $c, Insert Data $i")
    case Failure(e) => println(e.getMessage)
  }

In the example above we used a for-comprehension to run the two operations in sequence. Note that this example already exercises date, datetime and blob input parameter types.

The statement-construction information for a fetch query is as follows:

case class CQLQueryContext[M](
  statement: String,
  parameter: Seq[Object] = Nil,
  consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  extractor: Row => M
)
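
The extractor is an ordinary Row => M function, so mapping rows to a model type is plain Scala. A minimal sketch (Name is a hypothetical model type):

  case class Name(id: String, name: String)

  val nameQuery = CQLQueryContext[Name](
    statement = "select id, name from testdb.members",
    extractor = row => Name(row.getUUID("id").toString, row.getString("name"))
  )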

Fetch queries are likewise implemented with execute:

    /**
* Executes the provided query.
* <p/>
* This method blocks until at least some result has been received from the
* database. However, for SELECT queries, it does not guarantee that the
* result has been received in full. But it does guarantee that some
* response has been received from the database, and in particular
* guarantees that if the request is invalid, an exception will be thrown
* by this method.
*
* @param statement the CQL query to execute (that can be any {@link Statement}).
* @return the result of the query. That result will never be null but can
* be empty (and will be for any non SELECT query).
* @throws NoHostAvailableException if no host in the cluster can be
* contacted successfully to execute this query.
* @throws QueryExecutionException if the query triggered an execution
* exception, i.e. an exception thrown by Cassandra when it cannot execute
* the query with the requested consistency level successfully.
* @throws QueryValidationException if the query if invalid (syntax error,
* unauthorized or any other validation problem).
* @throws UnsupportedFeatureException if the protocol version 1 is in use and
* a feature not supported has been used. Features that are not supported by
* the version protocol 1 include: BatchStatement, ResultSet paging and binary
* values in RegularStatement.
*/
ResultSet execute(Statement statement);

The returned ResultSet is converted into a Scala collection:

  // pageSize default: the original value was lost in this copy; 100 is an assumption
  def fetchResultPage[C[_] <: TraversableOnce[_], A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
      implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A]) = {
    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet, (resultSet.asScala.view.map(ctx.extractor)).to[C])
  }

fetchResultPage reads a single page; fetchMoreResults lets us keep reading subsequent pages:

    /**
* Force fetching the next page of results for this result set, if any.
* <p/>
* This method is entirely optional. It will be called automatically while
* the result set is consumed (through {@link #one}, {@link #all} or iteration)
* when needed (i.e. when {@code getAvailableWithoutFetching() == 0} and
* {@code isFullyFetched() == false}).
* <p/>
* You can however call this method manually to force the fetching of the
* next page of results. This can allow to prefetch results before they are
* strictly needed. For instance, if you want to prefetch the next page of
* results as soon as there is less than 100 rows readily available in this
* result set, you can do:
* <pre>
* ResultSet rs = session.execute(...);
* Iterator<Row> iter = rs.iterator();
* while (iter.hasNext()) {
* if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
* rs.fetchMoreResults();
* Row row = iter.next()
* ... process the row ...
* }
* </pre>
* This method is not blocking, so in the example above, the call to {@code
* fetchMoreResults} will not block the processing of the 100 currently available
* rows (but {@code iter.hasNext()} will block once those rows have been processed
* until the fetch query returns, if it hasn't yet).
* <p/>
* Only one page of results (for a given result set) can be
* fetched at any given time. If this method is called twice and the query
* triggered by the first call has not returned yet when the second one is
* performed, then the 2nd call will simply return a future on the currently
* in progress query.
*
* @return a future on the completion of fetching the next page of results.
* If the result set is already fully retrieved ({@code isFullyFetched() == true}),
* then the returned future will return immediately but not particular error will be
* thrown (you should thus call {@link #isFullyFetched()} to know if calling this
* method can be of any use}).
*/
ListenableFuture<S> fetchMoreResults();

Here is the implementation of continuous paged reading:

  def fetchMorePages[C[_] <: TraversableOnce[_], A](resultSet: ResultSet, timeOut: Duration)(
      extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch { case e: Throwable => (resultSet, None) }
    }

Let's use these two functions to read back the rows we inserted into Cassandra earlier with ctxInsert:

  //data model
  case class Member(
    id: String,
    name: String,
    description: Option[String] = None,
    birthday: LocalDate,
    createdAt: java.util.Date,
    picture: Option[ByteBuffer] = None)

  //data row converter
  val toMember = (rs: Row) => Member(
    id = rs.getUUID("id").toString,
    name = rs.getString("name"),
    description = {
      val d = rs.getString("description")
      if (d == null) None else Some(d)
    },
    birthday = rs.getDate("birthday"),
    createdAt = rs.getTimestamp("created_at"),
    picture = {
      val pic = rs.getBytes("picture")
      if (pic == null) None else Some(pic)
    }
  )

  try {
    val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
    val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)
    var vecMembers: Vector[Member] = vecResults
    var isExh = resultSet.isExhausted
    var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
    while (!isExh) {
      nextPage = fetchMorePages[Vector, Member](nextPage._1, 1.second)(toMember)
      nextPage._2.foreach { vec =>
        vecMembers = vecMembers ++ vec
      }
      isExh = resultSet.isExhausted
    }
    vecMembers.foreach { m =>
      println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
      println(s"created_at: ${cqlDateTimeString(m.createdAt, "yyyy-MM-dd HH:mm:ss.SSS")}")
      m.picture match {
        case Some(buf) =>
          val fname = s"/users/tiger/pic-${m.name}.png"
          cqlBytesToFile(buf, fname)
          println(s"saving picture to $fname")
        case _ => println("empty picture!")
      }
    }
  } catch {
    case e: Exception => println(e.getMessage)
  }

The demo above also references a few helper functions:

  def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    val b = new Array[Byte](fis.available)  // the original added a constant here that was lost
    fis.read(b)
    fis.close()
    ByteBuffer.wrap(b)
  }
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
      implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }
  def useJava8DateTime(cluster: Cluster) = {
    //register the jdk8 Instant codec so TIMESTAMP values map to java.time.Instant
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }

We also need a ByteBufferInputStream type to read blob content:

class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
  override def read: Int = {
    if (!buf.hasRemaining) return -1
    buf.get & 0xFF  // mask so the byte is returned as 0..255, as InputStream requires
  }
  override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
    if (!buf.hasRemaining) return -1  // signal end-of-stream instead of returning 0
    val length: Int = Math.min(len, buf.remaining)
    buf.get(bytes, off, length)
    length
  }
}
object ByteBufferInputStream {
  def apply(buf: ByteBuffer): ByteBufferInputStream = {
    new ByteBufferInputStream(buf)
  }
}
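
A quick round trip shows the intended behavior (a sketch):

  val in = ByteBufferInputStream(ByteBuffer.wrap("hello".getBytes("UTF-8")))
  val out = new Array[Byte](5)
  in.read(out, 0, 5)                 // fills out with the buffer's bytes
  println(new String(out, "UTF-8"))  // hello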

Below is the full demo source code from this discussion:

build.sbt

name := "learn_cassandra"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
"com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
"com.typesafe.akka" %% "akka-actor" % "2.5.4",
"com.typesafe.akka" %% "akka-stream" % "2.5.4",
"com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
"org.scalikejdbc" %% "scalikejdbc" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test",
"org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
"org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
"com.h2database" % "h2" % "1.4.196",
"mysql" % "mysql-connector-java" % "6.0.6",
"org.postgresql" % "postgresql" % "42.2.0",
"commons-dbcp" % "commons-dbcp" % "1.4",
"org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
"com.zaxxer" % "HikariCP" % "2.7.4",
"com.jolbox" % "bonecp" % "0.8.0.RELEASE",
"com.typesafe.slick" %% "slick" % "3.2.1",
"ch.qos.logback" % "logback-classic" % "1.2.3")

CassandraEngine.scala

import com.datastax.driver.core._
import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration

object CQLContext {
  // Consistency Levels
  type CONSISTENCY_LEVEL = Int
  val ANY: CONSISTENCY_LEVEL = 0x0000
  val ONE: CONSISTENCY_LEVEL = 0x0001
  val TWO: CONSISTENCY_LEVEL = 0x0002
  val THREE: CONSISTENCY_LEVEL = 0x0003
  val QUORUM: CONSISTENCY_LEVEL = 0x0004
  val ALL: CONSISTENCY_LEVEL = 0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B
  val SERIAL: CONSISTENCY_LEVEL = 0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM  // was missing: would MatchError
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }
}

case class CQLQueryContext[M](
  statement: String,
  parameter: Seq[Object] = Nil,
  consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
  extractor: Row => M
)

case class CQLContext(
  statements: Seq[String],
  parameters: Seq[Seq[Object]] = Nil,
  consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))
}

object CQLEngine {
  import CQLContext._
  import CQLHelpers._

  // pageSize default: the original value was lost in this copy; 100 is an assumption
  def fetchResultPage[C[_] <: TraversableOnce[_], A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
      implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A]) = {
    val prepStmt = session.prepare(ctx.statement)
    var boundStmt = prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    (resultSet, (resultSet.asScala.view.map(ctx.extractor)).to[C])
  }
  def fetchMorePages[C[_] <: TraversableOnce[_], A](resultSet: ResultSet, timeOut: Duration)(
      extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch { case e: Throwable => (resultSet, None) }
    }
  def cqlExecute(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val prepStmt = session.prepare(ctx.statements.head)
    var boundStmt = prepStmt.bind()
    if (ctx.parameters != Nil) {  // was ctx.statements != Nil: the parameters must be tested
      val params = processParameters(ctx.parameters.head)
      boundStmt = prepStmt.bind(params: _*)
    }
    ctx.consistency.foreach { consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency)) }
    session.executeAsync(boundStmt).map(_.wasApplied())
  }
  def cqlMultiUpdate(ctx: CQLContext)(
      implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String, Seq[Object])] = ctx.statements zip ctx.parameters
    val batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }
    ctx.consistency.foreach { consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency)) }
    session.executeAsync(batch).map(_.wasApplied())
  }
}

object CQLHelpers {
  import java.nio.ByteBuffer
  import java.io._
  import java.nio.file._
  import com.datastax.driver.core.LocalDate
  import com.datastax.driver.extras.codecs.jdk8.InstantCodec
  import java.time.Instant
  import akka.stream.scaladsl._
  import akka.stream._

  implicit def listenableFutureToFuture[T](
      listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
        promise.failure(error)
        ()
      }
      def onSuccess(result: T): Unit = {
        promise.success(result)
        ()
      }
    })
    promise.future
  }

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, month: Int,
    day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now()
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
          // zero-padded ISO-8601 so that Instant.parse accepts it
          Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
        case p@_ => p
      }
    }
  }

  class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read: Int = {
      if (!buf.hasRemaining) return -1
      buf.get & 0xFF  // mask so the byte is returned as 0..255, as InputStream requires
    }
    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      if (!buf.hasRemaining) return -1  // signal end-of-stream instead of returning 0
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
  }
  object ByteBufferInputStream {
    def apply(buf: ByteBuffer): ByteBufferInputStream = {
      new ByteBufferInputStream(buf)
    }
  }

  class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
    override def write(b: Int): Unit = {
      buf.put(b.toByte)
    }
    override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
      buf.put(bytes, off, len)
    }
  }
  object FixsizedByteBufferOutputStream {
    def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
  }

  class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
    private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR

    override def write(b: Array[Byte], off: Int, len: Int): Unit = {
      val position = buf.position
      val limit = buf.limit
      val newTotal: Long = position + len
      if (newTotal > limit) {
        var capacity = (buf.capacity * increasing)
        while (capacity <= newTotal) {
          capacity = (capacity * increasing)
        }
        increase(capacity.toInt)
      }
      buf.put(b, off, len)
    }
    override def write(b: Int): Unit = {
      if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
      buf.put(b.toByte)
    }
    protected def increase(newCapacity: Int): Unit = {
      buf.limit(buf.position)
      buf.rewind
      val newBuffer =
        if (onHeap) ByteBuffer.allocate(newCapacity)
        else ByteBuffer.allocateDirect(newCapacity)
      newBuffer.put(buf)
      buf.clear
      buf = newBuffer
    }
    def size: Long = buf.position
    def capacity: Long = buf.capacity
    def byteBuffer: ByteBuffer = buf
  }
  object ExpandingByteBufferOutputStream {
    val DEFAULT_INCREASING_FACTOR = 1.5f
    def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
      if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
      val buffer: ByteBuffer =
        if (onHeap) ByteBuffer.allocate(size)
        else ByteBuffer.allocateDirect(size)
      new ExpandingByteBufferOutputStream(buffer, onHeap)
    }
    def apply(size: Int): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
    }
    def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
      apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
    }
    def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
      apply(size, increasingBy, false)
    }
  }

  def cqlFileToBytes(fileName: String): ByteBuffer = {
    val fis = new FileInputStream(fileName)
    val b = new Array[Byte](fis.available)  // the original added a constant here that was lost
    fis.read(b)
    fis.close()
    ByteBuffer.wrap(b)
  }
  def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
      implicit mat: Materializer): Future[IOResult] = {
    val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
    source.runWith(FileIO.toPath(Paths.get(fileName)))
  }
  def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
    val outputFormat = new java.text.SimpleDateFormat(fmt)
    outputFormat.format(date)
  }
  def useJava8DateTime(cluster: Cluster) = {
    //register the jdk8 Instant codec so TIMESTAMP values map to java.time.Instant
    cluster.getConfiguration().getCodecRegistry()
      .register(InstantCodec.instance)
  }
}

CQLEngineDemo.scala

import scala.util._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import com.datastax.driver.core._
import CQLEngine._
import CQLHelpers._
import com.datastax.driver.core.LocalDate
import java.nio.ByteBuffer
import scala.concurrent.duration._

object CQLEngineDemo extends App {

  //#init-mat
  implicit val cqlsys = ActorSystem("cqlSystem")
  implicit val mat = ActorMaterializer()
  implicit val ec = cqlsys.dispatcher

  val cluster = new Cluster
    .Builder()
    .addContactPoints("localhost")
    .withPort(9042)  // the port number was lost in this copy; 9042 is the CQL default
    .build()

  useJava8DateTime(cluster)
  implicit val session = cluster.connect()

  val createCQL = """
    CREATE TABLE testdb.members (
      id UUID primary key,
      name TEXT,
      description TEXT,
      birthday DATE,
      created_at TIMESTAMP,
      picture BLOB
    )"""

  val ctxCreate = CQLContext().setCommand(createCQL)
  val ctxInsert = CQLContext().setCommand(
    "insert into testdb.members(id,name,description,birthday,created_at,picture)" +
      " values(uuid(),?,?,?,?,?)",
    "alan xu", "alan-xu",
    CQLDate(1990, 1, 1),  // sample birthday: the original values were lost from this copy
    CQLDateTimeNow,
    cqlFileToBytes("/users/tiger/Nobody.png"))

  val createData = for {
    createTable <- cqlExecute(ctxCreate)
    insertData <- cqlExecute(ctxInsert)
  } yield (createTable, insertData)

  createData.onComplete {
    case Success((c, i)) => println(s"Create Table: $c, Insert Data $i")
    case Failure(e) => println(e.getMessage)
  }

  scala.io.StdIn.readLine()

  //data model
  case class Member(
    id: String,
    name: String,
    description: Option[String] = None,
    birthday: LocalDate,
    createdAt: java.util.Date,
    picture: Option[ByteBuffer] = None)

  //data row converter
  val toMember = (rs: Row) => Member(
    id = rs.getUUID("id").toString,
    name = rs.getString("name"),
    description = {
      val d = rs.getString("description")
      if (d == null) None else Some(d)
    },
    birthday = rs.getDate("birthday"),
    createdAt = rs.getTimestamp("created_at"),
    picture = {
      val pic = rs.getBytes("picture")
      if (pic == null) None else Some(pic)
    }
  )

  try {
    val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
    val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)
    var vecMembers: Vector[Member] = vecResults
    var isExh = resultSet.isExhausted
    var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
    while (!isExh) {
      nextPage = fetchMorePages[Vector, Member](nextPage._1, 1.second)(toMember)
      nextPage._2.foreach { vec =>
        vecMembers = vecMembers ++ vec
      }
      isExh = resultSet.isExhausted
    }
    vecMembers.foreach { m =>
      println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
      println(s"created_at: ${cqlDateTimeString(m.createdAt, "yyyy-MM-dd HH:mm:ss.SSS")}")
      m.picture match {
        case Some(buf) =>
          val fname = s"/users/tiger/pic-${m.name}.png"
          cqlBytesToFile(buf, fname)
          println(s"saving picture to $fname")
        case _ => println("empty picture!")
      }
    }
  } catch {
    case e: Exception => println(e.getMessage)
  }

  scala.io.StdIn.readLine()
  session.close()
  cluster.close()
  cqlsys.terminate()
}
