When I try to upsert to a Pinecone index (pod-based) using the following method (taken from the Databricks - Pinecone docs):
(
df.write
.option("pinecone.apiKey", api_key)
.option("pinecone.indexName", index_name)
.option("pinecone.projectName", project_name)
.option("pinecone.environment", environment)
.format("io.pinecone.spark.pinecone.Pinecone")
.mode("append")
.save()
)
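For context, df is a DataFrame of vectors to upsert. A minimal stand-in with the same shape would look like this (dummy ids and values; the id/values column layout follows the schema shown in the Pinecone Spark connector examples, so treat this as a sketch rather than my exact data):

from pyspark.sql.types import ArrayType, FloatType, StringType, StructField, StructType

# Dummy rows in the shape the connector upserts: a string id plus a dense vector.
# The vector dimension must match the index; 3 is used here only for illustration.
schema = StructType([
    StructField("id", StringType(), False),
    StructField("values", ArrayType(FloatType()), False),
])
df = spark.createDataFrame(
    [("vec-1", [0.1, 0.2, 0.3]), ("vec-2", [0.4, 0.5, 0.6])],
    schema=schema,
)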
I get the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 4584.0 failed 4 times, most recent failure: Lost task 5.3 in stage 4584.0 (TID 1437233) (executor 683): io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
File <command-2093907735488114>, line 24
14 for attempt in range(max_retries):
15 try:
16 (
17 df.write
18 .option("pinecone.apiKey", api_key)
19 .option("pinecone.indexName", index_name)
20 .option("pinecone.projectName", project_name)
21 .option("pinecone.environment", environment)
22 .format("io.pinecone.spark.pinecone.Pinecone")
23 .mode("append")
---> 24 .save()
25 )
26 break # Exit the loop if the write operation is successful
27 except Exception as e:
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:1679, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
1677 self.format(format)
1678 if path is None:
-> 1679 self._jwrite.save()
1680 else:
1681 self._jwrite.save(path)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1355, in JavaMember.__call__(self, *args)
1349 command = proto.CALL_COMMAND_NAME +\
1350 self.command_header +\
1351 args_command +\
1352 proto.END_COMMAND_PART
1354 answer = self.gateway_client.send_command(command)
-> 1355 return_value = get_return_value(
1356 answer, self.gateway_client, self.target_id, self.name)
1358 for temp_arg in temp_args:
1359 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
186 def deco(*a: Any, **kw: Any) -> Any:
187 try:
--> 188 return f(*a, **kw)
189 except Py4JJavaError as e:
190 converted = convert_exception(e.java_exception)
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o478.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 4584.0 failed 4 times, most recent failure: Lost task 5.3 in stage 4584.0 (TID 1437233) ([host IP] executor 683): io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at
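For completeness: as the traceback shows, the write sits inside a retry loop. Reconstructed, it looks roughly like this (the except branch and the max_retries value are cut off in the traceback above, so those parts are a best-guess stand-in, not verbatim):

import time

max_retries = 4  # assumed value; not visible in the traceback
for attempt in range(max_retries):
    try:
        (
            df.write
            .option("pinecone.apiKey", api_key)
            .option("pinecone.indexName", index_name)
            .option("pinecone.projectName", project_name)
            .option("pinecone.environment", environment)
            .format("io.pinecone.spark.pinecone.Pinecone")
            .mode("append")
            .save()
        )
        break  # Exit the loop if the write operation is successful
    except Exception as e:
        # The original except body is truncated in the traceback;
        # this simple exponential backoff is an assumed stand-in.
        if attempt == max_retries - 1:
            raise
        time.sleep(2 ** attempt)

Even with this loop, every attempt fails with the same UNAVAILABLE error.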