xiaozhigang

长风破浪会有时,直挂云帆济沧海。

简述

​ RAG的主流程其实比较简单,流程前面已经说过了,简单看一下下面的图了解一下就行。

image-20250621160518166

​ 从上面的图,可以看到,RAG在回答用户问题的时候,向量数据库的交互式非常重要的一环,如果我们能提高向量库的召回质量也就能提高整个答案生成的质量。

强化索引

​ 现在我们知道提高向量库的召回质量能提高整个提问答案的生成质量,怎么提高向量库交互的召回质量有成了新的问题。之前我们说过向量库的查询召回是通过向量的比较实现的,与提问越相近的向量被召回的概率越高。现在我们只要提高文档的向量质量,这样才能提高查询召回的准确性和质量。

MRI

​ MRI(Multi-representation Indexing)多表示强化索引,为每个文档构建多个不同视角的向量表示。其实这个很好理解,我们一句话从不同的角度去理解包含了不同的信息,比如“我今天下午在写博客”这句话,第一个角度理解,就是字面意思,我今天下午写博客,换一个角度,我今天下午还活着,不然也不能写博客。从上面的示例,我们从两个角度获得了两个信息,推广一下,一段话、一篇文章也一样,不同的角度的理解能得到不同的信息,也就能根据这些不同的信息生成不同的向量。

​ 不同维度的理解更加丰富了文章的定义,不同维度的理解也催生了多向量多文章的描述,从而使文章的向量定义更加准确,也使我们查询召回的时候更加准确。

image-20250621160518166

​ 在实际应用中,我们多以父子文档的方式去处理MRI,父文档是原始文档,子文档都是对父文档的摘要、标题等多维度理解表达,我们通过将子文档存入向量库,通过提问向量匹配召回后,通过这些理解表达再找回父文档返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import uuid
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.storage import InMemoryByteStore
from langchain.retrievers.multi_vector import MultiVectorRetriever

# 假设我们有两个原始文档
raw_docs = [
"LangChain is a framework for developing applications powered by language models.",
"Chroma is a vector database used for similarity search over embeddings."
]

# 为每个原始文档生成多个摘要(手动模拟,也可以用 LLM 自动生成)
summaries = [
["LangChain is a framework for LLM apps.", "It helps chain language model calls."],
["Chroma is a vector store.", "It supports similarity search for embeddings."]
]

# 给每个原始文档生成唯一 ID
doc_ids = [str(uuid.uuid4()) for _ in raw_docs]

# 准备子文档(用于嵌入、向量检索)
summary_docs = []
for i, summary_list in enumerate(summaries):
for s in summary_list:
summary_docs.append(Document(page_content=s, metadata={"doc_id": doc_ids[i]}))

# 向量数据库用于存储子文档(summary)
embedding_function = OpenAIEmbeddings()
vectorstore = Chroma(collection_name="multi_index_demo", embedding_function=embedding_function)

# 用于存储父文档(原文)
byte_store = InMemoryByteStore()
id_key = "doc_id"

# 构建多向量检索器
retriever = MultiVectorRetriever(
vectorstore=vectorstore,
byte_store=byte_store,
id_key=id_key,
)

# 添加子文档到向量库
retriever.vectorstore.add_documents(summary_docs)

# 添加原文(父文档)到字节存储中
retriever.docstore.mset(list(zip(doc_ids, raw_docs)))

# 🔍 测试查询
query = "What is a vector database?"
results = retriever.get_relevant_documents(query, n_results=1)

print("🔍 用户查询:", query)
print("📄 匹配到的原始文档:", results[0].page_content)

RAPTOR

RAPTOR 是一种 高效构建多表示索引(multi-representation indexing)的新方法,上面的MRI相当于多维度广度上的处理,一般有广度处理就有深度处理。刚才是从维度上对文章做的处理,现在我们从深度上做处理。我们可以将一篇文章拆分成多个段落,然后对每个段落进行提炼总结,总结出一个个标题,再对这些标题做呢向量处理,生成出向量,这样我们查询到的是一个个段落,这样我们的查询结果会更加精准。

RAPTOR 的核心思想是:先用结构化方法将文档拆分为多个子块(chunk),再用 LLM 为每个子块生成一个有语义的标题作为向量索引的表示

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 伪代码:chunk → title → embedding
from langchain_core.documents import Document
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.vectorstores import FAISS

llm = ChatOpenAI(model="gpt-4")
embed = OpenAIEmbeddings()

chunks = [...] # 原始文档分块
title_chunks = []

for chunk in chunks:
title = llm.invoke(f"为以下内容生成一个主题标题:\n\n{chunk}")
title_chunks.append(Document(page_content=title, metadata={"chunk": chunk}))

# 把标题作为表示加入向量库
vectorstore = FAISS.from_documents(title_chunks, embed)

# 检索时命中标题 → 返回原 chunk(从 metadata 中获取)

ColBERT

ColBERTColumn-wise BERT改进语义检索质量的深度表示方法,但它的思路和实现非常独特,ColBERT 是一种 细粒度(token-level)语义检索方法,它把文档和查询都表示成 多个 token 向量,然后通过 最大相似度匹配机制 来计算查询与文档的相关性,从而提升精度和效率。和上面的REAPTOR 类似单不同,REAPTOR是拆分子块,然后生成标题,这个ColBERT直接就拆分成token了,可以理解为拆分的更细更碎,且没有提炼生成标题的阶段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from ragatouille import RAGPretrainedModel
import requests

# 加载 ColBERT 模型
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

# 用 Wikipedia API 获取文章内容
def get_wikipedia_page(title: str):
URL = "https://en.wikipedia.org/w/api.php"
params = {
"action": "query",
"format": "json",
"titles": title,
"prop": "extracts",
"explaintext": True,
}
headers = {"User-Agent": "ColBERT-example/0.0.1 (your_email@example.com)"}
response = requests.get(URL, params=params, headers=headers)
data = response.json()
page = next(iter(data["query"]["pages"].values()))
return page["extract"] if "extract" in page else None

# 获取文章内容
doc_text = get_wikipedia_page("Hayao Miyazaki")

# 用 ColBERT 构建向量索引
RAG.index(
collection=[doc_text],
index_name="miyazaki-demo-index",
max_document_length=180,
split_documents=True # 会将长文分割成 chunks
)

# 查询示例
query = "Which animation studio did Miyazaki found?"
results = RAG.search(query=query, k=3)

# 打印结果
for rank, hit in enumerate(results, 1):
print(f"Rank {rank}:")
print("Score:", hit['score'])
print("Passage:\n", hit['content'])
print("=" * 80)

三者异同

​ 我们来详细对比 MRI(Multi-representation Indexing)RAPTORColBERT 三者的异同点

