データフロー
深度: [MEDIUM] / 確信度: [VERIFIED] 最終更新: 2026-02-16(Phase 1 セッション2)
本ドキュメントはLMCacheのKVキャッシュ store / retrieve パスを追跡する。
Store パス概要
vLLMのattentionレイヤー実行中に、KVキャッシュをGPUからCPU(および後段ストレージ)に退避するパス。
レイヤーワイズ方式(use_layerwise=True)が主要パスであり、各attentionレイヤーの実行直後にそのレイヤーのKVデータを転送する。
sequenceDiagram
participant vLLM as vLLM Attention Layer
participant Adapter as V1Impl (adapter)
participant Engine as LMCacheEngine
participant TDB as TokenDatabase
participant GPU as GPUConnector
participant SM as StorageManager
participant CPU as LocalCPUBackend
Note over vLLM,Adapter: Layer 0 開始
vLLM->>Adapter: save_kv_layer(layer_name, kv_layer, attn_metadata)
Note over Adapter: layer==0 のとき、各リクエストに対して Generator 生成
Adapter->>Engine: store_layer(token_ids, mask, kvcaches, slot_mapping, ...)
activate Engine
Engine->>TDB: process_tokens(tokens, mask)
TDB-->>Engine: [(start, end, CacheEngineKey), ...]
Engine->>SM: batched_allocate(shape, dtype, batch_size=num_layers)
SM-->>Engine: List[MemoryObj] × num_layers
Engine->>GPU: batched_from_gpu(memory_objs, starts, ends, ...)
Note over GPU: GPU Generator 初期化
GPU-->>Engine: Generator (primed)
Note over Engine: yield (Layer 0 の DMA 準備完了)
deactivate Engine
Adapter->>Engine: next(generator) — Layer 0
activate Engine
Engine->>GPU: next(mem_obj_generator)
Note over GPU: lmc_ops.single_layer_kv_transfer<br/>(paged GPU → buffer → pinned CPU)
GPU-->>Engine: yield
Engine->>SM: batched_put(keys[0], memory_objs[0])
SM->>CPU: batched_submit_put_task(keys, objs)
Note over CPU: hot_cache[key] = memory_obj
Note over Engine: yield (Layer 0 完了)
deactivate Engine
Note over vLLM,Adapter: Layer 1 以降も同じパターン繰り返し
vLLM->>Adapter: save_kv_layer(...)
Adapter->>Engine: next(generator) — Layer N
Note over Engine: GPU転送 → StorageManager.batched_put
各コンポーネントの役割
1. LMCacheConnectorV1Impl(adapter)
参照: target/LMCache/lmcache/integration/vllm/vllm_v1_adapter.py:964 (save_kv_layer)
vLLMのKVConnectorBase_V1.save_kv_layer()フックから呼ばれるアダプタ。
LMCacheConnectorV1Dynamicは純粋な委譲シェルであり、実装はV1Implに集約。
Layer 0での処理:
connector_metadata.requestsを走査し、save_spec.can_saveがTrueのリクエストを処理skip_leading_tokensをLMCacheのchunk_size(256)の倍数に切り下げてマスク境界を整合store_maskを構築:プレフィックス部分=False、新規部分=TrueLMCacheEngine.store_layer()を呼んでGeneratorを取得、self.layerwise_storersに追加- 最初のリクエストのみ
sync=TrueでCUDAストリームを同期
全レイヤー共通: self.layerwise_storers内の全Generatorをnext()で1ステップ進める。
2. LMCacheEngine.store_layer()
参照: target/LMCache/lmcache/v1/cache_engine.py:528
Generator関数であり、呼び出し側(adapter)が1レイヤーごとにnext()で進める。
初期化フェーズ(最初のyieldまで):
TokenDatabase.process_tokens()でトークン列をチャンク分割し、各チャンクのCacheEngineKeyを取得StorageManager.contains()で既存チャンクをスキップ(layer 0のキーで判定)StorageManager.batched_allocate()で各チャンク×全レイヤー分のMemoryObjを確保- チャンク×レイヤー → レイヤー×チャンクに転置
GPUConnector.batched_from_gpu()でGPU転送Generatorを生成・prime
レイヤーループ(num_layers回yield):
yield → next(mem_obj_generator) → batched_put(keys[layer_id], memory_objs[layer_id])
各レイヤーで「GPU→CPU DMA」→「ストレージ書き込み」を実行。
重要: メモリ確保失敗時(batched_allocateがNone)はbreakで即座にstore中止。yieldだけ行ってストレージには書かない。
3. ChunkedTokenDatabase.process_tokens()
参照: target/LMCache/lmcache/v1/token_database.py:309
トークン列をチャンク(デフォルト256トークン)に分割し、プレフィックスチェーンハッシュを計算。
ハッシュアルゴリズム: vLLMと完全に同一。
vllm.utils.hashing.get_hash_fn_by_name("sha256_cbor")を直接利用- NONE_HASHも
vllm.v1.core.kv_cache_utils.NONE_HASHから取得 - ハッシュ入力:
(prefix_hash, token_tuple, extra_keys)
マスク処理: maskのFalse数(=already-cached prefix長)がchunk_sizeの倍数であることを検証。False区間のチャンクはスキップ。
CacheEngineKey生成: _make_key_by_hash()で(model_name, world_size, worker_id, chunk_hash, kv_dtype, request_configs)の6タプルを構築。その後split_layers()でレイヤーIDを付与したLayerCacheEngineKeyに分割。
4. VLLMPagedMemLayerwiseGPUConnector.batched_from_gpu()
参照: target/LMCache/lmcache/v1/gpu_connector/gpu_connectors.py:1212
GPU上のページドKVキャッシュからCPU上のMemoryObjにデータを転送するGenerator関数。
2段転送パス(use_gpu=True時):
- Paged GPU → 中間GPUバッファ:
lmc_ops.single_layer_kv_transfer()(CUDAカーネル)でslot_mappingに基づきscatter→gatherコピー - GPUバッファ → Pinned CPU:
memory_obj.tensor.copy_(..., non_blocking=True)で非同期DMA
直接転送パス(use_gpu=False時):
lmc_ops.single_layer_kv_transfer()でpaged GPUから直接pinned CPUへ(チャンク単位)
CUDAストリーム: self.store_stream(専用ストリーム)を使用し、計算ストリームとオーバーラップ可能。sync=Trueの場合のみstore_stream.synchronize()で同期。
出力形式: MemoryFormat.KV_T2D = [num_tokens, 2, hidden_dim](token-major、K/Vインターリーブ)。MLAの場合はKV_MLA_FMT = [num_tokens, hidden_dim]。
5. StorageManager.batched_put()
参照: target/LMCache/lmcache/v1/storage_backend/storage_manager.py:388
登録された全ストレージバックエンドにデータを配布するディスパッチャ。
処理フロー:
allocator_backend(通常LocalCPUBackend)の元データをそのまま利用OrderedDict順に全バックエンド(L1→L2→L3)を走査- 異なるallocatorを持つバックエンドには
allocate_and_copy_objects()で新たにメモリ確保+コピー - 各バックエンドの
batched_submit_put_task()を呼び出し - 最後にref_countをデクリメント
注意: put()メソッドは非推奨(RuntimeErrorを投げる)。batched_put()が唯一のエントリポイント。
6. LocalCPUBackend.submit_put_task()
参照: target/LMCache/lmcache/v1/storage_backend/local_cpu_backend.py:141
同期実行(バックグラウンドスレッドなし)。cpu_lock下で以下を実行:
- 既存キーの重複チェック
memory_obj.ref_count_up()でrefcount増加hot_cache[key] = memory_objで保存cache_policy.update_on_put(key)でEvictionポリシー更新(LRU: OrderedDictの末尾に移動、等)- 必要に応じてcontrollerへADMITメッセージ送信(
batched_msg_sender経由)
パイプライン動作の詳細
store_layerとbatched_from_gpuは2つの入れ子Generatorでパイプライン動作する:
store_layer Generator: [初期化] → yield → [L0転送+保存] → yield → [L1転送+保存] → yield → ...
batched_from_gpu Generator: [初期化] → yield → [L0 DMA] → yield → [L1 DMA] → yield → ...
タイミング(num_layers=Nの場合):
store_layerはN+1回yield(初期化1回 + レイヤーN回)batched_from_gpuはN+1回yield(初期化prime + レイヤーN回)- adapterは合計
N回next()を呼ぶ(各attentionレイヤー実行後)
パイプラインのステップ: Layer Lのnext()呼び出しで、batched_from_gpuがLayer LのDMAを実行し、store_layerがLayer LのStorageManager書き込みを行う。つまりDMAとストレージ書き込みは同一レイヤーで連続実行される。
データ構造
| 構造 | 型 | 説明 |
|---|---|---|
| CacheEngineKey | (model_name, world_size, worker_id, chunk_hash, kv_dtype, request_configs) | チャンク単位のキー(レイヤー横断) |
| LayerCacheEngineKey | CacheEngineKey + layer_id | レイヤー単位のキー |
| MemoryObj | pinned CPU tensor wrapper | ref_count管理、MemoryObjMetadata付き |
| MemoryFormat.KV_T2D | [num_tokens, 2, hidden_dim] | レイヤーワイズ形式(token-major) |
| MemoryFormat.KV_MLA_FMT | [num_tokens, hidden_dim] | MLA形式(K/V統合) |
| store_mask | torch.Tensor[bool] | False=キャッシュ済みprefix、True=新規トークン |
| slot_mapping | torch.Tensor[long] | トークン位置→vLLMページドメモリのflat slot |
| hot_cache | OrderedDict[CacheEngineKey, MemoryObj] | L1 CPUキャッシュ(Evictionポリシー付き) |
Retrieve パス概要
KVキャッシュをストレージ(CPU/Disk/Remote)からGPUのvLLMページドメモリに復元するパス。 2フェーズ設計: Scheduler側のlookup(ヒット判定+prefetch指示)と、Worker側のload(実際のGPU転送)に分離。
Scheduler→Worker間の情報伝達
sequenceDiagram
participant Sched as V1Impl (Scheduler側)
participant LC as LookupClient
participant Worker as V1Impl (Worker側)
participant Engine as LMCacheEngine
Note over Sched: vLLM Scheduler.schedule() から呼ばれる
Sched->>LC: lookup(token_ids, req_id)
LC-->>Sched: num_external_hit_tokens
Note over Sched: LoadSpec(vllm_cached, lmcache_cached) を生成
Sched->>Sched: update_state_after_alloc() → can_load=True
Sched->>Sched: build_connector_meta() → ReqMeta(load_spec) を構築
Note over Sched: ConnectorMetadata を SchedulerOutput に添付
Note over Worker: Forward開始時
Worker->>Engine: start_load_kv(forward_context)
Note over Engine: Bulk or Layerwise retrieve 実行
LookupClient の動作
参照: target/LMCache/lmcache/v1/lookup_client/lmcache_lookup_client.py:28
LMCacheLookupClientはvLLMのSchedulerプロセスで動作する。LMCacheEngine(Worker側)とはZMQ IPC(REQ/REP)で通信。
処理フロー:
process_tokens()でトークン列をチャンクハッシュに分割- ハッシュ列をmsgpackシリアライズし、ZMQで
LookupServerに送信 LookupServer(Worker側)がStorageManager.contains()で存在チェック- ヒットトークン数を返却
キャッシュ: 同一リクエストの2回目以降のlookupはreqs_status辞書から即座に返す。
Retrieve パスの2モード
| モード | 条件 | エントリポイント | 特徴 |
|---|---|---|---|
| Bulk | use_layerwise=False(デフォルト) | LMCacheEngine.retrieve() | 全レイヤー一括取得→一括GPU転送 |
| Layerwise | use_layerwise=True | LMCacheEngine.retrieve_layer() | レイヤー単位Generator、パイプライン可能 |
Bulk Retrieve パス
sequenceDiagram
participant Adapter as V1Impl (Worker側)
participant Engine as LMCacheEngine
participant TDB as TokenDatabase
participant SM as StorageManager
participant CPU as LocalCPUBackend
participant GPU as GPUConnector (V2)
Adapter->>Engine: retrieve(tokens, mask, kvcaches, slot_mapping, ...)
activate Engine
Engine->>TDB: process_tokens(tokens, mask)
TDB-->>Engine: [(start, end, CacheEngineKey), ...]
alt async_loading == True
Note over Engine: event_managerからprefetch済みMemoryObjを取得
Engine->>Engine: _async_process_tokens_internal()
else sync loading
Engine->>SM: get_block_mapping(chunk_infos)
SM-->>Engine: {backend_name: [(key, start, end)]}
Engine->>SM: batched_get(keys, location)
SM->>CPU: batched_get_blocking(keys)
CPU-->>SM: List[MemoryObj](ref_count_up済み)
SM-->>Engine: List[MemoryObj]
end
Engine->>GPU: batched_to_gpu(memory_objs, starts, ends, slot_mapping=...)
Note over GPU: load_stream上で全チャンクをGPU転送
GPU->>GPU: lmc_ops.multi_layer_kv_transfer(memobj→paged KV)
GPU-->>Engine: 完了
Note over Engine: memory_obj.ref_count_down() で解放
Engine-->>Adapter: ret_mask (bool tensor)
deactivate Engine
_process_tokens_internal() の詳細
参照: target/LMCache/lmcache/v1/cache_engine.py:1527
process_tokens()でチャンク分割・ハッシュ計算StorageManager.get_block_mapping()でチャンクの所在バックエンドを特定- 各バックエンドの
batched_contains()をprefix match方式で呼び出し - チャンクを所在バックエンドごとにグルーピング
- 各バックエンドの
- バックエンドごとに
batched_get()でMemoryObjを取得 - 取得失敗時は
last_failed_block_start以降のret_maskをFalseに戻し、チャンクリストを切り詰め
_async_process_tokens_internal() の詳細
参照: target/LMCache/lmcache/v1/cache_engine.py:1463
非同期プリフェッチ済みの結果をevent_managerから取得するパス。
event_manager.pop_event(EventType.LOADING, req_id)でprefetch結果のFutureを取得future.result()でlist[list[tuple[CacheEngineKey, MemoryObj]]](tier×chunk)を取得process_tokens()で再度チャンク分割し、memory_obj_mapからマッチングしてチャンクリストを構築- 未使用の
MemoryObjはref_count_down()で即座に解放
StorageManager.batched_get() のwrite-back
参照: target/LMCache/lmcache/v1/storage_backend/storage_manager.py:484
リモートバックエンド(Disk/Remote)からデータを取得した場合、自動的にLocalCPUBackendにwrite-backする。
LocalCPUBackend以外から取得 &&LocalCPUBackendが存在 && 全MemoryObjがnon-None →batched_submit_put_task()でL1にコピー
Layerwise Retrieve パス
sequenceDiagram
participant Adapter as V1Impl (Worker側)
participant Engine as LMCacheEngine
participant TDB as TokenDatabase
participant SM as StorageManager
participant GPU as GPUConnector (Layerwise)
Note over Adapter: start_load_kv() 内
Adapter->>Engine: retrieve_layer(tokens, mask, kvcaches, slot_mapping, sync)
activate Engine
Engine->>TDB: process_tokens(tokens, mask)
TDB-->>Engine: [(start, end, CacheEngineKey), ...]
Note over Engine: contains(layer0_key) でヒット判定 + location統一チェック
Engine->>SM: layerwise_batched_get(keys_layer_major, location)
Note over SM: Layer 0 の get_non_blocking を asyncio.create_task() で投入
Engine->>GPU: batched_to_gpu(starts, ends, ...) → mem_obj_consumer Generator 生成
GPU-->>Engine: mem_obj_consumer primed (yield)
Engine-->>Adapter: yield torch.sum(ret_mask) — Layer 0 のヒット数
deactivate Engine
Note over Adapter: next(retriever) で Layer 0 データ受領
Adapter->>Engine: next(retriever)
activate Engine
Note over Engine: Layer 0 の task.result() を取得
Engine->>GPU: mem_obj_consumer.send(mem_objs_layer0)
Note over GPU: CPU→GPUバッファ copy(load_stream)
SM-->>Engine: Layer 1 の task yield
Engine-->>Adapter: yield None
deactivate Engine
Note over Adapter: wait_for_layer_load(layer_name) で同期
Adapter->>Engine: next(retriever)
activate Engine
Note over Engine: Layer N-1 処理...
Engine-->>Adapter: yield ret_mask(最終レイヤー後)
deactivate Engine
retrieve_layer() の Generator 構造
参照: target/LMCache/lmcache/v1/cache_engine.py:851
num_layers + 3回yieldする(ヒットあり時):
- yield 0:
torch.sum(ret_mask)— ヒットトークン数(sglang統合向け) - yield 1 ~ N-1:
None— 各レイヤーのGPU転送進行中 - yield N:
None— 最終レイヤー処理中 - yield N+1:
next(mem_obj_consumer)で同期 - yield N+2:
ret_mask— 最終結果
各レイヤーで:
next(get_generator)でStorageManagerから非同期取得したFutureを受け取るtask.result()でList[MemoryObj]を取得(ブロッキング)mem_obj_consumer.send(mem_objs_layer)でGPUコネクタにデータを渡すMemoryObj.ref_count_down()は全レイヤー完了後にバッチで実行
Layerwise GPUConnector.batched_to_gpu() のパイプライン
参照: target/LMCache/lmcache/v1/gpu_connector/gpu_connectors.py:683
VLLMBufferLayerwiseGPUConnectorはnum_layers + 2回のイテレーションで3段パイプラインを実行:
| イテレーション i | 操作1: paged書き込み | 操作2: RoPE補正+gap zeroing | 操作3: CPU→GPU load |
|---|---|---|---|
| i = 0 | — | — | yieldでmem_objs_layer0受領、load_stream上でcopy |
| i = 1 | — | Layer 0のRoPE補正 | yieldでmem_objs_layer1受領、load_stream上でcopy |
| i = 2 | Layer 0をpagedメモリに書き込み | Layer 1のRoPE補正 | yieldでmem_objs_layer2受領 |
| … | Layer i-2 | Layer i-1 | Layer i |
| i = N | Layer N-2 | Layer N-1 | yield(同期用、データなし) |
| i = N+1 | Layer N-1 | — | — |
ダブルバッファ: compute_gpu_buffer_objとload_gpu_buffer_objをping-pongして、RoPE計算とDMAをオーバーラップ。
RoPE位置補正: cache_positions=Trueの場合、保存時の位置と現在の位置の差分でfused_rotary_emb()を適用。保存時位置はMemoryObjMetadata.cached_positionsから取得。
gap zeroing: チャンク間のギャップ位置(連続しないstart/endの隙間)をゼロ埋め。
非同期プリフェッチの全体フロー
sequenceDiagram
participant Sched as V1Impl (Scheduler)
participant LC as LookupClient
participant LS as LookupServer (Worker)
participant SM as StorageManager
participant EM as EventManager
participant Worker as V1Impl (Worker)
participant Engine as LMCacheEngine
Note over Sched: get_num_new_matched_tokens() 内
Sched->>LC: lookup(token_ids, req_id)
LC->>LS: ZMQ REQ (hashes + offsets + req_id)
LS->>SM: async_lookup_and_prefetch(lookup_id, keys, ...)
Note over SM: 各バックエンドに batched_async_contains → batched_get_non_blocking
SM->>EM: add_event(LOADING, lookup_id, all_done_task)
LS-->>LC: num_hit_tokens
LC-->>Sched: num_external_hit_tokens
Note over Sched: build_connector_meta() で LoadSpec を ConnectorMetadata に格納
Note over Worker: start_load_kv() 内
Worker->>Engine: retrieve(tokens, mask, ..., req_id=req_id)
Engine->>EM: pop_event(LOADING, req_id)
Note over Engine: future.result() で prefetch 済み MemoryObj を取得
Engine->>Engine: _async_process_tokens_internal()
Engine->>GPU: batched_to_gpu(memory_objs, ...)
ポイント:
- Scheduler側のlookupがprefetchをトリガーし、Worker側のretrieveがprefetch結果を消費する
EventManagerが両者をlookup_id(=req_id)で紐付け- prefetchは
asyncio.create_task()で非同期実行され、Worker側のretrieveまでに完了していればブロッキングなし
token_mask と ret_mask の意味
| マスク | 用途 | 値の意味 |
|---|---|---|
token_mask | adapter側で構築 | False=vLLMがキャッシュ済み(チャンク境界まで切り詰め)、True=LMCacheから要ロード |
ret_mask | Engine内部で構築 | True=実際にLMCacheから取得成功、False=未取得 |
mask(Engine引数) | token_maskと同じ | process_tokens()のFalseプレフィックス=スキップ対象 |
token_maskのFalse区間はvllm_cached_tokensをchunk_sizeの倍数に切り下げた範囲。これにより、vLLMとLMCacheのキャッシュ境界がチャンク単位で整合する(オーバーラップ領域はLMCacheが上書き)。
エラーハンドリング
- 部分的取得失敗:
ret_mask.sum() < expectedの場合、record_failed_blocks()で失敗ブロックIDを計算し、_invalid_block_idsに蓄積。vLLMのSchedulerが次stepでget_block_ids_with_load_errors()で回収し、再計算を指示 - StorageManager.batched_get():
memory_obj=Noneが返された場合、last_failed_block_start以降を切り捨て(prefix matchの性質上、途中の欠損以降は全て無効) - 健全性チェック:
is_healthy()==Falseの場合、retrieve自体をスキップ(ゼロマスクを返す)