實時海量日誌分析系統的架構設計、實現以及思考

2021-02-21 架構文摘

點擊藍色「架構文摘」關注我喲

加個「星標」,每天上午 09:25,乾貨推送!

作者: 天府雲創  

CSDN: https://blog.csdn.net/enweitech/article/details/73332630

1 序

對ETL系統中數據轉換和存儲操作的相關日誌進行記錄以及實時分析有助於我們更好的觀察和監控ETL系統的相關指標(如單位時間某些操作的處理時間),發現系統中出現的缺陷和性能瓶頸。

由於需要對日誌進行實時分析,所以Storm是我們想到的首個框架。Storm是一個分布式實時計算系統,它可以很好的處理流式數據。利用storm我們幾乎可以直接實現一個日誌分析系統,但是將日誌分析系統進行模塊化設計可以收到更好的效果。模塊化的設計至少有兩方面的優點:

模塊化設計可以使功能更加清晰。整個日誌分析系統可以分為「數據採集-數據緩衝-數據處理-數據存儲」四個步驟。Apache項目下的flumeng框架可以很好的從多源目標收集數據,所以我們用它來從ETL系統中收集日誌信息;由於採集數據與處理數據的速度可能會出現不一致,所以我們需要一個消息中間件來作為緩衝,kafka是一個極好的選擇;然後對流式數據的處理,我們將選擇大名鼎鼎的storm了,同時為了更好的對數據進行處理,我們把drools與storm進行了整合,分離出了數據處理規則,這樣更有利於管理規則;最後,我們選擇redis作為我們處理數據的存儲工具,redis是一個內存資料庫,可以基於健值進行快速的存取。

模塊化設計之後,storm和前兩個步驟之間就獲得了很好的解耦,storm集群如果出現問題,數據採集以及數據緩衝的操作還可以繼續運行,數據不會丟失。

2 相關框架的介紹和安裝2.1.1 原理介紹

Flume是一個高可用、高可靠、分布式的海量日誌採集、聚合和傳輸系統。Flume支持在日誌系統中定製日誌發送方,用於收集數據;同時,Flume提供對數據進行簡單處理,並寫到各種數據接收方的能力。它擁有一個簡單的、可擴展的流式數據流架構,如下圖所示:

日誌收集系統就是由一個或者多個agent(代理)組成,每個agent由source、channel、sink三部分組成,source是數據的來源,channel是數據進行傳輸的通道,sink用於將數據傳輸到指定的地方。我們可以把agent看做一段水管,source是水管的入口,sink是水管的出口,數據流就是水流。Agent本質上是一個jvm進程,agent各個組件之間是通過event來進行觸發和協調的。

2.1.2 flumeng的安裝
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source配置信息
#r1的type為avro表示該source接收的數據協議為avro,且接收數據由avro客戶端事件驅動
#(也就是說resource要通過avro-cliet向其發送數據)
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#sink配置信息
# type為logger意將數據輸出至日誌中(也就是列印在屏幕上)
a1.sinks.k1.type = logger
#channel配置信息
#type為memory意將數據存儲至內存中
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#將source和sink綁定至該channel上
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
該配置文件,配置了一個source為avro的伺服器端用於日誌的收集。具體的情況將在後面ETL系統與flume整合中介紹。

2.2 kafka2.2.1 原理介紹

Kafka是linkedin用於日誌處理的分布式消息隊列。Kafka的架構如下圖所示:

Kafka的存儲策略有一下幾點:

kafka以topic來進行消息管理,每個topic包括多個partition,每個partition包括一個邏輯log,由多個segment組成。

每個segment中存儲多條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。

每個partition在內存中對應一個index,記錄每個segment中的第一條消息的偏移。

