本系列主要是基於Spark的推薦演算法實戰系列,本文為首篇,歡迎關注!

1.Swing演算法介紹

Swing演算法原理比較簡單,是阿里早期使用到的一種召回演算法,在阿里多個業務被驗證過非常有效的一種召回方式,它認為 user-item-user 的結構比 itemCF 的單邊結構更穩定,截止目前並沒有公開的論文進行介紹和說明(可能是因為比較簡單,阿里看不上哈哈),但是根據網上的各種資料,對該演算法的原理進行介紹,如有錯誤,歡迎指正。

Swing指的是鞦韆,例如使用者

u

和使用者

v

,都購買過同一件商品

i

,則三者之間會構成一個類似鞦韆的關係圖。若使用者

u

和使用者

v

之間除了購買過

i

外,還購買過商品

j

,則認為兩件商品是具有某種程度上的相似的。

也就是說,商品與商品之間的相似關係,是透過使用者關係來傳遞的。為了衡量物品

i

j

的相似性,考察都購買了物品

i

j

的使用者

u

和使用者

v

, 如果這兩個使用者共同購買的物品越少,則物品

i

j

的相似性越高。

Swing演算法的表示式如下:

sim\left \langle i,j \right \rangle = \sum_{u \in U_i \bigcap  U_j} \sum_{v \in U_i \bigcap  U_j} \frac{1}{\alpha + |I_u \bigcap I_v|} \\

2.Swing Python實現

# -*- coding: utf-8 -*-

“”“

Author : Thinkgamer

File : Swing。py

Software: PyCharm

Desc : 基於movie lens資料集實現Swing演算法

”“”

import pandas as pd

from itertools import combinations

import json

import os

alpha = 0。5

top_k = 20

def load_data(train_path, test_path):

train_data = pd。read_csv(train_path, sep=“\t”, engine=“python”, names=[“userid”, “movieid”, “rate”, “event_timestamp”])

test_data = pd。read_csv(test_path, sep=“\t”, engine=“python”, names=[“userid”, “movieid”, “rate”, “event_timestamp”])

print(train_data。head(5))

print(test_data。head(5))

return train_data, test_data

def get_uitems_iusers(train):

u_items = dict()

i_users = dict()

for index, row in train。iterrows():

u_items。setdefault(row[“userid”], set())

i_users。setdefault(row[“movieid”], set())

u_items[row[“userid”]]。add(row[“movieid”])

i_users[row[“movieid”]]。add(row[“userid”])

print(“使用的使用者個數為:{}”。format(len(u_items)))

print(“使用的item個數為:{}”。format(len(i_users)))

return u_items, i_users

def cal_similarity(u_items, i_users):

item_pairs = list(combinations(i_users。keys(), 2))

print(“item pairs length:{}”。format(len(item_pairs))) # 1410360

item_sim_dict = dict()

cnt = 0

for (i, j) in item_pairs:

cnt += 1

print(cnt)

user_pairs = list(combinations(i_users[i] & i_users[j], 2))

result = 0。0

for (u, v) in user_pairs:

result += 1 / (alpha + list(u_items[u] & u_items[v])。__len__())

item_sim_dict。setdefault(i, dict())

item_sim_dict[i][j] = result

# print(item_sim_dict[i][j])

return item_sim_dict

def save_item_sims(item_sim_dict, path):

new_item_sim_dict = dict()

for item, sim_items in item_sim_dict。items():

new_item_sim_dict。setdefault(item, dict())

new_item_sim_dict[item] = dict(sorted(sim_items。items(), key = lambda k:k[1], reverse=True)[:top_k])

json。dump(new_item_sim_dict, open(path, “w”))

print(“item 相似 item({})儲存成功!”。format(top_k))

return new_item_sim_dict

def evaluate(item_sim_dict, test):

# 可以參考《推薦系統開發實戰》中的cf驗證方式

pass

if __name__ == “__main__”:

train_data_path = “。。/。。/data/ml-100k/ua。base”

test_data_path = “。。/。。/data/ml-100k/ua。test”

item_sim_save_path = “。。/。。/model/swing/item_sim_dict。json”

train, test = load_data(train_data_path, test_data_path)

if not os。path。exists(item_sim_save_path):

u_items, i_users = get_uitems_iusers(train)

item_sim_dict = cal_similarity(u_items, i_users)

new_item_sim_dict = save_item_sims(item_sim_dict, item_sim_save_path)

else:

new_item_sim_dict = json。load(open(item_sim_save_path, “r”))

evaluate(new_item_sim_dict, test)

3.Swing Spark實現

建立Swing類,其中的評估函式和predict函式這裡並未提供,感興趣的可以自己實現

/**

* @ClassName: Swing

* @Description: 實現Swing演算法

* @author: Thinkgamer

**/