特性 MRI RAPTOR ColBERT
📌 核心思路 同一段文本生成多个不同视角的表示向量(如主题、意图、关键词等) 将文档划分为语义段(小粒度),为每段生成简洁标题(父文档)作为召回锚点 将文本拆成 token 粒度,用户 query 也拆成 token,然后进行 token 级别的Late Interaction 匹配
🎯 优势 多角度语义增强召回,能捕捉不同用户提问方式 结构化文档、压缩检索空间,提高精确召回率 更细粒度的匹配,可解释性强,适合长文高密度信息检索
🧩 粒度 Chunk/Paragraph 级 Paragraph + Title Token 级别
🧠 表示方式 多向量(multi-head) 父子文档(title-child) Token-level 表示
📦 模型类型 通常是 dense encoder(可多次 encode) 标题生成模型 + Dense encoder 特殊结构模型(支持 Late Interaction),如 ColBERT 模型
🧪 检索方式 向量 ANN 父文召回 + 子文 rerank Token-Level MaxSim 聚合
🧰 代表实现 HyDE-MRI、[NeMo RAG] RAPTOR (Hofstätter et al. 2023) ColBERT, ColBERTv2

​ 每一种方法都有优劣,这时候你应该能很敏感的get到,如果能将三种方法综合在一起应该是个很棒的方案,那这个方案是否存在呢,肯定是存在的。如下流程整合:

  1. 文档分割(RAPTOR)
    • 用 RAPTOR 的方式划分语义段落,为每段生成标题(构建父子结构);
  2. 多表示索引(MRI)
    • 对每段生成多种视角的表示向量(主题向量 + 关键词向量 + 语义向量);
  3. Token-level 检索(ColBERT)
    • 用户 Query 使用 ColBERT 拆成 token 粒度匹配以 rerank 最终结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from ragatouille import RAGPretrainedModel
from transformers import pipeline
import requests

# 加载 ColBERT 模型(用于 Token-level 检索)
RAG = RAGPretrainedModel.from_pretrained("colbert-ir/colbertv2.0")

# 用标题生成模型(模拟 RAPTOR 的“父文档”)
title_generator = pipeline("text2text-generation", model="google/flan-t5-base")

# 获取 Wikipedia 页面
def get_wikipedia_page(title: str):
URL = "https://en.wikipedia.org/w/api.php"
params = {"action": "query", "format": "json", "titles": title, "prop": "extracts", "explaintext": True}
headers = {"User-Agent": "ColBERT-MRI-RAPTOR/0.0.1 (you@domain.com)"}
resp = requests.get(URL, params=params, headers=headers)
page = next(iter(resp.json()["query"]["pages"].values()))
return page.get("extract", "")

# 文档拆分成段落并生成标题(RAPTOR思路)
def split_and_title(document):
chunks = [chunk for chunk in document.split("\n\n") if len(chunk.strip()) > 100]
titled_chunks = []
for chunk in chunks:
title = title_generator(f"Generate title: {chunk[:200]}", max_new_tokens=12)[0]["generated_text"]
titled_chunks.append({"title": title, "content": chunk})
return titled_chunks

# MRI:多表示(示例:原文 + 关键词抽取 + 问题改写)
def multi_view_representations(titled_chunks):
views = []
for chunk in titled_chunks:
views.append(chunk["content"]) # 原始 chunk
views.append(chunk["title"]) # RAPTOR 的标题
# 关键词表示(模拟 MRI)
views.append("Keywords: " + ", ".join(chunk["content"].split()[:10]))
return views

# 构建索引
def index_document(title):
full_text = get_wikipedia_page(title)
titled_chunks = split_and_title(full_text)
mri_texts = multi_view_representations(titled_chunks)
RAG.index(collection=mri_texts, index_name=f"RAG-MRI-RAPTOR-{title}", max_document_length=180)

# 查询
def search(query, k=3):
results = RAG.search(query=query, k=k)
for i, hit in enumerate(results, 1):
print(f"Rank {i}: [Score: {hit['score']:.2f}]")
print(hit['content'])
print("="*80)

# 示例运行
index_document("Hayao Miyazaki")
search("Which animation studio did Hayao Miyazaki found?")

简述

​ RAG的主流程其实比较简单,流程前面已经说过了,简单看一下下面的图了解一下就行。

image-20250621160518166

​ 从上面的图,可以看到,RAG在回答用户问题的时候,最少做了两次交互:1、与向量数据库的交互,2、与大模型的交互。如果我们对向量数据库和大模型有了细分,比如物理相关文档都放在物理向量数据库,物理大模型也是全部都用物理数据训练的,如果用户提问的是物理问题,那与物理向量数据库和物理大模型交互是不是能得到更好的答案,答案是肯定的,毕竟没有其他数据的污染,会查询生产的更快更准。

路由选择

​ 从上面我们看到了两个地方可以做路由选择:向量数据库、大模型。其实除了这两个地方,还有一个隐形的地方,那就是提示词模版。我们调用大模型的时候是要输入提示词的,提示词都是按模版输入的,如果我们细分做好分类,按不同的问题选择特定的模版,也能提高我们生成更好答案的概率。

​ 说直白一点,就是根据问题的类型,选择相应的向量库、模版和大模型。

向量数据库路由

​ 其实我们是没有办法直接区分用户问题的类型的,但是我们可以让大模型去做区分。拿到区分后的结果,去选择相应的路由。

graph LR
A(开始) --> B[划定分类范围]
B --> C[大模型判断分类]
C --> D[选择向量库]
D --> E(结束)

​ 1、划定分类:根据我们已有的向量数据库设置好分类,规范好分类种类。

​ 2、大模型判断:输入用户问题,和我们设置好的分类,让大模型判断出用户输入问题是分类中的哪一类,或者说与分类中哪一类相关性最高。

​ 3、选择向量库:根据大模型的判断结果,选择已有的向量库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from typing import Literal

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_openai import ChatOpenAI

# Data model
class RouteQuery(BaseModel):
"""Route a user query to the most relevant datasource."""

datasource: Literal["python_docs", "js_docs", "golang_docs"] = Field(
...,
description="Given a user question choose which datasource would be most relevant for answering their question",
)

# LLM with function call
llm = ChatOpenAI(model="gpt-3.5-turbo-0125", temperature=0)
structured_llm = llm.with_structured_output(RouteQuery)

# Prompt
system = """You are an expert at routing a user question to the appropriate data source.

Based on the programming language the question is referring to, route it to the relevant data source."""

prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("human", "{question}"),
]
)

# Define router
router = prompt | structured_llm

提示词模版路由

​ 套路显然和上面的向量库的套路一样,用大模型获取到分类,在用分类去获取预设好的提示词模版。代码类同,能拿到对应的分类,找到分类对应的模版即可。

graph LR
A(开始) --> B[划定分类范围]
B --> C[大模型判断分类]
C --> D[选择提示词模版]
D --> E(结束)

大模型路由

​ 套路一样,用大模型的判断结果去选择相应的大模型。代码类同。

graph LR
A(开始) --> B[划定分类范围]
B --> C[大模型判断分类]
C --> D[选择大模型]
D --> E(结束)

其他路由选择方法

