Swing演算法介紹、實現與在阿里飛豬的實戰應用
本系列主要是基於Spark的推薦演算法實戰系列,本文為首篇,歡迎關注!
1.Swing演算法介紹
Swing演算法原理比較簡單,是阿里早期使用到的一種召回演算法,在阿里多個業務被驗證過非常有效的一種召回方式,它認為 user-item-user 的結構比 itemCF 的單邊結構更穩定,截止目前並沒有公開的論文進行介紹和說明(可能是因為比較簡單,阿里看不上哈哈),但是根據網上的各種資料,對該演算法的原理進行介紹,如有錯誤,歡迎指正。
Swing指的是鞦韆,例如使用者
和使用者
,都購買過同一件商品
,則三者之間會構成一個類似鞦韆的關係圖。若使用者
和使用者
之間除了購買過
外,還購買過商品
,則認為兩件商品是具有某種程度上的相似的。
也就是說,商品與商品之間的相似關係,是透過使用者關係來傳遞的。為了衡量物品
和
的相似性,考察都購買了物品
和
的使用者
和使用者
, 如果這兩個使用者共同購買的物品越少,則物品
和
的相似性越高。
Swing演算法的表示式如下:
2.Swing Python實現
# -*- coding: utf-8 -*-
“”“
Author : Thinkgamer
File : Swing。py
Software: PyCharm
Desc : 基於movie lens資料集實現Swing演算法
”“”
import pandas as pd
from itertools import combinations
import json
import os
alpha = 0。5
top_k = 20
def load_data(train_path, test_path):
train_data = pd。read_csv(train_path, sep=“\t”, engine=“python”, names=[“userid”, “movieid”, “rate”, “event_timestamp”])
test_data = pd。read_csv(test_path, sep=“\t”, engine=“python”, names=[“userid”, “movieid”, “rate”, “event_timestamp”])
print(train_data。head(5))
print(test_data。head(5))
return train_data, test_data
def get_uitems_iusers(train):
u_items = dict()
i_users = dict()
for index, row in train。iterrows():
u_items。setdefault(row[“userid”], set())
i_users。setdefault(row[“movieid”], set())
u_items[row[“userid”]]。add(row[“movieid”])
i_users[row[“movieid”]]。add(row[“userid”])
print(“使用的使用者個數為:{}”。format(len(u_items)))
print(“使用的item個數為:{}”。format(len(i_users)))
return u_items, i_users
def cal_similarity(u_items, i_users):
item_pairs = list(combinations(i_users。keys(), 2))
print(“item pairs length:{}”。format(len(item_pairs))) # 1410360
item_sim_dict = dict()
cnt = 0
for (i, j) in item_pairs:
cnt += 1
print(cnt)
user_pairs = list(combinations(i_users[i] & i_users[j], 2))
result = 0。0
for (u, v) in user_pairs:
result += 1 / (alpha + list(u_items[u] & u_items[v])。__len__())
item_sim_dict。setdefault(i, dict())
item_sim_dict[i][j] = result
# print(item_sim_dict[i][j])
return item_sim_dict
def save_item_sims(item_sim_dict, path):
new_item_sim_dict = dict()
for item, sim_items in item_sim_dict。items():
new_item_sim_dict。setdefault(item, dict())
new_item_sim_dict[item] = dict(sorted(sim_items。items(), key = lambda k:k[1], reverse=True)[:top_k])
json。dump(new_item_sim_dict, open(path, “w”))
print(“item 相似 item({})儲存成功!”。format(top_k))
return new_item_sim_dict
def evaluate(item_sim_dict, test):
# 可以參考《推薦系統開發實戰》中的cf驗證方式
pass
if __name__ == “__main__”:
train_data_path = “。。/。。/data/ml-100k/ua。base”
test_data_path = “。。/。。/data/ml-100k/ua。test”
item_sim_save_path = “。。/。。/model/swing/item_sim_dict。json”
train, test = load_data(train_data_path, test_data_path)
if not os。path。exists(item_sim_save_path):
u_items, i_users = get_uitems_iusers(train)
item_sim_dict = cal_similarity(u_items, i_users)
new_item_sim_dict = save_item_sims(item_sim_dict, item_sim_save_path)
else:
new_item_sim_dict = json。load(open(item_sim_save_path, “r”))
evaluate(new_item_sim_dict, test)
3.Swing Spark實現
建立Swing類,其中的評估函式和predict函式這裡並未提供,感興趣的可以自己實現
/**
* @ClassName: Swing
* @Description: 實現Swing演算法
* @author: Thinkgamer
**/
class SwingModel(spark: SparkSession) extends Serializable{
var alpha: Option[Double] = Option(0。0)
var items: Option[ArrayBuffer[String]] = Option(new ArrayBuffer[String]())
var userIntersectionMap: Option[Map[String, Map[String, Int]]] = Option(Map[String, Map[String, Int]]())
/*
* @Description 給引數 alpha賦值
* @Param double
* @return cf。SwingModel
**/
def setAlpha(alpha: Double): SwingModel = {
this。alpha = Option(alpha)
this
}
/*
* @Description 給所有的item進行賦值
* @Param [array]
* @return cf。SwingModel
**/
def setAllItems(array: Array[String]): SwingModel = {
this。items = Option(array。toBuffer。asInstanceOf[ArrayBuffer[String]])
this
}
/*
* @Description 獲取兩兩使用者有行為的item交集個數
* @Param [spark, data]
* @return scala。collection。immutable。Map
**/
def calUserRateItemIntersection(data: RDD[(String, String, Double)]): Map[String, Map[String, Int]] = {
val rdd = data。map(l => (l。_1, l。_2))。groupByKey()。map(l => (l。_1, l。_2。toSet))
val map = (rdd cartesian rdd)。map(l => (l。_1。_1, (l。_2。_1, (l。_1。_2 & l。_2。_2)。toArray。length)))
。groupByKey()
。map(l => (l。_1, l。_2。toMap))
。collectAsMap()。toMap
map。take(10)。foreach(println)
map
}
def fit(data: RDD[(String, String, Double)]): RDD[(String, String, Double)]= {
this。userIntersectionMap = Option(this。calUserRateItemIntersection(data))
println(this。userIntersectionMap。take(10))
val rdd = data。map(l => (l。_2, l。_1))。groupByKey()。map(l => (l。_1, l。_2。toSet))
val result: RDD[(String, String, Double)] = (rdd cartesian rdd)。map(l => {
val item1 = l。_1。_1
val item2 = l。_2。_1
val intersectionUsers = l。_1。_2 & l。_2。_2
var score = 0。0
for(u1 <- intersectionUsers){
for(u2 <- intersectionUsers){
score += 1。0 / (this。userIntersectionMap。get。get(u1)。get(u2)。toDouble + this。alpha。get)
}
}
(item1, item2, score) // (item1, item2, swingsocre)
})
result
}
def evalute(test: RDD[(String, String, Double)]) = { }
def predict(userid: String) = { }
def predict(userids: Array[String]) = { }
}
main函式呼叫
object Swing {
def main(args: Array[String]): Unit = {
val spark = SparkSession。builder()。master(“local[10]”)。appName(“Swing”)。enableHiveSupport()。getOrCreate()
Logger。getRootLogger。setLevel(Level。WARN)
val trainDataPath = “data/ml-100k/ua。base”
val testDataPath = “data/ml-100k/ua。test”
import spark。sqlContext。implicits。_
val train: RDD[(String, String, Double)] = spark。sparkContext。textFile(trainDataPath)。map(_。split(“\t”))。map(l => (l(0), l(1), l(2)。toDouble))
val test: RDD[(String, String, Double)] = spark。sparkContext。textFile(testDataPath)。map(_。split(“\t”))。map(l => (l(0), l(1), l(2)。toDouble))
val items: Array[String] = train。map(_。_2)。collect()
val swing = new SwingModel(spark)。setAlpha(1)。setAllItems(items)
val itemSims: RDD[(String, String, Double)] = swing。fit(train)
swing。evalute(test)
swing。predict(“”)
swing。predict(Array(“”, “”))
spark。close()
}
}
4.Swing在阿里飛豬的應用
航旅使用者的行為有稀疏和發散的特點。利用右圖一個具體的使用者例項來說明這兩個特點:使用者在第一天點選了兩個大理一日遊,第 20 天點選了一些馬爾地夫蜜月相關的商品,第 21 天又點選了大理的一日遊。稀疏性體現在一個月只來了 3 次,點選了 8 個寶貝。發散性體現在使用者大理一日遊和出國蜜月遊兩個 topic 感興趣。
在使用者有行為的情況下進行召回,我們常採用的方法是基於 User-Rate 矩陣的協同過濾方法 ( 如 ItemCF,Swing。ItemCF 認為同時點選兩個商品的使用者越多則這兩個商品越相似。Swing 是在阿里多個業務被驗證過非常有效的一種召回方式,它認為 user-item-user 的結構比 itemCF 的單邊結構更穩定 ),但是由於航旅使用者行為稀疏,基於 User-Rate 矩陣召回結果的準確率比較低,泛化性差。針對這兩個問題我們可以透過擴充歷史資料來增加樣本覆蓋。航旅場景因為使用者點選資料比較稀疏,需要比電商 ( 淘寶 ) 擴充更多 ( 時間更長 ) 的資料才夠。這又帶來了興趣點轉移多的問題。在這裡我們採用對行為序列進行 session 劃分,保證相關性。
這裡以 swing 為例講解一下構造約束的方式。我們以使用者的行為意圖為中心,將表示共同意圖的商品聚合在一個序列中,如上圖對使用者行為序列的切分。
在這個 case 中,上面是傳統 swing 的召回結果,下面是基於 session 的召回結果。當 trigger 是沙溪古鎮一日遊的時候,上面有一個杭州莫干山和玉龍雪山一日遊,這兩個不相關結果的出現是因為它們是熱門商品,也稱哈利波特效應。下面的召回結果就都是和沙溪古鎮相關的了。從指標來看,session-based 召回比 swing 和 itemCF 都高。
參考:
https://
zhuanlan。zhihu。com/p/67
126386
https://www。
infoq。cn/article/qfl1nx
cxhuxv723imb7v
搜尋關注微信公眾號【搜尋與推薦Wiki】,信條【All In CTR、DL、ML、RL、NLP】
搜尋關注微信公眾號【搜尋與推薦Wiki】,專注於搜尋和推薦系統,以系列分享為主,持續打造精品內容!