發布者發到某個topic的消息會被均勻的分布到多個partition上(隨機或根據用戶指定的回調函數進行分布),broker收到發布消息往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發布時間超過閾值時,segment上的消息會被flush到磁碟,只有flush到磁碟上的消息訂閱者才能訂閱到,segment達到一定的大小後將不會再往該segment寫數據,broker會創建新的segment。

2.2.2 kafka集群的搭建

Kafka集群的搭建需要依賴zookeeper來進行負載均衡,所以我們需要在安裝kafka之前搭建zookeeper集群。

zookeeper集群的搭建,本系統用到了兩臺機器。具體搭建過程見http://blog.csdn.net/itleochen/article/details/17453881

分別下載kafka_2.9.2-0.8.1的安裝包到兩臺機器,並解壓該安裝包。

打開conf/server.properties文件,修改配置項broker.id、zookeeper.connect、partitions以及host.name為相應的值。

分別啟動kafka即完成了集群的搭建。

2.3 storm2.3.1 原理介紹

Storm是一個分布式的、高容錯的實時計算系統。Storm對於實時計算的的意義相當於Hadoop對於批處理的意義。hadoop為我們提供了Map和Reduce原語,使我們對數據進行批處理變的非常的簡單和優美。同樣,Storm也對數據的實時計算提供了簡單Spout和Bolt原語。

Strom集群裡面有兩種節點,控制節點和工作節點,控制節點上面運行一個nimbus(類似於hadoop中的JobTracker)後臺程序,Nimbus負責在集群裡面分布代碼,分配工作給機器, 並且監控狀態。每一個工作節點上面運行一個叫做Supervisor(類似Hadoop中的TaskTracker)的節點。Supervisor會監聽分配給它那臺機器的工作,根據需要啟動/關閉工作進程。每一個工作進程執行一個Topology(類似hadoop中的Job)的一個子集;一個運行的Topology由運行在很多機器上的很多工作進程 Worker(類似Hadoop中的Child)組成。結構如下圖所示:

Stream是storm裡面的關鍵抽象。一個stream是一個沒有邊界的tuple序列。storm提供一些原語來分布式地、可靠地把一個stream傳輸進一個新的stream。比如:你可以把一個tweets流傳輸到熱門話題的流。

storm提供的最基本的處理stream的原語是spout和bolt。你可以實現Spout和Bolt對應的接口以處理你的應用的邏輯。

Spout是流的源頭。比如一個spout可能從Kestrel隊列裡面讀取消息並且把這些消息發射成一個流。通常Spout會從外部數據源(隊列、資料庫等)讀取數據,然後封裝成Tuple形式,之後發送到Stream中。Spout是一個主動的角色,在接口內部有個nextTuple函數,Storm框架會不停的調用該函數。

Bolt可以接收任意多個輸入stream。Bolt處理輸入的Stream,並產生新的輸出Stream。Bolt可以執行過濾、函數操作、Join、操作資料庫等任何操作。Bolt是一個被動的角色,其接口中有一個execute(Tuple input)方法,在接收到消息之後會調用此函數,用戶可以在此方法中執行自己的處理邏輯。

spout和bolt所組成一個網絡會被打包成topology, topology是storm裡面最高一級的抽象(類似 Job), 你可以把topology提交給storm的集群來運行。Topology的結構如下圖所示:

2.3.2 storm集群的搭建

Storm集群的搭建也要依賴於zookeeper,本系統中storm與kafka共用同樣一個zookeeper集群。


storm.zookeeper.servers: //zookeeper集群


-「10.200.187.71″


-「10.200.187.73″


storm.local.dir:「/usr/endy/fks/storm-workdir「


storm.messaging.transport:「backtype.storm.messaging.netty.Context」


storm.messaging.netty.server_worker_threads:1


storm.messaging.netty.client_worker_threads:1


storm.messaging.netty.buffer_size:5242880


storm.messaging.netty.max_retries:100


storm.messaging.netty.max_wait_ms:1000


storm.messaging.netty.min_wait_ms:100