​ 除了上述使用大模型判断分类,还有没有其他方法判断分类,肯定是有的。以向量计算相关性,判断出最相关的分类。使用embeddings计算出预设模版的向量,再计算出用户问题的向量,最后找出与问题向量最相关的预设模版。能找出模版也就知道预设模版的分类,使用分类去找向量库和大模型即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from langchain.utils.math import cosine_similarity
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda, RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# Two prompts
physics_template = """You are a very smart physics professor. \
You are great at answering questions about physics in a concise and easy to understand manner. \
When you don't know the answer to a question you admit that you don't know.

Here is a question:
{query}"""

math_template = """You are a very good mathematician. You are great at answering math questions. \
You are so good because you are able to break down hard problems into their component parts, \
answer the component parts, and then put them together to answer the broader question.

Here is a question:
{query}"""

# Embed prompts
embeddings = OpenAIEmbeddings()
prompt_templates = [physics_template, math_template]
prompt_embeddings = embeddings.embed_documents(prompt_templates)

# Route question to prompt
def prompt_router(input):
# Embed question
query_embedding = embeddings.embed_query(input["query"])
# Compute similarity
similarity = cosine_similarity([query_embedding], prompt_embeddings)[0]
most_similar = prompt_templates[similarity.argmax()]
# Chosen prompt
print("Using MATH" if most_similar == math_template else "Using PHYSICS")
return PromptTemplate.from_template(most_similar)


chain = (
{"query": RunnablePassthrough()}
| RunnableLambda(prompt_router)
| ChatOpenAI()
| StrOutputParser()
)

print(chain.invoke("What's a black hole"))

简述

​ 这周遇到两个新问题,就是大文件的上传和下载。

场景 之前 问题
大文件上传云存储 文件上传到服务器,在从服务器上传到云存储 1、通过nginx、nacos转发耗时太长 
2、太消耗服务器资源
用户下载视频 从其他端下载视频,再将视频转发给客户端 同上

​ 这两个问题的解决,还得感谢chatgpt,这玩意还是好用的。

大文件上传

​ 之前都是直接将文件上传到服务器,通过服务器将文件上传到云端,拿到链接后保存链接。这样刚开始使用的时候还行,上传的文件比较小,后来要上传大文件,问题就暴露出来了,耗时太长。文件要经过nginx、nacos的转发,一个200M的文件,到服务器后端代码开始处理已经过去十多分钟了。除了耗时,还太消耗资源,nginx、nacos、spring服务这每一项都要转发一下大文件,都要消耗内存流量。

​ 解决方案:1、改长相应时间,2、修改设计方案。第一个解决方案明显不行,没解决根本问题,耗时长,资源消耗大的问题还是存在。那就只能考虑第二个解决方案了。

​ 其实刚开始,我也没啥好的解决方案(见识短浅)。后来问了一下chatgpt,解决方案直接就出来了,生成预链接,让前端通过预链接上传文件。说是业界都用的这种方案,我们都不知道,还是落后呢脱节了啊。我们公司用的是AWS的S3存储,它是支持生成预链接的,除了AWS,其他的云厂商都是支持的(阿里云,腾讯云等)。为了保障链接的有效性,我在后加了一个地址校验,校验一下预链接的地址前端有没有上传文件,防止前端没有上传。代码编写随便百度一下大把。

image-20250705170312520

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 生成一个可用于 PUT 上传的预签名 URL
*
* @param bucketName S3桶名
* @param objectKey 上传到 S3 的 key(路径+文件名)
* @param contentType 上传内容类型(如 "image/jpeg", "application/pdf")
* @param validMinutes URL 有效时间(分钟)
* @return 可用的 PUT 请求 URL
*/
public URL generatePresignedPutUrl(String bucketName, String objectKey, String contentType, int validMinutes) {
// 初始化 S3 PreSigner(线程安全,可复用)
try (S3Presigner preSigner = S3Presigner.builder()
.region(REGION)
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY)
))
.build()) {

PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucketName)
.key(objectKey)
.acl(ObjectCannedACL.PUBLIC_READ)
.contentType(contentType)
.build();

PresignedPutObjectRequest preSignedRequest = preSigner.presignPutObject(builder -> builder
.putObjectRequest(putObjectRequest)
.signatureDuration(Duration.ofMinutes(validMinutes)) // 有效期
);
// 可用于前端 PUT 上传
return preSignedRequest.url();
}
}

视频下载

​ 背景:本来可以提供三方的视频链接直接给前端下载的,可以视频链接有反扒校验,预ip绑定,其他ip都打不开链接,所以只能后端服务器下载了传给前端。

​ 初次解决方案:服务器直接下载完,再转发给前端下载,但是这样要与前端保持链接的时间太长了,服务端要下载一遍,前端也要接着下载一遍。同时也消耗服务器资源,带宽流量和服务器内存。

​ 优化方案:通过服务器直接透传给前端,这样服务器不用保存,可以减少内存的压力,同时也减少相应的时间。不过这也有问题,前端没那么稳定,会断,导致整个视频下载不了。

​ 最终方案:分片返回给前端,将一个视频流切成很多片,每次请求一片,这样将一个长连接变成了许多短连接。不会因前端的一次断开,导致整个下载失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@PostMapping("/pronhub/url/download")
public ResponseEntity<StreamingResponseBody> downloadVideoWithRange(
HttpServletRequest request,
HttpServletResponse response,
@RequestBody PronhubUrlDownLoadRequest body) throws IOException {

String videoUrl = body.getUrl();
URL url = new URL(videoUrl);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestProperty("User-Agent", "Mozilla/5.0");

int responseCode = conn.getResponseCode();
if (responseCode != 200) {
return ResponseEntity.status(responseCode)
.contentType(MediaType.TEXT_PLAIN)
.body(out -> out.write(("视频请求失败: " + responseCode).getBytes(StandardCharsets.UTF_8)));
}

String contentType = conn.getContentType();
int fullLength = conn.getContentLength();

String rangeHeader = request.getHeader("Range");
int start = 0;
int end;

if (rangeHeader != null && rangeHeader.startsWith("bytes=")) {
String[] ranges = rangeHeader.replace("bytes=", "").split("-");
start = Integer.parseInt(ranges[0].trim());
if (ranges.length > 1 && !ranges[1].isEmpty()) {
end = Integer.parseInt(ranges[1].trim());
} else {
// 如果客户端只给了 start,没有 end,主动限制最多 CHUNK_SIZE
end = Math.min(start + CHUNK_SIZE - 1, fullLength - 1);
}
} else {
// 没有 Range 请求时,默认返回从头开始的一个 5MB 分片
end = Math.min(start + CHUNK_SIZE - 1, fullLength - 1);
}

int rangeLength = end - start + 1;
conn.setRequestProperty("Range", "bytes=" + start + "-" + end);
InputStream inputStream = conn.getInputStream();

final int finalStart = start;
final int finalEnd = end;

StreamingResponseBody stream = outputStream -> {
try (InputStream in = inputStream) {
byte[] buffer = new byte[8192];
int bytesRead;
long totalRead = 0;
long lastFlush = System.currentTimeMillis();

while ((bytesRead = in.read(buffer)) != -1 && totalRead < rangeLength) {
int toWrite = (int) Math.min(bytesRead, rangeLength - totalRead);
outputStream.write(buffer, 0, toWrite);
totalRead += toWrite;

if (System.currentTimeMillis() - lastFlush > 2000) {
outputStream.flush();
lastFlush = System.currentTimeMillis();
}
}

outputStream.flush();
} catch (IOException ex) {
if (ex.getMessage().contains("Broken pipe")) {
log.warn("客户端连接中断: {}", ex.getMessage());
} else {
log.error("视频传输异常: ", ex);
}
}
};

return ResponseEntity.status(HttpStatus.PARTIAL_CONTENT)
.header("Accept-Ranges", "bytes")
.header("Content-Type", contentType != null ? contentType : "video/mp4")
.header("Content-Length", String.valueOf(rangeLength))
.header("Content-Range", String.format("bytes %d-%d/%d", finalStart, finalEnd, fullLength))
.header("Full-Length", String.valueOf(fullLength))
.header("Access-Control-Expose-Headers", "Content-Range,Full-Length")
.header("Content-Disposition", "attachment; filename=video.mp4")
.body(stream);
}

