@@ -1 +1 @@
-scalaVersions = 2.12.8
\ No newline at end of file
+scalaVersions = 2.11.12
diff --git a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSource.scala b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSource.scala
index 1962e06..1da7240 100644
--- a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSource.scala
+++ b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSource.scala
@@ -169,8 +169,9 @@ class CouchbaseSource(sqlContext: SQLContext, userSchema: Option[StructType],
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
     logInfo(s"GetBatch called with start = $start, end = $end")
-    val startOffset = start.map(_.asInstanceOf[CouchbaseSourceOffset])
-    val endOffset = end.asInstanceOf[CouchbaseSourceOffset]
+    val startOffset = start.map(CouchbaseSourceOffset.convertToCouchbaseSourceOffset(_))
+    val endOffset = CouchbaseSourceOffset.convertToCouchbaseSourceOffset(end)
+
     val results = mutable.ArrayBuffer[(Array[Byte], Array[Byte])]()
     endOffset.partitionToOffsets.foreach(o => {
diff --git a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSourceOffset.scala b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSourceOffset.scala
index 4a77952..ef34204 100644
--- a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSourceOffset.scala
+++ b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSourceOffset.scala
@@ -39,6 +39,16 @@ object CouchbaseSourceOffset {
     }
   }
 
+  def convertToCouchbaseSourceOffset(offset: Offset): CouchbaseSourceOffset = {
+    offset match {
+      case o: CouchbaseSourceOffset => o
+      case so: SerializedOffset => CouchbaseSourceOffset(so)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid conversion from offset of ${offset.getClass} to CouchbaseSourceOffset")
+    }
+  }
+
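
For context on why the cast is replaced: when a structured streaming query restarts from a checkpoint, Spark replays the offsets it reads back from the offset log as SerializedOffset (raw JSON) rather than as the source's own offset class, so end.asInstanceOf[CouchbaseSourceOffset] fails with a ClassCastException on recovery. The sketch below illustrates the same conversion pattern in isolation; ExampleOffset, its JSON layout, and the convert helper are hypothetical stand-ins, not the connector's actual CouchbaseSourceOffset implementation.

// A minimal, hypothetical sketch of the offset-conversion pattern used in the
// patch above. ExampleOffset stands in for CouchbaseSourceOffset; only
// org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} are
// actual Spark classes.
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}

case class ExampleOffset(partitionToOffsets: Map[Int, Long]) extends Offset {
  // Spark persists this JSON representation into the checkpoint's offset log.
  override val json: String =
    partitionToOffsets
      .map { case (partition, offset) => "\"" + partition + "\":" + offset }
      .mkString("{", ",", "}")
}

object ExampleOffset {

  // Rebuild the typed offset from the JSON written by `json` above.
  def apply(serialized: SerializedOffset): ExampleOffset = {
    val entries = serialized.json
      .stripPrefix("{").stripSuffix("}")
      .split(",").filter(_.nonEmpty)
      .map { entry =>
        val Array(partition, offset) = entry.split(":")
        partition.replace("\"", "").toInt -> offset.toLong
      }
    ExampleOffset(entries.toMap)
  }

  // While a query is running, Spark passes the concrete offset back to the
  // source, but after a restart it passes a SerializedOffset read from disk,
  // so a plain asInstanceOf cast would throw.
  def convert(offset: Offset): ExampleOffset = offset match {
    case typed: ExampleOffset         => typed
    case serialized: SerializedOffset => ExampleOffset(serialized)
    case other =>
      throw new IllegalArgumentException(
        s"Cannot convert offset of ${other.getClass} to ExampleOffset")
  }
}

With such a helper in place, getBatch can call ExampleOffset.convert(end) instead of end.asInstanceOf[ExampleOffset], which is the same shape as the convertToCouchbaseSourceOffset helper added in the patch.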