Retrofit+Rxjava服务器IP轮询重试机制实现

  |   0 评论   |   2,994 浏览

    app在启动时会请求一些配置信息,其中就包括ip的路由表,将这份路由表存储到本地,至于是sp还是对象持久化抑或是其他方式,可根据实际情况自行选择。

    因为项目网络层由Retrofit+Rxjava+Okhttp实现,Retrofit运行时无法改变baseUrl,即使可以通过反射的方式来改变baseUrl,也无法对已经生成的service对象起作用,而且我的项目中所有service对象都通过Dagger2注入,所以最终使用了这样一种方式。

    url交给UrlManager来管理

    public class UrlManager { 
       public static final String[] HOST_SITE = {"https://xxx/api/",xxx}; 
       public static final String[] HOST_WEB = {"https://xxx/",xxx};  
       public static final String HOST_SITE_DEBUG = "https://xxx/api/";  
       public static final String HOST_WEB_DEBUG = "https://xxx/"; 
       public static List<String> list;   
       public static Random random = new Random();  
       public static String getHostSite() {   
          if(BuildConfig.IS_DEBUG) {        
             return processUrl(HOST_SITE_DEBUG);
          } else {
            String host = getDynamicHost(); 
            if (!TextUtils.isEmpty(host)) return host;  
            return HOST_SITE[random.nextInt(HOST_SITE.length)];
          }
      } 
                   
      @Nullable
      private static String getDynamicHost() {
          int index = (int) SPUtils.get(NeutronApplication.getContext(), Constants.URL, 0);
          if (list != null && list.size() > 0 && index < list.size())  
              return list.get(index);    
              return null;
      }
      public static String getHostWeb() { 
         if (BuildConfig.IS_DEBUG) {         
             return processUrl(HOST_WEB_DEBUG);
         } else { 
             String host = getDynamicHost();  
             if (!TextUtils.isEmpty(host)) return host;    
             return HOST_WEB[random.nextInt(HOST_WEB.length)];
         }
      } 
       public static void setHosts(List<String> list) {
            UrlManager.list = list;
            RxHelper.setCounterAttempts(list.size());
      }  
      public static void updateUrlIndex(int i) {
           if (list != null && i >= list.size())
            i = 0;
          SPUtils.put(NeutronApplication.getContext(),Constants.URL, i);
      }   
       public static void updateUrlIndex() {  
          int o = (int) SPUtils.get(NeutronApplication.getContext(), Constants.URL, 0);
           updateUrlIndex(o + 1);
     }
    }

    app启动时拉取到配置后设置UrlManager中的路由表,然后每次根据索引去路由表动态拿请求地址,那路由索引由谁来控制呢?
    因为我将项目中的rxjava抽取了一层RxHelper,所以这件事就交给RxHelper来干了,可以覆盖所有的网络请求。

    public class RxHelper {  
      private static final int COUNTER_START = 0;  
      private static int COUNTER_ATTEMPTS = 3;  
      public static void setCounterAttempts(int counterAttempts) {
            COUNTER_ATTEMPTS = counterAttempts;
      } 
      public static <T> rx.Observable.Transformer<T, T> handleResult() { 
             return tObservable -> tObservable
                    .flatMap(RxHelper::createData)
                    .retryWhen(observable -> observable.compose(zipWithFlatMap()))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io());
      }   
      public static <T> rx.Observable.Transformer<T,T> handleResultWithOutRetryPolicy(){
              return tObservable -> tObservable.flatMap(RxHelper::createData)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io());
      }
      public static <T> Observable.Transformer<T, Long> zipWithFlatMap() { 
             return observable ->
                    observable.zipWith(Observable.range(COUNTER_START, COUNTER_ATTEMPTS),
                            (t, repeatAttempt) -> repeatAttempt)
                            .flatMap(new Func1<Integer, Observable<Long>>() {  
                                @Override
                                public Observable<Long> call(Integer repeatAttempt) {
                                    UrlManager.updateUrlIndex(repeatAttempt);  
                                    return Observable.timer(repeatAttempt * 200, TimeUnit.MILLISECONDS);
                                }
                            });
      }   
      private static <T> Observable<T> createData(final T t) { 
             return Observable.create(new Observable.OnSubscribe<T>() {  
                @Override
                public void call(Subscriber<? super T> subscriber) {  
                    try {
                        subscriber.onNext(t);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        LogUtils.logw("Rxhelper: " + e.toString());
                        subscriber.onError(e);
                    }
                }
            });
        }
    }

    这样在每次请求错误时,会递增路由表索引,继续下次请求,轮询的间隔为Observable.timer(repeatAttempt * 200, TimeUnit.MILLISECONDS);
    对RxJava的retryWhen不理解的同学请移步对RxJava中.repeatWhen()和.retryWhen()操作符的思考

    之前也说了,retrofit不能修改baseUrl,反射的方式也不适合我的项目,至于利用builder生成新的retrofit对象的方式更不考虑了,那我是怎么实现运行时修改请求地址的呢?别忘了okhttp是可以添加拦截器的,在OkHttpIntercepter中:

    public class OkHttpInterceptor implements Interceptor {   
        @Override
        public Response intercept(Chain chain) throws IOException {     
           //配置request
            Request request = chain.request();
            Request.Builder requestBuilder = request.newBuilder();
            String url = UrlManager.getHostSite();
            Uri parse = Uri.parse(url);
            String host = parse.getHost();
            HttpUrl httpUrl = request.url().newBuilder().host(host).build();
            requestBuilder.url(httpUrl);
            Response.Builder responseBuilder = chain.proceed(requestBuilder.build()).newBuilder();
            Response response = responseBuilder.build();       
            return response;
        }
    }

    拦截请求的url,修改其host,这样整个流程就ok了,http的各种错误码的处理也是可以在拦截器中统一处理的,至于其他健壮性的考虑此处就不做过多阐述了。

    有同学问我,如果想处理最后一次error通知怎么办呢?可以这样做,修改过的RxHelper如下:

    public class RxHelper {  
         private static final int COUNTER_START = 0; 
         private static int COUNTER_ATTEMPTS = 3;   
         public static void setCounterAttempts(int counterAttempts) {
            COUNTER_ATTEMPTS = counterAttempts;
         }   
        public static <T> rx.Observable.Transformer<T, T> handleResult() { 
               return tObservable -> tObservable
                    .flatMap(RxHelper::createData)
                    .retryWhen(error -> delayedRetry(error))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io());
        }    
        //猫腻主要在这个方法
        private static Observable<Object> delayedRetry(Observable<? extends Throwable> error) {
                return error.zipWith(Observable.range(COUNTER_START, COUNTER_ATTEMPTS + 1),
                    (i, repeatAttempt) -> repeatAttempt)
                    .flatMap(o -> {
                        UrlManager.updateUrlIndex(o);
                        LogUtils.logw("repeat: " + o);    
                        return o < COUNTER_ATTEMPTS ? Observable.timer(o * 200, TimeUnit.MILLISECONDS)
                                : error.flatMap(Observable::error);
                    });
        }  
        public static <T> rx.Observable.Transformer<T, T> handleResultWithOutRetryPolicy() {
                return tObservable -> tObservable.flatMap(RxHelper::createData)
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io());
        }   
        private static <T> Observable<T> createData(final T t) {   
             return Observable.create(new Observable.OnSubscribe<T>() {    
                @Override
                public void call(Subscriber<? super T> subscriber) {  
                    try {
                        subscriber.onNext(t);
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(e);
                    }
                }
            });
        }
    }

    转自:https://gold.xitu.io/post/584e6d5961ff4b0058e9240a



    >