简述

​ RAG的主流程其实比较简单,就是先将用户的提问去向量数据库查询相关文档,再将文档和用户提问组装发送给大模型生成答案,最后将答案返回给用户。其中主要的组件只有向量数据库和大模型,只是两相组合可以玩出花来。

image-20250621160518166

丰富提示词

​ RAG意思是检索增强生成,大模型的生成都是根据提示词生成的,而向量库的存在就是为了丰富我们的提示词,丰富了我们的提示词,大模型才能生成出更理想更丰富合适的答案。那怎么丰富我们的提示词呢?

多版本查询

​ 在我们用问题向量化之后向量去检索向量库得到文档的时候,我们能不能尽可能多的获取相关性的文档,比如说,我的提问是用汉语提问题的,这时候向量化之后一定会带有汉语的向量,导致检索向量库的时候检索到可能都是汉语的文档,其实有些问题的英语文档或者其他语言的文档更全面,如果检索到这些文档,去丰富我们的提示词,生成的答案就会更好。

​ 当然我们在提问的时候不可能每个语言或者其他角度都描述到,这时候我们可以让llm帮我们生成这些版本的问题,去检索到更多更有效的文档。

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from langchain.prompts import ChatPromptTemplate

# Multi Query: Different Perspectives
template = """You are an AI language model assistant. Your task is to generate five
different versions of the given user question to retrieve relevant documents from a vector
database. By generating multiple perspectives on the user question, your goal is to help
the user overcome some of the limitations of the distance-based similarity search.
Provide these alternative questions separated by newlines. Original question: {question}"""
prompt_perspectives = ChatPromptTemplate.from_template(template)

from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

generate_queries = (
prompt_perspectives
| ChatOpenAI(temperature=0)
| StrOutputParser()
| (lambda x: x.split("\n"))
)

from langchain.load import dumps, loads

def get_unique_union(documents: list[list]):
""" Unique union of retrieved docs """
# Flatten list of lists, and convert each Document to string
flattened_docs = [dumps(doc) for sublist in documents for doc in sublist]
# Get unique documents
unique_docs = list(set(flattened_docs))
# Return
return [loads(doc) for doc in unique_docs]

# Retrieve
question = "What is task decomposition for LLM agents?"
retrieval_chain = generate_queries | retriever.map() | get_unique_union
docs = retrieval_chain.invoke({"question":question})
len(docs)

from operator import itemgetter
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnablePassthrough

# RAG
template = """Answer the following question based on this context:

{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

llm = ChatOpenAI(temperature=0)

final_rag_chain = (
{"context": retrieval_chain,
"question": itemgetter("question")}
| prompt
| llm
| StrOutputParser()
)

final_rag_chain.invoke({"question":question})

结果融合

​ 上面的过程,我们将多版本检索出来的结果都提交给大模型生成答案了,多版本检索出来的多结果肯定有好有坏,坏的结果一起提交给大模型,肯定也会影响大模型生成答案,那我们要怎么做才能让减少坏结果的影响呢?

​ 在我们搜索的多结果中,我们可以根据文档在每个列表中的排名,通过 RRF 算法为每个文档打分,最终返回一个融合后的排序列表,最后从列表中筛选出前几个我们想要的结果即可。

给定多个文档排名列表(多个检索器、多个查询),RRF 的打分公式如下:

$$
\text{Score}(d) = \sum_{i=1}^{n} \frac{1}{\text{rank}_i + k}
$$

其中:

  • ( \text{rank}_i ):文档 ( d ) 在第 ( i ) 个排序列表中的排名(从 0 开始)
  • ( k ):平滑参数(通常取 60)
  • ( n ):总共融合的列表数量(例如不同查询或不同检索器)

文档出现越多、排名越靠前,其 RRF 得分就越高。

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def reciprocal_rank_fusion(results: list[list], k=60):
""" Reciprocal_rank_fusion that takes multiple lists of ranked documents
and an optional parameter k used in the RRF formula """

# Initialize a dictionary to hold fused scores for each unique document
fused_scores = {}

# Iterate through each list of ranked documents
for docs in results:
# Iterate through each document in the list, with its rank (position in the list)
for rank, doc in enumerate(docs):
# Convert the document to a string format to use as a key (assumes documents can be serialized to JSON)
doc_str = dumps(doc)
# If the document is not yet in the fused_scores dictionary, add it with an initial score of 0
if doc_str not in fused_scores:
fused_scores[doc_str] = 0
# Retrieve the current score of the document, if any
previous_score = fused_scores[doc_str]
# Update the score of the document using the RRF formula: 1 / (rank + k)
fused_scores[doc_str] += 1 / (rank + k)

# Sort the documents based on their fused scores in descending order to get the final reranked results
reranked_results = [
(loads(doc), score)
for doc, score in sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
]

# Return the reranked results as a list of tuples, each containing the document and its fused score
return reranked_results

提问分解

​ 从版本的角度让我们的提示词更丰富了,但是在语义描述方面,能不能提升一下我们的提示词。在一个复杂困难的问题上我们的提示词可能是不够的,毕竟问题的范围比较大,我们的描述范围有限。如果我们缩小问题的范围,变成小问题,我们的提示词是不是就比较丰富了。在我们经验中,那种复杂的问题,比较好的解决办法就是把它拆分成一个一个的小问题,然后依次去解决这些小问题。在这里我们同样可以套用这种解决方式,把复杂问题甩给大模型让他帮我们拆分成一个一个的小问题,然后我们依次去提问解决这些小问题。正对每一个小问题我们的提示词是不是就丰富了

递归回答

​ 怎么去解决这些小问题,如果这些小问题上下有相关性的话,我们可以使用递归递归回答,就是将上一个问题的答案带入到下一个问题中。

image-20250621160518166

​ 如上图,我们将一个复杂问题拆分成3个小问题,然后依次查询生成解决,将每次解决的结果带入到下一次解决中,最后解决这个复杂问题生成一个完整答案输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from langchain.prompts import ChatPromptTemplate

# Decomposition
template = """You are a helpful assistant that generates multiple sub-questions related to an input question. \n
The goal is to break down the input into a set of sub-problems / sub-questions that can be answers in isolation. \n
Generate multiple search queries related to: {question} \n
Output (3 queries):"""
prompt_decomposition = ChatPromptTemplate.from_template(template)
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# LLM
llm = ChatOpenAI(temperature=0)

# Chain
generate_queries_decomposition = ( prompt_decomposition | llm | StrOutputParser() | (lambda x: x.split("\n")))

# Run
question = "What are the main components of an LLM-powered autonomous agent system?"
questions = generate_queries_decomposition.invoke({"question":question})
# Prompt
template = """Here is the question you need to answer:

