A look at Storm's ack mechanism
Published: 2019-06-23

This article walks through Storm's ack mechanism.

Example

AckSentenceSpout

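import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class AckSentenceSpout extends BaseRichSpout {

    // Tuples that have been emitted but not yet acked, keyed by msgId
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
        // this.collector.emit(values);
        // NOTE: the msgId must be passed here, otherwise the tuple is not tracked
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        // Fully processed: drop the cached values
        this.pending.remove(msgId);
    }

    // NOTE: a failed tuple is re-emitted with the same msgId
    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}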
  • A spout must pass a msgId when it emits and cache the emitted data; it removes the entry from the cache in ack and re-emits it in fail to retry

AckWordCountBolt

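import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AckWordCountBolt extends BaseRichBolt {

    private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class);

    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        try {
            String word = tuple.getStringByField("word");
            Long count = this.counts.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            this.counts.put(word, count);
            // NOTE: pass the tuple being processed as the anchor
            this.collector.emit(tuple, new Values(word, count));
            // NOTE: the bolt has to ack the input itself
            this.collector.ack(tuple);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            // NOTE: fail the input when processing throws
            this.collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}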
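  • A bolt has to do two things. First, anchor: pass the input tuple when emitting so that input and output tuples are linked into a tuple tree. Second, ack each tuple once it has been processed, and fail it when processing fails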

Source code analysis

SpoutOutputCollectorImpl.emit

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java

@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
    try {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    } catch (InterruptedException e) {
        LOG.warn("Spout thread interrupted during emit().");
        throw new RuntimeException(e);
    }
}

private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) throws InterruptedException {
    emittedCount.increment();

    List<Integer> outTasks;
    if (outTaskId != null) {
        outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
    } else {
        outTasks = taskData.getOutgoingTasks(stream, values);
    }

    final boolean needAck = (messageId != null) && hasAckers;

    final List<Long> ackSeq = needAck ? new ArrayList<>() : null;

    final long rootId = needAck ? MessageId.generateId(random) : 0;

    for (int i = 0; i < outTasks.size(); i++) { // perf critical path. don't use iterators.
        Integer t = outTasks.get(i);
        MessageId msgId;
        if (needAck) {
            long as = MessageId.generateId(random);
            msgId = MessageId.makeRootId(rootId, as);
            ackSeq.add(as);
        } else {
            msgId = MessageId.makeUnanchored();
        }

        final TupleImpl tuple =
            new TupleImpl(executor.getWorkerTopologyContext(), values, executor.getComponentId(), this.taskId, stream, msgId);
        AddressedTuple adrTuple = new AddressedTuple(t, tuple);
        executor.getExecutorTransfer().tryTransfer(adrTuple, executor.getPendingEmits());
    }
    if (isEventLoggers) {
        taskData.sendToEventLogger(executor, values, executor.getComponentId(), messageId, random, executor.getPendingEmits());
    }

    if (needAck) {
        boolean sample = executor.samplerCheck();
        TupleInfo info = new TupleInfo();
        info.setTaskId(this.taskId);
        info.setStream(stream);
        info.setMessageId(messageId);
        if (isDebug) {
            info.setValues(values);
        }
        if (sample) {
            info.setTimestamp(System.currentTimeMillis());
        }

        pending.put(rootId, info);
        List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
        taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits());
    } else if (messageId != null) {
        // Reusing TupleInfo object as we directly call executor.ackSpoutMsg() & are not sending msgs. perf critical
        if (isDebug) {
            if (spoutExecutorThdId != Thread.currentThread().getId()) {
                throw new RuntimeException("Detected background thread emitting tuples for the spout. "
                                           + "Spout Output Collector should only emit from the main spout executor thread.");
            }
        }
        globalTupleInfo.clear();
        globalTupleInfo.setStream(stream);
        globalTupleInfo.setValues(values);
        globalTupleInfo.setMessageId(messageId);
        globalTupleInfo.setTimestamp(0);
        globalTupleInfo.setId("0:");
        Long timeDelta = 0L;
        executor.ackSpoutMsg(executor, taskData, timeDelta, globalTupleInfo);
    }
    return outTasks;
}
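  • When needAck is true, it first generates a rootId, then generates one id per target task and collects it with ackSeq.add(as), and finally registers the tuple with the acker by calling taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()); the init tuple carries the rootId, the XOR of all ids in ackSeq, and the spout's taskId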
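
To make the init value concrete, here is a minimal standalone sketch of the XOR fold. It does not use Storm; AckInitSketch, xorAll and edgeIds are hypothetical names introduced for illustration, and xorAll only approximates what Utils.bitXorVals(ackSeq) is used for in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Standalone model of how a spout derives the init value for one root tuple. */
final class AckInitSketch {

    /** XOR-folds the per-task ids, in the spirit of Utils.bitXorVals(ackSeq). */
    static long xorAll(List<Long> ids) {
        long acc = 0L;
        for (long id : ids) {
            acc ^= id;
        }
        return acc;
    }

    /** One random 64-bit id per target task, like the loop over outTasks. */
    static List<Long> edgeIds(int outTaskCount, Random random) {
        List<Long> ids = new ArrayList<>();
        for (int i = 0; i < outTaskCount; i++) {
            ids.add(random.nextLong());
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Long> ackSeq = edgeIds(3, new Random(42));
        long initVal = xorAll(ackSeq); // value sent on the init stream

        long remaining = initVal;
        for (long id : ackSeq) {
            remaining ^= id; // XORing the same id a second time cancels it
        }
        System.out.println("remaining after all ids are XORed again = " + remaining);
    }
}
```

Because x ^ x == 0, the folded value returns to 0 exactly when every id has been XORed in a second time, which is the property the acker relies on later.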

BoltOutputCollectorImpl.ack&fail

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java

@Override
public void ack(Tuple input) {
    if (!ackingEnabled) {
        return;
    }
    long ackValue = ((TupleImpl) input).getAckVal();
    Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
    for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
        task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                            new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
                            executor.getExecutorTransfer(), executor.getPendingEmits());
    }
    long delta = tupleTimeDelta((TupleImpl) input);
    if (isDebug) {
        LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }

    if (!task.getUserContext().getHooks().isEmpty()) {
        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
        boltAckInfo.applyOn(task.getUserContext());
    }
    if (delta >= 0) {
        executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                           task.getTaskMetrics().getAcked(input.getSourceStreamId()));
    }
}

@Override
public void fail(Tuple input) {
    if (!ackingEnabled) {
        return;
    }
    Set<Long> roots = input.getMessageId().getAnchors();
    for (Long root : roots) {
        task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID,
                            new Values(root), executor.getExecutorTransfer(), executor.getPendingEmits());
    }
    long delta = tupleTimeDelta((TupleImpl) input);
    if (isDebug) {
        LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }
    BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
    boltFailInfo.applyOn(task.getUserContext());
    if (delta >= 0) {
        executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                            task.getTaskMetrics().getFailed(input.getSourceStreamId()));
    }
}
  • Both ack and fail in BoltOutputCollectorImpl go through task.sendUnanchored
  • ack sends to Acker.ACKER_ACK_STREAM_ID, and fail sends to Acker.ACKER_FAIL_STREAM_ID
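
The value sent on the ack stream is Utils.bitXor(entry.getValue(), ackValue). The sketch below models that arithmetic without Storm. It rests on an assumption that this article does not show: that each anchored emit XORs the new child's id into the input tuple's ack value. BoltAckValueSketch and its methods are hypothetical names.

```java
/** Standalone model of the value a bolt sends on the ack stream for one root id. */
final class BoltAckValueSketch {

    private final long inputEdgeId; // id the input tuple carries for this root (entry.getValue())
    private long ackVal = 0L;       // XOR of the ids of every child anchored to this input (assumed)

    BoltAckValueSketch(long inputEdgeId) {
        this.inputEdgeId = inputEdgeId;
    }

    /** Assumed behaviour of an anchored emit: fold the new child's id into ackVal. */
    void anchorChild(long childEdgeId) {
        ackVal ^= childEdgeId;
    }

    /** Mirrors Utils.bitXor(entry.getValue(), ackValue) in ack(). */
    long valueSentOnAck() {
        return inputEdgeId ^ ackVal;
    }

    public static void main(String[] args) {
        long a = 0x1111L;
        long c1 = 0x2222L;
        long c2 = 0x4444L;
        long ackerVal = a; // what the acker holds after the spout's init message

        BoltAckValueSketch first = new BoltAckValueSketch(a);
        first.anchorChild(c1);
        first.anchorChild(c2);
        ackerVal ^= first.valueSentOnAck(); // a cancels; c1 ^ c2 is still outstanding

        ackerVal ^= new BoltAckValueSketch(c1).valueSentOnAck(); // leaf bolt acks c1
        ackerVal ^= new BoltAckValueSketch(c2).valueSentOnAck(); // leaf bolt acks c2
        System.out.println("acker value = " + ackerVal);
    }
}
```

A single ack message therefore both retires the input tuple and registers its children, so the acker needs only one 64-bit value per root no matter how large the tuple tree grows.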

Task.sendUnanchored

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java

// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
public void sendUnanchored(String stream, List values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
    Tuple tuple = getTuple(stream, values);
    List<Integer> tasks = getOutgoingTasks(stream, values);
    for (Integer t : tasks) {
        AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
        transfer.tryTransfer(addressedTuple, pendingEmits);
    }
}
  • This calls ExecutorTransfer.tryTransfer once for each target task

ExecutorTransfer.tryTransfer

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java

// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
    if (isDebug) {
        LOG.info("TRANSFERRING tuple {}", addressedTuple);
    }

    JCQueue localQueue = getLocalQueue(addressedTuple);
    if (localQueue != null) {
        return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
    }
    return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
}

/**
 * Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null.
 * Returns false if unable to add to localQueue.
 */
public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
    workerData.checkSerialize(serializer, tuple);
    if (pendingEmits != null) {
        if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
            queuesToFlush.set(tuple.dest - indexingBase, localQueue);
            return true;
        } else {
            pendingEmits.add(tuple);
            return false;
        }
    } else {
        return localQueue.tryPublish(tuple);
    }
}
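  • It first checks, based on the addressedTuple, whether the destination queue is local; if so it calls tryTransferLocal, otherwise it calls workerData.tryTransferRemote
  • tryTransferLocal calls localQueue.tryPublish, which puts the tuple into the recvQueue of the JCQueue
  • workerData.tryTransferRemote hands the tuple to the TransferDrainer via WorkerTransfer, and it is sent to the remote node on flush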
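
The local branch can be modelled in a few lines. This is a sketch only: an ArrayBlockingQueue stands in for JCQueue, offer stands in for tryPublish, and TryTransferSketch is a hypothetical name. The serialization check and queuesToFlush bookkeeping are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/** Standalone model of the tryTransferLocal decision, with a bounded queue in place of JCQueue. */
final class TryTransferSketch {

    /**
     * Publishes to localQueue only when nothing is already waiting in pendingEmits,
     * so tuples that overflowed earlier are never overtaken by newer ones.
     */
    static <T> boolean tryTransferLocal(T tuple, ArrayBlockingQueue<T> localQueue, Queue<T> pendingEmits) {
        if (pendingEmits != null) {
            if (pendingEmits.isEmpty() && localQueue.offer(tuple)) {
                return true;
            }
            pendingEmits.add(tuple); // queue full, or older tuples are still pending
            return false;
        }
        return localQueue.offer(tuple); // no overflow buffer: the caller handles a false result
    }

    public static void main(String[] args) {
        ArrayBlockingQueue<String> localQueue = new ArrayBlockingQueue<>(2);
        Queue<String> pendingEmits = new ArrayDeque<>();

        for (String t : new String[] {"a", "b", "c"}) {
            System.out.println(t + " published: " + tryTransferLocal(t, localQueue, pendingEmits));
        }
        localQueue.poll(); // the consumer frees one slot
        // "d" still goes to pendingEmits because "c" is waiting ahead of it
        System.out.println("d published: " + tryTransferLocal("d", localQueue, pendingEmits));
        System.out.println("pending = " + pendingEmits);
    }
}
```

The pendingEmits.isEmpty() check is what keeps ordering intact: once one tuple has overflowed, later tuples queue up behind it even if the destination has room again.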

StormCommon.systemTopology

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java

public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    return _instance.systemTopologyImpl(topoConf, topology);
}

protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    validateBasic(topology);

    StormTopology ret = topology.deepCopy();
    addAcker(topoConf, ret);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, ret);
    }
    addMetricComponents(topoConf, ret);
    addSystemComponents(topoConf, ret);
    addMetricStreams(ret);
    addSystemStreams(ret);

    validateStructure(ret);

    return ret;
}

public static void addAcker(Map<String, Object> conf, StormTopology topology) {
    int ackerNum =
        ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

    Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
    outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

    Map<String, Object> ackerConf = new HashMap<>();
    ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
    ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

    Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

    for (Bolt bolt : topology.get_bolts().values()) {
        ComponentCommon common = bolt.get_common();
        common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
        common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
    }

    for (SpoutSpec spout : topology.get_spouts().values()) {
        ComponentCommon common = spout.get_common();
        Map<String, Object> spoutConf = componentConf(spout);
        spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,
                      ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        common.set_json_conf(JSONValue.toJSONString(spoutConf));
        common.put_to_streams(Acker.ACKER_INIT_STREAM_ID,
                              Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                             Thrift.prepareDirectGrouping());
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                             Thrift.prepareDirectGrouping());
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                             Thrift.prepareDirectGrouping());
    }

    topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}

public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    Set<String> boltIds = topology.get_bolts().keySet();
    Set<String> spoutIds = topology.get_spouts().keySet();

    for (String id : spoutIds) {
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }

    for (String id : boltIds) {
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                   Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }
    return inputs;
}

public static IBolt makeAckerBolt() {
    return _instance.makeAckerBoltImpl();
}

public IBolt makeAckerBoltImpl() {
    return new Acker();
}
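  • The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt
  • addAcker holds the logic that creates the acker. ackerNum is ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))), i.e. if Config.TOPOLOGY_ACKER_EXECUTORS is not configured, the value of Config.TOPOLOGY_WORKERS is used
  • The acker is given Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS with the value ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)), so it receives a tick tuple every Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds, and that tick drives the timeout handling
  • makeAckerBolt() is called while building the arguments for Thrift.prepareSerializedBoltDetails, and it creates the Acker
  • The acker's inputs are ACKER_INIT_STREAM_ID from every spout and ACKER_ACK_STREAM_ID, ACKER_FAIL_STREAM_ID and ACKER_RESET_TIMEOUT_STREAM_ID from every bolt, all with fields grouping on "id"; its outputs are the direct streams ACKER_ACK_STREAM_ID, ACKER_FAIL_STREAM_ID and ACKER_RESET_TIMEOUT_STREAM_ID
  • addAcker also configures every spout: it sets Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, declares the ACKER_INIT_STREAM_ID output stream, and subscribes the spout to the acker's ACKER_ACK_STREAM_ID, ACKER_FAIL_STREAM_ID and ACKER_RESET_TIMEOUT_STREAM_ID with direct grouping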
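
The configuration fallback described above can be sketched without Storm as follows. AckerConfigSketch and its methods are hypothetical, and the string keys are what I believe the corresponding Config constants resolve to; treat them as assumptions and check them against the Config class of your Storm version.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone model of how addAcker derives the acker count and tick frequency. */
final class AckerConfigSketch {

    // Assumed values of the Config.TOPOLOGY_* constants used in addAcker
    static final String ACKER_EXECUTORS = "topology.acker.executors";
    static final String WORKERS = "topology.workers";
    static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
    static final String TASKS = "topology.tasks";
    static final String TICK_TUPLE_FREQ_SECS = "topology.tick.tuple.freq.secs";

    private static int requireInt(Map<String, Object> conf, String key) {
        Object value = conf.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(key + " must be set to a number");
        }
        return ((Number) value).intValue();
    }

    /** Acker executors if configured, otherwise the number of workers. */
    static int ackerCount(Map<String, Object> conf) {
        int workers = requireInt(conf, WORKERS); // the default is evaluated eagerly, as in addAcker
        return conf.get(ACKER_EXECUTORS) == null ? workers : requireInt(conf, ACKER_EXECUTORS);
    }

    /** Component-level conf for the acker: task count plus a tick every timeout period. */
    static Map<String, Object> ackerConf(Map<String, Object> conf) {
        Map<String, Object> ackerConf = new HashMap<>();
        ackerConf.put(TASKS, ackerCount(conf));
        ackerConf.put(TICK_TUPLE_FREQ_SECS, requireInt(conf, MESSAGE_TIMEOUT_SECS));
        return ackerConf;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(WORKERS, 4);
        conf.put(MESSAGE_TIMEOUT_SECS, 30);
        System.out.println("without acker setting: " + ackerConf(conf));

        conf.put(ACKER_EXECUTORS, 2);
        System.out.println("with acker setting: " + ackerConf(conf));
    }
}
```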

Acker

storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java

public class Acker implements IBolt {
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    private OutputCollector collector;
    private RotatingMap<Object, AckObject> pending;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    private long getTimeDeltaMillis(long startTimeMillis) {
        return Time.currentTimeMillis() - startTimeMillis;
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
}
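  • For a tick tuple, it calls RotatingMap.rotate, which expires the oldest bucket of pending entries
  • For init and ack messages it calls AckObject.updateAck to XOR in the new value; for a fail message it marks the AckObject as failed and puts it back into pending
  • Finally, if the spout task is known, it checks the AckObject: if val is 0, the whole tuple tree has been processed successfully and it notifies the spout on ACKER_ACK_STREAM_ID; if it is failed, it notifies on ACKER_FAIL_STREAM_ID; if the message was a resetTimeout, it notifies on ACKER_RESET_TIMEOUT_STREAM_ID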
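
The acker's decision logic can be condensed into a small standalone model. This is a sketch under simplifying assumptions: a plain HashMap replaces RotatingMap (so there is no timeout), the reset-timeout stream is left out, and AckerLedgerSketch, State and Outcome are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Standalone model of the Acker's per-root bookkeeping, without timeouts. */
final class AckerLedgerSketch {

    enum Outcome { PENDING, ACKED, FAILED }

    /** Plays the role of AckObject. */
    private static final class State {
        long val = 0L;
        int spoutTask = -1;
        boolean failed = false;
    }

    private final Map<Long, State> pending = new HashMap<>();

    /** Init message from the spout: XOR in the init value and remember the spout task. */
    Outcome init(long rootId, long initVal, int spoutTask) {
        State s = pending.computeIfAbsent(rootId, k -> new State());
        s.val ^= initVal;
        s.spoutTask = spoutTask;
        return resolve(rootId, s);
    }

    /** Ack message from a bolt: XOR in the ack value. */
    Outcome ack(long rootId, long ackVal) {
        State s = pending.computeIfAbsent(rootId, k -> new State());
        s.val ^= ackVal;
        return resolve(rootId, s);
    }

    /** Fail message from a bolt: mark the tree as failed. */
    Outcome fail(long rootId) {
        State s = pending.computeIfAbsent(rootId, k -> new State());
        s.failed = true;
        return resolve(rootId, s);
    }

    int pendingCount() {
        return pending.size();
    }

    /** Same order of checks as Acker.execute: spout known, then val == 0, then failed. */
    private Outcome resolve(long rootId, State s) {
        if (s.spoutTask < 0) {
            return Outcome.PENDING; // init has not arrived yet, nobody to notify
        }
        if (s.val == 0L) {
            pending.remove(rootId);
            return Outcome.ACKED;
        }
        if (s.failed) {
            pending.remove(rootId);
            return Outcome.FAILED;
        }
        return Outcome.PENDING;
    }

    public static void main(String[] args) {
        AckerLedgerSketch acker = new AckerLedgerSketch();
        System.out.println(acker.init(1L, 5L ^ 3L, 7)); // two tuples outstanding
        System.out.println(acker.ack(1L, 5L));          // one left
        System.out.println(acker.ack(1L, 3L));          // tree complete

        System.out.println(acker.fail(2L));             // fail arrives before init
        System.out.println(acker.init(2L, 9L, 7));      // now the spout can be told
    }
}
```

The second scenario shows why the fail branch stores the entry even when no init has been seen: messages from different tasks can arrive out of order, and the outcome is reported once the spout task is known.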

SpoutExecutor

storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java

public class SpoutExecutor extends Executor {
    //......

    @Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
            }
            spout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            if (timeDelta != null) {
                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
                                                     taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
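  • In tupleActionFn, SpoutExecutor calls ackSpoutMsg when it receives a tuple on ACKER_ACK_STREAM_ID and failSpoutMsg when it receives one on ACKER_FAIL_STREAM_ID
  • ackSpoutMsg and failSpoutMsg call the ack and fail methods of the concrete spout, which is how the outcome reaches the spout that originally emitted the tuple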

Summary

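  • Storm uses the ack mechanism to guarantee at-least-once processing semantics
  • The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt to the submitted topology; addAcker adds the acker and also applies the ack-related configuration to every spout
  • If a spout's emit call carries a messageId (and ackers are enabled), the tuple needs acking, and the collector calls taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits())
  • A bolt reports its result through BoltOutputCollectorImpl.ack or fail. Both call task.sendUnanchored, which in turn calls ExecutorTransfer.tryTransfer to deliver the addressedTuple to the destination queue (or to transfer it over the network if the destination is on a remote node). The stream is Acker.ACKER_ACK_STREAM_ID or Acker.ACKER_FAIL_STREAM_ID
  • When the acker receives a tuple on Acker.ACKER_ACK_STREAM_ID it calls AckObject.updateAck; for Acker.ACKER_FAIL_STREAM_ID it marks the entry as failed and puts it back into pending. It then checks the AckObject: if val is 0, the whole tuple tree has been processed successfully and it uses emitDirect to notify the spout task on ACKER_ACK_STREAM_ID; if it is failed, it notifies the task on ACKER_FAIL_STREAM_ID; if the message was a resetTimeout, it notifies the task on ACKER_RESET_TIMEOUT_STREAM_ID
  • When SpoutExecutor receives a tuple on ACKER_ACK_STREAM_ID it calls ackSpoutMsg, and on ACKER_FAIL_STREAM_ID it calls failSpoutMsg; these call the ack and fail methods of the concrete spout, which is how the outcome reaches the spout that originally emitted the tuple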

Reposted from: http://uwsuo.baihongyu.com/
