Skip to content

使用While类写异步循环

zhangxin edited this page Mar 7, 2017 · 2 revisions

目录

介绍

作为全异步软件,ZStack中大量的使用异步函数,而异步函数是没法使用传统的同步循环(例如for..., while...)。举个例子:

private void httpCall() {
    // make a http call
}

public void loop() {
    for (int i=0; i<10; i++) {
        httpCall();
    }
}

上面例子中,httpCall()函数同步执行,所以我们可以用一个for循环,每次httpCall()执行完成并返回后,才会执行一下次httpCall()。把上面的例子换成异步方式:

private void httpCall(NoErrorCompletion completion) {
    // make a http call
    new Thread(completion::done).start();
}

public void loop() {
    for (int i=0; i<10; i++) {
        httpCall(new NoErrorCompletion() {
            @Override
            public void done() {
                // httpCall() 调用完成
            }
        });
    }
}

在异步例子中,每次httpCall()执行时都会开一个新的线程执行具体的操作,并在执行完成后调用传入的callback接口NoErrorCompletion.done()通知调用者。在这种情况下,for循环会瞬时完成,而不再向我们期望那样上一个httpCall()执行完后再执行下一个httpCall()

为了在异步操作时模拟同步操作的循环,ZStack使用While类做异步循环。While的使用方式很简单,提供一个要遍历的集合,一个遍历集合是执行的函数体,一个集合遍历完成后执行的callback即可。下面我们用例子展示While的使用方式。

每次执行一个异步操作

private void httpCall(NoErrorCompletion completion) {
    // make a http call
    new Thread(completion::done).start();
}

public void loop() {
    List<Integer> list = Arrays.asList(0,1,2,3,4,5,6,7,8,9);

    new While<>(list).each((Integer i, NoErrorCompletion completion) -> {
        httpCall(completion);
    }).run(new NoErrorCompletion() {
        @Override
        public void done() {
            // list遍历完后要执行的操作放在这里
        }
    });
}

这里使用了each()函数传入一个lambda,第一个参数是集合中当前遍历的元素,第二个为NoErrorCompletion interface,通过调用NoErrorCompletion.done()函数通知While操作执行完成,继续遍历下一个元素。run()函数同样接受一个NoErrorCompletion interface,当整个集合遍历完后done()函数被执行。

每次执行多个异步操作

private void httpCall(NoErrorCompletion completion) {
    // make a http call
    new Thread(completion::done).start();
}

public void loop() {
    List<Integer> list = Arrays.asList(0,1,2,3,4,5,6,7,8,9);

    new While<>(list).step((Integer i, NoErrorCompletion completion) -> {
        httpCall(completion);
    }, 2).run(new NoErrorCompletion() {
        @Override
        public void done() {
            // list遍历完后要执行的操作放在这里
        }
    });
}

这里使用了step()函数,它跟each()函数接收同样的lambda,除此之外它接收一个Integer型参数用于指定每次并发执行的操作数目。上例中,会同时有两个操作并发执行。

这里的并发数表示遍历集合时最大的并发操作数。例如上面例子中,第一次遍历就同时执行两个httpCall(),它们完成的时间可能不同,假设第一个httpCall()耗费10秒,而第二个耗费1分钟,那么第一个httpCall()执行完成调用NoErrorCompletion.done()就会马上触发While执行下一个、也就是第三个httpCall(),这样While始终保持最大两个并发操作。

同时执行所有异步操作

private void httpCall(NoErrorCompletion completion) {
    // make a http call
    new Thread(completion::done).start();
}

public void loop() {
    List<Integer> list = Arrays.asList(0,1,2,3,4,5,6,7,8,9);

    new While<>(list).all((Integer i, NoErrorCompletion completion) -> {
        httpCall(completion);
    }).run(new NoErrorCompletion() {
        @Override
        public void done() {
            // list遍历完后要执行的操作放在这里
        }
    });
}

这里使用了all()函数,它会同时遍历集合中所有元素,触发所有异步操作,并在所有操作完成后代用run()中指定的callback。all()等同于step()的特殊情况,相当于step(..., list.size())

真实的例子

下面我们看一个真实的例子:

new While<>(bsUuids).all((bsUuid, completion) -> {
    ExpungeImageMsg emsg = new ExpungeImageMsg();
    emsg.setBackupStorageUuid(bsUuid);
    emsg.setImageUuid(self.getUuid());
    bus.makeTargetServiceIdByResourceUuid(emsg, ImageConstant.SERVICE_ID, self.getUuid());
    bus.send(emsg, new CloudBusCallBack(completion) {
        @Override
        public void run(MessageReply reply) {
            if (!reply.isSuccess()) {
                logger.warn(reply.getError().toString());
            }

            completion.done();
        }
    });
}).run(new NoErrorCompletion(msg) {
    @Override
    public void done() {
        bus.publish(new APIExpungeImageEvent(msg.getId()));
    }
});

废弃的异步消息集合方法

While作为唯一的异步循环方法,原来CloudBus上定义的消息集合方法已废弃,不应该再被使用:

public interface CloudBus extends Component {
    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);

    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusListCallBack callBack);

    @Deprecated
    void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusSteppingCallback callback);
}