注意:在每個配置項前面必須留有空格,否則會無法識別。storm.messaging.* 部分是Netty的配置。如果沒有該部分。那麼Storm默認還是使用ZeroMQ。

storm.zookeeper.servers:
- 「10.200.187.71″
- 「10.200.187.73″
nimbus.host: 「10.200.187.71″
supervisor.slots.ports:
- 6700
- 6701
- 6702
storm.local.dir: 「/usr/endy/fks/storm-workdir」
storm.messaging.transport: 「backtype.storm.messaging.netty.Context」
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

注意

nimbus.host是nimbus的IP或hostname

supervisor.slots.ports 是配置slot的ip地址。配了幾個地址,就有幾個slot,即幾個worker。如果嘗試提交的topology所聲明的worker數超過當前可用的slot,該topology提交會失敗。

storm.messaging 部分是Netty的配置。

2.4 drools

Drools是一個基於Java的、開源的規則引擎,可以將複雜多變的規則從硬編碼中解放出來,以規則腳本的形式存放在文件中,使得規則的變更不需要修正代碼重啟機器就可以立即在線上環境生效。日誌分析系統中,drools的作用是利用不同的規則對日誌信息進行處理,以獲得我們想要的數據。但是,Drools本身不是一個分布式框架,所以規則引擎對log的處理無法做到分布式。我們的策略是將drools整合到storm的bolt中去,這就就解決了drools無法分布式的問題。這是因為bolt可以作為task分發給多個worker來處理,這樣drools中的規則也自然被多個worker處理了。

2.5 redis

Redis是key-value存儲系統,它支持較為豐富的數據結構,有String,list,set,hash以及zset。與memcached一樣,為了保證效率,數據都是緩存在內存中。區別的是redis會周期性的把更新的數據寫入磁碟或者把修改操作寫入追加的記錄文件,並且在此基礎上實現了master-slave(主從)同步。Redis是內存資料庫,所以有非常快速的存取效率。日誌分析系統數據量並不是特別大,但是對存取的速度要求較高,所以選擇redis有很大的優勢。

3 各個框架的整合3.1 ETL系統整合flumeng

Flume如何收集ETL系統中的日誌是我需要考慮的第一個問題。log4j2提供了專門的Appender-FlumeAppender用於將log信息發送到flume系統,並不需要我們來實現。我們在log4j2的配置文件中配置了ETL系統將log信息發送到的目的地,即avro伺服器端。該伺服器端我們在flume的配置文件中進行了配置。配置信息如下所示:

 producer.sources=s


producer.channels=c


producer.sinks=r


producer.sources.s.type=avro


producer.sources.s.channels=c


producer.sources.s.bind=10.200.187.71


producer.sources.s.port=4141

3.2 flumeng與kafka的整合

我們從ETL系統中獲得了日誌信息,將該信息不作任何處理傳遞到sink端,sink端發送數據到kafka。這個發送過程需要我們編寫代碼來實現,我們的實現代碼為KafkaSink類。主要代碼如下所示:

public class KafkaSink extends AbstractSink implements Configurable {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
private Properties parameters;
private Producer<String, String> producer;
private Context context;
@Override
public void configure(Context context) {
this.context = context;
ImmutableMap<String, String> props = context.getParameters();

parameters = new Properties();
for (String key : props.keySet()) {
String value = props.get(key);
this.parameters.put(key, value);
}
}
@Override
public synchronized void start() {
super.start();
ProducerConfig config = new ProducerConfig(this.parameters);
this.producer = new Producer<String, String>(config);
}
@Override
public Status process() throws EventDeliveryException {
Status status = null;
// Start transaction
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
// This try clause includes whatever Channel operations you want to do
Event event = ch.take();
String partitionKey = (String) parameters.get(KafkaFlumeConstans.PARTITION_KEY_NAME);
String encoding = StringUtils.defaultIfEmpty(
(String) this.parameters.get(KafkaFlumeConstans.ENCODING_KEY_NAME),
KafkaFlumeConstans.DEFAULT_ENCODING);
String topic = Preconditions.checkNotNull(
(String) this.parameters.get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
「custom.topic.name is required」);
String eventData = new String(event.getBody(), encoding);
KeyedMessage<String, String> data;
// if partition key does』nt exist
if (StringUtils.isEmpty(partitionKey)) {
data = new KeyedMessage<String, String>(topic, eventData);
} else {
data = new KeyedMessage<String, String>(topic, String.valueOf(new Random().nextInt(Integer.parseInt(partitionKey))), eventData);
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info(「Send Message to Kafka : [" + eventData + "] — [" + EventHelper.dumpEvent(event) + "]「);
}
producer.send(data);
txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
// re-throw all Errors
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}
@Override
public void stop() {
producer.close();
}
}

