Preface
This article takes a look at Storm's ack mechanism.
Example
AckSentenceSpout
```java
public class AckSentenceSpout extends BaseRichSpout {

    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;
    private int index = 0;
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don't have a cow man",
        "i don't think i like fleas"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        Values values = new Values(sentences[index]);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);
//        this.collector.emit(values);
        //NOTE the msgId has to be passed in here
        this.collector.emit(values, msgId);
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    //NOTE failed tuples have to be re-emitted for retry
    @Override
    public void fail(Object msgId) {
        this.collector.emit(this.pending.get(msgId), msgId);
    }
}
```
- For the spout, a msgId has to be passed in when emitting; the spout also needs to cache the emitted data, remove it on ack, and re-emit it on fail so the tuple is retried
AckWordCountBolt
```java
public class AckWordCountBolt extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(AckWordCountBolt.class);
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    public void execute(Tuple tuple) {
        try {
            String word = tuple.getStringByField("word");
            Long count = this.counts.get(word);
            if (count == null) {
                count = 0L;
            }
            count++;
            this.counts.put(word, count);
            //NOTE pass the current input tuple as the anchor
            this.collector.emit(tuple, new Values(word, count));
            //NOTE the tuple has to be acked explicitly here
            this.collector.ack(tuple);
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            //NOTE fail the tuple when handling an exception
            this.collector.fail(tuple);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
- A bolt has to do two things: first, anchor its output when emitting, linking the input tuple to the output tuples so the tuple tree gets built; second, ack every tuple it has finished processing, and fail it on error. If you would rather not do this by hand, see the BaseBasicBolt sketch below
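As an aside, Storm's BaseBasicBolt handles both chores automatically: the BasicOutputCollector anchors every emit to the current input tuple, and the surrounding executor acks the tuple after execute() returns (or fails it when a FailedException is thrown). A sketch of the same word-count bolt on that base class (class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BasicWordCountBolt extends BaseBasicBolt {
    private final Map<String, Long> counts = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getStringByField("word");
        long count = counts.merge(word, 1L, Long::sum);
        // anchored to the input tuple and acked by the framework automatically
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```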
Source code analysis
SpoutOutputCollectorImpl.emit
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
```java
@Override
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
    //...... delegates to sendSpoutMsg(streamId, tuple, messageId, null)
}
```
- For tuples that need acking (needAck is true when a messageId is given and ackers are running), emit first creates a rootId, calls ackSeq.add(as) for each outgoing edge, and finally triggers taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits()); see the sketch below
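emit delegates to an internal sendSpoutMsg method. A condensed sketch of its needAck branch, paraphrased and simplified from storm-2.0.0 (not the verbatim listing):

```java
// paraphrased from SpoutOutputCollectorImpl.sendSpoutMsg (simplified)
final boolean needAck = (messageId != null) && hasAckers;
final List<Long> ackSeq = needAck ? new ArrayList<>() : null;
final long rootId = needAck ? MessageId.generateId(random) : 0;

for (Integer t : outTasks) {
    MessageId msgId;
    if (needAck) {
        long as = MessageId.generateId(random);
        msgId = MessageId.makeRootId(rootId, as); // every edge of this tuple shares the same rootId
        ackSeq.add(as);
    } else {
        msgId = MessageId.makeUnanchored();
    }
    // build the TupleImpl with msgId and tryTransfer it to task t ...
}

if (needAck) {
    TupleInfo info = new TupleInfo();          // remembers taskId, stream and the user messageId
    info.setTaskId(this.taskId);
    info.setStream(stream);
    info.setMessageId(messageId);
    pending.put(rootId, info);                 // rootId -> original messageId, used by the ack/fail callback
    // ACKER_INIT carries: rootId, XOR of all edge ids, and this spout task's id
    taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID,
                            new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId),
                            executor.getExecutorTransfer(), executor.getPendingEmits());
}
```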
BoltOutputCollectorImpl.ack&fail
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
```java
@Override
public void ack(Tuple input) {
    if (!ackingEnabled) {
        return;
    }
    long ackValue = ((TupleImpl) input).getAckVal();
    Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
    for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
        task.sendUnanchored(Acker.ACKER_ACK_STREAM_ID,
                            new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
                            executor.getExecutorTransfer(), executor.getPendingEmits());
    }
    long delta = tupleTimeDelta((TupleImpl) input);
    if (isDebug) {
        LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }

    if (!task.getUserContext().getHooks().isEmpty()) {
        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
        boltAckInfo.applyOn(task.getUserContext());
    }
    if (delta >= 0) {
        executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                           task.getTaskMetrics().getAcked(input.getSourceStreamId()));
    }
}

@Override
public void fail(Tuple input) {
    if (!ackingEnabled) {
        return;
    }
    Set<Long> roots = input.getMessageId().getAnchors();
    for (Long root : roots) {
        task.sendUnanchored(Acker.ACKER_FAIL_STREAM_ID, new Values(root),
                            executor.getExecutorTransfer(), executor.getPendingEmits());
    }
    long delta = tupleTimeDelta((TupleImpl) input);
    if (isDebug) {
        LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
    }
    BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
    boltFailInfo.applyOn(task.getUserContext());
    if (delta >= 0) {
        executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
                                            task.getTaskMetrics().getFailed(input.getSourceStreamId()));
    }
}
```
- Both ack and fail in BoltOutputCollectorImpl go through task.sendUnanchored
- ack sends (rootId, edge-id XOR ackVal) for each anchor to Acker.ACKER_ACK_STREAM_ID; fail sends just the root ids to Acker.ACKER_FAIL_STREAM_ID; the toy demo below shows why this XOR bookkeeping works
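Why XOR suffices: every edge in the tuple tree gets a random 64-bit id, and each id is XORed into the tracked value exactly twice, once when the edge is created (at emit/init) and once when it is acked, so the value returns to 0 precisely when every edge has been acked. A standalone toy demonstration (not Storm code):

```java
import java.util.Random;

public class XorAckDemo {
    public static void main(String[] args) {
        Random random = new Random();
        long trackedVal = 0;

        // spout emits two tuples anchored to the same root
        long edgeA = random.nextLong();
        long edgeB = random.nextLong();
        trackedVal ^= edgeA ^ edgeB;             // ACKER_INIT: XOR of all anchor ids

        // a bolt acks edgeA and, in the same step, anchors a new downstream edgeC
        long edgeC = random.nextLong();
        trackedVal ^= edgeA ^ edgeC;             // ACKER_ACK: acked id XOR new edge id

        // a downstream bolt acks edgeB and edgeC without emitting anything
        trackedVal ^= edgeB;
        trackedVal ^= edgeC;

        // every edge XORed in exactly twice -> back to zero -> tuple tree complete
        System.out.println(trackedVal == 0);     // prints true
    }
}
```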
Task.sendUnanchored
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Task.java
```java
// Non Blocking call. If cannot emit to destination immediately, such tuples will be added to `pendingEmits` argument
public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
    Tuple tuple = getTuple(stream, values);
    List<Integer> tasks = getOutgoingTasks(stream, values);
    for (Integer t : tasks) {
        AddressedTuple addressedTuple = new AddressedTuple(t, tuple);
        transfer.tryTransfer(addressedTuple, pendingEmits);
    }
}
```
- This in turn calls ExecutorTransfer.tryTransfer
ExecutorTransfer.tryTransfer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
```java
// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
    if (isDebug) {
        LOG.info("TRANSFERRING tuple {}", addressedTuple);
    }

    JCQueue localQueue = getLocalQueue(addressedTuple);
    if (localQueue != null) {
        return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
    }
    return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
}

/**
 * Adds tuple to localQueue (if overflow is empty). If localQueue is full adds to pendingEmits instead. pendingEmits can be null.
 * Returns false if unable to add to localQueue.
 */
public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
    workerData.checkSerialize(serializer, tuple);
    if (pendingEmits != null) {
        if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
            queuesToFlush.set(tuple.dest - indexingBase, localQueue);
            return true;
        } else {
            pendingEmits.add(tuple);
            return false;
        }
    } else {
        return localQueue.tryPublish(tuple);
    }
}
```
- tryTransfer first checks, based on the addressedTuple, whether the destination queue is local; if so it calls tryTransferLocal, otherwise it calls workerData.tryTransferRemote
- tryTransferLocal executes localQueue.tryPublish, which puts the data onto the JCQueue's recvQueue
- workerData.tryTransferRemote hands the data to the TransferDrainer via WorkerTransfer; on flush it is shipped to the remote node (a standalone sketch of this overflow pattern follows)
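The pattern in tryTransferLocal is essentially a non-blocking publish with a per-producer overflow queue: as long as pendingEmits holds a backlog, new tuples queue up behind it so FIFO order is preserved. A minimal standalone illustration of that pattern (not Storm code; queue sizes and names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class OverflowPublisher<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(1024); // stand-in for JCQueue
    private final Queue<T> pendingEmits = new ArrayDeque<>();              // per-producer overflow

    /** Mirrors tryTransferLocal: publish only when there is no backlog, else park in pendingEmits. */
    boolean tryPublish(T item) {
        if (pendingEmits.isEmpty() && queue.offer(item)) {
            return true;
        }
        pendingEmits.add(item); // keep FIFO order behind the existing backlog
        return false;
    }

    /** Called periodically (like Storm's flush) to drain the backlog into the queue. */
    void flushPending() {
        T head;
        while ((head = pendingEmits.peek()) != null && queue.offer(head)) {
            pendingEmits.poll();
        }
    }
}
```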
StormCommon.systemTopology
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
```java
public static StormTopology systemTopology(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    return _instance.systemTopologyImpl(topoConf, topology);
}

protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTopology topology) throws InvalidTopologyException {
    validateBasic(topology);

    StormTopology ret = topology.deepCopy();
    addAcker(topoConf, ret);
    if (hasEventLoggers(topoConf)) {
        addEventLogger(topoConf, ret);
    }
    addMetricComponents(topoConf, ret);
    addSystemComponents(topoConf, ret);
    addMetricStreams(ret);
    addSystemStreams(ret);

    validateStructure(ret);

    return ret;
}

public static void addAcker(Map<String, Object> conf, StormTopology topology) {
    int ackerNum = ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS),
                                       ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS)));
    Map<GlobalStreamId, Grouping> inputs = ackerInputs(topology);

    Map<String, StreamInfo> outputStreams = new HashMap<String, StreamInfo>();
    outputStreams.put(Acker.ACKER_ACK_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    outputStreams.put(Acker.ACKER_FAIL_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));
    outputStreams.put(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.directOutputFields(Arrays.asList("id", "time-delta-ms")));

    Map<String, Object> ackerConf = new HashMap<>();
    ackerConf.put(Config.TOPOLOGY_TASKS, ackerNum);
    ackerConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));

    Bolt acker = Thrift.prepareSerializedBoltDetails(inputs, makeAckerBolt(), outputStreams, ackerNum, ackerConf);

    for (Bolt bolt : topology.get_bolts().values()) {
        ComponentCommon common = bolt.get_common();
        common.put_to_streams(Acker.ACKER_ACK_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "ack-val")));
        common.put_to_streams(Acker.ACKER_FAIL_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
        common.put_to_streams(Acker.ACKER_RESET_TIMEOUT_STREAM_ID, Thrift.outputFields(Arrays.asList("id")));
    }

    for (SpoutSpec spout : topology.get_spouts().values()) {
        ComponentCommon common = spout.get_common();
        Map<String, Object> spoutConf = componentConf(spout);
        spoutConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)));
        common.set_json_conf(JSONValue.toJSONString(spoutConf));
        common.put_to_streams(Acker.ACKER_INIT_STREAM_ID, Thrift.outputFields(Arrays.asList("id", "init-val", "spout-task")));
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_ACK_STREAM_ID),
                             Thrift.prepareDirectGrouping());
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_FAIL_STREAM_ID),
                             Thrift.prepareDirectGrouping());
        common.put_to_inputs(Utils.getGlobalStreamId(Acker.ACKER_COMPONENT_ID, Acker.ACKER_RESET_TIMEOUT_STREAM_ID),
                             Thrift.prepareDirectGrouping());
    }

    topology.put_to_bolts(Acker.ACKER_COMPONENT_ID, acker);
}

public static Map<GlobalStreamId, Grouping> ackerInputs(StormTopology topology) {
    Map<GlobalStreamId, Grouping> inputs = new HashMap<>();
    Set<String> boltIds = topology.get_bolts().keySet();
    Set<String> spoutIds = topology.get_spouts().keySet();

    for (String id : spoutIds) {
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_INIT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }

    for (String id : boltIds) {
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_ACK_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_FAIL_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
        inputs.put(Utils.getGlobalStreamId(id, Acker.ACKER_RESET_TIMEOUT_STREAM_ID), Thrift.prepareFieldsGrouping(Arrays.asList("id")));
    }
    return inputs;
}

public static IBolt makeAckerBolt() {
    return _instance.makeAckerBoltImpl();
}

public IBolt makeAckerBoltImpl() {
    return new Acker();
}
```
- The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt
- addAcker implements the acker-creation logic; ackerNum is ObjectReader.getInt(conf.get(Config.TOPOLOGY_ACKER_EXECUTORS), ObjectReader.getInt(conf.get(Config.TOPOLOGY_WORKERS))), i.e. if Config.TOPOLOGY_ACKER_EXECUTORS is not configured, the value of Config.TOPOLOGY_WORKERS is used
- The acker is configured with Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS set to ObjectReader.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)); in other words the Acker receives a tick tuple every Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS seconds, which drives its timeout handling
- When building the arguments for Thrift.prepareSerializedBoltDetails, makeAckerBolt() is called to create the Acker instance
- Every bolt gets the Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID output streams added, matching the acker's inputs built by ackerInputs
- addAcker also configures each spout: Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, an Acker.ACKER_INIT_STREAM_ID output stream, and direct-grouped inputs for Acker.ACKER_ACK_STREAM_ID, Acker.ACKER_FAIL_STREAM_ID and Acker.ACKER_RESET_TIMEOUT_STREAM_ID; a submission-side config sketch follows this list
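For reference, these knobs surface on the topology config at submission time. A minimal sketch (topology name and wiring are illustrative; setNumAckers(0) disables acking entirely, so emits carrying a msgId go untracked):

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class AckDemoTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentence-spout", new AckSentenceSpout());
        // ... wire bolts such as AckWordCountBolt here ...

        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setNumAckers(2);           // TOPOLOGY_ACKER_EXECUTORS; defaults to the worker count when unset
        conf.setMessageTimeoutSecs(30); // TOPOLOGY_MESSAGE_TIMEOUT_SECS; drives the acker's tick/timeout
        conf.setMaxSpoutPending(1000);  // caps in-flight tuples; only effective when acking is enabled
        StormSubmitter.submitTopology("ack-demo", conf, builder.createTopology());
    }
}
```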
Acker
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/Acker.java
```java
public class Acker implements IBolt {
    public static final String ACKER_COMPONENT_ID = "__acker";
    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
    public static final String ACKER_RESET_TIMEOUT_STREAM_ID = "__ack_reset_timeout";
    public static final int TIMEOUT_BUCKET_NUM = 3;
    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
    private static final long serialVersionUID = 4430906880683183091L;
    private OutputCollector collector;
    private RotatingMap<Object, AckObject> pending;

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.pending = new RotatingMap<>(TIMEOUT_BUCKET_NUM);
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            Map<Object, AckObject> tmp = pending.rotate();
            LOG.debug("Number of timeout tuples:{}", tmp.size());
            return;
        }

        boolean resetTimeout = false;
        String streamId = input.getSourceStreamId();
        Object id = input.getValue(0);
        AckObject curr = pending.get(id);
        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
            curr.spoutTask = input.getInteger(2);
        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
            if (curr == null) {
                curr = new AckObject();
                pending.put(id, curr);
            }
            curr.updateAck(input.getLong(1));
        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
            // For the case that ack_fail message arrives before ack_init
            if (curr == null) {
                curr = new AckObject();
            }
            curr.failed = true;
            pending.put(id, curr);
        } else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
            resetTimeout = true;
            if (curr != null) {
                pending.put(id, curr);
            } //else if it has not been added yet, there is no reason time it out later on
        } else if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            collector.flush();
            return;
        } else {
            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
            return;
        }

        int task = curr.spoutTask;
        if (task >= 0 && (curr.val == 0 || curr.failed || resetTimeout)) {
            Values tuple = new Values(id, getTimeDeltaMillis(curr.startTime));
            if (curr.val == 0) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_ACK_STREAM_ID, tuple);
            } else if (curr.failed) {
                pending.remove(id);
                collector.emitDirect(task, ACKER_FAIL_STREAM_ID, tuple);
            } else if (resetTimeout) {
                collector.emitDirect(task, ACKER_RESET_TIMEOUT_STREAM_ID, tuple);
            } else {
                throw new IllegalStateException("The checks are inconsistent we reach what should be unreachable code.");
            }
        }

        collector.ack(input);
    }

    @Override
    public void cleanup() {
        LOG.info("Acker: cleanup successfully");
    }

    private long getTimeDeltaMillis(long startTimeMillis) {
        return Time.currentTimeMillis() - startTimeMillis;
    }

    private static class AckObject {
        public long val = 0L;
        public long startTime = Time.currentTimeMillis();
        public int spoutTask = -1;
        public boolean failed = false;

        // val xor value
        public void updateAck(Long value) {
            val = Utils.bitXor(val, value);
        }
    }
}
```
- On a tick tuple the acker performs RotatingMap.rotate, expiring pending entries that have timed out (see the demo after this list)
- A successful ack triggers the AckObject's updateAck; a fail marks the AckObject failed and puts it back into pending
- Finally it checks: if the AckObject's val is 0, the whole tuple tree has been processed successfully and the acker notifies on ACKER_ACK_STREAM_ID; failed entries are reported on ACKER_FAIL_STREAM_ID, and resetTimeout ones on ACKER_RESET_TIMEOUT_STREAM_ID
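How the tick-driven timeout works: RotatingMap keeps TIMEOUT_BUCKET_NUM buckets, put writes into the newest bucket, and each rotate() drops the oldest bucket, so an entry that is never refreshed falls out once the buckets cycle through. A small demo against Storm's own RotatingMap (assuming storm-client on the classpath):

```java
import java.util.Map;

import org.apache.storm.utils.RotatingMap;

public class RotateDemo {
    public static void main(String[] args) {
        // 3 buckets, same as Acker.TIMEOUT_BUCKET_NUM
        RotatingMap<String, String> pending = new RotatingMap<>(3);
        pending.put("root-1", "ack-state");

        // the acker calls rotate() once per tick tuple; untouched
        // entries come back as "dead" once their bucket ages out
        pending.rotate();
        pending.rotate();
        Map<String, String> expired = pending.rotate();

        System.out.println(expired.containsKey("root-1")); // true: expired on the 3rd rotate
    }
}
```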
SpoutExecutor
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
```java
public class SpoutExecutor extends Executor {

    //......

    @Override
    public void tupleActionFn(int taskId, TupleImpl tuple) throws Exception {
        String streamId = tuple.getSourceStreamId();
        if (Constants.SYSTEM_FLUSH_STREAM_ID.equals(streamId)) {
            spoutOutputCollector.flush();
        } else if (streamId.equals(Constants.SYSTEM_TICK_STREAM_ID)) {
            pending.rotate();
        } else if (streamId.equals(Constants.METRICS_TICK_STREAM_ID)) {
            metricsTick(idToTask.get(taskId - idToTaskBase), tuple);
        } else if (streamId.equals(Constants.CREDENTIALS_CHANGED_STREAM_ID)) {
            Object spoutObj = idToTask.get(taskId - idToTaskBase).getTaskObject();
            if (spoutObj instanceof ICredentialsListener) {
                ((ICredentialsListener) spoutObj).setCredentials((Map<String, String>) tuple.getValue(0));
            }
        } else if (streamId.equals(Acker.ACKER_RESET_TIMEOUT_STREAM_ID)) {
            Long id = (Long) tuple.getValue(0);
            TupleInfo pendingForId = pending.get(id);
            if (pendingForId != null) {
                pending.put(id, pendingForId);
            }
        } else {
            Long id = (Long) tuple.getValue(0);
            Long timeDeltaMs = (Long) tuple.getValue(1);
            TupleInfo tupleInfo = pending.remove(id);
            if (tupleInfo != null && tupleInfo.getMessageId() != null) {
                if (taskId != tupleInfo.getTaskId()) {
                    throw new RuntimeException("Fatal error, mismatched task ids: " + taskId + " " + tupleInfo.getTaskId());
                }
                Long timeDelta = null;
                if (hasAckers) {
                    long startTimeMs = tupleInfo.getTimestamp();
                    if (startTimeMs != 0) {
                        timeDelta = timeDeltaMs;
                    }
                }
                if (streamId.equals(Acker.ACKER_ACK_STREAM_ID)) {
                    ackSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo);
                } else if (streamId.equals(Acker.ACKER_FAIL_STREAM_ID)) {
                    failSpoutMsg(this, idToTask.get(taskId - idToTaskBase), timeDelta, tupleInfo, "FAIL-STREAM");
                }
            }
        }
    }

    public void ackSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Acking message {} {}", tupleInfo.getId(), tupleInfo.getMessageId());
            }
            spout.ack(tupleInfo.getMessageId());
            if (!taskData.getUserContext().getHooks().isEmpty()) { // avoid allocating SpoutAckInfo obj if not necessary
                new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            }
            if (hasAckers && timeDelta != null) {
                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public void failSpoutMsg(SpoutExecutor executor, Task taskData, Long timeDelta, TupleInfo tupleInfo, String reason) {
        try {
            ISpout spout = (ISpout) taskData.getTaskObject();
            int taskId = taskData.getTaskId();
            if (executor.getIsDebug()) {
                LOG.info("SPOUT Failing {} : {} REASON: {}", tupleInfo.getId(), tupleInfo, reason);
            }
            spout.fail(tupleInfo.getMessageId());
            new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
            if (timeDelta != null) {
                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
                                                     taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
            }
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}
```
- In tupleActionFn, when SpoutExecutor receives ACKER_ACK_STREAM_ID it performs ackSpoutMsg; when it receives ACKER_FAIL_STREAM_ID it performs failSpoutMsg
- ackSpoutMsg and failSpoutMsg call the user spout's ack and fail methods respectively, delivering the ack result back to the original spout
Summary
- Storm uses the ack mechanism to guarantee at-least-once processing semantics
- The WorkerState constructor calls systemTopology, which adds system components such as Acker, MetricsConsumerBolt and SystemBolt to the submitted topology; addAcker adds the acker bolt and also wires the ack-related streams and config into each spout
- When a spout's emit is given a messageId, the tuple is tracked for acking, which triggers taskData.sendUnanchored(Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer(), executor.getPendingEmits())
- A bolt sends its ack information through BoltOutputCollectorImpl's ack or fail method, which calls task.sendUnanchored; that operation uses ExecutorTransfer.tryTransfer to deliver the addressedTuple to the destination queue (crossing the network when the target task lives on a remote node), on the Acker.ACKER_ACK_STREAM_ID or Acker.ACKER_FAIL_STREAM_ID stream
- When the acker receives Acker.ACKER_ACK_STREAM_ID it calls the AckObject's updateAck; on Acker.ACKER_FAIL_STREAM_ID it marks the entry failed and puts it back into pending. It then checks the AckObject's val: if it is 0, the whole tuple tree has completed successfully and the acker notifies the corresponding spout task via emitDirect on ACKER_ACK_STREAM_ID; failed entries are reported on ACKER_FAIL_STREAM_ID, and resetTimeout ones on ACKER_RESET_TIMEOUT_STREAM_ID
- When SpoutExecutor receives ACKER_ACK_STREAM_ID it performs ackSpoutMsg; on ACKER_FAIL_STREAM_ID it performs failSpoutMsg; these call the user spout's ack and fail methods respectively, delivering the ack outcome back to the original spout