Description
For my current project I need to set TTL for documents when I am writing a dataframe.
diff --git a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSink.scala b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSink.scala
|
index dae49c7..c11ca02 100644 |
--- a/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSink.scala
|
+++ b/src/main/scala/com/couchbase/spark/sql/streaming/CouchbaseSink.scala
|
@@ -44,6 +44,10 @@ class CouchbaseSink(options: Map[String, String]) extends Sink with Logging { |
val removeIdField = options.getOrElse("removeIdField", "true").toBoolean |
val timeout = options.get("timeout").map(v => Duration(v.toLong, MILLISECONDS)) |
|
+ val createDocument = options.get("expiry").map(_.toInt) |
+ .map(expiry => (id: String, content: JsonObject) => JsonDocument.create(id, expiry, content))
|
+ .getOrElse((id: String, content: JsonObject) => JsonDocument.create(id, content))
|
+
|
data.toJSON
|
.queryExecution
|
.toRdd
|
@@ -60,7 +64,7 @@ class CouchbaseSink(options: Map[String, String]) extends Sink with Logging { |
encoded.removeKey(idFieldName)
|
}
|
|
- JsonDocument.create(id.toString, encoded)
|
+ createDocument(id.toString, encoded)
|
}
|
.saveToCouchbase(bucketName, StoreMode.UPSERT, timeout)
|
}
|
|