該類中,我們讀取了一些配置信息,這些配置信息我們在flumeng的flume-conf.properties文件中進行了定義,定義內容如下:

 producer.sinks.r.type=org.apache.flume.plugins.KafkaSink


producer.sinks.r.metadata.broker.list=10.200.187.71:9092


producer.sinks.r.partition.key=0


producer.sinks.r.serializer.class=kafka.serializer.StringEncoder


producer.sinks.r.request.required.acks=0


producer.sinks.r.max.message.size=1000000


producer.sinks.r.producer.type=sync


producer.sinks.r.custom.encoding=UTF-8


producer.sinks.r.custom.topic.name=fks1


producer.sinks.r.channel=c


producer.channels.c.type=memory


producer.channels.c.capacity=1000


將上面的KafkaSink類打包成flumeng-kafka.jar,並將該jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目錄下,啟動flume,我們就可以將ETL系統中產生的日誌信息發送到kafka中的fks1這個topic中去了。

3.3 kafka與storm的整合

Storm中的spout如何主動消費kafka中的消息需要我們編寫代碼來實現,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus實現了一個kafka與storm整合的插件,下載該插件,將插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目錄下。利用插件中的StormSpout類,我們就可以消費kafka中的消息了。主要代碼如下所示:

 public class KafkaSpout extends BaseRichSpout {
public static class MessageAndRealOffset {
public Message msg;
public long offset;

public MessageAndRealOffset(Message msg, long offset) {
this.msg = msg;
this.offset = offset;
}
}
static enum EmitState {
EMITTED_MORE_LEFT,
EMITTED_END,
NO_EMITTED
}
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
String _uuid = UUID.randomUUID().toString();
SpoutConfig _spoutConfig;
SpoutOutputCollector _collector;
PartitionCoordinator _coordinator;
DynamicPartitionConnections _connections;
ZkState _state;
long _lastUpdateMs = 0;
int _currPartitionIndex = 0;
public KafkaSpout(SpoutConfig spoutConf) {
_spoutConfig = spoutConf;
}
@Override
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;

Map stateConf = new HashMap(conf);
List zkServers = _spoutConfig.zkServers;
if (zkServers == null) {
zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if (zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
}
context.registerMetric(「kafkaOffset」, new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Set latestPartitions = new HashSet();
for (PartitionManager pm : pms) {
latestPartitions.add(pm.getPartition());
}
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
}
return _kafkaOffsetMetric.getValueAndReset();
}
}, 60);
context.registerMetric(「kafkaPartition」, new IMetric() {
@Override
public Object getValueAndReset() {
List pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
}
}, 60);
}
@Override
public void close() {
_state.close();
}
@Override
public void nextTuple() {
List managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); EmitState state = managers.get(_currPartitionIndex).next(_collector); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } if (state != EmitState.NO_EMITTED) { break; } } long now = System.currentTimeMillis(); if ((now – _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
@Override
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.ack(id.offset);
}
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
if (m != null) {
m.fail(id.offset);
}
}
@Override
public void deactivate() {
commit();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}

