GRPC Insert - Broken Pipe and timeouts

I am using Pinecone's gRPC client to do inserts. Sometimes it works and sometimes it doesn't. I am following the instructions in the Performance tuning guide.

When I try to do inserts, I get the following message:

Traceback (most recent call last):
  File "${HOME}/anaconda3/envs/pinecone_insert/lib/python3.8/site-packages/pinecone/core/grpc/index_grpc.py", line 227, in result
    self._delegate.result(timeout=timeout)
  File "${HOME}/anaconda3/envs/pinecone_insert/lib/python3.8/site-packages/grpc/_channel.py", line 744, in result
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "Broken pipe"
    debug_error_string = "{"created":"@1648071646.318306073","description":"Error received from peer ipv4:34.127.5.128:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Broken pipe","grpc_status":14}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "upserter.py", line 55, in <module>
    [async_result.result() for async_result in async_results]
  File "upserter.py", line 55, in <listcomp>
    [async_result.result() for async_result in async_results]
  File "${HOME}/anaconda3/envs/pinecone_insert/lib/python3.8/site-packages/pinecone/core/grpc/index_grpc.py", line 229, in result
    raise PineconeException(e._state.debug_error_string) from e
pinecone.core.exceptions.PineconeException: {"created":"@1648071646.318306073","description":"Error received from peer ipv4:34.127.5.128:443","file":"src/core/lib/surface/call.cc","file_line":1067,"grpc_message":"Broken pipe","grpc_status":14}

Hey @ab1,

Can you share your upsert code snippet and the index name?
Also, do you see this only for async upserts?

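import itertools
import random

import pinecone  # pinecone.init(...) is assumed to have been called already

PINECONE_INDEX = '--'
VECTOR_COUNT = 100000
VECTOR_DIM = 384
BATCH_SIZE = 100

def chunks(iterable, batch_size=100):
    # Not shown in the original post; a common itertools-based batching helper.
    it = iter(iterable)
    chunk = tuple(itertools.islice(it, batch_size))
    while chunk:
        yield chunk
        chunk = tuple(itertools.islice(it, batch_size))

index = pinecone.GRPCIndex(PINECONE_INDEX)
example_data_generator = map(lambda i: (f'id-{i}', [random.random() for _ in range(VECTOR_DIM)]), range(VECTOR_COUNT))
with index:
    async_results = [
        index.upsert(vectors=chunk, async_req=True)
        for chunk in chunks(example_data_generator, batch_size=BATCH_SIZE)
    ]

    # Wait for and retrieve responses (in case of error)
    [async_result.result() for async_result in async_results]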

I see the error only for async upserts. If I don't use async upserts, the job doesn't complete: for 1M inserts, for example, it only ends up adding 300-350K. With async, sometimes it does all 1M inserts and sometimes it doesn't.

I think the non-async behavior is a bit strange. Do you see any error messages at all or is it just a silent failure?

Hey @ab1, I just used your code to insert a million vectors using both async and non-async upserts, so I think it’s likely not a client error. I looked at the logs for your index and it looks more like a connection issue to me. Broken pipe errors are also a sign of connection issues (either on your end or ours). Can you try running the upserts from a VM in the cloud or maybe a Colab notebook? That should give you a more stable connection and would help rule this out.

Also, sometimes when you send a lot of requests at once, the index takes a bit (depending on number of requests) to catch up, so you may see all the vectors in the console after a few minutes.
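
One way to check whether the index has caught up is to poll the vector count until it reaches the number of vectors you sent. The following is only a minimal sketch: `wait_for_count` and its parameters are hypothetical names, not part of the Pinecone client, and the commented usage assumes that `describe_index_stats()` returns an object exposing `total_vector_count`.

```python
import time


def wait_for_count(get_count, expected, timeout=300.0, interval=5.0,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll get_count() until it reaches expected or timeout seconds pass.

    Returns the last count seen, so the caller can compare it with expected.
    """
    deadline = clock() + timeout
    count = get_count()
    while count < expected and clock() < deadline:
        sleep(interval)
        count = get_count()
    return count


# Hypothetical usage (assumes total_vector_count is available on the stats object):
# seen = wait_for_count(lambda: index.describe_index_stats().total_vector_count,
#                       VECTOR_COUNT)
# if seen < VECTOR_COUNT:
#     print(f"only {seen} of {VECTOR_COUNT} vectors visible after waiting")
```

If the count is still short after the timeout, that points to upserts that actually failed rather than an index that is merely lagging.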

The error message is below. It times out in pinecone/core/grpc/index_grpc.py. I have tried this from multiple locations (Wi-Fi, wired connection, AWS node, notebook, etc.) and still get timeouts. I have tried 15-20 times so far and have only once been able to get to 1M; otherwise it times out well before that. Do you have any suggestions for error handling or retries?

  File "upserter.py", line 47, in insert_data_for_chunk
    retrieved_responses += [ async_result.result() ]
  File "${HOME}/anaconda3/envs/pinecone_insert/lib/python3.8/site-packages/pinecone/core/grpc/index_grpc.py", line 227, in result
    self._delegate.result(timeout=timeout)
TypeError: result() got an unexpected keyword argument 'timeout'
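
On the retry question, one common pattern is to wrap each upsert call in a retry loop with exponential backoff. The sketch below is an illustration under assumptions, not something provided by the Pinecone client: `call_with_retries` and its parameters are hypothetical names, and in practice you would probably narrow `retry_on` to the exception types you actually see. Retrying an upsert should be safe because upserting the same ID again overwrites the existing vector.

```python
import random
import time


def call_with_retries(fn, max_attempts=5, base_delay=1.0, max_delay=30.0,
                      retry_on=(Exception,), sleep=time.sleep):
    """Call fn(); on a retryable exception, wait and try again.

    The wait doubles after each failed attempt (capped at max_delay), with
    random jitter so parallel workers do not all retry at the same moment.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(delay * random.uniform(0.5, 1.0))


# Hypothetical usage with the synchronous upsert from this thread:
# for chunk in chunks(example_data_generator, batch_size=BATCH_SIZE):
#     call_with_retries(lambda: index.upsert(vectors=chunk))
```

With this approach a single broken pipe costs a short pause and one repeated batch instead of aborting the whole run.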

Hey @ab1,
You don’t really need to store the results; just calling .result() on each of them is enough to find out whether there was an error.
May I suggest another approach to make the upserts faster? Instead of using async, you can try using multiple instances of the client: create a multiprocessing pool in Python and have each process perform upserts independently. This would be faster. However, if you don’t have enough pods on your index, this may cause some timeouts because the index won’t be able to keep up, so we have to be a little conservative with the number of processes.

Here’s an example:

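import multiprocessing
import uuid

import numpy as np
import pinecone


def chunker(seq, size):
    # Yield consecutive slices of seq, each at most size items long.
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))


def upsert(data, index_name, batch):
    # Each process creates its own client and connection.
    key = 'API_KEY'
    pinecone.init(api_key=key, environment='ENV')
    index = pinecone.GRPCIndex(index_name)
    index.describe_index_stats()
    for chunk in chunker(data, batch):
        index.upsert(vectors=chunk)


if __name__ == "__main__":
    # Placeholder values; replace with your own index name and data.
    index_name = 'INDEX_NAME'
    batch_size = 100
    data = [(str(uuid.uuid4()), np.random.rand(384).tolist()) for _ in range(100000)]

    num_processes = 6
    # Give each process its own slice so the same vectors are not upserted six times.
    shard_size = -(-len(data) // num_processes)  # ceiling division
    processes = [
        multiprocessing.Process(target=upsert, args=(shard, index_name, batch_size))
        for shard in chunker(data, shard_size)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()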



Let me give this a shot. Will keep you posted in terms of how it goes. I am going to try with 5+ pods to start with.

Sounds good. It would also be great to know more about where you are calling it from (EC2, a notebook, or a local machine), from which region, etc.

The above approach worked with a little fine-tuning. I am getting good upsert rates, doing the inserts from EC2. Thanks for your help. I will let you know if there are any issues. We can mark the issue closed.

Great, if it would help we have a Spark connector that you can use next time you want to perform some higher volume upserts.

Great… will give it a try