Rxjava实现三级缓存的两种方式

本文正如标题所说的用rxjava实现数据的三级缓存分别为内存,磁盘,网络,刚好最近在看Android源码设计模式解析与实战(受里面的ImageLoader的设计启发),我把代码放到了我的hot项目中。github地址

使用concat()和first()的操作符

用concat()和first()的操作符来实现,这是我在看Android源码设计模式解析与实战,作者在第一章的时候就介绍ImageLoader的设计。在内存中存储的方式LruCache来实现的,磁盘存储的方式就是序列化存储。

定义一个接口

1
2
3
4
5
6
7
8
/**
* Created by wukewei on 16/6/19.
*/
public interface ICache {
<T> Observable<T> get(String key, Class<T> cls);
<T> void put(String key, T t);
}

2.内存存储的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* Created by wukewei on 16/6/19.
*/
public class MemoryCache implements ICache{
private LruCache<String, String> mCache;
public MemoryCache() {
final int maxMemory = (int) Runtime.getRuntime().maxMemory();
final int cacheSize = maxMemory / 8;
mCache = new LruCache<String, String>(cacheSize) {
@Override
protected int sizeOf(String key, String value) {
try {
return value.getBytes("UTF-8").length;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return value.getBytes().length;
}
}
};
}
@Override
public <T> Observable<T> get(final String key, final Class<T> cls) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
String result = mCache.get(key);
if (subscriber.isUnsubscribed()) {
return;
}
if (TextUtils.isEmpty(result)) {
subscriber.onNext(null);
} else {
T t = new Gson().fromJson(result, cls);
subscriber.onNext(t);
}
subscriber.onCompleted();
}
});
}
@Override
public <T> void put(String key, T t) {
if (null != t) {
mCache.put(key, new Gson().toJson(t));
}
}
public void clearMemory(String key) {
mCache.remove(key);
}
}

磁盘存储的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* Created by wukewei on 16/6/19.
*/
public class DiskCache implements ICache{
private static final String NAME = ".db";
public static long OTHER_CACHE_TIME = 10 * 60 * 1000;
public static long WIFI_CACHE_TIME = 30 * 60 * 1000;
File fileDir;
public DiskCache() {
fileDir = CacheLoader.getApplication().getCacheDir();
}
@Override
public <T> Observable<T> get(final String key, final Class<T> cls) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
T t = (T) getDiskData1(key + NAME);
if (subscriber.isUnsubscribed()) {
return;
}
if (t == null) {
subscriber.onNext(null);
} else {
subscriber.onNext(t);
}
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
@Override
public <T> void put(final String key, final T t) {
Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
boolean isSuccess = isSave(key + NAME, t);
if (!subscriber.isUnsubscribed() && isSuccess) {
subscriber.onNext(t);
subscriber.onCompleted();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
/**
* 保存数据
*/
private <T> boolean isSave(String fileName, T t) {
File file = new File(fileDir, fileName);
ObjectOutputStream objectOut = null;
boolean isSuccess = false;
try {
FileOutputStream out = new FileOutputStream(file);
objectOut = new ObjectOutputStream(out);
objectOut.writeObject(t);
objectOut.flush();
isSuccess=true;
} catch (IOException e) {
Log.e("写入缓存错误",e.getMessage());
} catch (Exception e) {
Log.e("写入缓存错误",e.getMessage());
} finally {
closeSilently(objectOut);
}
return isSuccess;
}
/**
* 获取保存的数据
*/
private Object getDiskData1(String fileName) {
File file = new File(fileDir, fileName);
if (isCacheDataFailure(file)) {
return null;
}
if (!file.exists()) {
return null;
}
Object o = null;
ObjectInputStream read = null;
try {
read = new ObjectInputStream(new FileInputStream(file));
o = read.readObject();
} catch (StreamCorruptedException e) {
Log.e("读取错误", e.getMessage());
} catch (IOException e) {
Log.e("读取错误", e.getMessage());
} catch (ClassNotFoundException e) {
Log.e("错误", e.getMessage());
} finally {
closeSilently(read);
}
return o;
}
private void closeSilently(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (Exception ignored) {
}
}
}
/**
* 判断缓存是否已经失效
*/
private boolean isCacheDataFailure(File dataFile) {
if (!dataFile.exists()) {
return false;
}
long existTime = System.currentTimeMillis() - dataFile.lastModified();
boolean failure = false;
if (NetWorkUtil.getNetworkType(CacheLoader.getApplication()) == NetWorkUtil.NETTYPE_WIFI) {
failure = existTime > WIFI_CACHE_TIME ? true : false;
} else {
failure = existTime > OTHER_CACHE_TIME ? true : false;
}
return failure;
}
public void clearDisk(String key) {
File file = new File(fileDir, key + NAME);
if (file.exists()) file.delete();
}
}

isCacheDataFailure()方式中就是判断当前的数据是否失效,我是根据当前的网络状况来分wifi状况和非wifi状况,wifi状态下数据过期时间比较短,其他状态过期时间比较长。

CacheLoader的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
**
* Created by wukewei on 16/6/19.
*/
public class CacheLoader {
private static Application application;
public static Application getApplication() {
return application;
}
private ICache mMemoryCache, mDiskCache;
private CacheLoader() {
mMemoryCache = new MemoryCache();
mDiskCache = new DiskCache();
}
private static CacheLoader loader;
public static CacheLoader getInstance(Context context) {
application = (Application) context.getApplicationContext();
if (loader == null) {
synchronized (CacheLoader.class) {
if (loader == null) {
loader = new CacheLoader();
}
}
}
return loader;
}
public <T> Observable<T> asDataObservable(String key, Class<T> cls, NetworkCache<T> networkCache) {
Observable observable = Observable.concat(
memory(key, cls),
disk(key, cls),
network(key, cls, networkCache))
.first(new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return t != null;
}
});
return observable;
}
private <T> Observable<T> memory(String key, Class<T> cls) {
return mMemoryCache.get(key, cls).doOnNext(new Action1<T>() {
@Override
public void call(T t) {
if (null != t) {
Log.d("我是来自内存","我是来自内存");
}
}
});
}
private <T> Observable<T> disk(final String key, Class<T> cls) {
return mDiskCache.get(key, cls)
.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
if (null != t) {
Log.d("我是来自磁盘","我是来自磁盘");
mMemoryCache.put(key, t);
}
}
});
}
private <T> Observable<T> network(final String key, Class<T> cls
, NetworkCache<T> networkCache) {
return networkCache.get(key, cls)
.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
if (null != t) {
Log.d("我是来自网络","我是来自网络");
mDiskCache.put(key, t);
mMemoryCache.put(key, t);
}
}
});
}
public void clearMemory(String key) {
((MemoryCache)mMemoryCache).clearMemory(key);
}
public void clearMemoryDisk(String key) {
((MemoryCache)mMemoryCache).clearMemory(key);
((DiskCache)mDiskCache).clearDisk(key);
}
}