\n --- \n {question} \n --- \n

Here is any available background question + answer pairs:

\n --- \n {q_a_pairs} \n --- \n

Here is additional context relevant to the question:

\n --- \n {context} \n --- \n

Use the above context and any background question + answer pairs to answer the question: \n {question}
"""

decomposition_prompt = ChatPromptTemplate.from_template(template)
from operator import itemgetter
from langchain_core.output_parsers import StrOutputParser

def format_qa_pair(question, answer):
"""Format Q and A pair"""

formatted_string = ""
formatted_string += f"Question: {question}\nAnswer: {answer}\n\n"
return formatted_string.strip()

# llm
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)

q_a_pairs = ""
for q in questions:

rag_chain = (
{"context": itemgetter("question") | retriever,
"question": itemgetter("question"),
"q_a_pairs": itemgetter("q_a_pairs")}
| decomposition_prompt
| llm
| StrOutputParser())

answer = rag_chain.invoke({"question":q,"q_a_pairs":q_a_pairs})
q_a_pair = format_qa_pair(q,answer)
q_a_pairs = q_a_pairs + "\n---\n"+ q_a_pair
融合回答

​ 如果这些小问题上下文没有相关性或者说没有依赖性,我们又该怎么做呢?可以将每一个小问题单独的去解决生成答案,最后将所有小问题的答案融合生成复杂问题的汇总答案就行了。

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# Answer each sub-question individually 

from langchain import hub
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

# RAG prompt
prompt_rag = hub.pull("rlm/rag-prompt")

def retrieve_and_rag(question,prompt_rag,sub_question_generator_chain):
"""RAG on each sub-question"""

# Use our decomposition /
sub_questions = sub_question_generator_chain.invoke({"question":question})

# Initialize a list to hold RAG chain results
rag_results = []

for sub_question in sub_questions:

# Retrieve documents for each sub-question
retrieved_docs = retriever.get_relevant_documents(sub_question)

# Use retrieved documents and sub-question in RAG chain
answer = (prompt_rag | llm | StrOutputParser()).invoke({"context": retrieved_docs,
"question": sub_question})
rag_results.append(answer)

return rag_results,sub_questions

# Wrap the retrieval and RAG process in a RunnableLambda for integration into a chain
answers, questions = retrieve_and_rag(question, prompt_rag, generate_queries_decomposition)

def format_qa_pairs(questions, answers):
"""Format Q and A pairs"""

formatted_string = ""
for i, (question, answer) in enumerate(zip(questions, answers), start=1):
formatted_string += f"Question {i}: {question}\nAnswer {i}: {answer}\n\n"
return formatted_string.strip()

context = format_qa_pairs(questions, answers)

# Prompt
template = """Here is a set of Q+A pairs:

{context}

Use these to synthesize an answer to the question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

final_rag_chain = (
prompt
| llm
| StrOutputParser()
)

final_rag_chain.invoke({"context":context,"question":question})

后退重复生成

​ 对于复杂问题,除了上面说的拆分成子问题有没有其他的方式让我们的提示词变得更丰富了呢?答案肯定是有的,就是后退重复生成。啥意思呢,就是说,我们第一次的提示词不丰富,那我们让第一次生成的结果加到提示词中,这时候我们的提示词是不是更丰富了,在用这个丰富的提示词再去生成,是不是结果更好了呢,如果不够,我们再将生成的结果加入提示词再生成一次呢,一直到满意为止。

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# Few Shot Examples
from langchain_core.prompts import ChatPromptTemplate, FewShotChatMessagePromptTemplate
examples = [
{
"input": "Could the members of The Police perform lawful arrests?",
"output": "what can the members of The Police do?",
},
{
"input": "Jan Sindel’s was born in what country?",
"output": "what is Jan Sindel’s personal history?",
},
]
# We now transform these to example messages
example_prompt = ChatPromptTemplate.from_messages(
[
("human", "{input}"),
("ai", "{output}"),
]
)
few_shot_prompt = FewShotChatMessagePromptTemplate(
example_prompt=example_prompt,
examples=examples,
)
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"""You are an expert at world knowledge. Your task is to step back and paraphrase a question to a more generic step-back question, which is easier to answer. Here are a few examples:""",
),
# Few shot examples
few_shot_prompt,
# New question
("user", "{question}"),
]
)
generate_queries_step_back = prompt | ChatOpenAI(temperature=0) | StrOutputParser()
question = "What is task decomposition for LLM agents?"
generate_queries_step_back.invoke({"question": question})
# Response prompt
response_prompt_template = """You are an expert of world knowledge. I am going to ask you a question. Your response should be comprehensive and not contradicted with the following context if they are relevant. Otherwise, ignore them if they are not relevant.

# {normal_context}
# {step_back_context}

# Original Question: {question}
# Answer:"""
response_prompt = ChatPromptTemplate.from_template(response_prompt_template)

chain = (
{
# Retrieve context using the normal question
"normal_context": RunnableLambda(lambda x: x["question"]) | retriever,
# Retrieve context using the step-back question
"step_back_context": generate_queries_step_back | retriever,
# Pass on the question
"question": lambda x: x["question"],
}
| response_prompt
| ChatOpenAI(temperature=0)
| StrOutputParser()
)

chain.invoke({"question": question})

假想文档

​ 如果我们的向量数据库中,没有我们提问的所需要的文档和相关知识怎么办?这时候从向量库的检索是没办法帮我们丰富提示词的,但是我们让llm帮我们生成一个假想文档,这样我们就可以根据这个文档获得跟丰富的提示词从而生成出更丰满的答案了。

image-20250621160518166

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from langchain.prompts import ChatPromptTemplate

# HyDE document genration
template = """Please write a scientific paper passage to answer the question
Question: {question}
Passage:"""
prompt_hyde = ChatPromptTemplate.from_template(template)

from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

generate_docs_for_retrieval = (
prompt_hyde | ChatOpenAI(temperature=0) | StrOutputParser()
)