private void commit() {
_lastUpdateMs = System.currentTimeMillis();
for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}

}


3.4 Storm中bolt與drSools的整合

storm中bolt與drools的整合 Drools可以將storm中處理數據的規則提取到一個drl文件中,該文件就成了唯一處理規則的文件。任何時候規則出現變化,我們只需要修改該drl文件,而不會改變其它的代碼。Bolt與drools的整合代碼如下所示:

publicclassLogRulesBoltimplementsIBasicBolt{


Loggerlogger=LoggerFactory.getLogger(LogRulesBolt.class);


privatestaticfinallongserialVersionUID=1L;


publicstaticfinalStringLOG_ENTRY=「str」;


privateStatelessKnowledgeSessionksession;


privateStringdrlFile;


publicLogRulesBolt()


{}





publicLogRulesBolt(StringdrlFile)


{


this.drlFile=drlFile;


}


@Override


publicvoidprepare(MapstormConf,TopologyContextcontext){


KnowledgeBuilderkbuilder=KnowledgeBuilderFactory.newKnowledgeBuilder();


try{


kbuilder.add(ResourceFactory.newInputStreamResource(newFileInputStream(newFile(drlFile))),ResourceType.DRL);


}catch(FileNotFoundExceptione){


logger.error(e.getMessage());


}


KnowledgeBasekbase=KnowledgeBaseFactory.newKnowledgeBase();


kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());


ksession=kbase.newStatelessKnowledgeSession();


}


@Override


publicvoidexecute(Tupleinput,BasicOutputCollectorcollector){


StringlogContent=(String)input.getValueByField(LOG_ENTRY);


logContent=logContent.trim();


if(!」」.equals(logContent)&&logContent!=null)


{


LogEntryentry=newLogEntry(logContent);


try{


ksession.execute(entry);


}catch(Exceptione)


{


logger.error(「droolstohandlelog["+logContent+"]isfailure!」);


logger.error(e.getMessage());


}





collector.emit(newValues(entry));


}


else


{


logger.error(「logcontentisempty!」);


}


}


@Override


publicvoidcleanup(){


}


@Override


publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){


declarer.declare(newFields(LOG_ENTRY));


}


@Override


publicMap<String,Object>getComponentConfiguration(){


returnnull;


}


}


通過規則處理數據之後,我們就可以將處理過的數據發送到下一個bolt中,然後將數據存儲到redis中。

4 相關思考4.1 系統的優點

模塊化的設計,使功能分散到各個模塊中,對各個功能進行了解耦,使系統的容錯性更高。

kafka作為中間緩衝,解決了flume和storm速度不匹配的問題。

利用drools將規則和數據進行了解耦。把規則寫到一個配置文件中,避免了每次修改規則就要修改代碼的缺點。

storm和drools整合解決了drools的規則引擎無法並行化的問題。

redis是內存資料庫,可以很快速的寫數據到資料庫中,加快了整個系統的處理速度,避免了資料庫的瓶頸。

4.2 待考慮的問題

整個系統還沒有用大量數據進行測試,穩定性以及性能瓶頸需要進一步的考慮、發現和改進。

在現有的系統中,flume只能發送數據到kafka的單個broker的單個partition中,後期需要修改代碼以適應多個broker多個partition。這點是可以實現的,我已經實現了一部分。可以將數據發送到單個broker的多個partition中。

現有的系統,修改規則文件之後,需要重新啟動topology,無法進行熱加載。這點是需要進一步考慮的。

drools是一個優異的規則引擎。但是它的速度仍然讓我有點擔心。這個問題可能在以後數據變大之後會體現出來。我們思考了esper這個開源的規則引擎,它的速度更快,但是它類sql語言的規則處理語言不是太適合我們的日誌分析系統。以後是不是能夠作進一步的開發,用esper代替drools是我們要考慮的一個問題。