网络获取的NetworkCache

1
2
3
4
5
6
/**
* Created by wukewei on 16/6/19.
*/
public abstract class NetworkCache<T> {
public abstract Observable<T> get(String key, final Class<T> cls);
}

6.接下来看怎么使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
* Created by wukewei on 16/5/30.
*/
public class ItemPresenter extends BasePresenter<ItemContract.View> implements ItemContract.Presenter {
private static final String key = "new_list";
protected int pn = 1;
protected void replacePn() {
pn = 1;
}
private boolean isRefresh() {
return pn == 1;
}
private NetworkCache<ListPopular> networkCache;
public ItemPresenter(Activity activity, ItemContract.View view) {
super(activity, view);
}
@Override
public void getListData(String type) {
if (isRefresh()) mView.showLoading();
networkCache = new NetworkCache<ListPopular>() {
@Override
public Observable<ListPopular> get(String key, Class<ListPopular> cls) {
return mHotApi.getPopular(ItemPresenter.this.pn, Constants.PAGE_SIZE, type)
.compose(SchedulersCompat.applyIoSchedulers())
.compose(RxResultHelper.handleResult())
.flatMap(populars -> {
ListPopular popular = new ListPopular(populars);
return Observable.just(popular);
});
}
};
Subscription subscription = CacheLoader.getInstance(mActivity)
.asDataObservable(key + type + ItemPresenter.this.pn, ListPopular.class, networkCache)
.map(listPopular -> listPopular.data)
.subscribe(populars -> {
mView.showContent();
if (isRefresh()) {
if (populars.size() == 0) mView.showNotdata();
mView.addRefreshData(populars);
} else {
mView.addLoadMoreData(populars);
}
}, throwable -> {
if (isRefresh())
mView.showError(ErrorHanding.handleError(throwable));
handleError(throwable);
});
addSubscrebe(subscription);
}
}

一定要给个key,我是根据key来获取数据的,还要就是给个类型。但是这个我设计的这个缓存还是不是很理想,接来下想要实现的就是在传入的时候类的class都不用给明,要是有好的实现的方式,欢迎告诉我。

使用BehaviorSubject

BehaviorSubject的实现方法,废话不多说直接上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/**
* Created by wukewei on 16/6/20.
*/
public class BehaviorSubjectFragment extends BaseFragment {
public static BehaviorSubjectFragment newInstance() {
BehaviorSubjectFragment fragment = new BehaviorSubjectFragment();
return fragment;
}
String diskData = null;
String networkData = "从服务器获取的数据";
BehaviorSubject<String> cache;
View mView;
@Nullable
@Override
public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
mView = inflater.inflate(R.layout.fragment_content, container, false);
init();
return mView;
}
private void init() {
mView.findViewById(R.id.get).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
subscriptionData(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("onNext", s);
}
});
}
});
mView.findViewById(R.id.memory).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
BehaviorSubjectFragment.this.cache = null;
}
});
mView.findViewById(R.id.memory_disk).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
BehaviorSubjectFragment.this.cache = null;
BehaviorSubjectFragment.this.diskData = null;
}
});
}
private void loadNewWork() {
Observable<String> o = Observable.just(networkData)
.doOnNext(new Action1<String>() {
@Override
public void call(String s) {
BehaviorSubjectFragment.this.diskData = s;
Log.d("写入磁盘", "写入磁盘");
}
});
o.subscribe(new Action1<String>() {
@Override
public void call(String s) {
cache.onNext(s);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
}
private Subscription subscriptionData(@NonNull Observer<String> observer) {
if (cache == null) {
cache = BehaviorSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
String data = diskData;
if (data == null) {
Log.d("来自网络", "来自网络");
loadNewWork();
} else {
Log.d("来自磁盘", "来自磁盘");
subscriber.onNext(data);
}
}
})
.subscribeOn(Schedulers.io())
.subscribe(cache);
} else {
Log.d("来自内存", "来自内存");
}
return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
}
}

其中最主要的是subscriptionData()这个方法,就是先判断 cache是否存在要是存在的话就返回内存中数据,再去判断磁盘数据是否存在,如果存在就返回,要是前面两种都不存在的时候,再去网络中获取数据。还有最重要的是当你从网络获取数据的时候要记得保存在内存中和保存在磁盘中,在磁盘获取数据的时候把它赋值给内存。