app = Flask(__name__)
# AstraDB connection and vector store initialization
def initialize_astra_vector_store():
cassio.init(token=ASTRA_DB_APPLICATION_TOKEN, database_id=ASTR_DB_ID)
embedding = OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)
astra_vector_store = Cassandra(
embedding=embedding,
table_name=table_name,
session=None,
keyspace=None
)
print('----------------------table created as table_name:', table_name)
return astra_vector_store
.
.
.
.flask app routes----------------
.
.
def delete_table():
print('----------------------inside delete_table')
db = AstraDB(
token=os.getenv('ASTRA_DB_APPLICATION_TOKEN'),
api_endpoint='<api-endpoint>',
)
print('----------------------table_name:', table_name)
# Drop the table created for this session
db.delete_collection(collection_name=table_name)
print("----------------------APP EXITED----------------------")
atexit.register(delete_table)
if __name__ == '__main__':
app.run(debug=False)
现在,当我运行应用程序时,我收到错误如下
Exception in thread Task Scheduler:
Traceback (most recent call last):
File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
self.run()
File "cassandra\cluster.py", line 4239, in cassandra.cluster._Scheduler.run
File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\thread.py", line 167, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
Exception in thread Task Scheduler:
Traceback (most recent call last):
File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
self.run()
File "cassandra\cluster.py", line 4239, in cassandra.cluster._Scheduler.run
File "C:\Users\HP\AppData\Local\Programs\Python\Python311\Lib\concurrent\futures\thread.py", line 167, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
当我运行应用程序并关闭它时,我收到错误,但我希望它只是删除在应用程序期间创建的
astraDB
表...尽管当我运行然后关闭应用程序时该表被删除... .但出现此错误。
首先让我澄清一下,您的代码似乎在某种程度上混合了 Astra DB 的两种不同方法:
cassio
和 langchain_community.vectorstores.Cassandra
astrapy.db.AstraDB
(您在关闭代码中使用)和 langchain_astradb.AstraDBVectorStore
(您将在应用程序中使用它,如果它完全基于关于这种方法)这两种方法各有优缺点(例如,如果不熟悉 Cassandra,Data API 当然更容易使用且更惯用;相反,第一种方法适用于任何 Cassandra 5+ 集群,无论其部署如何).
在实践中,脚本在第一个范例中创建一个向量存储(因此是数据库上的基础表),然后使用数据 API 删除该向量存储。这似乎可行,但这种混合很难达到预期用途。如果您想坚持使用 CassIO 方法,我建议将关闭部分替换为如下代码(参见相关的 CassIO 文档):
def delete_table_cql():
print(f"Deleting table {store}")
# the following two lines expect cassio.init() has been run already
_session = cassio.config.resolve_session()
_keyspace = cassio.config.resolve_keyspace()
# issue a CQL command to drop the table
_session.execute(f"DROP TABLE IF EXISTS {_keyspace}.{table_name};")
atexit.register(delete_table_cql)
话虽这么说,让我们来谈谈主要问题,“关闭后的期货”症状。
此问题并非 CassIO 特有,而是源自 Python Cassandra 驱动程序(CassIO 内部使用该驱动程序;而 LangChain
Cassandra
存储又使用 CassIO)。
发生的情况是,驱动程序利用特定的执行器来执行一些异步任务(会话生命周期内的清理/内部维护),这显然不是用于运行查询的任务(即使您运行了异步查询) )。您收到的错误来自这样一个“隐藏”驱动程序看门人的任务安排得太晚,即当应用程序关闭并且该特定执行程序不再可用时。尽管出现了这个看起来可怕的“错误”,显式查询(例如上面的 DROP TABLE 命令)仍将 100% 成功完成。 (无论驱动程序在内部进行什么资源清理,谁在乎?无论如何它都会被销毁,对吧?)。
更准确地说:有趣的是,该脚本的关闭挂钩中没有任何与 Cassandra 驱动程序相关的内容(即,如前所述,是纯数据 API 的东西,即仅基于 HTTP 请求)。事实上,您显示的关闭挂钩在功能上等同于以下内容:
def delete_table_bare_http():
import requests
req = requests.post(
f"{ASTRA_DB_API_ENDPOINT}/api/json/v1/default_keyspace",
headers={"token": ASTRA_DB_APPLICATION_TOKEN},
json={"deleteCollection": {"name": table_name}},
)
assert req.status_code == 200
atexit.register(delete_table_bare_http)
...但是,您应该再次调整此挂钩,使其像其余代码一样基于 CQL,或者考虑将整个应用程序切换到Astra DB(基于数据 API)简而言之,请完全忽略此错误:保证您的请求执行到底。
来源:我是 CassIO 的主要作者 + 我已经就这个问题与 Python Cassandra 驱动程序的主要维护者之一进行了几次对话。