思考現有的架構,flume並不是缺一不可的模塊,我們可以在ETL系統中直接將log信息發送到kafka中,然後利用storm進行處理。但是為了整個系統的可擴展性(例如我們還想要將log信息發送到HDFS中,利用flume可以直接配置)和易配置性,利用flume會更好。是否要用flume,flume是否會影響整個系統的速度,需要以後進一步的論證。

flume、kafka、storm、redis的各個參數的取值對系統的影響也較大。所以這些參數需要在以後的應用中選定合適的值。

4.3 框架層面的思考

flume是純java實現的框架,比較有趣的是各種source接口(如avro source、thrift source)以及sink(HDFS sink、Logger sink)接口的實現。以後有興趣可以進一步閱讀原始碼。

kafka的思路很好,充分利用了磁碟順序寫入和順序讀取的路子,存儲的性能很好,只要幾個節點就能處理大量的消息了;另外,它突破了常規的一些消息中間件由服務端來記錄消息消費狀態的傳統,徹底由客戶端自己來記錄究竟處理到哪裡了,失敗也罷成功也罷,客戶端本來是最清楚的了,由它來記錄消費狀態是最適合不過了。Kafka中這種處理思路是我們值得學習的地方,我們也可以看代碼來體會這種設計。Kafka是由scala實現的,沒有scala基礎的可以先看看scala編程。

storm主要是用clojure、java來實現的,還包括部分的Python代碼。代碼量25000行左右。在它的原始碼中,用java實現框架結構,clojure實現功能細節。storm中的模擬本地集群的實現,保證消息只處理一次的功能的實現,都很巧妙,值得我們去看代碼,不管是現在用得到還是用不到。

redis是c實現的,速度很快,代碼量不大。

  如有收穫,點個在看,誠摯感謝