class SwingModel(spark: SparkSession) extends Serializable{

var alpha: Option[Double] = Option(0。0)

var items: Option[ArrayBuffer[String]] = Option(new ArrayBuffer[String]())

var userIntersectionMap: Option[Map[String, Map[String, Int]]] = Option(Map[String, Map[String, Int]]())

/*

* @Description 給引數 alpha賦值

* @Param double

* @return cf。SwingModel

**/

def setAlpha(alpha: Double): SwingModel = {

this。alpha = Option(alpha)

this

}

/*

* @Description 給所有的item進行賦值

* @Param [array]

* @return cf。SwingModel

**/

def setAllItems(array: Array[String]): SwingModel = {

this。items = Option(array。toBuffer。asInstanceOf[ArrayBuffer[String]])

this

}

/*

* @Description 獲取兩兩使用者有行為的item交集個數

* @Param [spark, data]

* @return scala。collection。immutable。Map>

**/

def calUserRateItemIntersection(data: RDD[(String, String, Double)]): Map[String, Map[String, Int]] = {

val rdd = data。map(l => (l。_1, l。_2))。groupByKey()。map(l => (l。_1, l。_2。toSet))

val map = (rdd cartesian rdd)。map(l => (l。_1。_1, (l。_2。_1, (l。_1。_2 & l。_2。_2)。toArray。length)))

。groupByKey()

。map(l => (l。_1, l。_2。toMap))

。collectAsMap()。toMap

map。take(10)。foreach(println)

map

}

def fit(data: RDD[(String, String, Double)]): RDD[(String, String, Double)]= {

this。userIntersectionMap = Option(this。calUserRateItemIntersection(data))

println(this。userIntersectionMap。take(10))

val rdd = data。map(l => (l。_2, l。_1))。groupByKey()。map(l => (l。_1, l。_2。toSet))

val result: RDD[(String, String, Double)] = (rdd cartesian rdd)。map(l => {

val item1 = l。_1。_1

val item2 = l。_2。_1

val intersectionUsers = l。_1。_2 & l。_2。_2

var score = 0。0

for(u1 <- intersectionUsers){

for(u2 <- intersectionUsers){

score += 1。0 / (this。userIntersectionMap。get。get(u1)。get(u2)。toDouble + this。alpha。get)

}

}

(item1, item2, score) // (item1, item2, swingsocre)

})

result

}

def evalute(test: RDD[(String, String, Double)]) = { }

def predict(userid: String) = { }

def predict(userids: Array[String]) = { }

}

main函式呼叫

object Swing {

def main(args: Array[String]): Unit = {

val spark = SparkSession。builder()。master(“local[10]”)。appName(“Swing”)。enableHiveSupport()。getOrCreate()

Logger。getRootLogger。setLevel(Level。WARN)

val trainDataPath = “data/ml-100k/ua。base”

val testDataPath = “data/ml-100k/ua。test”

import spark。sqlContext。implicits。_

val train: RDD[(String, String, Double)] = spark。sparkContext。textFile(trainDataPath)。map(_。split(“\t”))。map(l => (l(0), l(1), l(2)。toDouble))

val test: RDD[(String, String, Double)] = spark。sparkContext。textFile(testDataPath)。map(_。split(“\t”))。map(l => (l(0), l(1), l(2)。toDouble))

val items: Array[String] = train。map(_。_2)。collect()

val swing = new SwingModel(spark)。setAlpha(1)。setAllItems(items)

val itemSims: RDD[(String, String, Double)] = swing。fit(train)

swing。evalute(test)

swing。predict(“”)

swing。predict(Array(“”, “”))

spark。close()

}

}

4.Swing在阿里飛豬的應用

Swing演算法介紹、實現與在阿里飛豬的實戰應用

航旅使用者的行為有稀疏和發散的特點。利用右圖一個具體的使用者例項來說明這兩個特點:使用者在第一天點選了兩個大理一日遊,第 20 天點選了一些馬爾地夫蜜月相關的商品,第 21 天又點選了大理的一日遊。稀疏性體現在一個月只來了 3 次,點選了 8 個寶貝。發散性體現在使用者大理一日遊和出國蜜月遊兩個 topic 感興趣。

Swing演算法介紹、實現與在阿里飛豬的實戰應用

在使用者有行為的情況下進行召回,我們常採用的方法是基於 User-Rate 矩陣的協同過濾方法 ( 如 ItemCF,Swing。ItemCF 認為同時點選兩個商品的使用者越多則這兩個商品越相似。Swing 是在阿里多個業務被驗證過非常有效的一種召回方式,它認為 user-item-user 的結構比 itemCF 的單邊結構更穩定 ),但是由於航旅使用者行為稀疏,基於 User-Rate 矩陣召回結果的準確率比較低,泛化性差。針對這兩個問題我們可以透過擴充歷史資料來增加樣本覆蓋。航旅場景因為使用者點選資料比較稀疏,需要比電商 ( 淘寶 ) 擴充更多 ( 時間更長 ) 的資料才夠。這又帶來了興趣點轉移多的問題。在這裡我們採用對行為序列進行 session 劃分,保證相關性。

Swing演算法介紹、實現與在阿里飛豬的實戰應用

這裡以 swing 為例講解一下構造約束的方式。我們以使用者的行為意圖為中心,將表示共同意圖的商品聚合在一個序列中,如上圖對使用者行為序列的切分。

Swing演算法介紹、實現與在阿里飛豬的實戰應用

在這個 case 中,上面是傳統 swing 的召回結果,下面是基於 session 的召回結果。當 trigger 是沙溪古鎮一日遊的時候,上面有一個杭州莫干山和玉龍雪山一日遊,這兩個不相關結果的出現是因為它們是熱門商品,也稱哈利波特效應。下面的召回結果就都是和沙溪古鎮相關的了。從指標來看,session-based 召回比 swing 和 itemCF 都高。

參考:

https://

zhuanlan。zhihu。com/p/67

126386

https://www。

infoq。cn/article/qfl1nx

cxhuxv723imb7v

搜尋關注微信公眾號【搜尋與推薦Wiki】,信條【All In CTR、DL、ML、RL、NLP】

搜尋關注微信公眾號【搜尋與推薦Wiki】,專注於搜尋和推薦系統,以系列分享為主,持續打造精品內容!