nanusl
3/29/2018 - 12:35 PM

简单线程池使用

简单线程池使用

      //检查参数
        if (null != menuInfo && !storeCodes.isEmpty()) {
            pubInfo.setMenuId(menuInfo.getMenuId())
                    .setPubType(Boolean.FALSE).setState(WxAppConstants.PUB_STEP_FIRST);

            ListeningExecutorService executorService = MoreExecutors.listeningDecorator(new ScheduledThreadPoolExecutor(storeCodes.size() + 1));

            //创建总发布记录
            Integer pubInfoId = wxMenuPubDAO.create(pubInfo);

            final CountDownLatch count = new CountDownLatch(storeCodes.size());

            storeCodes.forEach(storeCode -> {

                //异步提交
                ListenableFuture<Boolean> distTask = executorService.submit(() -> {
                    WxMpService baseMpService = wxCoreService.getBaseMpService(storeCode);
                    WxMpSettingInfo settingInfo = wxMpSettingDAO.queryByStore(storeCode);
                    //如果存在授权链接
                    String replace = replaceAuthCodeAndStateCode(menuInfo.getMenuBody(), settingInfo);
                    menuInfo.setMenuBody(replace);
                    WxMenu menu = WxMenu.fromJson(replace);
                    String menuCreate = baseMpService.getMenuService().menuCreate(menu);
                    return true;
                });

                //回调记录具体发布条件
                Futures.addCallback(distTask, new FutureCallback<Boolean>() {
                    @Override
                    public void onSuccess(Boolean result) {
                        wxMenuDistDAO.create(new WxMenuDistInfo().setMenuPubId(pubInfoId)
                                .setMenuBody(menuInfo.getMenuBody()).setStoreCode(storeCode)
                                .setState(result));
                        count.countDown();
                    }

                    @Override
                    @ParametersAreNonnullByDefault
                    public void onFailure(Throwable throwable) {
                        logger.error("自定义菜单发布子任务出错!", throwable);
                        WxMenuDistInfo wxMenuDistInfo = new WxMenuDistInfo()
                                .setMenuPubId(pubInfoId)
                                .setMenuBody(menuInfo.getMenuBody()).setStoreCode(storeCode)
                                .setState(Boolean.FALSE);
                        if (throwable instanceof WxErrorException) {
                            wxMenuDistInfo.setMessage("微信API调用出错:" + ((WxErrorException) throwable).getError().toString());
                        } else if (throwable instanceof JsonSyntaxException) {
                            wxMenuDistInfo.setMessage("菜单模板JSON格式有误!");
                        } else {
                            //其他错误
                            wxMenuDistInfo.setMessage(throwable.toString());
                        }
                        wxMenuDistDAO.create(wxMenuDistInfo);
                        count.countDown();
                    }
                }, executorService);
            });

            //组发布子任务全部完成后执行最终回调检查
            executorService.submit(() -> {
                try {
                    count.await();
                    WxMenuPubInfo wxMenuPubInfo = new WxMenuPubInfo();
                    //查询pubInfo状态
                    if (wxMenuDistDAO.checkState(pubInfoId)) {
                        wxMenuPubInfo.setState(WxAppConstants.PUB_STEP_DONE);
                    } else {
                        wxMenuPubInfo.setState(WxAppConstants.PUB_STEP_PART);
                    }
                    wxMenuPubDAO.update(pubInfoId, wxMenuPubInfo);
                } catch (InterruptedException e) {
                    logger.error("CountDownLatch出错!", e);
                } finally {
                    executorService.shutdownNow();
                    executorService.shutdown();
                }
            });
            return true;
        }