相關焦點

  • 百億級訪問量的實時監控系統如何實現?
    正如圖中所示,羅馬監控體系希望能夠汲取各方優秀的架構設計理念,融合不同的監控維度實現監控體系的「一體化」、「全鏈路」等。接下來,筆者帶大家從系統架構設計的角度逐一進行剖析。• 單元化部署:監控系統需要支撐單元化部署(支持多機房單元化部署)• 數據集中化:監控數據集中化處理、分析、存儲等(便於數據統計等)整體架構Roma系統架構如下圖所示:
  • 阿里雲實時大數據解決方案,助力企業實時分析與決策
    如何從這些海量的歷史數據和每日實時增量數據中快速匯總分析、挖掘出業務價值已成為業務最基本的需求。在這個過程中,很多公司推出了批處理、實時計算,但離線批量數倉和實時分析具有不可調和性,離線數倉滿足不了業務時效性的要求,而絕對的實時數倉也不切實際,「近實時」才有意義,而實時分析、近實時的分析構建離不開實時數倉系統的構建。
  • 基於大數據技術的安管平臺架構與設計
    大數據總體架構設計是基於信息安全驅動下的大數據分析和挖掘,涉及數據採集、集成、存儲、處理、分析、評估、預測等大數據全生命周期管理過程。而大數據相關技術應用需解決兩大基本問題,第一是數據的存儲問題,如何存儲龐大的數據量的問題;第二是數據的計算問題,如何處理分析海量的數據。因此,從生命周期和技術應用角度出發,把大數據安全分析平臺整體架構分為五大層級,分別為數據源層、數據採集層、數據存儲層、數據計算引擎、數據分析層和應用層。
  • FFT實時譜分析系統的FPGA設計和實現
    整個設計採用流水線工作方式,保證了系統的速度,避免了瓶頸的出現;整個系統採用FPGA實現,實驗表明,該系統既有DSP器件實現的靈活性又有專用 FFT晶片實現的高速數據吞吐能力,可以廣泛地應用於數位訊號處理的各個領域。
  • 數據戰爭——直面海量處理+實時分析的雙重挑戰
    從博客論壇到遊戲社區再到微博,從網際網路到移動網際網路再到物聯網,人類以及各類物理實體的實時聯網已經而且還將繼續產生難以估量的數據。對於時刻關注市場走向的企業來講,他們需要關注的數據顯然已經不僅限於企業內部資料庫中的業務數據,還要包括網際網路(以及未來的物聯網)上各類網絡活動所產生的相關數據記錄。顯然,大數據是一種創新,它在任何時候都知道你在哪裡。
  • 支付系統高可用架構設計實戰
    報警主要分為單機報警和集群報警,而宜信支付系統屬於集群部署。實時預警主要依靠各個業務系統實時埋點數據統計分析實現,因此難度主要在數據埋點和分析系統上。 3.2.2 埋點數據 要做到實時分析,又不影響交易系統的響應時間,宜信支付系統在各個模塊中通過redis實時做數據埋點,然後將埋點數據匯總到分析系統,分析系統根據規則進行分析報警。
  • 揭秘:騰訊阿里京東推薦系統架構如何設計?
    至此,在10月21日的搜索及推薦系統架構設計專場中,有五位演講嘉賓對他們熟悉的推薦系統架構技術進行了分享,他們分別是來自第四範式資深算法科學家程曉澄、騰訊音樂娛樂集團技術總監李深遠、58轉轉搜索推薦部負責人張相於、阿里巴巴高級技術專家鄧萬禧、京東集團架構師尹德位。《推薦系統架構演進的實戰分享》首先為大家介紹今天的第一位演講嘉賓來自第四範式資深算法科學家程曉澄。
  • Uber下一代支付平臺的系統架構設計
    為了實現這一點,我們將每筆交易更改都保存在實體更改日誌中,以便我們的系統通過實體更改日誌的每個用戶版本號對寫回進行序列化。我們使用包含版本號的欄位對舊系統中的每筆交易進行雙重寫入。這樣,即使同一作業進行了多個並發調整,寫回也不會出現混亂,並且最終結果始終是一致的。
  • 軟體項目實訓及課程設計指導——如何實現面向對象的系統架構設計
    軟體項目實訓及課程設計指導——如何實現面向對象的系統架構設計1、什麼是面向對象的軟體應用系統的架構設計從軟體應用系統的架構設計師的角度來看,所謂的軟體應用系統的系統架構就是一套構建軟體應用系統的整體結構的各種設計準則
  • 管理員必備:大數據日誌分析常用工具_ThinkServer伺服器_伺服器...
    當我們環顧數據中心的時候,你很難忽略所有基礎架構所產生的大數據的潛力價值。有伺服器和應用程式的日誌,有網絡和存儲產生的傳輸日誌,以及資料庫和應用程式產生的元數據,等等,可以說數據中心每天每時每刻都在產生龐大的數據。
  • 底層基於Apache Hudi的DLA最佳實踐 | 海量、低成本日誌分析
    背景信息日誌作為一種特殊的數據,對處理歷史數據、診斷問題以及了解系統活動等有著非常重要的作用。對數據分析人員、開發人員或者運維人員而言,日誌都是其工作過程中必不可缺的數據來源。通常情況下,為節約成本,我們會將日誌設定一定的保存時間,只分析該時間段內的日誌,此類日誌稱之為「熱」日誌。這種做法,短期內可以滿足使用需求,但從長期來看,大量的歷史日誌被擱置,無法發揮其價值。
  • 架構師之路--談架構師的基本素養和日誌處理
    提起門面模式又想起來一個典型代表:spring mvc裡的上下文,不管用的具體實現是哪一個,都走統一的接口:ApplicationContext。實現門面模式的技術叫動態綁定,是Java多態的一個體現。  slf4j剛開始使用的時候遇到了很多問題,因為大家都在使用自己的日誌系統和通用接口。
  • 納秒級高性能日誌系統 · ATC 2018
    NanoLog 的設計與實現原理。日誌是系統可觀測性的重要一環,相信很多工程師都有線上出問題臨時加日誌查問題的經歷,作者剛剛又重新經歷了這一過程,稍有經驗的開發者都會在系統中加入很多日誌方便生產環境的問題排查,更有經驗的開發者會謹慎地在系統中(尤其是低延遲的實時系統)添加日誌,因為列印日誌這件看起來簡單的事情實際上會帶來很大的額外開銷。
  • 軟體項目實訓及課程設計指導——系統設計中的系統架構設計示例
    而且有許多軟體企業為了使得自己的應用系統能夠更廣泛地滿足不同應用平臺下的用戶需求,往往會對同一個軟體系統提供多個平臺的版本,如C/S版、B/S版以及移動App版(包括平板電腦版等)。因此,軟體系統的設計人員需要找出影響軟體系統架構選擇的決定因素有哪些、併合理地進行權衡——軟體系統的設計應該是理性地「思考」和「選擇」的最終結果——「沒有最好、只有最合適」。
  • 萬億數據下的多維實時分析系統,如何做到亞秒級響應
    導語當業務發展到一定規模,實時數據倉庫是一個必要的基礎服務。從數據驅動方面考慮,多維實時數據分析系統的重要性也不言而喻。但是當數據量巨大的情況下,拿騰訊看點來說,一天上報的數據量達到萬億級的規模,要實現極低延遲的實時計算和亞秒級的多維實時查詢是有技術挑戰的。
  • 浪擎鏡像系統 SQLServer資料庫實時備份技術
    DoSTOR存儲在線 浪擎鏡像系統是業界成熟的應用系統實時備份容災解決方案。1。前言浪擎SQLServer鏡像通過資料庫邏輯層的複製技術,可以方便地實現SQLServer資料庫的實時備份容災,嚴格保障事務的一致性和完整性,在實時備份、本地和異地容災領域具有非常大的優勢。
  • 淺談IT運維分析.
    作為業務系統運行的應用日誌,交易流水能夠非常準確的、近乎實時的體現應用系統的運行狀態,通過對於採集的海量信息的實時處理,使對於支撐核心業務的IT應用系統的運行狀態了如指掌,精確的交易管理成為可能。Why now?大家可能會不約而同的問一個問題:「這麼好的解決方案為什麼十幾年前,沒有人能想的到?」
  • 日誌易:日誌大數據助力中國平安旗下iTutorGroup提升在線課堂體驗
    為服務如此龐大的全球客戶群,iTutorGroup採用自建IDC+公有雲的網絡建設方案,其網絡專線已遍布世界各地,面對跨地域、跨雲平臺的龐雜網絡體系,用戶急需一套平臺,實現對系統、網絡狀態和帶寬使用情況的實時監控與分析,日誌易智能日誌中心方案可以統一收集分散的系統、網絡日誌,提供搜索處理語言SPL(SearchProcessingLanguage)方便日誌查詢及統計,通過大屏Galaxee展示統計分析結果
  • 使用Storm實現實時大數據分析!
    簡單和明了,Storm讓大數據分析變得輕鬆加愉快。當今世界,公司的日常運營經常會生成TB級別的數據。數據來源囊括了網際網路裝置可以捕獲的任何類型數據,網站、社交媒體、交易型商業數據以及其它商業環境中創建的數據。考慮到數據的生成量,實時處理成為了許多機構需要面對的首要挑戰。
  • TUP第19期綜述:從12306看海量並發網站架構(含PPT下載)
    (CSDN當時上線的兩個專題 【從鐵路訂票系統看高並發網站技術解決之道】、【如果由我來設計12306.cn】也都收到了大量的反饋和回復)。大數據、海量用戶的網際網路服務能力、大型網站面對高負載和並發的解決之道,一直以來都是全世界公認的技術難題。