# Run
question = "What is task decomposition for LLM agents?"
generate_docs_for_retrieval.invoke({"question":question})
# Retrieve
retrieval_chain = generate_docs_for_retrieval | retriever
retireved_docs = retrieval_chain.invoke({"question":question})
retireved_docs
# RAG
template = """Answer the following question based on this context:

{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(template)

final_rag_chain = (
prompt
| llm
| StrOutputParser()
)

final_rag_chain.invoke({"context":retireved_docs,"question":question})

简述

25年第一次面试,电话面试,这次面试没有准备好,还没来得及复习。

自我介绍

19年毕业,5年多工作经验,目前正在优美互动这边工作。

定位:高开,之前带领三四个人,目前因为团队规模,各自一摊。

目前团队业务是做一些小应用投放海外市场,赚取一些收益。进入团队快一年的时间,参与了文生图和人名查询等项目。

之前华为团队也工作了三年左右的时间,业务是以洞察、评估、规划和收益四个部分向运营商提供数字化机会点发现。自己也参与了全球数据沙盘项目,到路网算法开始参与项目,然后开始OTN to 楼宇参与业务拓展,到最后的模型收编和原子能力编排重构项目。

业务爱好:打篮球,跑步,看看书,有时间也会写写博客。

项目经验

知识库

文生图

原子能力

redis

什么情况下redis的查询会很慢

两种情况,传输的时候很慢,redis处理的时候很慢

1、传输的时候很慢:网速不好,网络服务器部署太远。

2、redis处理很慢:

​ 1、存在bigkey,hotkey,占用了带宽和处理速度

​ 2、存在复杂查询,复杂度在O(n)

redis中的大key怎么删除

1、分隔:将一个bigkey分隔成多个小key,例如将一个含有上万字段的hash按照一定的策略拆分成多个hash

2、手动清理:redis4.0+可以使用 UNLINK 命令来异步删除一个或者多个指定的key,redis4.0以下可以考虑使用SCAN命令结合DEL命令来份批次删除。

3、采用合适的数据结构:例如二进制数据不使用String保存,使用HyperLogLog统计页面UV

4、开启lazy-free(惰性删除/延迟释放):lazy-free特性式redis4.0开始引进的,指的是让redis采用异步的方式延迟释放key使用的内存,将该操作交给单独的子线程处理,避免阻塞主线程。

数据库

怎么优化sql

万变不离其宗,所有查询的优化都是基于底层存储结构。

​ 1、innerbd是以B+树的形式保存的,根据索引和id构建了B+树,所以我们在查询的时候要注意使用索引,这样可以更快的找到数据。但是在使用的时候要注意最左匹配原则和隐式计算。

​ 2、索引B+数据的叶子结点保存的只是数据的id并不是数据的所有值,所以即使根据索引找到了数据的叶子结点,如果叶子结点的字段不够查询需要,还要根据id去找具体数据,这个过程叫做回表,但是如果叶子结点包含的字段值够我们查询需要,就不需要去查询具体数据,减少了回表这个过程。

​ 3、查看执行计划,根据执行计划优化sql。

执行计划怎么查看

image-20250309103934302

https://blog.csdn.net/tannins_/article/details/140007565

比较重要的字段:

select_type:查询类型 simple 简单语句, primary 复杂查询, union 联合查询 等等

table:对应的表

type:索引类型,all < index < range < eq_ref < const < system

possible_key:分析时可能用到的索引

key:实际使用的索引

key_len:使用到索引的长度

ref:哪些列或者常量被用于查找索引列上的值

row:估算查找到满足条件的记录所需要的读取的行数

extra:执行计划的其他信息

线程

线程池的参数

两个线程数:核心线程数和最大线程数

两个停留时间相关:停留时间值和时间单位

三个过程相关:产生的工厂方法,排队的队列,拒绝的策略

线程池参数中的排序队列的注意点

队列的选择

​ 一般分为:直接提交队列,有界任务队列,无界任务队列,优先任务队列,延迟队列。

​ SynchronousQueue:直接提交队列,此队列不存储任何元素,直接将每个插入操作必须等待一个对应的移除操作,反之依然。通常用于创建无界线程池,最大线程数实际取决于系统或JVM的限制

​ ArrayBlockingQueue:有界队列,此队列是一个基于数据结构的有界阻塞队列,新元素插入队列时,如果队列已满,则插入线程会被阻塞直到队列中有空间可用。使用有界队列,可以更好的控制资源的使用防止资源耗尽。

​ LinkedBlockingQueue:此队列按照先进先出的排序规则对元素进行排序,且队列的容量Integer.MAX_VALUE,使用无界队列时,线程池的大小将只受到corePoolSize的约束,因为任务可以在队列中无限期的执行等待,然而,这可能导致资源耗尽。

​ PriorityBlockingQueue:优先队列,此队列根据元素的优先级进行排序,优先级高的元素先被移除,对于按优先级执行任务的应用场景非常有用。

​ DelayQueue:延迟队列,此队列中的元素只有当起指定延迟时间到了才能够从队列中获取到该元素

注意控制队列的长度,避免造成内存溢出

线程池中公平锁的线程是怎么唤醒的

理解有点问题,公平锁是指一个锁,而线程池是一个池,线程池中的排序队列是可以有多种队列,但是队列的本质还是从头部获取第一个元素,所以线程池还是总是从队列中获取第一个元素,只是可能有排序方式的不一样,导致不一定是先入先出。

jvm

cpu飙升怎么排查

1、使用top命令查看系统cpu占用情况,找出占用最高的进程pid

2、使用命令查找进程内占用cpu最高的线程id:ps H -eo pid,tid,%cpu | grep 最高的进程pid

3、根绝线程id查找堆栈信息,注意要将线程id转化成16进制

4、在根据堆栈信息定位问题

简介

首先我们得明白一下概念,dify、ollama、deepseek都是什么?

dify:一个平台,一个简单易用开源的LLMOps(Language Model Operations)平台,我们可以在上面可视化的开发编排AI应用。

ollama:一个开源的大模型服务工具,帮助用户快速在本地运行大模型。

deepseek:国产的大模型。

所以我们要在dify编排大模型应用,就需要有一个大模型,而ollama刚好可以帮我部署运行大模型。所以ollama在这里更像是dify平台和deepseek大模型之间的连接工具。

ollama安装

可以在ollama官网直接下载安装https://ollama.com/。

linux安装命令:curl -fsSL https://ollama.com/install.sh | sh

安装之后可以通过 ollama -v 命令查看是否安装好。

image-20250320165712115

ollama的默认端口是11434,安装完可以通过浏览器打开ip和port界面,可以看到界面提示 ollama is running

image-20250320172632758

有默认端口,就可以改端口,可以通过修改启动文件,从而制定端口

vim /etc/systemd/system/ollama.service

可以看到这里我指定了9876端口

image-20250320172913316

deepseek安装

直接运行ollama run deepseek-r1:7b命令按转,当然也可以安装其他版本或者其他厂商的大模型。

image-20250320170141452

dify安装

这里我是通过docker安装的,docker的安装过程就详细展示,可自行搜索教程安装docker。

dify的安装命令:

1
2
3
4
git clone https://github.com/langgenius/dify.git
cd dify/docker
cp .env.example .env
docker compose up -d

其中dify默认的端口是80,我们要想修改端口的话,可以在 .env 环境配置中修改端口号。可以看到,这里我改成了8080。改完重启dify

image-20250320170830107

dify连接deepseek

通过浏览器打开 http://ip:port 界面 ip为自己主机ip,port为上面自己设置的port。在一系列账号密码注册之后到主页面。在主页面的个人账号下设置里配置大模型。image-20250320171734695

在setting里面,找到模型厂商(Model Provider),在下面的 to be configured 的配置中找到ollama的配置,安装好插件配置,成功安装后如图。

image-20250320172145700

在右下角中的Add model中添加模型。

image-20250320173110641

model name:一定要注意,写的是具体部署的模型型号,我们部署的deepseek-r1:7b,那这里也要填写deepseek-r1:7b,开始我以为可以随意命名,结果搞了半天都没连接上。

base url:填写上面ollama可以在浏览器打开的地址。

连接完就可以使用了,后面再分享怎么构建知识库怎么使用。

简述

实现备份,将源minio数据备份到其他minio上。

先决条件

1、minio版本相同,最好相同,如果版本不同可能会发生未知错误。

2、站点复制需要开启桶级别版本控制(Bucket Versioning),并且会自动开启已创建的Bucket的版本控制。不可以关闭站点复制的MinIO部署上的版本控制功能。

image-20241011222201154

3、源minio中有Bucket,并且有数据。目标minio中没有任何Bucket和Object。

4、源minio和目标minio最好有相同的IDP,IDP可在页面设置,且设置足够的权限。

image-20241011222424095

操作设置

页面上直接添加备份minio,设置好url和账号密码即可。

image-20241011222607409

注意如果已经有备份站点了,需要再添加一个的话,需要保留原有站点,在peer sites栏下面点击加号按钮新加一个,如下图。

image-20241013111252255

安装部署指南

开发过程均使用的Ubuntu,下面说明指南均基于Ubuntu使用。

中间件安装

需要安装 redis,mysql,minio,rabbitmq 自行搜索,服务器安装即可。

redis参考链接:https://cloud.tencent.com/developer/article/1639658

mysql参考链接:https://blog.csdn.net/xz2005/article/details/130145465

minio参考链接:https://www.cnblogs.com/hunttown/p/17358797.html

rabbitmq参考链接:https://www.cnblogs.com/hunttown/p/17352729.html

安装验证:redis和mysql可用本地连接工具可连接使用,minio和rabbitmq均有可视化界面,可在可视化界面登陆打开。

注意:注意服务器的端口有没有打开

服务器43.154.80.35中已有安装好的中间件,可直接使用,账号密码,代码中的setting.py文件中都有,如果要修改成其他地址中间件,可直接在replace.py中修改相应字段,运行统一修改。

注:现在settings.py文件设置的是服务器43.154.80.35的内网ip,服务器账号密码和各中间件账号密码私信发送。

网络环境安装

服务器要求,

外网,ubuntu

安装软件

privoxy,tor,obfs4proxy

graph LR

a[代码]--8118端口-->b[privoxy]--9050端口-->c[tor]--网桥/obfs4proxy-->d[暗网]
代码

解压压缩包,网站目录下的setting文件中配置了代理端口,将请求发送到本级的8118端口

image-20240828231130808

privoxy

1、安装:sudo apt-get install privoxy

2、修改配置文件:

​ 进入配置文件 vim /etc/privoxy/config

​ 修改接收代理:listen-address 127.0.0.1:8118

​ 修改转发数据:最后一行添加 forward-socks5t / 127.0.0.1:9050 . (注意最后的点不能丢)

image-20240828232233501

image-20240828232433118

​ 配置完重启使配置生效

tor

1、安装tor:sudo apt-get install tor

2、安装obfs4proxy: sudo apt-get install obfs4proxy

3、tor配置文件中添加网桥,vi /etc/tor/torrc,进行如图配置

​ 查看tor 9050端口是否打开,若没有打开,去掉注释打开端口

image-20241003092814823

​ 配置obfs4proxy

​ 配置网桥,地址可用 obfs4 51.178.86.168:54874 773A7F4428AE519A892152EDA963477D85EE672A cert=xghcVpPhAAktkvVpYY6LDsE5iVayo4ADztSEwj0YcqGERxr3+v+RqScaOCC1O/uxeZinWA iat-mode=0,建议多申请几个做备用,网桥也会过期不可用

image-20240828232652882

​ 配置完重启使配置生效

验证网络通路

检测privoxy端口使用:netstat -an | grep 8118

检测tor端口使用:netstat -an | grep 9050

检测整个网络通路:curl -x http://127.0.0.1:8118 http://u5lyidiw4lpkonoctpqzxgyk6xop7w7w3oho4dzzsi272rwnjhyx7ayd.onion/

通路截图:

image-20241003091320866

驱动安装

chrome和chromedriver安装

可借鉴:https://blog.csdn.net/weixin_44184990/article/details/123590435

Google-chrome:版本- Google Chrome 127.0.6533.88

Chromedriver:版本- ChromeDriver 127.0.6533.88

安装路径:/usr/bin/

虚拟环境安装

python环境:安装python,可参考https://blog.csdn.net/qq_45536969/article/details/130124934

虚拟环境:

​ 安装虚拟环境:安装可参考https://blog.csdn.net/m0_64880493/article/details/132964831

​ 安装依赖:安装requirements.txt文件中的所有依赖 pip install -r requirements.txt

代码运行

以下均以43.154.148.210服务器为例

1、打开虚拟环境,虚拟环境在 /opt/dwSpiders/dw-env/bin/下,命令:source /opt/dwSpiders/dw-env/bin/activate,各自安装的目录地址不一样,需要修改命中的路径,

2、进入网站目录,以torrez网站为例,命令:cd /opt/dwSpiders/torrez

3、添加修改cookie,43.154.80.35服务器上的redis中有示例,使用tor登陆后可按示例将cookie填写到自己的redis中(注:leakbase网站没有验证码,登陆逻辑已经写好,直接第4步运行即可)

image-20240902223652351

4、启动代码:python3 run_torrez.py &

5、查看日志:如果当前窗口运行代码,则会自动弹出日志,如果不是,则进入网站目录下的logs目录,查看当前日期日志文件。

6、cookie过期:网站的cookie会过期,过期后直接在redis库中替换就行

7、如需要重跑,需要删除redis相应网站下dupefilter和bloom_fliter_img,这是url请求的指纹和图片的请求指纹,去重使用。

8、mq的使用,在每个网站目录下都加了mq的配置,如要使用,在setting文件中打开即可

image-20240902523652351

9、不建议使用start.sh统一启动,可进入各自网站目录下单独启动运行,根据不同的服务器目录,start.sh需要配置不同的虚拟环境目录和代码目录

中间件初始化

mysql

MySQL的初始化比较简单,在安装好的MySQL中安装 dw-spider 库,库中需要新建5张表(goods、original_page、post、site、user),如图。建表语句代码db目录下有。

image-20240902223652352

minio

minio的初始化也比较简单,在其中新建dw-bucket,如图。

image-20240902223652353

redis

redis中需要预置8个网站的cookie:Onniforums、asap、breachforums、darkdock、dread、mgmgrand、nexus、torrez

redis的key为 网站名称:网站名称_cookie,示例:asap:asap_cookie

redis的value为 hash类型的键值对,如图

Onniforums:image-20240902223652354

asap:image-20240902223652355

breachforums:image-20240902223652356

darkdock:image-20240902223652357

dread:image-20240902223652358

mgmgrand:image-20240902223652359

nexus:image-20240902223652360

torrez:image-20240902223652361

下图为代码运行后的redis截图,其中截了torrez网站和单独的bloom_filter_img。

bloom_filter_img:为所有图片请求过滤所用,内存放所有已请求的图片地址,其内存放的图片地址不会二次请求,如果重跑需删除,代码运行后会自行添加。

dupefilter:为站内地址请求过滤所有,内存放所有已请求的地址hash值,其内已存放的地址不会二次请求,如果重跑需要删除,代码运行后会自动添加。

start_urls:为代码运行时,起始请求的地址,重跑时需要删除,运行后会自动添加,起始地址请求后会自动删除。

torrez_cookie:为网站cookie,运行时需要添加,且保证cookie在有效期内。

image-20240928172856612

rabbitmq

需要新建5个队列:goods、page、post、site、user。

可在可视化界面新建,如图。

image-20240902223652361

其初始交换机为scrapy,无需配置,代码运行时会自动配置。

注:每个队列都在每个网站的mqPipeline.py文件中绑定,不建议修改。

中间件配置修改

代码中有replace.py文件,运行次文件可统一修改当前文件夹下的setting.py文件中的中间件配置。replcae.py中的old_str字段为当前中间件配置,new_str字段为需要修改后的中间件配置,代码运行将setting文件中的old_str配置修改成new_str配置。

各级缓存

spring中有三级缓存:一级二级三级。三个都是存储bean相关的内容,但是具体存储的内容有所差异。

一级缓存 二级缓存 三级缓存
名称 singletonObjects earlySingletonObjects singletonFactories
类型 ConcurrentHashMap HashMap HashMap
定义 定义是在DefaultSingletonBeanRegistry类中 定义是在DefaultSingletonBeanRegistry类中 定义是在DefaultSingletonBeanRegistry类中
缓存内容 存放就绪状态的Bean 早期曝光的Bean 创建用于获取Bean的工厂类-ObjectFactory实例

image-20240427123420433

一级缓存:就绪状态的bean,保存在该缓存中的Bean所实现Aware子接口的方法已经回调完毕,自定义初始化方法已经执行完毕,也经过BeanPostProcessor实现类的postProcessorBeforeInitialization、postProcessorAfterInitialization方法处理。

二级缓存:早期曝光的bean,一般只有处于循环引用状态的bean才会被保存在该缓存中,所实现的Aware子接口方法还未回调,自定义初始化方法还未执行。

三级缓存:存放获取bean的工厂类-ObjectFactory实例,在IoC容器中,所有刚被创建出来的bean,默认都会保存到该缓存中。

解决循环依赖问题

什么是循环依赖:

1
2
3
4
5
6
7
8
9
10
11
@Service
public class CircularServiceA {
@Autowired
private CircularServiceB circularServiceB;

}
@Service
public class CircularServiceB {
@Autowired
private CircularServiceA circularServiceA;
}

在 A 和 B 循环依赖的场景中:

B populatedBean 查找依赖项 A 的时候,从一级缓存中虽然未获取到 A,但是发现 A 在创建中。

此时,从三级缓存中获取 A 的 singletonFactory 调用工厂方法,创建 getEarlyBeanReference A 的早期引用并返回。

B 引用到 A ,B 就可以初始化完毕,然后 A 同样也可以初始化完毕了。

image-20240427154008953

AOP代理

说是二级缓存其实已经能解决循环依赖的问题,但是为什么需要三级缓存呢。说是解决AOP代理的问题。

如果只有二级缓存,虽然能解决循环依赖,但是在查询二级缓存中返回的实例,而不是我们需要的代理对象,所以添加了三级缓存,在三级对象中获取实例的时候,会经过判断,如果是代理对象则返回代理对象,如果是实例就返回实例。

其实具体的我说的也不是很清楚,可以参考连接:https://zhuanlan.zhihu.com/p/377878056

简介

现在大模型大行其道,我们普通人没有那么多资源,有没有办法搭建一个大模型玩一下呢,答案肯定是有的,我们可以搭建一个知识库,用一下大模型。

总所周知,大模型的训练是需要大量资源的,我们没有这么多资源,那我们就得想办法绕过训练或者减少训练。这时候知识库就是一个比较好的选择,它不需要对大模型进行大量的训练,大模型只是帮我们生成一个类人话的答案。

架构

目前比较用的比较多的就是LangChain框架,这是一种基于Langchain 与 ChatGLM 等大语言模型的本地知识库问答应用实现。

可以从上面的原理图看出知识库的整个实现原理。

1、先加载文件,文件可以是结构化的也可以是非结构化的

2、读取文本,这就很好理解了,将加载进来的文件读取

3、分割文本,将读取的文本分割成一段一段的,便于提取其中的关键字和让内容更内敛

4、向量化,将分割后的文本向量化,

5、存储,将向量化后的文本存入向量数据库中,当然可能也有一些结构型数据,直接存入关系型数据库中

6、问句向量化,将我们提问的问句给向量化,理论上和上面的文本向量化一样,问句也是一个文本

7、向量化匹配,通过问句的向量去向量库里面匹配文本,可能会匹配出多个,我们只取top n

8、生成prompt,将匹配到的文本作为上下文和问题一起生成prompt

9、将prompt提交给LLM生成回答

上面就是知识库的原理和流程。可以看到,其实大模型在这里只是扮演一个类人化回答生成的作用,回答中的知识点其实都是在向量化匹配的过程中匹配出来的。所以这里不需要大模型做大量的训练。

LangChain只是做了一个框架,上面的原理和流程也只是一个大纲,具体的细节我们还是有很多可操控空间的。

比如说文本分割,框架提供的默认分割方法是分割到什么程度,我们需要的又是分割到什么程度,如果框架提供的分割粒度比较答,一篇文章分割之后,分割成了几个大段,我们后面匹配到了,一起放到大模型中生成回答也是一个巨大的计算量或者有些大模型都不支持,文本大了,相当于参数就多了,那生成回答的计算量就变大了,甚至有些参数都支持不了这么多参数。

在比如说,在向量匹配的过程中,我们只能使用框架提供的匹配方法嘛,当然不是,毕竟框架不可能面面俱到,我们可能要做一些权重微调,或者换一种算法,再或者有的时候需要我们去关系型数据库中匹配。

最后,既然大模型只是一个生成类人回答的作用,那我们是不是就可以把LLM就是一个接口,其中具体是什么大模型,我们不是特别关注,毕竟只要能给我们根据知识点生成一段通顺的回答就行,至于是LangChain还是chatGLM,又或者是阿里清华的大模型,我们都不是特别关注。

0%